Files
lingma-openai-gateway/tests/test_tool_call_bridge.py
mmc 05768316d9 feat: strengthen tool emulation prompting
Improve proxy-side tool instructions so models more reliably emit structured tool actions, and add focused tests covering prompt guidance and default action limits.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-12 14:36:43 +08:00

3129 lines
117 KiB
Python

from __future__ import annotations
import json
import sys
import types
import unittest
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
class _FakeSessionCache:
def __init__(self) -> None:
self.enabled = True
self.keys: list[str] = []
self.get_calls: list[str] = []
self.put_calls: list[tuple[str, str, str]] = []
self.invalidate_calls: list[str] = []
def build_key(
self,
api_key: str,
messages: list[dict],
*,
tool_config=None,
branch_context=None,
) -> str:
marker = "with_tool" if tool_config is not None else "no_tool"
branch_marker = branch_context or "-"
key = f"{api_key}:{len(messages)}:{marker}:branch={branch_marker}"
self.keys.append(key)
return key
async def get(self, key: str):
self.get_calls.append(key)
return None
async def put(self, key: str, session_id: str, instance_name: str = "") -> None:
self.put_calls.append((key, session_id, instance_name))
async def invalidate(self, key: str) -> None:
self.invalidate_calls.append(key)
# app.main imports playwright via auto_login; tests don't exercise that path.
# Inject a lightweight stub so unit tests run without installing playwright.
_playwright = types.ModuleType("playwright")
_playwright_async = types.ModuleType("playwright.async_api")
class _StubPlaywrightTimeoutError(Exception):
pass
async def _stub_async_playwright():
raise RuntimeError("playwright is stubbed in unit tests")
_playwright_async.TimeoutError = _StubPlaywrightTimeoutError
_playwright_async.async_playwright = _stub_async_playwright
sys.modules.setdefault("playwright", _playwright)
sys.modules.setdefault("playwright.async_api", _playwright_async)
from starlette.requests import Request
from starlette.responses import JSONResponse, Response, StreamingResponse
from app.anthropic_schema import AnthropicMessagesRequest
from app.http.tool_emulation import EmulatedToolChoice, EmulatedToolDef, inject_tooling, parse_action_blocks
from app.openai_schema import ChatCompletionsRequest, ResponsesRequest
import app.main as main
class _FakeTicket:
def __init__(self) -> None:
self.released = False
def release(self) -> None:
self.released = True
class _FakeGuard:
def __init__(self) -> None:
self.in_flight = 0
async def try_acquire(self) -> _FakeTicket:
return _FakeTicket()
class _FakeClient:
def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
self._stream_events = stream_events
self._complete_result = complete_result
async def query_models(self) -> dict:
return {
"chat": [
{
"key": "org_auto",
"displayName": "Auto",
}
]
}
async def chat_complete(self, *args, **kwargs) -> dict:
return self._complete_result
async def chat_stream(self, *args, **kwargs):
out_meta = kwargs.get("out_meta")
if isinstance(out_meta, dict):
out_meta["session_id"] = "sess-stream"
for event in self._stream_events:
yield event
class _FakeInstance:
def __init__(self, client: _FakeClient) -> None:
self.name = "inst-test"
self.client = client
self.in_flight = 0
class _FakePool:
def __init__(self, inst: _FakeInstance) -> None:
self._inst = inst
def pick(self, affinity_key: str | None = None) -> _FakeInstance:
return self._inst
def _make_request(path: str, headers: dict[str, str] | None = None) -> Request:
header_pairs = []
for k, v in (headers or {}).items():
header_pairs.append((k.lower().encode("latin-1"), v.encode("latin-1")))
scope = {
"type": "http",
"http_version": "1.1",
"method": "POST",
"scheme": "http",
"path": path,
"raw_path": path.encode("latin-1"),
"query_string": b"",
"headers": header_pairs,
"client": ("testclient", 12345),
"server": ("testserver", 80),
"root_path": "",
}
return Request(scope)
async def _collect_stream(response) -> str:
chunks: list[str] = []
async for part in response.body_iterator:
if isinstance(part, bytes):
chunks.append(part.decode("utf-8"))
else:
chunks.append(str(part))
return "".join(chunks)
class _SpyClient(_FakeClient):
def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
super().__init__(stream_events=stream_events, complete_result=complete_result)
self.last_complete_args: tuple = ()
self.last_stream_args: tuple = ()
self.last_complete_kwargs: dict = {}
self.last_stream_kwargs: dict = {}
async def chat_complete(self, *args, **kwargs) -> dict:
self.last_complete_args = tuple(args)
self.last_complete_kwargs = dict(kwargs)
return await super().chat_complete(*args, **kwargs)
async def chat_stream(self, *args, **kwargs):
self.last_stream_args = tuple(args)
self.last_stream_kwargs = dict(kwargs)
async for event in super().chat_stream(*args, **kwargs):
yield event
class _SettingsPatch:
def __init__(self, **kwargs) -> None:
self._kwargs = kwargs
def __enter__(self):
self._patchers = [
patch.object(main.settings, k, v) for k, v in self._kwargs.items()
]
for p in self._patchers:
p.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
for p in reversed(self._patchers):
p.stop()
return False
class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
async def test_openai_non_stream_bridges_tool_calls(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "done",
"toolEvents": [
{
"id": "call_123",
"name": "search_docs",
"input": {"query": "gateway"},
"result": {"ok": True},
}
],
"sessionId": "sess-1",
"firstTokenLatencyMs": 12,
"totalLatencyMs": 34,
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(message["content"], "done")
self.assertIsInstance(message["tool_calls"], list)
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "search_docs")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_non_stream_synthesizes_tool_call_from_plain_json(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '```json\n{"arguments": {"query": "gateway"}}\n```',
"toolEvents": [],
"sessionId": "sess-fallback-openai",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_non_stream_synthesizes_tool_call_from_tool_code(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '```tool_code\nlookup(query="gateway")\n```',
"toolEvents": [],
"sessionId": "sess-fallback-tool-code-openai",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_non_stream_synthesizes_tool_call_from_tool_code_positional_arg(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '```tool_code\nlookup("gateway")\n```',
"toolEvents": [],
"sessionId": "sess-fallback-tool-code-openai-positional",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "lookup",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_non_stream_synthesizes_tool_call_from_hash_tool_call_block(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '#Tool Call\n```fetch_weather\n{"city": "Hangzhou"}\n```\n',
"toolEvents": [],
"sessionId": "sess-fallback-hash-tool-call-openai",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "fetch_weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
},
}
],
tool_choice={"type": "function", "function": {"name": "fetch_weather"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "fetch_weather")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"city": "Hangzhou"},
)
async def test_openai_non_stream_synthesizes_tool_call_from_hash_tool_call_block_without_tool_choice(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '#Tool Call\n```fetch_weather\n{"city": "Hangzhou"}\n```\n',
"toolEvents": [],
"sessionId": "sess-fallback-hash-tool-call-openai-no-choice",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "fetch_weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
},
}
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "fetch_weather")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"city": "Hangzhou"},
)
async def test_openai_non_stream_synthesizes_tool_call_from_json_action_block(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '```json action\n{"tool":"fetch_weather","parameters":{"city":"Hangzhou"}}\n```',
"toolEvents": [],
"sessionId": "sess-action-block-openai",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "fetch_weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
},
}
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "fetch_weather")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"city": "Hangzhou"},
)
async def test_openai_stream_synthesizes_tool_call_from_tool_code(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[
{"type": "text", "text": "```tool_code\n"},
{"type": "text", "text": 'lookup(query="gateway")\n'},
{"type": "text", "text": "```"},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
body = await _collect_stream(response)
chunks = [
json.loads(line[6:])
for line in body.splitlines()
if line.startswith("data: {")
]
self.assertTrue(
any(
chunk.get("choices") and chunk["choices"][0]["delta"].get("tool_calls")
for chunk in chunks
)
)
self.assertIn('"tool_calls"', body)
self.assertIn('"finish_reason": "tool_calls"', body)
self.assertIn("data: [DONE]", body)
async def test_openai_stream_synthesizes_tool_call_from_hash_tool_call_block_without_tool_choice(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[
{"type": "text", "text": "#Tool Call\n```fetch_weather\n"},
{"type": "text", "text": '{"city": "Hangzhou"}\n'},
{"type": "text", "text": "```\n"},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{
"type": "function",
"function": {
"name": "fetch_weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
},
}
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
body = await _collect_stream(response)
self.assertIn('"tool_calls"', body)
self.assertIn('"fetch_weather"', body)
self.assertIn('"finish_reason": "tool_calls"', body)
async def test_openai_non_stream_synthesizes_tool_call_from_json_array(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '```json\n[{"name": "lookup", "arguments": {"query": "gateway"}}]\n```',
"toolEvents": [],
"sessionId": "sess-fallback-openai-json-array",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_stream_bridges_tool_and_text_events(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "call_stream_1",
"name": "read_file",
"input": {"path": "README.md"},
},
},
{"type": "text", "text": "hello"},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
stream_options={"include_usage": True},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
body = await _collect_stream(response)
self.assertIn('"tool_calls"', body)
self.assertEqual(body.count('"content": "hello"'), 1)
self.assertIn('"finish_reason": "tool_calls"', body)
self.assertIn('"usage"', body)
self.assertIn("data: [DONE]", body)
async def test_openai_stream_emits_text_delta_only_once_without_tools(self) -> None:
fake_client = _FakeClient(
stream_events=[
{"type": "text", "text": "你好"},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
body = await _collect_stream(response)
self.assertEqual(body.count('"content": "你好"'), 1)
self.assertIn('"finish_reason": "stop"', body)
self.assertIn("data: [DONE]", body)
async def test_openai_stream_filters_tool_events_by_allowlist(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "call_blocked",
"name": "write_file",
"input": {"path": "a.txt"},
},
},
{
"type": "tool",
"tool": {
"id": "call_allowed",
"name": "lookup",
"input": {"query": "gateway"},
},
},
{"type": "text", "text": "hello"},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}},
{
"type": "function",
"function": {"name": "write_file", "parameters": {}},
},
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
body = await _collect_stream(response)
self.assertIn('"name": "lookup"', body)
self.assertNotIn('"name": "write_file"', body)
self.assertIn('"content": "hello"', body)
self.assertIn('"finish_reason": "tool_calls"', body)
async def test_anthropic_non_stream_bridges_tool_blocks(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "ok",
"toolEvents": [
{
"id": "toolu_1",
"name": "lookup",
"input": {"k": "v"},
"result": {"value": 1},
}
],
"sessionId": "sess-2",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
payload = json.loads(response.body)
types = [item["type"] for item in payload["content"]]
self.assertEqual(types, ["text", "tool_use", "tool_result"])
self.assertEqual(payload["stop_reason"], "end_turn")
self.assertEqual(payload["content"][1]["name"], "lookup")
self.assertEqual(payload["content"][2]["tool_use_id"], "toolu_1")
async def test_anthropic_non_stream_does_not_synthesize_tool_blocks_from_plain_json(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '{"input": {"k": "v"}, "result": {"value": 1}}',
"toolEvents": [],
"sessionId": "sess-fallback-anthropic",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}
],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
payload = json.loads(response.body)
types = [item["type"] for item in payload["content"]]
self.assertEqual(types, ["text"])
self.assertEqual(payload["stop_reason"], "end_turn")
self.assertIn('"input"', payload["content"][0]["text"])
async def test_openai_stream_tool_call_indices_are_stable(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "call_a",
"name": "read_file",
"input": {"path": "README.md"},
},
},
{
"type": "tool",
"tool": {
"id": "call_b",
"name": "search_docs",
"input": {"query": "gateway"},
},
},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
body = await _collect_stream(response)
self.assertIn('"id": "call_a"', body)
self.assertIn('"id": "call_b"', body)
self.assertIn('"index": 0', body)
self.assertIn('"index": 1', body)
async def test_anthropic_non_stream_returns_tool_use_stop_reason_when_result_missing(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "",
"toolEvents": [
{
"name": "lookup",
"input": {"k": "v"},
}
],
"sessionId": "sess-2",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
payload = json.loads(response.body)
self.assertEqual(payload["stop_reason"], "tool_use")
self.assertEqual(len(payload["content"]), 1)
self.assertEqual(payload["content"][0]["type"], "tool_use")
async def test_anthropic_stream_returns_tool_use_stop_reason_when_result_missing(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"name": "read",
"input": {"file": "a.txt"},
},
}
],
complete_result={},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
body = await _collect_stream(response)
self.assertIn('"type": "tool_use"', body)
self.assertIn('"stop_reason": "tool_use"', body)
async def test_anthropic_stream_does_not_fallback_to_synthetic_tool_blocks_for_forced_tool(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "text",
"text": '```json\n{"input": {"k": "v"}, "result": {"value": 1}}\n```',
}
],
complete_result={},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}
],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
body = await _collect_stream(response)
self.assertNotIn('"type": "tool_use"', body)
self.assertNotIn('"type": "tool_result"', body)
self.assertIn('"type": "text_delta"', body)
self.assertIn('"stop_reason": "end_turn"', body)
async def test_anthropic_stream_bridges_tool_and_text_events(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "toolu_stream_1",
"name": "read",
"input": {"file": "a.txt"},
"result": "done",
},
},
{"type": "text", "text": "world"},
],
complete_result={},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
body = await _collect_stream(response)
self.assertIn("event: message_start", body)
self.assertIn('"type": "tool_use"', body)
self.assertIn('"type": "tool_result"', body)
self.assertIn('"stop_reason": "end_turn"', body)
self.assertIn('"type": "text_delta"', body)
self.assertIn("event: message_stop", body)
async def test_anthropic_stream_filters_tool_events_by_allowlist(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "toolu_blocked",
"name": "write_file",
"input": {"path": "a.txt"},
"result": "blocked",
},
},
{
"type": "tool",
"tool": {
"id": "toolu_allowed",
"name": "lookup",
"input": {"file": "a.txt"},
"result": "done",
},
},
{"type": "text", "text": "world"},
],
complete_result={},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{
"name": "lookup",
"input_schema": {"type": "object", "properties": {}},
},
{
"name": "write_file",
"input_schema": {"type": "object", "properties": {}},
},
],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
body = await _collect_stream(response)
self.assertIn('"name": "lookup"', body)
self.assertNotIn('"name": "write_file"', body)
self.assertIn('"type": "tool_result"', body)
self.assertIn('"stop_reason": "end_turn"', body)
async def test_openai_non_stream_uses_emulation_instead_of_forwarding_tool_config(self) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=True),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
self.assertIn("tool_config", spy_client.last_complete_kwargs)
self.assertIsNone(spy_client.last_complete_kwargs["tool_config"])
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_openai_stream_uses_emulation_instead_of_forwarding_tool_config(self) -> None:
spy_client = _SpyClient(
stream_events=[{"type": "text", "text": "ok"}], complete_result={}
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=True),
):
response = await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
await _collect_stream(response)
self.assertIn("tool_config", spy_client.last_stream_kwargs)
self.assertIsNone(spy_client.last_stream_kwargs["tool_config"])
self.assertEqual(spy_client.last_stream_args[2], "agent")
async def test_openai_non_stream_does_not_forward_tool_config_when_disabled(
self,
) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=False),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
self.assertIn("tool_config", spy_client.last_complete_kwargs)
self.assertIsNone(spy_client.last_complete_kwargs["tool_config"])
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_openai_non_stream_filters_tools_by_allowlist_before_emulation(self) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}},
{
"type": "function",
"function": {"name": "write_file", "parameters": {}},
},
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
prompt = spy_client.last_complete_args[0]
self.assertIn("lookup(", prompt)
self.assertNotIn("write_file(", prompt)
async def test_openai_non_stream_rejects_forced_tool_outside_allowlist(
self,
) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "write_file"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]),
):
with self.assertRaises(main.HTTPException) as cm:
await main.v1_chat_completions(
req, _make_request("/v1/chat/completions")
)
self.assertEqual(cm.exception.status_code, 400)
self.assertEqual(cm.exception.detail["error"]["type"], "invalid_request_error")
self.assertIn("write_file", cm.exception.detail["error"]["message"])
async def test_openai_tooling_context_disables_session_reuse_cache(self) -> None:
fake_cache = _FakeSessionCache()
fake_client = _FakeClient(
stream_events=[],
complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-3"},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[
{"role": "user", "content": "turn-1"},
{"role": "user", "content": "turn-2"},
],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "session_cache", fake_cache),
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(tool_forward_enabled=True),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
self.assertEqual(fake_cache.keys, [])
self.assertEqual(fake_cache.get_calls, [])
self.assertEqual(fake_cache.put_calls, [])
async def test_openai_session_reuse_lookup_key_separates_branches(self) -> None:
fake_cache = _FakeSessionCache()
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "ok",
"toolEvents": [],
"sessionId": "sess-branch",
},
)
req_a = ChatCompletionsRequest(
model="org_auto",
messages=[
{"role": "system", "content": "S"},
{"role": "user", "content": "U"},
{"role": "assistant", "content": "A1"},
{"role": "user", "content": "next"},
],
stream=False,
)
req_b = ChatCompletionsRequest(
model="org_auto",
messages=[
{"role": "system", "content": "S"},
{"role": "user", "content": "U"},
{"role": "assistant", "content": "A2"},
{"role": "user", "content": "next"},
],
stream=False,
)
with (
patch.object(main, "session_cache", fake_cache),
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False),
):
await main.v1_chat_completions(req_a, _make_request("/v1/chat/completions"))
await main.v1_chat_completions(req_b, _make_request("/v1/chat/completions"))
self.assertGreaterEqual(len(fake_cache.get_calls), 4)
self.assertNotEqual(fake_cache.get_calls[0], fake_cache.get_calls[2])
self.assertEqual(fake_cache.get_calls[1], fake_cache.get_calls[3])
async def test_openai_and_anthropic_resolve_same_default_ask_mode_without_tooling(
self,
) -> None:
openai_spy = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
anthropic_spy = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
openai_req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
anthropic_req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(openai_spy))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
_SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False),
):
await main.v1_chat_completions(
openai_req, _make_request("/v1/chat/completions")
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(anthropic_spy))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(default_ask_mode="chat", tool_forward_enabled=False),
):
await main.v1_messages(
anthropic_req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
self.assertEqual(openai_spy.last_complete_args[2], "chat")
self.assertEqual(anthropic_spy.last_complete_args[2], "chat")
async def test_anthropic_stream_uses_emulation_instead_of_forwarding_tool_config(self) -> None:
spy_client = _SpyClient(
stream_events=[{"type": "text", "text": "ok"}], complete_result={}
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=True,
tools=[
{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}
],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
await _collect_stream(response)
self.assertIn("tool_config", spy_client.last_stream_kwargs)
self.assertIsNone(spy_client.last_stream_kwargs["tool_config"])
self.assertEqual(spy_client.last_stream_args[2], "agent")
async def test_anthropic_non_stream_does_not_forward_tool_config_when_disabled(
self,
) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}
],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=False, default_ask_mode="chat"),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
self.assertIn("tool_config", spy_client.last_complete_kwargs)
self.assertIsNone(spy_client.last_complete_kwargs["tool_config"])
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_anthropic_non_stream_with_tools_uses_agent_mode(self) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{
"name": "write_file",
"input_schema": {"type": "object", "properties": {}},
}
],
tool_choice={"type": "auto"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
self.assertIn("tool_config", spy_client.last_complete_kwargs)
self.assertIsNone(spy_client.last_complete_kwargs["tool_config"])
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_anthropic_non_stream_filters_tools_by_allowlist_before_emulation(self) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{
"name": "lookup",
"input_schema": {"type": "object", "properties": {}},
},
{
"name": "write_file",
"input_schema": {"type": "object", "properties": {}},
},
],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
prompt = spy_client.last_complete_args[0]
self.assertIn("lookup(", prompt)
self.assertNotIn("write_file(", prompt)
async def test_anthropic_non_stream_rejects_forced_tool_outside_allowlist(
self,
) -> None:
spy_client = _SpyClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[
{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}
],
tool_choice={"type": "tool", "name": "write_file"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, tool_allowlist=["lookup"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
self.assertEqual(response.status_code, 400)
payload = json.loads(response.body)
self.assertEqual(payload["type"], "error")
self.assertEqual(payload["error"]["type"], "invalid_request_error")
self.assertIn("write_file", payload["error"]["message"])
async def test_anthropic_tooling_context_disables_session_reuse_cache(self) -> None:
fake_cache = _FakeSessionCache()
fake_client = _FakeClient(
stream_events=[],
complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-4"},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[
{"role": "user", "content": "turn-1"},
{"role": "user", "content": "turn-2"},
],
stream=False,
tools=[
{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}
],
tool_choice={"type": "auto"},
)
with (
patch.object(main, "session_cache", fake_cache),
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
self.assertEqual(fake_cache.keys, [])
self.assertEqual(fake_cache.get_calls, [])
self.assertEqual(fake_cache.put_calls, [])
async def test_anthropic_count_tokens_returns_input_tokens(self) -> None:
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=64,
messages=[{"role": "user", "content": "count me"}],
)
with patch.object(main.settings, "api_keys", ["test-key"]):
response = await main.v1_messages_count_tokens(
req,
_make_request(
"/v1/messages/count_tokens",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
payload = json.loads(response.body)
self.assertEqual(response.status_code, 200)
self.assertEqual(
payload,
{
"input_tokens": main.estimate_tokens(
main._messages_to_prompt(main.anthropic_to_internal_messages(req))
)
},
)
async def test_anthropic_count_tokens_requires_authentication(self) -> None:
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=64,
messages=[{"role": "user", "content": "count me"}],
)
with patch.object(main.settings, "api_keys", ["test-key"]):
response = await main.v1_messages_count_tokens(
req,
_make_request(
"/v1/messages/count_tokens",
headers={"anthropic-version": "2023-06-01"},
),
)
payload = json.loads(response.body)
self.assertEqual(response.status_code, 401)
self.assertEqual(payload["type"], "error")
self.assertEqual(payload["error"]["type"], "authentication_error")
async def test_anthropic_messages_requires_authentication(self) -> None:
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=64,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with patch.object(main.settings, "api_keys", ["test-key"]):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"anthropic-version": "2023-06-01"},
),
)
payload = json.loads(response.body)
self.assertEqual(response.status_code, 401)
self.assertEqual(payload["type"], "error")
self.assertEqual(payload["error"]["type"], "authentication_error")
async def test_anthropic_messages_backpressure_returns_overloaded_error(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[], complete_result={"text": "ok", "toolEvents": []}
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=64,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
fake_guard = types.SimpleNamespace(
in_flight=0,
try_acquire=AsyncMock(side_effect=main.BackpressureRejected(2.4)),
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", fake_guard),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
payload = json.loads(response.body)
self.assertEqual(response.status_code, 429)
self.assertEqual(response.headers["Retry-After"], "2")
self.assertEqual(payload["type"], "error")
self.assertEqual(payload["error"]["type"], "overloaded_error")
self.assertIn("retry later", payload["error"]["message"])
async def test_responses_non_stream_maps_chat_payload_shape_and_input(self) -> None:
req = ResponsesRequest(
model="org_auto",
input="hello from responses",
stream=False,
)
chat_payload = {
"id": "chatcmpl-abc123",
"created": 123,
"model": "org_auto",
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {"role": "assistant", "content": "done"},
}
],
"usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6},
}
mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload))
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
payload = json.loads(response.body)
self.assertEqual(payload["id"], "resp_abc123")
self.assertEqual(payload["object"], "response")
self.assertEqual(payload["status"], "completed")
self.assertEqual(payload["output_text"], "done")
self.assertEqual(
payload["usage"], {"input_tokens": 4, "output_tokens": 2, "total_tokens": 6}
)
self.assertEqual(payload["output"][0]["type"], "message")
self.assertEqual(payload["output"][0]["content"][0]["type"], "output_text")
mock_chat.assert_awaited_once()
chat_req = mock_chat.await_args.args[0]
self.assertIsInstance(chat_req, ChatCompletionsRequest)
messages_dump = [m.model_dump() for m in chat_req.messages]
self.assertEqual(
messages_dump,
[
{
"role": "user",
"content": "hello from responses",
"name": None,
"tool_call_id": None,
"tool_calls": None,
}
],
)
async def test_responses_non_stream_maps_chat_tool_calls_to_function_call_output(
self,
) -> None:
req = ResponsesRequest(
model="org_auto",
input="tool please",
stream=False,
)
chat_payload = {
"id": "chatcmpl-tools1",
"created": 234,
"model": "org_auto",
"choices": [
{
"index": 0,
"finish_reason": "tool_calls",
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "lookup",
"arguments": '{"q":"gateway"}',
},
}
],
},
}
],
"usage": {"prompt_tokens": 8, "completion_tokens": 3, "total_tokens": 11},
}
mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload))
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
payload = json.loads(response.body)
self.assertEqual(payload["status"], "completed")
self.assertEqual(payload["output_text"], "")
self.assertEqual(
payload["usage"],
{"input_tokens": 8, "output_tokens": 3, "total_tokens": 11},
)
self.assertEqual(len(payload["output"]), 1)
self.assertEqual(payload["output"][0]["type"], "function_call")
self.assertEqual(payload["output"][0]["call_id"], "call_1")
self.assertEqual(payload["output"][0]["id"], "call_1")
self.assertEqual(payload["output"][0]["name"], "lookup")
self.assertEqual(payload["output"][0]["arguments"], '{"q":"gateway"}')
async def test_responses_forwards_input_tools_and_tool_choice_to_chat_request(
self,
) -> None:
req = ResponsesRequest(
model="org_auto",
instructions="be concise",
input=[
{"role": "user", "content": [{"type": "text", "text": "first"}]},
{
"type": "function_call_output",
"call_id": "call_1",
"output": {"ok": True},
},
"follow up",
],
stream=False,
tools=[
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
mock_chat = AsyncMock(
return_value=JSONResponse(
content={
"id": "chatcmpl-x",
"created": 1,
"model": "org_auto",
"choices": [{"message": {"role": "assistant", "content": "ok"}}],
"usage": {},
}
)
)
with patch.object(main, "v1_chat_completions", mock_chat):
await main.v1_responses(req, _make_request("/v1/responses"))
mock_chat.assert_awaited_once()
chat_req = mock_chat.await_args.args[0]
self.assertIsInstance(chat_req, ChatCompletionsRequest)
self.assertEqual(chat_req.tools, req.tools)
self.assertEqual(chat_req.tool_choice, req.tool_choice)
messages_dump = [m.model_dump() for m in chat_req.messages]
self.assertEqual(messages_dump[0]["role"], "system")
self.assertEqual(messages_dump[0]["content"], "be concise")
self.assertEqual(messages_dump[1]["role"], "user")
self.assertEqual(messages_dump[1]["content"], "first")
self.assertEqual(messages_dump[2]["role"], "tool")
self.assertEqual(messages_dump[2]["tool_call_id"], "call_1")
self.assertEqual(messages_dump[2]["content"], '{"ok": true}')
self.assertEqual(messages_dump[3]["role"], "user")
self.assertEqual(messages_dump[3]["content"], "follow up")
async def test_openai_tool_result_is_emulated_into_followup_prompt(self) -> None:
spy_client = _SpyClient(
stream_events=[],
complete_result={
"text": "done",
"toolEvents": [],
"sessionId": "sess-emulated-tool-result",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[
{"role": "assistant", "content": None, "tool_calls": [{
"id": "call_1",
"type": "function",
"function": {"name": "fetch_weather", "arguments": '{"city":"Hangzhou"}'},
}]},
{"role": "tool", "tool_call_id": "call_1", "content": '{"temperature":"22C"}'},
{"role": "user", "content": "continue"},
],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "fetch_weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
},
}
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
prompt = spy_client.last_complete_args[0]
self.assertIn("Tool result for call_1:", prompt)
self.assertIn('{"temperature":"22C"}', prompt)
self.assertIn("Assistant:", prompt)
async def test_openai_assistant_tool_calls_are_projected_into_emulation_prompt(self) -> None:
spy_client = _SpyClient(
stream_events=[],
complete_result={
"text": "done",
"toolEvents": [],
"sessionId": "sess-emulated-tool-history",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[
{
"role": "assistant",
"content": "I will check that",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "fetch_weather",
"arguments": '{"city":"Hangzhou"}',
},
}
],
},
{"role": "user", "content": "continue"},
],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "fetch_weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
},
}
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
prompt = spy_client.last_complete_args[0]
self.assertIn("I will check that", prompt)
self.assertIn('"tool": "fetch_weather"', prompt)
self.assertIn('"city": "Hangzhou"', prompt)
async def test_openai_emulation_prompt_includes_proxy_tool_guidance(self) -> None:
spy_client = _SpyClient(
stream_events=[],
complete_result={"text": "done", "toolEvents": [], "sessionId": "sess-guidance"},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "inspect README"}],
stream=False,
tools=[
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "bash",
"parameters": {
"type": "object",
"properties": {"command": {"type": "string"}},
},
},
},
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
prompt = spy_client.last_complete_args[0]
self.assertIn("DIRECT tool access inside an IDE", prompt)
self.assertIn("Tool routing guide:", prompt)
self.assertIn("Read a specific local file or code path: use read_file.", prompt)
self.assertIn("Core tool syntax examples", prompt)
self.assertIn("Coding and file-work discipline:", prompt)
self.assertIn("NEVER say that tools are unavailable", prompt)
async def test_anthropic_tool_history_is_projected_into_emulation_prompt(self) -> None:
spy_client = _SpyClient(
stream_events=[],
complete_result={
"text": "done",
"toolEvents": [],
"sessionId": "sess-anthropic-history",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[
{
"role": "assistant",
"content": [
{"type": "text", "text": "I will check"},
{
"type": "tool_use",
"id": "toolu_1",
"name": "fetch_weather",
"input": {"city": "Hangzhou"},
},
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "toolu_1",
"content": '{"temperature":"22C"}',
}
],
},
{"role": "user", "content": "continue"},
],
stream=False,
tools=[
{
"name": "fetch_weather",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
}
],
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
prompt = spy_client.last_complete_args[0]
self.assertIn("I will check", prompt)
self.assertIn('"tool": "fetch_weather"', prompt)
self.assertIn('"city": "Hangzhou"', prompt)
self.assertIn("Tool result:", prompt)
self.assertIn('{"temperature":"22C"}', prompt)
async def test_anthropic_non_stream_synthesizes_tool_use_from_json_action_block(
self,
) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": '```json action\n{"tool":"fetch_weather","parameters":{"city":"Hangzhou"}}\n```',
"toolEvents": [],
"sessionId": "sess-anthropic-action-block",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=64,
messages=[{"role": "user", "content": "weather"}],
stream=False,
tools=[
{
"name": "fetch_weather",
"description": "Get weather for a city",
"input_schema": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
}
],
tool_choice={"type": "tool", "name": "fetch_weather"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(
main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})
),
patch.object(
main.stats_collector, "record_chat", AsyncMock(return_value=None)
),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={
"x-api-key": "test-key",
"anthropic-version": "2023-06-01",
},
),
)
payload = json.loads(response.body)
tool_blocks = [item for item in payload["content"] if item["type"] == "tool_use"]
self.assertEqual(payload["stop_reason"], "tool_use")
self.assertEqual(tool_blocks[0]["name"], "fetch_weather")
self.assertEqual(tool_blocks[0]["input"], {"city": "Hangzhou"})
async def test_responses_stream_bridges_text_tool_and_completed_events(
self,
) -> None:
async def _chat_sse():
yield b'data: {"choices": [{"delta": {"content": "hello"}}]}\n\n'
yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\": \\"x\\"}"}}]}}]}\n\n'
yield b'data: {"usage": {"prompt_tokens": 3, "completion_tokens": 2, "total_tokens": 5}, "choices": [{"delta": {}}]}\n\n'
yield b"data: [DONE]\n\n"
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream")
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertIn('"type": "response.output_item.added"', body)
self.assertIn('"type": "response.output_text.delta"', body)
self.assertIn('"delta": "hello"', body)
self.assertIn('"type": "response.function_call_arguments.delta"', body)
self.assertIn('"item_id": "call_1"', body)
self.assertIn('"output_index": 1', body)
self.assertIn('"delta": "{\\"q\\": \\"x\\"}"', body)
self.assertIn('"type": "response.function_call_arguments.done"', body)
self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body)
self.assertIn('"type": "response.output_item.done"', body)
self.assertIn('"type": "function_call"', body)
self.assertIn('"name": "lookup"', body)
self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body)
self.assertIn('"type": "response.completed"', body)
self.assertIn('"input_tokens": 3', body)
self.assertIn('"output_tokens": 2', body)
self.assertIn("data: [DONE]", body)
async def test_responses_stream_accumulates_fragmented_tool_arguments(self) -> None:
async def _chat_sse():
yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\":"}}]}}]}\n\n'
yield b'data: {"choices": [{"delta": {"tool_calls": [{"id": "call_1", "function": {"name": "lookup", "arguments": " \\"x\\"}"}}]}}]}\n\n'
yield b"data: [DONE]\n\n"
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream")
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertIn('"type": "response.function_call_arguments.delta"', body)
self.assertIn('"delta": "{\\"q\\":"', body)
self.assertIn('"delta": " \\"x\\"}"', body)
self.assertIn('"type": "response.function_call_arguments.done"', body)
self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body)
self.assertIn('"type": "response.output_item.done"', body)
self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body)
self.assertIn("data: [DONE]", body)
async def test_responses_stream_accumulates_fragmented_tool_arguments_without_repeated_id_or_name(
self,
) -> None:
async def _chat_sse():
yield b'data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "id": "call_1", "function": {"name": "lookup", "arguments": "{\\"q\\":"}}]}}]}\n\n'
yield b'data: {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": " \\"x\\"}"}}]}}]}\n\n'
yield b"data: [DONE]\n\n"
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(_chat_sse(), media_type="text/event-stream")
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertEqual(body.count('"item_id": "call_1"'), 3)
self.assertIn('"name": "lookup"', body)
self.assertIn('"delta": "{\\"q\\":"', body)
self.assertIn('"delta": " \\"x\\"}"', body)
self.assertIn('"arguments": "{\\"q\\": \\"x\\"}"', body)
self.assertIn("data: [DONE]", body)
async def test_responses_stream_emits_completed_when_upstream_closes_without_done(
self,
) -> None:
async def _chat_sse_without_done():
yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
yield b'data: {"usage": {"prompt_tokens": 7, "completion_tokens": 1, "total_tokens": 8}, "choices": [{"delta": {}}]}\n\n'
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(
_chat_sse_without_done(), media_type="text/event-stream"
)
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertIn('"type": "response.output_text.delta"', body)
self.assertIn('"delta": "partial"', body)
self.assertIn('"type": "response.completed"', body)
self.assertIn('"input_tokens": 7', body)
self.assertIn('"output_tokens": 1', body)
self.assertIn("data: [DONE]", body)
async def test_responses_stream_emits_completed_when_upstream_iterator_errors(
self,
) -> None:
async def _chat_sse_error():
yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
raise RuntimeError("boom")
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(
_chat_sse_error(), media_type="text/event-stream"
)
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertIn('"type": "response.output_text.delta"', body)
self.assertIn('"delta": "partial"', body)
self.assertIn('"type": "response.completed"', body)
self.assertIn("data: [DONE]", body)
async def test_responses_stream_emits_completed_when_upstream_cancels(self) -> None:
async def _chat_sse_cancelled():
yield b'data: {"choices": [{"delta": {"content": "partial"}}]}\n\n'
raise asyncio.CancelledError()
req = ResponsesRequest(model="org_auto", input="hi", stream=True)
mock_chat = AsyncMock(
return_value=StreamingResponse(
_chat_sse_cancelled(), media_type="text/event-stream"
)
)
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/v1/responses"))
body = await _collect_stream(response)
self.assertIn('"type": "response.output_text.delta"', body)
self.assertIn('"delta": "partial"', body)
self.assertIn('"type": "response.completed"', body)
self.assertIn("data: [DONE]", body)
async def test_responses_alias_matches_v1_responses_behavior(self) -> None:
req = ResponsesRequest(model="org_auto", input="hello", stream=False)
chat_payload = {
"id": "chatcmpl-alias1",
"created": 123,
"model": "org_auto",
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {"role": "assistant", "content": "done"},
}
],
"usage": {"prompt_tokens": 4, "completion_tokens": 2, "total_tokens": 6},
}
mock_chat = AsyncMock(return_value=JSONResponse(content=chat_payload))
with patch.object(main, "v1_chat_completions", mock_chat):
response = await main.v1_responses(req, _make_request("/responses"))
payload = json.loads(response.body)
self.assertEqual(payload["id"], "resp_alias1")
self.assertEqual(payload["status"], "completed")
mock_chat.assert_awaited_once()
req = ResponsesRequest(model="org_auto", input="hi", stream=False)
mock_chat = AsyncMock(
return_value=Response(content="not-json", media_type="text/plain")
)
with patch.object(main, "v1_chat_completions", mock_chat):
with self.assertRaises(main.HTTPException) as cm:
await main.v1_responses(req, _make_request("/v1/responses"))
self.assertEqual(cm.exception.status_code, 502)
detail = cm.exception.detail
self.assertEqual(detail["error"]["type"], "upstream_error")
self.assertEqual(detail["error"]["message"], "invalid upstream response")
class CapabilitiesEndpointTests(unittest.IsolatedAsyncioTestCase):
async def test_capabilities_payload_shape(self) -> None:
with (
patch.object(main.settings, "tool_forward_enabled", True),
patch.object(main.settings, "tool_allowlist", ["lookup"]),
patch.object(main.settings, "session_reuse_enabled", True),
patch.object(main.settings, "session_cache_max_entries", 123),
patch.object(main.settings, "session_cache_ttl_sec", 45.0),
patch.object(main.settings, "instance_count", 2),
patch.object(main.settings, "default_model", "org_auto"),
patch.object(main.settings, "default_ask_mode", "chat"),
patch.object(main.settings, "api_keys", ["test-key"]),
patch.object(main.settings, "admin_token", "adm"),
patch.object(main.settings, "metrics_public", False),
):
response = await main.capabilities()
self.assertEqual(response.status_code, 200)
payload = json.loads(response.body)
self.assertEqual(payload["service"], "lingma-openai-gateway")
self.assertIn("protocols", payload)
self.assertIn("features", payload)
self.assertTrue(payload["protocols"]["openai"]["chat_completions"])
self.assertTrue(payload["protocols"]["anthropic"]["messages"])
self.assertTrue(payload["protocols"]["openai"]["request_tools_forwarded"])
self.assertEqual(payload["features"]["tooling"]["allowlist"], ["lookup"])
self.assertEqual(payload["features"]["pool"]["configured_instance_count"], 2)
self.assertTrue(payload["features"]["auth"]["v1_requires_auth"])
async def test_v1_capabilities_auth_guard_requires_authentication(self) -> None:
with patch.object(main.settings, "api_keys", ["test-key"]):
with self.assertRaises(main.AnthropicAuthError) as ctx:
main.anthropic_auth_guard(
_make_request(
"/v1/capabilities",
headers={"anthropic-version": "2023-06-01"},
)
)
self.assertEqual(ctx.exception.status_code, 401)
async def test_v1_capabilities_returns_payload_with_auth(self) -> None:
with (
patch.object(main.settings, "api_keys", ["test-key"]),
patch.object(main.settings, "tool_forward_enabled", False),
):
main.anthropic_auth_guard(
_make_request(
"/v1/capabilities",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
)
)
response = await main.v1_capabilities()
self.assertEqual(response.status_code, 200)
payload = json.loads(response.body)
self.assertFalse(payload["protocols"]["openai"]["request_tools_forwarded"])
class AdminIntrospectionEndpointTests(unittest.IsolatedAsyncioTestCase):
async def test_internal_effective_config_requires_admin_token(self) -> None:
with (
patch.object(main.settings, "api_keys", ["api-key"]),
patch.object(main.settings, "admin_token", "admin-secret"),
):
with self.assertRaises(main.HTTPException) as ctx:
main.admin_auth_guard(
_make_request(
"/internal/effective-config",
headers={"authorization": "Bearer wrong-token"},
)
)
self.assertEqual(ctx.exception.status_code, 401)
async def test_internal_effective_config_redacts_secrets(self) -> None:
with (
patch.object(main.settings, "api_keys", ["api-key-1", "api-key-2"]),
patch.object(main.settings, "admin_token", "admin-secret"),
patch.object(main.settings, "metrics_token", "metrics-secret"),
patch.object(main.settings, "default_model", "org_auto"),
patch.object(main.settings, "tool_forward_enabled", True),
patch.object(main.settings, "session_reuse_enabled", True),
patch.object(main.settings, "metrics_public", False),
patch.object(main.settings, "auto_login_enabled", True),
patch.object(
main.settings,
"accounts",
[
SimpleNamespace(
username="user-a",
password="pass-a",
session_bundle_b64="bundle-a",
session_bundle_file="/secrets/bundle-a.txt",
)
],
),
):
main.admin_auth_guard(
_make_request(
"/internal/effective-config",
headers={"authorization": "Bearer admin-secret"},
)
)
response = await main.internal_effective_config()
self.assertEqual(response.status_code, 200)
payload = json.loads(response.body)
settings_payload = payload["settings"]
self.assertEqual(settings_payload["api_keys"], ["***", "***"])
self.assertEqual(settings_payload["admin_token"], "***")
self.assertEqual(settings_payload["metrics_token"], "***")
self.assertEqual(settings_payload["accounts"][0]["password"], "***")
self.assertEqual(settings_payload["accounts"][0]["session_bundle_b64"], "***")
self.assertEqual(settings_payload["accounts"][0]["username"], "user-a")
self.assertEqual(
settings_payload["accounts"][0]["session_bundle_file"],
"/secrets/bundle-a.txt",
)
self.assertTrue(payload["feature_flags"]["tool_forward_enabled"])
self.assertTrue(payload["feature_flags"]["session_reuse_enabled"])
async def test_internal_debug_requests_redacts_sensitive_fields(self) -> None:
main._DEBUG_REQUEST_LOG.clear()
main._record_debug_request(
"openai",
"/v1/chat/completions",
{
"api_key": "secret-key",
"session_bundle": "bundle-value",
"image_url": "data:image/png;base64,abcd",
"tool_calls": [
{
"function": {
"arguments": "x" * 3001,
}
}
],
},
_make_request("/v1/chat/completions", headers={"x-request-id": "req-123"}),
)
response = await main.internal_debug_requests(limit=10)
self.assertEqual(response.status_code, 200)
payload = json.loads(response.body)
self.assertEqual(payload["count"], 1)
item = payload["items"][0]
self.assertEqual(item["request_id"], "req-123")
self.assertEqual(item["body"]["api_key"], "***")
self.assertEqual(item["body"]["session_bundle"], "***")
self.assertEqual(item["body"]["image_url"], "[redacted-data-url]")
self.assertTrue(item["body"]["tool_calls"][0]["function"]["arguments"].endswith("... [truncated]"))
async def test_internal_debug_requests_requires_admin_token(self) -> None:
with (
patch.object(main.settings, "api_keys", ["api-key"]),
patch.object(main.settings, "admin_token", "admin-secret"),
):
with self.assertRaises(main.HTTPException) as ctx:
main.admin_auth_guard(
_make_request(
"/internal/debug/requests",
headers={"authorization": "Bearer wrong-token"},
)
)
self.assertEqual(ctx.exception.status_code, 401)
class ToolEmulationPromptTests(unittest.TestCase):
def test_inject_tooling_adds_routing_hints_and_examples(self) -> None:
tools = [
EmulatedToolDef(
name="read_file",
description="Read a file",
input_schema={"type": "object", "properties": {"path": {"type": "string"}}},
),
EmulatedToolDef(
name="bash",
description="Run shell commands",
input_schema={"type": "object", "properties": {"command": {"type": "string"}}},
),
]
injected = inject_tooling("system prompt", tools, EmulatedToolChoice(mode="auto"))
self.assertIn("Tool routing guide:", injected)
self.assertIn("use read_file", injected)
self.assertIn("use bash", injected)
self.assertIn("Core tool syntax examples.", injected)
self.assertIn("Example valid action block", injected)
self.assertIn("tool_choice=auto means you must decide whether the user request needs a tool", injected)
def test_inject_tooling_does_not_modify_plain_system_when_no_tools(self) -> None:
injected = inject_tooling("system prompt", [], EmulatedToolChoice(mode="auto"))
self.assertEqual(injected, "system prompt")
def test_parse_action_blocks_limits_default_to_five_calls(self) -> None:
tools = [
EmulatedToolDef(
name="lookup",
description="Lookup data",
input_schema={"type": "object", "properties": {"q": {"type": "string"}}},
)
]
text = "\n".join(
f"```json action\n{{\"tool\":\"lookup\",\"parameters\":{{\"q\":\"item-{i}\"}}}}\n```"
for i in range(6)
)
calls, remaining = parse_action_blocks(text, tools)
self.assertEqual(len(calls), 5)
self.assertEqual([call.arguments["q"] for call in calls], [f"item-{i}" for i in range(5)])
self.assertIn('"q":"item-5"', remaining)
class SessionCacheToolFingerprintTests(unittest.TestCase):
def test_build_key_changes_with_tool_config(self) -> None:
from app.session_cache import SessionCache
cache = SessionCache(max_entries=8, ttl_sec=60)
messages = [
{"role": "system", "content": "sys"},
{"role": "user", "content": "hello"},
]
cfg_a = {
"provider": "openai",
"tools": [
{"type": "function", "function": {"name": "lookup", "parameters": {}}}
],
"tool_choice": {"type": "function", "function": {"name": "lookup"}},
}
cfg_a_reordered = {
"tool_choice": {"function": {"name": "lookup"}, "type": "function"},
"tools": [
{"function": {"parameters": {}, "name": "lookup"}, "type": "function"}
],
"provider": "openai",
}
cfg_b = {
"provider": "openai",
"tools": [
{
"type": "function",
"function": {"name": "lookup_v2", "parameters": {}},
}
],
"tool_choice": {"type": "function", "function": {"name": "lookup_v2"}},
}
key_no_tool = cache.build_key("api-key", messages)
key_a = cache.build_key("api-key", messages, tool_config=cfg_a)
key_a_reordered = cache.build_key(
"api-key", messages, tool_config=cfg_a_reordered
)
key_b = cache.build_key("api-key", messages, tool_config=cfg_b)
self.assertNotEqual(key_no_tool, key_a)
self.assertEqual(key_a, key_a_reordered)
self.assertNotEqual(key_a, key_b)
def test_handle_server_message_drops_unroutable_tool_event_without_request_id(
self,
) -> None:
from app.lingma_client import LspWsRpcClient
rpc = LspWsRpcClient("ws://127.0.0.1:1")
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/invoke",
"params": {
"name": "lookup",
"parameters": {"q": "x"},
},
}
)
stream = rpc._chat_streams["req-1"]
self.assertEqual(stream["tool_order"], [])
self.assertEqual(stream["tool_states"], {})
self.assertTrue(stream["chunks"].empty())
import asyncio
asyncio.run(run())
def test_handle_server_message_routes_by_tool_map_without_request_id(self) -> None:
from app.lingma_client import LspWsRpcClient
rpc = LspWsRpcClient("ws://127.0.0.1:1")
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/invoke",
"params": {
"requestId": "req-1",
"toolCallId": "call-1",
"name": "lookup",
"parameters": {"q": "a"},
},
}
)
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/invokeResult",
"params": {
"toolCallId": "call-1",
"result": {"ok": True},
},
}
)
result = rpc.get_stream_result("req-1")
self.assertEqual(len(result["toolEvents"]), 1)
self.assertEqual(result["toolEvents"][0]["id"], "call-1")
self.assertEqual(result["toolEvents"][0]["input"], {"q": "a"})
self.assertEqual(result["toolEvents"][0]["result"], {"ok": True})
import asyncio
asyncio.run(run())
def test_handle_server_message_dedupes_identical_repeated_tool_events(self) -> None:
from app.lingma_client import LspWsRpcClient
rpc = LspWsRpcClient("ws://127.0.0.1:1")
async def run() -> None:
rpc.create_stream("req-1")
msg = {
"jsonrpc": "2.0",
"method": "tool/invoke",
"params": {
"requestId": "req-1",
"toolCallId": "call-dup",
"name": "lookup",
"parameters": {"q": "dup"},
},
}
await rpc._handle_server_message(msg)
await rpc._handle_server_message(msg)
stream = rpc._chat_streams["req-1"]
self.assertEqual(stream["tool_order"], ["call-dup"])
self.assertEqual(stream["chunks"].qsize(), 1)
import asyncio
asyncio.run(run())
def test_extracts_tool_event_from_results_and_parameters(self) -> None:
from app.lingma_client import LspWsRpcClient
event = LspWsRpcClient._extract_tool_event(
{
"toolCallId": "call_sync_1",
"parameters": {"path": "README.md"},
"results": [
{
"toolCallId": "call_sync_1",
"name": "read_file",
"result": {"ok": True},
}
],
}
)
self.assertEqual(
event,
{
"id": "call_sync_1",
"name": "read_file",
"input": {"path": "README.md"},
"result": {"ok": True},
},
)
def test_extracts_tool_event_from_invoke_result_payload(self) -> None:
from app.lingma_client import LspWsRpcClient
event = LspWsRpcClient._extract_tool_event(
{
"toolCallId": "call_inv_1",
"name": "search_docs",
"parameters": {"query": "gateway"},
"result": {"hits": 3},
}
)
self.assertEqual(
event,
{
"id": "call_inv_1",
"name": "search_docs",
"input": {"query": "gateway"},
"result": {"hits": 3},
},
)
def test_tool_sync_triggers_approve_and_invoke_result_requests(self) -> None:
from app.lingma_client import LspWsRpcClient
class _WsStub:
def __init__(self) -> None:
self.frames: list[bytes] = []
async def send(self, data: bytes) -> None:
self.frames.append(data)
def _decode(frame: bytes) -> dict:
body = frame.split(b"\r\n\r\n", 1)[1]
return json.loads(body.decode("utf-8"))
ws = _WsStub()
rpc = LspWsRpcClient(ws)
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/call/sync",
"params": {
"sessionId": "sess-1",
"requestId": "req-1",
"toolCallId": "call-1",
"name": "run_in_terminal",
"parameters": {"command": "pwd"},
},
}
)
decoded = [_decode(frame) for frame in ws.frames]
methods = [item.get("method") for item in decoded]
self.assertIn("tool/call/approve", methods)
self.assertIn("tool/invokeResult", methods)
approve = next(
item for item in decoded if item.get("method") == "tool/call/approve"
)
self.assertEqual(
approve["params"],
{
"type": "tool_call",
"sessionId": "sess-1",
"requestId": "req-1",
"toolCallId": "call-1",
"approval": True,
},
)
invoke_result = next(
item for item in decoded if item.get("method") == "tool/invokeResult"
)
self.assertEqual(invoke_result["params"]["toolCallId"], "call-1")
self.assertEqual(invoke_result["params"]["name"], "run_in_terminal")
self.assertTrue(invoke_result["params"]["success"])
self.assertEqual(invoke_result["params"]["errorMessage"], "")
import asyncio
asyncio.run(run())
def test_tool_sync_does_not_emit_roundtrip_without_request_id(self) -> None:
from app.lingma_client import LspWsRpcClient
class _WsStub:
def __init__(self) -> None:
self.frames: list[bytes] = []
async def send(self, data: bytes) -> None:
self.frames.append(data)
ws = _WsStub()
rpc = LspWsRpcClient(ws)
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/call/sync",
"params": {
"sessionId": "sess-1",
"toolCallId": "call-1",
"name": "run_in_terminal",
"parameters": {"command": "pwd"},
},
}
)
self.assertEqual(ws.frames, [])
import asyncio
asyncio.run(run())