from __future__ import annotations

import json
import sys
import types
import unittest
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch


class _FakeSessionCache:
    """Recording stand-in for a session cache.

    Nothing is ever stored: ``get`` always reports a miss. Every key that is
    built and every ``get``/``put``/``invalidate`` call is appended to a list
    so a test can inspect the interaction afterwards.
    """

    def __init__(self) -> None:
        # One log per operation, each kept in invocation order.
        self.invalidate_calls: list[str] = []
        self.put_calls: list[tuple[str, str, str]] = []
        self.get_calls: list[str] = []
        self.keys: list[str] = []
        self.enabled = True

    def build_key(
        self,
        api_key: str,
        messages: list[dict],
        *,
        tool_config=None,
        branch_context=None,
    ) -> str:
        """Build a readable cache key, record it in ``keys`` and return it.

        The key combines the API key, the number of messages, a marker telling
        whether ``tool_config`` was supplied (anything other than ``None``),
        and the branch context (``"-"`` when it is missing or empty).
        """
        tool_tag = "no_tool" if tool_config is None else "with_tool"
        branch_tag = branch_context if branch_context else "-"
        cache_key = f"{api_key}:{len(messages)}:{tool_tag}:branch={branch_tag}"
        self.keys.append(cache_key)
        return cache_key

    async def get(self, key: str):
        """Log the lookup in ``get_calls`` and report a miss (``None``)."""
        self.get_calls.append(key)
        return None

    async def put(self, key: str, session_id: str, instance_name: str = "") -> None:
        """Log the store request in ``put_calls``; nothing is retained."""
        record = (key, session_id, instance_name)
        self.put_calls.append(record)

    async def invalidate(self, key: str) -> None:
        """Log the invalidation request in ``invalidate_calls``."""
        self.invalidate_calls.append(key)


# app.main imports playwright via auto_login; tests don't exercise that path.
# Inject a lightweight stub so unit tests run without installing playwright.
_playwright = types.ModuleType("playwright") _playwright_async = types.ModuleType("playwright.async_api") class _StubPlaywrightTimeoutError(Exception): pass async def _stub_async_playwright(): raise RuntimeError("playwright is stubbed in unit tests") _playwright_async.TimeoutError = _StubPlaywrightTimeoutError _playwright_async.async_playwright = _stub_async_playwright sys.modules.setdefault("playwright", _playwright) sys.modules.setdefault("playwright.async_api", _playwright_async) from starlette.requests import Request from starlette.responses import JSONResponse, Response, StreamingResponse from app.anthropic_schema import AnthropicMessagesRequest from app.http.tool_emulation import EmulatedToolChoice, EmulatedToolDef, inject_tooling, parse_action_blocks from app.openai_schema import ChatCompletionsRequest, ResponsesRequest import app.main as main class _FakeTicket: def __init__(self) -> None: self.released = False def release(self) -> None: self.released = True class _FakeGuard: def __init__(self) -> None: self.in_flight = 0 async def try_acquire(self) -> _FakeTicket: return _FakeTicket() class _FakeClient: def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None: self._stream_events = stream_events self._complete_result = complete_result async def query_models(self) -> dict: return { "chat": [ { "key": "org_auto", "displayName": "Auto", } ] } async def chat_complete(self, *args, **kwargs) -> dict: return self._complete_result async def chat_stream(self, *args, **kwargs): out_meta = kwargs.get("out_meta") if isinstance(out_meta, dict): out_meta["session_id"] = "sess-stream" for event in self._stream_events: yield event class _FakeInstance: def __init__(self, client: _FakeClient) -> None: self.name = "inst-test" self.client = client self.in_flight = 0 class _FakePool: def __init__(self, inst: _FakeInstance) -> None: self._inst = inst def pick(self, affinity_key: str | None = None) -> _FakeInstance: return self._inst def _make_request(path: 
str, headers: dict[str, str] | None = None) -> Request: header_pairs = [] for k, v in (headers or {}).items(): header_pairs.append((k.lower().encode("latin-1"), v.encode("latin-1"))) scope = { "type": "http", "http_version": "1.1", "method": "POST", "scheme": "http", "path": path, "raw_path": path.encode("latin-1"), "query_string": b"", "headers": header_pairs, "client": ("testclient", 12345), "server": ("testserver", 80), "root_path": "", } return Request(scope) async def _collect_stream(response) -> str: chunks: list[str] = [] async for part in response.body_iterator: if isinstance(part, bytes): chunks.append(part.decode("utf-8")) else: chunks.append(str(part)) return "".join(chunks) class _SpyClient(_FakeClient): def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None: super().__init__(stream_events=stream_events, complete_result=complete_result) self.last_complete_args: tuple = () self.last_stream_args: tuple = () self.last_complete_kwargs: dict = {} self.last_stream_kwargs: dict = {} async def chat_complete(self, *args, **kwargs) -> dict: self.last_complete_args = tuple(args) self.last_complete_kwargs = dict(kwargs) return await super().chat_complete(*args, **kwargs) async def chat_stream(self, *args, **kwargs): self.last_stream_args = tuple(args) self.last_stream_kwargs = dict(kwargs) async for event in super().chat_stream(*args, **kwargs): yield event class _SettingsPatch: def __init__(self, **kwargs) -> None: self._kwargs = kwargs def __enter__(self): self._patchers = [ patch.object(main.settings, k, v) for k, v in self._kwargs.items() ] for p in self._patchers: p.start() return self def __exit__(self, exc_type, exc_val, exc_tb): for p in reversed(self._patchers): p.stop() return False class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase): async def test_openai_non_stream_bridges_tool_calls(self) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": "done", "toolEvents": [ { "id": "call_123", "name": 
"search_docs", "input": {"query": "gateway"}, "result": {"ok": True}, } ], "sessionId": "sess-1", "firstTokenLatencyMs": 12, "totalLatencyMs": 34, }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(message["content"], "done") self.assertIsInstance(message["tool_calls"], list) self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["tool_calls"][0]["function"]["name"], "search_docs") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"query": "gateway"}, ) async def test_openai_non_stream_synthesizes_tool_call_from_plain_json( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '```json\n{"arguments": {"query": "gateway"}}\n```', "toolEvents": [], "sessionId": "sess-fallback-openai", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload 
= json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"query": "gateway"}, ) async def test_openai_non_stream_synthesizes_tool_call_from_tool_code( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '```tool_code\nlookup(query="gateway")\n```', "toolEvents": [], "sessionId": "sess-fallback-tool-code-openai", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"query": "gateway"}, ) async def test_openai_non_stream_synthesizes_tool_call_from_tool_code_positional_arg( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '```tool_code\nlookup("gateway")\n```', "toolEvents": [], "sessionId": "sess-fallback-tool-code-openai-positional", }, ) req = ChatCompletionsRequest( model="org_auto", 
messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ { "type": "function", "function": { "name": "lookup", "parameters": { "type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"], }, }, } ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"query": "gateway"}, ) async def test_openai_non_stream_synthesizes_tool_call_from_hash_tool_call_block( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '#Tool Call\n```fetch_weather\n{"city": "Hangzhou"}\n```\n', "toolEvents": [], "sessionId": "sess-fallback-hash-tool-call-openai", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ { "type": "function", "function": { "name": "fetch_weather", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, } ], tool_choice={"type": "function", "function": {"name": "fetch_weather"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", 
AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "fetch_weather") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"city": "Hangzhou"}, ) async def test_openai_non_stream_synthesizes_tool_call_from_hash_tool_call_block_without_tool_choice( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '#Tool Call\n```fetch_weather\n{"city": "Hangzhou"}\n```\n', "toolEvents": [], "sessionId": "sess-fallback-hash-tool-call-openai-no-choice", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ { "type": "function", "function": { "name": "fetch_weather", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, } ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "fetch_weather") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"city": "Hangzhou"}, ) async def test_openai_non_stream_synthesizes_tool_call_from_json_action_block( self, ) -> None: fake_client = 
_FakeClient( stream_events=[], complete_result={ "text": '```json action\n{"tool":"fetch_weather","parameters":{"city":"Hangzhou"}}\n```', "toolEvents": [], "sessionId": "sess-action-block-openai", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ { "type": "function", "function": { "name": "fetch_weather", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, } ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "fetch_weather") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"city": "Hangzhou"}, ) async def test_openai_stream_synthesizes_tool_call_from_tool_code( self, ) -> None: fake_client = _FakeClient( stream_events=[ {"type": "text", "text": "```tool_code\n"}, {"type": "text", "text": 'lookup(query="gateway")\n'}, {"type": "text", "text": "```"}, ], complete_result={}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": 
"u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) body = await _collect_stream(response) chunks = [ json.loads(line[6:]) for line in body.splitlines() if line.startswith("data: {") ] self.assertTrue( any( chunk.get("choices") and chunk["choices"][0]["delta"].get("tool_calls") for chunk in chunks ) ) self.assertIn('"tool_calls"', body) self.assertIn('"finish_reason": "tool_calls"', body) self.assertIn("data: [DONE]", body) async def test_openai_stream_synthesizes_tool_call_from_hash_tool_call_block_without_tool_choice( self, ) -> None: fake_client = _FakeClient( stream_events=[ {"type": "text", "text": "#Tool Call\n```fetch_weather\n"}, {"type": "text", "text": '{"city": "Hangzhou"}\n'}, {"type": "text", "text": "```\n"}, ], complete_result={}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ { "type": "function", "function": { "name": "fetch_weather", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, } ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) body = await _collect_stream(response) self.assertIn('"tool_calls"', body) self.assertIn('"fetch_weather"', body) self.assertIn('"finish_reason": "tool_calls"', body) async def test_openai_non_stream_synthesizes_tool_call_from_json_array(self) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '```json\n[{"name": "lookup", "arguments": {"query": "gateway"}}]\n```', "toolEvents": [], "sessionId": 
"sess-fallback-openai-json-array", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) payload = json.loads(response.body) message = payload["choices"][0]["message"] self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls") self.assertEqual(message["content"], "") self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup") self.assertEqual( json.loads(message["tool_calls"][0]["function"]["arguments"]), {"query": "gateway"}, ) async def test_openai_stream_bridges_tool_and_text_events(self) -> None: fake_client = _FakeClient( stream_events=[ { "type": "tool", "tool": { "id": "call_stream_1", "name": "read_file", "input": {"path": "README.md"}, }, }, {"type": "text", "text": "hello"}, ], complete_result={}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, stream_options={"include_usage": True}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) body = await _collect_stream(response) self.assertIn('"tool_calls"', body) self.assertEqual(body.count('"content": "hello"'), 
1) self.assertIn('"finish_reason": "tool_calls"', body) self.assertIn('"usage"', body) self.assertIn("data: [DONE]", body) async def test_openai_stream_emits_text_delta_only_once_without_tools(self) -> None: fake_client = _FakeClient( stream_events=[ {"type": "text", "text": "你好"}, ], complete_result={}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) body = await _collect_stream(response) self.assertEqual(body.count('"content": "你好"'), 1) self.assertIn('"finish_reason": "stop"', body) self.assertIn("data: [DONE]", body) async def test_openai_stream_filters_tool_events_by_allowlist(self) -> None: fake_client = _FakeClient( stream_events=[ { "type": "tool", "tool": { "id": "call_blocked", "name": "write_file", "input": {"path": "a.txt"}, }, }, { "type": "tool", "tool": { "id": "call_allowed", "name": "lookup", "input": {"query": "gateway"}, }, }, {"type": "text", "text": "hello"}, ], complete_result={}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}}, { "type": "function", "function": {"name": "write_file", "parameters": {}}, }, ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), 
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) body = await _collect_stream(response) self.assertIn('"name": "lookup"', body) self.assertNotIn('"name": "write_file"', body) self.assertIn('"content": "hello"', body) self.assertIn('"finish_reason": "tool_calls"', body) async def test_anthropic_non_stream_bridges_tool_blocks(self) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": "ok", "toolEvents": [ { "id": "toolu_1", "name": "lookup", "input": {"k": "v"}, "result": {"value": 1}, } ], "sessionId": "sess-2", }, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=False, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) payload = json.loads(response.body) types = [item["type"] for item in payload["content"]] self.assertEqual(types, ["text", "tool_use", "tool_result"]) self.assertEqual(payload["stop_reason"], "end_turn") self.assertEqual(payload["content"][1]["name"], "lookup") self.assertEqual(payload["content"][2]["tool_use_id"], "toolu_1") async def test_anthropic_non_stream_does_not_synthesize_tool_blocks_from_plain_json( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '{"input": {"k": "v"}, "result": {"value": 1}}', "toolEvents": [], "sessionId": "sess-fallback-anthropic", }, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", 
max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"name": "lookup", "input_schema": {"type": "object", "properties": {}}} ], tool_choice={"type": "tool", "name": "lookup"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) payload = json.loads(response.body) types = [item["type"] for item in payload["content"]] self.assertEqual(types, ["text"]) self.assertEqual(payload["stop_reason"], "end_turn") self.assertIn('"input"', payload["content"][0]["text"]) async def test_openai_stream_tool_call_indices_are_stable(self) -> None: fake_client = _FakeClient( stream_events=[ { "type": "tool", "tool": { "id": "call_a", "name": "read_file", "input": {"path": "README.md"}, }, }, { "type": "tool", "tool": { "id": "call_b", "name": "search_docs", "input": {"query": "gateway"}, }, }, ], complete_result={}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) body = await _collect_stream(response) self.assertIn('"id": "call_a"', body) self.assertIn('"id": "call_b"', body) self.assertIn('"index": 0', body) self.assertIn('"index": 1', body) async def 
test_anthropic_non_stream_returns_tool_use_stop_reason_when_result_missing( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": "", "toolEvents": [ { "name": "lookup", "input": {"k": "v"}, } ], "sessionId": "sess-2", }, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=False, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) payload = json.loads(response.body) self.assertEqual(payload["stop_reason"], "tool_use") self.assertEqual(len(payload["content"]), 1) self.assertEqual(payload["content"][0]["type"], "tool_use") async def test_anthropic_stream_returns_tool_use_stop_reason_when_result_missing( self, ) -> None: fake_client = _FakeClient( stream_events=[ { "type": "tool", "tool": { "name": "read", "input": {"file": "a.txt"}, }, } ], complete_result={}, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=True, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) body = 
await _collect_stream(response) self.assertIn('"type": "tool_use"', body) self.assertIn('"stop_reason": "tool_use"', body) async def test_anthropic_stream_does_not_fallback_to_synthetic_tool_blocks_for_forced_tool( self, ) -> None: fake_client = _FakeClient( stream_events=[ { "type": "text", "text": '```json\n{"input": {"k": "v"}, "result": {"value": 1}}\n```', } ], complete_result={}, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ {"name": "lookup", "input_schema": {"type": "object", "properties": {}}} ], tool_choice={"type": "tool", "name": "lookup"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) body = await _collect_stream(response) self.assertNotIn('"type": "tool_use"', body) self.assertNotIn('"type": "tool_result"', body) self.assertIn('"type": "text_delta"', body) self.assertIn('"stop_reason": "end_turn"', body) async def test_anthropic_stream_bridges_tool_and_text_events(self) -> None: fake_client = _FakeClient( stream_events=[ { "type": "tool", "tool": { "id": "toolu_stream_1", "name": "read", "input": {"file": "a.txt"}, "result": "done", }, }, {"type": "text", "text": "world"}, ], complete_result={}, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=True, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, 
"_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) body = await _collect_stream(response) self.assertIn("event: message_start", body) self.assertIn('"type": "tool_use"', body) self.assertIn('"type": "tool_result"', body) self.assertIn('"stop_reason": "end_turn"', body) self.assertIn('"type": "text_delta"', body) self.assertIn("event: message_stop", body) async def test_anthropic_stream_filters_tool_events_by_allowlist(self) -> None: fake_client = _FakeClient( stream_events=[ { "type": "tool", "tool": { "id": "toolu_blocked", "name": "write_file", "input": {"path": "a.txt"}, "result": "blocked", }, }, { "type": "tool", "tool": { "id": "toolu_allowed", "name": "lookup", "input": {"file": "a.txt"}, "result": "done", }, }, {"type": "text", "text": "world"}, ], complete_result={}, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=256, messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ { "name": "lookup", "input_schema": {"type": "object", "properties": {}}, }, { "name": "write_file", "input_schema": {"type": "object", "properties": {}}, }, ], tool_choice={"type": "tool", "name": "lookup"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", 
"anthropic-version": "2023-06-01", }, ), ) body = await _collect_stream(response) self.assertIn('"name": "lookup"', body) self.assertNotIn('"name": "write_file"', body) self.assertIn('"type": "tool_result"', body) self.assertIn('"stop_reason": "end_turn"', body) async def test_openai_non_stream_uses_emulation_instead_of_forwarding_tool_config(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(tool_forward_enabled=True), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) self.assertIn("tool_config", spy_client.last_complete_kwargs) self.assertIsNone(spy_client.last_complete_kwargs["tool_config"]) self.assertEqual(spy_client.last_complete_args[2], "agent") async def test_openai_stream_uses_emulation_instead_of_forwarding_tool_config(self) -> None: spy_client = _SpyClient( stream_events=[{"type": "text", "text": "ok"}], complete_result={} ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( 
main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(tool_forward_enabled=True), ): response = await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) await _collect_stream(response) self.assertIn("tool_config", spy_client.last_stream_kwargs) self.assertIsNone(spy_client.last_stream_kwargs["tool_config"]) self.assertEqual(spy_client.last_stream_args[2], "agent") async def test_openai_non_stream_does_not_forward_tool_config_when_disabled( self, ) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(tool_forward_enabled=False), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) self.assertIn("tool_config", spy_client.last_complete_kwargs) self.assertIsNone(spy_client.last_complete_kwargs["tool_config"]) self.assertEqual(spy_client.last_complete_args[2], "agent") async def test_openai_non_stream_filters_tools_by_allowlist_before_emulation(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}}, { "type": "function", "function": {"name": "write_file", "parameters": {}}, }, ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( 
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) prompt = spy_client.last_complete_args[0] self.assertIn("lookup(", prompt) self.assertNotIn("write_file(", prompt) async def test_openai_non_stream_rejects_forced_tool_outside_allowlist( self, ) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "write_file"}}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]), ): with self.assertRaises(main.HTTPException) as cm: await main.v1_chat_completions( req, _make_request("/v1/chat/completions") ) self.assertEqual(cm.exception.status_code, 400) self.assertEqual(cm.exception.detail["error"]["type"], "invalid_request_error") self.assertIn("write_file", cm.exception.detail["error"]["message"]) async def test_openai_tooling_context_disables_session_reuse_cache(self) -> None: fake_cache = _FakeSessionCache() fake_client = _FakeClient( stream_events=[], complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-3"}, ) req = ChatCompletionsRequest( model="org_auto", messages=[ {"role": "user", 
"content": "turn-1"}, {"role": "user", "content": "turn-2"}, ], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) with ( patch.object(main, "session_cache", fake_cache), patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(tool_forward_enabled=True), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) self.assertEqual(fake_cache.keys, []) self.assertEqual(fake_cache.get_calls, []) self.assertEqual(fake_cache.put_calls, []) async def test_openai_session_reuse_lookup_key_separates_branches(self) -> None: fake_cache = _FakeSessionCache() fake_client = _FakeClient( stream_events=[], complete_result={ "text": "ok", "toolEvents": [], "sessionId": "sess-branch", }, ) req_a = ChatCompletionsRequest( model="org_auto", messages=[ {"role": "system", "content": "S"}, {"role": "user", "content": "U"}, {"role": "assistant", "content": "A1"}, {"role": "user", "content": "next"}, ], stream=False, ) req_b = ChatCompletionsRequest( model="org_auto", messages=[ {"role": "system", "content": "S"}, {"role": "user", "content": "U"}, {"role": "assistant", "content": "A2"}, {"role": "user", "content": "next"}, ], stream=False, ) with ( patch.object(main, "session_cache", fake_cache), patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False), ): await main.v1_chat_completions(req_a, 
_make_request("/v1/chat/completions")) await main.v1_chat_completions(req_b, _make_request("/v1/chat/completions")) self.assertGreaterEqual(len(fake_cache.get_calls), 4) self.assertNotEqual(fake_cache.get_calls[0], fake_cache.get_calls[2]) self.assertEqual(fake_cache.get_calls[1], fake_cache.get_calls[3]) async def test_openai_and_anthropic_resolve_same_default_ask_mode_without_tooling( self, ) -> None: openai_spy = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) anthropic_spy = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) openai_req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "hi"}], stream=False, ) anthropic_req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[{"role": "user", "content": "hi"}], stream=False, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(openai_spy))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), _SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False), ): await main.v1_chat_completions( openai_req, _make_request("/v1/chat/completions") ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(anthropic_spy))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False), ): await main.v1_messages( anthropic_req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) self.assertEqual(openai_spy.last_complete_args[2], "chat") 
self.assertEqual(anthropic_spy.last_complete_args[2], "chat") async def test_anthropic_stream_uses_emulation_instead_of_forwarding_tool_config(self) -> None: spy_client = _SpyClient( stream_events=[{"type": "text", "text": "ok"}], complete_result={} ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[{"role": "user", "content": "hi"}], stream=True, tools=[ {"name": "lookup", "input_schema": {"type": "object", "properties": {}}} ], tool_choice={"type": "tool", "name": "lookup"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) await _collect_stream(response) self.assertIn("tool_config", spy_client.last_stream_kwargs) self.assertIsNone(spy_client.last_stream_kwargs["tool_config"]) self.assertEqual(spy_client.last_stream_args[2], "agent") async def test_anthropic_non_stream_does_not_forward_tool_config_when_disabled( self, ) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"name": "lookup", "input_schema": {"type": "object", "properties": {}}} ], tool_choice={"type": "tool", "name": "lookup"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), 
patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(tool_forward_enabled=False, default_ask_mode="chat"), ): await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) self.assertIn("tool_config", spy_client.last_complete_kwargs) self.assertIsNone(spy_client.last_complete_kwargs["tool_config"]) self.assertEqual(spy_client.last_complete_args[2], "agent") async def test_anthropic_non_stream_with_tools_uses_agent_mode(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ { "name": "write_file", "input_schema": {"type": "object", "properties": {}}, } ], tool_choice={"type": "auto"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"), ): await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) self.assertIn("tool_config", spy_client.last_complete_kwargs) self.assertIsNone(spy_client.last_complete_kwargs["tool_config"]) self.assertEqual(spy_client.last_complete_args[2], "agent") async def test_anthropic_non_stream_filters_tools_by_allowlist_before_emulation(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, 
messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ { "name": "lookup", "input_schema": {"type": "object", "properties": {}}, }, { "name": "write_file", "input_schema": {"type": "object", "properties": {}}, }, ], tool_choice={"type": "tool", "name": "lookup"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]), ): await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) prompt = spy_client.last_complete_args[0] self.assertIn("lookup(", prompt) self.assertNotIn("write_file(", prompt) async def test_anthropic_non_stream_rejects_forced_tool_outside_allowlist( self, ) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[{"role": "user", "content": "hi"}], stream=False, tools=[ {"name": "lookup", "input_schema": {"type": "object", "properties": {}}} ], tool_choice={"type": "tool", "name": "write_file"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), _SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) 
self.assertEqual(response.status_code, 400) payload = json.loads(response.body) self.assertEqual(payload["type"], "error") self.assertEqual(payload["error"]["type"], "invalid_request_error") self.assertIn("write_file", payload["error"]["message"]) async def test_anthropic_tooling_context_disables_session_reuse_cache(self) -> None: fake_cache = _FakeSessionCache() fake_client = _FakeClient( stream_events=[], complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-4"}, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[ {"role": "user", "content": "turn-1"}, {"role": "user", "content": "turn-2"}, ], stream=False, tools=[ {"name": "lookup", "input_schema": {"type": "object", "properties": {}}} ], tool_choice={"type": "auto"}, ) with ( patch.object(main, "session_cache", fake_cache), patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) self.assertEqual(fake_cache.keys, []) self.assertEqual(fake_cache.get_calls, []) self.assertEqual(fake_cache.put_calls, []) async def test_anthropic_count_tokens_returns_input_tokens(self) -> None: req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=64, messages=[{"role": "user", "content": "count me"}], ) with patch.object(main.settings, "api_keys", ["test-key"]): response = await main.v1_messages_count_tokens( req, _make_request( "/v1/messages/count_tokens", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) payload = json.loads(response.body) self.assertEqual(response.status_code, 200) self.assertEqual( 
payload, { "input_tokens": main.estimate_tokens( main._messages_to_prompt(main.anthropic_to_internal_messages(req)) ) }, ) async def test_anthropic_count_tokens_requires_authentication(self) -> None: req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=64, messages=[{"role": "user", "content": "count me"}], ) with patch.object(main.settings, "api_keys", ["test-key"]): response = await main.v1_messages_count_tokens( req, _make_request( "/v1/messages/count_tokens", headers={"anthropic-version": "2023-06-01"}, ), ) payload = json.loads(response.body) self.assertEqual(response.status_code, 401) self.assertEqual(payload["type"], "error") self.assertEqual(payload["error"]["type"], "authentication_error") async def test_anthropic_messages_requires_authentication(self) -> None: req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=64, messages=[{"role": "user", "content": "hi"}], stream=False, ) with patch.object(main.settings, "api_keys", ["test-key"]): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={"anthropic-version": "2023-06-01"}, ), ) payload = json.loads(response.body) self.assertEqual(response.status_code, 401) self.assertEqual(payload["type"], "error") self.assertEqual(payload["error"]["type"], "authentication_error") async def test_anthropic_messages_backpressure_returns_overloaded_error( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={"text": "ok", "toolEvents": []} ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=64, messages=[{"role": "user", "content": "hi"}], stream=False, ) fake_guard = types.SimpleNamespace( in_flight=0, try_acquire=AsyncMock(side_effect=main.BackpressureRejected(2.4)), ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", fake_guard), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), 
patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) payload = json.loads(response.body) self.assertEqual(response.status_code, 429) self.assertEqual(response.headers["Retry-After"], "2") self.assertEqual(payload["type"], "error") self.assertEqual(payload["error"]["type"], "overloaded_error") self.assertIn("retry later", payload["error"]["message"]) async def test_responses_non_stream_maps_chat_payload_shape_and_input(self) -> None: req = ResponsesRequest( model="org_auto", input="hello from responses", stream=False, ) chat_payload = { "id": "chatcmpl-abc123", "created": 123, "model": "org_auto", "choices": [ { "index": 0, "finish_reason": "stop", "message": {"role": "assistant", "content": "done"}, } ], "usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6}, } mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload)) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) payload = json.loads(response.body) self.assertEqual(payload["id"], "resp_abc123") self.assertEqual(payload["object"], "response") self.assertEqual(payload["status"], "completed") self.assertEqual(payload["output_text"], "done") self.assertEqual( payload["usage"], {"input_tokens": 4, "output_tokens": 2, "total_tokens": 6} ) self.assertEqual(payload["output"][0]["type"], "message") self.assertEqual(payload["output"][0]["content"][0]["type"], "output_text") mock_chat.assert_awaited_once() chat_req = mock_chat.await_args.args[0] self.assertIsInstance(chat_req, ChatCompletionsRequest) messages_dump = [m.model_dump() for m in chat_req.messages] self.assertEqual( messages_dump, [ { "role": "user", "content": "hello from responses", "name": None, "tool_call_id": None, "tool_calls": None, } ], ) async def 
test_responses_non_stream_maps_chat_tool_calls_to_function_call_output( self, ) -> None: req = ResponsesRequest( model="org_auto", input="tool please", stream=False, ) chat_payload = { "id": "chatcmpl-tools1", "created": 234, "model": "org_auto", "choices": [ { "index": 0, "finish_reason": "tool_calls", "message": { "role": "assistant", "content": "", "tool_calls": [ { "id": "call_1", "type": "function", "function": { "name": "lookup", "arguments": '{"q":"gateway"}', }, } ], }, } ], "usage": {"prompt_tokens": 8, "completion_tokens": 3, "total_tokens": 11}, } mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload)) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) payload = json.loads(response.body) self.assertEqual(payload["status"], "completed") self.assertEqual(payload["output_text"], "") self.assertEqual( payload["usage"], {"input_tokens": 8, "output_tokens": 3, "total_tokens": 11}, ) self.assertEqual(len(payload["output"]), 1) self.assertEqual(payload["output"][0]["type"], "function_call") self.assertEqual(payload["output"][0]["call_id"], "call_1") self.assertEqual(payload["output"][0]["id"], "call_1") self.assertEqual(payload["output"][0]["name"], "lookup") self.assertEqual(payload["output"][0]["arguments"], '{"q":"gateway"}') async def test_responses_forwards_input_tools_and_tool_choice_to_chat_request( self, ) -> None: req = ResponsesRequest( model="org_auto", instructions="be concise", input=[ {"role": "user", "content": [{"type": "text", "text": "first"}]}, { "type": "function_call_output", "call_id": "call_1", "output": {"ok": True}, }, "follow up", ], stream=False, tools=[ {"type": "function", "function": {"name": "lookup", "parameters": {}}} ], tool_choice={"type": "function", "function": {"name": "lookup"}}, ) mock_chat = AsyncMock( return_value=JSONResponse( content={ "id": "chatcmpl-x", "created": 1, "model": "org_auto", "choices": [{"message": {"role": 
"assistant", "content": "ok"}}], "usage": {}, } ) ) with patch.object(main, "v1_chat_completions", mock_chat): await main.v1_responses(req, _make_request("/v1/responses")) mock_chat.assert_awaited_once() chat_req = mock_chat.await_args.args[0] self.assertIsInstance(chat_req, ChatCompletionsRequest) self.assertEqual(chat_req.tools, req.tools) self.assertEqual(chat_req.tool_choice, req.tool_choice) messages_dump = [m.model_dump() for m in chat_req.messages] self.assertEqual(messages_dump[0]["role"], "system") self.assertEqual(messages_dump[0]["content"], "be concise") self.assertEqual(messages_dump[1]["role"], "user") self.assertEqual(messages_dump[1]["content"], "first") self.assertEqual(messages_dump[2]["role"], "tool") self.assertEqual(messages_dump[2]["tool_call_id"], "call_1") self.assertEqual(messages_dump[2]["content"], '{"ok": true}') self.assertEqual(messages_dump[3]["role"], "user") self.assertEqual(messages_dump[3]["content"], "follow up") async def test_openai_tool_result_is_emulated_into_followup_prompt(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={ "text": "done", "toolEvents": [], "sessionId": "sess-emulated-tool-result", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[ {"role": "assistant", "content": None, "tool_calls": [{ "id": "call_1", "type": "function", "function": {"name": "fetch_weather", "arguments": '{"city":"Hangzhou"}'}, }]}, {"role": "tool", "tool_call_id": "call_1", "content": '{"temperature":"22C"}'}, {"role": "user", "content": "continue"}, ], stream=False, tools=[ { "type": "function", "function": { "name": "fetch_weather", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, } ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", 
AsyncMock(return_value=None) ), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) prompt = spy_client.last_complete_args[0] self.assertIn("Tool result for call_1:", prompt) self.assertIn('{"temperature":"22C"}', prompt) self.assertIn("Assistant:", prompt) async def test_openai_assistant_tool_calls_are_projected_into_emulation_prompt(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={ "text": "done", "toolEvents": [], "sessionId": "sess-emulated-tool-history", }, ) req = ChatCompletionsRequest( model="org_auto", messages=[ { "role": "assistant", "content": "I will check that", "tool_calls": [ { "id": "call_1", "type": "function", "function": { "name": "fetch_weather", "arguments": '{"city":"Hangzhou"}', }, } ], }, {"role": "user", "content": "continue"}, ], stream=False, tools=[ { "type": "function", "function": { "name": "fetch_weather", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, }, } ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) prompt = spy_client.last_complete_args[0] self.assertIn("I will check that", prompt) self.assertIn('"tool": "fetch_weather"', prompt) self.assertIn('"city": "Hangzhou"', prompt) async def test_openai_emulation_prompt_includes_proxy_tool_guidance(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={"text": "done", "toolEvents": [], "sessionId": "sess-guidance"}, ) req = ChatCompletionsRequest( model="org_auto", messages=[{"role": "user", "content": "inspect README"}], stream=False, tools=[ { "type": "function", "function": { "name": "read_file", "description": "Read a file", 
"parameters": { "type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "bash", "parameters": { "type": "object", "properties": {"command": {"type": "string"}}, }, }, }, ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), ): await main.v1_chat_completions(req, _make_request("/v1/chat/completions")) prompt = spy_client.last_complete_args[0] self.assertIn("DIRECT tool access inside an IDE", prompt) self.assertIn("Tool routing guide:", prompt) self.assertIn("Read a specific local file or code path: use read_file.", prompt) self.assertIn("Core tool syntax examples", prompt) self.assertIn("Coding and file-work discipline:", prompt) self.assertIn("NEVER say that tools are unavailable", prompt) async def test_anthropic_tool_history_is_projected_into_emulation_prompt(self) -> None: spy_client = _SpyClient( stream_events=[], complete_result={ "text": "done", "toolEvents": [], "sessionId": "sess-anthropic-history", }, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=128, messages=[ { "role": "assistant", "content": [ {"type": "text", "text": "I will check"}, { "type": "tool_use", "id": "toolu_1", "name": "fetch_weather", "input": {"city": "Hangzhou"}, }, ], }, { "role": "user", "content": [ { "type": "tool_result", "tool_use_id": "toolu_1", "content": '{"temperature":"22C"}', } ], }, {"role": "user", "content": "continue"}, ], stream=False, tools=[ { "name": "fetch_weather", "input_schema": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, } ], ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, 
"_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) prompt = spy_client.last_complete_args[0] self.assertIn("I will check", prompt) self.assertIn('"tool": "fetch_weather"', prompt) self.assertIn('"city": "Hangzhou"', prompt) self.assertIn("Tool result:", prompt) self.assertIn('{"temperature":"22C"}', prompt) async def test_anthropic_non_stream_synthesizes_tool_use_from_json_action_block( self, ) -> None: fake_client = _FakeClient( stream_events=[], complete_result={ "text": '```json action\n{"tool":"fetch_weather","parameters":{"city":"Hangzhou"}}\n```', "toolEvents": [], "sessionId": "sess-anthropic-action-block", }, ) req = AnthropicMessagesRequest( model="claude-3-5-sonnet-20241022", max_tokens=64, messages=[{"role": "user", "content": "weather"}], stream=False, tools=[ { "name": "fetch_weather", "description": "Get weather for a city", "input_schema": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"], }, } ], tool_choice={"type": "tool", "name": "fetch_weather"}, ) with ( patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))), patch.object(main, "chat_guard", _FakeGuard()), patch.object( main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"}) ), patch.object( main.stats_collector, "record_chat", AsyncMock(return_value=None) ), patch.object(main.settings, "api_keys", ["test-key"]), ): response = await main.v1_messages( req, _make_request( "/v1/messages", headers={ "x-api-key": "test-key", "anthropic-version": "2023-06-01", }, ), ) payload = json.loads(response.body) tool_blocks = [item for item in payload["content"] if item["type"] == "tool_use"] self.assertEqual(payload["stop_reason"], "tool_use") 
self.assertEqual(tool_blocks[0]["name"], "fetch_weather") self.assertEqual(tool_blocks[0]["input"], {"city": "Hangzhou"}) async def test_responses_stream_bridges_text_tool_and_completed_events( self, ) -> None: async def _chat_sse(): yield b'data: {"choices": [{"delta": {"content": "hello"}}]}\n\n' yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\": \\"x\\"}"}}]}}]}\n\n' yield b'data: {"usage": {"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5}, "choices": [{"delta": {}}]}\n\n' yield b"data: [DONE]\n\n" req = ResponsesRequest(model="org_auto", input="hi", stream=True) mock_chat = AsyncMock( return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream") ) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) body = await _collect_stream(response) self.assertIn('"type": "response.output_item.added"', body) self.assertIn('"type": "response.output_text.delta"', body) self.assertIn('"delta": "hello"', body) self.assertIn('"type": "response.function_call_arguments.delta"', body) self.assertIn('"item_id": "call_1"', body) self.assertIn('"output_index": 1', body) self.assertIn('"delta": "{\\"q\\": \\"x\\"}"', body) self.assertIn('"type": "response.function_call_arguments.done"', body) self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body) self.assertIn('"type": "response.output_item.done"', body) self.assertIn('"type": "function_call"', body) self.assertIn('"name": "lookup"', body) self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body) self.assertIn('"type": "response.completed"', body) self.assertIn('"input_tokens": 3', body) self.assertIn('"output_tokens": 2', body) self.assertIn("data: [DONE]", body) async def test_responses_stream_accumulates_fragmented_tool_arguments(self) -> None: async def _chat_sse(): yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": 
{"name": "lookup", "arguments": "{\\"q\\":"}}]}}]}\n\n' yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": " \\"x\\"}"}}]}}]}\n\n' yield b"data: [DONE]\n\n" req = ResponsesRequest(model="org_auto", input="hi", stream=True) mock_chat = AsyncMock( return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream") ) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) body = await _collect_stream(response) self.assertIn('"type": "response.function_call_arguments.delta"', body) self.assertIn('"delta": "{\\"q\\":"', body) self.assertIn('"delta": " \\"x\\"}"', body) self.assertIn('"type": "response.function_call_arguments.done"', body) self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body) self.assertIn('"type": "response.output_item.done"', body) self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body) self.assertIn("data: [DONE]", body) async def test_responses_stream_accumulates_fragmented_tool_arguments_without_repeated_id_or_name( self, ) -> None: async def _chat_sse(): yield b'data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\":"}}]}}]}\n\n' yield b'data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": " \\"x\\"}"}}]}}]}\n\n' yield b"data: [DONE]\n\n" req = ResponsesRequest(model="org_auto", input="hi", stream=True) mock_chat = AsyncMock( return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream") ) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) body = await _collect_stream(response) self.assertEqual(body.count('"item_id": "call_1"'), 3) self.assertIn('"name": "lookup"', body) self.assertIn('"delta": "{\\"q\\":"', body) self.assertIn('"delta": " \\"x\\"}"', body) self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', 
body) self.assertIn("data: [DONE]", body) async def test_responses_stream_emits_completed_when_upstream_closes_without_done( self, ) -> None: async def _chat_sse_without_done(): yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n' yield b'data: {"usage": {"prompt_tokens": 7, "completion_tokens": 1, "total_tokens": 8}, "choices": [{"delta": {}}]}\n\n' req = ResponsesRequest(model="org_auto", input="hi", stream=True) mock_chat = AsyncMock( return_value=StreamingResponse( _chat_sse_without_done(), media_type="text/event-stream" ) ) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) body = await _collect_stream(response) self.assertIn('"type": "response.output_text.delta"', body) self.assertIn('"delta": "partial"', body) self.assertIn('"type": "response.completed"', body) self.assertIn('"input_tokens": 7', body) self.assertIn('"output_tokens": 1', body) self.assertIn("data: [DONE]", body) async def test_responses_stream_emits_completed_when_upstream_iterator_errors( self, ) -> None: async def _chat_sse_error(): yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n' raise RuntimeError("boom") req = ResponsesRequest(model="org_auto", input="hi", stream=True) mock_chat = AsyncMock( return_value=StreamingResponse( _chat_sse_error(), media_type="text/event-stream" ) ) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) body = await _collect_stream(response) self.assertIn('"type": "response.output_text.delta"', body) self.assertIn('"delta": "partial"', body) self.assertIn('"type": "response.completed"', body) self.assertIn("data: [DONE]", body) async def test_responses_stream_emits_completed_when_upstream_cancels(self) -> None: async def _chat_sse_cancelled(): yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n' raise asyncio.CancelledError() req = 
ResponsesRequest(model="org_auto", input="hi", stream=True) mock_chat = AsyncMock( return_value=StreamingResponse( _chat_sse_cancelled(), media_type="text/event-stream" ) ) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/v1/responses")) body = await _collect_stream(response) self.assertIn('"type": "response.output_text.delta"', body) self.assertIn('"delta": "partial"', body) self.assertIn('"type": "response.completed"', body) self.assertIn("data: [DONE]", body) async def test_responses_alias_matches_v1_responses_behavior(self) -> None: req = ResponsesRequest(model="org_auto", input="hello", stream=False) chat_payload = { "id": "chatcmpl-alias1", "created": 123, "model": "org_auto", "choices": [ { "index": 0, "finish_reason": "stop", "message": {"role": "assistant", "content": "done"}, } ], "usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6}, } mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload)) with patch.object(main, "v1_chat_completions", mock_chat): response = await main.v1_responses(req, _make_request("/responses")) payload = json.loads(response.body) self.assertEqual(payload["id"], "resp_alias1") self.assertEqual(payload["status"], "completed") mock_chat.assert_awaited_once() req = ResponsesRequest(model="org_auto", input="hi", stream=False) mock_chat = AsyncMock( return_value=Response(content="not-json", media_type="text/plain") ) with patch.object(main, "v1_chat_completions", mock_chat): with self.assertRaises(main.HTTPException) as cm: await main.v1_responses(req, _make_request("/v1/responses")) self.assertEqual(cm.exception.status_code, 502) detail = cm.exception.detail self.assertEqual(detail["error"]["type"], "upstream_error") self.assertEqual(detail["error"]["message"], "invalid upstream response") class CapabilitiesEndpointTests(unittest.IsolatedAsyncioTestCase): async def test_capabilities_payload_shape(self) -> None: with ( 
patch.object(main.settings, "tool_forward_enabled", True), patch.object(main.settings, "tool_allowlist", ["lookup"]), patch.object(main.settings, "session_reuse_enabled", True), patch.object(main.settings, "session_cache_max_entries", 123), patch.object(main.settings, "session_cache_ttl_sec", 45.0), patch.object(main.settings, "instance_count", 2), patch.object(main.settings, "default_model", "org_auto"), patch.object(main.settings, "default_ask_mode", "chat"), patch.object(main.settings, "api_keys", ["test-key"]), patch.object(main.settings, "admin_token", "adm"), patch.object(main.settings, "metrics_public", False), ): response = await main.capabilities() self.assertEqual(response.status_code, 200) payload = json.loads(response.body) self.assertEqual(payload["service"], "lingma-openai-gateway") self.assertIn("protocols", payload) self.assertIn("features", payload) self.assertTrue(payload["protocols"]["openai"]["chat_completions"]) self.assertTrue(payload["protocols"]["anthropic"]["messages"]) self.assertTrue(payload["protocols"]["openai"]["request_tools_forwarded"]) self.assertEqual(payload["features"]["tooling"]["allowlist"], ["lookup"]) self.assertEqual(payload["features"]["pool"]["configured_instance_count"], 2) self.assertTrue(payload["features"]["auth"]["v1_requires_auth"]) async def test_v1_capabilities_auth_guard_requires_authentication(self) -> None: with patch.object(main.settings, "api_keys", ["test-key"]): with self.assertRaises(main.AnthropicAuthError) as ctx: main.anthropic_auth_guard( _make_request( "/v1/capabilities", headers={"anthropic-version": "2023-06-01"}, ) ) self.assertEqual(ctx.exception.status_code, 401) async def test_v1_capabilities_returns_payload_with_auth(self) -> None: with ( patch.object(main.settings, "api_keys", ["test-key"]), patch.object(main.settings, "tool_forward_enabled", False), ): main.anthropic_auth_guard( _make_request( "/v1/capabilities", headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"}, ) ) 
class AdminIntrospectionEndpointTests(unittest.IsolatedAsyncioTestCase):
    """Tests for ``admin_auth_guard`` and the internal introspection handlers.

    Covers ``main.internal_effective_config`` and
    ``main.internal_debug_requests``: both must be guarded by the admin token
    and must redact secrets in what they return.
    """

    def _assert_wrong_admin_token_rejected(self, path: str) -> None:
        """Assert ``admin_auth_guard`` raises HTTP 401 for a wrong bearer token.

        Args:
            path: Request path passed to ``_make_request``.
        """
        with (
            patch.object(main.settings, "api_keys", ["api-key"]),
            patch.object(main.settings, "admin_token", "admin-secret"),
        ):
            with self.assertRaises(main.HTTPException) as ctx:
                main.admin_auth_guard(
                    _make_request(
                        path,
                        headers={"authorization": "Bearer wrong-token"},
                    )
                )
        self.assertEqual(ctx.exception.status_code, 401)

    async def test_internal_effective_config_requires_admin_token(self) -> None:
        """A wrong bearer token on the effective-config path yields 401."""
        self._assert_wrong_admin_token_rejected("/internal/effective-config")

    async def test_internal_effective_config_redacts_secrets(self) -> None:
        """Secrets in the effective-config payload are replaced by ``***``.

        API keys, admin/metrics tokens, account passwords and base64 session
        bundles must be masked; the account username and the session bundle
        file path are expected to be returned unchanged.
        """
        with (
            patch.object(main.settings, "api_keys", ["api-key-1", "api-key-2"]),
            patch.object(main.settings, "admin_token", "admin-secret"),
            patch.object(main.settings, "metrics_token", "metrics-secret"),
            patch.object(main.settings, "default_model", "org_auto"),
            patch.object(main.settings, "tool_forward_enabled", True),
            patch.object(main.settings, "session_reuse_enabled", True),
            patch.object(main.settings, "metrics_public", False),
            patch.object(main.settings, "auto_login_enabled", True),
            patch.object(
                main.settings,
                "accounts",
                [
                    SimpleNamespace(
                        username="user-a",
                        password="pass-a",
                        session_bundle_b64="bundle-a",
                        session_bundle_file="/secrets/bundle-a.txt",
                    )
                ],
            ),
        ):
            # The guard must accept the correct admin token without raising.
            main.admin_auth_guard(
                _make_request(
                    "/internal/effective-config",
                    headers={"authorization": "Bearer admin-secret"},
                )
            )
            response = await main.internal_effective_config()

        self.assertEqual(response.status_code, 200)
        payload = json.loads(response.body)
        settings_payload = payload["settings"]
        self.assertEqual(settings_payload["api_keys"], ["***", "***"])
        self.assertEqual(settings_payload["admin_token"], "***")
        self.assertEqual(settings_payload["metrics_token"], "***")

        account = settings_payload["accounts"][0]
        self.assertEqual(account["password"], "***")
        self.assertEqual(account["session_bundle_b64"], "***")
        self.assertEqual(account["username"], "user-a")
        self.assertEqual(account["session_bundle_file"], "/secrets/bundle-a.txt")

        self.assertTrue(payload["feature_flags"]["tool_forward_enabled"])
        self.assertTrue(payload["feature_flags"]["session_reuse_enabled"])

    async def test_internal_debug_requests_redacts_sensitive_fields(self) -> None:
        """Recorded debug requests are returned with sensitive fields masked.

        One request is recorded with an API key, a session bundle, a data-URL
        image and a 3001-character tool-call argument string. The handler is
        expected to return it with ``***`` for the first two, the
        ``[redacted-data-url]`` marker for the image, and an arguments string
        ending in ``... [truncated]``.
        """
        main._DEBUG_REQUEST_LOG.clear()
        # The log is module-global; empty it again afterwards so the entry
        # recorded here cannot leak into other tests.
        self.addCleanup(main._DEBUG_REQUEST_LOG.clear)

        main._record_debug_request(
            "openai",
            "/v1/chat/completions",
            {
                "api_key": "secret-key",
                "session_bundle": "bundle-value",
                "image_url": "data:image/png;base64,abcd",
                "tool_calls": [
                    {
                        "function": {
                            "arguments": "x" * 3001,
                        }
                    }
                ],
            },
            _make_request("/v1/chat/completions", headers={"x-request-id": "req-123"}),
        )

        response = await main.internal_debug_requests(limit=10)

        self.assertEqual(response.status_code, 200)
        payload = json.loads(response.body)
        self.assertEqual(payload["count"], 1)
        item = payload["items"][0]
        self.assertEqual(item["request_id"], "req-123")
        self.assertEqual(item["body"]["api_key"], "***")
        self.assertEqual(item["body"]["session_bundle"], "***")
        self.assertEqual(item["body"]["image_url"], "[redacted-data-url]")
        arguments = item["body"]["tool_calls"][0]["function"]["arguments"]
        self.assertTrue(arguments.endswith("... [truncated]"))

    async def test_internal_debug_requests_requires_admin_token(self) -> None:
        """A wrong bearer token on the debug-requests path yields 401."""
        self._assert_wrong_admin_token_rejected("/internal/debug/requests")
class SessionCacheToolFingerprintTests(unittest.TestCase):
    """Tests for ``SessionCache.build_key`` and ``LspWsRpcClient`` tool events.

    The project modules are imported inside each test, as in the original
    code. The async client methods are driven with ``asyncio.run`` from
    synchronous test methods; ``asyncio`` comes from the module-level import.
    """

    class _RecordingWs:
        """Websocket stand-in that records every frame passed to ``send``."""

        def __init__(self) -> None:
            self.frames: list[bytes] = []

        async def send(self, data: bytes) -> None:
            self.frames.append(data)

    def test_build_key_changes_with_tool_config(self) -> None:
        """The cache key depends on tool config content, not on dict key order."""
        from app.session_cache import SessionCache

        cache = SessionCache(max_entries=8, ttl_sec=60)
        messages = [
            {"role": "system", "content": "sys"},
            {"role": "user", "content": "hello"},
        ]
        cfg_a = {
            "provider": "openai",
            "tools": [
                {"type": "function", "function": {"name": "lookup", "parameters": {}}}
            ],
            "tool_choice": {"type": "function", "function": {"name": "lookup"}},
        }
        # Same content as cfg_a with every mapping's keys in a different order.
        cfg_a_reordered = {
            "tool_choice": {"function": {"name": "lookup"}, "type": "function"},
            "tools": [
                {"function": {"parameters": {}, "name": "lookup"}, "type": "function"}
            ],
            "provider": "openai",
        }
        cfg_b = {
            "provider": "openai",
            "tools": [
                {
                    "type": "function",
                    "function": {"name": "lookup_v2", "parameters": {}},
                }
            ],
            "tool_choice": {"type": "function", "function": {"name": "lookup_v2"}},
        }

        key_no_tool = cache.build_key("api-key", messages)
        key_a = cache.build_key("api-key", messages, tool_config=cfg_a)
        key_a_reordered = cache.build_key(
            "api-key", messages, tool_config=cfg_a_reordered
        )
        key_b = cache.build_key("api-key", messages, tool_config=cfg_b)

        self.assertNotEqual(key_no_tool, key_a)
        self.assertEqual(key_a, key_a_reordered)
        self.assertNotEqual(key_a, key_b)

    def test_handle_server_message_drops_unroutable_tool_event_without_request_id(
        self,
    ) -> None:
        """A ``tool/invoke`` with no request id or tool-call id changes nothing."""
        from app.lingma_client import LspWsRpcClient

        rpc = LspWsRpcClient("ws://127.0.0.1:1")

        async def run() -> None:
            rpc.create_stream("req-1")
            await rpc._handle_server_message(
                {
                    "jsonrpc": "2.0",
                    "method": "tool/invoke",
                    "params": {
                        "name": "lookup",
                        "parameters": {"q": "x"},
                    },
                }
            )
            stream = rpc._chat_streams["req-1"]
            self.assertEqual(stream["tool_order"], [])
            self.assertEqual(stream["tool_states"], {})
            self.assertTrue(stream["chunks"].empty())

        asyncio.run(run())

    def test_handle_server_message_routes_by_tool_map_without_request_id(self) -> None:
        """A result carrying only ``toolCallId`` is merged into the earlier invoke."""
        from app.lingma_client import LspWsRpcClient

        rpc = LspWsRpcClient("ws://127.0.0.1:1")

        async def run() -> None:
            rpc.create_stream("req-1")
            await rpc._handle_server_message(
                {
                    "jsonrpc": "2.0",
                    "method": "tool/invoke",
                    "params": {
                        "requestId": "req-1",
                        "toolCallId": "call-1",
                        "name": "lookup",
                        "parameters": {"q": "a"},
                    },
                }
            )
            # The follow-up omits requestId on purpose; only toolCallId links it.
            await rpc._handle_server_message(
                {
                    "jsonrpc": "2.0",
                    "method": "tool/invokeResult",
                    "params": {
                        "toolCallId": "call-1",
                        "result": {"ok": True},
                    },
                }
            )
            result = rpc.get_stream_result("req-1")
            self.assertEqual(len(result["toolEvents"]), 1)
            self.assertEqual(result["toolEvents"][0]["id"], "call-1")
            self.assertEqual(result["toolEvents"][0]["input"], {"q": "a"})
            self.assertEqual(result["toolEvents"][0]["result"], {"ok": True})

        asyncio.run(run())

    def test_handle_server_message_dedupes_identical_repeated_tool_events(self) -> None:
        """Delivering the same ``tool/invoke`` twice records it only once."""
        from app.lingma_client import LspWsRpcClient

        rpc = LspWsRpcClient("ws://127.0.0.1:1")

        async def run() -> None:
            rpc.create_stream("req-1")
            msg = {
                "jsonrpc": "2.0",
                "method": "tool/invoke",
                "params": {
                    "requestId": "req-1",
                    "toolCallId": "call-dup",
                    "name": "lookup",
                    "parameters": {"q": "dup"},
                },
            }
            await rpc._handle_server_message(msg)
            await rpc._handle_server_message(msg)
            stream = rpc._chat_streams["req-1"]
            self.assertEqual(stream["tool_order"], ["call-dup"])
            self.assertEqual(stream["chunks"].qsize(), 1)

        asyncio.run(run())

    def test_extracts_tool_event_from_results_and_parameters(self) -> None:
        """Name and result come from ``results[0]``; input from ``parameters``."""
        from app.lingma_client import LspWsRpcClient

        event = LspWsRpcClient._extract_tool_event(
            {
                "toolCallId": "call_sync_1",
                "parameters": {"path": "README.md"},
                "results": [
                    {
                        "toolCallId": "call_sync_1",
                        "name": "read_file",
                        "result": {"ok": True},
                    }
                ],
            }
        )

        self.assertEqual(
            event,
            {
                "id": "call_sync_1",
                "name": "read_file",
                "input": {"path": "README.md"},
                "result": {"ok": True},
            },
        )

    def test_extracts_tool_event_from_invoke_result_payload(self) -> None:
        """A flat payload maps to the ``id``/``name``/``input``/``result`` shape."""
        from app.lingma_client import LspWsRpcClient

        event = LspWsRpcClient._extract_tool_event(
            {
                "toolCallId": "call_inv_1",
                "name": "search_docs",
                "parameters": {"query": "gateway"},
                "result": {"hits": 3},
            }
        )

        self.assertEqual(
            event,
            {
                "id": "call_inv_1",
                "name": "search_docs",
                "input": {"query": "gateway"},
                "result": {"hits": 3},
            },
        )

    def test_tool_sync_triggers_approve_and_invoke_result_requests(self) -> None:
        """A routable ``tool/call/sync`` sends approve and invokeResult frames."""
        from app.lingma_client import LspWsRpcClient

        def _decode(frame: bytes) -> dict:
            # Everything after the first blank line (CRLF CRLF) is parsed as
            # JSON; presumably the part before it is a header block.
            body = frame.split(b"\r\n\r\n", 1)[1]
            return json.loads(body.decode("utf-8"))

        ws = self._RecordingWs()
        rpc = LspWsRpcClient(ws)

        async def run() -> None:
            rpc.create_stream("req-1")
            await rpc._handle_server_message(
                {
                    "jsonrpc": "2.0",
                    "method": "tool/call/sync",
                    "params": {
                        "sessionId": "sess-1",
                        "requestId": "req-1",
                        "toolCallId": "call-1",
                        "name": "run_in_terminal",
                        "parameters": {"command": "pwd"},
                    },
                }
            )
            decoded = [_decode(frame) for frame in ws.frames]
            methods = [item.get("method") for item in decoded]
            self.assertIn("tool/call/approve", methods)
            self.assertIn("tool/invokeResult", methods)

            approve = next(
                item for item in decoded if item.get("method") == "tool/call/approve"
            )
            self.assertEqual(
                approve["params"],
                {
                    "type": "tool_call",
                    "sessionId": "sess-1",
                    "requestId": "req-1",
                    "toolCallId": "call-1",
                    "approval": True,
                },
            )

            invoke_result = next(
                item for item in decoded if item.get("method") == "tool/invokeResult"
            )
            self.assertEqual(invoke_result["params"]["toolCallId"], "call-1")
            self.assertEqual(invoke_result["params"]["name"], "run_in_terminal")
            self.assertTrue(invoke_result["params"]["success"])
            self.assertEqual(invoke_result["params"]["errorMessage"], "")

        asyncio.run(run())

    def test_tool_sync_does_not_emit_roundtrip_without_request_id(self) -> None:
        """A ``tool/call/sync`` lacking ``requestId`` sends no frames at all."""
        from app.lingma_client import LspWsRpcClient

        ws = self._RecordingWs()
        rpc = LspWsRpcClient(ws)

        async def run() -> None:
            rpc.create_stream("req-1")
            await rpc._handle_server_message(
                {
                    "jsonrpc": "2.0",
                    "method": "tool/call/sync",
                    "params": {
                        "sessionId": "sess-1",
                        "toolCallId": "call-1",
                        "name": "run_in_terminal",
                        "parameters": {"command": "pwd"},
                    },
                }
            )
            self.assertEqual(ws.frames, [])

        asyncio.run(run())