fix: ensure responses stream always completes
Emit a fallback `response.completed` event and a `[DONE]` sentinel when the upstream SSE stream closes early, so that OpenAI `/v1/responses` clients do not fail on incomplete streams.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
15
app/main.py
15
app/main.py
@@ -1193,6 +1193,21 @@ async def _responses_stream_from_chat_stream(
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
yield _sse_data(
|
||||||
|
{
|
||||||
|
"type": "response.completed",
|
||||||
|
"response": {
|
||||||
|
"id": response_id,
|
||||||
|
"object": "response",
|
||||||
|
"created_at": created_at,
|
||||||
|
"status": "completed",
|
||||||
|
"model": model,
|
||||||
|
"usage": usage,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
yield "data: [DONE]\n\n"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/v1/responses", dependencies=[Depends(auth_guard)])
|
@app.post("/v1/responses", dependencies=[Depends(auth_guard)])
|
||||||
|
|||||||
@@ -805,6 +805,27 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
|
|||||||
self.assertIn('"output_tokens": 2', body)
|
self.assertIn('"output_tokens": 2', body)
|
||||||
self.assertIn('data: [DONE]', body)
|
self.assertIn('data: [DONE]', body)
|
||||||
|
|
||||||
|
async def test_responses_stream_emits_completed_when_upstream_closes_without_done(self) -> None:
|
||||||
|
async def _chat_sse_without_done():
|
||||||
|
yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
|
||||||
|
yield b'data: {"usage": {"prompt_tokens": 7, "completion_tokens": 1, "total_tokens": 8}, "choices": [{"delta": {}}]}\n\n'
|
||||||
|
|
||||||
|
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
|
||||||
|
mock_chat = AsyncMock(
|
||||||
|
return_value=StreamingResponse(_chat_sse_without_done(), media_type="text/event-stream")
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(main, "v1_chat_completions", mock_chat):
|
||||||
|
response = await main.v1_responses(req, _make_request("/v1/responses"))
|
||||||
|
body = await _collect_stream(response)
|
||||||
|
|
||||||
|
self.assertIn('"type": "response.output_text.delta"', body)
|
||||||
|
self.assertIn('"delta": "partial"', body)
|
||||||
|
self.assertIn('"type": "response.completed"', body)
|
||||||
|
self.assertIn('"input_tokens": 7', body)
|
||||||
|
self.assertIn('"output_tokens": 1', body)
|
||||||
|
self.assertIn('data: [DONE]', body)
|
||||||
|
|
||||||
|
|
||||||
async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None:
|
async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None:
|
||||||
req = ResponsesRequest(model="org_auto", input="hi", stream=False)
|
req = ResponsesRequest(model="org_auto", input="hi", stream=False)
|
||||||
|
|||||||
Reference in New Issue
Block a user