from __future__ import annotations import asyncio import hashlib import json import time import uuid from collections import deque from contextlib import asynccontextmanager from typing import Any from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse from .anthropic_schema import ( AnthropicMessagesRequest, affinity_key_for_anthropic, anthropic_to_internal_messages, flatten_anthropic_content, ) from .auth import ( AnthropicAuthError, require_admin_access, require_anthropic_key, require_bearer, require_metrics_access, ) from .concurrency import BackpressureRejected, InFlightGuard from .config import Settings, load_settings from .http.execution_core import ( _apply_cached_instance_or_invalidate as _shared_apply_cached_instance_or_invalidate, _resolve_ask_mode as _shared_resolve_ask_mode, _tool_config_summary, UpstreamExecutionError, complete_execution, finalize_stream_execution, prepare_execution_context, release_execution, start_execution, ) from .http.tool_emulation import ( action_output_prompt, extract_anthropic_tool_choice as _em_extract_anthropic_tool_choice, extract_anthropic_tools as _em_extract_anthropic_tools, extract_openai_tool_choice as _em_extract_openai_tool_choice, extract_openai_tools as _em_extract_openai_tools, force_tooling_prompt, has_tool_request as _em_has_tool_request, infer_declared_tool_call_from_text, infer_tool_calls_from_text, inject_tooling, openai_tool_call_from_emulated, parse_action_blocks, ) from .http.openai_responses import handle_responses from .http.tool_bridge import ( _allowed_stream_tool_event, _allowed_tool_events, _anthropic_forced_tool_name, _anthropic_tool_result_block, _anthropic_tool_use_block, _extract_function_call_event_from_text, _extract_hash_tool_call_event_from_text, _extract_tool_calls_from_text, _forced_tool_fallback_event, _infer_tool_event_from_declared_tools, _json_string, _openai_forced_tool_name, _openai_tool_call, ) from .http.tooling_policy import ( _anthropic_has_tooling_context, _anthropic_tool_config, _openai_has_tooling_context, _openai_tool_config, ) from .lingma_pool import LingmaPool, PoolInstance from .logging_config import configure_logging, get_logger, request_id_var from .model_map import build_model_name_map, flatten_model_keys, resolve_model from .openai_schema import ( ChatCompletionChoice, ChatCompletionResponse, ChatCompletionsRequest, ModelData, ModelsResponse, ResponsesRequest, flatten_content, ) from .session_bundle import encode_bundle, pack_workdir from .session_cache import SessionCache, hash_branch_context from .stats import StatsCollector, estimate_tokens settings: Settings = load_settings() configure_logging(settings.log_level) logger = get_logger("lingma_gateway") pool: LingmaPool | None = None stats_collector = StatsCollector() chat_guard = InFlightGuard( max_in_flight=settings.gateway_max_in_flight, queue_timeout_sec=settings.gateway_queue_timeout_sec, ) session_cache = SessionCache( max_entries=settings.session_cache_max_entries if settings.session_reuse_enabled else 0, ttl_sec=settings.session_cache_ttl_sec, ) STREAMING_RESPONSE_HEADERS = { "Cache-Control": "no-cache, no-transform", "X-Accel-Buffering": "no", "Connection": "keep-alive", } _DEBUG_REQUEST_LOG: deque[dict[str, Any]] = deque(maxlen=100) def _require_pool() -> LingmaPool: if pool is None: raise HTTPException( status_code=503, detail={ "error": { "message": "pool not initialized", "type": "service_unavailable", } }, ) return pool @asynccontextmanager async def lifespan(_app: FastAPI): global pool pool = LingmaPool.build( lingma_bin=settings.lingma_bin, base_work_dir=settings.lingma_work_dir, legacy_socket_port=settings.lingma_socket_port, startup_timeout=settings.lingma_startup_timeout, rpc_timeout=settings.lingma_rpc_timeout, default_model=settings.default_model, default_ask_mode=settings.default_ask_mode, accounts=settings.accounts, instance_count=settings.instance_count, auto_login_headless=settings.auto_login_headless, auto_login_timeout=settings.auto_login_timeout, auto_login_max_retry=settings.auto_login_max_retry, ) logger.info( "gateway startup: pool_size=%d max_in_flight=%d", pool.size(), settings.gateway_max_in_flight, ) _log_auth_posture() await pool.start() try: yield finally: if pool is not None: await pool.close() app = FastAPI(title="Lingma OpenAI Gateway", version="0.4.0", lifespan=lifespan) @app.exception_handler(AnthropicAuthError) async def _anthropic_auth_error_handler(_request: Request, exc: AnthropicAuthError): """Render auth failures on /v1/messages in the Anthropic wire format. FastAPI's default handler wraps everything in `{"detail": ...}`, which Anthropic SDKs don't parse. We emit the canonical `{"type":"error","error":{"type":"...","message":"..."}}` instead. """ return JSONResponse( status_code=exc.status_code, content={ "type": "error", "error": {"type": exc.error_type, "message": exc.message}, }, ) @app.middleware("http") async def request_id_middleware(request: Request, call_next): req_id = request.headers.get("x-request-id") or f"req-{uuid.uuid4().hex[:12]}" token = request_id_var.set(req_id) start = time.monotonic() status_code = 500 try: response = await call_next(request) status_code = response.status_code response.headers["x-request-id"] = req_id return response finally: elapsed_ms = int((time.monotonic() - start) * 1000) logger.info( "http %s %s -> %s in %dms", request.method, request.url.path, status_code, elapsed_ms, extra={ "ctx_method": request.method, "ctx_path": request.url.path, "ctx_status": status_code, "ctx_elapsed_ms": elapsed_ms, }, ) request_id_var.reset(token) def auth_guard(request: Request): require_bearer(request, settings.api_keys) def anthropic_auth_guard(request: Request): require_anthropic_key(request, settings.api_keys) def metrics_auth_guard(request: Request): require_metrics_access( request, settings.api_keys, settings.metrics_token, public=settings.metrics_public, ) def admin_auth_guard(request: Request): require_admin_access(request, settings.api_keys, settings.admin_token) def _log_auth_posture() -> None: """Loud warnings on misconfigured auth so ops can't miss them.""" if not settings.api_keys: logger.warning( "AUTH DISABLED: API_KEYS is empty, /v1/* is wide open. " "Set API_KEYS before exposing this gateway to anything " "other than localhost." ) if not settings.admin_token: logger.warning( "ADMIN_TOKEN not set: /internal/* reuses API_KEYS for auth. " "For production set a dedicated ADMIN_TOKEN so rotating chat " "keys doesn't require exporting the session bundle." ) if settings.metrics_public: logger.warning( "METRICS_PUBLIC=true: /metrics is open. Only enable this " "when the gateway is behind a private-network scraper." ) def _safe_setting_value(key: str, value: Any) -> Any: key_upper = key.upper() if any( marker in key_upper for marker in {"KEY", "TOKEN", "PASSWORD", "SECRET", "BUNDLE"} ): if isinstance(value, list): return ["***" for _ in value] return "***" return value def _redact_debug_value(path: tuple[str, ...], value: Any) -> Any: if isinstance(value, dict): return { k: _redact_debug_value(path + (str(k).lower(),), v) for k, v in value.items() } if isinstance(value, list): return [_redact_debug_value(path + ("[]",), item) for item in value] if isinstance(value, str): lowered_path = "/".join(path) if any(marker in lowered_path for marker in ("authorization", "x-api-key", "api_key", "token", "password", "secret", "session_bundle")): return "***" if value.startswith("data:"): return "[redacted-data-url]" if "session bundle" in value.lower(): return "[redacted-session-bundle]" if any(part in {"args", "arguments"} for part in path) and len(value) > 2048: return value[:1024] + "... [truncated]" return value def _record_debug_request(protocol: str, path: str, body: dict[str, Any], request: Request) -> None: _DEBUG_REQUEST_LOG.appendleft( { "timestamp": int(time.time()), "protocol": protocol, "path": path, "request_id": request.headers.get("x-request-id", ""), "body": _redact_debug_value((), body), } ) @app.get("/internal/debug/requests", dependencies=[Depends(admin_auth_guard)]) async def internal_debug_requests(limit: int = 20): safe_limit = min(max(limit, 1), 100) return JSONResponse( content={ "ok": True, "count": min(safe_limit, len(_DEBUG_REQUEST_LOG)), "items": list(_DEBUG_REQUEST_LOG)[:safe_limit], } ) @app.get("/healthz") async def healthz(): if pool is None: return {"ok": False, "time": int(time.time()), "reason": "pool uninitialized"} insts = pool.stats() ready = sum(1 for i in insts if i["state"] == "ready") return { "ok": ready > 0, "time": int(time.time()), "pool_size": len(insts), "pool_ready": ready, "instances": [ {"name": i["name"], "state": i["state"], "in_flight": i["in_flight"]} for i in insts ], } def _capabilities_payload() -> dict[str, Any]: return { "service": "lingma-openai-gateway", "version": app.version, "protocols": { "openai": { "models": True, "chat_completions": True, "responses": True, "streaming": True, "response_tool_calls": True, "request_tools_forwarded": settings.tool_forward_enabled, }, "anthropic": { "messages": True, "count_tokens": True, "streaming": True, "response_tool_use": True, "request_tools_forwarded": settings.tool_forward_enabled, }, }, "features": { "session_reuse": { "enabled": settings.session_reuse_enabled, "cache_max_entries": settings.session_cache_max_entries, "cache_ttl_sec": settings.session_cache_ttl_sec, }, "tooling": { "forward_enabled": settings.tool_forward_enabled, "allowlist": settings.tool_allowlist, "emulation_bridge_enabled": True, }, "pool": { "configured_instance_count": settings.instance_count, "default_model": settings.default_model, "default_ask_mode": settings.default_ask_mode, }, "auth": { "v1_requires_auth": bool(settings.api_keys), "admin_token_configured": bool(settings.admin_token), "metrics_public": settings.metrics_public, }, }, } @app.get("/capabilities") async def capabilities(): return JSONResponse(content=_capabilities_payload()) @app.get("/v1/capabilities", dependencies=[Depends(anthropic_auth_guard)]) async def v1_capabilities(): return JSONResponse(content=_capabilities_payload()) async def _ensure_instance_logged_in(inst: PoolInstance) -> dict: client = inst.client auto_login = inst.auto_login try: status = await client.auth_status() except Exception as exc: logger.warning("[%s] auth_status failed before chat: %s", inst.name, exc) raise HTTPException( status_code=503, detail={ "error": { "message": "Lingma is not ready", "type": "service_unavailable", } }, ) if status and status.get("id"): return status if not settings.auto_login_enabled: raise HTTPException( status_code=401, detail={ "error": { "message": "Lingma not logged in", "type": "invalid_request_error", } }, ) if settings.dedicated_domain_url: try: current = await client.get_endpoint() current_ep = ( (current or {}).get("endpoint", "") if isinstance(current, dict) else "" ) if current_ep != settings.dedicated_domain_url: await client.update_endpoint(settings.dedicated_domain_url) except Exception as exc: logger.warning("[%s] switch dedicated endpoint failed: %s", inst.name, exc) try: login_url, _login_raw = await client.generate_login_url() except Exception as exc: logger.warning("[%s] generate_login_url failed: %s", inst.name, exc) raise HTTPException( status_code=502, detail={ "error": { "message": "generate login url failed", "type": "upstream_error", } }, ) if not login_url: raise HTTPException( status_code=502, detail={ "error": { "message": "generate login url failed", "type": "upstream_error", } }, ) await auto_login.ensure_started(login_url) try: await auto_login.wait_done(timeout=settings.auto_login_timeout + 20) except Exception as exc: logger.warning("[%s] auto_login wait_done failed: %s", inst.name, exc) try: status = await client.auth_status() except Exception as exc: logger.warning("[%s] post-login auth_status failed: %s", inst.name, exc) status = None if status and status.get("id"): return status logger.warning( "[%s] auto login did not result in a logged-in session: %s", inst.name, auto_login.status(), ) raise HTTPException( status_code=401, detail={ "error": { "message": "Lingma auto login failed", "type": "invalid_request_error", } }, ) def _affinity_key_for(req: ChatCompletionsRequest) -> str | None: """Derive a stable affinity key so that follow-ups go to the same instance. Priority: explicit `user` > hash of the first/system message. """ if req.user: return req.user.strip() or None for m in req.messages: if m.role == "system": text = flatten_content(m.content) if text: return "sys:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16] if req.messages: first = req.messages[0] text = flatten_content(first.content) if text: return "first:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16] return None def _extract_api_key(request: Request) -> str: h = request.headers.get("authorization", "") if h.lower().startswith("bearer "): return h[7:].strip() return "" def _last_user_text(messages: list[dict]) -> str: """Extract the text of the latest user message (trailing from end). Used when we hit the session cache and only need to send the delta. Falls back to the last message regardless of role if no user is found. """ for m in reversed(messages): if m.get("role") == "user": return flatten_content(m.get("content")) or "" if messages: return flatten_content(messages[-1].get("content")) or "" return "" @app.get("/v1/models", dependencies=[Depends(anthropic_auth_guard)]) async def v1_models(): p = _require_pool() inst = p.pick() await _ensure_instance_logged_in(inst) await stats_collector.inc_models() models = await inst.client.query_models() keys = flatten_model_keys(models) name_map = build_model_name_map(models) resp = ModelsResponse(data=[ModelData(id=k, name=name_map.get(k)) for k in keys]) return JSONResponse(content=resp.model_dump()) def _messages_to_prompt(messages: list[dict]) -> str: parts: list[str] = [] for m in messages: role = m.get("role", "user") text = flatten_content(m.get("content")) if not text and m.get("tool_calls"): text = f"[tool_calls] {json.dumps(m['tool_calls'], ensure_ascii=False)}" if not text: continue parts.append(f"[{role}] {text}") return "\n".join(parts).strip() def _assistant_tool_calls_to_emulation_text(tool_calls: Any) -> str: if not isinstance(tool_calls, list): return "" blocks: list[str] = [] for item in tool_calls: if not isinstance(item, dict): continue fn = item.get("function") if isinstance(item.get("function"), dict) else None name = str((fn or {}).get("name") or item.get("name") or "").strip() if not name: continue arguments = (fn or {}).get("arguments") if isinstance(arguments, str): try: arguments = json.loads(arguments) except Exception: arguments = {"raw": arguments} if not isinstance(arguments, dict): arguments = {} blocks.append( "```json action\n" + json.dumps( {"tool": name, "parameters": arguments}, ensure_ascii=False, indent=2 ) + "\n```" ) return "\n\n".join(blocks) def _tool_action_block(name: str, arguments: dict[str, Any]) -> str: return ( "```json action\n" + json.dumps( {"tool": name, "parameters": arguments}, ensure_ascii=False, indent=2 ) + "\n```" ) def _anthropic_flattened_tool_history_to_emulation_text(text: str) -> str: if not text: return "" out: list[str] = [] for line in text.splitlines(): stripped = line.strip() if stripped.startswith("[tool_use]"): raw = stripped[len("[tool_use]") :].strip() try: payload = json.loads(raw) except Exception: out.append(line) continue if not isinstance(payload, dict): out.append(line) continue name = str(payload.get("name") or "").strip() arguments = payload.get("input") if name and isinstance(arguments, dict): out.append(_tool_action_block(name, arguments)) else: out.append(line) continue if stripped.startswith("[tool_result]"): out.append(action_output_prompt(None, stripped[len("[tool_result]") :].strip())) continue out.append(line) return "\n".join(part for part in out if part).strip() def _messages_to_emulation_prompt( messages: list[dict[str, Any]], *, system_text: str, tools: list[dict[str, Any]] | None, tool_choice: Any, ) -> str: filtered: list[tuple[str, str]] = [] for message in messages: role = str(message.get("role") or "").strip().lower() if role in {"system", "developer"}: continue text = flatten_content(message.get("content")) if role == "assistant" and message.get("tool_calls"): projected = _assistant_tool_calls_to_emulation_text(message.get("tool_calls")) if projected: text = "\n\n".join(part for part in [text, projected] if part) if role == "tool": text = action_output_prompt(message.get("tool_call_id"), text) role = "user" if not text: continue if role not in {"user", "assistant"}: continue filtered.append((role, text)) if not filtered: return system_text.strip() em_tools = _em_extract_openai_tools(tools) em_choice = _em_extract_openai_tool_choice(tool_choice) injected_system = inject_tooling(system_text, em_tools, em_choice) parts: list[str] = [] for role, text in filtered: label = "User" if role == "user" else "Assistant" parts.append(f"{label}: {text}") if injected_system: parts.append(injected_system) parts.append("Assistant:") return "\n\n".join(parts).strip() def _effective_tool_config_for_emulation( tool_config: dict[str, Any] | None, *, use_emulation: bool, ) -> dict[str, Any] | None: if use_emulation: return None return tool_config def _emulation_tools(raw_tools: list[dict[str, Any]] | None, tool_config: dict[str, Any] | None) -> list[dict[str, Any]] | None: if isinstance(tool_config, dict) and isinstance(tool_config.get("tools"), list): return tool_config.get("tools") return raw_tools def _anthropic_messages_to_emulation_prompt( messages: list[dict[str, Any]], *, system_text: str, tools: list[dict[str, Any]] | None, tool_choice: Any, ) -> str: filtered: list[tuple[str, str]] = [] for message in messages: role = str(message.get("role") or "").strip().lower() text = str(message.get("content") or "").strip() if role == "assistant" and "[tool_use]" in text: text = _anthropic_flattened_tool_history_to_emulation_text(text) elif role == "user" and "[tool_result]" in text: text = _anthropic_flattened_tool_history_to_emulation_text(text) if role == "tool": text = action_output_prompt(message.get("tool_call_id"), text) role = "user" if not text: continue if role not in {"user", "assistant"}: continue filtered.append((role, text)) if not filtered: return system_text.strip() em_tools = _em_extract_anthropic_tools(tools) em_choice = _em_extract_anthropic_tool_choice(tool_choice) injected_system = inject_tooling(system_text, em_tools, em_choice) parts: list[str] = [] for role, text in filtered: label = "User" if role == "user" else "Assistant" parts.append(f"{label}: {text}") if injected_system: parts.append(injected_system) parts.append("Assistant:") return "\n\n".join(parts).strip() def _include_usage(stream_options: dict | None) -> bool: if not isinstance(stream_options, dict): return False return bool(stream_options.get("include_usage")) def _resolve_ask_mode(model: str, has_tooling_context: bool) -> str: return _shared_resolve_ask_mode( model, has_tooling_context, default_ask_mode=settings.default_ask_mode, ) async def _apply_cached_instance_or_invalidate( *, protocol: str, inst: PoolInstance, cached_instance_name: str | None, cached_session_id: str | None, lookup_key: str | None, ) -> str | None: return await _shared_apply_cached_instance_or_invalidate( protocol=protocol, logger=logger, session_cache=session_cache, inst=inst, cached_instance_name=cached_instance_name, cached_session_id=cached_session_id, lookup_key=lookup_key, ) def _streaming_response(event_stream) -> StreamingResponse: return StreamingResponse( event_stream, media_type="text/event-stream", headers=STREAMING_RESPONSE_HEADERS, ) def _stream_event_type(event: Any) -> str: if isinstance(event, dict): t = event.get("type") if t in {"text", "tool"}: return t return "text" def _stream_text(event: Any) -> str: if isinstance(event, dict): if event.get("type") == "text": text = event.get("text") if isinstance(text, str): return text return "" if isinstance(event, str): return event return "" @app.post("/v1/chat/completions", dependencies=[Depends(auth_guard)]) async def v1_chat_completions(req: ChatCompletionsRequest, request: Request): p = _require_pool() messages_dump = [m.model_dump() for m in req.messages] _record_debug_request("openai", "/v1/chat/completions", req.model_dump(mode="json"), request) api_key = _extract_api_key(request) or "-" # ------------------------------------------------------------- session reuse # Look up the "conversation prefix" (everything except the latest user turn) # in the session cache. A hit lets us: # 1. Reuse the upstream sessionId so Lingma/Qwen hits its KV cache. # 2. Send only the new user message instead of the whole history. # 3. Stick the request to the pool instance that originally served it. tool_config = _openai_tool_config(req, settings=settings) has_tooling_context = _openai_has_tooling_context(req, messages_dump) logger.info( "chat.request stream=%s tooling=%s tool_config=%s", req.stream, has_tooling_context, _tool_config_summary(tool_config), ) execution = await prepare_execution_context( protocol="chat", requested_model=req.model, has_tooling_context=has_tooling_context, tool_config=tool_config, messages_dump=messages_dump, api_key=api_key, affinity_key=_affinity_key_for(req), pool=p, session_cache=session_cache, logger=logger, default_model=settings.default_model, default_ask_mode=settings.default_ask_mode, ensure_instance_logged_in=_ensure_instance_logged_in, last_user_text=_last_user_text, messages_to_prompt=_messages_to_prompt, ) ask_mode = execution.ask_mode write_key = execution.write_key cached_session_id = execution.cached_session_id inst = execution.inst model = execution.model prompt = execution.prompt is_reply = execution.is_reply include_usage = _include_usage(req.stream_options) emulation_tools = _emulation_tools(req.tools, tool_config) em_tools = _em_extract_openai_tools(emulation_tools) em_choice = _em_extract_openai_tool_choice(req.tool_choice) use_emulation = has_tooling_context if use_emulation: system_parts = [ flatten_content(m.content) for m in req.messages if m.role in {"system", "developer"} and flatten_content(m.content) ] prompt = _messages_to_emulation_prompt( messages_dump, system_text="\n\n".join(system_parts), tools=emulation_tools, tool_choice=req.tool_choice, ) execution.prompt = prompt effective_tool_config = _effective_tool_config_for_emulation( tool_config, use_emulation=use_emulation, ) try: started = await start_execution( protocol="chat", execution=execution, stream=req.stream, chat_guard=chat_guard, logger=logger, estimate_tokens=estimate_tokens, ) except ValueError: raise HTTPException( status_code=400, detail={ "error": { "message": "messages is empty", "type": "invalid_request_error", } }, ) except BackpressureRejected as exc: retry_after = max(1, int(exc.retry_after)) logger.warning("chat rejected by backpressure, retry_after=%ds", retry_after) raise HTTPException( status_code=429, detail={ "error": { "message": "Too many in-flight requests, please retry later", "type": "rate_limit_error", "code": "backpressure", } }, headers={"Retry-After": str(retry_after)}, ) ticket = started.ticket prompt_tokens = started.prompt_tokens ticket_transferred = False try: if req.stream: created = int(time.time()) completion_id = f"chatcmpl-{uuid.uuid4().hex}" completion_tokens_holder = {"n": 0} stream_meta: dict = {} forced_tool_name = _openai_forced_tool_name(req.tool_choice) async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta): success = False tool_call_indexes: dict[str, int] = {} saw_tool_call = False buffered_text_parts: list[str] = [] def _text_payload(text: str) -> str: payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {"content": text}, "finish_reason": None, } ], } return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" try: async for chunk in _inst.client.chat_stream( prompt, model, ask_mode, session_id=cached_session_id, is_reply=is_reply, tool_config=effective_tool_config, out_meta=_meta, ): if _stream_event_type(chunk) == "tool": tool = _allowed_stream_tool_event( chunk, tool_config=tool_config, forced_tool_name=forced_tool_name, ) if not tool: continue if buffered_text_parts: for buffered_text in buffered_text_parts: yield _text_payload(buffered_text) buffered_text_parts.clear() tool_id = str(tool.get("id") or "") if not tool_id: tool_id = f"call_{len(tool_call_indexes)}" idx = tool_call_indexes.get(tool_id) if idx is None: idx = len(tool_call_indexes) tool_call_indexes[tool_id] = idx saw_tool_call = True payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": idx, **_openai_tool_call( tool, forced_id=tool_id ), } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" continue text = _stream_text(chunk) if not text: continue buffered_text_parts.append(text) completion_tokens_holder["n"] += estimate_tokens(text) if use_emulation: continue full_text = "".join(buffered_text_parts) if req.tools: if "[tool_calls]".startswith(full_text) or "[tool_calls]" in full_text: continue if forced_tool_name and not saw_tool_call: continue # Yield all buffered text text_to_yield = "".join(buffered_text_parts) buffered_text_parts.clear() yield _text_payload(text_to_yield) if buffered_text_parts and not saw_tool_call: merged_text = "".join(buffered_text_parts) extracted_tool_calls = _extract_tool_calls_from_text(merged_text) if extracted_tool_calls: saw_tool_call = True for i, tc in enumerate(extracted_tool_calls): tool_id = str(tc.get("id") or f"call_inferred_{i}") tool_call_indexes[tool_id] = i payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": i, **_openai_tool_call(tc, forced_id=tool_id), } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" buffered_text_parts.clear() if buffered_text_parts and forced_tool_name and not saw_tool_call: merged_text = "".join(buffered_text_parts) inferred = _extract_function_call_event_from_text( merged_text, forced_tool_name=forced_tool_name, ) if inferred is None: inferred = _extract_hash_tool_call_event_from_text( merged_text, forced_tool_name=forced_tool_name, ) if inferred is None: inferred = _forced_tool_fallback_event( merged_text, forced_tool_name=forced_tool_name, tools=req.tools, ) if inferred is not None: tool_id = "call_inferred_0" tool_call_indexes[tool_id] = 0 saw_tool_call = True payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": 0, **_openai_tool_call( inferred, forced_id=tool_id ), } ] }, "finish_reason": None, } ], } buffered_text_parts.clear() yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" if buffered_text_parts and req.tools and not saw_tool_call: merged_text = "".join(buffered_text_parts) inferred = _infer_tool_event_from_declared_tools( merged_text, tools=req.tools, ) if inferred is not None: tool_id = "call_inferred_0" tool_call_indexes[tool_id] = 0 saw_tool_call = True payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": 0, **_openai_tool_call( inferred, forced_id=tool_id ), } ] }, "finish_reason": None, } ], } buffered_text_parts.clear() yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" if buffered_text_parts and req.tools and not saw_tool_call: merged_text = "".join(buffered_text_parts) parsed_calls, remaining = parse_action_blocks(merged_text, em_tools) if parsed_calls: saw_tool_call = True for i, call in enumerate(parsed_calls): tool_id = call.id or f"call_inferred_{i}" tool_call_indexes[tool_id] = i payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": { "tool_calls": [ { "index": i, **openai_tool_call_from_emulated(call), } ] }, "finish_reason": None, } ], } yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" buffered_text_parts = [remaining] if remaining else [] if buffered_text_parts and saw_tool_call: text_to_yield = "".join(buffered_text_parts) buffered_text_parts.clear() yield _text_payload(text_to_yield) done_payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [ { "index": 0, "delta": {}, "finish_reason": "tool_calls" if saw_tool_call else "stop", } ], } yield f"data: {json.dumps(done_payload, ensure_ascii=False)}\n\n" if include_usage: usage_payload = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [], "usage": { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens_holder["n"], "total_tokens": prompt_tokens + completion_tokens_holder["n"], }, } yield f"data: {json.dumps(usage_payload, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" success = True except asyncio.CancelledError: logger.info( "chat.stream cancelled by client (inst=%s, session_id=%s)", _inst.name, cached_session_id, ) raise except Exception as exc: logger.warning( "chat.stream error (inst=%s, session_id=%s, prompt_tokens=%s, completion_tokens=%s): %s", _inst.name, cached_session_id, prompt_tokens, completion_tokens_holder["n"], exc, ) finally: await finalize_stream_execution( success=success, write_key=write_key, session_id=_meta.get("session_id"), inst=_inst, ticket=_ticket, session_cache=session_cache, stats_collector=stats_collector, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens_holder["n"], ) ticket_transferred = True return _streaming_response(event_stream()) try: completed = await complete_execution( protocol="chat", execution=execution, prompt_tokens=prompt_tokens, tool_config=effective_tool_config, logger=logger, stats_collector=stats_collector, session_cache=session_cache, estimate_tokens=estimate_tokens, ) except UpstreamExecutionError: raise HTTPException( status_code=502, detail={ "error": { "message": "upstream lingma error", "type": "upstream_error", } }, ) result = completed.result completion_tokens = completed.completion_tokens forced_tool_name = _openai_forced_tool_name(req.tool_choice) tool_events = _allowed_tool_events( result.get("toolEvents"), tool_config=tool_config, forced_tool_name=forced_tool_name, ) message_content = result.get("text") or "" tool_calls: list[dict[str, Any]] = [] saw_tool_call = False for idx, item in enumerate(tool_events): tool_id = str(item.get("id") or f"call_{idx}") tool_calls.append(_openai_tool_call(item, forced_id=tool_id)) saw_tool_call = True if not saw_tool_call: extracted_tool_calls = _extract_tool_calls_from_text(message_content) if extracted_tool_calls: for idx, tc in enumerate(extracted_tool_calls): tool_id = str(tc.get("id") or f"call_inferred_{idx}") tool_calls.append(_openai_tool_call(tc, forced_id=tool_id)) saw_tool_call = True message_content = "" if not saw_tool_call and forced_tool_name: inferred = _extract_function_call_event_from_text( message_content, forced_tool_name=forced_tool_name, ) if inferred is None: inferred = _extract_hash_tool_call_event_from_text( message_content, forced_tool_name=forced_tool_name, ) if inferred is None: inferred = _forced_tool_fallback_event( message_content, forced_tool_name=forced_tool_name, tools=req.tools, ) if inferred is not None: tool_calls.append( _openai_tool_call(inferred, forced_id="call_inferred_0") ) saw_tool_call = True message_content = "" if not saw_tool_call and req.tools: inferred = _infer_tool_event_from_declared_tools( message_content, tools=req.tools, ) if inferred is not None: tool_calls.append( _openai_tool_call(inferred, forced_id="call_inferred_0") ) saw_tool_call = True message_content = "" if not saw_tool_call and em_tools: parsed_calls, remaining = parse_action_blocks(message_content, em_tools) if parsed_calls: for call in parsed_calls: tool_calls.append(openai_tool_call_from_emulated(call)) saw_tool_call = True message_content = remaining if not saw_tool_call and em_tools: inferred_call = infer_declared_tool_call_from_text(message_content, em_tools) if inferred_call is None: inferred_calls = infer_tool_calls_from_text(message_content, em_tools) inferred_call = inferred_calls[0] if inferred_calls else None if inferred_call is not None: tool_calls.append(openai_tool_call_from_emulated(inferred_call)) saw_tool_call = True message_content = "" if not saw_tool_call and em_tools: retry_prompt = f"{prompt}\n\n{force_tooling_prompt(em_choice)}" retry_result = await inst.client.chat_complete( retry_prompt, model, ask_mode, session_id=None, is_reply=False, tool_config=effective_tool_config, ) retry_text = retry_result.get("text") or "" parsed_calls, remaining = parse_action_blocks(retry_text, em_tools) if parsed_calls: for call in parsed_calls: tool_calls.append(openai_tool_call_from_emulated(call)) saw_tool_call = True message_content = remaining else: inferred_call = infer_declared_tool_call_from_text(retry_text, em_tools) if inferred_call is None: inferred_calls = infer_tool_calls_from_text(retry_text, em_tools) inferred_call = inferred_calls[0] if inferred_calls else None if inferred_call is not None: tool_calls.append(openai_tool_call_from_emulated(inferred_call)) saw_tool_call = True message_content = "" response = ChatCompletionResponse( id=f"chatcmpl-{uuid.uuid4().hex}", created=int(time.time()), model=model, choices=[ ChatCompletionChoice( index=0, finish_reason="tool_calls" if saw_tool_call else "stop", message={ "role": "assistant", "content": message_content, "tool_calls": tool_calls or None, }, ) ], ) data = response.model_dump() data["latency"] = { "first_token_ms": result.get("firstTokenLatencyMs"), "total_ms": result.get("totalLatencyMs"), } data["usage"] = { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens, } data["served_by"] = inst.name return JSONResponse(content=data) finally: if not ticket_transferred: release_execution(ticket=ticket, inst=inst) @app.post("/responses", dependencies=[Depends(auth_guard)]) @app.post("/v1/responses", dependencies=[Depends(auth_guard)]) async def v1_responses(req: ResponsesRequest, request: Request): return await handle_responses( req, request, chat_completions_handler=v1_chat_completions, streaming_response_headers=STREAMING_RESPONSE_HEADERS, ) def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse: """Build an Anthropic-shaped error response (`type:error` envelope).""" return JSONResponse( status_code=status_code, content={"type": "error", "error": {"type": error_type, "message": message}}, ) def _anthropic_stop_reason( completion_tokens: int, max_tokens: int, *, has_pending_tool_use: bool = False, ) -> str: """Approximate Anthropic `stop_reason`.""" if has_pending_tool_use: return "tool_use" if max_tokens and completion_tokens >= max_tokens: return "max_tokens" return "end_turn" @app.post("/v1/messages/count_tokens") async def v1_messages_count_tokens(req: AnthropicMessagesRequest, request: Request): """Anthropic-compatible token counting endpoint. Claude Code may probe this endpoint; return Anthropic-shaped response. """ try: require_anthropic_key(request, settings.api_keys) except AnthropicAuthError as exc: return _anthropic_error(exc.status_code, exc.error_type, exc.message) messages_dump = anthropic_to_internal_messages(req) prompt = _messages_to_prompt(messages_dump) return JSONResponse(content={"input_tokens": estimate_tokens(prompt)}) @app.post("/v1/messages") async def v1_messages(req: AnthropicMessagesRequest, request: Request): """Anthropic Messages API compatible endpoint. Wire contract: * auth: `x-api-key` header (fallback Authorization: Bearer) * body: Anthropic Messages spec (system top-level, content blocks, ...) * stream: named-event SSE (message_start / content_block_delta / ...) Internally we: 1. Normalise to the gateway's internal message list (`role/content` dicts) 2. Reuse the same pool pick + session cache + backpressure guard as `/v1/chat/completions`. Session-cache keys include the API key, so Anthropic and OpenAI callers on the same key share KV-cache warmth. 3. Re-wrap outputs in Anthropic's response / SSE format. """ # ------------------------------------------------------------- auth try: require_anthropic_key(request, settings.api_keys) except AnthropicAuthError as exc: return _anthropic_error(exc.status_code, exc.error_type, exc.message) # ------------------------------------------------------------- plumbing try: p = _require_pool() except HTTPException as exc: return _anthropic_error( exc.status_code, "overloaded_error", "gateway not ready" ) messages_dump = anthropic_to_internal_messages(req) _record_debug_request("anthropic", "/v1/messages", req.model_dump(mode="json"), request) # Prefer the auth token actually accepted so session-cache bucketing is # consistent regardless of which auth header style the caller used. api_key = ( request.headers.get("x-api-key", "").strip() or _extract_api_key(request) or "-" ) # ------------------------------------------------------------- session reuse try: tool_config = _anthropic_tool_config(req, settings=settings) except HTTPException as exc: detail = exc.detail if isinstance(exc.detail, dict) else {} error = detail.get("error") if isinstance(detail.get("error"), dict) else {} message = error.get("message") or str(detail) or "invalid tool configuration" return _anthropic_error(exc.status_code, "invalid_request_error", message) has_tooling_context = _anthropic_has_tooling_context(req) logger.info( "anthropic.request stream=%s tooling=%s tool_config=%s", req.stream, has_tooling_context, _tool_config_summary(tool_config), ) try: execution = await prepare_execution_context( protocol="anthropic", requested_model=req.model, has_tooling_context=has_tooling_context, tool_config=tool_config, messages_dump=messages_dump, api_key=api_key, affinity_key=affinity_key_for_anthropic(req), pool=p, session_cache=session_cache, logger=logger, default_model=settings.default_model, default_ask_mode=settings.default_ask_mode, ensure_instance_logged_in=_ensure_instance_logged_in, last_user_text=_last_user_text, messages_to_prompt=_messages_to_prompt, ) except HTTPException as exc: err_type = ( "authentication_error" if exc.status_code == 401 else "overloaded_error" ) detail = exc.detail if isinstance(exc.detail, dict) else {} msg = ( (detail.get("error") or {}).get("message") or str(detail) or "upstream error" ) return _anthropic_error(exc.status_code, err_type, msg) ask_mode = execution.ask_mode write_key = execution.write_key cached_session_id = execution.cached_session_id inst = execution.inst model = execution.model prompt = execution.prompt is_reply = execution.is_reply emulation_tools = _emulation_tools(req.tools, tool_config) em_anthropic_tools = _em_extract_anthropic_tools(emulation_tools) em_anthropic_choice = _em_extract_anthropic_tool_choice(req.tool_choice) use_emulation = has_tooling_context if use_emulation: system_text = flatten_anthropic_content(req.system) if req.system else "" prompt = _anthropic_messages_to_emulation_prompt( messages_dump, system_text=system_text, tools=emulation_tools, tool_choice=req.tool_choice, ) execution.prompt = prompt effective_tool_config = _effective_tool_config_for_emulation( tool_config, use_emulation=use_emulation, ) try: started = await start_execution( protocol="anthropic", execution=execution, stream=req.stream, chat_guard=chat_guard, logger=logger, estimate_tokens=estimate_tokens, extra_log_context={"ctx_api": "anthropic"}, ) except ValueError: return _anthropic_error(400, "invalid_request_error", "messages is empty") except BackpressureRejected as exc: retry_after = max(1, int(exc.retry_after)) logger.warning( "anthropic rejected by backpressure, retry_after=%ds", retry_after ) resp = _anthropic_error( 429, "overloaded_error", "too many in-flight requests, please retry later", ) resp.headers["Retry-After"] = str(retry_after) return resp ticket = started.ticket prompt_tokens = started.prompt_tokens message_id = f"msg_{uuid.uuid4().hex}" ticket_transferred = False def _sse(event: str, data: dict) -> str: return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" try: if req.stream: completion_tokens_holder = {"n": 0} stream_meta: dict = {} max_tokens = req.max_tokens forced_tool_name = _anthropic_forced_tool_name(req.tool_choice) aggregate_emulated_tools = bool(em_anthropic_tools) async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta): success = False block_index = 0 text_block_open = False saw_pending_tool_use = False buffered_text_parts: list[str] = [] try: # 1) message_start — Anthropic SDKs read this first to get # the message envelope (id/model/initial usage). start_payload = { "type": "message_start", "message": { "id": message_id, "type": "message", "role": "assistant", "model": model, "content": [], "stop_reason": None, "stop_sequence": None, # input_tokens is authoritative here; output_tokens # is seeded to 0 and updated in message_delta. "usage": { "input_tokens": prompt_tokens, "output_tokens": 0, }, }, } yield _sse("message_start", start_payload) async for chunk in _inst.client.chat_stream( prompt, model, ask_mode, session_id=cached_session_id, is_reply=is_reply, tool_config=effective_tool_config, out_meta=_meta, ): if _stream_event_type(chunk) == "tool": if text_block_open: yield _sse( "content_block_stop", { "type": "content_block_stop", "index": block_index, }, ) block_index += 1 text_block_open = False tool = _allowed_stream_tool_event( chunk, tool_config=tool_config, forced_tool_name=forced_tool_name, ) if not tool: continue tool_id = str( tool.get("id") or f"toolu_stream_{block_index}" ) tool_use_block = _anthropic_tool_use_block( tool, forced_id=tool_id ) yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": tool_use_block, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 tool_result_block = _anthropic_tool_result_block( tool, forced_id=tool_id ) if tool_result_block is not None: yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": tool_result_block, }, ) yield _sse( "content_block_stop", { "type": "content_block_stop", "index": block_index, }, ) block_index += 1 else: saw_pending_tool_use = True continue text = _stream_text(chunk) if not text: continue if aggregate_emulated_tools: buffered_text_parts.append(text) completion_tokens_holder["n"] += estimate_tokens(text) continue buffered_text_parts.append(text) merged_text = "".join(buffered_text_parts) parsed_calls, remaining = parse_action_blocks( merged_text, em_anthropic_tools ) if not parsed_calls: inferred = infer_declared_tool_call_from_text( merged_text, em_anthropic_tools, ) if inferred is None: inferred_calls = infer_tool_calls_from_text( merged_text, em_anthropic_tools, ) inferred = inferred_calls[0] if inferred_calls else None if inferred is not None: parsed_calls = [inferred] remaining = "" if parsed_calls: if text_block_open: yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 text_block_open = False saw_pending_tool_use = True for call in parsed_calls: yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": { "type": "tool_use", "id": call.id, "name": call.name, "input": {}, }, }, ) yield _sse( "content_block_delta", { "type": "content_block_delta", "index": block_index, "delta": { "type": "input_json_delta", "partial_json": json.dumps(call.arguments, ensure_ascii=False), }, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 buffered_text_parts = [remaining] if remaining else [] if not buffered_text_parts: continue text_to_emit = "".join(buffered_text_parts) buffered_text_parts.clear() completion_tokens_holder["n"] += estimate_tokens(text_to_emit) if not text_block_open: yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": {"type": "text", "text": ""}, }, ) text_block_open = True yield _sse( "content_block_delta", { "type": "content_block_delta", "index": block_index, "delta": {"type": "text_delta", "text": text_to_emit}, }, ) if aggregate_emulated_tools: merged_text = "".join(buffered_text_parts) parsed_calls, remaining = parse_action_blocks( merged_text, em_anthropic_tools ) if not parsed_calls: inferred = infer_declared_tool_call_from_text( merged_text, em_anthropic_tools, ) if inferred is None: inferred_calls = infer_tool_calls_from_text( merged_text, em_anthropic_tools, ) inferred = inferred_calls[0] if inferred_calls else None if inferred is not None: parsed_calls = [inferred] remaining = "" if parsed_calls: if remaining.strip(): yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": {"type": "text", "text": ""}, }, ) yield _sse( "content_block_delta", { "type": "content_block_delta", "index": block_index, "delta": {"type": "text_delta", "text": remaining}, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 for call in parsed_calls: saw_pending_tool_use = True yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": { "type": "tool_use", "id": call.id, "name": call.name, "input": {}, }, }, ) yield _sse( "content_block_delta", { "type": "content_block_delta", "index": block_index, "delta": { "type": "input_json_delta", "partial_json": json.dumps(call.arguments, ensure_ascii=False), }, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) block_index += 1 elif merged_text.strip(): yield _sse( "content_block_start", { "type": "content_block_start", "index": block_index, "content_block": {"type": "text", "text": ""}, }, ) yield _sse( "content_block_delta", { "type": "content_block_delta", "index": block_index, "delta": {"type": "text_delta", "text": merged_text}, }, ) yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) if text_block_open: yield _sse( "content_block_stop", {"type": "content_block_stop", "index": block_index}, ) # 5) message_delta carries the terminal stop_reason and # the final cumulative output_tokens count. stop_reason = _anthropic_stop_reason( completion_tokens_holder["n"], max_tokens, has_pending_tool_use=saw_pending_tool_use, ) yield _sse( "message_delta", { "type": "message_delta", "delta": { "stop_reason": stop_reason, "stop_sequence": None, }, "usage": {"output_tokens": completion_tokens_holder["n"]}, }, ) # 6) message_stop — terminal event, no [DONE] sentinel. yield _sse("message_stop", {"type": "message_stop"}) success = True except asyncio.CancelledError: logger.info("anthropic.stream cancelled (inst=%s)", _inst.name) raise except Exception as exc: logger.warning( "anthropic.stream error (inst=%s): %s", _inst.name, exc ) # Best-effort error frame. Anthropic clients treat any # unexpected event gracefully; we prefer visibility over # silent truncation. try: yield _sse( "error", { "type": "error", "error": { "type": "api_error", "message": str(exc) or "upstream error", }, }, ) except Exception: pass finally: await finalize_stream_execution( success=success, write_key=write_key, session_id=_meta.get("session_id"), inst=_inst, ticket=_ticket, session_cache=session_cache, stats_collector=stats_collector, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens_holder["n"], ) ticket_transferred = True return _streaming_response(event_stream()) try: completed = await complete_execution( protocol="anthropic", execution=execution, prompt_tokens=prompt_tokens, tool_config=effective_tool_config, logger=logger, stats_collector=stats_collector, session_cache=session_cache, estimate_tokens=estimate_tokens, ) except UpstreamExecutionError: return _anthropic_error(502, "api_error", "upstream lingma error") result = completed.result text = result.get("text") or "" completion_tokens = completed.completion_tokens content_blocks: list[dict[str, Any]] = [] if text: content_blocks.append({"type": "text", "text": text}) forced_tool_name = _anthropic_forced_tool_name(req.tool_choice) tool_events = _allowed_tool_events( result.get("toolEvents"), tool_config=tool_config, forced_tool_name=forced_tool_name, ) saw_pending_tool_use = False saw_tool_event = False for idx, item in enumerate(tool_events): saw_tool_event = True tool_id = str(item.get("id") or f"toolu_nonstream_{idx}") content_blocks.append(_anthropic_tool_use_block(item, forced_id=tool_id)) tool_result = _anthropic_tool_result_block(item, forced_id=tool_id) if tool_result is not None: content_blocks.append(tool_result) else: saw_pending_tool_use = True if not saw_tool_event and em_anthropic_tools: parsed_calls, remaining = parse_action_blocks(text, em_anthropic_tools) if parsed_calls: content_blocks = [] if remaining: content_blocks.append({"type": "text", "text": remaining}) for call in parsed_calls: content_blocks.append( { "type": "tool_use", "id": call.id, "name": call.name, "input": call.arguments, } ) saw_tool_event = True saw_pending_tool_use = True text = remaining if not saw_tool_event and em_anthropic_tools: inferred_calls = infer_tool_calls_from_text(text, em_anthropic_tools) inferred_call = inferred_calls[0] if inferred_calls else None if inferred_call is not None: content_blocks = [ { "type": "tool_use", "id": inferred_call.id, "name": inferred_call.name, "input": inferred_call.arguments, } ] saw_tool_event = True saw_pending_tool_use = True text = "" if not saw_tool_event and em_anthropic_tools and not text.strip(): retry_prompt = f"{prompt}\n\n{force_tooling_prompt(em_anthropic_choice)}" retry_result = await inst.client.chat_complete( retry_prompt, model, ask_mode, session_id=None, is_reply=False, tool_config=effective_tool_config, ) retry_text = retry_result.get("text") or "" parsed_calls, remaining = parse_action_blocks(retry_text, em_anthropic_tools) if parsed_calls: content_blocks = [] if remaining: content_blocks.append({"type": "text", "text": remaining}) for call in parsed_calls: content_blocks.append( { "type": "tool_use", "id": call.id, "name": call.name, "input": call.arguments, } ) saw_tool_event = True saw_pending_tool_use = True text = remaining else: inferred_call = infer_declared_tool_call_from_text(retry_text, em_anthropic_tools) if inferred_call is None: inferred_calls = infer_tool_calls_from_text(retry_text, em_anthropic_tools) inferred_call = inferred_calls[0] if inferred_calls else None if inferred_call is not None: content_blocks = [ { "type": "tool_use", "id": inferred_call.id, "name": inferred_call.name, "input": inferred_call.arguments, } ] saw_tool_event = True saw_pending_tool_use = True text = "" if not saw_tool_event and forced_tool_name: inferred = _extract_function_call_event_from_text( text, forced_tool_name=forced_tool_name, ) if inferred is not None: content_blocks = [ _anthropic_tool_use_block(inferred, forced_id="toolu_inferred_0") ] saw_tool_event = True saw_pending_tool_use = True response_body: dict = { "id": message_id, "type": "message", "role": "assistant", "model": model, "content": content_blocks, "stop_reason": _anthropic_stop_reason( completion_tokens, req.max_tokens, has_pending_tool_use=saw_pending_tool_use, ), "stop_sequence": None, "usage": { "input_tokens": prompt_tokens, "output_tokens": completion_tokens, }, } return JSONResponse(content=response_body) finally: if not ticket_transferred: release_execution(ticket=ticket, inst=inst) @app.post("/internal/auto-login/start", dependencies=[Depends(admin_auth_guard)]) async def internal_auto_login_start(instance: str | None = None): p = _require_pool() target = None if instance: for inst in p.instances: if inst.name == instance: target = inst break if target is None: raise HTTPException( status_code=404, detail={"error": {"message": f"instance not found: {instance}"}}, ) else: target = p.pick() client = target.client auto_login = target.auto_login status = await client.auth_status() if status and status.get("id"): return { "ok": True, "state": "already_logged_in", "instance": target.name, "auth": status, } if settings.dedicated_domain_url: try: current = await client.get_endpoint() current_ep = ( (current or {}).get("endpoint", "") if isinstance(current, dict) else "" ) if current_ep != settings.dedicated_domain_url: await client.update_endpoint(settings.dedicated_domain_url) except Exception as exc: logger.warning( "[%s] switch dedicated endpoint failed: %s", target.name, exc ) try: login_url, _login_raw = await client.generate_login_url() except Exception as exc: logger.warning("[%s] generate_login_url failed: %s", target.name, exc) raise HTTPException( status_code=502, detail={"error": {"message": "generate login url failed"}} ) if not login_url: raise HTTPException( status_code=502, detail={"error": {"message": "generate login url failed"}} ) started = await auto_login.ensure_started(login_url) return { "ok": True, "state": "running" if started else "already_running", "instance": target.name, "auto_login": auto_login.status(), } @app.get("/internal/auto-login/status", dependencies=[Depends(admin_auth_guard)]) async def internal_auto_login_status(): p = _require_pool() out = [] for inst in p.instances: try: auth = await inst.client.auth_status() except Exception as exc: auth = {"error": str(exc)} out.append( { "instance": inst.name, "auto_login": inst.auto_login.status(), "auth": auth, "state": inst.client.state, } ) return {"ok": True, "instances": out} @app.post("/internal/session/export", dependencies=[Depends(admin_auth_guard)]) async def internal_session_export(instance: str | None = None): """Export a logged-in Lingma session as a base64 tar.gz bundle. The returned `bundle_b64` can be dropped into `LINGMA_SESSION_BUNDLE` (or the `session_bundle` field in `LINGMA_ACCOUNTS` JSON) on any other deployment to skip Playwright login entirely. Safety: - Requires a valid API key. - Only works on instances that are currently authenticated (prevents exporting garbage from a half-initialised workDir). - Response is not streamed to logs; callers must store it themselves. """ p = _require_pool() target = None if instance: for inst in p.instances: if inst.name == instance: target = inst break if target is None: raise HTTPException( status_code=404, detail={"error": f"instance {instance} not found"} ) else: target = p.pick() try: status = await target.client.auth_status() except Exception as exc: raise HTTPException( status_code=503, detail={"error": f"instance {target.name} not ready: {exc}"}, ) if not (status and status.get("id")): raise HTTPException( status_code=409, detail={"error": f"instance {target.name} is not logged in"}, ) try: raw = pack_workdir(target.cfg.work_dir) except Exception as exc: raise HTTPException(status_code=500, detail={"error": str(exc)}) bundle_b64 = encode_bundle(raw) logger.info( "session bundle exported from %s (%d bytes raw, %d bytes b64)", target.name, len(raw), len(bundle_b64), ) return { "instance": target.name, "account": target.cfg.account.username or "", "raw_bytes": len(raw), "bundle_b64": bundle_b64, } @app.get("/internal/models/raw", dependencies=[Depends(admin_auth_guard)]) async def internal_models_raw(instance: str | None = None): """Return the raw `config/queryModels` response from Lingma. This is the authoritative source for per-key displayName, description, capability flags, etc. We only ever extract `key` + `displayName` for OpenAI compatibility, but clients may want to inspect everything. """ p = _require_pool() target = None if instance: for inst in p.instances: if inst.name == instance: target = inst break if target is None: raise HTTPException( status_code=404, detail={"error": f"instance {instance} not found"} ) else: target = p.pick() await _ensure_instance_logged_in(target) raw = await target.client.query_models() name_map = build_model_name_map(raw if isinstance(raw, dict) else {}) return { "instance": target.name, "raw": raw, "extracted_name_map": name_map, "exposed_keys": flatten_model_keys(raw if isinstance(raw, dict) else {}), } @app.get("/internal/stats", dependencies=[Depends(admin_auth_guard)]) async def internal_stats(): p = _require_pool() return { "ok": True, "stats": await stats_collector.snapshot(), "concurrency": chat_guard.stats(), "pool": p.stats(), "session_cache": session_cache.stats(), } @app.get("/internal/effective-config", dependencies=[Depends(admin_auth_guard)]) async def internal_effective_config(): cfg = settings return JSONResponse(content={ "ok": True, "settings": { "host": cfg.host, "port": cfg.port, "api_keys": _safe_setting_value("api_keys", cfg.api_keys), "metrics_token": _safe_setting_value("metrics_token", cfg.metrics_token), "admin_token": _safe_setting_value("admin_token", cfg.admin_token), "metrics_public": cfg.metrics_public, "log_level": cfg.log_level, "gateway_max_in_flight": cfg.gateway_max_in_flight, "gateway_queue_timeout_sec": cfg.gateway_queue_timeout_sec, "lingma_bin": cfg.lingma_bin, "lingma_work_dir": cfg.lingma_work_dir, "lingma_socket_port": cfg.lingma_socket_port, "lingma_startup_timeout": cfg.lingma_startup_timeout, "lingma_rpc_timeout": cfg.lingma_rpc_timeout, "default_model": cfg.default_model, "default_ask_mode": cfg.default_ask_mode, "dedicated_domain_url": cfg.dedicated_domain_url, "auto_login_enabled": cfg.auto_login_enabled, "auto_login_headless": cfg.auto_login_headless, "auto_login_timeout": cfg.auto_login_timeout, "auto_login_max_retry": cfg.auto_login_max_retry, "instance_count": cfg.instance_count, "session_reuse_enabled": cfg.session_reuse_enabled, "session_cache_max_entries": cfg.session_cache_max_entries, "session_cache_ttl_sec": cfg.session_cache_ttl_sec, "tool_forward_enabled": cfg.tool_forward_enabled, "tool_allowlist": cfg.tool_allowlist, "accounts": [ { "username": account.username, "password": _safe_setting_value("password", account.password), "session_bundle_b64": _safe_setting_value( "session_bundle_b64", account.session_bundle_b64 ), "session_bundle_file": account.session_bundle_file, } for account in cfg.accounts ], }, "feature_flags": { "tool_forward_enabled": cfg.tool_forward_enabled, "session_reuse_enabled": cfg.session_reuse_enabled, "metrics_public": cfg.metrics_public, "auto_login_enabled": cfg.auto_login_enabled, }, }) @app.get("/metrics", dependencies=[Depends(metrics_auth_guard)]) async def metrics(): base = await stats_collector.prometheus_text() lines = list(chat_guard.prometheus_lines()) if pool is not None: lines.extend(pool.prometheus_lines()) lines.extend(session_cache.prometheus_lines()) extra = "\n".join(lines) + "\n" return StreamingResponse( iter([base + extra]), media_type="text/plain; version=0.0.4" )