Ensure /v1/responses streaming always emits completion frames on upstream EOF, errors, and cancellation, and add targeted diagnostics for interrupted Lingma streams. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1244 lines
49 KiB
Python
1244 lines
49 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
import types
|
|
import unittest
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
|
|
class _FakeSessionCache:
    """In-memory stand-in for the session cache that logs every interaction.

    ``get`` always misses, so nothing is ever reused; the logs let tests check
    which keys were built, looked up, stored, or invalidated.
    """

    def __init__(self) -> None:
        self.enabled = True
        # Call logs, appended in call order.
        self.keys: list[str] = []
        self.get_calls: list[str] = []
        self.put_calls: list[tuple[str, str, str]] = []
        self.invalidate_calls: list[str] = []

    def build_key(self, api_key: str, messages: list[dict], *, tool_config=None, branch_context=None) -> str:
        """Return a readable key from api key, message count, tool presence and branch."""
        tool_part = "no_tool" if tool_config is None else "with_tool"
        built = f"{api_key}:{len(messages)}:{tool_part}:branch={branch_context or '-'}"
        self.keys.append(built)
        return built

    async def get(self, key: str):
        """Log the lookup and report a cache miss."""
        self.get_calls.append(key)
        return None

    async def put(self, key: str, session_id: str, instance_name: str = "") -> None:
        """Log a store request without keeping anything."""
        entry = (key, session_id, instance_name)
        self.put_calls.append(entry)

    async def invalidate(self, key: str) -> None:
        """Log an invalidation request."""
        self.invalidate_calls.append(key)
|
|
|
|
# app.main imports playwright via auto_login; tests don't exercise that path.
# Inject a lightweight stub so unit tests run without installing playwright.
_playwright = types.ModuleType("playwright")
_playwright_async = types.ModuleType("playwright.async_api")


class _StubPlaywrightTimeoutError(Exception):
    """Placeholder exported as ``playwright.async_api.TimeoutError``."""


async def _stub_async_playwright():
    """Fail loudly if a test accidentally reaches the browser-automation path."""
    raise RuntimeError("playwright is stubbed in unit tests")


_playwright_async.TimeoutError = _StubPlaywrightTimeoutError
_playwright_async.async_playwright = _stub_async_playwright
# Make the stub a package and bind the submodule onto it. A submodule that is
# already present in sys.modules is not attached to its parent by the import
# system, so without this `import playwright.async_api` followed by
# `playwright.async_api.<name>` would raise AttributeError.
_playwright.__path__ = []
_playwright.async_api = _playwright_async
# setdefault leaves any already-imported playwright modules untouched.
sys.modules.setdefault("playwright", _playwright)
sys.modules.setdefault("playwright.async_api", _playwright_async)
|
|
|
|
from starlette.requests import Request
|
|
from starlette.responses import JSONResponse, Response, StreamingResponse
|
|
|
|
from app.anthropic_schema import AnthropicMessagesRequest
|
|
from app.openai_schema import ChatCompletionsRequest, ResponsesRequest
|
|
import app.main as main
|
|
|
|
|
|
class _FakeTicket:
    """Admission ticket that only remembers whether it has been released."""

    def __init__(self) -> None:
        self.released = False

    def release(self) -> None:
        """Flag the ticket as released."""
        self.released = True
|
|
|
|
|
|
class _FakeGuard:
    """Concurrency guard that admits every caller immediately."""

    def __init__(self) -> None:
        self.in_flight = 0

    async def try_acquire(self) -> _FakeTicket:
        """Hand out a fresh, unreleased ticket; never refuses and never awaits."""
        ticket = _FakeTicket()
        return ticket
|
|
|
|
|
|
class _FakeClient:
    """Scripted upstream client with a canned model list, completion and stream."""

    def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
        self._complete_result = complete_result
        self._stream_events = stream_events

    async def query_models(self) -> dict:
        """Advertise a single chat model keyed ``org_auto``."""
        auto_model = {"key": "org_auto", "displayName": "Auto"}
        return {"chat": [auto_model]}

    async def chat_complete(self, *args, **kwargs) -> dict:
        """Return the canned completion result, ignoring every argument."""
        return self._complete_result

    async def chat_stream(self, *args, **kwargs):
        """Yield the scripted events in order.

        If an ``out_meta`` dict is passed, ``"sess-stream"`` is written into
        its ``session_id`` entry before the first event.
        """
        meta = kwargs.get("out_meta")
        if isinstance(meta, dict):
            meta["session_id"] = "sess-stream"
        for item in self._stream_events:
            yield item
|
|
|
|
|
|
class _FakeInstance:
    """Minimal pool instance: a fixed name, a fake client and a zero load counter."""

    def __init__(self, client: _FakeClient) -> None:
        self.client = client
        self.name = "inst-test"
        self.in_flight = 0
|
|
|
|
|
|
class _FakePool:
    """Instance pool that always returns the one instance it was built with."""

    def __init__(self, inst: _FakeInstance) -> None:
        self._inst = inst

    def pick(self, affinity_key: str | None = None) -> _FakeInstance:
        """Return the wrapped instance; ``affinity_key`` is accepted and ignored."""
        chosen = self._inst
        return chosen
|
|
|
|
|
|
def _make_request(path: str, headers: dict[str, str] | None = None) -> Request:
    """Build a bare Starlette POST ``Request`` for ``path`` with no body.

    Header names are lower-cased, and both names and values are latin-1
    encoded into the scope's ``headers`` list.
    """
    raw_headers = [
        (name.lower().encode("latin-1"), value.encode("latin-1"))
        for name, value in (headers or {}).items()
    ]
    scope = {
        "type": "http",
        "http_version": "1.1",
        "method": "POST",
        "scheme": "http",
        "path": path,
        "raw_path": path.encode("latin-1"),
        "query_string": b"",
        "headers": raw_headers,
        "client": ("testclient", 12345),
        "server": ("testserver", 80),
        "root_path": "",
    }
    return Request(scope)
|
|
|
|
|
|
async def _collect_stream(response) -> str:
    """Drain ``response.body_iterator`` and return everything it produced as text.

    Byte-like chunks (``bytes``, ``bytearray``, ``memoryview``) go through a
    UTF-8 incremental decoder, so a multi-byte character split across two
    chunks is reassembled instead of raising ``UnicodeDecodeError``. Any other
    chunk is converted with ``str()``.
    """
    import codecs

    decoder = codecs.getincrementaldecoder("utf-8")()
    chunks: list[str] = []
    async for part in response.body_iterator:
        if isinstance(part, (bytes, bytearray, memoryview)):
            chunks.append(decoder.decode(bytes(part)))
        else:
            chunks.append(str(part))
    # Flush the decoder; this still raises if the stream ended mid-character.
    chunks.append(decoder.decode(b"", final=True))
    return "".join(chunks)
|
|
|
|
|
|
class _SpyClient(_FakeClient):
    """``_FakeClient`` that records the arguments of its most recent chat calls."""

    def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
        super().__init__(stream_events=stream_events, complete_result=complete_result)
        self.last_complete_args: tuple = ()
        self.last_stream_args: tuple = ()
        self.last_complete_kwargs: dict = {}
        self.last_stream_kwargs: dict = {}

    async def chat_complete(self, *args, **kwargs) -> dict:
        """Snapshot the call arguments, then delegate to the canned completion."""
        self.last_complete_args, self.last_complete_kwargs = tuple(args), dict(kwargs)
        result = await super().chat_complete(*args, **kwargs)
        return result

    async def chat_stream(self, *args, **kwargs):
        """Snapshot the call arguments (on first iteration), then relay the stream."""
        self.last_stream_args, self.last_stream_kwargs = tuple(args), dict(kwargs)
        upstream = super().chat_stream(*args, **kwargs)
        async for event in upstream:
            yield event
|
|
|
|
|
|
class _SettingsPatch:
    """Context manager that temporarily overrides attributes on ``main.settings``.

    Each keyword argument names a settings attribute and the value to use
    while the context is active. The overrides are undone on exit.
    """

    def __init__(self, **kwargs) -> None:
        self._kwargs = kwargs
        # Initialised here so __exit__ is safe even if __enter__ never ran.
        self._patchers: list = []

    def __enter__(self):
        started = []
        try:
            for name, value in self._kwargs.items():
                patcher = patch.object(main.settings, name, value)
                patcher.start()
                started.append(patcher)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so roll back the
            # patches already applied instead of leaking them into other tests.
            for patcher in reversed(started):
                patcher.stop()
            raise
        self._patchers = started
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        patchers, self._patchers = self._patchers, []
        for patcher in reversed(patchers):
            patcher.stop()
        # Never suppress an exception raised inside the with-block.
        return False
|
|
|
|
|
|
class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
|
|
async def test_openai_non_stream_bridges_tool_calls(self) -> None:
    """Non-streaming chat completions expose upstream tool events as ``tool_calls``."""
    tool_event = {
        "id": "call_123",
        "name": "search_docs",
        "input": {"query": "gateway"},
        "result": {"ok": True},
    }
    client = _FakeClient(
        stream_events=[],
        complete_result={
            "text": "done",
            "toolEvents": [tool_event],
            "sessionId": "sess-1",
            "firstTokenLatencyMs": 12,
            "totalLatencyMs": 34,
        },
    )
    chat_request = ChatCompletionsRequest(
        model="org_auto",
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
    ):
        response = await main.v1_chat_completions(chat_request, _make_request("/v1/chat/completions"))

    choice = json.loads(response.body)["choices"][0]
    message = choice["message"]
    self.assertEqual(message["content"], "done")
    self.assertIsInstance(message["tool_calls"], list)
    self.assertEqual(choice["finish_reason"], "tool_calls")
    function_part = message["tool_calls"][0]["function"]
    self.assertEqual(function_part["name"], "search_docs")
    self.assertEqual(json.loads(function_part["arguments"]), {"query": "gateway"})
|
|
|
|
async def test_openai_non_stream_fallbacks_to_structured_tool_call_for_forced_tool(self) -> None:
    """A forced tool with no upstream tool events is recovered from fenced JSON text."""
    forced_tools = [{"type": "function", "function": {"name": "lookup", "parameters": {}}}]
    forced_choice = {"type": "function", "function": {"name": "lookup"}}
    client = _FakeClient(
        stream_events=[],
        complete_result={
            "text": "```json\n{\"arguments\": {\"query\": \"gateway\"}}\n```",
            "toolEvents": [],
            "sessionId": "sess-fallback-openai",
        },
    )
    chat_request = ChatCompletionsRequest(
        model="org_auto",
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
        tools=forced_tools,
        tool_choice=forced_choice,
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
    ):
        response = await main.v1_chat_completions(chat_request, _make_request("/v1/chat/completions"))

    choice = json.loads(response.body)["choices"][0]
    message = choice["message"]
    self.assertEqual(choice["finish_reason"], "tool_calls")
    self.assertEqual(message["content"], "")
    self.assertIsInstance(message["tool_calls"], list)
    function_part = message["tool_calls"][0]["function"]
    self.assertEqual(function_part["name"], "lookup")
    self.assertEqual(json.loads(function_part["arguments"]), {"query": "gateway"})
|
|
|
|
async def test_openai_stream_bridges_tool_and_text_events(self) -> None:
    """Streaming chat completions emit tool calls, text, usage and the DONE marker."""
    scripted = [
        {
            "type": "tool",
            "tool": {
                "id": "call_stream_1",
                "name": "read_file",
                "input": {"path": "README.md"},
            },
        },
        {"type": "text", "text": "hello"},
    ]
    client = _FakeClient(stream_events=scripted, complete_result={})
    chat_request = ChatCompletionsRequest(
        model="org_auto",
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
        stream_options={"include_usage": True},
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
    ):
        response = await main.v1_chat_completions(chat_request, _make_request("/v1/chat/completions"))
        body = await _collect_stream(response)

    for fragment in (
        '"tool_calls"',
        '"content": "hello"',
        '"finish_reason": "tool_calls"',
        '"usage"',
        "data: [DONE]",
    ):
        self.assertIn(fragment, body)
|
|
|
|
async def test_anthropic_non_stream_bridges_tool_blocks(self) -> None:
    """Non-streaming /v1/messages renders text, tool_use and tool_result blocks in order."""
    fake_client = _FakeClient(
        stream_events=[],
        complete_result={
            "text": "ok",
            "toolEvents": [
                {
                    "id": "toolu_1",
                    "name": "lookup",
                    "input": {"k": "v"},
                    "result": {"value": 1},
                }
            ],
            "sessionId": "sess-2",
        },
    )
    req = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
    ):
        response = await main.v1_messages(
            req,
            _make_request(
                "/v1/messages",
                headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
            ),
        )

    payload = json.loads(response.body)
    # Named block_types so the module-level `types` import is not shadowed.
    block_types = [item["type"] for item in payload["content"]]
    self.assertEqual(block_types, ["text", "tool_use", "tool_result"])
    self.assertEqual(payload["stop_reason"], "end_turn")
    self.assertEqual(payload["content"][1]["name"], "lookup")
    self.assertEqual(payload["content"][2]["tool_use_id"], "toolu_1")
|
|
|
|
async def test_anthropic_non_stream_fallbacks_to_structured_tool_blocks_for_forced_tool(self) -> None:
    """A forced tool with no upstream tool events is recovered from JSON text as tool blocks."""
    fake_client = _FakeClient(
        stream_events=[],
        complete_result={
            "text": "{\"input\": {\"k\": \"v\"}, \"result\": {\"value\": 1}}",
            "toolEvents": [],
            "sessionId": "sess-fallback-anthropic",
        },
    )
    req = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
        tools=[{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}],
        tool_choice={"type": "tool", "name": "lookup"},
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
    ):
        response = await main.v1_messages(
            req,
            _make_request(
                "/v1/messages",
                headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
            ),
        )

    payload = json.loads(response.body)
    # Named block_types so the module-level `types` import is not shadowed.
    block_types = [item["type"] for item in payload["content"]]
    self.assertEqual(block_types, ["tool_use", "tool_result"])
    self.assertEqual(payload["stop_reason"], "end_turn")
    self.assertEqual(payload["content"][0]["name"], "lookup")
    self.assertEqual(payload["content"][1]["tool_use_id"], "toolu_fallback_0")
|
|
|
|
async def test_openai_stream_tool_call_indices_are_stable(self) -> None:
    """Each streamed tool call carries an index matching its arrival order.

    The SSE body is parsed frame by frame. A bare ``'"index": 0'`` substring
    check could also be satisfied by other ``index`` fields in a chunk, and it
    never ties an index to a particular tool-call id.
    """
    fake_client = _FakeClient(
        stream_events=[
            {
                "type": "tool",
                "tool": {
                    "id": "call_a",
                    "name": "read_file",
                    "input": {"path": "README.md"},
                },
            },
            {
                "type": "tool",
                "tool": {
                    "id": "call_b",
                    "name": "search_docs",
                    "input": {"query": "gateway"},
                },
            },
        ],
        complete_result={},
    )
    req = ChatCompletionsRequest(
        model="org_auto",
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
    ):
        response = await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
        body = await _collect_stream(response)

    self.assertIn('"id": "call_a"', body)
    self.assertIn('"id": "call_b"', body)

    # Map every tool-call id seen in a delta to the index it was streamed with.
    indices_by_id: dict[str, int] = {}
    for line in body.splitlines():
        if not line.startswith("data: ") or line == "data: [DONE]":
            continue
        chunk = json.loads(line[len("data: "):])
        for choice in chunk.get("choices") or []:
            delta = choice.get("delta") or {}
            for call in delta.get("tool_calls") or []:
                call_id = call.get("id")
                if call_id:
                    indices_by_id[call_id] = call.get("index")
    self.assertEqual(indices_by_id, {"call_a": 0, "call_b": 1})
|
|
|
|
async def test_anthropic_non_stream_returns_tool_use_stop_reason_when_result_missing(self) -> None:
    """A tool event without a result yields a lone tool_use block and stop_reason tool_use."""
    pending_tool = {"name": "lookup", "input": {"k": "v"}}
    client = _FakeClient(
        stream_events=[],
        complete_result={"text": "", "toolEvents": [pending_tool], "sessionId": "sess-2"},
    )
    messages_request = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
    )
    anthropic_headers = {"x-api-key": "test-key", "anthropic-version": "2023-06-01"}

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
    ):
        response = await main.v1_messages(
            messages_request,
            _make_request("/v1/messages", headers=anthropic_headers),
        )

    result = json.loads(response.body)
    content = result["content"]
    self.assertEqual(result["stop_reason"], "tool_use")
    self.assertEqual(len(content), 1)
    self.assertEqual(content[0]["type"], "tool_use")
|
|
|
|
async def test_anthropic_stream_returns_tool_use_stop_reason_when_result_missing(self) -> None:
    """A streamed tool event without a result ends the message with stop_reason tool_use."""
    unresolved_tool = {"type": "tool", "tool": {"name": "read", "input": {"file": "a.txt"}}}
    client = _FakeClient(stream_events=[unresolved_tool], complete_result={})
    messages_request = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
    )
    anthropic_headers = {"x-api-key": "test-key", "anthropic-version": "2023-06-01"}

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
    ):
        response = await main.v1_messages(
            messages_request,
            _make_request("/v1/messages", headers=anthropic_headers),
        )
        body = await _collect_stream(response)

    for fragment in ('"type": "tool_use"', '"stop_reason": "tool_use"'):
        self.assertIn(fragment, body)
|
|
|
|
async def test_anthropic_stream_bridges_tool_and_text_events(self) -> None:
    """Streaming /v1/messages emits the full event envelope around tool and text blocks."""
    resolved_tool = {
        "type": "tool",
        "tool": {
            "id": "toolu_stream_1",
            "name": "read",
            "input": {"file": "a.txt"},
            "result": "done",
        },
    }
    client = _FakeClient(
        stream_events=[resolved_tool, {"type": "text", "text": "world"}],
        complete_result={},
    )
    messages_request = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "hi"}],
        stream=True,
    )
    anthropic_headers = {"x-api-key": "test-key", "anthropic-version": "2023-06-01"}

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
    ):
        response = await main.v1_messages(
            messages_request,
            _make_request("/v1/messages", headers=anthropic_headers),
        )
        body = await _collect_stream(response)

    for fragment in (
        "event: message_start",
        '"type": "tool_use"',
        '"type": "tool_result"',
        '"stop_reason": "end_turn"',
        '"type": "text_delta"',
        "event: message_stop",
    ):
        self.assertIn(fragment, body)
|
|
|
|
|
|
|
|
async def test_openai_non_stream_forwards_tool_config_when_enabled(self) -> None:
    """With tool forwarding on, the upstream call gets an OpenAI tool_config in agent mode."""
    spy = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
    chat_request = ChatCompletionsRequest(
        model="org_auto",
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
        tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
        tool_choice={"type": "function", "function": {"name": "lookup"}},
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(spy))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        _SettingsPatch(tool_forward_enabled=True),
    ):
        await main.v1_chat_completions(chat_request, _make_request("/v1/chat/completions"))

    forwarded_kwargs = spy.last_complete_kwargs
    self.assertIn("tool_config", forwarded_kwargs)
    tool_config = forwarded_kwargs["tool_config"]
    self.assertEqual(tool_config["provider"], "openai")
    self.assertEqual(len(tool_config["tools"]), 1)
    self.assertIsInstance(tool_config["tool_choice"], dict)
    self.assertEqual(spy.last_complete_args[2], "agent")
|
|
|
|
async def test_openai_non_stream_does_not_forward_tool_config_when_disabled(self) -> None:
    """With tool forwarding off, tool_config is passed as None while agent mode is kept."""
    spy = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
    chat_request = ChatCompletionsRequest(
        model="org_auto",
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
        tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
        tool_choice={"type": "function", "function": {"name": "lookup"}},
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(spy))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        _SettingsPatch(tool_forward_enabled=False),
    ):
        await main.v1_chat_completions(chat_request, _make_request("/v1/chat/completions"))

    forwarded_kwargs = spy.last_complete_kwargs
    self.assertIn("tool_config", forwarded_kwargs)
    self.assertIsNone(forwarded_kwargs["tool_config"])
    self.assertEqual(spy.last_complete_args[2], "agent")
|
|
|
|
|
|
async def test_openai_tooling_context_disables_session_reuse_cache(self) -> None:
    """OpenAI requests carrying tools never build, read or write session-cache keys."""
    cache = _FakeSessionCache()
    client = _FakeClient(
        stream_events=[],
        complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-3"},
    )
    two_turns = [
        {"role": "user", "content": "turn-1"},
        {"role": "user", "content": "turn-2"},
    ]
    chat_request = ChatCompletionsRequest(
        model="org_auto",
        messages=two_turns,
        stream=False,
        tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
        tool_choice={"type": "function", "function": {"name": "lookup"}},
    )

    with (
        patch.object(main, "session_cache", cache),
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        _SettingsPatch(tool_forward_enabled=True),
    ):
        await main.v1_chat_completions(chat_request, _make_request("/v1/chat/completions"))

    for recorded in (cache.keys, cache.get_calls, cache.put_calls):
        self.assertEqual(recorded, [])
|
|
|
|
|
|
async def test_openai_session_reuse_lookup_key_separates_branches(self) -> None:
    """Conversations differing only in an earlier assistant turn get distinct lookup keys.

    Each request is expected to make (at least) two cache lookups. The first
    lookups of the two requests must differ, and the second lookups must match.
    """
    cache = _FakeSessionCache()
    client = _FakeClient(
        stream_events=[],
        complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-branch"},
    )

    def _branch_request(assistant_text: str) -> ChatCompletionsRequest:
        # Same system/user prefix and final turn; only the assistant reply varies.
        return ChatCompletionsRequest(
            model="org_auto",
            messages=[
                {"role": "system", "content": "S"},
                {"role": "user", "content": "U"},
                {"role": "assistant", "content": assistant_text},
                {"role": "user", "content": "next"},
            ],
            stream=False,
        )

    branch_a = _branch_request("A1")
    branch_b = _branch_request("A2")

    with (
        patch.object(main, "session_cache", cache),
        patch.object(main, "pool", _FakePool(_FakeInstance(client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        _SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False),
    ):
        await main.v1_chat_completions(branch_a, _make_request("/v1/chat/completions"))
        await main.v1_chat_completions(branch_b, _make_request("/v1/chat/completions"))

    lookups = cache.get_calls
    self.assertGreaterEqual(len(lookups), 4)
    self.assertNotEqual(lookups[0], lookups[2])
    self.assertEqual(lookups[1], lookups[3])
|
|
|
|
async def test_openai_and_anthropic_resolve_same_default_ask_mode_without_tooling(self) -> None:
    """Without tools, both front-ends pass the configured default ask mode upstream."""
    openai_spy = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
    anthropic_spy = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
    greeting = [{"role": "user", "content": "hi"}]

    openai_req = ChatCompletionsRequest(model="org_auto", messages=greeting, stream=False)
    anthropic_req = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=128,
        messages=greeting,
        stream=False,
    )

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(openai_spy))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        _SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False),
    ):
        await main.v1_chat_completions(openai_req, _make_request("/v1/chat/completions"))

    anthropic_headers = {"x-api-key": "test-key", "anthropic-version": "2023-06-01"}
    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(anthropic_spy))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
        _SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False),
    ):
        await main.v1_messages(
            anthropic_req,
            _make_request("/v1/messages", headers=anthropic_headers),
        )

    for spy in (openai_spy, anthropic_spy):
        self.assertEqual(spy.last_complete_args[2], "chat")
|
|
|
|
async def test_anthropic_non_stream_with_tools_uses_agent_mode(self) -> None:
    """Anthropic requests with tools switch to agent mode and forward an anthropic tool_config."""
    spy = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
    messages_request = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=128,
        messages=[{"role": "user", "content": "hi"}],
        stream=False,
        tools=[{"name": "write_file", "input_schema": {"type": "object", "properties": {}}}],
        tool_choice={"type": "auto"},
    )
    anthropic_headers = {"x-api-key": "test-key", "anthropic-version": "2023-06-01"}

    with (
        patch.object(main, "pool", _FakePool(_FakeInstance(spy))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
        _SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"),
    ):
        await main.v1_messages(
            messages_request,
            _make_request("/v1/messages", headers=anthropic_headers),
        )

    forwarded_kwargs = spy.last_complete_kwargs
    self.assertIn("tool_config", forwarded_kwargs)
    tool_config = forwarded_kwargs["tool_config"]
    self.assertEqual(tool_config["provider"], "anthropic")
    self.assertEqual(len(tool_config["tools"]), 1)
    self.assertEqual(spy.last_complete_args[2], "agent")
|
|
|
|
async def test_anthropic_tooling_context_disables_session_reuse_cache(self) -> None:
    """Anthropic requests carrying tools never build, read or write session-cache keys.

    ``tool_forward_enabled`` is pinned to True, as the OpenAI variant of this
    test does, so the outcome does not depend on the ambient settings default.
    """
    fake_cache = _FakeSessionCache()
    fake_client = _FakeClient(
        stream_events=[],
        complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-4"},
    )
    req = AnthropicMessagesRequest(
        model="claude-3-5-sonnet-20241022",
        max_tokens=128,
        messages=[
            {"role": "user", "content": "turn-1"},
            {"role": "user", "content": "turn-2"},
        ],
        stream=False,
        tools=[{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}],
        tool_choice={"type": "auto"},
    )

    with (
        patch.object(main, "session_cache", fake_cache),
        patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
        patch.object(main, "chat_guard", _FakeGuard()),
        patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
        patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
        patch.object(main.settings, "api_keys", ["test-key"]),
        _SettingsPatch(tool_forward_enabled=True),
    ):
        await main.v1_messages(
            req,
            _make_request(
                "/v1/messages",
                headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
            ),
        )

    self.assertEqual(fake_cache.keys, [])
    self.assertEqual(fake_cache.get_calls, [])
    self.assertEqual(fake_cache.put_calls, [])
|
|
async def test_responses_non_stream_maps_chat_payload_shape_and_input(self) -> None:
    """Non-streaming /v1/responses reshapes a chat completion into a Responses payload."""
    responses_request = ResponsesRequest(model="org_auto", input="hello from responses", stream=False)
    assistant_choice = {
        "index": 0,
        "finish_reason": "stop",
        "message": {"role": "assistant", "content": "done"},
    }
    chat_payload = {
        "id": "chatcmpl-abc123",
        "created": 123,
        "model": "org_auto",
        "choices": [assistant_choice],
        "usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6},
    }
    mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload))

    with patch.object(main, "v1_chat_completions", mock_chat):
        response = await main.v1_responses(responses_request, _make_request("/v1/responses"))

    result = json.loads(response.body)
    self.assertEqual(result["id"], "resp_abc123")
    self.assertEqual(result["object"], "response")
    self.assertEqual(result["status"], "completed")
    self.assertEqual(result["output_text"], "done")
    self.assertEqual(result["usage"], {"input_tokens": 4, "output_tokens": 2, "total_tokens": 6})
    first_output = result["output"][0]
    self.assertEqual(first_output["type"], "message")
    self.assertEqual(first_output["content"][0]["type"], "output_text")

    mock_chat.assert_awaited_once()
    forwarded = mock_chat.await_args.args[0]
    self.assertIsInstance(forwarded, ChatCompletionsRequest)
    expected_messages = [
        {"role": "user", "content": "hello from responses", "name": None, "tool_call_id": None, "tool_calls": None}
    ]
    self.assertEqual([m.model_dump() for m in forwarded.messages], expected_messages)
|
|
|
|
async def test_responses_forwards_input_tools_and_tool_choice_to_chat_request(self) -> None:
    """Responses input items, instructions, tools and tool_choice become a chat request."""
    responses_request = ResponsesRequest(
        model="org_auto",
        instructions="be concise",
        input=[
            {"role": "user", "content": [{"type": "text", "text": "first"}]},
            {"type": "function_call_output", "call_id": "call_1", "output": {"ok": True}},
            "follow up",
        ],
        stream=False,
        tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
        tool_choice={"type": "function", "function": {"name": "lookup"}},
    )
    canned_chat = {
        "id": "chatcmpl-x",
        "created": 1,
        "model": "org_auto",
        "choices": [{"message": {"role": "assistant", "content": "ok"}}],
        "usage": {},
    }
    mock_chat = AsyncMock(return_value=JSONResponse(content=canned_chat))

    with patch.object(main, "v1_chat_completions", mock_chat):
        await main.v1_responses(responses_request, _make_request("/v1/responses"))

    mock_chat.assert_awaited_once()
    forwarded = mock_chat.await_args.args[0]
    self.assertIsInstance(forwarded, ChatCompletionsRequest)
    self.assertEqual(forwarded.tools, responses_request.tools)
    self.assertEqual(forwarded.tool_choice, responses_request.tool_choice)

    dumped = [m.model_dump() for m in forwarded.messages]
    expectations = [
        (0, "role", "system"),
        (0, "content", "be concise"),
        (1, "role", "user"),
        (1, "content", "first"),
        (2, "role", "tool"),
        (2, "tool_call_id", "call_1"),
        (2, "content", '{"ok": true}'),
        (3, "role", "user"),
        (3, "content", "follow up"),
    ]
    for position, field, expected in expectations:
        self.assertEqual(dumped[position][field], expected)
|
|
|
|
async def test_responses_stream_bridges_text_tool_and_completed_events(self) -> None:
    """A complete chat SSE stream is translated into Responses streaming events."""
    upstream_frames = [
        b'data: {"choices": [{"delta": {"content": "hello"}}]}\n\n',
        b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\": \\"x\\"}"}}]}}]}\n\n',
        b'data: {"usage": {"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5}, "choices": [{"delta": {}}]}\n\n',
        b"data: [DONE]\n\n",
    ]

    async def _chat_sse():
        for frame in upstream_frames:
            yield frame

    responses_request = ResponsesRequest(model="org_auto", input="hi", stream=True)
    mock_chat = AsyncMock(return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream"))

    with patch.object(main, "v1_chat_completions", mock_chat):
        response = await main.v1_responses(responses_request, _make_request("/v1/responses"))
        body = await _collect_stream(response)

    for fragment in (
        '"type": "response.created"',
        '"type": "response.output_text.delta"',
        '"delta": "hello"',
        '"type": "response.function_call.delta"',
        '"item_id": "call_1"',
        '"name": "lookup"',
        '"type": "response.completed"',
        '"input_tokens": 3',
        '"output_tokens": 2',
        'data: [DONE]',
    ):
        self.assertIn(fragment, body)
|
|
|
|
async def test_responses_stream_emits_completed_when_upstream_closes_without_done(self) -> None:
    """Upstream EOF without ``[DONE]`` must still end with ``response.completed``.

    The fake chat stream sends one text delta and a usage frame, then ends
    without the ``data: [DONE]`` sentinel. The bridged body must still carry
    the delta and exactly one ``response.completed`` event with the
    forwarded usage. That event must come after the delta and before the
    trailing ``[DONE]``.
    """

    async def _chat_sse_without_done():
        yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
        yield b'data: {"usage": {"prompt_tokens": 7, "completion_tokens": 1, "total_tokens": 8}, "choices": [{"delta": {}}]}\n\n'
        # Deliberately no b"data: [DONE]\n\n" frame: simulate an abrupt upstream EOF.

    req = ResponsesRequest(model="org_auto", input="hi", stream=True)
    mock_chat = AsyncMock(
        return_value=StreamingResponse(_chat_sse_without_done(), media_type="text/event-stream")
    )

    with patch.object(main, "v1_chat_completions", mock_chat):
        response = await main.v1_responses(req, _make_request("/v1/responses"))
        body = await _collect_stream(response)

    delta_marker = '"type": "response.output_text.delta"'
    completed_marker = '"type": "response.completed"'
    done_marker = 'data: [DONE]'

    self.assertIn(delta_marker, body)
    self.assertIn('"delta": "partial"', body)
    self.assertIn(completed_marker, body)
    self.assertIn('"input_tokens": 7', body)
    self.assertIn('"output_tokens": 1', body)
    self.assertIn(done_marker, body)

    # Presence alone is not enough. SSE consumers typically treat [DONE] as
    # end-of-stream, so the single completion frame must be emitted once, after
    # the content it summarises and before the sentinel.
    self.assertEqual(body.count(completed_marker), 1)
    self.assertLess(body.index(delta_marker), body.index(completed_marker))
    self.assertLess(body.index(completed_marker), body.index(done_marker))
|
|
|
|
async def test_responses_stream_emits_completed_when_upstream_iterator_errors(self) -> None:
    """An exception from the upstream iterator must not truncate the stream.

    The fake chat stream yields one text delta and then raises
    ``RuntimeError``. The bridged body must still contain the delta and
    exactly one ``response.completed`` event, ordered between the delta and
    the trailing ``[DONE]``.
    """

    async def _chat_sse_error():
        yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
        raise RuntimeError("boom")

    req = ResponsesRequest(model="org_auto", input="hi", stream=True)
    mock_chat = AsyncMock(
        return_value=StreamingResponse(_chat_sse_error(), media_type="text/event-stream")
    )

    with patch.object(main, "v1_chat_completions", mock_chat):
        response = await main.v1_responses(req, _make_request("/v1/responses"))
        body = await _collect_stream(response)

    delta_marker = '"type": "response.output_text.delta"'
    completed_marker = '"type": "response.completed"'
    done_marker = 'data: [DONE]'

    self.assertIn(delta_marker, body)
    self.assertIn('"delta": "partial"', body)
    self.assertIn(completed_marker, body)
    self.assertIn(done_marker, body)

    # The error path must not duplicate the completion frame or emit it after
    # the [DONE] sentinel, where SSE consumers typically stop reading.
    self.assertEqual(body.count(completed_marker), 1)
    self.assertLess(body.index(delta_marker), body.index(completed_marker))
    self.assertLess(body.index(completed_marker), body.index(done_marker))
|
|
|
|
async def test_responses_stream_emits_completed_when_upstream_cancels(self) -> None:
    """Upstream cancellation must still yield a well-terminated Responses stream.

    The fake chat stream yields one text delta and then raises
    ``asyncio.CancelledError``. The bridged body must still contain the
    delta and exactly one ``response.completed`` event, ordered between the
    delta and the trailing ``[DONE]``.
    """

    async def _chat_sse_cancelled():
        yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
        raise asyncio.CancelledError()

    req = ResponsesRequest(model="org_auto", input="hi", stream=True)
    mock_chat = AsyncMock(
        return_value=StreamingResponse(_chat_sse_cancelled(), media_type="text/event-stream")
    )

    with patch.object(main, "v1_chat_completions", mock_chat):
        response = await main.v1_responses(req, _make_request("/v1/responses"))
        body = await _collect_stream(response)

    delta_marker = '"type": "response.output_text.delta"'
    completed_marker = '"type": "response.completed"'
    done_marker = 'data: [DONE]'

    self.assertIn(delta_marker, body)
    self.assertIn('"delta": "partial"', body)
    self.assertIn(completed_marker, body)
    self.assertIn(done_marker, body)

    # The cancellation path must not duplicate the completion frame or emit it
    # after the [DONE] sentinel, where SSE consumers typically stop reading.
    self.assertEqual(body.count(completed_marker), 1)
    self.assertLess(body.index(delta_marker), body.index(completed_marker))
    self.assertLess(body.index(completed_marker), body.index(done_marker))
|
|
|
|
|
|
async def test_responses_non_stream_returns_502_on_invalid_upstream_json(self) -> None:
    """A non-JSON upstream body on the non-streaming path surfaces as HTTP 502.

    ``v1_chat_completions`` is patched to return a plain-text ``Response``.
    ``v1_responses`` must raise ``HTTPException`` with status 502, and its
    detail must carry an ``upstream_error`` whose message is
    ``invalid upstream response``.
    """
    request_model = ResponsesRequest(model="org_auto", input="hi", stream=False)
    fake_upstream = AsyncMock(return_value=Response(content="not-json", media_type="text/plain"))
    upstream_patch = patch.object(main, "v1_chat_completions", fake_upstream)

    with upstream_patch, self.assertRaises(main.HTTPException) as raised:
        await main.v1_responses(request_model, _make_request("/v1/responses"))

    http_error = raised.exception
    self.assertEqual(http_error.status_code, 502)

    error_info = http_error.detail["error"]
    self.assertEqual(error_info["type"], "upstream_error")
    self.assertEqual(error_info["message"], "invalid upstream response")
|
|
|
|
|
|
class SessionCacheToolFingerprintTests(unittest.TestCase):
|
|
def test_build_key_changes_with_tool_config(self) -> None:
    """Session-cache keys must fingerprint the tool configuration.

    Adding a tool config changes the key. Re-ordering dict keys inside the
    config does not. Changing the tool name does.
    """
    from app.session_cache import SessionCache

    def _openai_tool_config(fn_name: str) -> dict:
        # A single OpenAI-style function tool that is also forced via tool_choice.
        return {
            "provider": "openai",
            "tools": [{"type": "function", "function": {"name": fn_name, "parameters": {}}}],
            "tool_choice": {"type": "function", "function": {"name": fn_name}},
        }

    cache = SessionCache(max_entries=8, ttl_sec=60)
    conversation = [
        {"role": "system", "content": "sys"},
        {"role": "user", "content": "hello"},
    ]

    lookup_cfg = _openai_tool_config("lookup")
    # Same content as lookup_cfg, but every mapping lists its keys in a different order.
    lookup_cfg_shuffled = {
        "tool_choice": {"function": {"name": "lookup"}, "type": "function"},
        "tools": [{"function": {"parameters": {}, "name": "lookup"}, "type": "function"}],
        "provider": "openai",
    }
    lookup_v2_cfg = _openai_tool_config("lookup_v2")

    plain_key = cache.build_key("api-key", conversation)
    lookup_key, shuffled_key, v2_key = (
        cache.build_key("api-key", conversation, tool_config=cfg)
        for cfg in (lookup_cfg, lookup_cfg_shuffled, lookup_v2_cfg)
    )

    self.assertNotEqual(plain_key, lookup_key)
    self.assertEqual(lookup_key, shuffled_key)
    self.assertNotEqual(lookup_key, v2_key)
|
|
|
|
|
|
def test_handle_server_message_drops_unroutable_tool_event_without_request_id(self) -> None:
    """Drop a ``tool/invoke`` that carries neither ``requestId`` nor ``toolCallId``.

    The only open stream, ``req-1``, must be left untouched: no recorded tool
    order, no tool state, and nothing queued on its chunk queue.
    """
    from app.lingma_client import LspWsRpcClient

    rpc = LspWsRpcClient("ws://127.0.0.1:1")

    async def run() -> None:
        rpc.create_stream("req-1")
        await rpc._handle_server_message(
            {
                "jsonrpc": "2.0",
                "method": "tool/invoke",
                "params": {
                    "name": "lookup",
                    "parameters": {"q": "x"},
                },
            }
        )
        stream = rpc._chat_streams["req-1"]
        self.assertEqual(stream["tool_order"], [])
        self.assertEqual(stream["tool_states"], {})
        self.assertTrue(stream["chunks"].empty())

    # Synchronous test method: give the coroutine its own event loop.
    asyncio.run(run())
|
|
|
|
def test_handle_server_message_routes_by_tool_map_without_request_id(self) -> None:
    """Route a ``tool/invokeResult`` that lacks ``requestId`` via its ``toolCallId``.

    A first ``tool/invoke`` for ``req-1`` registers ``call-1``. The later
    result message carries only ``toolCallId``, yet it must still be merged
    into ``req-1``'s single tool event together with the original input.
    """
    from app.lingma_client import LspWsRpcClient

    rpc = LspWsRpcClient("ws://127.0.0.1:1")

    async def run() -> None:
        rpc.create_stream("req-1")

        # Routable invoke: binds toolCallId "call-1" to request "req-1".
        await rpc._handle_server_message(
            {
                "jsonrpc": "2.0",
                "method": "tool/invoke",
                "params": {
                    "requestId": "req-1",
                    "toolCallId": "call-1",
                    "name": "lookup",
                    "parameters": {"q": "a"},
                },
            }
        )

        # Result without requestId: only the toolCallId can route it.
        await rpc._handle_server_message(
            {
                "jsonrpc": "2.0",
                "method": "tool/invokeResult",
                "params": {
                    "toolCallId": "call-1",
                    "result": {"ok": True},
                },
            }
        )

        result = rpc.get_stream_result("req-1")
        self.assertEqual(len(result["toolEvents"]), 1)
        self.assertEqual(result["toolEvents"][0]["id"], "call-1")
        self.assertEqual(result["toolEvents"][0]["input"], {"q": "a"})
        self.assertEqual(result["toolEvents"][0]["result"], {"ok": True})

    # Synchronous test method: give the coroutine its own event loop.
    asyncio.run(run())
|
|
|
|
def test_handle_server_message_dedupes_identical_repeated_tool_events(self) -> None:
    """Collapse a ``tool/invoke`` message delivered twice into one entry.

    Feeding the identical message twice must record ``call-dup`` once in the
    stream's tool order and queue exactly one chunk.
    """
    from app.lingma_client import LspWsRpcClient

    rpc = LspWsRpcClient("ws://127.0.0.1:1")

    async def run() -> None:
        rpc.create_stream("req-1")
        msg = {
            "jsonrpc": "2.0",
            "method": "tool/invoke",
            "params": {
                "requestId": "req-1",
                "toolCallId": "call-dup",
                "name": "lookup",
                "parameters": {"q": "dup"},
            },
        }
        await rpc._handle_server_message(msg)
        await rpc._handle_server_message(msg)

        stream = rpc._chat_streams["req-1"]
        self.assertEqual(stream["tool_order"], ["call-dup"])
        self.assertEqual(stream["chunks"].qsize(), 1)

    # Synchronous test method: give the coroutine its own event loop.
    asyncio.run(run())
|
|
|
|
def test_extracts_tool_event_from_results_and_parameters(self) -> None:
    """Build a tool event from top-level params plus a nested ``results`` entry.

    The payload has ``toolCallId`` and ``parameters`` at the top level. The
    tool name and result appear only inside its ``results`` list, and the
    extracted event must combine both sources.
    """
    from app.lingma_client import LspWsRpcClient

    payload = {
        "toolCallId": "call_sync_1",
        "parameters": {"path": "README.md"},
        "results": [
            {"toolCallId": "call_sync_1", "name": "read_file", "result": {"ok": True}},
        ],
    }
    expected_event = {
        "id": "call_sync_1",
        "name": "read_file",
        "input": {"path": "README.md"},
        "result": {"ok": True},
    }

    self.assertEqual(LspWsRpcClient._extract_tool_event(payload), expected_event)
|
|
|
|
def test_extracts_tool_event_from_invoke_result_payload(self) -> None:
    """Build a tool event from a flat invoke-result style payload.

    Every field (``toolCallId``, ``name``, ``parameters``, ``result``) sits at
    the top level. The event maps them to ``id``/``name``/``input``/``result``.
    """
    from app.lingma_client import LspWsRpcClient

    call_id, tool_name = "call_inv_1", "search_docs"
    tool_input, tool_output = {"query": "gateway"}, {"hits": 3}

    extracted = LspWsRpcClient._extract_tool_event(
        {
            "toolCallId": call_id,
            "name": tool_name,
            "parameters": tool_input,
            "result": tool_output,
        }
    )

    self.assertEqual(
        extracted,
        {"id": call_id, "name": tool_name, "input": tool_input, "result": tool_output},
    )
|
|
|
|
|
|
def test_tool_sync_triggers_approve_and_invoke_result_requests(self) -> None:
    """A routable ``tool/call/sync`` triggers approve and invokeResult requests.

    After handling the sync message for ``req-1``/``call-1``, the client must
    write two requests to the websocket. The ``tool/call/approve`` request
    must carry the exact approval params. The ``tool/invokeResult`` request
    must report success for ``run_in_terminal`` with an empty error message.
    """
    from app.lingma_client import LspWsRpcClient

    class _WsStub:
        # Minimal websocket double: records every frame passed to send().
        def __init__(self) -> None:
            self.frames: list[bytes] = []

        async def send(self, data: bytes) -> None:
            self.frames.append(data)

    def _decode(frame: bytes) -> dict:
        # Drop the header block (everything up to the first blank CRLF line;
        # presumably LSP-style Content-Length headers) and parse the JSON body.
        body = frame.split(b"\r\n\r\n", 1)[1]
        return json.loads(body.decode("utf-8"))

    ws = _WsStub()
    rpc = LspWsRpcClient(ws)

    async def run() -> None:
        rpc.create_stream("req-1")
        await rpc._handle_server_message(
            {
                "jsonrpc": "2.0",
                "method": "tool/call/sync",
                "params": {
                    "sessionId": "sess-1",
                    "requestId": "req-1",
                    "toolCallId": "call-1",
                    "name": "run_in_terminal",
                    "parameters": {"command": "pwd"},
                },
            }
        )

        decoded = [_decode(frame) for frame in ws.frames]
        methods = [item.get("method") for item in decoded]
        self.assertIn("tool/call/approve", methods)
        self.assertIn("tool/invokeResult", methods)

        approve = next(item for item in decoded if item.get("method") == "tool/call/approve")
        self.assertEqual(
            approve["params"],
            {
                "type": "tool_call",
                "sessionId": "sess-1",
                "requestId": "req-1",
                "toolCallId": "call-1",
                "approval": True,
            },
        )

        invoke_result = next(item for item in decoded if item.get("method") == "tool/invokeResult")
        self.assertEqual(invoke_result["params"]["toolCallId"], "call-1")
        self.assertEqual(invoke_result["params"]["name"], "run_in_terminal")
        self.assertTrue(invoke_result["params"]["success"])
        self.assertEqual(invoke_result["params"]["errorMessage"], "")

    # Synchronous test method: give the coroutine its own event loop.
    asyncio.run(run())
|
|
|
|
def test_tool_sync_does_not_emit_roundtrip_without_request_id(self) -> None:
    """A ``tool/call/sync`` lacking ``requestId`` must not write any frames.

    The sync message names a session and tool call but no request. Handling
    it must leave the websocket stub's recorded frames empty.
    """
    from app.lingma_client import LspWsRpcClient

    class _WsStub:
        # Minimal websocket double: records every frame passed to send().
        def __init__(self) -> None:
            self.frames: list[bytes] = []

        async def send(self, data: bytes) -> None:
            self.frames.append(data)

    ws = _WsStub()
    rpc = LspWsRpcClient(ws)

    async def run() -> None:
        rpc.create_stream("req-1")
        await rpc._handle_server_message(
            {
                "jsonrpc": "2.0",
                "method": "tool/call/sync",
                "params": {
                    "sessionId": "sess-1",
                    "toolCallId": "call-1",
                    "name": "run_in_terminal",
                    "parameters": {"command": "pwd"},
                },
            }
        )
        self.assertEqual(ws.frames, [])

    # Synchronous test method: give the coroutine its own event loop.
    asyncio.run(run())
|