From c9bd71f72798e21d44cf52f8540deb9a2dd55467 Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Mon, 20 Apr 2026 13:11:00 +0800 Subject: [PATCH] feat: add OpenAI /v1/responses adapter via chat flow Implement a thin responses layer that reuses existing chat/completions execution so auth, pooling, streaming, tool passthrough, and error semantics stay aligned across APIs. Co-Authored-By: Claude Opus 4.7 --- app/main.py | 304 +++++++++++++++++++++++++++++++++ app/openai_schema.py | 13 ++ tests/test_tool_call_bridge.py | 116 ++++++++++++- 3 files changed, 432 insertions(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index a8d16c8..fbe27e4 100644 --- a/app/main.py +++ b/app/main.py @@ -34,6 +34,7 @@ from .openai_schema import ( ChatCompletionsRequest, ModelData, ModelsResponse, + ResponsesRequest, flatten_content, ) from .session_bundle import encode_bundle, pack_workdir @@ -931,6 +932,309 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request): ticket.release() + + +def _responses_input_to_messages(req: ResponsesRequest) -> list[dict[str, Any]]: + messages: list[dict[str, Any]] = [] + if req.instructions: + messages.append({"role": "system", "content": req.instructions}) + + raw_input = req.input + if raw_input is None: + return messages + + valid_roles = {"system", "user", "assistant", "tool", "developer", "function"} + + def _append(role: str, content: Any, *, tool_call_id: str | None = None) -> None: + msg: dict[str, Any] = {"role": role, "content": flatten_content(content)} + if role == "tool" and tool_call_id: + msg["tool_call_id"] = tool_call_id + messages.append(msg) + + if isinstance(raw_input, str): + _append("user", raw_input) + return messages + + raw_items: list[Any] + if isinstance(raw_input, dict): + raw_items = [raw_input] + elif isinstance(raw_input, list): + raw_items = list(raw_input) + else: + _append("user", str(raw_input)) + return messages + + for item in raw_items: + if isinstance(item, str): + _append("user", item) + continue + if not isinstance(item, dict): + _append("user", str(item)) + continue + + role = item.get("role") + if isinstance(role, str) and role in valid_roles: + tool_call_id = item.get("tool_call_id") or item.get("call_id") + _append(role, item.get("content"), tool_call_id=str(tool_call_id) if tool_call_id else None) + continue + + if item.get("type") == "function_call_output": + output = item.get("output") + if isinstance(output, (dict, list)): + output = json.dumps(output, ensure_ascii=False) + tool_call_id = item.get("call_id") + _append("tool", output, tool_call_id=str(tool_call_id) if tool_call_id else None) + continue + + if "content" in item: + text = flatten_content(item.get("content")) + else: + text = flatten_content([item]) + if text: + _append("user", text) + + return messages + + + +def _responses_to_chat_request(req: ResponsesRequest) -> ChatCompletionsRequest: + return ChatCompletionsRequest( + model=req.model, + messages=_responses_input_to_messages(req), + stream=req.stream, + temperature=req.temperature, + top_p=req.top_p, + max_tokens=req.max_output_tokens, + user=req.user, + tools=req.tools, + tool_choice=req.tool_choice, + ) + + + +def _responses_id_from_chat_id(chat_id: Any) -> str: + if isinstance(chat_id, str) and chat_id: + suffix = chat_id.removeprefix("chatcmpl-") + return f"resp_{suffix}" + return f"resp_{uuid.uuid4().hex}" + + + +def _responses_usage_from_chat(usage: Any) -> dict[str, int]: + if not isinstance(usage, dict): + return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + input_tokens = int(usage.get("prompt_tokens") or 0) + output_tokens = int(usage.get("completion_tokens") or 0) + return { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_tokens": int(usage.get("total_tokens") or (input_tokens + output_tokens)), + } + + + +def _responses_non_stream_from_chat_payload(chat_payload: Any) -> dict[str, Any]: + if not isinstance(chat_payload, dict): + raise HTTPException( + status_code=502, + detail={"error": {"message": "invalid upstream response", "type": "upstream_error"}}, + ) + choice = {} + choices = chat_payload.get("choices") + if isinstance(choices, list) and choices: + choice = choices[0] if isinstance(choices[0], dict) else {} + message = choice.get("message") if isinstance(choice.get("message"), dict) else {} + + output: list[dict[str, Any]] = [] + content = message.get("content") + if isinstance(content, str) and content: + output.append( + { + "type": "message", + "id": f"msg_{uuid.uuid4().hex}", + "status": "completed", + "role": "assistant", + "content": [{"type": "output_text", "text": content}], + } + ) + + tool_calls = message.get("tool_calls") + if isinstance(tool_calls, list): + for idx, tool_call in enumerate(tool_calls): + if not isinstance(tool_call, dict): + continue + fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {} + call_id = str(tool_call.get("id") or f"call_{idx}") + output.append( + { + "type": "function_call", + "id": call_id, + "call_id": call_id, + "name": str(fn.get("name") or "tool"), + "arguments": str(fn.get("arguments") or "{}"), + } + ) + + output_text_parts: list[str] = [] + for item in output: + if item.get("type") == "message": + blocks = item.get("content") + if isinstance(blocks, list): + for block in blocks: + if isinstance(block, dict) and block.get("type") == "output_text": + text = block.get("text") + if isinstance(text, str) and text: + output_text_parts.append(text) + + return { + "id": _responses_id_from_chat_id(chat_payload.get("id")), + "object": "response", + "created_at": int(chat_payload.get("created") or time.time()), + "status": "completed", + "error": None, + "incomplete_details": None, + "model": chat_payload.get("model"), + "output": output, + "output_text": "".join(output_text_parts), + "usage": _responses_usage_from_chat(chat_payload.get("usage")), + } + + + +def _sse_data(payload: dict[str, Any]) -> str: + return f"data: {json.dumps(payload, ensure_ascii=False)}\\n\\n" + + + +async def _responses_stream_from_chat_stream( + chat_stream: StreamingResponse, + *, + response_id: str, + model: str, +): + created_at = int(time.time()) + usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + yield _sse_data( + { + "type": "response.created", + "response": { + "id": response_id, + "object": "response", + "created_at": created_at, + "status": "in_progress", + "model": model, + }, + } + ) + + async for part in chat_stream.body_iterator: + chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part) + for frame in chunk.split("\n\n"): + frame = frame.strip() + if not frame or not frame.startswith("data:"): + continue + body = frame[len("data:") :].strip() + if body == "[DONE]": + yield _sse_data( + { + "type": "response.completed", + "response": { + "id": response_id, + "object": "response", + "created_at": created_at, + "status": "completed", + "model": model, + "usage": usage, + }, + } + ) + yield "data: [DONE]\\n\\n" + return + + try: + payload = json.loads(body) + except Exception: + continue + + frame_usage = _responses_usage_from_chat(payload.get("usage")) + if any(frame_usage.values()): + usage = frame_usage + + choices = payload.get("choices") + if not isinstance(choices, list) or not choices: + continue + choice = choices[0] if isinstance(choices[0], dict) else {} + delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {} + + text = delta.get("content") + if isinstance(text, str) and text: + yield _sse_data( + { + "type": "response.output_text.delta", + "response_id": response_id, + "delta": text, + } + ) + + tool_calls = delta.get("tool_calls") + if isinstance(tool_calls, list): + for idx, tool_call in enumerate(tool_calls): + if not isinstance(tool_call, dict): + continue + fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {} + call_id = str(tool_call.get("id") or f"call_{idx}") + yield _sse_data( + { + "type": "response.function_call.delta", + "response_id": response_id, + "item_id": call_id, + "name": str(fn.get("name") or "tool"), + "arguments": str(fn.get("arguments") or "{}"), + } + ) + + + +@app.post("/v1/responses", dependencies=[Depends(auth_guard)]) +async def v1_responses(req: ResponsesRequest, request: Request): + chat_req = _responses_to_chat_request(req) + chat_response = await v1_chat_completions(chat_req, request) + + if isinstance(chat_response, StreamingResponse): + response_id = f"resp_{uuid.uuid4().hex}" + return StreamingResponse( + _responses_stream_from_chat_stream( + chat_response, + response_id=response_id, + model=req.model, + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache, no-transform", + "X-Accel-Buffering": "no", + "Connection": "keep-alive", + }, + ) + + invalid_upstream_error = { + "error": {"message": "invalid upstream response", "type": "upstream_error"} + } + try: + chat_payload = json.loads(chat_response.body) + except Exception: + raise HTTPException( + status_code=502, + detail=invalid_upstream_error, + ) + if not isinstance(chat_payload, dict): + raise HTTPException( + status_code=502, + detail=invalid_upstream_error, + ) + return JSONResponse(content=_responses_non_stream_from_chat_payload(chat_payload)) + + + def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONResponse: """Build an Anthropic-shaped error response (`type:error` envelope).""" return JSONResponse( diff --git a/app/openai_schema.py b/app/openai_schema.py index 33a22df..ee0c591 100644 --- a/app/openai_schema.py +++ b/app/openai_schema.py @@ -32,6 +32,19 @@ class ChatCompletionsRequest(BaseModel): tool_choice: Any | None = None +class ResponsesRequest(BaseModel): + model: str + input: Any | None = None + stream: bool = False + temperature: float | None = None + top_p: float | None = None + max_output_tokens: int | None = None + user: str | None = None + tools: list[dict[str, Any]] | None = None + tool_choice: Any | None = None + instructions: str | None = None + + class ModelData(BaseModel): id: str name: str | None = None diff --git a/tests/test_tool_call_bridge.py b/tests/test_tool_call_bridge.py index 13ee0df..7fc60c6 100644 --- a/tests/test_tool_call_bridge.py +++ b/tests/test_tool_call_bridge.py @@ -51,9 +51,10 @@ sys.modules.setdefault("playwright", _playwright) sys.modules.setdefault("playwright.async_api", _playwright_async) from starlette.requests import Request +from starlette.responses import JSONResponse, Response, StreamingResponse from app.anthropic_schema import AnthropicMessagesRequest -from app.openai_schema import ChatCompletionsRequest +from app.openai_schema import ChatCompletionsRequest, ResponsesRequest import app.main as main @@ -704,6 +705,119 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(fake_cache.keys, []) self.assertEqual(fake_cache.get_calls, []) self.assertEqual(fake_cache.put_calls, []) + async def test_responses_non_stream_maps_chat_payload_shape_and_input(self) -> None: + req = ResponsesRequest( + model="org_auto", + input="hello from responses", + stream=False, + ) + chat_payload = { + "id": "chatcmpl-abc123", + "created": 123, + "model": "org_auto", + "choices": [ + { + "index": 0, + "finish_reason": "stop", + "message": {"role": "assistant", "content": "done"}, + } + ], + "usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6}, + } + + mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload)) + with patch.object(main, "v1_chat_completions", mock_chat): + response = await main.v1_responses(req, _make_request("/v1/responses")) + + payload = json.loads(response.body) + self.assertEqual(payload["id"], "resp_abc123") + self.assertEqual(payload["object"], "response") + self.assertEqual(payload["status"], "completed") + self.assertEqual(payload["output_text"], "done") + self.assertEqual(payload["usage"], {"input_tokens": 4, "output_tokens": 2, "total_tokens": 6}) + self.assertEqual(payload["output"][0]["type"], "message") + self.assertEqual(payload["output"][0]["content"][0]["type"], "output_text") + + mock_chat.assert_awaited_once() + chat_req = mock_chat.await_args.args[0] + self.assertIsInstance(chat_req, ChatCompletionsRequest) + messages_dump = [m.model_dump() for m in chat_req.messages] + self.assertEqual(messages_dump, [{"role": "user", "content": "hello from responses", "name": None, "tool_call_id": None, "tool_calls": None}]) + + async def test_responses_forwards_input_tools_and_tool_choice_to_chat_request(self) -> None: + req = ResponsesRequest( + model="org_auto", + instructions="be concise", + input=[ + {"role": "user", "content": [{"type": "text", "text": "first"}]}, + {"type": "function_call_output", "call_id": "call_1", "output": {"ok": True}}, + "follow up", + ], + stream=False, + tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}], + tool_choice={"type": "function", "function": {"name": "lookup"}}, + ) + + mock_chat = AsyncMock(return_value=JSONResponse(content={"id": "chatcmpl-x", "created": 1, "model": "org_auto", "choices": [{"message": {"role": "assistant", "content": "ok"}}], "usage": {}})) + with patch.object(main, "v1_chat_completions", mock_chat): + await main.v1_responses(req, _make_request("/v1/responses")) + + mock_chat.assert_awaited_once() + chat_req = mock_chat.await_args.args[0] + self.assertIsInstance(chat_req, ChatCompletionsRequest) + self.assertEqual(chat_req.tools, req.tools) + self.assertEqual(chat_req.tool_choice, req.tool_choice) + messages_dump = [m.model_dump() for m in chat_req.messages] + self.assertEqual(messages_dump[0]["role"], "system") + self.assertEqual(messages_dump[0]["content"], "be concise") + self.assertEqual(messages_dump[1]["role"], "user") + self.assertEqual(messages_dump[1]["content"], "first") + self.assertEqual(messages_dump[2]["role"], "tool") + self.assertEqual(messages_dump[2]["tool_call_id"], "call_1") + self.assertEqual(messages_dump[2]["content"], '{"ok": true}') + self.assertEqual(messages_dump[3]["role"], "user") + self.assertEqual(messages_dump[3]["content"], "follow up") + + async def test_responses_stream_bridges_text_tool_and_completed_events(self) -> None: + async def _chat_sse(): + yield b'data: {"choices": [{"delta": {"content": "hello"}}]}\n\n' + yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\": \\"x\\"}"}}]}}]}\n\n' + yield b'data: {"usage": {"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5}, "choices": [{"delta": {}}]}\n\n' + yield b"data: [DONE]\n\n" + + req = ResponsesRequest(model="org_auto", input="hi", stream=True) + mock_chat = AsyncMock( + return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream") + ) + + with patch.object(main, "v1_chat_completions", mock_chat): + response = await main.v1_responses(req, _make_request("/v1/responses")) + body = await _collect_stream(response) + + self.assertIn('"type": "response.created"', body) + self.assertIn('"type": "response.output_text.delta"', body) + self.assertIn('"delta": "hello"', body) + self.assertIn('"type": "response.function_call.delta"', body) + self.assertIn('"item_id": "call_1"', body) + self.assertIn('"name": "lookup"', body) + self.assertIn('"type": "response.completed"', body) + self.assertIn('"input_tokens": 3', body) + self.assertIn('"output_tokens": 2', body) + self.assertIn('data: [DONE]', body) + + + async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None: + req = ResponsesRequest(model="org_auto", input="hi", stream=False) + mock_chat = AsyncMock(return_value=Response(content="not-json", media_type="text/plain")) + + with patch.object(main, "v1_chat_completions", mock_chat): + with self.assertRaises(main.HTTPException) as cm: + await main.v1_responses(req, _make_request("/v1/responses")) + + self.assertEqual(cm.exception.status_code, 502) + detail = cm.exception.detail + self.assertEqual(detail["error"]["type"], "upstream_error") + self.assertEqual(detail["error"]["message"], "invalid upstream response") class SessionCacheToolFingerprintTests(unittest.TestCase):