# Commit summary: Expose capability discovery plus admin-only config and
# request inspection endpoints so clients and operators can understand gateway
# behavior without reading code.
# Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
# File listing metadata: 2340 lines, 90 KiB, Python.
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import time
|
|
import uuid
|
|
from collections import deque
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any
|
|
|
|
from fastapi import Depends, FastAPI, HTTPException, Request
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
|
|
from .anthropic_schema import (
|
|
AnthropicMessagesRequest,
|
|
affinity_key_for_anthropic,
|
|
anthropic_to_internal_messages,
|
|
flatten_anthropic_content,
|
|
)
|
|
from .auth import (
|
|
AnthropicAuthError,
|
|
require_admin_access,
|
|
require_anthropic_key,
|
|
require_bearer,
|
|
require_metrics_access,
|
|
)
|
|
from .concurrency import BackpressureRejected, InFlightGuard
|
|
from .config import Settings, load_settings
|
|
from .http.execution_core import (
|
|
_apply_cached_instance_or_invalidate as _shared_apply_cached_instance_or_invalidate,
|
|
_resolve_ask_mode as _shared_resolve_ask_mode,
|
|
_tool_config_summary,
|
|
UpstreamExecutionError,
|
|
complete_execution,
|
|
finalize_stream_execution,
|
|
prepare_execution_context,
|
|
release_execution,
|
|
start_execution,
|
|
)
|
|
from .http.tool_emulation import (
|
|
action_output_prompt,
|
|
extract_anthropic_tool_choice as _em_extract_anthropic_tool_choice,
|
|
extract_anthropic_tools as _em_extract_anthropic_tools,
|
|
extract_openai_tool_choice as _em_extract_openai_tool_choice,
|
|
extract_openai_tools as _em_extract_openai_tools,
|
|
force_tooling_prompt,
|
|
has_tool_request as _em_has_tool_request,
|
|
infer_declared_tool_call_from_text,
|
|
infer_tool_calls_from_text,
|
|
inject_tooling,
|
|
openai_tool_call_from_emulated,
|
|
parse_action_blocks,
|
|
)
|
|
from .http.openai_responses import handle_responses
|
|
from .http.tool_bridge import (
|
|
_allowed_stream_tool_event,
|
|
_allowed_tool_events,
|
|
_anthropic_forced_tool_name,
|
|
_anthropic_tool_result_block,
|
|
_anthropic_tool_use_block,
|
|
_extract_function_call_event_from_text,
|
|
_extract_hash_tool_call_event_from_text,
|
|
_extract_tool_calls_from_text,
|
|
_forced_tool_fallback_event,
|
|
_infer_tool_event_from_declared_tools,
|
|
_json_string,
|
|
_openai_forced_tool_name,
|
|
_openai_tool_call,
|
|
)
|
|
from .http.tooling_policy import (
|
|
_anthropic_has_tooling_context,
|
|
_anthropic_tool_config,
|
|
_openai_has_tooling_context,
|
|
_openai_tool_config,
|
|
)
|
|
from .lingma_pool import LingmaPool, PoolInstance
|
|
from .logging_config import configure_logging, get_logger, request_id_var
|
|
from .model_map import build_model_name_map, flatten_model_keys, resolve_model
|
|
from .openai_schema import (
|
|
ChatCompletionChoice,
|
|
ChatCompletionResponse,
|
|
ChatCompletionsRequest,
|
|
ModelData,
|
|
ModelsResponse,
|
|
ResponsesRequest,
|
|
flatten_content,
|
|
)
|
|
from .session_bundle import encode_bundle, pack_workdir
|
|
from .session_cache import SessionCache, hash_branch_context
|
|
from .stats import StatsCollector, estimate_tokens
|
|
|
|
|
|
# Process-wide configuration, loaded once when this module is imported.
settings: Settings = load_settings()
configure_logging(settings.log_level)
logger = get_logger("lingma_gateway")

# Assigned by `lifespan` during application startup; None until then.
pool: LingmaPool | None = None
stats_collector = StatsCollector()
# Built from the configured in-flight limit and queue timeout.
# NOTE(review): presumably this bounds concurrent chat requests — confirm in
# `.concurrency`.
chat_guard = InFlightGuard(
    max_in_flight=settings.gateway_max_in_flight,
    queue_timeout_sec=settings.gateway_queue_timeout_sec,
)
# A zero entry budget is passed when session reuse is disabled.
session_cache = SessionCache(
    max_entries=settings.session_cache_max_entries
    if settings.session_reuse_enabled
    else 0,
    ttl_sec=settings.session_cache_ttl_sec,
)

# Headers attached to every response built by `_streaming_response`.
STREAMING_RESPONSE_HEADERS = {
    "Cache-Control": "no-cache, no-transform",
    "X-Accel-Buffering": "no",
    "Connection": "keep-alive",
}

# Redacted request bodies for the admin debug endpoint; entries are pushed on
# the left (newest first) and the deque keeps at most 100 of them.
_DEBUG_REQUEST_LOG: deque[dict[str, Any]] = deque(maxlen=100)
|
|
|
|
|
|
def _require_pool() -> LingmaPool:
    """Return the module-level pool, or fail with HTTP 503 while it is unset.

    Raises:
        HTTPException: 503 with a ``service_unavailable`` error envelope when
            ``pool`` is still ``None``.
    """
    if pool is not None:
        return pool
    unavailable = {
        "message": "pool not initialized",
        "type": "service_unavailable",
    }
    raise HTTPException(status_code=503, detail={"error": unavailable})
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Build and start the Lingma pool for the app's lifetime, closing it on exit.

    The pool is stored in the module-level ``pool`` so request handlers can
    reach it through ``_require_pool``.
    """
    global pool

    # Every pool option comes straight from the loaded settings.
    pool_options = dict(
        lingma_bin=settings.lingma_bin,
        base_work_dir=settings.lingma_work_dir,
        legacy_socket_port=settings.lingma_socket_port,
        startup_timeout=settings.lingma_startup_timeout,
        rpc_timeout=settings.lingma_rpc_timeout,
        default_model=settings.default_model,
        default_ask_mode=settings.default_ask_mode,
        accounts=settings.accounts,
        instance_count=settings.instance_count,
        auto_login_headless=settings.auto_login_headless,
        auto_login_timeout=settings.auto_login_timeout,
        auto_login_max_retry=settings.auto_login_max_retry,
    )
    pool = LingmaPool.build(**pool_options)

    logger.info(
        "gateway startup: pool_size=%d max_in_flight=%d",
        pool.size(),
        settings.gateway_max_in_flight,
    )
    _log_auth_posture()
    await pool.start()

    try:
        yield
    finally:
        if pool is not None:
            await pool.close()
|
|
|
|
|
|
# The ASGI application; `lifespan` builds and tears down the pool around it,
# and `app.version` is what the capabilities payload reports.
app = FastAPI(title="Lingma OpenAI Gateway", version="0.4.0", lifespan=lifespan)
|
|
|
|
|
|
@app.exception_handler(AnthropicAuthError)
async def _anthropic_auth_error_handler(_request: Request, exc: AnthropicAuthError):
    """Serialize an ``AnthropicAuthError`` using the Anthropic error envelope.

    FastAPI's stock handler would nest the payload under ``{"detail": ...}``,
    which Anthropic SDKs do not parse, so the response body is emitted as
    ``{"type": "error", "error": {"type": ..., "message": ...}}`` with the
    status code carried by the exception.
    """
    envelope = {
        "type": "error",
        "error": {"type": exc.error_type, "message": exc.message},
    }
    return JSONResponse(status_code=exc.status_code, content=envelope)
|
|
|
|
|
|
@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    """Attach a request id to each request and log one access line per request.

    The id is the incoming ``x-request-id`` header when present, otherwise a
    generated ``req-<12 hex chars>`` value. It is bound to ``request_id_var``
    for the duration of the request and echoed on the response.
    """
    req_id = request.headers.get("x-request-id") or f"req-{uuid.uuid4().hex[:12]}"
    token = request_id_var.set(req_id)
    began = time.monotonic()
    # Stays at 500 when the downstream handler raises before a response exists.
    status_code = 500
    try:
        response = await call_next(request)
        status_code = response.status_code
        response.headers["x-request-id"] = req_id
        return response
    finally:
        elapsed_ms = int((time.monotonic() - began) * 1000)
        method = request.method
        url_path = request.url.path
        log_context = {
            "ctx_method": method,
            "ctx_path": url_path,
            "ctx_status": status_code,
            "ctx_elapsed_ms": elapsed_ms,
        }
        logger.info(
            "http %s %s -> %s in %dms",
            method,
            url_path,
            status_code,
            elapsed_ms,
            extra=log_context,
        )
        request_id_var.reset(token)
|
|
|
|
|
|
def auth_guard(request: Request):
    """Dependency that checks the request against the configured API keys via ``require_bearer``."""
    accepted_keys = settings.api_keys
    require_bearer(request, accepted_keys)
|
|
|
|
|
|
def anthropic_auth_guard(request: Request):
    """Dependency that checks the request against the configured API keys via ``require_anthropic_key``."""
    accepted_keys = settings.api_keys
    require_anthropic_key(request, accepted_keys)
|
|
|
|
|
|
def metrics_auth_guard(request: Request):
    """Dependency that delegates metrics access control to ``require_metrics_access``.

    Passes the API keys, the metrics token, and the ``metrics_public`` flag
    from settings.
    """
    accepted_keys = settings.api_keys
    scrape_token = settings.metrics_token
    require_metrics_access(
        request, accepted_keys, scrape_token, public=settings.metrics_public
    )
|
|
|
|
|
|
def admin_auth_guard(request: Request):
    """Dependency that delegates admin access control to ``require_admin_access``."""
    accepted_keys = settings.api_keys
    operator_token = settings.admin_token
    require_admin_access(request, accepted_keys, operator_token)
|
|
|
|
|
|
def _log_auth_posture() -> None:
    """Emit a warning for each risky auth setting so operators notice it at startup."""
    # (condition, message) pairs, checked and logged in this order.
    findings = (
        (
            not settings.api_keys,
            "AUTH DISABLED: API_KEYS is empty, /v1/* is wide open. "
            "Set API_KEYS before exposing this gateway to anything "
            "other than localhost.",
        ),
        (
            not settings.admin_token,
            "ADMIN_TOKEN not set: /internal/* reuses API_KEYS for auth. "
            "For production set a dedicated ADMIN_TOKEN so rotating chat "
            "keys doesn't require exporting the session bundle.",
        ),
        (
            settings.metrics_public,
            "METRICS_PUBLIC=true: /metrics is open. Only enable this "
            "when the gateway is behind a private-network scraper.",
        ),
    )
    for triggered, message in findings:
        if triggered:
            logger.warning(message)
|
|
|
|
|
|
def _safe_setting_value(key: str, value: Any) -> Any:
|
|
key_upper = key.upper()
|
|
if any(
|
|
marker in key_upper
|
|
for marker in {"KEY", "TOKEN", "PASSWORD", "SECRET", "BUNDLE"}
|
|
):
|
|
if isinstance(value, list):
|
|
return ["***" for _ in value]
|
|
return "***"
|
|
return value
|
|
|
|
|
|
def _redact_debug_value(path: tuple[str, ...], value: Any) -> Any:
|
|
if isinstance(value, dict):
|
|
return {
|
|
k: _redact_debug_value(path + (str(k).lower(),), v)
|
|
for k, v in value.items()
|
|
}
|
|
if isinstance(value, list):
|
|
return [_redact_debug_value(path + ("[]",), item) for item in value]
|
|
if isinstance(value, str):
|
|
lowered_path = "/".join(path)
|
|
if any(marker in lowered_path for marker in ("authorization", "x-api-key", "api_key", "token", "password", "secret", "session_bundle")):
|
|
return "***"
|
|
if value.startswith("data:"):
|
|
return "[redacted-data-url]"
|
|
if "session bundle" in value.lower():
|
|
return "[redacted-session-bundle]"
|
|
if any(part in {"args", "arguments"} for part in path) and len(value) > 2048:
|
|
return value[:1024] + "... [truncated]"
|
|
return value
|
|
|
|
|
|
def _record_debug_request(protocol: str, path: str, body: dict[str, Any], request: Request) -> None:
    """Push a redacted snapshot of an incoming request onto the debug log.

    Args:
        protocol: Label for the wire protocol the request used.
        path: Route path the request was served on.
        body: Parsed request body; it is passed through ``_redact_debug_value``
            before being stored.
        request: The incoming request, consulted for its ``x-request-id`` header.

    The entry is added at the left of ``_DEBUG_REQUEST_LOG`` so the newest
    request is always first.
    """
    # Most clients do not send x-request-id. The middleware generates one and
    # binds it to `request_id_var`, so fall back to that value; otherwise the
    # debug entry could not be correlated with the access log line.
    request_id = request.headers.get("x-request-id") or request_id_var.get("")
    _DEBUG_REQUEST_LOG.appendleft(
        {
            "timestamp": int(time.time()),
            "protocol": protocol,
            "path": path,
            "request_id": request_id,
            "body": _redact_debug_value((), body),
        }
    )
|
|
|
|
|
|
@app.get("/internal/debug/requests", dependencies=[Depends(admin_auth_guard)])
async def internal_debug_requests(limit: int = 20):
    """Return the most recent redacted requests from the debug log.

    ``limit`` is clamped to the range 1..100; ``count`` is the number of items
    actually returned.
    """
    bounded = max(1, min(limit, 100))
    newest_first = list(_DEBUG_REQUEST_LOG)[:bounded]
    payload = {
        "ok": True,
        "count": len(newest_first),
        "items": newest_first,
    }
    return JSONResponse(content=payload)
|
|
|
|
|
|
@app.get("/healthz")
async def healthz():
    """Report pool health: ``ok`` is true when at least one instance is ready.

    Before the pool exists the response carries ``ok: False`` and a reason.
    """
    if pool is None:
        return {"ok": False, "time": int(time.time()), "reason": "pool uninitialized"}

    snapshot = pool.stats()
    ready_count = len([entry for entry in snapshot if entry["state"] == "ready"])
    # Only the fields relevant to health are exposed per instance.
    summaries = [
        {
            "name": entry["name"],
            "state": entry["state"],
            "in_flight": entry["in_flight"],
        }
        for entry in snapshot
    ]
    return {
        "ok": ready_count > 0,
        "time": int(time.time()),
        "pool_size": len(snapshot),
        "pool_ready": ready_count,
        "instances": summaries,
    }
|
|
|
|
|
|
def _capabilities_payload() -> dict[str, Any]:
    """Assemble the capability document served by the capabilities endpoints.

    Static protocol support flags are combined with live values read from
    ``settings`` and the application version.
    """
    tools_forwarded = settings.tool_forward_enabled

    openai_protocol = {
        "models": True,
        "chat_completions": True,
        "responses": True,
        "streaming": True,
        "response_tool_calls": True,
        "request_tools_forwarded": tools_forwarded,
    }
    anthropic_protocol = {
        "messages": True,
        "count_tokens": True,
        "streaming": True,
        "response_tool_use": True,
        "request_tools_forwarded": tools_forwarded,
    }

    session_reuse = {
        "enabled": settings.session_reuse_enabled,
        "cache_max_entries": settings.session_cache_max_entries,
        "cache_ttl_sec": settings.session_cache_ttl_sec,
    }
    tooling = {
        "forward_enabled": tools_forwarded,
        "allowlist": settings.tool_allowlist,
        "emulation_bridge_enabled": True,
    }
    pool_info = {
        "configured_instance_count": settings.instance_count,
        "default_model": settings.default_model,
        "default_ask_mode": settings.default_ask_mode,
    }
    # Only booleans are exposed here, never the secrets themselves.
    auth_info = {
        "v1_requires_auth": bool(settings.api_keys),
        "admin_token_configured": bool(settings.admin_token),
        "metrics_public": settings.metrics_public,
    }

    return {
        "service": "lingma-openai-gateway",
        "version": app.version,
        "protocols": {
            "openai": openai_protocol,
            "anthropic": anthropic_protocol,
        },
        "features": {
            "session_reuse": session_reuse,
            "tooling": tooling,
            "pool": pool_info,
            "auth": auth_info,
        },
    }
|
|
|
|
|
|
@app.get("/capabilities")
async def capabilities():
    """Serve the capability document on a route with no auth dependency."""
    document = _capabilities_payload()
    return JSONResponse(content=document)
|
|
|
|
|
|
@app.get("/v1/capabilities", dependencies=[Depends(anthropic_auth_guard)])
async def v1_capabilities():
    """Serve the capability document behind ``anthropic_auth_guard``."""
    document = _capabilities_payload()
    return JSONResponse(content=document)
|
|
|
|
|
|
def _login_http_error(status_code: int, message: str, error_type: str) -> HTTPException:
    """Build an HTTPException whose detail is ``{"error": {"message", "type"}}``."""
    return HTTPException(
        status_code=status_code,
        detail={"error": {"message": message, "type": error_type}},
    )


async def _ensure_instance_logged_in(inst: PoolInstance) -> dict:
    """Return the instance's auth status, running auto login first if needed.

    Flow:
      1. Query ``auth_status``; a status carrying an ``id`` is returned as-is.
      2. Otherwise, if auto login is enabled, optionally switch to the
         dedicated endpoint, generate a login URL, drive the auto-login helper
         and re-check ``auth_status``.

    Args:
        inst: Pool instance whose ``client`` and ``auto_login`` helper are used.

    Returns:
        The auth status mapping (truthy, with a truthy ``id``).

    Raises:
        HTTPException: 503 when the initial ``auth_status`` call fails;
            401 when the instance is not logged in and auto login is disabled,
            or auto login did not yield a logged-in session;
            502 when no login URL could be generated.
    """
    client = inst.client
    auto_login = inst.auto_login

    try:
        status = await client.auth_status()
    except Exception as exc:
        logger.warning("[%s] auth_status failed before chat: %s", inst.name, exc)
        # Chain the cause so the original failure stays visible in tracebacks.
        raise _login_http_error(
            503, "Lingma is not ready", "service_unavailable"
        ) from exc

    if status and status.get("id"):
        return status

    if not settings.auto_login_enabled:
        raise _login_http_error(401, "Lingma not logged in", "invalid_request_error")

    if settings.dedicated_domain_url:
        # Best effort: a failed endpoint switch is logged and login proceeds.
        try:
            current = await client.get_endpoint()
            current_ep = current.get("endpoint", "") if isinstance(current, dict) else ""
            if current_ep != settings.dedicated_domain_url:
                await client.update_endpoint(settings.dedicated_domain_url)
        except Exception as exc:
            logger.warning("[%s] switch dedicated endpoint failed: %s", inst.name, exc)

    try:
        login_url, _login_raw = await client.generate_login_url()
    except Exception as exc:
        logger.warning("[%s] generate_login_url failed: %s", inst.name, exc)
        raise _login_http_error(
            502, "generate login url failed", "upstream_error"
        ) from exc

    if not login_url:
        raise _login_http_error(502, "generate login url failed", "upstream_error")

    await auto_login.ensure_started(login_url)
    try:
        await auto_login.wait_done(timeout=settings.auto_login_timeout + 20)
    except Exception as exc:
        # Deliberately non-fatal: the auth_status re-check below is the
        # authoritative signal for whether login succeeded.
        logger.warning("[%s] auto_login wait_done failed: %s", inst.name, exc)

    try:
        status = await client.auth_status()
    except Exception as exc:
        logger.warning("[%s] post-login auth_status failed: %s", inst.name, exc)
        status = None

    if status and status.get("id"):
        return status

    logger.warning(
        "[%s] auto login did not result in a logged-in session: %s",
        inst.name,
        auto_login.status(),
    )
    raise _login_http_error(401, "Lingma auto login failed", "invalid_request_error")
|
|
|
|
|
|
def _affinity_key_for(req: ChatCompletionsRequest) -> str | None:
|
|
"""Derive a stable affinity key so that follow-ups go to the same instance.
|
|
|
|
Priority: explicit `user` > hash of the first/system message.
|
|
"""
|
|
if req.user:
|
|
return req.user.strip() or None
|
|
for m in req.messages:
|
|
if m.role == "system":
|
|
text = flatten_content(m.content)
|
|
if text:
|
|
return "sys:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16]
|
|
if req.messages:
|
|
first = req.messages[0]
|
|
text = flatten_content(first.content)
|
|
if text:
|
|
return "first:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16]
|
|
return None
|
|
|
|
|
|
def _extract_api_key(request: Request) -> str:
|
|
h = request.headers.get("authorization", "")
|
|
if h.lower().startswith("bearer "):
|
|
return h[7:].strip()
|
|
return ""
|
|
|
|
|
|
def _last_user_text(messages: list[dict]) -> str:
|
|
"""Extract the text of the latest user message (trailing from end).
|
|
|
|
Used when we hit the session cache and only need to send the delta.
|
|
Falls back to the last message regardless of role if no user is found.
|
|
"""
|
|
for m in reversed(messages):
|
|
if m.get("role") == "user":
|
|
return flatten_content(m.get("content")) or ""
|
|
if messages:
|
|
return flatten_content(messages[-1].get("content")) or ""
|
|
return ""
|
|
|
|
|
|
@app.get("/v1/models", dependencies=[Depends(anthropic_auth_guard)])
async def v1_models():
    """List the models reported by one logged-in pool instance."""
    inst = _require_pool().pick()
    await _ensure_instance_logged_in(inst)
    await stats_collector.inc_models()

    upstream_models = await inst.client.query_models()
    model_ids = flatten_model_keys(upstream_models)
    display_names = build_model_name_map(upstream_models)

    entries = [
        ModelData(id=model_id, name=display_names.get(model_id))
        for model_id in model_ids
    ]
    listing = ModelsResponse(data=entries)
    return JSONResponse(content=listing.model_dump())
|
|
|
|
|
|
def _messages_to_prompt(messages: list[dict]) -> str:
|
|
parts: list[str] = []
|
|
for m in messages:
|
|
role = m.get("role", "user")
|
|
text = flatten_content(m.get("content"))
|
|
if not text and m.get("tool_calls"):
|
|
text = f"[tool_calls] {json.dumps(m['tool_calls'], ensure_ascii=False)}"
|
|
if not text:
|
|
continue
|
|
parts.append(f"[{role}] {text}")
|
|
return "\n".join(parts).strip()
|
|
|
|
|
|
def _assistant_tool_calls_to_emulation_text(tool_calls: Any) -> str:
|
|
if not isinstance(tool_calls, list):
|
|
return ""
|
|
blocks: list[str] = []
|
|
for item in tool_calls:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
fn = item.get("function") if isinstance(item.get("function"), dict) else None
|
|
name = str((fn or {}).get("name") or item.get("name") or "").strip()
|
|
if not name:
|
|
continue
|
|
arguments = (fn or {}).get("arguments")
|
|
if isinstance(arguments, str):
|
|
try:
|
|
arguments = json.loads(arguments)
|
|
except Exception:
|
|
arguments = {"raw": arguments}
|
|
if not isinstance(arguments, dict):
|
|
arguments = {}
|
|
blocks.append(
|
|
"```json action\n"
|
|
+ json.dumps(
|
|
{"tool": name, "parameters": arguments}, ensure_ascii=False, indent=2
|
|
)
|
|
+ "\n```"
|
|
)
|
|
return "\n\n".join(blocks)
|
|
|
|
|
|
def _tool_action_block(name: str, arguments: dict[str, Any]) -> str:
|
|
return (
|
|
"```json action\n"
|
|
+ json.dumps(
|
|
{"tool": name, "parameters": arguments}, ensure_ascii=False, indent=2
|
|
)
|
|
+ "\n```"
|
|
)
|
|
|
|
|
|
def _anthropic_flattened_tool_history_to_emulation_text(text: str) -> str:
|
|
if not text:
|
|
return ""
|
|
out: list[str] = []
|
|
for line in text.splitlines():
|
|
stripped = line.strip()
|
|
if stripped.startswith("[tool_use]"):
|
|
raw = stripped[len("[tool_use]") :].strip()
|
|
try:
|
|
payload = json.loads(raw)
|
|
except Exception:
|
|
out.append(line)
|
|
continue
|
|
if not isinstance(payload, dict):
|
|
out.append(line)
|
|
continue
|
|
name = str(payload.get("name") or "").strip()
|
|
arguments = payload.get("input")
|
|
if name and isinstance(arguments, dict):
|
|
out.append(_tool_action_block(name, arguments))
|
|
else:
|
|
out.append(line)
|
|
continue
|
|
if stripped.startswith("[tool_result]"):
|
|
out.append(action_output_prompt(None, stripped[len("[tool_result]") :].strip()))
|
|
continue
|
|
out.append(line)
|
|
return "\n".join(part for part in out if part).strip()
|
|
|
|
|
|
def _messages_to_emulation_prompt(
|
|
messages: list[dict[str, Any]],
|
|
*,
|
|
system_text: str,
|
|
tools: list[dict[str, Any]] | None,
|
|
tool_choice: Any,
|
|
) -> str:
|
|
filtered: list[tuple[str, str]] = []
|
|
for message in messages:
|
|
role = str(message.get("role") or "").strip().lower()
|
|
if role in {"system", "developer"}:
|
|
continue
|
|
text = flatten_content(message.get("content"))
|
|
if role == "assistant" and message.get("tool_calls"):
|
|
projected = _assistant_tool_calls_to_emulation_text(message.get("tool_calls"))
|
|
if projected:
|
|
text = "\n\n".join(part for part in [text, projected] if part)
|
|
if role == "tool":
|
|
text = action_output_prompt(message.get("tool_call_id"), text)
|
|
role = "user"
|
|
if not text:
|
|
continue
|
|
if role not in {"user", "assistant"}:
|
|
continue
|
|
filtered.append((role, text))
|
|
|
|
if not filtered:
|
|
return system_text.strip()
|
|
|
|
em_tools = _em_extract_openai_tools(tools)
|
|
em_choice = _em_extract_openai_tool_choice(tool_choice)
|
|
injected_system = inject_tooling(system_text, em_tools, em_choice)
|
|
|
|
parts: list[str] = []
|
|
for role, text in filtered:
|
|
label = "User" if role == "user" else "Assistant"
|
|
parts.append(f"{label}: {text}")
|
|
if injected_system:
|
|
parts.append(injected_system)
|
|
parts.append("Assistant:")
|
|
return "\n\n".join(parts).strip()
|
|
|
|
|
|
def _effective_tool_config_for_emulation(
|
|
tool_config: dict[str, Any] | None,
|
|
*,
|
|
use_emulation: bool,
|
|
) -> dict[str, Any] | None:
|
|
if use_emulation:
|
|
return None
|
|
return tool_config
|
|
|
|
|
|
def _emulation_tools(raw_tools: list[dict[str, Any]] | None, tool_config: dict[str, Any] | None) -> list[dict[str, Any]] | None:
|
|
if isinstance(tool_config, dict) and isinstance(tool_config.get("tools"), list):
|
|
return tool_config.get("tools")
|
|
return raw_tools
|
|
|
|
|
|
def _anthropic_messages_to_emulation_prompt(
|
|
messages: list[dict[str, Any]],
|
|
*,
|
|
system_text: str,
|
|
tools: list[dict[str, Any]] | None,
|
|
tool_choice: Any,
|
|
) -> str:
|
|
filtered: list[tuple[str, str]] = []
|
|
for message in messages:
|
|
role = str(message.get("role") or "").strip().lower()
|
|
text = str(message.get("content") or "").strip()
|
|
if role == "assistant" and "[tool_use]" in text:
|
|
text = _anthropic_flattened_tool_history_to_emulation_text(text)
|
|
elif role == "user" and "[tool_result]" in text:
|
|
text = _anthropic_flattened_tool_history_to_emulation_text(text)
|
|
if role == "tool":
|
|
text = action_output_prompt(message.get("tool_call_id"), text)
|
|
role = "user"
|
|
if not text:
|
|
continue
|
|
if role not in {"user", "assistant"}:
|
|
continue
|
|
filtered.append((role, text))
|
|
|
|
if not filtered:
|
|
return system_text.strip()
|
|
|
|
em_tools = _em_extract_anthropic_tools(tools)
|
|
em_choice = _em_extract_anthropic_tool_choice(tool_choice)
|
|
injected_system = inject_tooling(system_text, em_tools, em_choice)
|
|
|
|
parts: list[str] = []
|
|
for role, text in filtered:
|
|
label = "User" if role == "user" else "Assistant"
|
|
parts.append(f"{label}: {text}")
|
|
if injected_system:
|
|
parts.append(injected_system)
|
|
parts.append("Assistant:")
|
|
return "\n\n".join(parts).strip()
|
|
|
|
|
|
def _include_usage(stream_options: dict | None) -> bool:
|
|
if not isinstance(stream_options, dict):
|
|
return False
|
|
return bool(stream_options.get("include_usage"))
|
|
|
|
|
|
def _resolve_ask_mode(model: str, has_tooling_context: bool) -> str:
    """Resolve the ask mode via the shared helper, using the configured default."""
    fallback_mode = settings.default_ask_mode
    return _shared_resolve_ask_mode(
        model, has_tooling_context, default_ask_mode=fallback_mode
    )
|
|
|
|
|
|
async def _apply_cached_instance_or_invalidate(
    *,
    protocol: str,
    inst: PoolInstance,
    cached_instance_name: str | None,
    cached_session_id: str | None,
    lookup_key: str | None,
) -> str | None:
    """Forward to the shared helper, supplying this module's logger and session cache."""
    outcome = await _shared_apply_cached_instance_or_invalidate(
        logger=logger,
        session_cache=session_cache,
        protocol=protocol,
        inst=inst,
        cached_instance_name=cached_instance_name,
        cached_session_id=cached_session_id,
        lookup_key=lookup_key,
    )
    return outcome
|
|
|
|
|
|
def _streaming_response(event_stream) -> StreamingResponse:
    """Wrap ``event_stream`` in a ``text/event-stream`` response with the shared streaming headers."""
    response = StreamingResponse(
        event_stream,
        headers=STREAMING_RESPONSE_HEADERS,
        media_type="text/event-stream",
    )
    return response
|
|
|
|
|
|
def _stream_event_type(event: Any) -> str:
|
|
if isinstance(event, dict):
|
|
t = event.get("type")
|
|
if t in {"text", "tool"}:
|
|
return t
|
|
return "text"
|
|
|
|
|
|
def _stream_text(event: Any) -> str:
|
|
if isinstance(event, dict):
|
|
if event.get("type") == "text":
|
|
text = event.get("text")
|
|
if isinstance(text, str):
|
|
return text
|
|
return ""
|
|
if isinstance(event, str):
|
|
return event
|
|
return ""
|
|
|
|
|
|
@app.post("/v1/chat/completions", dependencies=[Depends(auth_guard)])
|
|
async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
|
p = _require_pool()
|
|
|
|
messages_dump = [m.model_dump() for m in req.messages]
|
|
_record_debug_request("openai", "/v1/chat/completions", req.model_dump(mode="json"), request)
|
|
api_key = _extract_api_key(request) or "-"
|
|
|
|
# ------------------------------------------------------------- session reuse
|
|
# Look up the "conversation prefix" (everything except the latest user turn)
|
|
# in the session cache. A hit lets us:
|
|
# 1. Reuse the upstream sessionId so Lingma/Qwen hits its KV cache.
|
|
# 2. Send only the new user message instead of the whole history.
|
|
# 3. Stick the request to the pool instance that originally served it.
|
|
tool_config = _openai_tool_config(req, settings=settings)
|
|
has_tooling_context = _openai_has_tooling_context(req, messages_dump)
|
|
logger.info(
|
|
"chat.request stream=%s tooling=%s tool_config=%s",
|
|
req.stream,
|
|
has_tooling_context,
|
|
_tool_config_summary(tool_config),
|
|
)
|
|
execution = await prepare_execution_context(
|
|
protocol="chat",
|
|
requested_model=req.model,
|
|
has_tooling_context=has_tooling_context,
|
|
tool_config=tool_config,
|
|
messages_dump=messages_dump,
|
|
api_key=api_key,
|
|
affinity_key=_affinity_key_for(req),
|
|
pool=p,
|
|
session_cache=session_cache,
|
|
logger=logger,
|
|
default_model=settings.default_model,
|
|
default_ask_mode=settings.default_ask_mode,
|
|
ensure_instance_logged_in=_ensure_instance_logged_in,
|
|
last_user_text=_last_user_text,
|
|
messages_to_prompt=_messages_to_prompt,
|
|
)
|
|
ask_mode = execution.ask_mode
|
|
write_key = execution.write_key
|
|
cached_session_id = execution.cached_session_id
|
|
inst = execution.inst
|
|
model = execution.model
|
|
prompt = execution.prompt
|
|
is_reply = execution.is_reply
|
|
|
|
include_usage = _include_usage(req.stream_options)
|
|
emulation_tools = _emulation_tools(req.tools, tool_config)
|
|
em_tools = _em_extract_openai_tools(emulation_tools)
|
|
em_choice = _em_extract_openai_tool_choice(req.tool_choice)
|
|
use_emulation = has_tooling_context
|
|
if use_emulation:
|
|
system_parts = [
|
|
flatten_content(m.content)
|
|
for m in req.messages
|
|
if m.role in {"system", "developer"} and flatten_content(m.content)
|
|
]
|
|
prompt = _messages_to_emulation_prompt(
|
|
messages_dump,
|
|
system_text="\n\n".join(system_parts),
|
|
tools=emulation_tools,
|
|
tool_choice=req.tool_choice,
|
|
)
|
|
execution.prompt = prompt
|
|
effective_tool_config = _effective_tool_config_for_emulation(
|
|
tool_config,
|
|
use_emulation=use_emulation,
|
|
)
|
|
|
|
try:
|
|
started = await start_execution(
|
|
protocol="chat",
|
|
execution=execution,
|
|
stream=req.stream,
|
|
chat_guard=chat_guard,
|
|
logger=logger,
|
|
estimate_tokens=estimate_tokens,
|
|
)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"error": {
|
|
"message": "messages is empty",
|
|
"type": "invalid_request_error",
|
|
}
|
|
},
|
|
)
|
|
except BackpressureRejected as exc:
|
|
retry_after = max(1, int(exc.retry_after))
|
|
logger.warning("chat rejected by backpressure, retry_after=%ds", retry_after)
|
|
raise HTTPException(
|
|
status_code=429,
|
|
detail={
|
|
"error": {
|
|
"message": "Too many in-flight requests, please retry later",
|
|
"type": "rate_limit_error",
|
|
"code": "backpressure",
|
|
}
|
|
},
|
|
headers={"Retry-After": str(retry_after)},
|
|
)
|
|
|
|
ticket = started.ticket
|
|
prompt_tokens = started.prompt_tokens
|
|
|
|
ticket_transferred = False
|
|
|
|
try:
|
|
if req.stream:
|
|
created = int(time.time())
|
|
completion_id = f"chatcmpl-{uuid.uuid4().hex}"
|
|
completion_tokens_holder = {"n": 0}
|
|
stream_meta: dict = {}
|
|
forced_tool_name = _openai_forced_tool_name(req.tool_choice)
|
|
|
|
async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
|
|
success = False
|
|
tool_call_indexes: dict[str, int] = {}
|
|
saw_tool_call = False
|
|
buffered_text_parts: list[str] = []
|
|
|
|
def _text_payload(text: str) -> str:
|
|
payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {"content": text},
|
|
"finish_reason": None,
|
|
}
|
|
],
|
|
}
|
|
return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
|
|
|
try:
|
|
async for chunk in _inst.client.chat_stream(
|
|
prompt,
|
|
model,
|
|
ask_mode,
|
|
session_id=cached_session_id,
|
|
is_reply=is_reply,
|
|
tool_config=effective_tool_config,
|
|
out_meta=_meta,
|
|
):
|
|
if _stream_event_type(chunk) == "tool":
|
|
tool = _allowed_stream_tool_event(
|
|
chunk,
|
|
tool_config=tool_config,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if not tool:
|
|
continue
|
|
|
|
if buffered_text_parts:
|
|
for buffered_text in buffered_text_parts:
|
|
yield _text_payload(buffered_text)
|
|
buffered_text_parts.clear()
|
|
|
|
tool_id = str(tool.get("id") or "")
|
|
if not tool_id:
|
|
tool_id = f"call_{len(tool_call_indexes)}"
|
|
idx = tool_call_indexes.get(tool_id)
|
|
if idx is None:
|
|
idx = len(tool_call_indexes)
|
|
tool_call_indexes[tool_id] = idx
|
|
saw_tool_call = True
|
|
payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {
|
|
"tool_calls": [
|
|
{
|
|
"index": idx,
|
|
**_openai_tool_call(
|
|
tool, forced_id=tool_id
|
|
),
|
|
}
|
|
]
|
|
},
|
|
"finish_reason": None,
|
|
}
|
|
],
|
|
}
|
|
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
|
continue
|
|
|
|
text = _stream_text(chunk)
|
|
if not text:
|
|
continue
|
|
buffered_text_parts.append(text)
|
|
completion_tokens_holder["n"] += estimate_tokens(text)
|
|
if use_emulation:
|
|
continue
|
|
|
|
full_text = "".join(buffered_text_parts)
|
|
if req.tools:
|
|
if "[tool_calls]".startswith(full_text) or "[tool_calls]" in full_text:
|
|
continue
|
|
|
|
if forced_tool_name and not saw_tool_call:
|
|
continue
|
|
|
|
# Yield all buffered text
|
|
text_to_yield = "".join(buffered_text_parts)
|
|
buffered_text_parts.clear()
|
|
yield _text_payload(text_to_yield)
|
|
|
|
if buffered_text_parts and not saw_tool_call:
|
|
merged_text = "".join(buffered_text_parts)
|
|
|
|
extracted_tool_calls = _extract_tool_calls_from_text(merged_text)
|
|
if extracted_tool_calls:
|
|
saw_tool_call = True
|
|
for i, tc in enumerate(extracted_tool_calls):
|
|
tool_id = str(tc.get("id") or f"call_inferred_{i}")
|
|
tool_call_indexes[tool_id] = i
|
|
payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {
|
|
"tool_calls": [
|
|
{
|
|
"index": i,
|
|
**_openai_tool_call(tc, forced_id=tool_id),
|
|
}
|
|
]
|
|
},
|
|
"finish_reason": None,
|
|
}
|
|
],
|
|
}
|
|
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
|
buffered_text_parts.clear()
|
|
|
|
if buffered_text_parts and forced_tool_name and not saw_tool_call:
|
|
merged_text = "".join(buffered_text_parts)
|
|
inferred = _extract_function_call_event_from_text(
|
|
merged_text,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if inferred is None:
|
|
inferred = _extract_hash_tool_call_event_from_text(
|
|
merged_text,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if inferred is None:
|
|
inferred = _forced_tool_fallback_event(
|
|
merged_text,
|
|
forced_tool_name=forced_tool_name,
|
|
tools=req.tools,
|
|
)
|
|
if inferred is not None:
|
|
tool_id = "call_inferred_0"
|
|
tool_call_indexes[tool_id] = 0
|
|
saw_tool_call = True
|
|
payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {
|
|
"tool_calls": [
|
|
{
|
|
"index": 0,
|
|
**_openai_tool_call(
|
|
inferred, forced_id=tool_id
|
|
),
|
|
}
|
|
]
|
|
},
|
|
"finish_reason": None,
|
|
}
|
|
],
|
|
}
|
|
buffered_text_parts.clear()
|
|
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
|
|
|
if buffered_text_parts and req.tools and not saw_tool_call:
|
|
merged_text = "".join(buffered_text_parts)
|
|
inferred = _infer_tool_event_from_declared_tools(
|
|
merged_text,
|
|
tools=req.tools,
|
|
)
|
|
if inferred is not None:
|
|
tool_id = "call_inferred_0"
|
|
tool_call_indexes[tool_id] = 0
|
|
saw_tool_call = True
|
|
payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {
|
|
"tool_calls": [
|
|
{
|
|
"index": 0,
|
|
**_openai_tool_call(
|
|
inferred, forced_id=tool_id
|
|
),
|
|
}
|
|
]
|
|
},
|
|
"finish_reason": None,
|
|
}
|
|
],
|
|
}
|
|
buffered_text_parts.clear()
|
|
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
|
|
|
if buffered_text_parts and req.tools and not saw_tool_call:
|
|
merged_text = "".join(buffered_text_parts)
|
|
parsed_calls, remaining = parse_action_blocks(merged_text, em_tools)
|
|
if parsed_calls:
|
|
saw_tool_call = True
|
|
for i, call in enumerate(parsed_calls):
|
|
tool_id = call.id or f"call_inferred_{i}"
|
|
tool_call_indexes[tool_id] = i
|
|
payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {
|
|
"tool_calls": [
|
|
{
|
|
"index": i,
|
|
**openai_tool_call_from_emulated(call),
|
|
}
|
|
]
|
|
},
|
|
"finish_reason": None,
|
|
}
|
|
],
|
|
}
|
|
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
|
|
buffered_text_parts = [remaining] if remaining else []
|
|
|
|
if buffered_text_parts and saw_tool_call:
|
|
text_to_yield = "".join(buffered_text_parts)
|
|
buffered_text_parts.clear()
|
|
yield _text_payload(text_to_yield)
|
|
|
|
done_payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [
|
|
{
|
|
"index": 0,
|
|
"delta": {},
|
|
"finish_reason": "tool_calls"
|
|
if saw_tool_call
|
|
else "stop",
|
|
}
|
|
],
|
|
}
|
|
yield f"data: {json.dumps(done_payload, ensure_ascii=False)}\n\n"
|
|
|
|
if include_usage:
|
|
usage_payload = {
|
|
"id": completion_id,
|
|
"object": "chat.completion.chunk",
|
|
"created": created,
|
|
"model": model,
|
|
"choices": [],
|
|
"usage": {
|
|
"prompt_tokens": prompt_tokens,
|
|
"completion_tokens": completion_tokens_holder["n"],
|
|
"total_tokens": prompt_tokens
|
|
+ completion_tokens_holder["n"],
|
|
},
|
|
}
|
|
yield f"data: {json.dumps(usage_payload, ensure_ascii=False)}\n\n"
|
|
|
|
yield "data: [DONE]\n\n"
|
|
success = True
|
|
except asyncio.CancelledError:
|
|
logger.info(
|
|
"chat.stream cancelled by client (inst=%s, session_id=%s)",
|
|
_inst.name,
|
|
cached_session_id,
|
|
)
|
|
raise
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"chat.stream error (inst=%s, session_id=%s, prompt_tokens=%s, completion_tokens=%s): %s",
|
|
_inst.name,
|
|
cached_session_id,
|
|
prompt_tokens,
|
|
completion_tokens_holder["n"],
|
|
exc,
|
|
)
|
|
finally:
|
|
await finalize_stream_execution(
|
|
success=success,
|
|
write_key=write_key,
|
|
session_id=_meta.get("session_id"),
|
|
inst=_inst,
|
|
ticket=_ticket,
|
|
session_cache=session_cache,
|
|
stats_collector=stats_collector,
|
|
prompt_tokens=prompt_tokens,
|
|
completion_tokens=completion_tokens_holder["n"],
|
|
)
|
|
|
|
ticket_transferred = True
|
|
return _streaming_response(event_stream())
|
|
|
|
try:
|
|
completed = await complete_execution(
|
|
protocol="chat",
|
|
execution=execution,
|
|
prompt_tokens=prompt_tokens,
|
|
tool_config=effective_tool_config,
|
|
logger=logger,
|
|
stats_collector=stats_collector,
|
|
session_cache=session_cache,
|
|
estimate_tokens=estimate_tokens,
|
|
)
|
|
except UpstreamExecutionError:
|
|
raise HTTPException(
|
|
status_code=502,
|
|
detail={
|
|
"error": {
|
|
"message": "upstream lingma error",
|
|
"type": "upstream_error",
|
|
}
|
|
},
|
|
)
|
|
|
|
result = completed.result
|
|
completion_tokens = completed.completion_tokens
|
|
forced_tool_name = _openai_forced_tool_name(req.tool_choice)
|
|
tool_events = _allowed_tool_events(
|
|
result.get("toolEvents"),
|
|
tool_config=tool_config,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
message_content = result.get("text") or ""
|
|
tool_calls: list[dict[str, Any]] = []
|
|
saw_tool_call = False
|
|
for idx, item in enumerate(tool_events):
|
|
tool_id = str(item.get("id") or f"call_{idx}")
|
|
tool_calls.append(_openai_tool_call(item, forced_id=tool_id))
|
|
saw_tool_call = True
|
|
|
|
if not saw_tool_call:
|
|
extracted_tool_calls = _extract_tool_calls_from_text(message_content)
|
|
if extracted_tool_calls:
|
|
for idx, tc in enumerate(extracted_tool_calls):
|
|
tool_id = str(tc.get("id") or f"call_inferred_{idx}")
|
|
tool_calls.append(_openai_tool_call(tc, forced_id=tool_id))
|
|
saw_tool_call = True
|
|
message_content = ""
|
|
|
|
if not saw_tool_call and forced_tool_name:
|
|
inferred = _extract_function_call_event_from_text(
|
|
message_content,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if inferred is None:
|
|
inferred = _extract_hash_tool_call_event_from_text(
|
|
message_content,
|
|
forced_tool_name=forced_tool_name,
|
|
)
|
|
if inferred is None:
|
|
inferred = _forced_tool_fallback_event(
|
|
message_content,
|
|
forced_tool_name=forced_tool_name,
|
|
tools=req.tools,
|
|
)
|
|
if inferred is not None:
|
|
tool_calls.append(
|
|
_openai_tool_call(inferred, forced_id="call_inferred_0")
|
|
)
|
|
saw_tool_call = True
|
|
message_content = ""
|
|
if not saw_tool_call and req.tools:
|
|
inferred = _infer_tool_event_from_declared_tools(
|
|
message_content,
|
|
tools=req.tools,
|
|
)
|
|
if inferred is not None:
|
|
tool_calls.append(
|
|
_openai_tool_call(inferred, forced_id="call_inferred_0")
|
|
)
|
|
saw_tool_call = True
|
|
message_content = ""
|
|
if not saw_tool_call and em_tools:
|
|
parsed_calls, remaining = parse_action_blocks(message_content, em_tools)
|
|
if parsed_calls:
|
|
for call in parsed_calls:
|
|
tool_calls.append(openai_tool_call_from_emulated(call))
|
|
saw_tool_call = True
|
|
message_content = remaining
|
|
if not saw_tool_call and em_tools:
|
|
inferred_call = infer_declared_tool_call_from_text(message_content, em_tools)
|
|
if inferred_call is None:
|
|
inferred_calls = infer_tool_calls_from_text(message_content, em_tools)
|
|
inferred_call = inferred_calls[0] if inferred_calls else None
|
|
if inferred_call is not None:
|
|
tool_calls.append(openai_tool_call_from_emulated(inferred_call))
|
|
saw_tool_call = True
|
|
message_content = ""
|
|
if not saw_tool_call and em_tools:
|
|
retry_prompt = f"{prompt}\n\n{force_tooling_prompt(em_choice)}"
|
|
retry_result = await inst.client.chat_complete(
|
|
retry_prompt,
|
|
model,
|
|
ask_mode,
|
|
session_id=None,
|
|
is_reply=False,
|
|
tool_config=effective_tool_config,
|
|
)
|
|
retry_text = retry_result.get("text") or ""
|
|
parsed_calls, remaining = parse_action_blocks(retry_text, em_tools)
|
|
if parsed_calls:
|
|
for call in parsed_calls:
|
|
tool_calls.append(openai_tool_call_from_emulated(call))
|
|
saw_tool_call = True
|
|
message_content = remaining
|
|
else:
|
|
inferred_call = infer_declared_tool_call_from_text(retry_text, em_tools)
|
|
if inferred_call is None:
|
|
inferred_calls = infer_tool_calls_from_text(retry_text, em_tools)
|
|
inferred_call = inferred_calls[0] if inferred_calls else None
|
|
if inferred_call is not None:
|
|
tool_calls.append(openai_tool_call_from_emulated(inferred_call))
|
|
saw_tool_call = True
|
|
message_content = ""
|
|
response = ChatCompletionResponse(
|
|
id=f"chatcmpl-{uuid.uuid4().hex}",
|
|
created=int(time.time()),
|
|
model=model,
|
|
choices=[
|
|
ChatCompletionChoice(
|
|
index=0,
|
|
finish_reason="tool_calls" if saw_tool_call else "stop",
|
|
message={
|
|
"role": "assistant",
|
|
"content": message_content,
|
|
"tool_calls": tool_calls or None,
|
|
},
|
|
)
|
|
],
|
|
)
|
|
|
|
data = response.model_dump()
|
|
data["latency"] = {
|
|
"first_token_ms": result.get("firstTokenLatencyMs"),
|
|
"total_ms": result.get("totalLatencyMs"),
|
|
}
|
|
data["usage"] = {
|
|
"prompt_tokens": prompt_tokens,
|
|
"completion_tokens": completion_tokens,
|
|
"total_tokens": prompt_tokens + completion_tokens,
|
|
}
|
|
data["served_by"] = inst.name
|
|
return JSONResponse(content=data)
|
|
finally:
|
|
if not ticket_transferred:
|
|
release_execution(ticket=ticket, inst=inst)
|
|
|
|
|
|
@app.post("/responses", dependencies=[Depends(auth_guard)])
@app.post("/v1/responses", dependencies=[Depends(auth_guard)])
async def v1_responses(req: ResponsesRequest, request: Request):
    """Serve the Responses endpoint on both `/responses` and `/v1/responses`.

    The work is delegated to `handle_responses`, which is handed the
    chat-completions handler and the shared streaming response headers.
    """
    delegate_options = {
        "chat_completions_handler": v1_chat_completions,
        "streaming_response_headers": STREAMING_RESPONSE_HEADERS,
    }
    return await handle_responses(req, request, **delegate_options)
|
|
|
|
|
|
def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse:
    """Return a JSON response carrying an Anthropic-style `type: error` envelope.

    Args:
        status_code: HTTP status to send.
        error_type: Value for the nested `error.type` field.
        message: Value for the nested `error.message` field.
    """
    envelope = {
        "type": "error",
        "error": {"type": error_type, "message": message},
    }
    return JSONResponse(content=envelope, status_code=status_code)
|
|
|
|
|
|
def _anthropic_stop_reason(
|
|
completion_tokens: int,
|
|
max_tokens: int,
|
|
*,
|
|
has_pending_tool_use: bool = False,
|
|
) -> str:
|
|
"""Approximate Anthropic `stop_reason`."""
|
|
if has_pending_tool_use:
|
|
return "tool_use"
|
|
if max_tokens and completion_tokens >= max_tokens:
|
|
return "max_tokens"
|
|
return "end_turn"
|
|
|
|
|
|
@app.post("/v1/messages/count_tokens")
async def v1_messages_count_tokens(req: AnthropicMessagesRequest, request: Request):
    """Anthropic-compatible token counting endpoint.

    Claude Code may probe this route, so both the success body
    (`{"input_tokens": ...}`) and the auth failure use Anthropic's shapes.
    The count is `estimate_tokens` applied to the prompt built from the
    request's messages.
    """
    try:
        require_anthropic_key(request, settings.api_keys)
    except AnthropicAuthError as auth_err:
        return _anthropic_error(
            auth_err.status_code, auth_err.error_type, auth_err.message
        )

    internal_messages = anthropic_to_internal_messages(req)
    token_estimate = estimate_tokens(_messages_to_prompt(internal_messages))
    return JSONResponse(content={"input_tokens": token_estimate})
|
|
|
|
|
|
@app.post("/v1/messages")
async def v1_messages(req: AnthropicMessagesRequest, request: Request):
    """Anthropic Messages API compatible endpoint.

    Wire contract:
      * auth: `x-api-key` header (fallback Authorization: Bearer)
      * body: Anthropic Messages spec (system top-level, content blocks, ...)
      * stream: named-event SSE (message_start / content_block_delta / ...)

    Internally we:
      1. Normalise to the gateway's internal message list (`role/content` dicts)
      2. Reuse the same pool pick + session cache + backpressure guard as
         `/v1/chat/completions`. Session-cache keys include the API key, so
         Anthropic and OpenAI callers on the same key share KV-cache warmth.
      3. Re-wrap outputs in Anthropic's response / SSE format.

    Returns:
        A streaming response when `req.stream` is set, otherwise a
        `JSONResponse` holding the Anthropic message body. Handled failures
        (auth, pool not ready, bad tool config, backpressure, upstream error)
        are returned through `_anthropic_error` rather than raised.
    """

    def _http_error_message(exc: HTTPException, fallback: str) -> str:
        """Extract a readable message from an HTTPException's `detail`.

        Looks for `detail["error"]["message"]` first, then a non-empty dict
        or string detail, and only then uses `fallback`. An empty or missing
        detail must not be stringified: `str({})` is the truthy text "{}",
        which would otherwise mask the fallback.
        """
        detail = exc.detail
        if isinstance(detail, dict):
            error = detail.get("error")
            if isinstance(error, dict) and error.get("message"):
                return str(error["message"])
            return str(detail) if detail else fallback
        if isinstance(detail, str) and detail.strip():
            return detail
        return fallback

    # ------------------------------------------------------------- auth
    try:
        require_anthropic_key(request, settings.api_keys)
    except AnthropicAuthError as exc:
        return _anthropic_error(exc.status_code, exc.error_type, exc.message)

    # ------------------------------------------------------------- plumbing
    try:
        p = _require_pool()
    except HTTPException as exc:
        return _anthropic_error(
            exc.status_code, "overloaded_error", "gateway not ready"
        )

    messages_dump = anthropic_to_internal_messages(req)
    _record_debug_request(
        "anthropic", "/v1/messages", req.model_dump(mode="json"), request
    )
    # Prefer the auth token actually accepted so session-cache bucketing is
    # consistent regardless of which auth header style the caller used.
    api_key = (
        request.headers.get("x-api-key", "").strip() or _extract_api_key(request) or "-"
    )

    # ------------------------------------------------------------- session reuse
    try:
        tool_config = _anthropic_tool_config(req, settings=settings)
    except HTTPException as exc:
        return _anthropic_error(
            exc.status_code,
            "invalid_request_error",
            _http_error_message(exc, "invalid tool configuration"),
        )
    has_tooling_context = _anthropic_has_tooling_context(req)
    logger.info(
        "anthropic.request stream=%s tooling=%s tool_config=%s",
        req.stream,
        has_tooling_context,
        _tool_config_summary(tool_config),
    )
    try:
        execution = await prepare_execution_context(
            protocol="anthropic",
            requested_model=req.model,
            has_tooling_context=has_tooling_context,
            tool_config=tool_config,
            messages_dump=messages_dump,
            api_key=api_key,
            affinity_key=affinity_key_for_anthropic(req),
            pool=p,
            session_cache=session_cache,
            logger=logger,
            default_model=settings.default_model,
            default_ask_mode=settings.default_ask_mode,
            ensure_instance_logged_in=_ensure_instance_logged_in,
            last_user_text=_last_user_text,
            messages_to_prompt=_messages_to_prompt,
        )
    except HTTPException as exc:
        err_type = (
            "authentication_error" if exc.status_code == 401 else "overloaded_error"
        )
        return _anthropic_error(
            exc.status_code, err_type, _http_error_message(exc, "upstream error")
        )
    ask_mode = execution.ask_mode
    write_key = execution.write_key
    cached_session_id = execution.cached_session_id
    inst = execution.inst
    model = execution.model
    prompt = execution.prompt
    is_reply = execution.is_reply
    emulation_tools = _emulation_tools(req.tools, tool_config)
    em_anthropic_tools = _em_extract_anthropic_tools(emulation_tools)
    em_anthropic_choice = _em_extract_anthropic_tool_choice(req.tool_choice)
    use_emulation = has_tooling_context
    if use_emulation:
        # With tooling in play the prompt is rebuilt so that the tool
        # declarations travel inside the prompt text itself.
        system_text = flatten_anthropic_content(req.system) if req.system else ""
        prompt = _anthropic_messages_to_emulation_prompt(
            messages_dump,
            system_text=system_text,
            tools=emulation_tools,
            tool_choice=req.tool_choice,
        )
        execution.prompt = prompt
    effective_tool_config = _effective_tool_config_for_emulation(
        tool_config,
        use_emulation=use_emulation,
    )

    try:
        started = await start_execution(
            protocol="anthropic",
            execution=execution,
            stream=req.stream,
            chat_guard=chat_guard,
            logger=logger,
            estimate_tokens=estimate_tokens,
            extra_log_context={"ctx_api": "anthropic"},
        )
    except ValueError:
        return _anthropic_error(400, "invalid_request_error", "messages is empty")
    except BackpressureRejected as exc:
        retry_after = max(1, int(exc.retry_after))
        logger.warning(
            "anthropic rejected by backpressure, retry_after=%ds", retry_after
        )
        resp = _anthropic_error(
            429,
            "overloaded_error",
            "too many in-flight requests, please retry later",
        )
        resp.headers["Retry-After"] = str(retry_after)
        return resp

    ticket = started.ticket
    prompt_tokens = started.prompt_tokens
    message_id = f"msg_{uuid.uuid4().hex}"

    # Set once the streaming generator owns the ticket; until then the outer
    # `finally` below is responsible for releasing it.
    ticket_transferred = False

    def _sse(event: str, data: dict) -> str:
        """Format one named server-sent event frame."""
        return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

    try:
        if req.stream:
            completion_tokens_holder = {"n": 0}
            stream_meta: dict = {}
            max_tokens = req.max_tokens
            forced_tool_name = _anthropic_forced_tool_name(req.tool_choice)
            # When emulated tools are declared, text is held back until the
            # upstream stream ends so tool calls can be parsed from the whole
            # reply instead of from partial fragments.
            aggregate_emulated_tools = bool(em_anthropic_tools)

            async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
                """Yield Anthropic SSE frames for one upstream chat stream."""
                success = False
                block_index = 0
                text_block_open = False
                saw_pending_tool_use = False
                buffered_text_parts: list[str] = []
                try:
                    # 1) message_start — Anthropic SDKs read this first to get
                    # the message envelope (id/model/initial usage).
                    start_payload = {
                        "type": "message_start",
                        "message": {
                            "id": message_id,
                            "type": "message",
                            "role": "assistant",
                            "model": model,
                            "content": [],
                            "stop_reason": None,
                            "stop_sequence": None,
                            # input_tokens is authoritative here; output_tokens
                            # is seeded to 0 and updated in message_delta.
                            "usage": {
                                "input_tokens": prompt_tokens,
                                "output_tokens": 0,
                            },
                        },
                    }
                    yield _sse("message_start", start_payload)

                    async for chunk in _inst.client.chat_stream(
                        prompt,
                        model,
                        ask_mode,
                        session_id=cached_session_id,
                        is_reply=is_reply,
                        tool_config=effective_tool_config,
                        out_meta=_meta,
                    ):
                        if _stream_event_type(chunk) == "tool":
                            # A tool event ends any text block in progress.
                            if text_block_open:
                                yield _sse(
                                    "content_block_stop",
                                    {
                                        "type": "content_block_stop",
                                        "index": block_index,
                                    },
                                )
                                block_index += 1
                                text_block_open = False

                            tool = _allowed_stream_tool_event(
                                chunk,
                                tool_config=tool_config,
                                forced_tool_name=forced_tool_name,
                            )
                            if not tool:
                                continue

                            tool_id = str(
                                tool.get("id") or f"toolu_stream_{block_index}"
                            )

                            tool_use_block = _anthropic_tool_use_block(
                                tool, forced_id=tool_id
                            )
                            yield _sse(
                                "content_block_start",
                                {
                                    "type": "content_block_start",
                                    "index": block_index,
                                    "content_block": tool_use_block,
                                },
                            )
                            yield _sse(
                                "content_block_stop",
                                {"type": "content_block_stop", "index": block_index},
                            )
                            block_index += 1

                            tool_result_block = _anthropic_tool_result_block(
                                tool, forced_id=tool_id
                            )
                            if tool_result_block is not None:
                                yield _sse(
                                    "content_block_start",
                                    {
                                        "type": "content_block_start",
                                        "index": block_index,
                                        "content_block": tool_result_block,
                                    },
                                )
                                yield _sse(
                                    "content_block_stop",
                                    {
                                        "type": "content_block_stop",
                                        "index": block_index,
                                    },
                                )
                                block_index += 1
                            else:
                                # No result block: the call is still pending,
                                # which drives the `tool_use` stop reason.
                                saw_pending_tool_use = True
                            continue

                        text = _stream_text(chunk)
                        if not text:
                            continue
                        if aggregate_emulated_tools:
                            buffered_text_parts.append(text)
                            completion_tokens_holder["n"] += estimate_tokens(text)
                            continue

                        buffered_text_parts.append(text)
                        merged_text = "".join(buffered_text_parts)

                        parsed_calls, remaining = parse_action_blocks(
                            merged_text, em_anthropic_tools
                        )
                        if not parsed_calls:
                            inferred = infer_declared_tool_call_from_text(
                                merged_text,
                                em_anthropic_tools,
                            )
                            if inferred is None:
                                inferred_calls = infer_tool_calls_from_text(
                                    merged_text,
                                    em_anthropic_tools,
                                )
                                inferred = inferred_calls[0] if inferred_calls else None
                            if inferred is not None:
                                parsed_calls = [inferred]
                                remaining = ""

                        if parsed_calls:
                            if text_block_open:
                                yield _sse(
                                    "content_block_stop",
                                    {"type": "content_block_stop", "index": block_index},
                                )
                                block_index += 1
                                text_block_open = False
                            saw_pending_tool_use = True
                            for call in parsed_calls:
                                yield _sse(
                                    "content_block_start",
                                    {
                                        "type": "content_block_start",
                                        "index": block_index,
                                        "content_block": {
                                            "type": "tool_use",
                                            "id": call.id,
                                            "name": call.name,
                                            "input": {},
                                        },
                                    },
                                )
                                yield _sse(
                                    "content_block_delta",
                                    {
                                        "type": "content_block_delta",
                                        "index": block_index,
                                        "delta": {
                                            "type": "input_json_delta",
                                            "partial_json": json.dumps(call.arguments, ensure_ascii=False),
                                        },
                                    },
                                )
                                yield _sse(
                                    "content_block_stop",
                                    {"type": "content_block_stop", "index": block_index},
                                )
                                block_index += 1
                            buffered_text_parts = [remaining] if remaining else []
                            if not buffered_text_parts:
                                continue

                        text_to_emit = "".join(buffered_text_parts)
                        buffered_text_parts.clear()
                        completion_tokens_holder["n"] += estimate_tokens(text_to_emit)
                        if not text_block_open:
                            yield _sse(
                                "content_block_start",
                                {
                                    "type": "content_block_start",
                                    "index": block_index,
                                    "content_block": {"type": "text", "text": ""},
                                },
                            )
                            text_block_open = True

                        yield _sse(
                            "content_block_delta",
                            {
                                "type": "content_block_delta",
                                "index": block_index,
                                "delta": {"type": "text_delta", "text": text_to_emit},
                            },
                        )

                    if aggregate_emulated_tools:
                        # Upstream is done: parse the whole buffered reply for
                        # tool calls, or flush it as a single text block.
                        merged_text = "".join(buffered_text_parts)
                        parsed_calls, remaining = parse_action_blocks(
                            merged_text, em_anthropic_tools
                        )
                        if not parsed_calls:
                            inferred = infer_declared_tool_call_from_text(
                                merged_text,
                                em_anthropic_tools,
                            )
                            if inferred is None:
                                inferred_calls = infer_tool_calls_from_text(
                                    merged_text,
                                    em_anthropic_tools,
                                )
                                inferred = inferred_calls[0] if inferred_calls else None
                            if inferred is not None:
                                parsed_calls = [inferred]
                                remaining = ""

                        if parsed_calls:
                            if remaining.strip():
                                yield _sse(
                                    "content_block_start",
                                    {
                                        "type": "content_block_start",
                                        "index": block_index,
                                        "content_block": {"type": "text", "text": ""},
                                    },
                                )
                                yield _sse(
                                    "content_block_delta",
                                    {
                                        "type": "content_block_delta",
                                        "index": block_index,
                                        "delta": {"type": "text_delta", "text": remaining},
                                    },
                                )
                                yield _sse(
                                    "content_block_stop",
                                    {"type": "content_block_stop", "index": block_index},
                                )
                                block_index += 1
                            for call in parsed_calls:
                                saw_pending_tool_use = True
                                yield _sse(
                                    "content_block_start",
                                    {
                                        "type": "content_block_start",
                                        "index": block_index,
                                        "content_block": {
                                            "type": "tool_use",
                                            "id": call.id,
                                            "name": call.name,
                                            "input": {},
                                        },
                                    },
                                )
                                yield _sse(
                                    "content_block_delta",
                                    {
                                        "type": "content_block_delta",
                                        "index": block_index,
                                        "delta": {
                                            "type": "input_json_delta",
                                            "partial_json": json.dumps(call.arguments, ensure_ascii=False),
                                        },
                                    },
                                )
                                yield _sse(
                                    "content_block_stop",
                                    {"type": "content_block_stop", "index": block_index},
                                )
                                block_index += 1
                        elif merged_text.strip():
                            yield _sse(
                                "content_block_start",
                                {
                                    "type": "content_block_start",
                                    "index": block_index,
                                    "content_block": {"type": "text", "text": ""},
                                },
                            )
                            yield _sse(
                                "content_block_delta",
                                {
                                    "type": "content_block_delta",
                                    "index": block_index,
                                    "delta": {"type": "text_delta", "text": merged_text},
                                },
                            )
                            yield _sse(
                                "content_block_stop",
                                {"type": "content_block_stop", "index": block_index},
                            )

                    if text_block_open:
                        yield _sse(
                            "content_block_stop",
                            {"type": "content_block_stop", "index": block_index},
                        )

                    # 5) message_delta carries the terminal stop_reason and
                    # the final cumulative output_tokens count.
                    stop_reason = _anthropic_stop_reason(
                        completion_tokens_holder["n"],
                        max_tokens,
                        has_pending_tool_use=saw_pending_tool_use,
                    )
                    yield _sse(
                        "message_delta",
                        {
                            "type": "message_delta",
                            "delta": {
                                "stop_reason": stop_reason,
                                "stop_sequence": None,
                            },
                            "usage": {"output_tokens": completion_tokens_holder["n"]},
                        },
                    )

                    # 6) message_stop — terminal event, no [DONE] sentinel.
                    yield _sse("message_stop", {"type": "message_stop"})
                    success = True
                except asyncio.CancelledError:
                    logger.info("anthropic.stream cancelled (inst=%s)", _inst.name)
                    raise
                except Exception as exc:
                    logger.warning(
                        "anthropic.stream error (inst=%s): %s", _inst.name, exc
                    )
                    # Best-effort error frame. Anthropic clients treat any
                    # unexpected event gracefully; we prefer visibility over
                    # silent truncation.
                    try:
                        yield _sse(
                            "error",
                            {
                                "type": "error",
                                "error": {
                                    "type": "api_error",
                                    "message": str(exc) or "upstream error",
                                },
                            },
                        )
                    except Exception:
                        pass
                finally:
                    await finalize_stream_execution(
                        success=success,
                        write_key=write_key,
                        session_id=_meta.get("session_id"),
                        inst=_inst,
                        ticket=_ticket,
                        session_cache=session_cache,
                        stats_collector=stats_collector,
                        prompt_tokens=prompt_tokens,
                        completion_tokens=completion_tokens_holder["n"],
                    )

            # From here the generator's `finally` finalizes the execution, so
            # the outer `finally` must not release the ticket as well.
            ticket_transferred = True
            return _streaming_response(event_stream())

        try:
            completed = await complete_execution(
                protocol="anthropic",
                execution=execution,
                prompt_tokens=prompt_tokens,
                tool_config=effective_tool_config,
                logger=logger,
                stats_collector=stats_collector,
                session_cache=session_cache,
                estimate_tokens=estimate_tokens,
            )
        except UpstreamExecutionError:
            return _anthropic_error(502, "api_error", "upstream lingma error")

        result = completed.result
        text = result.get("text") or ""
        completion_tokens = completed.completion_tokens

        content_blocks: list[dict[str, Any]] = []
        if text:
            content_blocks.append({"type": "text", "text": text})
        forced_tool_name = _anthropic_forced_tool_name(req.tool_choice)
        tool_events = _allowed_tool_events(
            result.get("toolEvents"),
            tool_config=tool_config,
            forced_tool_name=forced_tool_name,
        )
        saw_pending_tool_use = False
        saw_tool_event = False
        for idx, item in enumerate(tool_events):
            saw_tool_event = True
            tool_id = str(item.get("id") or f"toolu_nonstream_{idx}")
            content_blocks.append(_anthropic_tool_use_block(item, forced_id=tool_id))
            tool_result = _anthropic_tool_result_block(item, forced_id=tool_id)
            if tool_result is not None:
                content_blocks.append(tool_result)
            else:
                saw_pending_tool_use = True

        # Fallback 1: explicit action blocks embedded in the reply text.
        if not saw_tool_event and em_anthropic_tools:
            parsed_calls, remaining = parse_action_blocks(text, em_anthropic_tools)
            if parsed_calls:
                content_blocks = []
                if remaining:
                    content_blocks.append({"type": "text", "text": remaining})
                for call in parsed_calls:
                    content_blocks.append(
                        {
                            "type": "tool_use",
                            "id": call.id,
                            "name": call.name,
                            "input": call.arguments,
                        }
                    )
                saw_tool_event = True
                saw_pending_tool_use = True
                text = remaining

        # Fallback 2: infer a single tool call from free-form text.
        if not saw_tool_event and em_anthropic_tools:
            inferred_calls = infer_tool_calls_from_text(text, em_anthropic_tools)
            inferred_call = inferred_calls[0] if inferred_calls else None
            if inferred_call is not None:
                content_blocks = [
                    {
                        "type": "tool_use",
                        "id": inferred_call.id,
                        "name": inferred_call.name,
                        "input": inferred_call.arguments,
                    }
                ]
                saw_tool_event = True
                saw_pending_tool_use = True
                text = ""

        # Fallback 3: the reply was empty, so ask once more with an explicit
        # force-tooling instruction appended, on a fresh session.
        if not saw_tool_event and em_anthropic_tools and not text.strip():
            retry_prompt = f"{prompt}\n\n{force_tooling_prompt(em_anthropic_choice)}"
            retry_result = await inst.client.chat_complete(
                retry_prompt,
                model,
                ask_mode,
                session_id=None,
                is_reply=False,
                tool_config=effective_tool_config,
            )
            retry_text = retry_result.get("text") or ""
            parsed_calls, remaining = parse_action_blocks(retry_text, em_anthropic_tools)
            if parsed_calls:
                content_blocks = []
                if remaining:
                    content_blocks.append({"type": "text", "text": remaining})
                for call in parsed_calls:
                    content_blocks.append(
                        {
                            "type": "tool_use",
                            "id": call.id,
                            "name": call.name,
                            "input": call.arguments,
                        }
                    )
                saw_tool_event = True
                saw_pending_tool_use = True
                text = remaining
            else:
                inferred_call = infer_declared_tool_call_from_text(retry_text, em_anthropic_tools)
                if inferred_call is None:
                    inferred_calls = infer_tool_calls_from_text(retry_text, em_anthropic_tools)
                    inferred_call = inferred_calls[0] if inferred_calls else None
                if inferred_call is not None:
                    content_blocks = [
                        {
                            "type": "tool_use",
                            "id": inferred_call.id,
                            "name": inferred_call.name,
                            "input": inferred_call.arguments,
                        }
                    ]
                    saw_tool_event = True
                    saw_pending_tool_use = True
                    text = ""

        # Fallback 4: a forced tool choice with a function-call-looking reply.
        if not saw_tool_event and forced_tool_name:
            inferred = _extract_function_call_event_from_text(
                text,
                forced_tool_name=forced_tool_name,
            )
            if inferred is not None:
                content_blocks = [
                    _anthropic_tool_use_block(inferred, forced_id="toolu_inferred_0")
                ]
                saw_tool_event = True
                saw_pending_tool_use = True

        response_body: dict = {
            "id": message_id,
            "type": "message",
            "role": "assistant",
            "model": model,
            "content": content_blocks,
            "stop_reason": _anthropic_stop_reason(
                completion_tokens,
                req.max_tokens,
                has_pending_tool_use=saw_pending_tool_use,
            ),
            "stop_sequence": None,
            "usage": {
                "input_tokens": prompt_tokens,
                "output_tokens": completion_tokens,
            },
        }
        return JSONResponse(content=response_body)
    finally:
        if not ticket_transferred:
            release_execution(ticket=ticket, inst=inst)
|
|
|
|
|
|
@app.post("/internal/auto-login/start", dependencies=[Depends(admin_auth_guard)])
async def internal_auto_login_start(instance: str | None = None):
    """Start the auto-login flow on one pool instance.

    Args:
        instance: Name of the instance to target. When omitted or empty,
            the pool's ``pick()`` chooses the instance.

    Returns:
        A dict with ``ok``, ``state`` and ``instance``. ``state`` is
        ``"already_logged_in"`` (together with the ``auth`` payload) when
        the auth status already carries an ``id``. Otherwise it is
        ``"running"`` when ``ensure_started`` returns a truthy value and
        ``"already_running"`` when it does not, together with the current
        ``auto_login`` status.

    Raises:
        HTTPException: 404 when the named instance is not in the pool;
            502 when no login URL could be obtained.
    """

    def _login_url_failure() -> HTTPException:
        # Same 502 payload for "call raised" and "call returned nothing".
        return HTTPException(
            status_code=502, detail={"error": {"message": "generate login url failed"}}
        )

    gateway_pool = _require_pool()
    if not instance:
        target = gateway_pool.pick()
    else:
        target = next(
            (
                candidate
                for candidate in gateway_pool.instances
                if candidate.name == instance
            ),
            None,
        )
        if target is None:
            raise HTTPException(
                status_code=404,
                detail={"error": {"message": f"instance not found: {instance}"}},
            )

    rpc = target.client
    login_runner = target.auto_login

    # Nothing to do when the instance already reports an authenticated id.
    auth = await rpc.auth_status()
    if auth and auth.get("id"):
        return {
            "ok": True,
            "state": "already_logged_in",
            "instance": target.name,
            "auth": auth,
        }

    dedicated_url = settings.dedicated_domain_url
    if dedicated_url:
        # Best effort: a failure to switch endpoints is logged, not fatal.
        try:
            endpoint_info = await rpc.get_endpoint()
            if isinstance(endpoint_info, dict):
                active_endpoint = endpoint_info.get("endpoint", "")
            else:
                active_endpoint = ""
            if active_endpoint != dedicated_url:
                await rpc.update_endpoint(dedicated_url)
        except Exception as exc:
            logger.warning(
                "[%s] switch dedicated endpoint failed: %s", target.name, exc
            )

    try:
        login_url, _login_raw = await rpc.generate_login_url()
    except Exception as exc:
        logger.warning("[%s] generate_login_url failed: %s", target.name, exc)
        raise _login_url_failure()

    if not login_url:
        raise _login_url_failure()

    newly_started = await login_runner.ensure_started(login_url)
    run_state = "running" if newly_started else "already_running"
    return {
        "ok": True,
        "state": run_state,
        "instance": target.name,
        "auto_login": login_runner.status(),
    }
|
|
|
|
|
|
@app.get("/internal/auto-login/status", dependencies=[Depends(admin_auth_guard)])
async def internal_auto_login_status():
    """Report auth and auto-login status for every instance in the pool.

    Returns:
        ``{"ok": True, "instances": [...]}`` where each entry holds the
        instance name, its ``auto_login`` status, the auth-status payload
        (or ``{"error": <message>}`` when the auth query raised) and the
        client's ``state``.
    """

    async def _probe_auth(member):
        # A failing auth query is reported inline instead of aborting the
        # whole listing.
        try:
            return await member.client.auth_status()
        except Exception as exc:
            return {"error": str(exc)}

    report = []
    for member in _require_pool().instances:
        auth_info = await _probe_auth(member)
        entry = {
            "instance": member.name,
            "auto_login": member.auto_login.status(),
            "auth": auth_info,
            "state": member.client.state,
        }
        report.append(entry)
    return {"ok": True, "instances": report}
|
|
|
|
|
|
@app.post("/internal/session/export", dependencies=[Depends(admin_auth_guard)])
async def internal_session_export(instance: str | None = None):
    """Export a logged-in Lingma session as a base64 tar.gz bundle.

    The returned `bundle_b64` can be dropped into `LINGMA_SESSION_BUNDLE`
    (or the `session_bundle` field in `LINGMA_ACCOUNTS` JSON) on any other
    deployment to skip Playwright login entirely.

    Safety:
      - Guarded by `admin_auth_guard` (admin access is required).
      - Only works on instances that are currently authenticated (prevents
        exporting garbage from a half-initialised workDir).
      - The bundle itself is never logged (only its sizes are); callers
        must store it themselves.

    Args:
        instance: Name of the instance to export. When omitted or empty,
            the pool's ``pick()`` chooses the instance.

    Returns:
        A dict with ``instance``, ``account``, ``raw_bytes`` and
        ``bundle_b64``.

    Raises:
        HTTPException: 404 when the named instance is not in the pool;
            503 when the auth-status query raises; 409 when the instance
            is not logged in; 500 when packing the work directory fails.
    """
    p = _require_pool()
    if instance:
        target = next((inst for inst in p.instances if inst.name == instance), None)
        if target is None:
            raise HTTPException(
                status_code=404, detail={"error": f"instance {instance} not found"}
            )
    else:
        target = p.pick()

    try:
        status = await target.client.auth_status()
    except Exception as exc:
        raise HTTPException(
            status_code=503,
            detail={"error": f"instance {target.name} not ready: {exc}"},
        ) from exc
    if not (status and status.get("id")):
        raise HTTPException(
            status_code=409,
            detail={"error": f"instance {target.name} is not logged in"},
        )

    try:
        # pack_workdir is a synchronous call; run it in a worker thread so
        # archiving the work directory does not stall the event loop for
        # other in-flight requests.
        raw = await asyncio.to_thread(pack_workdir, target.cfg.work_dir)
    except Exception as exc:
        raise HTTPException(status_code=500, detail={"error": str(exc)}) from exc

    bundle_b64 = encode_bundle(raw)
    logger.info(
        "session bundle exported from %s (%d bytes raw, %d bytes b64)",
        target.name,
        len(raw),
        len(bundle_b64),
    )
    return {
        "instance": target.name,
        "account": target.cfg.account.username or "",
        "raw_bytes": len(raw),
        "bundle_b64": bundle_b64,
    }
|
|
|
|
|
|
@app.get("/internal/models/raw", dependencies=[Depends(admin_auth_guard)])
async def internal_models_raw(instance: str | None = None):
    """Return the raw `config/queryModels` response from Lingma.

    This is the authoritative source for per-key displayName, description,
    capability flags, etc. We only ever extract `key` + `displayName` for
    OpenAI compatibility, but clients may want to inspect everything.

    Args:
        instance: Name of the instance to query. When omitted or empty,
            the pool's ``pick()`` chooses the instance.

    Returns:
        A dict with the instance name, the untouched ``raw`` response, the
        ``extracted_name_map`` built from it and the flattened
        ``exposed_keys``.

    Raises:
        HTTPException: 404 when the named instance is not in the pool.
    """
    gateway_pool = _require_pool()
    if not instance:
        target = gateway_pool.pick()
    else:
        target = next(
            (
                candidate
                for candidate in gateway_pool.instances
                if candidate.name == instance
            ),
            None,
        )
        if target is None:
            raise HTTPException(
                status_code=404, detail={"error": f"instance {instance} not found"}
            )

    await _ensure_instance_logged_in(target)
    raw = await target.client.query_models()

    # The extraction helpers are only ever handed a dict; any other
    # response shape is treated as empty.
    models_payload = raw if isinstance(raw, dict) else {}
    name_map = build_model_name_map(models_payload)
    exposed = flatten_model_keys(models_payload)
    return {
        "instance": target.name,
        "raw": raw,
        "extracted_name_map": name_map,
        "exposed_keys": exposed,
    }
|
|
|
|
|
|
@app.get("/internal/stats", dependencies=[Depends(admin_auth_guard)])
async def internal_stats():
    """Return a combined snapshot of gateway runtime statistics.

    Returns:
        A dict with ``ok`` plus the stats-collector snapshot, the chat
        guard's concurrency stats, the pool stats and the session-cache
        stats.
    """
    # Resolve the pool first so its availability check runs before any
    # statistics are collected.
    active_pool = _require_pool()
    collector_snapshot = await stats_collector.snapshot()
    concurrency_view = chat_guard.stats()
    pool_view = active_pool.stats()
    cache_view = session_cache.stats()
    return {
        "ok": True,
        "stats": collector_snapshot,
        "concurrency": concurrency_view,
        "pool": pool_view,
        "session_cache": cache_view,
    }
|
|
|
|
|
|
@app.get("/internal/effective-config", dependencies=[Depends(admin_auth_guard)])
async def internal_effective_config():
    """Expose the gateway's effective settings for operators.

    Secret-bearing fields (API keys, tokens, account passwords and inline
    session bundles) are passed through ``_safe_setting_value`` before
    being returned; every other field is reported as stored on the
    settings object.

    Returns:
        A ``JSONResponse`` with ``ok``, the ``settings`` mapping (including
        one entry per configured account) and a ``feature_flags`` summary.
    """
    cfg = settings

    # Field groups are listed in response order; the secret group sits
    # between them so the emitted key order stays stable.
    leading_fields = ("host", "port")
    secret_fields = ("api_keys", "metrics_token", "admin_token")
    trailing_fields = (
        "metrics_public",
        "log_level",
        "gateway_max_in_flight",
        "gateway_queue_timeout_sec",
        "lingma_bin",
        "lingma_work_dir",
        "lingma_socket_port",
        "lingma_startup_timeout",
        "lingma_rpc_timeout",
        "default_model",
        "default_ask_mode",
        "dedicated_domain_url",
        "auto_login_enabled",
        "auto_login_headless",
        "auto_login_timeout",
        "auto_login_max_retry",
        "instance_count",
        "session_reuse_enabled",
        "session_cache_max_entries",
        "session_cache_ttl_sec",
        "tool_forward_enabled",
        "tool_allowlist",
    )
    flag_fields = (
        "tool_forward_enabled",
        "session_reuse_enabled",
        "metrics_public",
        "auto_login_enabled",
    )

    def _account_view(account):
        # One account entry; credentials go through the same masking
        # helper as the top-level secrets.
        return {
            "username": account.username,
            "password": _safe_setting_value("password", account.password),
            "session_bundle_b64": _safe_setting_value(
                "session_bundle_b64", account.session_bundle_b64
            ),
            "session_bundle_file": account.session_bundle_file,
        }

    settings_view = {name: getattr(cfg, name) for name in leading_fields}
    for name in secret_fields:
        settings_view[name] = _safe_setting_value(name, getattr(cfg, name))
    for name in trailing_fields:
        settings_view[name] = getattr(cfg, name)
    settings_view["accounts"] = [_account_view(account) for account in cfg.accounts]

    feature_flags = {name: getattr(cfg, name) for name in flag_fields}

    return JSONResponse(
        content={
            "ok": True,
            "settings": settings_view,
            "feature_flags": feature_flags,
        }
    )
|
|
|
|
|
|
@app.get("/metrics", dependencies=[Depends(metrics_auth_guard)])
async def metrics():
    """Serve gateway metrics in the Prometheus text exposition format.

    The stats collector's text comes first, followed by the lines from the
    chat guard, the pool (only when one exists) and the session cache.

    Returns:
        A ``StreamingResponse`` that yields the whole payload as a single
        chunk with media type ``text/plain; version=0.0.4``.
    """
    collector_text = await stats_collector.prometheus_text()

    gauge_lines = [*chat_guard.prometheus_lines()]
    if pool is not None:
        gauge_lines += pool.prometheus_lines()
    gauge_lines += session_cache.prometheus_lines()

    payload = collector_text + ("\n".join(gauge_lines) + "\n")
    return StreamingResponse(
        iter([payload]), media_type="text/plain; version=0.0.4"
    )
|