"""Anthropic Messages API schema + content adapters.

Why this exists
---------------
The Anthropic Messages API (`POST /v1/messages`) is wire-incompatible with
OpenAI chat completions even though it covers the same ground:

* auth:      `x-api-key` header (not `Authorization: Bearer`)
* system:    separate top-level field, never a message role
* content:   `str` or array of typed blocks (`text`, `image`, `tool_use`, ...)
* streaming: a named-event SSE protocol (`message_start`,
             `content_block_delta`, `message_delta`, `message_stop`) rather
             than OpenAI's `delta.content`
* errors:    `{"type":"error","error":{"type":"...","message":"..."}}`

We keep a separate schema module rather than squeezing everything into
`openai_schema.py` so both adapters stay small and auditable. Both eventually
collapse to the same Lingma prompt shape inside `main.py`.
"""

from __future__ import annotations

import hashlib
import json
from typing import Any, Literal

from pydantic import BaseModel

# Anthropic accepts either a raw string or a list of typed content blocks.
# We keep the list loosely typed (plain dicts) so future block kinds
# (e.g. `thinking`, `document`) don't break the gateway — they simply fall
# into the generic flattener below.
AnthropicContent = str | list[dict[str, Any]] | None


# One conversation turn of an Anthropic Messages request.
class AnthropicMessage(BaseModel):
    # Anthropic: system is a top-level field, messages only carry user/assistant.
    role: Literal["user", "assistant"]
    content: AnthropicContent = None


# Request body accepted by the gateway's Anthropic-compatible endpoint.
class AnthropicMessagesRequest(BaseModel):
    model: str
    # max_tokens is REQUIRED by Anthropic. We default to a sane value so callers
    # that forget it don't 422 — easier migration from OpenAI clients.
    max_tokens: int = 1024
    messages: list[AnthropicMessage]
    system: AnthropicContent = None
    stream: bool = False
    temperature: float | None = None
    top_p: float | None = None
    top_k: int | None = None
    stop_sequences: list[str] | None = None
    # metadata.user_id is the official hint for per-user routing / abuse tracking.
    metadata: dict[str, Any] | None = None
    # Tools / tool_choice are accepted for compatibility and, when forwarding is
    # enabled, are passed upstream as tool_config; tool_use / tool_result blocks
    # are still flattened into text so the assistant can see prior tool context.
    tools: list[dict[str, Any]] | None = None
    tool_choice: dict[str, Any] | None = None


def _flatten_block(block: Any) -> str:
    """Render a single content block as text; `""` means "nothing to add"."""
    if not isinstance(block, dict):
        # Not a typed block at all: keep whatever it is visible in the prompt.
        return str(block)

    kind = block.get("type")

    if kind == "text":
        text = block.get("text")
        if not text:
            return ""
        # The block dicts are typed `dict[str, Any]`, so `text` is not
        # guaranteed to be a string. Stringify instead of letting a non-str
        # value reach `str.join` and raise TypeError.
        return text if isinstance(text, str) else str(text)

    if kind == "image":
        return "[image]"

    if kind == "tool_use":
        # One-line JSON keeps the prompt_tokens estimate stable.
        try:
            payload = json.dumps(
                {"name": block.get("name"), "input": block.get("input")},
                ensure_ascii=False,
            )
        except (TypeError, ValueError, RecursionError):
            # Unserializable, circular or too deeply nested input: fall back
            # to the raw repr rather than failing the whole request.
            payload = str(block)
        return f"[tool_use] {payload}"

    if kind == "tool_result":
        inner = block.get("content")
        if isinstance(inner, str):
            return f"[tool_result] {inner}"
        if isinstance(inner, list):
            return f"[tool_result] {flatten_anthropic_content(inner)}"
        # Missing or otherwise-shaped result content contributes nothing.
        return ""

    # Unknown block kind: salvage a string `.text` / `.content` if present.
    fallback = block.get("text") or block.get("content")
    return fallback if isinstance(fallback, str) else ""


def flatten_anthropic_content(content: AnthropicContent) -> str:
    """Reduce Anthropic block arrays to a plain-string prompt for Lingma.

    Handled block types:
      * text        -> verbatim text (non-string values are stringified)
      * image       -> `[image]` placeholder (Lingma has no vision)
      * tool_use    -> `[tool_use] {json}` so the assistant can reference it
      * tool_result -> `[tool_result] ...` (string or nested blocks)
      * unknown     -> fall back to `.text` / `.content` if present, else drop

    Args:
        content: `None`, a raw string, or a list of block dicts. Any other
            value is stringified with `str()`.

    Returns:
        The non-empty rendered blocks joined with newlines. Returning an empty
        string here means the caller (prompt builder) will skip the whole
        message rather than emit a bare `[role] ` line.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    rendered = (_flatten_block(block) for block in content)
    return "\n".join(part for part in rendered if part)


def anthropic_to_internal_messages(
    req: AnthropicMessagesRequest,
) -> list[dict[str, str]]:
    """Project an Anthropic request into the gateway's internal message list.

    Internal shape matches what `_messages_to_prompt` already expects:
    `[{"role": "system"|"user"|"assistant", "content": "..."}]`. This keeps
    user-input cache hashing aligned across OpenAI and Anthropic callers.

    The top-level `system` field becomes a leading `system` message, but only
    when it flattens to non-empty text. Every entry of `req.messages` is
    emitted in order, even when its flattened content is empty.
    """
    out: list[dict[str, str]] = []

    if req.system:
        system_text = flatten_anthropic_content(req.system)
        if system_text:
            out.append({"role": "system", "content": system_text})

    out.extend(
        {"role": message.role, "content": flatten_anthropic_content(message.content)}
        for message in req.messages
    )
    return out


def _short_sha1(text: str) -> str:
    """Return the first 16 hex characters of the SHA-1 of `text`."""
    # `surrogatepass` keeps this best-effort key from raising on lone
    # surrogates; output for well-formed text is unchanged.
    data = text.encode("utf-8", "surrogatepass")
    # The digest is only a routing key, never a security boundary.
    return hashlib.sha1(data, usedforsecurity=False).hexdigest()[:16]


def affinity_key_for_anthropic(req: AnthropicMessagesRequest) -> str | None:
    """Best-effort stable routing key for an Anthropic request.

    Priority mirrors the OpenAI side:
      1. metadata.user_id (the official per-user hint), stripped, returned as-is
      2. hash of the system prompt, prefixed with `sys:`
      3. hash of the first message, prefixed with `first:`

    Kept here rather than in `main.py` because it needs the flatten helper and
    the request type — `main.py` stays endpoint-shaped, not schema-shaped.

    Returns:
        The routing key, or `None` when none of the sources yields any text.
    """
    if req.metadata:
        user_id = req.metadata.get("user_id")
        if isinstance(user_id, str) and user_id.strip():
            return user_id.strip()

    if req.system:
        text = flatten_anthropic_content(req.system)
        if text:
            return "sys:" + _short_sha1(text)

    if req.messages:
        text = flatten_anthropic_content(req.messages[0].content)
        if text:
            return "first:" + _short_sha1(text)

    return None