feat: Anthropic Messages API compat (/v1/messages)
Add a wire-compatible Anthropic endpoint alongside the existing OpenAI one
so Claude Code / anthropic-sdk / Cursor Agent can hit Lingma directly.
- app/anthropic_schema.py (new): request model + content-block flattener
+ internal-messages adapter + affinity key helper. Handles text / image /
tool_use / tool_result blocks; unknown types degrade gracefully.
- app/auth.py: add require_anthropic_key (x-api-key, Bearer fallback)
and AnthropicAuthError so auth failures render in Anthropic's error
envelope instead of FastAPI's {detail:...} wrapper.
- app/main.py: POST /v1/messages. Shares LingmaPool / SessionCache /
InFlightGuard / StatsCollector with the OpenAI path — same api_key +
same conversation prefix hits the same upstream sessionId across both
protocols (KV cache carries over). Streaming emits the named Anthropic
event sequence (message_start / content_block_start / content_block_delta
/ content_block_stop / message_delta / message_stop). No claude-*
model mapping table: resolve_model's default fallback handles it.
- README.md / DESIGN.md: document the new endpoint, add decision 5.12,
iteration history M5, and a 4.3b streaming flow diagram.
- Bump FastAPI app version to 0.4.0.
Made-with: Cursor
This commit is contained in:
382
app/main.py
382
app/main.py
@@ -10,7 +10,18 @@ from contextlib import asynccontextmanager
|
||||
from fastapi import Depends, FastAPI, HTTPException, Request
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
|
||||
from .auth import require_admin_access, require_bearer, require_metrics_access
|
||||
from .anthropic_schema import (
|
||||
AnthropicMessagesRequest,
|
||||
affinity_key_for_anthropic,
|
||||
anthropic_to_internal_messages,
|
||||
)
|
||||
from .auth import (
|
||||
AnthropicAuthError,
|
||||
require_admin_access,
|
||||
require_anthropic_key,
|
||||
require_bearer,
|
||||
require_metrics_access,
|
||||
)
|
||||
from .concurrency import BackpressureRejected, InFlightGuard
|
||||
from .config import Settings, load_settings
|
||||
from .lingma_pool import LingmaPool, PoolInstance
|
||||
@@ -85,7 +96,24 @@ async def lifespan(_app: FastAPI):
|
||||
await pool.close()
|
||||
|
||||
|
||||
app = FastAPI(title="Lingma OpenAI Gateway", version="0.3.0", lifespan=lifespan)
|
||||
app = FastAPI(title="Lingma OpenAI Gateway", version="0.4.0", lifespan=lifespan)
|
||||
|
||||
|
||||
@app.exception_handler(AnthropicAuthError)
|
||||
async def _anthropic_auth_error_handler(_request: Request, exc: AnthropicAuthError):
|
||||
"""Render auth failures on /v1/messages in the Anthropic wire format.
|
||||
|
||||
FastAPI's default handler wraps everything in `{"detail": ...}`, which
|
||||
Anthropic SDKs don't parse. We emit the canonical
|
||||
`{"type":"error","error":{"type":"...","message":"..."}}` instead.
|
||||
"""
|
||||
return JSONResponse(
|
||||
status_code=exc.status_code,
|
||||
content={
|
||||
"type": "error",
|
||||
"error": {"type": exc.error_type, "message": exc.message},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
@@ -594,6 +622,356 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
ticket.release()
|
||||
|
||||
|
||||
def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse:
|
||||
"""Build an Anthropic-shaped error response (`type:error` envelope)."""
|
||||
return JSONResponse(
|
||||
status_code=status_code,
|
||||
content={"type": "error", "error": {"type": error_type, "message": message}},
|
||||
)
|
||||
|
||||
|
||||
def _anthropic_stop_reason(completion_tokens: int, max_tokens: int) -> str:
|
||||
"""Approximate Anthropic `stop_reason`.
|
||||
|
||||
Lingma doesn't expose a `max_tokens` knob, so we can't truly enforce it;
|
||||
we report `max_tokens` only when the generated length meets or exceeds
|
||||
the caller's stated ceiling. Everything else is `end_turn`.
|
||||
"""
|
||||
if max_tokens and completion_tokens >= max_tokens:
|
||||
return "max_tokens"
|
||||
return "end_turn"
|
||||
|
||||
|
||||
@app.post("/v1/messages")
|
||||
async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
"""Anthropic Messages API compatible endpoint.
|
||||
|
||||
Wire contract:
|
||||
* auth: `x-api-key` header (fallback Authorization: Bearer)
|
||||
* body: Anthropic Messages spec (system top-level, content blocks, ...)
|
||||
* stream: named-event SSE (message_start / content_block_delta / ...)
|
||||
|
||||
Internally we:
|
||||
1. Normalise to the gateway's internal message list (`role/content` dicts)
|
||||
2. Reuse the same pool pick + session cache + backpressure guard as
|
||||
`/v1/chat/completions`. Session-cache keys include the API key, so
|
||||
Anthropic and OpenAI callers on the same key share KV-cache warmth.
|
||||
3. Re-wrap outputs in Anthropic's response / SSE format.
|
||||
"""
|
||||
# ------------------------------------------------------------- auth
|
||||
try:
|
||||
require_anthropic_key(request, settings.api_keys)
|
||||
except AnthropicAuthError as exc:
|
||||
return _anthropic_error(exc.status_code, exc.error_type, exc.message)
|
||||
|
||||
# ------------------------------------------------------------- plumbing
|
||||
try:
|
||||
p = _require_pool()
|
||||
except HTTPException as exc:
|
||||
return _anthropic_error(exc.status_code, "overloaded_error", "gateway not ready")
|
||||
|
||||
messages_dump = anthropic_to_internal_messages(req)
|
||||
# Prefer the auth token actually accepted so session-cache bucketing is
|
||||
# consistent regardless of which auth header style the caller used.
|
||||
api_key = (
|
||||
request.headers.get("x-api-key", "").strip()
|
||||
or _extract_api_key(request)
|
||||
or "-"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------- session reuse
|
||||
# Anthropic clients don't expose an ask_mode, so we always run in "chat".
|
||||
ask_mode = "chat"
|
||||
|
||||
reuse_eligible = (
|
||||
session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2
|
||||
)
|
||||
lookup_key: str | None = None
|
||||
write_key: str | None = None
|
||||
cached_session_id: str | None = None
|
||||
cached_instance_name: str | None = None
|
||||
if reuse_eligible:
|
||||
lookup_key = session_cache.build_key(api_key, messages_dump[:-1])
|
||||
write_key = session_cache.build_key(api_key, messages_dump)
|
||||
entry = await session_cache.get(lookup_key)
|
||||
if entry is not None:
|
||||
cached_session_id = entry.session_id
|
||||
cached_instance_name = entry.instance_name or None
|
||||
|
||||
affinity = cached_instance_name or affinity_key_for_anthropic(req)
|
||||
inst = p.pick(affinity_key=affinity)
|
||||
|
||||
if cached_instance_name and inst.name != cached_instance_name:
|
||||
logger.info(
|
||||
"anthropic session cache instance %s unhealthy, falling back to %s",
|
||||
cached_instance_name,
|
||||
inst.name,
|
||||
)
|
||||
cached_session_id = None
|
||||
if lookup_key:
|
||||
await session_cache.invalidate(lookup_key)
|
||||
|
||||
try:
|
||||
await _ensure_instance_logged_in(inst)
|
||||
except HTTPException as exc:
|
||||
# 503/401/502 from login: map to closest Anthropic kind.
|
||||
err_type = "authentication_error" if exc.status_code == 401 else "overloaded_error"
|
||||
detail = exc.detail if isinstance(exc.detail, dict) else {}
|
||||
msg = (detail.get("error") or {}).get("message") or str(detail) or "upstream error"
|
||||
return _anthropic_error(exc.status_code, err_type, msg)
|
||||
|
||||
# ------------------------------------------------------------- prompt & model
|
||||
models = await inst.client.query_models()
|
||||
available = flatten_model_keys(models)
|
||||
name_map = build_model_name_map(models)
|
||||
# Anthropic callers send `claude-*` model names. resolve_model's
|
||||
# final fallback (default_model / first available) handles that cleanly
|
||||
# without us having to hard-code a mapping table.
|
||||
model = resolve_model(req.model, available, settings.default_model, name_map)
|
||||
|
||||
if cached_session_id:
|
||||
prompt = _last_user_text(messages_dump)
|
||||
is_reply = True
|
||||
else:
|
||||
prompt = _messages_to_prompt(messages_dump)
|
||||
is_reply = False
|
||||
|
||||
if not prompt:
|
||||
return _anthropic_error(400, "invalid_request_error", "messages is empty")
|
||||
|
||||
prompt_tokens = estimate_tokens(prompt)
|
||||
|
||||
# ------------------------------------------------------------- backpressure
|
||||
try:
|
||||
ticket = await chat_guard.try_acquire()
|
||||
except BackpressureRejected as exc:
|
||||
retry_after = max(1, int(exc.retry_after))
|
||||
logger.warning("anthropic rejected by backpressure, retry_after=%ds", retry_after)
|
||||
resp = _anthropic_error(
|
||||
429,
|
||||
"overloaded_error",
|
||||
"too many in-flight requests, please retry later",
|
||||
)
|
||||
resp.headers["Retry-After"] = str(retry_after)
|
||||
return resp
|
||||
|
||||
inst.in_flight += 1
|
||||
message_id = f"msg_{uuid.uuid4().hex}"
|
||||
logger.info(
|
||||
"anthropic.start inst=%s model=%s stream=%s prompt_tokens~%d reuse=%s",
|
||||
inst.name,
|
||||
model,
|
||||
req.stream,
|
||||
prompt_tokens,
|
||||
bool(cached_session_id),
|
||||
extra={
|
||||
"ctx_instance": inst.name,
|
||||
"ctx_model": model,
|
||||
"ctx_ask_mode": ask_mode,
|
||||
"ctx_stream": req.stream,
|
||||
"ctx_prompt_tokens": prompt_tokens,
|
||||
"ctx_in_flight": chat_guard.in_flight,
|
||||
"ctx_affinity": affinity,
|
||||
"ctx_session_reuse": bool(cached_session_id),
|
||||
"ctx_api": "anthropic",
|
||||
},
|
||||
)
|
||||
|
||||
ticket_transferred = False
|
||||
|
||||
def _sse(event: str, data: dict) -> str:
|
||||
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
|
||||
|
||||
try:
|
||||
if req.stream:
|
||||
completion_tokens_holder = {"n": 0}
|
||||
stream_meta: dict = {}
|
||||
max_tokens = req.max_tokens
|
||||
|
||||
async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
|
||||
success = False
|
||||
try:
|
||||
# 1) message_start — Anthropic SDKs read this first to get
|
||||
# the message envelope (id/model/initial usage).
|
||||
start_payload = {
|
||||
"type": "message_start",
|
||||
"message": {
|
||||
"id": message_id,
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"model": model,
|
||||
"content": [],
|
||||
"stop_reason": None,
|
||||
"stop_sequence": None,
|
||||
# input_tokens is authoritative here; output_tokens
|
||||
# is seeded to 0 and updated in message_delta.
|
||||
"usage": {
|
||||
"input_tokens": prompt_tokens,
|
||||
"output_tokens": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
yield _sse("message_start", start_payload)
|
||||
|
||||
# 2) content_block_start for a single text block (index 0).
|
||||
yield _sse(
|
||||
"content_block_start",
|
||||
{
|
||||
"type": "content_block_start",
|
||||
"index": 0,
|
||||
"content_block": {"type": "text", "text": ""},
|
||||
},
|
||||
)
|
||||
|
||||
# 3) content_block_delta stream of text tokens.
|
||||
async for chunk in _inst.client.chat_stream(
|
||||
prompt,
|
||||
model,
|
||||
ask_mode,
|
||||
session_id=cached_session_id,
|
||||
is_reply=is_reply,
|
||||
out_meta=_meta,
|
||||
):
|
||||
if not chunk:
|
||||
continue
|
||||
completion_tokens_holder["n"] += estimate_tokens(chunk)
|
||||
yield _sse(
|
||||
"content_block_delta",
|
||||
{
|
||||
"type": "content_block_delta",
|
||||
"index": 0,
|
||||
"delta": {"type": "text_delta", "text": chunk},
|
||||
},
|
||||
)
|
||||
|
||||
# 4) content_block_stop closes the single text block.
|
||||
yield _sse(
|
||||
"content_block_stop",
|
||||
{"type": "content_block_stop", "index": 0},
|
||||
)
|
||||
|
||||
# 5) message_delta carries the terminal stop_reason and
|
||||
# the final cumulative output_tokens count.
|
||||
stop_reason = _anthropic_stop_reason(
|
||||
completion_tokens_holder["n"], max_tokens
|
||||
)
|
||||
yield _sse(
|
||||
"message_delta",
|
||||
{
|
||||
"type": "message_delta",
|
||||
"delta": {
|
||||
"stop_reason": stop_reason,
|
||||
"stop_sequence": None,
|
||||
},
|
||||
"usage": {"output_tokens": completion_tokens_holder["n"]},
|
||||
},
|
||||
)
|
||||
|
||||
# 6) message_stop — terminal event, no [DONE] sentinel.
|
||||
yield _sse("message_stop", {"type": "message_stop"})
|
||||
success = True
|
||||
except asyncio.CancelledError:
|
||||
logger.info("anthropic.stream cancelled (inst=%s)", _inst.name)
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.warning("anthropic.stream error (inst=%s): %s", _inst.name, exc)
|
||||
# Best-effort error frame. Anthropic clients treat any
|
||||
# unexpected event gracefully; we prefer visibility over
|
||||
# silent truncation.
|
||||
try:
|
||||
yield _sse(
|
||||
"error",
|
||||
{
|
||||
"type": "error",
|
||||
"error": {
|
||||
"type": "api_error",
|
||||
"message": str(exc) or "upstream error",
|
||||
},
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
# Session write-back only on clean finish — partial streams
|
||||
# leave Lingma's session in an indeterminate state.
|
||||
if success and write_key:
|
||||
sid = _meta.get("session_id")
|
||||
if sid:
|
||||
await session_cache.put(write_key, sid, _inst.name)
|
||||
await stats_collector.record_chat(
|
||||
stream=True,
|
||||
success=success,
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens_holder["n"],
|
||||
)
|
||||
_inst.in_flight = max(0, _inst.in_flight - 1)
|
||||
_ticket.release()
|
||||
|
||||
ticket_transferred = True
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------- non-stream
|
||||
try:
|
||||
result = await inst.client.chat_complete(
|
||||
prompt,
|
||||
model,
|
||||
ask_mode,
|
||||
session_id=cached_session_id,
|
||||
is_reply=is_reply,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("anthropic.complete error (inst=%s): %s", inst.name, exc)
|
||||
await stats_collector.record_chat(
|
||||
stream=False,
|
||||
success=False,
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=0,
|
||||
)
|
||||
if cached_session_id and lookup_key:
|
||||
await session_cache.invalidate(lookup_key)
|
||||
return _anthropic_error(502, "api_error", "upstream lingma error")
|
||||
|
||||
text = result.get("text") or ""
|
||||
completion_tokens = estimate_tokens(text)
|
||||
await stats_collector.record_chat(
|
||||
stream=False,
|
||||
success=True,
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
)
|
||||
if write_key:
|
||||
sid = result.get("sessionId")
|
||||
if sid:
|
||||
await session_cache.put(write_key, sid, inst.name)
|
||||
|
||||
response_body: dict = {
|
||||
"id": message_id,
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"model": model,
|
||||
"content": [{"type": "text", "text": text}],
|
||||
"stop_reason": _anthropic_stop_reason(completion_tokens, req.max_tokens),
|
||||
"stop_sequence": None,
|
||||
"usage": {
|
||||
"input_tokens": prompt_tokens,
|
||||
"output_tokens": completion_tokens,
|
||||
},
|
||||
}
|
||||
return JSONResponse(content=response_body)
|
||||
finally:
|
||||
if not ticket_transferred:
|
||||
inst.in_flight = max(0, inst.in_flight - 1)
|
||||
ticket.release()
|
||||
|
||||
|
||||
@app.post("/internal/auto-login/start", dependencies=[Depends(admin_auth_guard)])
|
||||
async def internal_auto_login_start(instance: str | None = None):
|
||||
p = _require_pool()
|
||||
|
||||
Reference in New Issue
Block a user