fix: harden responses stream termination

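Ensure `/v1/responses` streaming always emits its completion frames (the `response.completed` event followed by `data: [DONE]`) when the upstream stream reaches EOF, raises an error, or is cancelled, and add targeted diagnostics (request/session identifiers, chunk timing, and token counts) for interrupted Lingma streams.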

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
mmc
2026-04-20 14:55:32 +08:00
parent 12a4d9584e
commit 5e6c1c1a63
3 changed files with 139 additions and 73 deletions

View File

@@ -495,13 +495,21 @@ class LspWsRpcClient:
if stream is None: if stream is None:
return return
start = time.monotonic() start = time.monotonic()
last_chunk_at = start
while True: while True:
remain = timeout - (time.monotonic() - start) remain = timeout - (time.monotonic() - start)
if remain <= 0: if remain <= 0:
raise TimeoutError("chat stream timeout") first_chunk_at = stream.get("first_chunk_at")
raise TimeoutError(
"chat stream timeout "
f"request_id={request_id} timeout={timeout:.1f}s "
f"first_chunk_at={None if first_chunk_at is None else round(first_chunk_at - start, 3)}s "
f"last_chunk_at={round(last_chunk_at - start, 3)}s"
)
chunk = await asyncio.wait_for(stream["chunks"].get(), timeout=remain) chunk = await asyncio.wait_for(stream["chunks"].get(), timeout=remain)
if chunk is None: if chunk is None:
break break
last_chunk_at = time.monotonic()
yield chunk yield chunk
def get_stream_result(self, request_id: str) -> dict: def get_stream_result(self, request_id: str) -> dict:

View File

@@ -859,10 +859,21 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
yield "data: [DONE]\n\n" yield "data: [DONE]\n\n"
success = True success = True
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("chat.stream cancelled by client (inst=%s)", _inst.name) logger.info(
"chat.stream cancelled by client (inst=%s, session_id=%s)",
_inst.name,
cached_session_id,
)
raise raise
except Exception as exc: except Exception as exc:
logger.warning("chat.stream error (inst=%s): %s", _inst.name, exc) logger.warning(
"chat.stream error (inst=%s, session_id=%s, prompt_tokens=%s, completion_tokens=%s): %s",
_inst.name,
cached_session_id,
prompt_tokens,
completion_tokens_holder["n"],
exc,
)
finally: finally:
# Persist upstream sessionId only on a clean chat/finish. # Persist upstream sessionId only on a clean chat/finish.
# Partial streams (cancelled, timed out) leave Lingma's # Partial streams (cancelled, timed out) leave Lingma's
@@ -1157,6 +1168,23 @@ async def _responses_stream_from_chat_stream(
): ):
created_at = int(time.time()) created_at = int(time.time())
usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
completed_sent = False
def _completed_frame() -> str:
    """Build the terminal ``response.completed`` SSE frame.

    ``response_id``, ``created_at``, ``model`` and ``usage`` are read from
    the enclosing scope at call time, so the frame carries whatever values
    those names hold when this helper is invoked.
    """
    response_body = {
        "id": response_id,
        "object": "response",
        "created_at": created_at,
        "status": "completed",
        "model": model,
        "usage": usage,
    }
    event = {"type": "response.completed", "response": response_body}
    return _sse_data(event)
yield _sse_data( yield _sse_data(
{ {
"type": "response.created", "type": "response.created",
@@ -1170,86 +1198,77 @@ async def _responses_stream_from_chat_stream(
} }
) )
async for part in chat_stream.body_iterator: try:
chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part) async for part in chat_stream.body_iterator:
for frame in chunk.split("\n\n"): chunk = part.decode("utf-8") if isinstance(part, bytes) else str(part)
frame = frame.strip() for frame in chunk.split("\n\n"):
if not frame or not frame.startswith("data:"): frame = frame.strip()
continue if not frame or not frame.startswith("data:"):
body = frame[len("data:") :].strip() continue
if body == "[DONE]": body = frame[len("data:") :].strip()
yield _sse_data( if body == "[DONE]":
{ yield _completed_frame()
"type": "response.completed", yield "data: [DONE]\n\n"
"response": { completed_sent = True
"id": response_id, return
"object": "response",
"created_at": created_at,
"status": "completed",
"model": model,
"usage": usage,
},
}
)
yield "data: [DONE]\\n\\n"
return
try: try:
payload = json.loads(body) payload = json.loads(body)
except Exception: except Exception:
continue continue
frame_usage = _responses_usage_from_chat(payload.get("usage")) frame_usage = _responses_usage_from_chat(payload.get("usage"))
if any(frame_usage.values()): if any(frame_usage.values()):
usage = frame_usage usage = frame_usage
choices = payload.get("choices") choices = payload.get("choices")
if not isinstance(choices, list) or not choices: if not isinstance(choices, list) or not choices:
continue continue
choice = choices[0] if isinstance(choices[0], dict) else {} choice = choices[0] if isinstance(choices[0], dict) else {}
delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {} delta = choice.get("delta") if isinstance(choice.get("delta"), dict) else {}
text = delta.get("content") text = delta.get("content")
if isinstance(text, str) and text: if isinstance(text, str) and text:
yield _sse_data(
{
"type": "response.output_text.delta",
"response_id": response_id,
"delta": text,
}
)
tool_calls = delta.get("tool_calls")
if isinstance(tool_calls, list):
for idx, tool_call in enumerate(tool_calls):
if not isinstance(tool_call, dict):
continue
fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {}
call_id = str(tool_call.get("id") or f"call_{idx}")
yield _sse_data( yield _sse_data(
{ {
"type": "response.function_call.delta", "type": "response.output_text.delta",
"response_id": response_id, "response_id": response_id,
"item_id": call_id, "delta": text,
"name": str(fn.get("name") or "tool"),
"arguments": str(fn.get("arguments") or "{}"),
} }
) )
yield _sse_data( tool_calls = delta.get("tool_calls")
{ if isinstance(tool_calls, list):
"type": "response.completed", for idx, tool_call in enumerate(tool_calls):
"response": { if not isinstance(tool_call, dict):
"id": response_id, continue
"object": "response", fn = tool_call.get("function") if isinstance(tool_call.get("function"), dict) else {}
"created_at": created_at, call_id = str(tool_call.get("id") or f"call_{idx}")
"status": "completed", yield _sse_data(
"model": model, {
"usage": usage, "type": "response.function_call.delta",
}, "response_id": response_id,
} "item_id": call_id,
) "name": str(fn.get("name") or "tool"),
yield "data: [DONE]\\n\\n" "arguments": str(fn.get("arguments") or "{}"),
}
)
except asyncio.CancelledError:
if not completed_sent:
yield _completed_frame()
yield "data: [DONE]\n\n"
completed_sent = True
return
except Exception:
if not completed_sent:
yield _completed_frame()
yield "data: [DONE]\n\n"
completed_sent = True
return
if not completed_sent:
yield _completed_frame()
yield "data: [DONE]\n\n"

View File

@@ -4,6 +4,7 @@ import json
import sys import sys
import types import types
import unittest import unittest
import asyncio
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
@@ -914,6 +915,44 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
self.assertIn('"output_tokens": 1', body) self.assertIn('"output_tokens": 1', body)
self.assertIn('data: [DONE]', body) self.assertIn('data: [DONE]', body)
async def test_responses_stream_emits_completed_when_upstream_iterator_errors(self) -> None:
    # An upstream iterator that fails mid-stream must still produce the
    # partial delta plus the completion frames in the /v1/responses body.
    async def _failing_upstream():
        # One usable delta, then the iterator raises.
        yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
        raise RuntimeError("boom")

    responses_req = ResponsesRequest(model="org_auto", input="hi", stream=True)
    upstream = StreamingResponse(_failing_upstream(), media_type="text/event-stream")
    chat_mock = AsyncMock(return_value=upstream)

    # NOTE(review): the stream is drained while the patch is still active;
    # confirm this matches the original layout of the with-block.
    with patch.object(main, "v1_chat_completions", chat_mock):
        result = await main.v1_responses(responses_req, _make_request("/v1/responses"))
        streamed = await _collect_stream(result)

    expected_fragments = (
        '"type": "response.output_text.delta"',
        '"delta": "partial"',
        '"type": "response.completed"',
        'data: [DONE]',
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, streamed)
async def test_responses_stream_emits_completed_when_upstream_cancels(self) -> None:
    # An upstream iterator that raises CancelledError mid-stream must still
    # produce the partial delta plus the completion frames.
    async def _cancelled_upstream():
        # One usable delta, then the iterator signals cancellation.
        yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
        raise asyncio.CancelledError()

    responses_req = ResponsesRequest(model="org_auto", input="hi", stream=True)
    upstream = StreamingResponse(_cancelled_upstream(), media_type="text/event-stream")
    chat_mock = AsyncMock(return_value=upstream)

    # NOTE(review): the stream is drained while the patch is still active;
    # confirm this matches the original layout of the with-block.
    with patch.object(main, "v1_chat_completions", chat_mock):
        result = await main.v1_responses(responses_req, _make_request("/v1/responses"))
        streamed = await _collect_stream(result)

    expected_fragments = (
        '"type": "response.output_text.delta"',
        '"delta": "partial"',
        '"type": "response.completed"',
        'data: [DONE]',
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, streamed)
async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None: async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None:
req = ResponsesRequest(model="org_auto", input="hi", stream=False) req = ResponsesRequest(model="org_auto", input="hi", stream=False)