diff --git a/DESIGN.md b/DESIGN.md
index 7cd214b..43c35a6 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -37,6 +37,7 @@
 ### Goals

 1. **OpenAI protocol compatibility**: any client that supports OpenAI (curl, Cursor, Dify, LangChain, LiteLLM…) can connect to Lingma without code changes.
+1b. **Anthropic Messages protocol compatibility**: clients that only speak Anthropic, such as Claude Code, anthropic-sdk-python, and Cursor Agent, can also connect directly, sharing the same session cache and pool as the OpenAI path.
 2. **Production-ready on a single node**: in self-hosted use it can run 24×7 long term, with reasonable observability, authentication, backpressure, and error recovery.
 3. **Maximize single-account / multi-account quota usage**: a multi-instance pool plus session reuse brings backend throughput close to that of the original VSCode plugin.
 4. **Lower operational cost**: after the first successful login, a bundle can be exported and reused indefinitely, removing the instability of browser automation entirely.
@@ -101,6 +102,7 @@
 | `config.py` | 178 | env → `Settings` dataclass; multi-format parsing of `LINGMA_ACCOUNTS`; bundle field normalization | `main.py` | - |
 | `model_map.py` | 84 | Bidirectional Lingma model `key ↔ displayName` mapping; request `model` resolution (accepts either `id` or `name`) | `main.py` | - |
 | `openai_schema.py` | 91 | OpenAI request/response Pydantic models; `flatten_content` downgrade for multimodal content | `main.py`, `session_cache.py` | - |
+| `anthropic_schema.py` | ~140 | Anthropic Messages request Pydantic models; `flatten_anthropic_content` for content blocks; `anthropic_to_internal_messages` normalizes to internal messages; `affinity_key_for_anthropic` picks the pool key | `main.py` | - |
 | `stats.py` | 85 | Request counts / token estimates / Prometheus text | `main.py` | - |
 | `logging_config.py` | 56 | Structured JSON logger; `request_id` injected into every line via `ContextVar` | all modules | - |
 | `bootstrap_lingma.py` | 199 | Extracts the Lingma binary from the Marketplace / VSIX into `data/bin/` at startup | container startup script | - |
@@ -274,6 +276,58 @@ async def event_stream():

 1. Once `ticket_transferred=True` is set, the outer `finally` no longer releases the ticket; responsibility passes to the `finally` of `event_stream()`. Otherwise the ticket would be released twice (release is idempotent, but in_flight would be counted down to -1).
 2. `chat_stream` uses a JSON-RPC **notify** rather than a request. Early versions used a request and waited 30s before the first byte arrived (see decision 5.1).
+### 4.3b Streaming Anthropic Messages (/v1/messages)
+
+The input and output protocols both differ from OpenAI, but the middle layer is fully reused:
+
+```
+client ──► POST /v1/messages  (x-api-key / Bearer)
+             │
+             ▼
+require_anthropic_key                 # x-api-key first; missing → AnthropicAuthError
+             │
+anthropic_to_internal_messages(req)   # system → role="system"; content blocks flattened
+             │                        # result is isomorphic to the OpenAI path (role/content dict)
+             ▼
+session_cache lookup / affinity pick  # shares the same SessionCache instance as OpenAI
+             │                        # → the same user keeps the KV cache when switching protocols
+             ▼
+pool.pick(affinity) + ensure_logged_in
+             │
+             ▼
+resolve_model("claude-3-5-sonnet-*")  # falls back to default_model
+             │
+             ▼
+chat_guard.try_acquire()              # same in-flight pool as the OpenAI path
+             │
+             ▼ stream=true
+StreamingResponse(event_stream())
+             │
+             ├─ event: message_start        ← once: id / model / usage.input_tokens
+             ├─ event: content_block_start  ← index=0, type=text
+             ├─ event: content_block_delta  ← one per chunk
+             │        ...
+             ├─ event: content_block_stop
+             ├─ event: message_delta        ← stop_reason (+ output_tokens)
+             └─ event: message_stop         ← terminal, no [DONE]
+             │
+             ▼ finally
+session_cache.put(write_key, upstream_sessionId, inst.name)  # success only
+ticket.release() + inst.in_flight--
+```
+
+Differences from the OpenAI path:
+
+| Stage | OpenAI | Anthropic |
+|---|---|---|
+| Auth | `Authorization: Bearer` | `x-api-key` (fallback Bearer) |
+| System message | `role:"system"` inside the messages array | top-level `system` field |
+| Content structure | `str` or `[{type:"text"\|"image_url"...}]` | `str` or `[{type:"text"\|"image"\|"tool_use"\|"tool_result"...}]` |
+| Streaming frames | `data: {delta:{content:"..."}}` + `[DONE]` | named event sequence `message_start / content_block_* / message_delta / message_stop` |
+| usage semantics | `prompt_tokens / completion_tokens` | `input_tokens / output_tokens` |
+| Error envelope | `{"error":{...}}` | `{"type":"error","error":{...}}` |
+| finish semantics | `finish_reason: "stop"\|"length"` | `stop_reason: "end_turn"\|"max_tokens"` |
+
 ### 4.4 Lingma subprocess and LSP communication

 ```
@@ -520,6 +574,16 @@ FastAPI `lifespan` exits → `pool.close()` → each `client.close()` → 进
 - **Approach**: `client._proc` + `client._terminate_proc()`. The pool only schedules `client.start()` / `client.close()`; process handling is encapsulated inside the client.
 - **Trade-off**: the client file gets longer, but the boundary is clear: the pool only looks at state and in-flight counts, while the concrete process is the client's concern.

+### 5.12 Anthropic Messages endpoint orchestrated independently rather than forwarded internally
+
+- **Problem**: we need Anthropic API compatibility without turning the `v1_chat_completions` orchestration path into a grab bag.
+- **Approach**: write a separate `v1_messages` endpoint. The first half (auth / normalization to internal messages / affinity / session cache lookup / instance pick / prompt construction / ticket acquisition) mirrors the structure of the OpenAI endpoint but is implemented separately; the second half (SSE event generation / response wrapping) emits the Anthropic format.
+- **Shared lower layers**: `LingmaPool` / `SessionCache` / `InFlightGuard` / `StatsCollector` / `LingmaGatewayClient.chat_stream|chat_complete` / `resolve_model`.
+- **Why not a single unified abstraction**: the input/output object shapes of the two endpoints differ enough (system placement, content types, SSE event names, error envelope) that an intermediate type would hide the differences and raise maintenance cost. The duplicated code currently runs to about 150 lines, but each branch maps directly to the wire protocol, so debugging and protocol changes read linearly.
+- **Cross-protocol session reuse**: `session_cache.build_key(api_key, messages)` receives the normalized `{role, content}` list on both endpoints, so a user who switches from OpenAI to Anthropic (as long as the conversation prefix matches) hits the same upstream `sessionId` directly, which amounts to a free KV cache hit.
+- **Error path**: a dedicated `AnthropicAuthError` exception plus `@app.exception_handler` renders the Anthropic envelope; other errors inside the endpoint (HTTPException, backpressure) return a `JSONResponse` directly via the `_anthropic_error()` helper, bypassing FastAPI's default `{"detail":...}` wrapping.
+- **Model names**: no `claude-* → dashscope_*` mapping table is maintained. The last-resort fallback in `resolve_model` (default_model / first available) maps every unfamiliar id back to a Lingma key that is actually available, so Anthropic clients can keep sending `claude-3-5-sonnet-*`.
+
 ---

 ## 6. Extension guide (to do X, change what)
@@ -527,6 +591,7 @@ FastAPI `lifespan` exits → `pool.close()` → each `client.close()` → 进
 | Requirement | Files to change | Key entry point |
 |---|---|---|
 | Add a new OpenAI endpoint (e.g. embeddings) | `main.py`, `openai_schema.py` | Following `v1_models`, add `@app.post("/v1/embeddings", dependencies=[Depends(auth_guard)])` |
+| Extend the Anthropic endpoint (e.g. count_tokens / end-to-end tool_use) | `main.py::v1_messages`, `anthropic_schema.py` | count_tokens is read-only: reuse `estimate_tokens`; tool_use needs upstream Lingma support, and the payload forwarding point is in `chat_stream` / `chat_complete` |
 | Add a new instance scheduling strategy (e.g. weighted round-robin) | `lingma_pool.py::pick()` | Currently affinity → least-in-flight → round-robin |
 | Switch auth to JWT / OAuth | `auth.py` | The three `require_*` functions are the only entry points; `main.py` only holds the `*_guard` proxies |
 | Add rate limiting (per-api_key quota) | Add `PerKeyGuard` to `concurrency.py`; add another layer in `main.py` after `chat_guard.try_acquire()` | Mind the ticket release order (inner released first) |
@@ -604,6 +669,17 @@ uvicorn app.main:app --reload --port 8317

 Benefit: a single turn sees no significant change (inference still takes most of the time), but from the second turn on TTFB drops 40%~60%, depending on prompt length.

+### M5 - Anthropic Messages compatibility
+
+- **Scenario**: Claude Code / Cursor Agent / anthropic-sdk-python / assorted agent frameworks speak only the Anthropic protocol.
+- **Changes**:
+  - Added `anthropic_schema.py`: `AnthropicMessagesRequest` + `anthropic_to_internal_messages` + `flatten_anthropic_content` + `affinity_key_for_anthropic`.
+  - `auth.py` adds `require_anthropic_key` (`x-api-key` first, Bearer fallback) + `AnthropicAuthError`.
+  - `main.py` adds the `/v1/messages` endpoint: it reuses `LingmaPool` / `SessionCache` / `InFlightGuard`; streaming output follows the Anthropic SSE protocol `message_start / content_block_start|delta|stop / message_delta / message_stop`; the error envelope is rewritten as `{"type":"error","error":{...}}`.
+  - `@app.exception_handler(AnthropicAuthError)` renders the Anthropic error wire format.
+- **Key design**: both endpoints share the same `SessionCache`, and conversation-prefix hashes under the same api_key are identical → cross-protocol hits on the same upstream `sessionId`. See §5.12.
+- **Model names**: no `claude-* → dashscope_*` mapping table is maintained; the last-resort fallback in `resolve_model` covers it.
+
 ### M4 - Production hardening pack (commit `2febc37`)

 User codename "Option A".
diff --git a/README.md b/README.md
index e0fffe6..64549c1 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,10 @@

 Wraps the local Lingma plugin in an OpenAI-compatible API. Any client that can call OpenAI (Cursor, Dify, LangChain, curl…) can connect directly.

-**Supports:** `GET /v1/models` / `POST /v1/chat/completions` (incl. SSE streaming) / Bearer auth / Prometheus / multi-account instance pool / session reuse / browser-free login state injection.
+**Supports:**
+- OpenAI-compatible: `GET /v1/models` / `POST /v1/chat/completions` (incl. SSE streaming) / Bearer auth
+- **Anthropic-compatible**: `POST /v1/messages` (incl. Anthropic SSE event stream) / `x-api-key` auth
+- Prometheus / multi-account instance pool / session reuse (shared across both protocols) / browser-free login state injection

 > For architecture, module layout, design decisions, and the roadmap for custom development → read [`DESIGN.md`](./DESIGN.md) directly.

@@ -160,6 +163,7 @@ curl -s http://127.0.0.1:8317/v1/chat/completions \
 | GET | `/healthz` | No auth; returns `ok` / `pool_size` / `pool_ready` / per-instance status |
 | GET | `/v1/models` | OpenAI-compatible; `id` is the original Lingma key, `name` is the readable name |
 | POST | `/v1/chat/completions` | OpenAI-compatible; `stream=true` uses SSE; `model: "agent"` switches to agent mode |
+| POST | `/v1/messages` | **Anthropic Messages compatible**; `x-api-key` or `Authorization: Bearer`; `stream=true` uses Anthropic named-event SSE |

 **chat request example (non-streaming)**

@@ -182,6 +186,44 @@ curl -N http://127.0.0.1:8317/v1/chat/completions \
   }'
 ```

+**Anthropic Messages example (non-streaming)**
+
+```bash
+curl -s http://127.0.0.1:8317/v1/messages \
+  -H "x-api-key: $API_KEY" \
+  -H "anthropic-version: 2023-06-01" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "model":"claude-3-5-sonnet-20241022",
+    "max_tokens":256,
+    "system":"You are a concise assistant",
+    "messages":[{"role":"user","content":"Hello"}]
+  }'
+```
+
+**Anthropic Messages example (streaming)**
+
+```bash
+curl -N http://127.0.0.1:8317/v1/messages \
+  -H "x-api-key: $API_KEY" \
+  -H "anthropic-version: 2023-06-01" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "model":"claude-3-5-sonnet-20241022",
+    "max_tokens":256,
+    "stream":true,
+    "messages":[{"role":"user","content":"Write a four-line poem"}]
+  }'
+# Returns message_start / content_block_start / content_block_delta* /
+# content_block_stop / message_delta / message_stop
+```
+
+Notes:
+- **Model name compatibility**: clients can keep sending names such as `claude-3-*`; an unrecognized model falls back to the Lingma key for `DEFAULT_MODEL`, and the backend is still served by Lingma (Qwen family). To pick a model explicitly, pass the Lingma key directly (`dashscope_qmodel`, etc.).
+- **Shared session reuse**: the Anthropic and OpenAI endpoints share the same `SessionCache`; with the same API key and the same conversation prefix, both hit the same upstream `sessionId`.
+- **Multimodal**: `image` blocks are downgraded to an `[image]` placeholder (Lingma does not support vision); `tool_use` / `tool_result` keep their meaning as plain text.
+- **Auth**: prefers the `x-api-key` header (the default in the official Anthropic SDK), falling back to `Authorization: Bearer` (convenient for curl / OpenAI-style clients).
+
 ### 3.2 Observability (`METRICS_TOKEN` or `API_KEYS`)

 | Method | Path | Description |
diff --git a/app/anthropic_schema.py b/app/anthropic_schema.py
new file mode 100644
index 0000000..af854b1
--- /dev/null
+++ b/app/anthropic_schema.py
@@ -0,0 +1,165 @@
+"""Anthropic Messages API schema + content adapters.
+
+Why this exists
+---------------
+The Anthropic Messages API (`POST /v1/messages`) is wire-incompatible with
+OpenAI chat completions even though it covers the same ground:
+
+* auth:      `x-api-key` header (not `Authorization: Bearer`)
+* system:    separate top-level field, never a message role
+* content:   `str` or array of typed blocks (`text`, `image`, `tool_use`, ...)
+* streaming: a named-event SSE protocol (`message_start`, `content_block_delta`,
+             `message_delta`, `message_stop`) rather than OpenAI's `delta.content`
+* errors:    `{"type":"error","error":{"type":"...","message":"..."}}`
+
+We keep a separate schema module rather than squeezing everything into
+`openai_schema.py` so both adapters stay small and auditable. Both eventually
+collapse to the same Lingma prompt shape inside `main.py`.
+"""
+
+from __future__ import annotations
+
+import json
+from typing import Any, Literal
+
+from pydantic import BaseModel
+
+
+# Anthropic accepts either a raw string or a list of typed content blocks.
+# We keep the list loosely typed (plain dicts) so future block kinds
+# (e.g. `thinking`, `document`) don't break the gateway; they simply fall
+# into the generic flattener below.
+AnthropicContent = str | list[dict[str, Any]] | None
+
+
+class AnthropicMessage(BaseModel):
+    # Anthropic: system is a top-level field, messages only carry user/assistant.
+    role: Literal["user", "assistant"]
+    content: AnthropicContent = None
+
+
+class AnthropicMessagesRequest(BaseModel):
+    model: str
+    # max_tokens is REQUIRED by Anthropic. We default to a sane value so callers
+    # that forget it don't 422 (easier migration from OpenAI clients).
+    max_tokens: int = 1024
+    messages: list[AnthropicMessage]
+    system: AnthropicContent = None
+    stream: bool = False
+    temperature: float | None = None
+    top_p: float | None = None
+    top_k: int | None = None
+    stop_sequences: list[str] | None = None
+    # metadata.user_id is the official hint for per-user routing / abuse tracking.
+    metadata: dict[str, Any] | None = None
+    # Tools / tool_choice are accepted but we can't forward them to Lingma yet;
+    # they're preserved here so the request doesn't 422, and the flattener
+    # surfaces any tool_use blocks as `[tool_use] {...}` text so the assistant
+    # still sees the context.
+    tools: list[dict[str, Any]] | None = None
+    tool_choice: dict[str, Any] | None = None
+
+
+def flatten_anthropic_content(content: AnthropicContent) -> str:
+    """Reduce Anthropic block arrays to a plain-string prompt for Lingma.
+
+    Handled block types:
+      * text        -> verbatim text
+      * image       -> `[image]` placeholder (Lingma has no vision)
+      * tool_use    -> `[tool_use] {json}` so the assistant can reference it
+      * tool_result -> `[tool_result] ...` (string or nested blocks)
+      * unknown     -> fall back to `.text` / `.content` if present, else drop
+
+    Returning an empty string here means the caller (prompt builder) will skip
+    the whole message rather than emit a bare `[role] ` line.
+    """
+    if content is None:
+        return ""
+    if isinstance(content, str):
+        return content
+    if not isinstance(content, list):
+        return str(content)
+
+    parts: list[str] = []
+    for item in content:
+        if not isinstance(item, dict):
+            parts.append(str(item))
+            continue
+        t = item.get("type")
+        if t == "text":
+            text = item.get("text") or ""
+            if text:
+                parts.append(text)
+        elif t == "image":
+            parts.append("[image]")
+        elif t == "tool_use":
+            # Compact one-line JSON keeps prompt_tokens estimate stable.
+            try:
+                payload = json.dumps(
+                    {"name": item.get("name"), "input": item.get("input")},
+                    ensure_ascii=False,
+                )
+            except Exception:
+                payload = str(item)
+            parts.append(f"[tool_use] {payload}")
+        elif t == "tool_result":
+            inner = item.get("content")
+            if isinstance(inner, str):
+                parts.append(f"[tool_result] {inner}")
+            elif isinstance(inner, list):
+                parts.append(f"[tool_result] {flatten_anthropic_content(inner)}")
+        else:
+            fallback = item.get("text") or item.get("content")
+            if isinstance(fallback, str) and fallback:
+                parts.append(fallback)
+    return "\n".join(p for p in parts if p)
+
+
+def anthropic_to_internal_messages(req: AnthropicMessagesRequest) -> list[dict]:
+    """Project an Anthropic request into the gateway's internal message list.
+
+    Internal shape matches what `_messages_to_prompt` already expects:
+    `[{"role": "system"|"user"|"assistant", "content": "..."}]`. This means
+    session-cache hashing is identical across OpenAI and Anthropic callers:
+    a user who migrates between the two endpoints keeps their session affinity
+    as long as they send the same conversation prefix.
+    """
+    out: list[dict] = []
+    if req.system:
+        sys_text = flatten_anthropic_content(req.system)
+        if sys_text:
+            out.append({"role": "system", "content": sys_text})
+    for m in req.messages:
+        text = flatten_anthropic_content(m.content)
+        out.append({"role": m.role, "content": text})
+    return out
+
+
+def affinity_key_for_anthropic(req: AnthropicMessagesRequest) -> str | None:
+    """Best-effort stable routing key for an Anthropic request.
+
+    Priority mirrors the OpenAI side:
+      1. metadata.user_id (the official per-user hint)
+      2. hash of the system prompt
+      3. hash of the first message
+
+    Kept here rather than in `main.py` because it needs the flatten helper and
+    the request type; `main.py` stays endpoint-shaped, not schema-shaped.
+    """
+    import hashlib
+
+    if req.metadata:
+        user_id = req.metadata.get("user_id")
+        if isinstance(user_id, str) and user_id.strip():
+            return user_id.strip()
+
+    if req.system:
+        text = flatten_anthropic_content(req.system)
+        if text:
+            return "sys:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16]
+
+    if req.messages:
+        text = flatten_anthropic_content(req.messages[0].content)
+        if text:
+            return "first:" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:16]
+    return None
diff --git a/app/auth.py b/app/auth.py
index 437d08f..e7c08db 100644
--- a/app/auth.py
+++ b/app/auth.py
@@ -98,6 +98,58 @@ def require_metrics_access(
     )


+class AnthropicAuthError(Exception):
+    """Raised when an Anthropic Messages request fails authentication.
+
+    Carries enough context for the endpoint to render the Anthropic-shaped
+    error body (`{"type":"error","error":{"type":..., "message":...}}`); we
+    don't use `HTTPException` here because FastAPI would wrap the detail in
+    `{"detail": ...}`, which is not the Anthropic wire format.
+    """
+
+    def __init__(self, status_code: int, error_type: str, message: str) -> None:
+        super().__init__(message)
+        self.status_code = status_code
+        self.error_type = error_type
+        self.message = message
+
+
+def require_anthropic_key(request: Request, api_keys: list[str]) -> None:
+    """Authenticate a `POST /v1/messages` request the Anthropic way.
+
+    Accept order:
+      1. `x-api-key` header (official Anthropic SDK / CLI / Claude Code)
+      2. `Authorization: Bearer ` (OpenAI-shaped clients / curl)
+
+    Empty `api_keys` means auth is disabled; the startup auth-posture warning
+    already covers that case loudly, same as `require_bearer`.
+
+    Note: we keep `anthropic-version` header permissive (don't parse/validate)
+    so clients on any official version work without gateway churn.
+    """
+    if not api_keys:
+        return
+
+    token = request.headers.get("x-api-key", "").strip()
+    if not token:
+        auth = request.headers.get("authorization", "")
+        if auth.startswith("Bearer "):
+            token = auth[len("Bearer ") :].strip()
+
+    if not token:
+        raise AnthropicAuthError(
+            status.HTTP_401_UNAUTHORIZED,
+            "authentication_error",
+            "missing x-api-key header (or Authorization: Bearer ...)",
+        )
+    if not _match_any(token, api_keys):
+        raise AnthropicAuthError(
+            status.HTTP_401_UNAUTHORIZED,
+            "authentication_error",
+            "invalid x-api-key",
+        )
+
+
 def require_admin_access(
     request: Request,
     api_keys: list[str],
diff --git a/app/main.py b/app/main.py
index f240536..1218f2c 100644
--- a/app/main.py
+++ b/app/main.py
@@ -10,7 +10,18 @@ from contextlib import asynccontextmanager
 from fastapi import Depends, FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse, StreamingResponse

-from .auth import require_admin_access, require_bearer, require_metrics_access
+from .anthropic_schema import (
+    AnthropicMessagesRequest,
+    affinity_key_for_anthropic,
+    anthropic_to_internal_messages,
+)
+from .auth import (
+    AnthropicAuthError,
+    require_admin_access,
+    require_anthropic_key,
+    require_bearer,
+    require_metrics_access,
+)
 from .concurrency import BackpressureRejected, InFlightGuard
 from .config import Settings, load_settings
 from .lingma_pool import LingmaPool, PoolInstance
@@ -85,7 +96,24 @@ async def lifespan(_app: FastAPI):
         await pool.close()


-app = FastAPI(title="Lingma OpenAI Gateway", version="0.3.0", lifespan=lifespan)
+app = FastAPI(title="Lingma OpenAI Gateway", version="0.4.0", lifespan=lifespan)
+
+
+@app.exception_handler(AnthropicAuthError)
+async def _anthropic_auth_error_handler(_request: Request, exc: AnthropicAuthError):
+    """Render auth failures on /v1/messages in the Anthropic wire format.
+
+    FastAPI's default handler wraps everything in `{"detail": ...}`, which
+    Anthropic SDKs don't parse. We emit the canonical
+    `{"type":"error","error":{"type":"...","message":"..."}}` instead.
+    """
+    return JSONResponse(
+        status_code=exc.status_code,
+        content={
+            "type": "error",
+            "error": {"type": exc.error_type, "message": exc.message},
+        },
+    )


 @app.middleware("http")
@@ -594,6 +622,356 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
             ticket.release()


+def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse:
+    """Build an Anthropic-shaped error response (`type:error` envelope)."""
+    return JSONResponse(
+        status_code=status_code,
+        content={"type": "error", "error": {"type": error_type, "message": message}},
+    )
+
+
+def _anthropic_stop_reason(completion_tokens: int, max_tokens: int) -> str:
+    """Approximate Anthropic `stop_reason`.
+
+    Lingma doesn't expose a `max_tokens` knob, so we can't truly enforce it;
+    we report `max_tokens` only when the generated length meets or exceeds
+    the caller's stated ceiling. Everything else is `end_turn`.
+    """
+    if max_tokens and completion_tokens >= max_tokens:
+        return "max_tokens"
+    return "end_turn"
+
+
+@app.post("/v1/messages")
+async def v1_messages(req: AnthropicMessagesRequest, request: Request):
+    """Anthropic Messages API compatible endpoint.
+
+    Wire contract:
+      * auth: `x-api-key` header (fallback Authorization: Bearer)
+      * body: Anthropic Messages spec (system top-level, content blocks, ...)
+      * stream: named-event SSE (message_start / content_block_delta / ...)
+
+    Internally we:
+      1. Normalise to the gateway's internal message list (`role/content` dicts)
+      2. Reuse the same pool pick + session cache + backpressure guard as
+         `/v1/chat/completions`. Session-cache keys include the API key, so
+         Anthropic and OpenAI callers on the same key share KV-cache warmth.
+      3. Re-wrap outputs in Anthropic's response / SSE format.
+    """
+    # ------------------------------------------------------------- auth
+    try:
+        require_anthropic_key(request, settings.api_keys)
+    except AnthropicAuthError as exc:
+        return _anthropic_error(exc.status_code, exc.error_type, exc.message)
+
+    # ------------------------------------------------------------- plumbing
+    try:
+        p = _require_pool()
+    except HTTPException as exc:
+        return _anthropic_error(exc.status_code, "overloaded_error", "gateway not ready")
+
+    messages_dump = anthropic_to_internal_messages(req)
+    # Prefer the auth token actually accepted so session-cache bucketing is
+    # consistent regardless of which auth header style the caller used.
+    api_key = (
+        request.headers.get("x-api-key", "").strip()
+        or _extract_api_key(request)
+        or "-"
+    )
+
+    # ------------------------------------------------------------- session reuse
+    # Anthropic clients don't expose an ask_mode, so we always run in "chat".
+    ask_mode = "chat"
+
+    reuse_eligible = (
+        session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2
+    )
+    lookup_key: str | None = None
+    write_key: str | None = None
+    cached_session_id: str | None = None
+    cached_instance_name: str | None = None
+    if reuse_eligible:
+        lookup_key = session_cache.build_key(api_key, messages_dump[:-1])
+        write_key = session_cache.build_key(api_key, messages_dump)
+        entry = await session_cache.get(lookup_key)
+        if entry is not None:
+            cached_session_id = entry.session_id
+            cached_instance_name = entry.instance_name or None
+
+    affinity = cached_instance_name or affinity_key_for_anthropic(req)
+    inst = p.pick(affinity_key=affinity)
+
+    if cached_instance_name and inst.name != cached_instance_name:
+        logger.info(
+            "anthropic session cache instance %s unhealthy, falling back to %s",
+            cached_instance_name,
+            inst.name,
+        )
+        cached_session_id = None
+        if lookup_key:
+            await session_cache.invalidate(lookup_key)
+
+    try:
+        await _ensure_instance_logged_in(inst)
+    except HTTPException as exc:
+        # 503/401/502 from login: map to closest Anthropic kind.
+        err_type = "authentication_error" if exc.status_code == 401 else "overloaded_error"
+        detail = exc.detail if isinstance(exc.detail, dict) else {}
+        msg = (detail.get("error") or {}).get("message") or str(detail) or "upstream error"
+        return _anthropic_error(exc.status_code, err_type, msg)
+
+    # ------------------------------------------------------------- prompt & model
+    models = await inst.client.query_models()
+    available = flatten_model_keys(models)
+    name_map = build_model_name_map(models)
+    # Anthropic callers send `claude-*` model names. resolve_model's
+    # final fallback (default_model / first available) handles that cleanly
+    # without us having to hard-code a mapping table.
+    model = resolve_model(req.model, available, settings.default_model, name_map)
+
+    if cached_session_id:
+        prompt = _last_user_text(messages_dump)
+        is_reply = True
+    else:
+        prompt = _messages_to_prompt(messages_dump)
+        is_reply = False
+
+    if not prompt:
+        return _anthropic_error(400, "invalid_request_error", "messages is empty")
+
+    prompt_tokens = estimate_tokens(prompt)
+
+    # ------------------------------------------------------------- backpressure
+    try:
+        ticket = await chat_guard.try_acquire()
+    except BackpressureRejected as exc:
+        retry_after = max(1, int(exc.retry_after))
+        logger.warning("anthropic rejected by backpressure, retry_after=%ds", retry_after)
+        resp = _anthropic_error(
+            429,
+            "overloaded_error",
+            "too many in-flight requests, please retry later",
+        )
+        resp.headers["Retry-After"] = str(retry_after)
+        return resp
+
+    inst.in_flight += 1
+    message_id = f"msg_{uuid.uuid4().hex}"
+    logger.info(
+        "anthropic.start inst=%s model=%s stream=%s prompt_tokens~%d reuse=%s",
+        inst.name,
+        model,
+        req.stream,
+        prompt_tokens,
+        bool(cached_session_id),
+        extra={
+            "ctx_instance": inst.name,
+            "ctx_model": model,
+            "ctx_ask_mode": ask_mode,
+            "ctx_stream": req.stream,
+            "ctx_prompt_tokens": prompt_tokens,
+            "ctx_in_flight": chat_guard.in_flight,
+            "ctx_affinity": affinity,
+            "ctx_session_reuse": bool(cached_session_id),
+            "ctx_api": "anthropic",
+        },
+    )
+
+    ticket_transferred = False
+
+    def _sse(event: str, data: dict) -> str:
+        return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
+
+    try:
+        if req.stream:
+            completion_tokens_holder = {"n": 0}
+            stream_meta: dict = {}
+            max_tokens = req.max_tokens
+
+            async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
+                success = False
+                try:
+                    # 1) message_start: Anthropic SDKs read this first to get
+                    # the message envelope (id/model/initial usage).
+                    start_payload = {
+                        "type": "message_start",
+                        "message": {
+                            "id": message_id,
+                            "type": "message",
+                            "role": "assistant",
+                            "model": model,
+                            "content": [],
+                            "stop_reason": None,
+                            "stop_sequence": None,
+                            # input_tokens is authoritative here; output_tokens
+                            # is seeded to 0 and updated in message_delta.
+                            "usage": {
+                                "input_tokens": prompt_tokens,
+                                "output_tokens": 0,
+                            },
+                        },
+                    }
+                    yield _sse("message_start", start_payload)
+
+                    # 2) content_block_start for a single text block (index 0).
+                    yield _sse(
+                        "content_block_start",
+                        {
+                            "type": "content_block_start",
+                            "index": 0,
+                            "content_block": {"type": "text", "text": ""},
+                        },
+                    )
+
+                    # 3) content_block_delta stream of text tokens.
+                    async for chunk in _inst.client.chat_stream(
+                        prompt,
+                        model,
+                        ask_mode,
+                        session_id=cached_session_id,
+                        is_reply=is_reply,
+                        out_meta=_meta,
+                    ):
+                        if not chunk:
+                            continue
+                        completion_tokens_holder["n"] += estimate_tokens(chunk)
+                        yield _sse(
+                            "content_block_delta",
+                            {
+                                "type": "content_block_delta",
+                                "index": 0,
+                                "delta": {"type": "text_delta", "text": chunk},
+                            },
+                        )
+
+                    # 4) content_block_stop closes the single text block.
+                    yield _sse(
+                        "content_block_stop",
+                        {"type": "content_block_stop", "index": 0},
+                    )
+
+                    # 5) message_delta carries the terminal stop_reason and
+                    # the final cumulative output_tokens count.
+                    stop_reason = _anthropic_stop_reason(
+                        completion_tokens_holder["n"], max_tokens
+                    )
+                    yield _sse(
+                        "message_delta",
+                        {
+                            "type": "message_delta",
+                            "delta": {
+                                "stop_reason": stop_reason,
+                                "stop_sequence": None,
+                            },
+                            "usage": {"output_tokens": completion_tokens_holder["n"]},
+                        },
+                    )
+
+                    # 6) message_stop: terminal event, no [DONE] sentinel.
+                    yield _sse("message_stop", {"type": "message_stop"})
+                    success = True
+                except asyncio.CancelledError:
+                    logger.info("anthropic.stream cancelled (inst=%s)", _inst.name)
+                    raise
+                except Exception as exc:
+                    logger.warning("anthropic.stream error (inst=%s): %s", _inst.name, exc)
+                    # Best-effort error frame. Anthropic clients treat any
+                    # unexpected event gracefully; we prefer visibility over
+                    # silent truncation.
+                    try:
+                        yield _sse(
+                            "error",
+                            {
+                                "type": "error",
+                                "error": {
+                                    "type": "api_error",
+                                    "message": str(exc) or "upstream error",
+                                },
+                            },
+                        )
+                    except Exception:
+                        pass
+                finally:
+                    # Session write-back only on clean finish; partial streams
+                    # leave Lingma's session in an indeterminate state.
+                    if success and write_key:
+                        sid = _meta.get("session_id")
+                        if sid:
+                            await session_cache.put(write_key, sid, _inst.name)
+                    await stats_collector.record_chat(
+                        stream=True,
+                        success=success,
+                        prompt_tokens=prompt_tokens,
+                        completion_tokens=completion_tokens_holder["n"],
+                    )
+                    _inst.in_flight = max(0, _inst.in_flight - 1)
+                    _ticket.release()
+
+            ticket_transferred = True
+            return StreamingResponse(
+                event_stream(),
+                media_type="text/event-stream",
+                headers={
+                    "Cache-Control": "no-cache, no-transform",
+                    "X-Accel-Buffering": "no",
+                    "Connection": "keep-alive",
+                },
+            )
+
+        # ------------------------------------------------------------- non-stream
+        try:
+            result = await inst.client.chat_complete(
+                prompt,
+                model,
+                ask_mode,
+                session_id=cached_session_id,
+                is_reply=is_reply,
+            )
+        except Exception as exc:
+            logger.warning("anthropic.complete error (inst=%s): %s", inst.name, exc)
+            await stats_collector.record_chat(
+                stream=False,
+                success=False,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=0,
+            )
+            if cached_session_id and lookup_key:
+                await session_cache.invalidate(lookup_key)
+            return _anthropic_error(502, "api_error", "upstream lingma error")
+
+        text = result.get("text") or ""
+        completion_tokens = estimate_tokens(text)
+        await stats_collector.record_chat(
+            stream=False,
+            success=True,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+        )
+        if write_key:
+            sid = result.get("sessionId")
+            if sid:
+                await session_cache.put(write_key, sid, inst.name)
+
+        response_body: dict = {
+            "id": message_id,
+            "type": "message",
+            "role": "assistant",
+            "model": model,
+            "content": [{"type": "text", "text": text}],
+            "stop_reason": _anthropic_stop_reason(completion_tokens, req.max_tokens),
+            "stop_sequence": None,
+            "usage": {
+                "input_tokens": prompt_tokens,
+                "output_tokens": completion_tokens,
+            },
+        }
+        return JSONResponse(content=response_body)
+    finally:
+        if not ticket_transferred:
+            inst.in_flight = max(0, inst.in_flight - 1)
+            ticket.release()
+
+
 @app.post("/internal/auto-login/start", dependencies=[Depends(admin_auth_guard)])
 async def internal_auto_login_start(instance: str | None = None):
     p = _require_pool()
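
Review note: the named-event stream that `/v1/messages` emits can be sanity-checked from the client side with a small parser. The following is a minimal sketch and not part of this patch. `sse` mirrors the `_sse` helper in the diff; `fake_stream`, `parse_sse`, and `collect_text` are hypothetical names introduced here for illustration, and the ids and token counts are placeholder values. The parser assumes every frame has exactly one `event: ` line and one single-line `data: ` line, which matches what `_sse` produces but is narrower than the full SSE format.

```python
import json


def sse(event: str, data: dict) -> str:
    """Format one named SSE frame (same shape as `_sse` in the diff)."""
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"


def fake_stream(chunks: list[str]) -> str:
    """Hypothetical stand-in for the gateway: emit the six-step event sequence."""
    frames = [
        sse("message_start", {
            "type": "message_start",
            # Placeholder id and usage values, for illustration only.
            "message": {"id": "msg_example", "usage": {"input_tokens": 0, "output_tokens": 0}},
        }),
        sse("content_block_start", {
            "type": "content_block_start",
            "index": 0,
            "content_block": {"type": "text", "text": ""},
        }),
    ]
    for chunk in chunks:
        frames.append(sse("content_block_delta", {
            "type": "content_block_delta",
            "index": 0,
            "delta": {"type": "text_delta", "text": chunk},
        }))
    frames.append(sse("content_block_stop", {"type": "content_block_stop", "index": 0}))
    frames.append(sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
        "usage": {"output_tokens": len(chunks)},  # placeholder count
    }))
    frames.append(sse("message_stop", {"type": "message_stop"}))
    return "".join(frames)


def parse_sse(raw: str) -> list[tuple[str, dict]]:
    """Split a raw SSE body into (event_name, payload) pairs."""
    events: list[tuple[str, dict]] = []
    # json.dumps escapes newlines inside strings, so a blank line can only
    # be a frame separator, never part of the data payload.
    for frame in raw.split("\n\n"):
        name = None
        payload = None
        for line in frame.split("\n"):
            if line.startswith("event: "):
                name = line[len("event: "):]
            elif line.startswith("data: "):
                payload = json.loads(line[len("data: "):])
        if name is not None and payload is not None:
            events.append((name, payload))
    return events


def collect_text(events: list[tuple[str, dict]]) -> str:
    """Reassemble the assistant text from the content_block_delta events."""
    return "".join(
        payload["delta"]["text"]
        for name, payload in events
        if name == "content_block_delta"
    )


if __name__ == "__main__":
    parsed = parse_sse(fake_stream(["Hello", ", ", "world"]))
    print([name for name, _ in parsed])
    print(collect_text(parsed))
```

Against a running gateway, the body returned by the streaming `curl -N` example could be passed to `parse_sse` in place of `fake_stream(...)`; a stream that ends without `message_stop`, or that contains an `error` event, would then show up directly in the list of event names.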