feat: harden cache reuse semantics and expand protocol regressions
Stabilize cross-protocol ask-mode/streaming behavior and reduce session-reuse branch collisions, then add focused docs/tests for multimodal normalization and pool/stats/config paths. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -119,10 +119,8 @@ def anthropic_to_internal_messages(req: AnthropicMessagesRequest) -> list[dict]:
|
||||
"""Project an Anthropic request into the gateway's internal message list.
|
||||
|
||||
Internal shape matches what `_messages_to_prompt` already expects:
|
||||
`[{"role": "system"|"user"|"assistant", "content": "..."}]`. This means
|
||||
session-cache hashing is identical across OpenAI and Anthropic callers —
|
||||
a user who migrates between the two endpoints keeps their session affinity
|
||||
as long as they send the same conversation prefix.
|
||||
`[{"role": "system"|"user"|"assistant", "content": "..."}]`. This keeps
|
||||
user-input cache hashing aligned across OpenAI and Anthropic callers.
|
||||
"""
|
||||
out: list[dict] = []
|
||||
if req.system:
|
||||
|
||||
136
app/main.py
136
app/main.py
@@ -38,7 +38,7 @@ from .openai_schema import (
|
||||
flatten_content,
|
||||
)
|
||||
from .session_bundle import encode_bundle, pack_workdir
|
||||
from .session_cache import SessionCache
|
||||
from .session_cache import SessionCache, hash_branch_context
|
||||
from .stats import StatsCollector, estimate_tokens
|
||||
|
||||
|
||||
@@ -57,6 +57,12 @@ session_cache = SessionCache(
|
||||
ttl_sec=settings.session_cache_ttl_sec,
|
||||
)
|
||||
|
||||
STREAMING_RESPONSE_HEADERS = {
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
"Connection": "keep-alive",
|
||||
}
|
||||
|
||||
|
||||
def _require_pool() -> LingmaPool:
|
||||
if pool is None:
|
||||
@@ -416,6 +422,43 @@ def _anthropic_has_tooling_context(req: AnthropicMessagesRequest) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _resolve_ask_mode(model: str, has_tooling_context: bool) -> str:
|
||||
model_name = (model or "").lower()
|
||||
if model_name in {"lingma-agent", "agent"} or has_tooling_context:
|
||||
return "agent"
|
||||
return settings.default_ask_mode
|
||||
|
||||
|
||||
async def _apply_cached_instance_or_invalidate(
|
||||
*,
|
||||
protocol: str,
|
||||
inst: PoolInstance,
|
||||
cached_instance_name: str | None,
|
||||
cached_session_id: str | None,
|
||||
lookup_key: str | None,
|
||||
) -> str | None:
|
||||
if cached_instance_name and inst.name != cached_instance_name:
|
||||
logger.info(
|
||||
"%s session cache instance %s unhealthy, falling back to %s",
|
||||
protocol,
|
||||
cached_instance_name,
|
||||
inst.name,
|
||||
)
|
||||
if lookup_key:
|
||||
await session_cache.invalidate(lookup_key)
|
||||
return None
|
||||
return cached_session_id
|
||||
|
||||
|
||||
|
||||
def _streaming_response(event_stream) -> StreamingResponse:
|
||||
return StreamingResponse(
|
||||
event_stream,
|
||||
media_type="text/event-stream",
|
||||
headers=STREAMING_RESPONSE_HEADERS,
|
||||
)
|
||||
|
||||
|
||||
def _stream_event_type(event: Any) -> str:
|
||||
if isinstance(event, dict):
|
||||
t = event.get("type")
|
||||
@@ -595,9 +638,7 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
tool_config = _openai_tool_config(req)
|
||||
has_tooling_context = _openai_has_tooling_context(req, messages_dump)
|
||||
|
||||
ask_mode = settings.default_ask_mode
|
||||
if req.model.lower() in {"lingma-agent", "agent"} or has_tooling_context:
|
||||
ask_mode = "agent"
|
||||
ask_mode = _resolve_ask_mode(req.model, has_tooling_context)
|
||||
|
||||
reuse_eligible = (
|
||||
session_cache.enabled
|
||||
@@ -610,29 +651,38 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
cached_session_id: str | None = None
|
||||
cached_instance_name: str | None = None
|
||||
if reuse_eligible:
|
||||
lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config)
|
||||
write_key = session_cache.build_key(api_key, messages_dump, tool_config=tool_config)
|
||||
prefix_branch_context = hash_branch_context(messages_dump[:-1])
|
||||
lookup_key = session_cache.build_key(
|
||||
api_key,
|
||||
messages_dump[:-1],
|
||||
tool_config=tool_config,
|
||||
branch_context=prefix_branch_context,
|
||||
)
|
||||
write_key = session_cache.build_key(
|
||||
api_key,
|
||||
messages_dump,
|
||||
tool_config=tool_config,
|
||||
branch_context=hash_branch_context(messages_dump),
|
||||
)
|
||||
entry = await session_cache.get(lookup_key)
|
||||
if entry is None:
|
||||
legacy_lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config)
|
||||
entry = await session_cache.get(legacy_lookup_key)
|
||||
if entry is not None:
|
||||
lookup_key = legacy_lookup_key
|
||||
if entry is not None:
|
||||
cached_session_id = entry.session_id
|
||||
cached_instance_name = entry.instance_name or None
|
||||
|
||||
# Instance selection: prefer cached instance for continuity, else normal affinity.
|
||||
affinity = cached_instance_name or _affinity_key_for(req)
|
||||
inst = p.pick(affinity_key=affinity)
|
||||
|
||||
# If cache pointed at a specific instance that's no longer healthy, we already
|
||||
# fell back via pool.pick -> drop the cached session since Lingma on a
|
||||
# different process won't know about it.
|
||||
if cached_instance_name and inst.name != cached_instance_name:
|
||||
logger.info(
|
||||
"session cache instance %s unhealthy, falling back to %s (dropping cached session)",
|
||||
cached_instance_name,
|
||||
inst.name,
|
||||
)
|
||||
cached_session_id = None
|
||||
if lookup_key:
|
||||
await session_cache.invalidate(lookup_key)
|
||||
cached_session_id = await _apply_cached_instance_or_invalidate(
|
||||
protocol="chat",
|
||||
inst=inst,
|
||||
cached_instance_name=cached_instance_name,
|
||||
cached_session_id=cached_session_id,
|
||||
lookup_key=lookup_key,
|
||||
)
|
||||
|
||||
await _ensure_instance_logged_in(inst)
|
||||
|
||||
@@ -831,15 +881,8 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
_ticket.release()
|
||||
|
||||
ticket_transferred = True
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
)
|
||||
return _streaming_response(event_stream())
|
||||
|
||||
|
||||
try:
|
||||
result = await inst.client.chat_complete(
|
||||
@@ -1329,9 +1372,7 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
tool_config = _anthropic_tool_config(req)
|
||||
has_tooling_context = _anthropic_has_tooling_context(req)
|
||||
|
||||
ask_mode = settings.default_ask_mode
|
||||
if req.model.lower() in {"lingma-agent", "agent"} or has_tooling_context:
|
||||
ask_mode = "agent"
|
||||
ask_mode = _resolve_ask_mode(req.model, has_tooling_context)
|
||||
|
||||
reuse_eligible = (
|
||||
session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2 and not has_tooling_context
|
||||
@@ -1341,9 +1382,25 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
cached_session_id: str | None = None
|
||||
cached_instance_name: str | None = None
|
||||
if reuse_eligible:
|
||||
lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config)
|
||||
write_key = session_cache.build_key(api_key, messages_dump, tool_config=tool_config)
|
||||
prefix_branch_context = hash_branch_context(messages_dump[:-1])
|
||||
lookup_key = session_cache.build_key(
|
||||
api_key,
|
||||
messages_dump[:-1],
|
||||
tool_config=tool_config,
|
||||
branch_context=prefix_branch_context,
|
||||
)
|
||||
write_key = session_cache.build_key(
|
||||
api_key,
|
||||
messages_dump,
|
||||
tool_config=tool_config,
|
||||
branch_context=hash_branch_context(messages_dump),
|
||||
)
|
||||
entry = await session_cache.get(lookup_key)
|
||||
if entry is None:
|
||||
legacy_lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config)
|
||||
entry = await session_cache.get(legacy_lookup_key)
|
||||
if entry is not None:
|
||||
lookup_key = legacy_lookup_key
|
||||
if entry is not None:
|
||||
cached_session_id = entry.session_id
|
||||
cached_instance_name = entry.instance_name or None
|
||||
@@ -1613,15 +1670,8 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
_ticket.release()
|
||||
|
||||
ticket_transferred = True
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
)
|
||||
return _streaming_response(event_stream())
|
||||
|
||||
|
||||
# ------------------------------------------------------------- non-stream
|
||||
try:
|
||||
|
||||
@@ -26,7 +26,7 @@ class SessionEntry:
|
||||
def hash_user_context(messages: list[dict]) -> str:
|
||||
"""Hash the user/system/developer turns of a message list.
|
||||
|
||||
We deliberately skip `assistant`/`tool` messages because:
|
||||
We deliberately skip `assistant`/`tool` messages here because:
|
||||
- Clients may subtly reformat or trim assistant replies between turns,
|
||||
breaking exact-match keying.
|
||||
- Only the *inputs* are stable, and they're sufficient to identify a
|
||||
@@ -43,6 +43,28 @@ def hash_user_context(messages: list[dict]) -> str:
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def hash_branch_context(messages: list[dict]) -> str:
|
||||
"""Hash assistant/tool turns to reduce branch collisions."""
|
||||
h = hashlib.sha1()
|
||||
for m in messages:
|
||||
role = m.get("role", "")
|
||||
if role not in ("assistant", "tool"):
|
||||
continue
|
||||
content = m.get("content")
|
||||
text = content if isinstance(content, str) else flatten_content(content)
|
||||
tool_calls = m.get("tool_calls")
|
||||
if tool_calls is not None:
|
||||
try:
|
||||
tool_calls_text = json.dumps(tool_calls, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
|
||||
except Exception:
|
||||
tool_calls_text = str(tool_calls)
|
||||
else:
|
||||
tool_calls_text = ""
|
||||
tool_call_id = m.get("tool_call_id") or ""
|
||||
h.update(f"{role}\x1f{text or ''}\x1f{tool_calls_text}\x1f{tool_call_id}\x1e".encode("utf-8"))
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def _tool_fingerprint(tool_config: dict | None) -> str:
|
||||
if not isinstance(tool_config, dict):
|
||||
return "-"
|
||||
@@ -90,11 +112,21 @@ class SessionCache:
|
||||
def enabled(self) -> bool:
|
||||
return self.max > 0
|
||||
|
||||
def build_key(self, api_key: str, messages: list[dict], *, tool_config: dict | None = None) -> str:
|
||||
def build_key(
|
||||
self,
|
||||
api_key: str,
|
||||
messages: list[dict],
|
||||
*,
|
||||
tool_config: dict | None = None,
|
||||
branch_context: str | None = None,
|
||||
) -> str:
|
||||
# API key scoping prevents cross-tenant session leakage even when
|
||||
# different clients happen to produce identical histories.
|
||||
key_scope = hashlib.sha1((api_key or "-").encode("utf-8")).hexdigest()[:12]
|
||||
return f"{key_scope}:{hash_user_context(messages)}:{_tool_fingerprint(tool_config)}"
|
||||
base = f"{key_scope}:{hash_user_context(messages)}:{_tool_fingerprint(tool_config)}"
|
||||
if not branch_context:
|
||||
return base
|
||||
return f"{base}:{branch_context}"
|
||||
|
||||
async def get(self, key: str) -> SessionEntry | None:
|
||||
if not self.enabled:
|
||||
|
||||
Reference in New Issue
Block a user