feat: add OpenAI /v1/responses adapter via chat flow

Implement a thin responses layer that reuses existing chat/completions execution so auth, pooling, streaming, tool passthrough, and error semantics stay aligned across APIs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
GitHub Actions
2026-04-20 13:11:00 +08:00
parent 56c57a4901
commit c9bd71f727
3 changed files with 432 additions and 1 deletions

View File

@@ -34,6 +34,7 @@ from .openai_schema import (
ChatCompletionsRequest,
ModelData,
ModelsResponse,
ResponsesRequest,
flatten_content,
)
from .session_bundle import encode_bundle, pack_workdir
@@ -931,6 +932,309 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
ticket.release()
def _responses_input_to_messages(req: ResponsesRequest) -> list[dict[str, Any]]:
messages: list[dict[str, Any]] = []
if req.instructions:
messages.append({"role": "system", "content": req.instructions})
raw_input = req.input
if raw_input is None:
return messages
valid_roles = {"system", "user", "assistant", "tool", "developer", "function"}
def _append(role: str, content: Any, *, tool_call_id: str | None = None) -> None:
msg: dict[str, Any] = {"role": role, "content": flatten_content(content)}
if role == "tool" and tool_call_id:
msg["tool_call_id"] = tool_call_id
messages.append(msg)
if isinstance(raw_input, str):
_append("user", raw_input)
return messages
raw_items: list[Any]
if isinstance(raw_input, dict):
raw_items = [raw_input]
elif isinstance(raw_input, list):
raw_items = list(raw_input)
else:
_append("user", str(raw_input))
return messages
for item in raw_items:
if isinstance(item, str):
_append("user", item)
continue
if not isinstance(item, dict):
_append("user", str(item))
continue
role = item.get("role")
if isinstance(role, str) and role in valid_roles:
tool_call_id = item.get("tool_call_id") or item.get("call_id")
_append(role, item.get("content"), tool_call_id=str(tool_call_id) if tool_call_id else None)
continue
if item.get("type") == "function_call_output":
output = item.get("output")
if isinstance(output, (dict, list)):
output = json.dumps(output, ensure_ascii=False)
tool_call_id = item.get("call_id")
_append("tool", output, tool_call_id=str(tool_call_id) if tool_call_id else None)
continue
if "content" in item:
text = flatten_content(item.get("content"))
else:
text = flatten_content([item])
if text:
_append("user", text)
return messages
def _responses_to_chat_request(req: ResponsesRequest) -> ChatCompletionsRequest:
return ChatCompletionsRequest(
model=req.model,
messages=_responses_input_to_messages(req),
stream=req.stream,
temperature=req.temperature,
top_p=req.top_p,
max_tokens=req.max_output_tokens,
user=req.user,
tools=req.tools,
tool_choice=req.tool_choice,
)
def _responses_id_from_chat_id(chat_id: Any) -> str:
if isinstance(chat_id, str) and chat_id:
suffix = chat_id.removeprefix("chatcmpl-")
return f"resp_{suffix}"
return f"resp_{uuid.uuid4().hex}"
def _responses_usage_from_chat(usage: Any) -> dict[str, int]:
if not isinstance(usage, dict):
return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
input_tokens = int(usage.get("prompt_tokens") or 0)
output_tokens = int(usage.get("completion_tokens") or 0)
return {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": int(usage.get("total_tokens") or (input_tokens + output_tokens)),
}
def _responses_non_stream_from_chat_payload(chat_payload: Any) -> dict[str, Any]:
if not isinstance(chat_payload, dict):
raise HTTPException(
status_code=502,
detail={"error": {"message": "invalid upstream response", "type": "upstream_error"}},
)
choice = {}
choices = chat_payload.get("choices")
if isinstance(choices, list) and choices:
choice = choices[0] if isinstance(choices[0], dict) else {}
message = choice.get("message") if isinstance(choice.get("message"), dict) else {}
output: list[dict[str, Any]] = []
content = message.get("content")
if isinstance(content, str) and content:
output.append(
{
"type": "message",
"id": f"msg_{uuid.uuid4().hex}",
"status": "completed",
"role": "assistant",
"content": [{"type": "output_text", "text": content}],
}
)
tool_calls = message.get("tool_calls")
if isinstance(tool_calls, list):
for idx, tool_call in enumerate(tool_calls):
if not isinstance(tool_call, dict):
continue
fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {}
call_id = str(tool_call.get("id") or f"call_{idx}")
output.append(
{
"type": "function_call",
"id": call_id,
"call_id": call_id,
"name": str(fn.get("name") or "tool"),
"arguments": str(fn.get("arguments") or "{}"),
}
)
output_text_parts: list[str] = []
for item in output:
if item.get("type") == "message":
blocks = item.get("content")
if isinstance(blocks, list):
for block in blocks:
if isinstance(block, dict) and block.get("type") == "output_text":
text = block.get("text")
if isinstance(text, str) and text:
output_text_parts.append(text)
return {
"id": _responses_id_from_chat_id(chat_payload.get("id")),
"object": "response",
"created_at": int(chat_payload.get("created") or time.time()),
"status": "completed",
"error": None,
"incomplete_details": None,
"model": chat_payload.get("model"),
"output": output,
"output_text": "".join(output_text_parts),
"usage": _responses_usage_from_chat(chat_payload.get("usage")),
}
def _sse_data(payload: dict[str, Any]) -> str:
return f"data: {json.dumps(payload, ensure_ascii=False)}\\n\\n"
async def _responses_stream_from_chat_stream(
chat_stream: StreamingResponse,
*,
response_id: str,
model: str,
):
created_at = int(time.time())
usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
yield _sse_data(
{
"type": "response.created",
"response": {
"id": response_id,
"object": "response",
"created_at": created_at,
"status": "in_progress",
"model": model,
},
}
)
async for part in chat_stream.body_iterator:
chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part)
for frame in chunk.split("\n\n"):
frame = frame.strip()
if not frame or not frame.startswith("data:"):
continue
body = frame[len("data:") :].strip()
if body == "[DONE]":
yield _sse_data(
{
"type": "response.completed",
"response": {
"id": response_id,
"object": "response",
"created_at": created_at,
"status": "completed",
"model": model,
"usage": usage,
},
}
)
yield "data: [DONE]\\n\\n"
return
try:
payload = json.loads(body)
except Exception:
continue
frame_usage = _responses_usage_from_chat(payload.get("usage"))
if any(frame_usage.values()):
usage = frame_usage
choices = payload.get("choices")
if not isinstance(choices, list) or not choices:
continue
choice = choices[0] if isinstance(choices[0], dict) else {}
delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {}
text = delta.get("content")
if isinstance(text, str) and text:
yield _sse_data(
{
"type": "response.output_text.delta",
"response_id": response_id,
"delta": text,
}
)
tool_calls = delta.get("tool_calls")
if isinstance(tool_calls, list):
for idx, tool_call in enumerate(tool_calls):
if not isinstance(tool_call, dict):
continue
fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {}
call_id = str(tool_call.get("id") or f"call_{idx}")
yield _sse_data(
{
"type": "response.function_call.delta",
"response_id": response_id,
"item_id": call_id,
"name": str(fn.get("name") or "tool"),
"arguments": str(fn.get("arguments") or "{}"),
}
)
@app.post("/v1/responses", dependencies=[Depends(auth_guard)])
async def v1_responses(req: ResponsesRequest, request: Request):
chat_req = _responses_to_chat_request(req)
chat_response = await v1_chat_completions(chat_req, request)
if isinstance(chat_response, StreamingResponse):
response_id = f"resp_{uuid.uuid4().hex}"
return StreamingResponse(
_responses_stream_from_chat_stream(
chat_response,
response_id=response_id,
model=req.model,
),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"Connection": "keep-alive",
},
)
invalid_upstream_error = {
"error": {"message": "invalid upstream response", "type": "upstream_error"}
}
try:
chat_payload = json.loads(chat_response.body)
except Exception:
raise HTTPException(
status_code=502,
detail=invalid_upstream_error,
)
if not isinstance(chat_payload, dict):
raise HTTPException(
status_code=502,
detail=invalid_upstream_error,
)
return JSONResponse(content=_responses_non_stream_from_chat_payload(chat_payload))
def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse:
"""Build an Anthropic-shaped error response (`type:error` envelope)."""
return JSONResponse(

View File

@@ -32,6 +32,19 @@ class ChatCompletionsRequest(BaseModel):
tool_choice: Any | None = None
class ResponsesRequest(BaseModel):
model: str
input: Any | None = None
stream: bool = False
temperature: float | None = None
top_p: float | None = None
max_output_tokens: int | None = None
user: str | None = None
tools: list[dict[str, Any]] | None = None
tool_choice: Any | None = None
instructions: str | None = None
class ModelData(BaseModel):
id: str
name: str | None = None

View File

@@ -51,9 +51,10 @@ sys.modules.setdefault("playwright", _playwright)
sys.modules.setdefault("playwright.async_api", _playwright_async)
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
from app.anthropic_schema import AnthropicMessagesRequest
from app.openai_schema import ChatCompletionsRequest
from app.openai_schema import ChatCompletionsRequest, ResponsesRequest
import app.main as main
@@ -704,6 +705,119 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(fake_cache.keys, [])
self.assertEqual(fake_cache.get_calls, [])
self.assertEqual(fake_cache.put_calls, [])
async def test_responses_non_stream_maps_chat_payload_shape_and_input(self) -> None:
req = ResponsesRequest(
model="org_auto",
input="hello from responses",
stream=False,
)
chat_payload = {
"id": "chatcmpl-abc123",
"created": 123,
"model": "org_auto",
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {"role": "assistant", "content": "done"},
}
],
"usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6},
}
mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload))
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
payload = json.loads(response.body)
self.assertEqual(payload["id"], "resp_abc123")
self.assertEqual(payload["object"], "response")
self.assertEqual(payload["status"], "completed")
self.assertEqual(payload["output_text"], "done")
self.assertEqual(payload["usage"], {"input_tokens": 4, "output_tokens": 2, "total_tokens": 6})
self.assertEqual(payload["output"][0]["type"], "message")
self.assertEqual(payload["output"][0]["content"][0]["type"], "output_text")
mock_chat.assert_awaited_once()
chat_req = mock_chat.await_args.args[0]
self.assertIsInstance(chat_req, ChatCompletionsRequest)
messages_dump = [m.model_dump() for m in chat_req.messages]
self.assertEqual(messages_dump, [{"role": "user", "content": "hello from responses", "name": None, "tool_call_id": None, "tool_calls": None}])
async def test_responses_forwards_input_tools_and_tool_choice_to_chat_request(self) -> None:
req = ResponsesRequest(
model="org_auto",
instructions="be concise",
input=[
{"role": "user", "content": [{"type": "text", "text": "first"}]},
{"type": "function_call_output", "call_id": "call_1", "output": {"ok": True}},
"follow up",
],
stream=False,
tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
mock_chat = AsyncMock(return_value=JSONResponse(content={"id": "chatcmpl-x", "created": 1, "model": "org_auto", "choices": [{"message": {"role": "assistant", "content": "ok"}}], "usage": {}}))
with patch.object(main, "v1_chat_completions", mock_chat):
await main.v1_responses(req, _make_request("/v1/responses"))
mock_chat.assert_awaited_once()
chat_req = mock_chat.await_args.args[0]
self.assertIsInstance(chat_req, ChatCompletionsRequest)
self.assertEqual(chat_req.tools, req.tools)
self.assertEqual(chat_req.tool_choice, req.tool_choice)
messages_dump = [m.model_dump() for m in chat_req.messages]
self.assertEqual(messages_dump[0]["role"], "system")
self.assertEqual(messages_dump[0]["content"], "be concise")
self.assertEqual(messages_dump[1]["role"], "user")
self.assertEqual(messages_dump[1]["content"], "first")
self.assertEqual(messages_dump[2]["role"], "tool")
self.assertEqual(messages_dump[2]["tool_call_id"], "call_1")
self.assertEqual(messages_dump[2]["content"], '{"ok": true}')
self.assertEqual(messages_dump[3]["role"], "user")
self.assertEqual(messages_dump[3]["content"], "follow up")
async def test_responses_stream_bridges_text_tool_and_completed_events(self) -> None:
async def _chat_sse():
yield b'data: {"choices": [{"delta": {"content": "hello"}}]}\n\n'
yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\": \\"x\\"}"}}]}}]}\n\n'
yield b'data: {"usage": {"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5}, "choices": [{"delta": {}}]}\n\n'
yield b"data: [DONE]\n\n"
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream")
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertIn('"type": "response.created"', body)
self.assertIn('"type": "response.output_text.delta"', body)
self.assertIn('"delta": "hello"', body)
self.assertIn('"type": "response.function_call.delta"', body)
self.assertIn('"item_id": "call_1"', body)
self.assertIn('"name": "lookup"', body)
self.assertIn('"type": "response.completed"', body)
self.assertIn('"input_tokens": 3', body)
self.assertIn('"output_tokens": 2', body)
self.assertIn('data: [DONE]', body)
async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None:
req = ResponsesRequest(model="org_auto", input="hi", stream=False)
mock_chat = AsyncMock(return_value=Response(content="not-json", media_type="text/plain"))
with patch.object(main, "v1_chat_completions", mock_chat):
with self.assertRaises(main.HTTPException) as cm:
await main.v1_responses(req, _make_request("/v1/responses"))
self.assertEqual(cm.exception.status_code, 502)
detail = cm.exception.detail
self.assertEqual(detail["error"]["type"], "upstream_error")
self.assertEqual(detail["error"]["message"], "invalid upstream response")
class SessionCacheToolFingerprintTests(unittest.TestCase):