From 5e6c1c1a633f63376933b35ac503ca85bfa53e04 Mon Sep 17 00:00:00 2001 From: mmc <853506518@qq.com> Date: Mon, 20 Apr 2026 14:55:32 +0800 Subject: [PATCH] fix: harden responses stream termination Ensure /v1/responses streaming always emits completion frames on upstream EOF, errors, and cancellation, and add targeted diagnostics for interrupted Lingma streams. Co-Authored-By: Claude Opus 4.7 --- app/lingma_client.py | 10 +- app/main.py | 163 ++++++++++++++++++--------------- tests/test_tool_call_bridge.py | 39 ++++++++ 3 files changed, 139 insertions(+), 73 deletions(-) diff --git a/app/lingma_client.py b/app/lingma_client.py index 571de69..f46ecf4 100644 --- a/app/lingma_client.py +++ b/app/lingma_client.py @@ -495,13 +495,21 @@ class LspWsRpcClient: if stream is None: return start = time.monotonic() + last_chunk_at = start while True: remain = timeout - (time.monotonic() - start) if remain <= 0: - raise TimeoutError("chat stream timeout") + first_chunk_at = stream.get("first_chunk_at") + raise TimeoutError( + "chat stream timeout " + f"request_id={request_id} timeout={timeout:.1f}s " + f"first_chunk_at={None if first_chunk_at is None else round(first_chunk_at - start, 3)}s " + f"last_chunk_at={round(last_chunk_at - start, 3)}s" + ) chunk = await asyncio.wait_for(stream["chunks"].get(), timeout=remain) if chunk is None: break + last_chunk_at = time.monotonic() yield chunk def get_stream_result(self, request_id: str) -> dict: diff --git a/app/main.py b/app/main.py index c0c7068..3a23b87 100644 --- a/app/main.py +++ b/app/main.py @@ -859,10 +859,21 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request): yield "data: [DONE]\n\n" success = True except asyncio.CancelledError: - logger.info("chat.stream cancelled by client (inst=%s)", _inst.name) + logger.info( + "chat.stream cancelled by client (inst=%s, session_id=%s)", + _inst.name, + cached_session_id, + ) raise except Exception as exc: - logger.warning("chat.stream error (inst=%s): %s", _inst.name, exc) + logger.warning( + "chat.stream error (inst=%s, session_id=%s, prompt_tokens=%s, completion_tokens=%s): %s", + _inst.name, + cached_session_id, + prompt_tokens, + completion_tokens_holder["n"], + exc, + ) finally: # Persist upstream sessionId only on a clean chat/finish. # Partial streams (cancelled, timed out) leave Lingma's @@ -1157,6 +1168,23 @@ async def _responses_stream_from_chat_stream( ): created_at = int(time.time()) usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + completed_sent = False + + def _completed_frame() -> str: + return _sse_data( + { + "type": "response.completed", + "response": { + "id": response_id, + "object": "response", + "created_at": created_at, + "status": "completed", + "model": model, + "usage": usage, + }, + } + ) + yield _sse_data( { "type": "response.created", @@ -1170,86 +1198,77 @@ async def _responses_stream_from_chat_stream( } ) - async for part in chat_stream.body_iterator: - chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part) - for frame in chunk.split("\n\n"): - frame = frame.strip() - if not frame or not frame.startswith("data:"): - continue - body = frame[len("data:") :].strip() - if body == "[DONE]": - yield _sse_data( - { - "type": "response.completed", - "response": { - "id": response_id, - "object": "response", - "created_at": created_at, - "status": "completed", - "model": model, - "usage": usage, - }, - } - ) - yield "data: [DONE]\\n\\n" - return + try: + async for part in chat_stream.body_iterator: + chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part) + for frame in chunk.split("\n\n"): + frame = frame.strip() + if not frame or not frame.startswith("data:"): + continue + body = frame[len("data:") :].strip() + if body == "[DONE]": + yield _completed_frame() + yield "data: [DONE]\n\n" + completed_sent = True + return - try: - payload = json.loads(body) - except Exception: - continue + try: + payload = json.loads(body) + except Exception: + continue - frame_usage = _responses_usage_from_chat(payload.get("usage")) - if any(frame_usage.values()): - usage = frame_usage + frame_usage = _responses_usage_from_chat(payload.get("usage")) + if any(frame_usage.values()): + usage = frame_usage - choices = payload.get("choices") - if not isinstance(choices, list) or not choices: - continue - choice = choices[0] if isinstance(choices[0], dict) else {} - delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {} + choices = payload.get("choices") + if not isinstance(choices, list) or not choices: + continue + choice = choices[0] if isinstance(choices[0], dict) else {} + delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {} - text = delta.get("content") - if isinstance(text, str) and text: - yield _sse_data( - { - "type": "response.output_text.delta", - "response_id": response_id, - "delta": text, - } - ) - - tool_calls = delta.get("tool_calls") - if isinstance(tool_calls, list): - for idx, tool_call in enumerate(tool_calls): - if not isinstance(tool_call, dict): - continue - fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {} - call_id = str(tool_call.get("id") or f"call_{idx}") + text = delta.get("content") + if isinstance(text, str) and text: yield _sse_data( { - "type": "response.function_call.delta", + "type": "response.output_text.delta", "response_id": response_id, - "item_id": call_id, - "name": str(fn.get("name") or "tool"), - "arguments": str(fn.get("arguments") or "{}"), + "delta": text, } ) - yield _sse_data( - { - "type": "response.completed", - "response": { - "id": response_id, - "object": "response", - "created_at": created_at, - "status": "completed", - "model": model, - "usage": usage, - }, - } - ) - yield "data: [DONE]\\n\\n" + tool_calls = delta.get("tool_calls") + if isinstance(tool_calls, list): + for idx, tool_call in enumerate(tool_calls): + if not isinstance(tool_call, dict): + continue + fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {} + call_id = str(tool_call.get("id") or f"call_{idx}") + yield _sse_data( + { + "type": "response.function_call.delta", + "response_id": response_id, + "item_id": call_id, + "name": str(fn.get("name") or "tool"), + "arguments": str(fn.get("arguments") or "{}"), + } + ) + except asyncio.CancelledError: + if not completed_sent: + yield _completed_frame() + yield "data: [DONE]\n\n" + completed_sent = True + return + except Exception: + if not completed_sent: + yield _completed_frame() + yield "data: [DONE]\n\n" + completed_sent = True + return + + if not completed_sent: + yield _completed_frame() + yield "data: [DONE]\n\n" diff --git a/tests/test_tool_call_bridge.py b/tests/test_tool_call_bridge.py index 8b7125a..ea7f12a 100644 --- a/tests/test_tool_call_bridge.py +++ b/tests/test_tool_call_bridge.py @@ -4,6 +4,7 @@ import json import sys import types import unittest +import asyncio from unittest.mock import AsyncMock, patch @@ -914,6 +915,44 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase): self.assertIn('"output_tokens": 1', body) self.assertIn('data: [DONE]', body) + async def test_responses_stream_emits_completed_when_upstream_iterator_errors(self) -> None: + async def _chat_sse_error(): + yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n' + raise RuntimeError("boom") + + req = ResponsesRequest(model="org_auto", input="hi", stream=True) + mock_chat = AsyncMock( + return_value=StreamingResponse(_chat_sse_error(), media_type="text/event-stream") + ) + + with patch.object(main, "v1_chat_completions", mock_chat): + response = await main.v1_responses(req, _make_request("/v1/responses")) + body = await _collect_stream(response) + + self.assertIn('"type": "response.output_text.delta"', body) + self.assertIn('"delta": "partial"', body) + self.assertIn('"type": "response.completed"', body) + self.assertIn('data: [DONE]', body) + + async def test_responses_stream_emits_completed_when_upstream_cancels(self) -> None: + async def _chat_sse_cancelled(): + yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n' + raise asyncio.CancelledError() + + req = ResponsesRequest(model="org_auto", input="hi", stream=True) + mock_chat = AsyncMock( + return_value=StreamingResponse(_chat_sse_cancelled(), media_type="text/event-stream") + ) + + with patch.object(main, "v1_chat_completions", mock_chat): + response = await main.v1_responses(req, _make_request("/v1/responses")) + body = await _collect_stream(response) + + self.assertIn('"type": "response.output_text.delta"', body) + self.assertIn('"delta": "partial"', body) + self.assertIn('"type": "response.completed"', body) + self.assertIn('data: [DONE]', body) + async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None: req = ResponsesRequest(model="org_auto", input="hi", stream=False)