Files
GitHub Actions 0b08dc6573 feat: Anthropic Messages API compat (/v1/messages)
Add a wire-compatible Anthropic endpoint alongside the existing OpenAI one
so Claude Code / anthropic-sdk / Cursor Agent can hit Lingma directly.

- app/anthropic_schema.py (new): request model + content-block flattener
  + internal-messages adapter + affinity key helper. Handles text / image /
  tool_use / tool_result blocks; unknown types degrade gracefully.
- app/auth.py: add require_anthropic_key (x-api-key, Bearer fallback)
  and AnthropicAuthError so auth failures render in Anthropic's error
  envelope instead of FastAPI's {detail:...} wrapper.
- app/main.py: POST /v1/messages. Shares LingmaPool / SessionCache /
  InFlightGuard / StatsCollector with the OpenAI path — same api_key +
  same conversation prefix hits the same upstream sessionId across both
  protocols (KV cache carries over). Streaming emits the named Anthropic
  event sequence (message_start / content_block_start / content_block_delta
  / content_block_stop / message_delta / message_stop). No claude-*
  model mapping table: resolve_model's default fallback handles it.
- README.md / DESIGN.md: document the new endpoint, add decision 5.12,
  iteration history M5, and a 4.3b streaming flow diagram.
- Bump FastAPI app version to 0.4.0.

Made-with: Cursor
2026-04-18 15:40:43 +08:00

200 lines
6.5 KiB
Python

from __future__ import annotations
import hmac
from fastapi import HTTPException, Request, status
def _extract_bearer(request: Request) -> str:
    """Return the credential from an ``Authorization: Bearer <token>`` header.

    The scheme name is compared case-insensitively because HTTP
    authentication schemes are case-insensitive (RFC 9110 section 11.1):
    ``bearer abc`` and ``BEARER abc`` are accepted just like ``Bearer abc``.
    Whitespace surrounding the credential is stripped, so the returned
    string may be empty when only whitespace follows the scheme.

    Args:
        request: Incoming request; only ``request.headers`` is read.

    Returns:
        The credential that follows the ``Bearer`` scheme.

    Raises:
        HTTPException: 401 with an ``{"error": {...}}`` detail payload when
            the header is absent or does not start with the Bearer scheme
            followed by a space.
    """
    scheme_prefix = "bearer "
    auth = request.headers.get("authorization", "")
    # Lower-case only the scheme portion; the credential itself stays
    # byte-for-byte as the client sent it.
    if auth[: len(scheme_prefix)].lower() != scheme_prefix:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": {
                    "message": "Missing or invalid Authorization header",
                    "type": "invalid_request_error",
                    "code": "invalid_api_key",
                }
            },
        )
    return auth[len(scheme_prefix) :].strip()
def _match_any(token: str, candidates: list[str]) -> bool:
    """Report whether ``token`` equals any non-empty entry of ``candidates``.

    Comparison uses ``hmac.compare_digest`` on UTF-8 encoded bytes.
    ``compare_digest`` only accepts ``str`` arguments that are pure ASCII and
    raises ``TypeError`` otherwise; encoding first means a client-supplied
    token containing non-ASCII characters is simply rejected instead of
    crashing the request.

    Every candidate is checked even after a match is found, so the time
    taken does not depend on which configured secret matched.

    Args:
        token: Credential presented by the client.
        candidates: Accepted secrets. Empty strings are skipped so an empty
            token can never authenticate against a blank entry.

    Returns:
        ``True`` if at least one non-empty candidate equals ``token``.
    """
    presented = token.encode("utf-8")
    matched = False
    for candidate in candidates:
        if not candidate:
            continue
        if hmac.compare_digest(presented, candidate.encode("utf-8")):
            matched = True
    return matched
def require_bearer(request: Request, api_keys: list[str]) -> None:
    """Enforce Bearer-token authentication against ``api_keys``.

    An empty ``api_keys`` list turns authentication off entirely (intended
    for local development); in that case the request is not inspected at
    all. The original author's note says the startup logger is expected to
    warn about that configuration.

    Args:
        request: Incoming request whose ``Authorization`` header is checked.
        api_keys: Accepted API keys; empty means "auth disabled".

    Raises:
        HTTPException: 401 when the header is missing/malformed (raised by
            ``_extract_bearer``) or when the token matches no configured key.
    """
    auth_enabled = bool(api_keys)
    if auth_enabled:
        presented = _extract_bearer(request)
        if _match_any(presented, api_keys):
            return
        rejection = {
            "error": {
                "message": "Invalid API key",
                "type": "invalid_request_error",
                "code": "invalid_api_key",
            }
        }
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=rejection,
        )
def require_metrics_access(
    request: Request,
    api_keys: list[str],
    metrics_token: str,
    *,
    public: bool = False,
) -> None:
    """Gate the /metrics endpoint.

    Checks, in order:
      1. ``public=True`` (METRICS_PUBLIC): access is granted without looking
         at the request. Explicit opt-in, e.g. for sidecar scrapers on a
         private network.
      2. Otherwise the set of accepted secrets is ``metrics_token`` (when
         non-empty) together with every entry of ``api_keys``; a Bearer
         token equal to any of them is accepted.
      3. If that set is empty, scraping is treated as disabled and a 503 is
         raised rather than serving metrics from an un-hardened deployment.

    Args:
        request: Incoming request whose ``Authorization`` header is checked.
        api_keys: Regular API keys, also accepted for metrics.
        metrics_token: Dedicated metrics secret; empty string means unset.
        public: When true, skip authentication entirely.

    Raises:
        HTTPException: 503 when no secret is configured; 401 when the Bearer
            header is missing/malformed or the token matches nothing.
    """
    if public:
        return

    # Dedicated token first (if any), then the ordinary API keys.
    accepted = [metrics_token, *api_keys] if metrics_token else list(api_keys)

    if not accepted:
        disabled = {
            "error": {
                "message": (
                    "metrics scraping is disabled: set METRICS_TOKEN, "
                    "API_KEYS, or METRICS_PUBLIC=true"
                ),
                "type": "service_unavailable",
                "code": "metrics_disabled",
            }
        }
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=disabled,
        )

    presented = _extract_bearer(request)
    if _match_any(presented, accepted):
        return

    rejected = {
        "error": {
            "message": "Invalid metrics token",
            "type": "invalid_request_error",
            "code": "invalid_api_key",
        }
    }
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=rejected,
    )
class AnthropicAuthError(Exception):
    """Authentication failure on the Anthropic Messages endpoint.

    Holds the HTTP status, the Anthropic error ``type`` and a human-readable
    message so the endpoint can build the Anthropic-shaped error body
    (`{"type":"error","error":{"type":..., "message":...}}`). A plain
    `HTTPException` is deliberately not used: per the original author's
    note, FastAPI would nest the payload under `{"detail": ...}`, which is
    not the Anthropic wire format.
    """

    # HTTP status code the endpoint should respond with.
    status_code: int
    # Value for the ``error.type`` field of the response body.
    error_type: str
    # Human-readable description; also used as the exception's str().
    message: str

    def __init__(self, status_code: int, error_type: str, message: str) -> None:
        self.message = message
        self.error_type = error_type
        self.status_code = status_code
        super().__init__(message)
def require_anthropic_key(request: Request, api_keys: list[str]) -> None:
    """Authenticate a `POST /v1/messages` request the Anthropic way.

    Credential lookup order:
      1. `x-api-key` header (official Anthropic SDK / CLI / Claude Code).
      2. `Authorization: Bearer <token>` as a fallback for OpenAI-shaped
         clients and curl. The scheme name is matched case-insensitively
         because HTTP auth schemes are case-insensitive (RFC 9110 section
         11.1), so `bearer <token>` works as well.

    An empty `api_keys` list means authentication is disabled and the
    request is not inspected. The `anthropic-version` header is never read
    here, so clients on any version pass through unchanged.

    Args:
        request: Incoming request; only its headers are read.
        api_keys: Accepted API keys; empty means "auth disabled".

    Raises:
        AnthropicAuthError: 401 / ``authentication_error`` when no credential
            is supplied or when it matches none of ``api_keys``.
    """
    if not api_keys:
        return

    token = request.headers.get("x-api-key", "").strip()
    if not token:
        bearer_prefix = "bearer "
        auth = request.headers.get("authorization", "")
        # Lower-case only the scheme; the credential is kept verbatim.
        if auth[: len(bearer_prefix)].lower() == bearer_prefix:
            token = auth[len(bearer_prefix) :].strip()

    if not token:
        raise AnthropicAuthError(
            status.HTTP_401_UNAUTHORIZED,
            "authentication_error",
            "missing x-api-key header (or Authorization: Bearer ...)",
        )
    if not _match_any(token, api_keys):
        raise AnthropicAuthError(
            status.HTTP_401_UNAUTHORIZED,
            "authentication_error",
            "invalid x-api-key",
        )
def require_admin_access(
    request: Request,
    api_keys: list[str],
    admin_token: str,
) -> None:
    """Gate the /internal/* admin endpoints.

    Which secrets are accepted:
      1. If ``admin_token`` (ADMIN_TOKEN) is set, it is the only accepted
         secret; regular API keys are NOT accepted.
      2. Otherwise every entry of ``api_keys`` is accepted, so deployments
         that only configure API_KEYS keep working.
      3. If neither is configured, a 503 is raised so admin functionality is
         never reachable on an unauthenticated gateway.

    Args:
        request: Incoming request whose ``Authorization`` header is checked.
        api_keys: Regular API keys, used only when no admin token is set.
        admin_token: Dedicated admin secret; empty string means unset.

    Raises:
        HTTPException: 503 when nothing is configured; 401 when the Bearer
            header is missing/malformed or the token matches nothing.
    """
    # A dedicated admin token, when present, replaces the API keys outright.
    accepted = [admin_token] if admin_token else list(api_keys)

    if not accepted:
        disabled = {
            "error": {
                "message": (
                    "admin endpoints disabled: configure ADMIN_TOKEN "
                    "(recommended) or API_KEYS"
                ),
                "type": "service_unavailable",
                "code": "admin_disabled",
            }
        }
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=disabled,
        )

    presented = _extract_bearer(request)
    if _match_any(presented, accepted):
        return

    rejected = {
        "error": {
            "message": "Invalid admin token",
            "type": "invalid_request_error",
            "code": "invalid_api_key",
        }
    }
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=rejected,
    )