from __future__ import annotations import ast import asyncio import hashlib import json import time import uuid from contextlib import asynccontextmanager from typing import Any from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse from .anthropic_schema import ( AnthropicMessagesRequest, affinity_key_for_anthropic, anthropic_to_internal_messages, ) from .auth import ( AnthropicAuthError, require_admin_access, require_anthropic_key, require_bearer, require_metrics_access, ) from .concurrency import BackpressureRejected, InFlightGuard from .config import Settings, load_settings from .lingma_pool import LingmaPool, PoolInstance from .logging_config import configure_logging, get_logger, request_id_var from .model_map import build_model_name_map, flatten_model_keys, resolve_model from .openai_schema import ( ChatCompletionChoice, ChatCompletionResponse, ChatCompletionsRequest, ModelData, ModelsResponse, ResponsesRequest, flatten_content, ) from .session_bundle import encode_bundle, pack_workdir from .session_cache import SessionCache, hash_branch_context from .stats import StatsCollector, estimate_tokens settings: Settings = load_settings() configure_logging(settings.log_level) logger = get_logger("lingma_gateway") pool: LingmaPool | None = None stats_collector = StatsCollector() chat_guard = InFlightGuard( max_in_flight=settings.gateway_max_in_flight, queue_timeout_sec=settings.gateway_queue_timeout_sec, ) session_cache = SessionCache( max_entries=settings.session_cache_max_entries if settings.session_reuse_enabled else 0, ttl_sec=settings.session_cache_ttl_sec, ) STREAMING_RESPONSE_HEADERS = { "Cache-Control": "no-cache, no-transform", "X-Accel-Buffering": "no", "Connection": "keep-alive", } def _require_pool() -> LingmaPool: if pool is None: raise HTTPException( status_code=503, detail={"error": {"message": "pool not initialized", "type": "service_unavailable"}}, ) return pool @asynccontextmanager async def lifespan(_app: FastAPI): global pool pool = LingmaPool.build( lingma_bin=settings.lingma_bin, base_work_dir=settings.lingma_work_dir, legacy_socket_port=settings.lingma_socket_port, startup_timeout=settings.lingma_startup_timeout, rpc_timeout=settings.lingma_rpc_timeout, default_model=settings.default_model, default_ask_mode=settings.default_ask_mode, accounts=settings.accounts, instance_count=settings.instance_count, auto_login_headless=settings.auto_login_headless, auto_login_timeout=settings.auto_login_timeout, auto_login_max_retry=settings.auto_login_max_retry, ) logger.info( "gateway startup: pool_size=%d max_in_flight=%d", pool.size(), settings.gateway_max_in_flight, ) _log_auth_posture() await pool.start() try: yield finally: if pool is not None: await pool.close() app = FastAPI(title="Lingma OpenAI Gateway", version="0.4.0", lifespan=lifespan) @app.exception_handler(AnthropicAuthError) async def _anthropic_auth_error_handler(_request: Request, exc: AnthropicAuthError): """Render auth failures on /v1/messages in the Anthropic wire format. FastAPI's default handler wraps everything in `{"detail": ...}`, which Anthropic SDKs don't parse. We emit the canonical `{"type":"error","error":{"type":"...","message":"..."}}` instead. """ return JSONResponse( status_code=exc.status_code, content={ "type": "error", "error": {"type": exc.error_type, "message": exc.message}, }, ) @app.middleware("http") async def request_id_middleware(request: Request, call_next): req_id = request.headers.get("x-request-id") or f"req-{uuid.uuid4().hex[:12]}" token = request_id_var.set(req_id) start = time.monotonic() status_code = 500 try: response = await call_next(request) status_code = response.status_code response.headers["x-request-id"] = req_id return response finally: elapsed_ms = int((time.monotonic() - start) * 1000) logger.info( "http %s %s -> %s in %dms", request.method, request.url.path, status_code, elapsed_ms, extra={ "ctx_method": request.method, "ctx_path": request.url.path, "ctx_status": status_code, "ctx_elapsed_ms": elapsed_ms, }, ) request_id_var.reset(token) def auth_guard(request: Request): require_bearer(request, settings.api_keys) def anthropic_auth_guard(request: Request): require_anthropic_key(request, settings.api_keys) def metrics_auth_guard(request: Request): require_metrics_access( request, settings.api_keys, settings.metrics_token, public=settings.metrics_public, ) def admin_auth_guard(request: Request): require_admin_access(request, settings.api_keys, settings.admin_token) def _log_auth_posture() -> None: """Loud warnings on misconfigured auth so ops can't miss them.""" if not settings.api_keys: logger.warning( "AUTH DISABLED: API_KEYS is empty, /v1/* is wide open. " "Set API_KEYS before exposing this gateway to anything " "other than localhost." ) if not settings.admin_token: logger.warning( "ADMIN_TOKEN not set: /internal/* reuses API_KEYS for auth. " "For production set a dedicated ADMIN_TOKEN so rotating chat " "keys doesn't require exporting the session bundle." ) if settings.metrics_public: logger.warning( "METRICS_PUBLIC=true: /metrics is open. Only enable this " "when the gateway is behind a private-network scraper." ) @app.get("/healthz") async def healthz(): if pool is None: return {"ok": False, "time": int(time.time()), "reason": "pool uninitialized"} insts = pool.stats() ready = sum(1 for i in insts if i["state"] == "ready") return { "ok": ready > 0, "time": int(time.time()), "pool_size": len(insts), "pool_ready": ready, "instances": [ {"name": i["name"], "state": i["state"], "in_flight": i["in_flight"]} for i in insts ], } async def _ensure_instance_logged_in(inst: PoolInstance) -> dict: client = inst.client auto_login = inst.auto_login try: status = await client.auth_status() except Exception as exc: logger.warning("[%s] auth_status failed before chat: %s", inst.name, exc) raise HTTPException( status_code=503, detail={"error": {"message": "Lingma is not ready", "type": "service_unavailable"}}, ) if status and status.get("id"): return status if not settings.auto_login_enabled: raise HTTPException( status_code=401, detail={"error": {"message": "Lingma not logged in", "type": "invalid_request_error"}}, ) if settings.dedicated_domain_url: try: current = await client.get_endpoint() current_ep = (current or {}).get("endpoint", "") if isinstance(current, dict) else "" if current_ep != settings.dedicated_domain_url: await client.update_endpoint(settings.dedicated_domain_url) except Exception as exc: logger.warning("[%s] switch dedicated endpoint failed: %s", inst.name, exc) try: login_url, _login_raw = await client.generate_login_url() except Exception as exc: logger.warning("[%s] generate_login_url failed: %s", inst.name, exc) raise HTTPException( status_code=502, detail={"error": {"message": "generate login url failed", "type": "upstream_error"}}, ) if not login_url: raise HTTPException( status_code=502, detail={"error": {"message": "generate login url failed", "type": "upstream_error"}}, ) await auto_login.ensure_started(login_url) try: await auto_login.wait_done(timeout=settings.auto_login_timeout + 20) except Exception as exc: logger.warning("[%s] auto_login wait_done failed: %s", inst.name, exc) try: status = await client.auth_status() except Exception as exc: logger.warning("[%s] post-login auth_status failed: %s", inst.name, exc) status = None if status and status.get("id"): return status logger.warning( "[%s] auto login did not result in a logged-in session: %s", inst.name, auto_login.status(), ) raise HTTPException( status_code=401, detail={"error": {"message": "Lingma auto login failed", "type": "invalid_request_error"}}, ) def _affinity_key_for(req: ChatCompletionsRequest) -> str | None: """Derive a stable affinity key so that follow-ups go to the same instance. Priority: explicit `user` > hash of the first/system message. """ if req.user: return req.user.strip() or None for m in req.messages: if m.role == "system": text = flatten_content(m.content) if text: return "sys:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16] if req.messages: first = req.messages[0] text = flatten_content(first.content) if text: return "first:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16] return None def _extract_api_key(request: Request) -> str: h = request.headers.get("authorization", "") if h.lower().startswith("bearer "): return h[7:].strip() return "" def _last_user_text(messages: list[dict]) -> str: """Extract the text of the latest user message (trailing from end). Used when we hit the session cache and only need to send the delta. Falls back to the last message regardless of role if no user is found. """ for m in reversed(messages): if m.get("role") == "user": return flatten_content(m.get("content")) or "" if messages: return flatten_content(messages[-1].get("content")) or "" return "" @app.get("/v1/models", dependencies=[Depends(anthropic_auth_guard)]) async def v1_models(): p = _require_pool() inst = p.pick() await _ensure_instance_logged_in(inst) await stats_collector.inc_models() models = await inst.client.query_models() keys = flatten_model_keys(models) name_map = build_model_name_map(models) resp = ModelsResponse(data=[ModelData(id=k, name=name_map.get(k)) for k in keys]) return JSONResponse(content=resp.model_dump()) def _messages_to_prompt(messages: list[dict]) -> str: parts: list[str] = [] for m in messages: role = m.get("role", "user") text = flatten_content(m.get("content")) if not text and m.get("tool_calls"): text = f"[tool_calls] {json.dumps(m['tool_calls'], ensure_ascii=False)}" if not text: continue parts.append(f"[{role}] {text}") return "\n".join(parts).strip() def _include_usage(stream_options: dict | None) -> bool: if not isinstance(stream_options, dict): return False return bool(stream_options.get("include_usage")) def _tool_allowlist() -> set[str]: return {name.strip() for name in settings.tool_allowlist if isinstance(name, str) and name.strip()} def _openai_tool_name(tool: Any) -> str | None: if not isinstance(tool, dict): return None if tool.get("type") == "function": fn = tool.get("function") if isinstance(fn, dict): name = fn.get("name") if isinstance(name, str) and name.strip(): return name.strip() name = tool.get("name") if isinstance(name, str) and name.strip(): return name.strip() return None def _anthropic_tool_name(tool: Any) -> str | None: if not isinstance(tool, dict): return None name = tool.get("name") if isinstance(name, str) and name.strip(): return name.strip() fn = tool.get("function") if isinstance(fn, dict): nested_name = fn.get("name") if isinstance(nested_name, str) and nested_name.strip(): return nested_name.strip() return None def _filter_allowed_tools(tools: list[dict[str, Any]], *, provider: str) -> list[dict[str, Any]]: allowlist = _tool_allowlist() if not allowlist: return tools name_fn = _openai_tool_name if provider == "openai" else _anthropic_tool_name return [tool for tool in tools if (name := name_fn(tool)) and name in allowlist] def _ensure_tool_choice_allowed(tool_choice: Any, *, provider: str) -> None: allowlist = _tool_allowlist() if not allowlist: return forced_name = ( _openai_forced_tool_name(tool_choice) if provider == "openai" else _anthropic_forced_tool_name(tool_choice) ) if forced_name and forced_name not in allowlist: raise HTTPException( status_code=400, detail={ "error": { "type": "invalid_request_error", "message": f"tool '{forced_name}' is not allowed", } }, ) def _openai_tool_config(req: ChatCompletionsRequest) -> dict[str, Any] | None: if not settings.tool_forward_enabled: return None has_tools = isinstance(req.tools, list) and len(req.tools) > 0 has_choice = req.tool_choice is not None if not has_tools and not has_choice: return None _ensure_tool_choice_allowed(req.tool_choice, provider="openai") tools = _filter_allowed_tools(req.tools or [], provider="openai") return { "provider": "openai", "tools": tools, "tool_choice": req.tool_choice, } def _anthropic_tool_config(req: AnthropicMessagesRequest) -> dict[str, Any] | None: if not settings.tool_forward_enabled: return None has_tools = isinstance(req.tools, list) and len(req.tools) > 0 has_choice = req.tool_choice is not None if not has_tools and not has_choice: return None _ensure_tool_choice_allowed(req.tool_choice, provider="anthropic") tools = _filter_allowed_tools(req.tools or [], provider="anthropic") return { "provider": "anthropic", "tools": tools, "tool_choice": req.tool_choice, } def _openai_has_tooling_context(req: ChatCompletionsRequest, messages: list[dict[str, Any]]) -> bool: if isinstance(req.tools, list) and len(req.tools) > 0: return True if req.tool_choice is not None: return True for m in messages: role = m.get("role") if role == "tool": return True if role == "assistant" and m.get("tool_calls"): return True return False def _anthropic_content_has_tool_blocks(content: Any) -> bool: if not isinstance(content, list): return False for item in content: if isinstance(item, dict) and item.get("type") in {"tool_use", "tool_result"}: return True return False def _anthropic_has_tooling_context(req: AnthropicMessagesRequest) -> bool: if isinstance(req.tools, list) and len(req.tools) > 0: return True if req.tool_choice is not None: return True if _anthropic_content_has_tool_blocks(req.system): return True for m in req.messages: if _anthropic_content_has_tool_blocks(m.content): return True return False def _resolve_ask_mode(model: str, has_tooling_context: bool) -> str: model_name = (model or "").lower() if model_name in {"lingma-agent", "agent"} or has_tooling_context: return "agent" return settings.default_ask_mode async def _apply_cached_instance_or_invalidate( *, protocol: str, inst: PoolInstance, cached_instance_name: str | None, cached_session_id: str | None, lookup_key: str | None, ) -> str | None: if cached_instance_name and inst.name != cached_instance_name: logger.info( "%s session cache instance %s unhealthy, falling back to %s", protocol, cached_instance_name, inst.name, ) if lookup_key: await session_cache.invalidate(lookup_key) return None return cached_session_id def _streaming_response(event_stream) -> StreamingResponse: return StreamingResponse( event_stream, media_type="text/event-stream", headers=STREAMING_RESPONSE_HEADERS, ) def _stream_event_type(event: Any) -> str: if isinstance(event, dict): t = event.get("type") if t in {"text", "tool"}: return t return "text" def _stream_text(event: Any) -> str: if isinstance(event, dict): if event.get("type") == "text": text = event.get("text") if isinstance(text, str): return text return "" if isinstance(event, str): return event return "" def _stream_tool_event(event: Any) -> dict[str, Any] | None: if isinstance(event, dict) and event.get("type") == "tool": tool = event.get("tool") if isinstance(tool, dict): return tool return None def _json_string(value: Any) -> str: if isinstance(value, str): return value try: return json.dumps(value if value is not None else {}, ensure_ascii=False) except Exception: return "{}" def _openai_forced_tool_name(tool_choice: Any) -> str | None: if not isinstance(tool_choice, dict): return None fn = tool_choice.get("function") if isinstance(fn, dict): name = fn.get("name") if isinstance(name, str) and name.strip(): return name.strip() return None def _anthropic_forced_tool_name(tool_choice: Any) -> str | None: if not isinstance(tool_choice, dict): return None if tool_choice.get("type") == "tool": name = tool_choice.get("name") if isinstance(name, str) and name.strip(): return name.strip() fn = tool_choice.get("function") if isinstance(fn, dict): name = fn.get("name") if isinstance(name, str) and name.strip(): return name.strip() return None def _json_object_from_text(text: str) -> dict[str, Any] | None: raw = text.strip() if not raw: return None if raw.startswith("```") and raw.endswith("```"): raw = raw[3:-3].strip() if raw.lower().startswith("json"): raw = raw[4:].strip() try: parsed = json.loads(raw) except Exception: return None return parsed if isinstance(parsed, dict) else None def _tool_code_single_arg_name(tools: list[dict[str, Any]] | None, forced_tool_name: str) -> str | None: if not isinstance(tools, list): return None for tool in tools: if not isinstance(tool, dict): continue schema: dict[str, Any] | None = None if tool.get("type") == "function": fn = tool.get("function") if isinstance(fn, dict) and fn.get("name") == forced_tool_name: params = fn.get("parameters") if isinstance(params, dict): schema = params elif tool.get("name") == forced_tool_name: input_schema = tool.get("input_schema") if isinstance(input_schema, dict): schema = input_schema if not isinstance(schema, dict): continue properties = schema.get("properties") if not isinstance(properties, dict) or len(properties) != 1: return None only_name = next(iter(properties.keys()), None) if isinstance(only_name, str) and only_name.strip(): return only_name return None return None def _tool_code_object_from_text( text: str, forced_tool_name: str, *, single_arg_name: str | None = None, ) -> dict[str, Any] | None: raw = text.strip() if not raw.startswith("```tool_code") or not raw.endswith("```"): return None lines = raw.splitlines() if len(lines) < 2: return None body = "\n".join(lines[1:-1]).strip() try: parsed = ast.parse(body, mode="eval") except Exception: return None call = parsed.body if not isinstance(call, ast.Call): return None if not isinstance(call.func, ast.Name) or call.func.id != forced_tool_name: return None arguments: dict[str, Any] = {} if call.args: if len(call.args) != 1 or call.keywords or not single_arg_name: return None try: arguments[single_arg_name] = ast.literal_eval(call.args[0]) except Exception: return None return {"arguments": arguments} for kw in call.keywords: if kw.arg is None: return None try: arguments[kw.arg] = ast.literal_eval(kw.value) except Exception: return None return {"arguments": arguments} def _forced_tool_event_from_text( text: str, forced_tool_name: str, *, single_arg_name: str | None = None, ) -> dict[str, Any] | None: parsed = _json_object_from_text(text) if parsed is None: parsed = _tool_code_object_from_text(text, forced_tool_name, single_arg_name=single_arg_name) if parsed is None: return None explicit_name: Any = parsed.get("name") or parsed.get("tool") fn = parsed.get("function") if explicit_name is None and isinstance(fn, dict): explicit_name = fn.get("name") if explicit_name is not None and str(explicit_name) != forced_tool_name: return None tool_input: Any = None if "input" in parsed: tool_input = parsed.get("input") elif "arguments" in parsed: args = parsed.get("arguments") if isinstance(args, str): try: tool_input = json.loads(args) except Exception: return None else: tool_input = args elif isinstance(fn, dict) and "arguments" in fn: args = fn.get("arguments") if isinstance(args, str): try: tool_input = json.loads(args) except Exception: return None else: tool_input = args else: reserved = {"name", "tool", "function", "arguments", "input", "result"} tool_input = {k: v for k, v in parsed.items() if k not in reserved} event: dict[str, Any] = { "name": forced_tool_name, "input": tool_input if tool_input is not None else {}, } if "result" in parsed: event["result"] = parsed.get("result") return event def _openai_tool_call(tool: dict[str, Any], *, forced_id: str | None = None) -> dict[str, Any]: return { "id": str(tool.get("id") or forced_id or f"call_{uuid.uuid4().hex}"), "type": "function", "function": { "name": str(tool.get("name") or "tool"), "arguments": _json_string(tool.get("input")), }, } def _anthropic_tool_use_block( tool: dict[str, Any], *, forced_id: str | None = None ) -> dict[str, Any]: return { "type": "tool_use", "id": str(tool.get("id") or forced_id or f"toolu_{uuid.uuid4().hex}"), "name": str(tool.get("name") or "tool"), "input": tool.get("input") if tool.get("input") is not None else {}, } def _anthropic_tool_result_block( tool: dict[str, Any], *, forced_id: str | None = None ) -> dict[str, Any] | None: if "result" not in tool: return None result = tool.get("result") if isinstance(result, str): content: Any = result else: content = _json_string(result) return { "type": "tool_result", "tool_use_id": str(tool.get("id") or forced_id or ""), "content": content, } @app.post("/v1/chat/completions", dependencies=[Depends(auth_guard)]) async def v1_chat_completions(req: ChatCompletionsRequest, request: Request): p = _require_pool() messages_dump = [m.model_dump() for m in req.messages] api_key = _extract_api_key(request) or "-" # ------------------------------------------------------------- session reuse # Look up the "conversation prefix" (everything except the latest user turn) # in the session cache. A hit lets us: # 1. Reuse the upstream sessionId so Lingma/Qwen hits its KV cache. # 2. Send only the new user message instead of the whole history. # 3. Stick the request to the pool instance that originally served it. tool_config = _openai_tool_config(req) has_tooling_context = _openai_has_tooling_context(req, messages_dump) ask_mode = _resolve_ask_mode(req.model, has_tooling_context) reuse_eligible = ( session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2 and not has_tooling_context ) lookup_key: str | None = None write_key: str | None = None cached_session_id: str | None = None cached_instance_name: str | None = None if reuse_eligible: prefix_branch_context = hash_branch_context(messages_dump[:-1]) lookup_key = session_cache.build_key( api_key, messages_dump[:-1], tool_config=tool_config, branch_context=prefix_branch_context, ) write_key = session_cache.build_key( api_key, messages_dump, tool_config=tool_config, branch_context=hash_branch_context(messages_dump), ) entry = await session_cache.get(lookup_key) if entry is None: legacy_lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config) entry = await session_cache.get(legacy_lookup_key) if entry is not None: lookup_key = legacy_lookup_key if entry is not None: cached_session_id = entry.session_id cached_instance_name = entry.instance_name or None affinity = cached_instance_name or _affinity_key_for(req) inst = p.pick(affinity_key=affinity) cached_session_id = await _apply_cached_instance_or_invalidate( protocol="chat", inst=inst, cached_instance_name=cached_instance_name, cached_session_id=cached_session_id, lookup_key=lookup_key, ) await _ensure_instance_logged_in(inst) models = await inst.client.query_models() available = flatten_model_keys(models) name_map = build_model_name_map(models) model = resolve_model(req.model, available, settings.default_model, name_map) # Prompt construction: on cache hit send only the last user turn so Lingma's # stored context isn't duplicated. if cached_session_id: prompt = _last_user_text(messages_dump) is_reply = True else: prompt = _messages_to_prompt(messages_dump) is_reply = False if not prompt: raise HTTPException( status_code=400, detail={"error": {"message": "messages is empty", "type": "invalid_request_error"}}, ) prompt_tokens = estimate_tokens(prompt) include_usage = _include_usage(req.stream_options) # Backpressure: acquire a slot *after* the cheap validation but before any # upstream call. This ensures we reject quickly when saturated. try: ticket = await chat_guard.try_acquire() except BackpressureRejected as exc: retry_after = max(1, int(exc.retry_after)) logger.warning("chat rejected by backpressure, retry_after=%ds", retry_after) raise HTTPException( status_code=429, detail={ "error": { "message": "Too many in-flight requests, please retry later", "type": "rate_limit_error", "code": "backpressure", } }, headers={"Retry-After": str(retry_after)}, ) inst.in_flight += 1 logger.info( "chat.start inst=%s model=%s ask_mode=%s stream=%s prompt_tokens~%d reuse=%s", inst.name, model, ask_mode, req.stream, prompt_tokens, bool(cached_session_id), extra={ "ctx_instance": inst.name, "ctx_model": model, "ctx_ask_mode": ask_mode, "ctx_stream": req.stream, "ctx_prompt_tokens": prompt_tokens, "ctx_in_flight": chat_guard.in_flight, "ctx_affinity": affinity, "ctx_session_reuse": bool(cached_session_id), }, ) ticket_transferred = False try: if req.stream: created = int(time.time()) completion_id = f"chatcmpl-{uuid.uuid4().hex}" completion_tokens_holder = {"n": 0} stream_meta: dict = {} forced_tool_name = _openai_forced_tool_name(req.tool_choice) forced_tool_single_arg_name = _tool_code_single_arg_name(req.tools, forced_tool_name) if forced_tool_name else None async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta): success = False tool_call_indexes: dict[str, int] = {} saw_tool_call = False buffered_text_parts: list[str] = [] try: async for chunk in _inst.client.chat_stream( prompt, model, ask_mode, session_id=cached_session_id, is_reply=is_reply, tool_config=tool_config, out_meta=_meta, ): if _stream_event_type(chunk) == "tool": tool = _stream_tool_event(chunk) if not tool: continue tool_id = str(tool.get("id") or "") if not tool_id: tool_id = f"call_{len(tool_call_indexes)}" idx = tool_call_indexes.get(tool_id) if idx is None: idx = len(tool_call_indexes) tool_call_indexes[tool_id] = idx saw_tool_call = True payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": idx, **_openai_tool_call(tool, forced_id=tool_id), } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" continue text = _stream_text(chunk) if not text: continue buffered_text_parts.append(text) completion_tokens_holder["n"] += estimate_tokens(text) payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {"content": text}, "finish_reason": None, } ], } yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" if not saw_tool_call and forced_tool_name: fallback_event = _forced_tool_event_from_text( "".join(buffered_text_parts), forced_tool_name, single_arg_name=forced_tool_single_arg_name, ) if fallback_event is not None: saw_tool_call = True tool_id = "call_fallback_0" idx = 0 tool_call_indexes[tool_id] = idx payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": idx, **_openai_tool_call(fallback_event, forced_id=tool_id), } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" done_payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {}, "finish_reason": "tool_calls" if saw_tool_call else "stop", } ], } yield f"data: {json.dumps(done_payload, ensure_ascii=False)}\n\n" if include_usage: usage_payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [], "usage": { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens_holder["n"], "total_tokens": prompt_tokens + completion_tokens_holder["n"], }, } yield f"data: {json.dumps(usage_payload, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" success = True except asyncio.CancelledError: logger.info( "chat.stream cancelled by client (inst=%s, session_id=%s)", _inst.name, cached_session_id, ) raise except Exception as exc: logger.warning( "chat.stream error (inst=%s, session_id=%s, prompt_tokens=%s, completion_tokens=%s): %s", _inst.name, cached_session_id, prompt_tokens, completion_tokens_holder["n"], exc, ) finally: # Persist upstream sessionId only on a clean chat/finish. # Partial streams (cancelled, timed out) leave Lingma's # session in an indeterminate state, so we must not reuse. if success and write_key: sid = _meta.get("session_id") if sid: await session_cache.put(write_key, sid, _inst.name) await stats_collector.record_chat( stream=True, success=success, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens_holder["n"], ) _inst.in_flight = max(0, _inst.in_flight - 1) _ticket.release() ticket_transferred = True return _streaming_response(event_stream()) try: result = await inst.client.chat_complete( prompt, model, ask_mode, session_id=cached_session_id, is_reply=is_reply, tool_config=tool_config, ) except Exception as exc: logger.warning("chat.complete error (inst=%s): %s", inst.name, exc) await stats_collector.record_chat( stream=False, success=False, prompt_tokens=prompt_tokens, completion_tokens=0, ) # If we used a cached session and the call blew up, drop it so the # next turn can start fresh instead of hitting the same dead session. if cached_session_id and lookup_key: await session_cache.invalidate(lookup_key) raise HTTPException( status_code=502, detail={"error": {"message": "upstream lingma error", "type": "upstream_error"}}, ) completion_tokens = estimate_tokens(result.get("text") or "") await stats_collector.record_chat( stream=False, success=True, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ) if write_key: sid = result.get("sessionId") if sid: await session_cache.put(write_key, sid, inst.name) tool_events = result.get("toolEvents") or [] message_content = result.get("text") or "" tool_calls: list[dict[str, Any]] = [] saw_tool_call = False if isinstance(tool_events, list): for idx, item in enumerate(tool_events): if isinstance(item, dict): tool_id = str(item.get("id") or f"call_{idx}") tool_calls.append(_openai_tool_call(item, forced_id=tool_id)) saw_tool_call = True if not saw_tool_call: forced_tool_name = _openai_forced_tool_name(req.tool_choice) if forced_tool_name: fallback_event = _forced_tool_event_from_text( message_content, forced_tool_name, single_arg_name=_tool_code_single_arg_name(req.tools, forced_tool_name), ) if fallback_event is not None: tool_calls.append(_openai_tool_call(fallback_event, forced_id="call_fallback_0")) saw_tool_call = True message_content = "" response = ChatCompletionResponse( id=f"chatcmpl-{uuid.uuid4().hex}", created=int(time.time()), model=model, choices=[ ChatCompletionChoice( index=0, finish_reason="tool_calls" if saw_tool_call else "stop", message={ "role": "assistant", "content": message_content, "tool_calls": tool_calls or None, }, ) ], ) data = response.model_dump() data["latency"] = { "first_token_ms": result.get("firstTokenLatencyMs"), "total_ms": result.get("totalLatencyMs"), } data["usage"] = { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens, } data["served_by"] = inst.name return JSONResponse(content=data) finally: if not ticket_transferred: inst.in_flight = max(0, inst.in_flight - 1) ticket.release() def _responses_input_to_messages(req: ResponsesRequest) -> list[dict[str, Any]]: messages: list[dict[str, Any]] = [] if req.instructions: messages.append({"role": "system", "content": req.instructions}) raw_input = req.input if raw_input is None: return messages valid_roles = {"system", "user", "assistant", "tool", "developer", "function"} def _append(role: str, content: Any, *, tool_call_id: str | None = None) -> None: msg: dict[str, Any] = {"role": role, "content": flatten_content(content)} if role == "tool" and tool_call_id: msg["tool_call_id"] = tool_call_id messages.append(msg) if isinstance(raw_input, str): _append("user", raw_input) return messages raw_items: list[Any] if isinstance(raw_input, dict): raw_items = [raw_input] elif isinstance(raw_input, list): raw_items = list(raw_input) else: _append("user", str(raw_input)) return messages for item in raw_items: if isinstance(item, str): _append("user", item) continue if not isinstance(item, dict): _append("user", str(item)) continue role = item.get("role") if isinstance(role, str) and role in valid_roles: tool_call_id = item.get("tool_call_id") or item.get("call_id") _append(role, item.get("content"), tool_call_id=str(tool_call_id) if tool_call_id else None) continue if item.get("type") == "function_call_output": output = item.get("output") if isinstance(output, (dict, list)): output = json.dumps(output, ensure_ascii=False) tool_call_id = item.get("call_id") _append("tool", output, tool_call_id=str(tool_call_id) if tool_call_id else None) continue if "content" in item: text = flatten_content(item.get("content")) else: text = flatten_content([item]) if text: _append("user", text) return messages def _responses_to_chat_request(req: ResponsesRequest) -> ChatCompletionsRequest: return ChatCompletionsRequest( model=req.model, messages=_responses_input_to_messages(req), stream=req.stream, temperature=req.temperature, top_p=req.top_p, max_tokens=req.max_output_tokens, user=req.user, tools=req.tools, tool_choice=req.tool_choice, ) def _responses_id_from_chat_id(chat_id: Any) -> str: if isinstance(chat_id, str) and chat_id: suffix = chat_id.removeprefix("chatcmpl-") return f"resp_{suffix}" return f"resp_{uuid.uuid4().hex}" def _responses_usage_from_chat(usage: Any) -> dict[str, int]: if not isinstance(usage, dict): return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} input_tokens = int(usage.get("prompt_tokens") or 0) output_tokens = int(usage.get("completion_tokens") or 0) return { "input_tokens": input_tokens, "output_tokens": output_tokens, "total_tokens": int(usage.get("total_tokens") or (input_tokens + output_tokens)), } def _responses_non_stream_from_chat_payload(chat_payload: Any) -> dict[str, Any]: if not isinstance(chat_payload, dict): raise HTTPException( status_code=502, detail={"error": {"message": "invalid upstream response", "type": "upstream_error"}}, ) choice = {} choices = chat_payload.get("choices") if isinstance(choices, list) and choices: choice = choices[0] if isinstance(choices[0], dict) else {} message = choice.get("message") if isinstance(choice.get("message"), dict) else {} output: list[dict[str, Any]] = [] content = message.get("content") if isinstance(content, str) and content: output.append( { "type": "message", "id": f"msg_{uuid.uuid4().hex}", "status": "completed", "role": "assistant", "content": [{"type": "output_text", "text": content}], } ) tool_calls = message.get("tool_calls") if isinstance(tool_calls, list): for idx, tool_call in enumerate(tool_calls): if not isinstance(tool_call, dict): continue fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {} call_id = str(tool_call.get("id") or f"call_{idx}") output.append( { "type": "function_call", "id": call_id, "call_id": call_id, "name": str(fn.get("name") or "tool"), "arguments": str(fn.get("arguments") or "{}"), } ) output_text_parts: list[str] = [] for item in output: if item.get("type") == "message": blocks = item.get("content") if isinstance(blocks, list): for block in blocks: if isinstance(block, dict) and block.get("type") == "output_text": text = block.get("text") if isinstance(text, str) and text: output_text_parts.append(text) return { "id": _responses_id_from_chat_id(chat_payload.get("id")), "object": "response", "created_at": int(chat_payload.get("created") or time.time()), "status": "completed", "error": None, "incomplete_details": None, "model": chat_payload.get("model"), "output": output, "output_text": "".join(output_text_parts), "usage": _responses_usage_from_chat(chat_payload.get("usage")), } def _sse_data(payload: dict[str, Any]) -> str: return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" async def _responses_stream_from_chat_stream( chat_stream: StreamingResponse, *, response_id: str, model: str, ): created_at = int(time.time()) usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} completed_sent = False output_item_id = f"msg_{uuid.uuid4().hex}" output_index = 0 content_index = 0 output_text_parts: list[str] = [] function_call_items: list[dict[str, Any]] = [] function_call_index_by_id: dict[str, int] = {} function_call_arguments_by_id: dict[str, str] = {} function_call_name_by_id: dict[str, str] = {} function_call_id_by_upstream_index: dict[int, str] = {} def _message_item(status: str) -> dict[str, Any]: return { "id": output_item_id, "type": "message", "role": "assistant", "status": status, "content": [ { "type": "output_text", "text": "".join(output_text_parts), } ], } def _function_call_item(call_id: str, *, status: str, name: str, arguments: str) -> dict[str, Any]: return { "id": call_id, "type": "function_call", "call_id": call_id, "name": name, "arguments": arguments, "status": status, } def _completed_output_items() -> list[dict[str, Any]]: return [_message_item("completed"), *function_call_items] def _completed_frame() -> str: return _sse_data( { "type": "response.completed", "response": { "id": response_id, "object": "response", "created_at": created_at, "status": "completed", "model": model, "output": _completed_output_items(), "usage": usage, }, } ) def _finish_output_item_frames() -> list[str]: frames = [ _sse_data( { "type": "response.output_text.done", "response_id": response_id, "item_id": output_item_id, "output_index": output_index, "content_index": content_index, "text": "".join(output_text_parts), } ), _sse_data( { "type": "response.output_item.done", "response_id": response_id, "output_index": output_index, "item": _message_item("completed"), } ), ] for idx, item in enumerate(function_call_items, start=1): frames.append( _sse_data( { "type": "response.function_call_arguments.done", "response_id": response_id, "item_id": item["id"], "output_index": idx, "arguments": item["arguments"], } ) ) frames.append( _sse_data( { "type": "response.output_item.done", "response_id": response_id, "output_index": idx, "item": item, } ) ) return frames def _ensure_function_call_item(call_id: str) -> list[str]: existing_index = function_call_index_by_id.get(call_id) name = function_call_name_by_id.get(call_id, "tool") arguments = function_call_arguments_by_id.get(call_id, "") if existing_index is not None: function_call_items[existing_index] = _function_call_item( call_id, status="completed", name=name, arguments=arguments, ) return [] item = _function_call_item( call_id, status="completed", name=name, arguments=arguments, ) function_call_items.append(item) item_index = len(function_call_items) - 1 function_call_index_by_id[call_id] = item_index return [ _sse_data( { "type": "response.output_item.added", "response_id": response_id, "output_index": item_index + 1, "item": _function_call_item( call_id, status="in_progress", name=name, arguments="", ), } ) ] yield _sse_data( { "type": "response.created", "response": { "id": response_id, "object": "response", "created_at": created_at, "status": "in_progress", "model": model, "output": [], }, } ) yield _sse_data( { "type": "response.output_item.added", "response_id": response_id, "output_index": output_index, "item": _message_item("in_progress"), } ) try: async for part in chat_stream.body_iterator: chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part) for frame in chunk.split("\n\n"): frame = frame.strip() if not frame or not frame.startswith("data:"): continue body = frame[len("data:") :].strip() if body == "[DONE]": for event in _finish_output_item_frames(): yield event yield _completed_frame() yield "data: [DONE]\n\n" completed_sent = True return try: payload = json.loads(body) except Exception: continue frame_usage = _responses_usage_from_chat(payload.get("usage")) if any(frame_usage.values()): usage = frame_usage choices = payload.get("choices") if not isinstance(choices, list) or not choices: continue choice = choices[0] if isinstance(choices[0], dict) else {} delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {} text = delta.get("content") if isinstance(text, str) and text: output_text_parts.append(text) yield _sse_data( { "type": "response.output_text.delta", "response_id": response_id, "item_id": output_item_id, "output_index": output_index, "content_index": content_index, "delta": text, } ) tool_calls = delta.get("tool_calls") if isinstance(tool_calls, list): for idx, tool_call in enumerate(tool_calls): if not isinstance(tool_call, dict): continue fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {} upstream_index_raw = tool_call.get("index") upstream_index = upstream_index_raw if isinstance(upstream_index_raw, int) else idx call_id = str( tool_call.get("id") or function_call_id_by_upstream_index.get(upstream_index) or f"call_{upstream_index}" ) function_call_id_by_upstream_index[upstream_index] = call_id name = str(fn.get("name") or function_call_name_by_id.get(call_id) or "tool") function_call_name_by_id[call_id] = name arguments_delta = str(fn.get("arguments") or "") accumulated_arguments = ( function_call_arguments_by_id.get(call_id, "") + arguments_delta ) function_call_arguments_by_id[call_id] = accumulated_arguments for event in _ensure_function_call_item(call_id): yield event if arguments_delta: yield _sse_data( { "type": "response.function_call_arguments.delta", "response_id": response_id, "item_id": call_id, "output_index": function_call_index_by_id[call_id] + 1, "delta": arguments_delta, } ) except asyncio.CancelledError: if not completed_sent: for event in _finish_output_item_frames(): yield event yield _completed_frame() yield "data: [DONE]\n\n" completed_sent = True return except Exception: if not completed_sent: for event in _finish_output_item_frames(): yield event yield _completed_frame() yield "data: [DONE]\n\n" completed_sent = True return if not completed_sent: for event in _finish_output_item_frames(): yield event yield _completed_frame() yield "data: [DONE]\n\n" @app.post("/responses", dependencies=[Depends(auth_guard)]) @app.post("/v1/responses", dependencies=[Depends(auth_guard)]) async def v1_responses(req: ResponsesRequest, request: Request): chat_req = _responses_to_chat_request(req) chat_response = await v1_chat_completions(chat_req, request) if isinstance(chat_response, StreamingResponse): response_id = f"resp_{uuid.uuid4().hex}" return StreamingResponse( _responses_stream_from_chat_stream( chat_response, response_id=response_id, model=req.model, ), media_type="text/event-stream", headers={ "Cache-Control": "no-cache, no-transform", "X-Accel-Buffering": "no", "Connection": "keep-alive", }, ) invalid_upstream_error = { "error": {"message": "invalid upstream response", "type": "upstream_error"} } try: chat_payload = json.loads(chat_response.body) except Exception: raise HTTPException( status_code=502, detail=invalid_upstream_error, ) if not isinstance(chat_payload, dict): raise HTTPException( status_code=502, detail=invalid_upstream_error, ) return JSONResponse(content=_responses_non_stream_from_chat_payload(chat_payload)) def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse: """Build an Anthropic-shaped error response (`type:error` envelope).""" return JSONResponse( status_code=status_code, content={"type": "error", "error": {"type": error_type, "message": message}}, ) def _anthropic_stop_reason( completion_tokens: int, max_tokens: int, *, has_pending_tool_use: bool = False, ) -> str: """Approximate Anthropic `stop_reason`.""" if has_pending_tool_use: return "tool_use" if max_tokens and completion_tokens >= max_tokens: return "max_tokens" return "end_turn" @app.post("/v1/messages/count_tokens") async def v1_messages_count_tokens(req: AnthropicMessagesRequest, request: Request): """Anthropic-compatible token counting endpoint. Claude Code may probe this endpoint; return Anthropic-shaped response. """ try: require_anthropic_key(request, settings.api_keys) except AnthropicAuthError as exc: return _anthropic_error(exc.status_code, exc.error_type, exc.message) messages_dump = anthropic_to_internal_messages(req) prompt = _messages_to_prompt(messages_dump) return JSONResponse(content={"input_tokens": estimate_tokens(prompt)}) @app.post("/v1/messages") async def v1_messages(req: AnthropicMessagesRequest, request: Request): """Anthropic Messages API compatible endpoint. Wire contract: * auth: `x-api-key` header (fallback Authorization: Bearer) * body: Anthropic Messages spec (system top-level, content blocks, ...) * stream: named-event SSE (message_start / content_block_delta / ...) Internally we: 1. Normalise to the gateway's internal message list (`role/content` dicts) 2. Reuse the same pool pick + session cache + backpressure guard as `/v1/chat/completions`. Session-cache keys include the API key, so Anthropic and OpenAI callers on the same key share KV-cache warmth. 3. Re-wrap outputs in Anthropic's response / SSE format. """ # ------------------------------------------------------------- auth try: require_anthropic_key(request, settings.api_keys) except AnthropicAuthError as exc: return _anthropic_error(exc.status_code, exc.error_type, exc.message) # ------------------------------------------------------------- plumbing try: p = _require_pool() except HTTPException as exc: return _anthropic_error(exc.status_code, "overloaded_error", "gateway not ready") messages_dump = anthropic_to_internal_messages(req) # Prefer the auth token actually accepted so session-cache bucketing is # consistent regardless of which auth header style the caller used. api_key = ( request.headers.get("x-api-key", "").strip() or _extract_api_key(request) or "-" ) # ------------------------------------------------------------- session reuse try: tool_config = _anthropic_tool_config(req) except HTTPException as exc: detail = exc.detail if isinstance(exc.detail, dict) else {} error = detail.get("error") if isinstance(detail.get("error"), dict) else {} message = error.get("message") or str(detail) or "invalid tool configuration" return _anthropic_error(exc.status_code, "invalid_request_error", message) has_tooling_context = _anthropic_has_tooling_context(req) ask_mode = _resolve_ask_mode(req.model, has_tooling_context) reuse_eligible = ( session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2 and not has_tooling_context ) lookup_key: str | None = None write_key: str | None = None cached_session_id: str | None = None cached_instance_name: str | None = None if reuse_eligible: prefix_branch_context = hash_branch_context(messages_dump[:-1]) lookup_key = session_cache.build_key( api_key, messages_dump[:-1], tool_config=tool_config, branch_context=prefix_branch_context, ) write_key = session_cache.build_key( api_key, messages_dump, tool_config=tool_config, branch_context=hash_branch_context(messages_dump), ) entry = await session_cache.get(lookup_key) if entry is None: legacy_lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config) entry = await session_cache.get(legacy_lookup_key) if entry is not None: lookup_key = legacy_lookup_key if entry is not None: cached_session_id = entry.session_id cached_instance_name = entry.instance_name or None affinity = cached_instance_name or affinity_key_for_anthropic(req) inst = p.pick(affinity_key=affinity) if cached_instance_name and inst.name != cached_instance_name: logger.info( "anthropic session cache instance %s unhealthy, falling back to %s", cached_instance_name, inst.name, ) cached_session_id = None if lookup_key: await session_cache.invalidate(lookup_key) try: await _ensure_instance_logged_in(inst) except HTTPException as exc: # 503/401/502 from login: map to closest Anthropic kind. err_type = "authentication_error" if exc.status_code == 401 else "overloaded_error" detail = exc.detail if isinstance(exc.detail, dict) else {} msg = (detail.get("error") or {}).get("message") or str(detail) or "upstream error" return _anthropic_error(exc.status_code, err_type, msg) # ------------------------------------------------------------- prompt & model models = await inst.client.query_models() available = flatten_model_keys(models) name_map = build_model_name_map(models) # Anthropic callers send `claude-*` model names. resolve_model's # final fallback (default_model / first available) handles that cleanly # without us having to hard-code a mapping table. model = resolve_model(req.model, available, settings.default_model, name_map) if cached_session_id: prompt = _last_user_text(messages_dump) is_reply = True else: prompt = _messages_to_prompt(messages_dump) is_reply = False if not prompt: return _anthropic_error(400, "invalid_request_error", "messages is empty") prompt_tokens = estimate_tokens(prompt) # ------------------------------------------------------------- backpressure try: ticket = await chat_guard.try_acquire() except BackpressureRejected as exc: retry_after = max(1, int(exc.retry_after)) logger.warning("anthropic rejected by backpressure, retry_after=%ds", retry_after) resp = _anthropic_error( 429, "overloaded_error", "too many in-flight requests, please retry later", ) resp.headers["Retry-After"] = str(retry_after) return resp inst.in_flight += 1 message_id = f"msg_{uuid.uuid4().hex}" logger.info( "anthropic.start inst=%s model=%s stream=%s prompt_tokens~%d reuse=%s", inst.name, model, req.stream, prompt_tokens, bool(cached_session_id), extra={ "ctx_instance": inst.name, "ctx_model": model, "ctx_ask_mode": ask_mode, "ctx_stream": req.stream, "ctx_prompt_tokens": prompt_tokens, "ctx_in_flight": chat_guard.in_flight, "ctx_affinity": affinity, "ctx_session_reuse": bool(cached_session_id), "ctx_api": "anthropic", }, ) ticket_transferred = False def _sse(event: str, data: dict) -> str: return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" try: if req.stream: completion_tokens_holder = {"n": 0} stream_meta: dict = {} max_tokens = req.max_tokens async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta): success = False block_index = 0 text_block_open = False saw_pending_tool_use = False try: # 1) message_start — Anthropic SDKs read this first to get # the message envelope (id/model/initial usage). start_payload = { "type": "message_start", "message": { "id": message_id, "type": "message", "role": "assistant", "model": model, "content": [], "stop_reason": None, "stop_sequence": None, # input_tokens is authoritative here; output_tokens # is seeded to 0 and updated in message_delta. "usage": { "input_tokens": prompt_tokens, "output_tokens": 0, }, }, } yield _sse("message_start", start_payload) async for chunk in _inst.client.chat_stream( prompt, model, ask_mode, session_id=cached_session_id, is_reply=is_reply, tool_config=tool_config, out_meta=_meta, ): if _stream_event_type(chunk) == "tool": if text_block_open: yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 text_block_open = False tool = _stream_tool_event(chunk) if not tool: continue tool_id = str(tool.get("id") or f"toolu_stream_{block_index}") tool_use_block = _anthropic_tool_use_block(tool, forced_id=tool_id) yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": tool_use_block, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 tool_result_block = _anthropic_tool_result_block(tool, forced_id=tool_id) if tool_result_block is not None: yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": tool_result_block, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 else: saw_pending_tool_use = True continue text = _stream_text(chunk) if not text: continue completion_tokens_holder["n"] += estimate_tokens(text) if not text_block_open: yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": {"type": "text", "text": ""}, }, ) text_block_open = True yield _sse( "content_block_delta", { "type": "content_block_delta", "index": block_index, "delta": {"type": "text_delta", "text": text}, }, ) if text_block_open: yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) # 5) message_delta carries the terminal stop_reason and # the final cumulative output_tokens count. stop_reason = _anthropic_stop_reason( completion_tokens_holder["n"], max_tokens, has_pending_tool_use=saw_pending_tool_use, ) yield _sse( "message_delta", { "type": "message_delta", "delta": { "stop_reason": stop_reason, "stop_sequence": None, }, "usage": {"output_tokens": completion_tokens_holder["n"]}, }, ) # 6) message_stop — terminal event, no [DONE] sentinel. yield _sse("message_stop", {"type": "message_stop"}) success = True except asyncio.CancelledError: logger.info("anthropic.stream cancelled (inst=%s)", _inst.name) raise except Exception as exc: logger.warning("anthropic.stream error (inst=%s): %s", _inst.name, exc) # Best-effort error frame. Anthropic clients treat any # unexpected event gracefully; we prefer visibility over # silent truncation. try: yield _sse( "error", { "type": "error", "error": { "type": "api_error", "message": str(exc) or "upstream error", }, }, ) except Exception: pass finally: # Session write-back only on clean finish — partial streams # leave Lingma's session in an indeterminate state. if success and write_key: sid = _meta.get("session_id") if sid: await session_cache.put(write_key, sid, _inst.name) await stats_collector.record_chat( stream=True, success=success, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens_holder["n"], ) _inst.in_flight = max(0, _inst.in_flight - 1) _ticket.release() ticket_transferred = True return _streaming_response(event_stream()) # ------------------------------------------------------------- non-stream try: result = await inst.client.chat_complete( prompt, model, ask_mode, session_id=cached_session_id, is_reply=is_reply, tool_config=tool_config, ) except Exception as exc: logger.warning("anthropic.complete error (inst=%s): %s", inst.name, exc) await stats_collector.record_chat( stream=False, success=False, prompt_tokens=prompt_tokens, completion_tokens=0, ) if cached_session_id and lookup_key: await session_cache.invalidate(lookup_key) return _anthropic_error(502, "api_error", "upstream lingma error") text = result.get("text") or "" completion_tokens = estimate_tokens(text) await stats_collector.record_chat( stream=False, success=True, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ) if write_key: sid = result.get("sessionId") if sid: await session_cache.put(write_key, sid, inst.name) content_blocks: list[dict[str, Any]] = [] if text: content_blocks.append({"type": "text", "text": text}) tool_events = result.get("toolEvents") or [] saw_pending_tool_use = False saw_tool_event = False if isinstance(tool_events, list): for idx, item in enumerate(tool_events): if not isinstance(item, dict): continue saw_tool_event = True tool_id = str(item.get("id") or f"toolu_nonstream_{idx}") content_blocks.append(_anthropic_tool_use_block(item, forced_id=tool_id)) tool_result = _anthropic_tool_result_block(item, forced_id=tool_id) if tool_result is not None: content_blocks.append(tool_result) else: saw_pending_tool_use = True if not saw_tool_event: forced_tool_name = _anthropic_forced_tool_name(req.tool_choice) if forced_tool_name: fallback_event = _forced_tool_event_from_text( text, forced_tool_name, single_arg_name=_tool_code_single_arg_name(req.tools, forced_tool_name), ) if fallback_event is not None: content_blocks = [] tool_id = "toolu_fallback_0" content_blocks.append(_anthropic_tool_use_block(fallback_event, forced_id=tool_id)) tool_result = _anthropic_tool_result_block(fallback_event, forced_id=tool_id) saw_pending_tool_use = tool_result is None if tool_result is not None: content_blocks.append(tool_result) response_body: dict = { "id": message_id, "type": "message", "role": "assistant", "model": model, "content": content_blocks, "stop_reason": _anthropic_stop_reason( completion_tokens, req.max_tokens, has_pending_tool_use=saw_pending_tool_use, ), "stop_sequence": None, "usage": { "input_tokens": prompt_tokens, "output_tokens": completion_tokens, }, } return JSONResponse(content=response_body) finally: if not ticket_transferred: inst.in_flight = max(0, inst.in_flight - 1) ticket.release() @app.post("/internal/auto-login/start", dependencies=[Depends(admin_auth_guard)]) async def internal_auto_login_start(instance: str | None = None): p = _require_pool() target = None if instance: for inst in p.instances: if inst.name == instance: target = inst break if target is None: raise HTTPException( status_code=404, detail={"error": {"message": f"instance not found: {instance}"}}, ) else: target = p.pick() client = target.client auto_login = target.auto_login status = await client.auth_status() if status and status.get("id"): return {"ok": True, "state": "already_logged_in", "instance": target.name, "auth": status} if settings.dedicated_domain_url: try: current = await client.get_endpoint() current_ep = (current or {}).get("endpoint", "") if isinstance(current, dict) else "" if current_ep != settings.dedicated_domain_url: await client.update_endpoint(settings.dedicated_domain_url) except Exception as exc: logger.warning("[%s] switch dedicated endpoint failed: %s", target.name, exc) try: login_url, _login_raw = await client.generate_login_url() except Exception as exc: logger.warning("[%s] generate_login_url failed: %s", target.name, exc) raise HTTPException(status_code=502, detail={"error": {"message": "generate login url failed"}}) if not login_url: raise HTTPException(status_code=502, detail={"error": {"message": "generate login url failed"}}) started = await auto_login.ensure_started(login_url) return { "ok": True, "state": "running" if started else "already_running", "instance": target.name, "auto_login": auto_login.status(), } @app.get("/internal/auto-login/status", dependencies=[Depends(admin_auth_guard)]) async def internal_auto_login_status(): p = _require_pool() out = [] for inst in p.instances: try: auth = await inst.client.auth_status() except Exception as exc: auth = {"error": str(exc)} out.append( { "instance": inst.name, "auto_login": inst.auto_login.status(), "auth": auth, "state": inst.client.state, } ) return {"ok": True, "instances": out} @app.post("/internal/session/export", dependencies=[Depends(admin_auth_guard)]) async def internal_session_export(instance: str | None = None): """Export a logged-in Lingma session as a base64 tar.gz bundle. The returned `bundle_b64` can be dropped into `LINGMA_SESSION_BUNDLE` (or the `session_bundle` field in `LINGMA_ACCOUNTS` JSON) on any other deployment to skip Playwright login entirely. Safety: - Requires a valid API key. - Only works on instances that are currently authenticated (prevents exporting garbage from a half-initialised workDir). - Response is not streamed to logs; callers must store it themselves. """ p = _require_pool() target = None if instance: for inst in p.instances: if inst.name == instance: target = inst break if target is None: raise HTTPException(status_code=404, detail={"error": f"instance {instance} not found"}) else: target = p.pick() try: status = await target.client.auth_status() except Exception as exc: raise HTTPException( status_code=503, detail={"error": f"instance {target.name} not ready: {exc}"}, ) if not (status and status.get("id")): raise HTTPException( status_code=409, detail={"error": f"instance {target.name} is not logged in"}, ) try: raw = pack_workdir(target.cfg.work_dir) except Exception as exc: raise HTTPException(status_code=500, detail={"error": str(exc)}) bundle_b64 = encode_bundle(raw) logger.info( "session bundle exported from %s (%d bytes raw, %d bytes b64)", target.name, len(raw), len(bundle_b64), ) return { "instance": target.name, "account": target.cfg.account.username or "", "raw_bytes": len(raw), "bundle_b64": bundle_b64, } @app.get("/internal/models/raw", dependencies=[Depends(admin_auth_guard)]) async def internal_models_raw(instance: str | None = None): """Return the raw `config/queryModels` response from Lingma. This is the authoritative source for per-key displayName, description, capability flags, etc. We only ever extract `key` + `displayName` for OpenAI compatibility, but clients may want to inspect everything. """ p = _require_pool() target = None if instance: for inst in p.instances: if inst.name == instance: target = inst break if target is None: raise HTTPException(status_code=404, detail={"error": f"instance {instance} not found"}) else: target = p.pick() await _ensure_instance_logged_in(target) raw = await target.client.query_models() name_map = build_model_name_map(raw if isinstance(raw, dict) else {}) return { "instance": target.name, "raw": raw, "extracted_name_map": name_map, "exposed_keys": flatten_model_keys(raw if isinstance(raw, dict) else {}), } @app.get("/internal/stats", dependencies=[Depends(admin_auth_guard)]) async def internal_stats(): p = _require_pool() return { "ok": True, "stats": await stats_collector.snapshot(), "concurrency": chat_guard.stats(), "pool": p.stats(), "session_cache": session_cache.stats(), } @app.get("/metrics", dependencies=[Depends(metrics_auth_guard)]) async def metrics(): base = await stats_collector.prometheus_text() lines = list(chat_guard.prometheus_lines()) if pool is not None: lines.extend(pool.prometheus_lines()) lines.extend(session_cache.prometheus_lines()) extra = "\n".join(lines) + "\n" return StreamingResponse(iter([base + extra]), media_type="text/plain; version=0.0.4")