feat: improve tool-call bridging and env documentation
This commit is contained in:
218
app/main.py
218
app/main.py
@@ -43,12 +43,10 @@ from .http.tool_bridge import (
|
||||
_anthropic_forced_tool_name,
|
||||
_anthropic_tool_result_block,
|
||||
_anthropic_tool_use_block,
|
||||
_forced_tool_event_from_text,
|
||||
_forced_tool_fallback_event,
|
||||
_extract_function_call_event_from_text,
|
||||
_json_string,
|
||||
_openai_forced_tool_name,
|
||||
_openai_tool_call,
|
||||
_tool_code_single_arg_name,
|
||||
)
|
||||
from .http.tooling_policy import (
|
||||
_anthropic_has_tooling_context,
|
||||
@@ -84,7 +82,9 @@ chat_guard = InFlightGuard(
|
||||
queue_timeout_sec=settings.gateway_queue_timeout_sec,
|
||||
)
|
||||
session_cache = SessionCache(
|
||||
max_entries=settings.session_cache_max_entries if settings.session_reuse_enabled else 0,
|
||||
max_entries=settings.session_cache_max_entries
|
||||
if settings.session_reuse_enabled
|
||||
else 0,
|
||||
ttl_sec=settings.session_cache_ttl_sec,
|
||||
)
|
||||
|
||||
@@ -99,7 +99,12 @@ def _require_pool() -> LingmaPool:
|
||||
if pool is None:
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail={"error": {"message": "pool not initialized", "type": "service_unavailable"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "pool not initialized",
|
||||
"type": "service_unavailable",
|
||||
}
|
||||
},
|
||||
)
|
||||
return pool
|
||||
|
||||
@@ -254,7 +259,12 @@ async def _ensure_instance_logged_in(inst: PoolInstance) -> dict:
|
||||
logger.warning("[%s] auth_status failed before chat: %s", inst.name, exc)
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail={"error": {"message": "Lingma is not ready", "type": "service_unavailable"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "Lingma is not ready",
|
||||
"type": "service_unavailable",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if status and status.get("id"):
|
||||
@@ -263,13 +273,20 @@ async def _ensure_instance_logged_in(inst: PoolInstance) -> dict:
|
||||
if not settings.auto_login_enabled:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail={"error": {"message": "Lingma not logged in", "type": "invalid_request_error"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "Lingma not logged in",
|
||||
"type": "invalid_request_error",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if settings.dedicated_domain_url:
|
||||
try:
|
||||
current = await client.get_endpoint()
|
||||
current_ep = (current or {}).get("endpoint", "") if isinstance(current, dict) else ""
|
||||
current_ep = (
|
||||
(current or {}).get("endpoint", "") if isinstance(current, dict) else ""
|
||||
)
|
||||
if current_ep != settings.dedicated_domain_url:
|
||||
await client.update_endpoint(settings.dedicated_domain_url)
|
||||
except Exception as exc:
|
||||
@@ -281,13 +298,23 @@ async def _ensure_instance_logged_in(inst: PoolInstance) -> dict:
|
||||
logger.warning("[%s] generate_login_url failed: %s", inst.name, exc)
|
||||
raise HTTPException(
|
||||
status_code=502,
|
||||
detail={"error": {"message": "generate login url failed", "type": "upstream_error"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "generate login url failed",
|
||||
"type": "upstream_error",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if not login_url:
|
||||
raise HTTPException(
|
||||
status_code=502,
|
||||
detail={"error": {"message": "generate login url failed", "type": "upstream_error"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "generate login url failed",
|
||||
"type": "upstream_error",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
await auto_login.ensure_started(login_url)
|
||||
@@ -312,7 +339,12 @@ async def _ensure_instance_logged_in(inst: PoolInstance) -> dict:
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail={"error": {"message": "Lingma auto login failed", "type": "invalid_request_error"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "Lingma auto login failed",
|
||||
"type": "invalid_request_error",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -416,7 +448,6 @@ async def _apply_cached_instance_or_invalidate(
|
||||
)
|
||||
|
||||
|
||||
|
||||
def _streaming_response(event_stream) -> StreamingResponse:
|
||||
return StreamingResponse(
|
||||
event_stream,
|
||||
@@ -505,7 +536,12 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
except ValueError:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={"error": {"message": "messages is empty", "type": "invalid_request_error"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "messages is empty",
|
||||
"type": "invalid_request_error",
|
||||
}
|
||||
},
|
||||
)
|
||||
except BackpressureRejected as exc:
|
||||
retry_after = max(1, int(exc.retry_after))
|
||||
@@ -534,7 +570,6 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
completion_tokens_holder = {"n": 0}
|
||||
stream_meta: dict = {}
|
||||
forced_tool_name = _openai_forced_tool_name(req.tool_choice)
|
||||
forced_tool_single_arg_name = _tool_code_single_arg_name(req.tools, forced_tool_name) if forced_tool_name else None
|
||||
|
||||
async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
|
||||
success = False
|
||||
@@ -602,7 +637,9 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
"tool_calls": [
|
||||
{
|
||||
"index": idx,
|
||||
**_openai_tool_call(tool, forced_id=tool_id),
|
||||
**_openai_tool_call(
|
||||
tool, forced_id=tool_id
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -622,18 +659,15 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
continue
|
||||
yield _text_payload(text)
|
||||
|
||||
if buffered_text_parts and not saw_tool_call and forced_tool_name:
|
||||
fallback_event = _forced_tool_event_from_text(
|
||||
if buffered_text_parts and forced_tool_name and not saw_tool_call:
|
||||
inferred = _extract_function_call_event_from_text(
|
||||
"".join(buffered_text_parts),
|
||||
forced_tool_name,
|
||||
single_arg_name=forced_tool_single_arg_name,
|
||||
forced_tool_name=forced_tool_name,
|
||||
)
|
||||
if fallback_event is not None:
|
||||
if inferred is not None:
|
||||
tool_id = "call_inferred_0"
|
||||
tool_call_indexes[tool_id] = 0
|
||||
saw_tool_call = True
|
||||
tool_id = "call_fallback_0"
|
||||
idx = 0
|
||||
tool_call_indexes[tool_id] = idx
|
||||
fallback_tool_call = _openai_tool_call(fallback_event, forced_id=tool_id)
|
||||
payload = {
|
||||
"id": completion_id,
|
||||
"object": "chat.completion.chunk",
|
||||
@@ -645,8 +679,10 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
"delta": {
|
||||
"tool_calls": [
|
||||
{
|
||||
"index": idx,
|
||||
**fallback_tool_call,
|
||||
"index": 0,
|
||||
**_openai_tool_call(
|
||||
inferred, forced_id=tool_id
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -669,7 +705,9 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {},
|
||||
"finish_reason": "tool_calls" if saw_tool_call else "stop",
|
||||
"finish_reason": "tool_calls"
|
||||
if saw_tool_call
|
||||
else "stop",
|
||||
}
|
||||
],
|
||||
}
|
||||
@@ -685,7 +723,8 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
"usage": {
|
||||
"prompt_tokens": prompt_tokens,
|
||||
"completion_tokens": completion_tokens_holder["n"],
|
||||
"total_tokens": prompt_tokens + completion_tokens_holder["n"],
|
||||
"total_tokens": prompt_tokens
|
||||
+ completion_tokens_holder["n"],
|
||||
},
|
||||
}
|
||||
yield f"data: {json.dumps(usage_payload, ensure_ascii=False)}\n\n"
|
||||
@@ -738,7 +777,12 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
except UpstreamExecutionError:
|
||||
raise HTTPException(
|
||||
status_code=502,
|
||||
detail={"error": {"message": "upstream lingma error", "type": "upstream_error"}},
|
||||
detail={
|
||||
"error": {
|
||||
"message": "upstream lingma error",
|
||||
"type": "upstream_error",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
result = completed.result
|
||||
@@ -757,13 +801,14 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
tool_calls.append(_openai_tool_call(item, forced_id=tool_id))
|
||||
saw_tool_call = True
|
||||
if not saw_tool_call and forced_tool_name:
|
||||
fallback_event = _forced_tool_fallback_event(
|
||||
inferred = _extract_function_call_event_from_text(
|
||||
message_content,
|
||||
forced_tool_name=forced_tool_name,
|
||||
tools=req.tools,
|
||||
)
|
||||
if fallback_event is not None:
|
||||
tool_calls.append(_openai_tool_call(fallback_event, forced_id="call_fallback_0"))
|
||||
if inferred is not None:
|
||||
tool_calls.append(
|
||||
_openai_tool_call(inferred, forced_id="call_inferred_0")
|
||||
)
|
||||
saw_tool_call = True
|
||||
message_content = ""
|
||||
response = ChatCompletionResponse(
|
||||
@@ -783,7 +828,6 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
data = response.model_dump()
|
||||
data["latency"] = {
|
||||
"first_token_ms": result.get("firstTokenLatencyMs"),
|
||||
@@ -801,8 +845,6 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
|
||||
release_execution(ticket=ticket, inst=inst)
|
||||
|
||||
|
||||
|
||||
|
||||
@app.post("/responses", dependencies=[Depends(auth_guard)])
|
||||
@app.post("/v1/responses", dependencies=[Depends(auth_guard)])
|
||||
async def v1_responses(req: ResponsesRequest, request: Request):
|
||||
@@ -814,7 +856,6 @@ async def v1_responses(req: ResponsesRequest, request: Request):
|
||||
)
|
||||
|
||||
|
||||
|
||||
def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse:
|
||||
"""Build an Anthropic-shaped error response (`type:error` envelope)."""
|
||||
return JSONResponse(
|
||||
@@ -879,15 +920,15 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
try:
|
||||
p = _require_pool()
|
||||
except HTTPException as exc:
|
||||
return _anthropic_error(exc.status_code, "overloaded_error", "gateway not ready")
|
||||
return _anthropic_error(
|
||||
exc.status_code, "overloaded_error", "gateway not ready"
|
||||
)
|
||||
|
||||
messages_dump = anthropic_to_internal_messages(req)
|
||||
# Prefer the auth token actually accepted so session-cache bucketing is
|
||||
# consistent regardless of which auth header style the caller used.
|
||||
api_key = (
|
||||
request.headers.get("x-api-key", "").strip()
|
||||
or _extract_api_key(request)
|
||||
or "-"
|
||||
request.headers.get("x-api-key", "").strip() or _extract_api_key(request) or "-"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------- session reuse
|
||||
@@ -924,9 +965,15 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
messages_to_prompt=_messages_to_prompt,
|
||||
)
|
||||
except HTTPException as exc:
|
||||
err_type = "authentication_error" if exc.status_code == 401 else "overloaded_error"
|
||||
err_type = (
|
||||
"authentication_error" if exc.status_code == 401 else "overloaded_error"
|
||||
)
|
||||
detail = exc.detail if isinstance(exc.detail, dict) else {}
|
||||
msg = (detail.get("error") or {}).get("message") or str(detail) or "upstream error"
|
||||
msg = (
|
||||
(detail.get("error") or {}).get("message")
|
||||
or str(detail)
|
||||
or "upstream error"
|
||||
)
|
||||
return _anthropic_error(exc.status_code, err_type, msg)
|
||||
ask_mode = execution.ask_mode
|
||||
write_key = execution.write_key
|
||||
@@ -950,7 +997,9 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
return _anthropic_error(400, "invalid_request_error", "messages is empty")
|
||||
except BackpressureRejected as exc:
|
||||
retry_after = max(1, int(exc.retry_after))
|
||||
logger.warning("anthropic rejected by backpressure, retry_after=%ds", retry_after)
|
||||
logger.warning(
|
||||
"anthropic rejected by backpressure, retry_after=%ds", retry_after
|
||||
)
|
||||
resp = _anthropic_error(
|
||||
429,
|
||||
"overloaded_error",
|
||||
@@ -1016,7 +1065,10 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
if text_block_open:
|
||||
yield _sse(
|
||||
"content_block_stop",
|
||||
{"type": "content_block_stop", "index": block_index},
|
||||
{
|
||||
"type": "content_block_stop",
|
||||
"index": block_index,
|
||||
},
|
||||
)
|
||||
block_index += 1
|
||||
text_block_open = False
|
||||
@@ -1029,9 +1081,13 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
if not tool:
|
||||
continue
|
||||
|
||||
tool_id = str(tool.get("id") or f"toolu_stream_{block_index}")
|
||||
tool_id = str(
|
||||
tool.get("id") or f"toolu_stream_{block_index}"
|
||||
)
|
||||
|
||||
tool_use_block = _anthropic_tool_use_block(tool, forced_id=tool_id)
|
||||
tool_use_block = _anthropic_tool_use_block(
|
||||
tool, forced_id=tool_id
|
||||
)
|
||||
yield _sse(
|
||||
"content_block_start",
|
||||
{
|
||||
@@ -1046,7 +1102,9 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
)
|
||||
block_index += 1
|
||||
|
||||
tool_result_block = _anthropic_tool_result_block(tool, forced_id=tool_id)
|
||||
tool_result_block = _anthropic_tool_result_block(
|
||||
tool, forced_id=tool_id
|
||||
)
|
||||
if tool_result_block is not None:
|
||||
yield _sse(
|
||||
"content_block_start",
|
||||
@@ -1058,7 +1116,10 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
)
|
||||
yield _sse(
|
||||
"content_block_stop",
|
||||
{"type": "content_block_stop", "index": block_index},
|
||||
{
|
||||
"type": "content_block_stop",
|
||||
"index": block_index,
|
||||
},
|
||||
)
|
||||
block_index += 1
|
||||
else:
|
||||
@@ -1114,7 +1175,6 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
# 6) message_stop — terminal event, no [DONE] sentinel.
|
||||
yield _sse("message_stop", {"type": "message_stop"})
|
||||
success = True
|
||||
@@ -1122,7 +1182,9 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
logger.info("anthropic.stream cancelled (inst=%s)", _inst.name)
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.warning("anthropic.stream error (inst=%s): %s", _inst.name, exc)
|
||||
logger.warning(
|
||||
"anthropic.stream error (inst=%s): %s", _inst.name, exc
|
||||
)
|
||||
# Best-effort error frame. Anthropic clients treat any
|
||||
# unexpected event gracefully; we prefer visibility over
|
||||
# silent truncation.
|
||||
@@ -1155,7 +1217,6 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
ticket_transferred = True
|
||||
return _streaming_response(event_stream())
|
||||
|
||||
|
||||
try:
|
||||
completed = await complete_execution(
|
||||
protocol="anthropic",
|
||||
@@ -1196,22 +1257,18 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
|
||||
saw_pending_tool_use = True
|
||||
|
||||
if not saw_tool_event and forced_tool_name:
|
||||
fallback_event = _forced_tool_fallback_event(
|
||||
inferred = _extract_function_call_event_from_text(
|
||||
text,
|
||||
forced_tool_name=forced_tool_name,
|
||||
tools=req.tools,
|
||||
)
|
||||
if fallback_event is not None:
|
||||
content_blocks = []
|
||||
tool_id = "toolu_fallback_0"
|
||||
content_blocks.append(_anthropic_tool_use_block(fallback_event, forced_id=tool_id))
|
||||
tool_result = _anthropic_tool_result_block(fallback_event, forced_id=tool_id)
|
||||
saw_pending_tool_use = tool_result is None
|
||||
if tool_result is not None:
|
||||
content_blocks.append(tool_result)
|
||||
if inferred is not None:
|
||||
content_blocks = [
|
||||
_anthropic_tool_use_block(inferred, forced_id="toolu_inferred_0")
|
||||
]
|
||||
saw_tool_event = True
|
||||
saw_pending_tool_use = True
|
||||
|
||||
response_body: dict = {
|
||||
|
||||
"id": message_id,
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
@@ -1256,25 +1313,38 @@ async def internal_auto_login_start(instance: str | None = None):
|
||||
|
||||
status = await client.auth_status()
|
||||
if status and status.get("id"):
|
||||
return {"ok": True, "state": "already_logged_in", "instance": target.name, "auth": status}
|
||||
return {
|
||||
"ok": True,
|
||||
"state": "already_logged_in",
|
||||
"instance": target.name,
|
||||
"auth": status,
|
||||
}
|
||||
|
||||
if settings.dedicated_domain_url:
|
||||
try:
|
||||
current = await client.get_endpoint()
|
||||
current_ep = (current or {}).get("endpoint", "") if isinstance(current, dict) else ""
|
||||
current_ep = (
|
||||
(current or {}).get("endpoint", "") if isinstance(current, dict) else ""
|
||||
)
|
||||
if current_ep != settings.dedicated_domain_url:
|
||||
await client.update_endpoint(settings.dedicated_domain_url)
|
||||
except Exception as exc:
|
||||
logger.warning("[%s] switch dedicated endpoint failed: %s", target.name, exc)
|
||||
logger.warning(
|
||||
"[%s] switch dedicated endpoint failed: %s", target.name, exc
|
||||
)
|
||||
|
||||
try:
|
||||
login_url, _login_raw = await client.generate_login_url()
|
||||
except Exception as exc:
|
||||
logger.warning("[%s] generate_login_url failed: %s", target.name, exc)
|
||||
raise HTTPException(status_code=502, detail={"error": {"message": "generate login url failed"}})
|
||||
raise HTTPException(
|
||||
status_code=502, detail={"error": {"message": "generate login url failed"}}
|
||||
)
|
||||
|
||||
if not login_url:
|
||||
raise HTTPException(status_code=502, detail={"error": {"message": "generate login url failed"}})
|
||||
raise HTTPException(
|
||||
status_code=502, detail={"error": {"message": "generate login url failed"}}
|
||||
)
|
||||
|
||||
started = await auto_login.ensure_started(login_url)
|
||||
return {
|
||||
@@ -1327,7 +1397,9 @@ async def internal_session_export(instance: str | None = None):
|
||||
target = inst
|
||||
break
|
||||
if target is None:
|
||||
raise HTTPException(status_code=404, detail={"error": f"instance {instance} not found"})
|
||||
raise HTTPException(
|
||||
status_code=404, detail={"error": f"instance {instance} not found"}
|
||||
)
|
||||
else:
|
||||
target = p.pick()
|
||||
|
||||
@@ -1380,7 +1452,9 @@ async def internal_models_raw(instance: str | None = None):
|
||||
target = inst
|
||||
break
|
||||
if target is None:
|
||||
raise HTTPException(status_code=404, detail={"error": f"instance {instance} not found"})
|
||||
raise HTTPException(
|
||||
status_code=404, detail={"error": f"instance {instance} not found"}
|
||||
)
|
||||
else:
|
||||
target = p.pick()
|
||||
await _ensure_instance_logged_in(target)
|
||||
@@ -1414,4 +1488,6 @@ async def metrics():
|
||||
lines.extend(pool.prometheus_lines())
|
||||
lines.extend(session_cache.prometheus_lines())
|
||||
extra = "\n".join(lines) + "\n"
|
||||
return StreamingResponse(iter([base + extra]), media_type="text/plain; version=0.0.4")
|
||||
return StreamingResponse(
|
||||
iter([base + extra]), media_type="text/plain; version=0.0.4"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user