Compare commits

...

8 Commits

Author SHA1 Message Date
GitHub Actions
15cd5e8770 fix: close forced tool-choice with structured fallback
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 07:18:01 +08:00
GitHub Actions
63583712a8 fix: fallback agent payload source to numeric value
Keep Lingma chat/ask payload source as numeric 1 for agent mode A/B validation against remote upstream timeout behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 06:36:07 +08:00
GitHub Actions
c67a9c3d61 fix: align agent payload semantics with VSCode tool flow
Force OpenAI tooling-context requests into agent mode and align Lingma ask payload fields for agent requests so server-side tool path matches VSCode semantics.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 23:19:52 +08:00
GitHub Actions
e208025f35 fix: emit Lingma tool approve/invoke roundtrip
Forward tool/call/sync and tool/invoke events to Lingma with auto-approve and invokeResult so tool calls can complete end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 21:35:05 +08:00
GitHub Actions
3498b81fa2 fix: enable anthropic agent mode for tooling requests
Use agent ask_mode for Anthropic messages with tooling context so tool/write flows are executed, and add regression coverage plus docs/env updates for TOOL_FORWARD_ENABLED.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 20:15:14 +08:00
GitHub Actions
e600bae27c fix: harden tooling session reuse and event routing
Ensure session reuse is disabled for tooling contexts, include tool config in cache keys, and stabilize tool event merge/routing with expanded bridge tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 19:29:30 +08:00
GitHub Actions
5aa7fbfae5 fix: align Lingma tool event lifecycle handling
Handle tool/invokeResult and richer tool/call/sync payloads in the client,
and document/retest the verified VSCode monitoring workflow for tool events.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 09:49:01 +08:00
GitHub Actions
1c7b86e2c0 feat: bridge Lingma tool events to OpenAI/Anthropic responses
Add structured tool event propagation from Lingma stream/finish metadata and map it to OpenAI tool_calls and Anthropic tool_use/tool_result in both streaming and non-streaming responses. Add focused bridge tests and update docs/design notes to match current behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 22:34:43 +08:00
9 changed files with 1882 additions and 62 deletions

View File

@@ -46,6 +46,9 @@ DEFAULT_MODEL=org_auto
# 默认模式chat 或 agent # 默认模式chat 或 agent
DEFAULT_ASK_MODE=chat DEFAULT_ASK_MODE=chat
# 请求侧 tools/tool_choice 透传到 Lingma默认关闭开启后可支持工具写文件等场景
TOOL_FORWARD_ENABLED=false
# 专属域(可选) # 专属域(可选)
DEDICATED_DOMAIN_URL= DEDICATED_DOMAIN_URL=

95
CLAUDE.md Normal file
View File

@@ -0,0 +1,95 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Primary docs to read first
- `README.md` (runtime commands, env model, API examples)
- `DESIGN.md` (architecture decisions, module boundaries, request lifecycle)
- `.env.example` (authoritative env var reference)
No Cursor/Copilot rule files were found in this repo (`.cursorrules`, `.cursor/rules/`, `.github/copilot-instructions.md`).
## Common development commands
### Start locally
```bash
pip install -r requirements.txt
uvicorn app.main:app --reload --port 8317
```
### Start with Docker Compose
```bash
cp .env.example .env
mkdir -p data secrets
docker compose up -d --build
docker compose logs -f
```
### Run tests
```bash
# current focused suite
python3 -m unittest tests/test_tool_call_bridge.py
# discover all unittest tests under tests/
python3 -m unittest discover -s tests -p "test_*.py"
# run a single test method
python3 -m unittest tests.test_tool_call_bridge.ToolCallBridgeTests.test_openai_non_stream_bridges_tool_calls
```
### Smoke-check running gateway
```bash
API_KEY=$(grep '^API_KEYS=' .env | cut -d= -f2 | cut -d, -f1)
curl -s http://127.0.0.1:8317/healthz
curl -s http://127.0.0.1:8317/v1/models -H "Authorization: Bearer $API_KEY"
```
### Linting/type-checking status
- There is currently no repo-configured lint/type command (no `ruff`/`flake8`/`mypy` config found).
- Do not invent tooling commands; if linting is needed, add tooling in a dedicated change first.
## Architecture (big picture)
### What this service is
A FastAPI gateway that fronts Lingma and exposes:
- OpenAI-compatible API (`/v1/models`, `/v1/chat/completions`)
- Anthropic Messages-compatible API (`/v1/messages`, `/v1/messages/count_tokens`)
Both protocols share the same backend pool, backpressure guard, stats, and session reuse logic.
### Request lifecycle (important for most changes)
1. Authenticate request (`app/auth.py`)
2. Normalize inbound protocol payload to internal message shape (`openai_schema.py` / `anthropic_schema.py`)
3. Session-cache lookup (`app/session_cache.py`) for prefix-based reuse
4. Pick backend instance (`app/lingma_pool.py`) with affinity + least-in-flight
5. Acquire concurrency ticket (`app/concurrency.py`)
6. Call Lingma via websocket/LSP client (`app/lingma_client.py`)
7. Map upstream result/stream back to wire protocol in `app/main.py`
8. Record stats and release ticket (including stream-finally paths)
### Core module boundaries
- `app/main.py`: API entrypoint + orchestration + wire-format adapters
- `app/lingma_pool.py`: multi-instance lifecycle, selection, health-aware fallback
- `app/lingma_client.py`: subprocess + LSP-over-WebSocket transport to Lingma
- `app/session_cache.py`: LRU+TTL cache of conversation-prefix -> upstream session id (+ instance binding)
- `app/concurrency.py`: in-flight guard and queue timeout/backpressure behavior
- `app/stats.py`: usage counters and Prometheus text
### Protocol-specific notes
- Anthropic and OpenAI endpoints are separate adapters over shared internals.
- Response-side tool bridge is implemented: upstream Lingma tool events are surfaced as:
- OpenAI: `tool_calls` (stream + non-stream)
- Anthropic: `tool_use` / `tool_result` blocks (stream + non-stream)
- Request-side `tools` / `tool_choice` are accepted by schemas but not forwarded to Lingma.
### Operational invariants to preserve
- One request must stay on one Lingma instance for session continuity.
- Session cache entries include instance identity; invalidate on unhealthy instance mismatch.
- Streaming paths must always release in-flight tickets in `finally`.
- Multi-instance mode must use isolated workdirs per instance.
### Deployment/runtime model
- Container startup runs `python /app/app/bootstrap_lingma.py` before uvicorn.
- Compose mounts:
- `./data -> /app/data` (persistent Lingma binary/cache/workdirs)
- `./secrets -> /secrets:ro` (session bundles, secrets)

View File

@@ -47,7 +47,8 @@
- **逆向 Lingma 后端协议**:之前评估过(曾经的"B1 终极方案"),需要反编译二进制,维护成本高、政策风险大,放弃。 - **逆向 Lingma 后端协议**:之前评估过(曾经的"B1 终极方案"),需要反编译二进制,维护成本高、政策风险大,放弃。
- **多租户 / 水平扩缩**:单容器即可;真要大规模部署 → 套层反代 + N 个网关副本就够,不在进程内解决。 - **多租户 / 水平扩缩**:单容器即可;真要大规模部署 → 套层反代 + N 个网关副本就够,不在进程内解决。
- **完整 function calling / tools**OpenAI schema 里保留了字段,但目前不透传给 LingmaLingma 侧没有等价能力)。 - **请求侧完整 function calling / tools 透传**OpenAI schema 里保留了字段,但目前不会把 `tools`/`tool_choice` 透传给 Lingma上游无等价输入协议)。
- **响应侧工具事件桥接**:若 Lingma 上游产出 tool 事件,网关会向 OpenAI 输出 `tool_calls`,向 Anthropic 输出 `tool_use` / `tool_result`stream + non-stream
- **多模态**:请求里的 image/audio 会被降级成占位符 `[image]` / `[audio]`,因为 Lingma chat 不支持。 - **多模态**:请求里的 image/audio 会被降级成占位符 `[image]` / `[audio]`,因为 Lingma chat 不支持。
--- ---
@@ -591,7 +592,7 @@ FastAPI `lifespan` 退出 → `pool.close()` → 每个 `client.close()` → 进
| 需求 | 改哪些文件 | 关键入口 | | 需求 | 改哪些文件 | 关键入口 |
|---|---|---| |---|---|---|
| 加一个新的 OpenAI 端点(如 embeddings | `main.py`, `openai_schema.py` | 仿照 `v1_models``@app.post("/v1/embeddings", dependencies=[Depends(auth_guard)])` | | 加一个新的 OpenAI 端点(如 embeddings | `main.py`, `openai_schema.py` | 仿照 `v1_models``@app.post("/v1/embeddings", dependencies=[Depends(auth_guard)])` |
| 扩展 Anthropic 端点(如 count_tokens / tool_use 贯通 | `main.py::v1_messages`, `anthropic_schema.py` | count_tokens 只读:复用 `estimate_tokens`tool_use 需要 Lingma 上游支持payload 转发点在 `chat_stream` / `chat_complete` | | 扩展 Anthropic 端点(如 count_tokens / tool_use 相关能力 | `main.py::v1_messages`, `anthropic_schema.py` | count_tokens 只读:复用 `estimate_tokens`响应侧 `tool_use/tool_result` 桥接已支持,若要请求侧 tools 透传仍需改 `lingma_client.py` payload |
| 加一种新的实例调度策略(如加权轮询) | `lingma_pool.py::pick()` | 当前是 affinity → least-in-flight → round-robin | | 加一种新的实例调度策略(如加权轮询) | `lingma_pool.py::pick()` | 当前是 affinity → least-in-flight → round-robin |
| 改认证为 JWT / OAuth | `auth.py` | 三个 `require_*` 函数是全部入口;`main.py` 里只有 `*_guard` 代理 | | 改认证为 JWT / OAuth | `auth.py` | 三个 `require_*` 函数是全部入口;`main.py` 里只有 `*_guard` 代理 |
| 增加限流(按 api_key 配额) | `concurrency.py``PerKeyGuard``main.py``chat_guard.try_acquire()` 后再来一层 | 注意 ticket 释放顺序(内层先释放) | | 增加限流(按 api_key 配额) | `concurrency.py``PerKeyGuard``main.py``chat_guard.try_acquire()` 后再来一层 | 注意 ticket 释放顺序(内层先释放) |
@@ -627,7 +628,7 @@ uvicorn app.main:app --reload --port 8317
| 标签 | 描述 | 影响 | 计划 | | 标签 | 描述 | 影响 | 计划 |
|---|---|---|---| |---|---|---|---|
| D1 | `config.py` 还是纯 `dataclass` + `os.getenv`,未迁 `pydantic-settings` | 类型校验靠自己 cast | 低优,收益有限,有精力再做 | | D1 | `config.py` 还是纯 `dataclass` + `os.getenv`,未迁 `pydantic-settings` | 类型校验靠自己 cast | 低优,收益有限,有精力再做 |
| D3 | 无单元测试骨架 | 重构要靠 deploy 验证 | 想加 CI 时优先补 | | D3 | 已有基础单测覆盖 tool-call bridgeOpenAI/Anthropicstream + non-stream但整体测试矩阵仍不完整 | 回归仍依赖手工验证与定向测试 | 后续补充会话复用、背压、鉴权和异常路径用例 |
| Docker non-root | 容器还是 root 跑 | 容器逃逸时影响宿主 | 需要加 `gosu` + chown entrypoint涉及数据迁移谨慎推进 | | Docker non-root | 容器还是 root 跑 | 容器逃逸时影响宿主 | 需要加 `gosu` + chown entrypoint涉及数据迁移谨慎推进 |
| ADMIN_TOKEN 轮换 | 没有过期机制,只能重启 | 自用场景不影响 | 接 Vault / sops 时一并做 | | ADMIN_TOKEN 轮换 | 没有过期机制,只能重启 | 自用场景不影响 | 接 Vault / sops 时一并做 |
| Lingma 版本漂移 | 新版 Lingma 改 LSP 方法或新增必需 cache 文件时会无声崩 | 注入失败会 fallback但 chat 不回话题型的错误不易定位 | 加一个 `/internal/smoke` 端点做端到端自检 | | Lingma 版本漂移 | 新版 Lingma 改 LSP 方法或新增必需 cache 文件时会无声崩 | 注入失败会 fallback但 chat 不回话题型的错误不易定位 | 加一个 `/internal/smoke` 端点做端到端自检 |
@@ -707,6 +708,45 @@ uvicorn app.main:app --reload --port 8317
| → | `chat/ask` (notify!) | 见 `_build_payload` | 不回 result通过 server push 下推 | | → | `chat/ask` (notify!) | 见 `_build_payload` | 不回 result通过 server push 下推 |
| ← | `chat/answer` | `{requestId, text, content}` | 流式 token | | ← | `chat/answer` | `{requestId, text, content}` | 流式 token |
| ← | `chat/finish` | `{requestId, sessionId, ...其它元数据}` | 结束信号,含上游真实 sessionId | | ← | `chat/finish` | `{requestId, sessionId, ...其它元数据}` | 结束信号,含上游真实 sessionId |
| ← | `tool/call/sync` | `{requestId?, toolCallId, toolCallStatus, parameters, results?}` | 工具状态与结果回流 |
| ← | `tool/invoke` | `{requestId?, toolCallId, ...}` | 工具调用中间事件(兼容旧链路) |
| ← | `tool/call/approve` | `{requestId?, toolCallId, approval, ...}` | 工具审批事件 |
| ← | `tool/invokeResult` | `{requestId?, toolCallId, name, success, errorMessage, result}` | 工具执行结果事件 |
### 9.1 Tool call 监控 SOPVSCode 真实环境)
目标:拿到 Lingma 扩展真实 method/字段,避免猜测协议。
1. 确认入口文件
- `~/.vscode/extensions/alibaba-cloud.tongyi-lingma-*/package.json`
-`main`(当前是 `dist/extension.js`
2. 在发送侧打点
-`sendRequest` / `sendNotification` 处记录 method 与参数 keys
- 优先写文件,不依赖 console
3. 在入站 `tool/call/sync` handler 打点
- 记录 `toolCallId``toolCallStatus`、是否包含 `results`
4. 用真实交互触发
- VSCode 内发起会话并触发工具
- 点击 Accept/Reject观察事件闭环
5. 验证闭环
- `tool/call/sync(pending|processing)`
- `tool/call/approve`
- `tool/invokeResult`
- `tool/call/sync(results)`
6. 回滚
- 用备份文件恢复 `dist/extension.js`
- 避免长期携带探针到日常环境
**建议日志位置**
- `~/.lingma/vscode/sharedClientCache/logs/lingma-probe.log`
- `~/.lingma/vscode/sharedClientCache/logs/lingma-extension.log`
**注意**:优先使用 VSCode不混用 Cursor 扩展环境;`pipe` 连接模式下,扩展层探针最稳定。
**`chat/ask` payload 关键字段** **`chat/ask` payload 关键字段**

View File

@@ -221,7 +221,8 @@ curl -N http://127.0.0.1:8317/v1/messages \
说明: 说明:
- **模型名兼容**:客户端可以继续传 `claude-3-*` 等名字;未识别的 model 会回退到 `DEFAULT_MODEL` 对应的 Lingma key后端实际仍由 Lingma 提供Qwen 系列)。如需显式选模型,直接传 Lingma key`dashscope_qmodel` 等)。 - **模型名兼容**:客户端可以继续传 `claude-3-*` 等名字;未识别的 model 会回退到 `DEFAULT_MODEL` 对应的 Lingma key后端实际仍由 Lingma 提供Qwen 系列)。如需显式选模型,直接传 Lingma key`dashscope_qmodel` 等)。
- **会话复用共享**Anthropic 与 OpenAI 两个端点共用同一 `SessionCache`,只要 API key 相同、对话前缀相同,就会命中同一上游 `sessionId` - **会话复用共享**Anthropic 与 OpenAI 两个端点共用同一 `SessionCache`,只要 API key 相同、对话前缀相同,就会命中同一上游 `sessionId`
- **多模态**`image` 块会被降级为 `[image]` 占位符Lingma 不支持 vision`tool_use` / `tool_result` 会以纯文本形式保留语义 - **多模态**`image` 块会被降级为 `[image]` 占位符Lingma 不支持 vision
- **工具事件桥接**:当 Lingma 上游返回 `tool` 事件时,网关会输出为 OpenAI `tool_calls`(含 stream/non-stream和 Anthropic `tool_use`/`tool_result` blocks含 stream/non-stream请求侧 `tools`/`tool_choice``TOOL_FORWARD_ENABLED=true` 时会透传到 Lingma默认关闭
- **鉴权**:优先 `x-api-key`Anthropic 官方 SDK 默认),回退 `Authorization: Bearer`(方便 curl / OpenAI 风格客户端)。 - **鉴权**:优先 `x-api-key`Anthropic 官方 SDK 默认),回退 `Authorization: Bearer`(方便 curl / OpenAI 风格客户端)。
### 3.2 观测(`METRICS_TOKEN` 或 `API_KEYS` ### 3.2 观测(`METRICS_TOKEN` 或 `API_KEYS`

View File

@@ -44,6 +44,7 @@ class Settings:
session_reuse_enabled: bool = True session_reuse_enabled: bool = True
session_cache_max_entries: int = 256 session_cache_max_entries: int = 256
session_cache_ttl_sec: float = 1800.0 session_cache_ttl_sec: float = 1800.0
tool_forward_enabled: bool = False
def _bool_env(name: str, default: bool) -> bool: def _bool_env(name: str, default: bool) -> bool:
@@ -175,4 +176,5 @@ def load_settings() -> Settings:
session_reuse_enabled=_bool_env("SESSION_REUSE_ENABLED", True), session_reuse_enabled=_bool_env("SESSION_REUSE_ENABLED", True),
session_cache_max_entries=int(os.getenv("SESSION_CACHE_MAX_ENTRIES", "256")), session_cache_max_entries=int(os.getenv("SESSION_CACHE_MAX_ENTRIES", "256")),
session_cache_ttl_sec=float(os.getenv("SESSION_CACHE_TTL_SEC", "1800")), session_cache_ttl_sec=float(os.getenv("SESSION_CACHE_TTL_SEC", "1800")),
tool_forward_enabled=_bool_env("TOOL_FORWARD_ENABLED", False),
) )

View File

@@ -9,7 +9,7 @@ import subprocess
import time import time
import uuid import uuid
from pathlib import Path from pathlib import Path
from typing import AsyncIterator, Callable, Optional from typing import Any, AsyncIterator, Callable, Optional
import websockets import websockets
@@ -100,9 +100,90 @@ class LspWsRpcClient:
self._reader_task: asyncio.Task | None = None self._reader_task: asyncio.Task | None = None
self._rx_buffer = b"" self._rx_buffer = b""
self._chat_streams: dict[str, dict] = {} self._chat_streams: dict[str, dict] = {}
self._tool_stream_map: dict[str, str] = {}
self._tool_roundtrip_done: set[str] = set()
self._on_disconnect = on_disconnect self._on_disconnect = on_disconnect
self._closed = False self._closed = False
@staticmethod
def _extract_tool_event(params: dict[str, Any]) -> dict[str, Any] | None:
candidates: list[dict[str, Any]] = []
def add_candidate(obj: Any) -> None:
if isinstance(obj, dict):
candidates.append(obj)
add_candidate(params.get("toolCall"))
add_candidate(params.get("tool_call"))
add_candidate(params.get("tool"))
data = params.get("data")
if isinstance(data, dict):
add_candidate(data.get("toolCall"))
add_candidate(data.get("tool_call"))
add_candidate(data.get("tool"))
results = params.get("results")
if isinstance(results, list):
for item in results:
add_candidate(item)
if not candidates:
fallback_id = params.get("toolCallId") or params.get("tool_call_id")
if not fallback_id:
return None
return {
"id": str(fallback_id),
"name": str(params.get("name") or "tool"),
"input": params.get("parameters") or {},
"result": params.get("result"),
}
raw = candidates[0]
tool_id = (
raw.get("toolCallId")
or raw.get("tool_call_id")
or raw.get("id")
or params.get("toolCallId")
or params.get("tool_call_id")
)
name = (
raw.get("name")
or raw.get("toolName")
or raw.get("tool_name")
or params.get("name")
)
call_input = raw.get("input")
if call_input is None:
call_input = raw.get("arguments")
if call_input is None:
call_input = raw.get("args")
if call_input is None:
call_input = raw.get("parameters")
if call_input is None:
call_input = params.get("parameters")
result_payload = raw.get("result")
if result_payload is None:
result_payload = params.get("result")
if result_payload is None and isinstance(data, dict):
result_payload = data.get("result")
if result_payload is None and isinstance(raw.get("results"), list):
result_payload = raw.get("results")
if not tool_id:
return None
event: dict[str, Any] = {
"id": str(tool_id),
"name": str(name or "tool"),
"input": call_input if call_input is not None else {},
}
if result_payload is not None:
event["result"] = result_payload
return event
async def start(self): async def start(self):
self._reader_task = asyncio.create_task(self._reader_loop()) self._reader_task = asyncio.create_task(self._reader_loop())
@@ -123,6 +204,8 @@ class LspWsRpcClient:
stream["done"].set() stream["done"].set()
stream["chunks"].put_nowait(None) stream["chunks"].put_nowait(None)
self._chat_streams.clear() self._chat_streams.clear()
self._tool_stream_map.clear()
self._tool_roundtrip_done.clear()
async def _send(self, payload: dict): async def _send(self, payload: dict):
async with self._send_lock: async with self._send_lock:
@@ -172,6 +255,141 @@ class LspWsRpcClient:
except Exception: except Exception:
logger.exception("on_disconnect callback failed") logger.exception("on_disconnect callback failed")
@staticmethod
def _normalize_tool_id(method: str, params: dict[str, Any], tool_event: dict[str, Any] | None) -> str | None:
event_id = None
if isinstance(tool_event, dict):
event_id = tool_event.get("id")
if isinstance(event_id, str) and event_id.strip():
return event_id.strip()
fallback_id = params.get("toolCallId") or params.get("tool_call_id")
if isinstance(fallback_id, str) and fallback_id.strip():
return fallback_id.strip()
req_id = params.get("requestId")
name = None
if isinstance(tool_event, dict):
name = tool_event.get("name")
if not name:
name = params.get("name")
if isinstance(req_id, str) and req_id.strip() and isinstance(name, str) and name.strip():
return f"{req_id.strip()}:tool:{name.strip()}"
if isinstance(req_id, str) and req_id.strip():
return f"{req_id.strip()}:tool"
return None
@staticmethod
def _merge_tool_event(existing: dict[str, Any] | None, incoming: dict[str, Any]) -> tuple[dict[str, Any], bool]:
merged = dict(existing or {})
changed = False
val = incoming.get("id")
if val and merged.get("id") != val:
merged["id"] = val
changed = True
name = incoming.get("name")
if name:
existing_name = merged.get("name")
if not existing_name:
merged["name"] = name
changed = True
else:
existing_norm = str(existing_name).strip().lower()
incoming_norm = str(name).strip().lower()
if existing_norm == "tool" and incoming_norm != "tool":
merged["name"] = name
changed = True
elif existing_norm != "tool" and incoming_norm == "tool":
pass
elif merged.get("name") != name:
merged["name"] = name
changed = True
if "input" in incoming and incoming.get("input") is not None:
incoming_input = incoming.get("input")
should_update_input = incoming_input != {} or "input" not in merged
if should_update_input and merged.get("input") != incoming_input:
merged["input"] = incoming_input
changed = True
if "result" in incoming and incoming.get("result") is not None:
if merged.get("result") != incoming.get("result"):
merged["result"] = incoming.get("result")
changed = True
return merged, changed
@staticmethod
def _is_tool_roundtrip_method(method: str | None) -> bool:
return method in {"tool/call/sync", "tool/invoke"}
@staticmethod
def _build_tool_approve_params(params: dict[str, Any], tool_id: str) -> dict[str, Any] | None:
req_id = params.get("requestId")
session_id = params.get("sessionId")
if not isinstance(req_id, str) or not req_id.strip():
return None
if not isinstance(session_id, str) or not session_id.strip():
return None
return {
"type": "tool_call",
"sessionId": session_id,
"requestId": req_id,
"toolCallId": tool_id,
"approval": True,
}
@staticmethod
def _build_tool_invoke_result_params(params: dict[str, Any], tool_event: dict[str, Any], tool_id: str) -> dict[str, Any]:
return {
"toolCallId": tool_id,
"name": str(tool_event.get("name") or params.get("name") or "tool"),
"success": True,
"errorMessage": "",
"result": tool_event.get("result") if "result" in tool_event else {},
}
async def _maybe_emit_tool_roundtrip(self, method: str, params: dict[str, Any], tool_event: dict[str, Any]) -> None:
if not self._is_tool_roundtrip_method(method):
return
tool_id = self._normalize_tool_id(method, params, tool_event)
if not tool_id:
return
if tool_id in self._tool_roundtrip_done:
return
approve_params = self._build_tool_approve_params(params, tool_id)
if approve_params is None:
return
self._tool_roundtrip_done.add(tool_id)
await self.notify("tool/call/approve", approve_params)
invoke_result_params = self._build_tool_invoke_result_params(params, tool_event, tool_id)
await self.notify("tool/invokeResult", invoke_result_params)
def _resolve_tool_stream(self, method: str, params: dict[str, Any], tool_event: dict[str, Any] | None) -> dict | None:
req_id = params.get("requestId")
if isinstance(req_id, str) and req_id.strip():
stream = self._chat_streams.get(req_id)
if stream is not None and tool_event is not None:
tool_id = self._normalize_tool_id(method, params, tool_event)
if tool_id:
self._tool_stream_map[tool_id] = req_id
return stream
if tool_event is not None:
tool_id = self._normalize_tool_id(method, params, tool_event)
if tool_id:
mapped_req = self._tool_stream_map.get(tool_id)
if mapped_req:
return self._chat_streams.get(mapped_req)
return None
async def _handle_server_message(self, msg: dict): async def _handle_server_message(self, msg: dict):
method = msg.get("method") method = msg.get("method")
params = msg.get("params") or {} params = msg.get("params") or {}
@@ -185,7 +403,34 @@ class LspWsRpcClient:
stream["parts"].append(text) stream["parts"].append(text)
if stream["first_chunk_at"] is None: if stream["first_chunk_at"] is None:
stream["first_chunk_at"] = time.monotonic() stream["first_chunk_at"] = time.monotonic()
stream["chunks"].put_nowait(text) stream["chunks"].put_nowait({"type": "text", "text": text})
if method in {"tool/call/sync", "tool/invoke", "tool/call/approve", "tool/invokeResult"}:
tool_event = self._extract_tool_event(params)
stream = self._resolve_tool_stream(method, params, tool_event)
if stream is not None and tool_event is not None:
tool_id = self._normalize_tool_id(method, params, tool_event)
if not tool_id:
logger.warning("drop unroutable tool event: method=%s missing tool id", method)
else:
await self._maybe_emit_tool_roundtrip(method, params, tool_event)
tool_states = stream["tool_states"]
order = stream["tool_order"]
existing = tool_states.get(tool_id)
merged, changed = self._merge_tool_event(existing, tool_event)
if not existing:
if "id" not in merged or not merged.get("id"):
merged["id"] = tool_id
tool_states[tool_id] = merged
order.append(tool_id)
stream["chunks"].put_nowait({"type": "tool", "tool": merged})
elif changed:
tool_states[tool_id] = merged
stream["chunks"].put_nowait({"type": "tool", "tool": merged})
elif tool_event is not None:
logger.warning("drop unroutable tool event: method=%s requestId=%s", method, params.get("requestId"))
if method == "chat/finish": if method == "chat/finish":
req_id = params.get("requestId") req_id = params.get("requestId")
@@ -224,6 +469,8 @@ class LspWsRpcClient:
"chunks": asyncio.Queue(), "chunks": asyncio.Queue(),
"done": asyncio.Event(), "done": asyncio.Event(),
"finish": None, "finish": None,
"tool_states": {},
"tool_order": [],
"started_at": time.monotonic(), "started_at": time.monotonic(),
"first_chunk_at": None, "first_chunk_at": None,
"finish_at": None, "finish_at": None,
@@ -233,13 +480,17 @@ class LspWsRpcClient:
stream = self._chat_streams.pop(request_id, None) stream = self._chat_streams.pop(request_id, None)
if stream is None: if stream is None:
return return
for tool_id, mapped_req in list(self._tool_stream_map.items()):
if mapped_req == request_id:
self._tool_stream_map.pop(tool_id, None)
self._tool_roundtrip_done.discard(tool_id)
# Drain queue so no stray future gets stuck if the consumer bailed early. # Drain queue so no stray future gets stuck if the consumer bailed early.
if not stream["done"].is_set(): if not stream["done"].is_set():
stream["done"].set() stream["done"].set()
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
stream["chunks"].put_nowait(None) stream["chunks"].put_nowait(None)
async def consume_stream(self, request_id: str, timeout: float) -> AsyncIterator[str]: async def consume_stream(self, request_id: str, timeout: float) -> AsyncIterator[dict[str, Any]]:
stream = self._chat_streams.get(request_id) stream = self._chat_streams.get(request_id)
if stream is None: if stream is None:
return return
@@ -261,11 +512,20 @@ class LspWsRpcClient:
first_ms = int((stream["first_chunk_at"] - stream["started_at"]) * 1000) first_ms = int((stream["first_chunk_at"] - stream["started_at"]) * 1000)
if stream.get("finish_at") is not None: if stream.get("finish_at") is not None:
total_ms = int((stream["finish_at"] - stream["started_at"]) * 1000) total_ms = int((stream["finish_at"] - stream["started_at"]) * 1000)
ordered_tool_events: list[dict[str, Any]] = []
tool_states = stream.get("tool_states") or {}
for tool_id in stream.get("tool_order") or []:
event = tool_states.get(tool_id)
if isinstance(event, dict):
ordered_tool_events.append(event)
return { return {
"text": "".join(stream.get("parts") or []), "text": "".join(stream.get("parts") or []),
"finish": stream.get("finish") or {}, "finish": stream.get("finish") or {},
"firstTokenLatencyMs": first_ms, "firstTokenLatencyMs": first_ms,
"totalLatencyMs": total_ms, "totalLatencyMs": total_ms,
"toolEvents": ordered_tool_events,
} }
@@ -634,13 +894,14 @@ class LingmaGatewayClient:
request_id: str, request_id: str,
*, *,
is_reply: bool = False, is_reply: bool = False,
tool_config: dict[str, Any] | None = None,
): ):
session_type = "developer" if ask_mode == "agent" else "chat" session_type = "ask" if ask_mode == "agent" else "chat"
return { payload = {
"requestId": request_id, "requestId": request_id,
"sessionId": session_id, "sessionId": session_id,
"sessionType": session_type, "sessionType": session_type,
"chatTask": "FREE_INPUT", "chatTask": "chat" if ask_mode == "agent" else "FREE_INPUT",
"mode": ask_mode, "mode": ask_mode,
"stream": True, "stream": True,
"source": 1, "source": 1,
@@ -665,6 +926,9 @@ class LingmaGatewayClient:
"localeLang": "zh-CN", "localeLang": "zh-CN",
}, },
} }
if tool_config is not None:
payload["toolConfig"] = tool_config
return payload
async def _kick_chat_ask(self, payload: dict) -> None: async def _kick_chat_ask(self, payload: dict) -> None:
"""Fire chat/ask as a notification. """Fire chat/ask as a notification.
@@ -685,12 +949,19 @@ class LingmaGatewayClient:
*, *,
session_id: str | None = None, session_id: str | None = None,
is_reply: bool = False, is_reply: bool = False,
tool_config: dict[str, Any] | None = None,
) -> dict: ) -> dict:
await self.ensure_ready() await self.ensure_ready()
request_id = str(uuid.uuid4()) request_id = str(uuid.uuid4())
sid = session_id or str(uuid.uuid4()) sid = session_id or str(uuid.uuid4())
payload = self._build_payload( payload = self._build_payload(
prompt, model_key, ask_mode, sid, request_id, is_reply=is_reply prompt,
model_key,
ask_mode,
sid,
request_id,
is_reply=is_reply,
tool_config=tool_config,
) )
self.rpc.create_stream(request_id) self.rpc.create_stream(request_id)
try: try:
@@ -721,9 +992,14 @@ class LingmaGatewayClient:
*, *,
session_id: str | None = None, session_id: str | None = None,
is_reply: bool = False, is_reply: bool = False,
tool_config: dict[str, Any] | None = None,
out_meta: dict | None = None, out_meta: dict | None = None,
) -> AsyncIterator[str]: ) -> AsyncIterator[dict[str, Any]]:
"""Stream `chat/answer` chunks. """Stream chat events.
Yields structured events:
* {"type": "text", "text": "..."}
* {"type": "tool", "tool": {...}}
If `out_meta` is provided, the final `chat/finish` payload's sessionId If `out_meta` is provided, the final `chat/finish` payload's sessionId
(and the raw finish dict) is written into it when the stream ends or is (and the raw finish dict) is written into it when the stream ends or is
@@ -734,15 +1010,21 @@ class LingmaGatewayClient:
request_id = str(uuid.uuid4()) request_id = str(uuid.uuid4())
sid = session_id or str(uuid.uuid4()) sid = session_id or str(uuid.uuid4())
payload = self._build_payload( payload = self._build_payload(
prompt, model_key, ask_mode, sid, request_id, is_reply=is_reply prompt,
model_key,
ask_mode,
sid,
request_id,
is_reply=is_reply,
tool_config=tool_config,
) )
self.rpc.create_stream(request_id) self.rpc.create_stream(request_id)
try: try:
await self._kick_chat_ask(payload) await self._kick_chat_ask(payload)
async for chunk in self.rpc.consume_stream( async for event in self.rpc.consume_stream(
request_id, timeout=max(60.0, self.rpc_timeout + 60.0) request_id, timeout=max(60.0, self.rpc_timeout + 60.0)
): ):
yield chunk yield event
finally: finally:
# Runs on normal completion, exception, or consumer GeneratorExit (client disconnect). # Runs on normal completion, exception, or consumer GeneratorExit (client disconnect).
if out_meta is not None: if out_meta is not None:
@@ -753,6 +1035,7 @@ class LingmaGatewayClient:
out_meta["finish"] = finish out_meta["finish"] = finish
out_meta["request_id"] = request_id out_meta["request_id"] = request_id
out_meta["chars"] = len(stream_result.get("text") or "") out_meta["chars"] = len(stream_result.get("text") or "")
out_meta["tool_events"] = stream_result.get("toolEvents") or []
except Exception: except Exception:
pass pass
self.rpc.pop_stream(request_id) self.rpc.pop_stream(request_id)

View File

@@ -6,6 +6,7 @@ import json
import time import time
import uuid import uuid
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any
from fastapi import Depends, FastAPI, HTTPException, Request from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse from fastapi.responses import JSONResponse, StreamingResponse
@@ -350,6 +351,233 @@ def _include_usage(stream_options: dict | None) -> bool:
return bool(stream_options.get("include_usage")) return bool(stream_options.get("include_usage"))
def _openai_tool_config(req: ChatCompletionsRequest) -> dict[str, Any] | None:
if not settings.tool_forward_enabled:
return None
has_tools = isinstance(req.tools, list) and len(req.tools) > 0
has_choice = req.tool_choice is not None
if not has_tools and not has_choice:
return None
return {
"provider": "openai",
"tools": req.tools or [],
"tool_choice": req.tool_choice,
}
def _anthropic_tool_config(req: AnthropicMessagesRequest) -> dict[str, Any] | None:
if not settings.tool_forward_enabled:
return None
has_tools = isinstance(req.tools, list) and len(req.tools) > 0
has_choice = req.tool_choice is not None
if not has_tools and not has_choice:
return None
return {
"provider": "anthropic",
"tools": req.tools or [],
"tool_choice": req.tool_choice,
}
def _openai_has_tooling_context(req: ChatCompletionsRequest, messages: list[dict[str, Any]]) -> bool:
if isinstance(req.tools, list) and len(req.tools) > 0:
return True
if req.tool_choice is not None:
return True
for m in messages:
role = m.get("role")
if role == "tool":
return True
if role == "assistant" and m.get("tool_calls"):
return True
return False
def _anthropic_content_has_tool_blocks(content: Any) -> bool:
if not isinstance(content, list):
return False
for item in content:
if isinstance(item, dict) and item.get("type") in {"tool_use", "tool_result"}:
return True
return False
def _anthropic_has_tooling_context(req: AnthropicMessagesRequest) -> bool:
if isinstance(req.tools, list) and len(req.tools) > 0:
return True
if req.tool_choice is not None:
return True
if _anthropic_content_has_tool_blocks(req.system):
return True
for m in req.messages:
if _anthropic_content_has_tool_blocks(m.content):
return True
return False
def _stream_event_type(event: Any) -> str:
if isinstance(event, dict):
t = event.get("type")
if t in {"text", "tool"}:
return t
return "text"
def _stream_text(event: Any) -> str:
if isinstance(event, dict):
if event.get("type") == "text":
text = event.get("text")
if isinstance(text, str):
return text
return ""
if isinstance(event, str):
return event
return ""
def _stream_tool_event(event: Any) -> dict[str, Any] | None:
if isinstance(event, dict) and event.get("type") == "tool":
tool = event.get("tool")
if isinstance(tool, dict):
return tool
return None
def _json_string(value: Any) -> str:
if isinstance(value, str):
return value
try:
return json.dumps(value if value is not None else {}, ensure_ascii=False)
except Exception:
return "{}"
def _openai_forced_tool_name(tool_choice: Any) -> str | None:
if not isinstance(tool_choice, dict):
return None
fn = tool_choice.get("function")
if isinstance(fn, dict):
name = fn.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
return None
def _anthropic_forced_tool_name(tool_choice: Any) -> str | None:
if not isinstance(tool_choice, dict):
return None
if tool_choice.get("type") == "tool":
name = tool_choice.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
fn = tool_choice.get("function")
if isinstance(fn, dict):
name = fn.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
return None
def _json_object_from_text(text: str) -> dict[str, Any] | None:
raw = text.strip()
if not raw:
return None
if raw.startswith("```") and raw.endswith("```"):
raw = raw[3:-3].strip()
if raw.lower().startswith("json"):
raw = raw[4:].strip()
try:
parsed = json.loads(raw)
except Exception:
return None
return parsed if isinstance(parsed, dict) else None
def _forced_tool_event_from_text(text: str, forced_tool_name: str) -> dict[str, Any] | None:
parsed = _json_object_from_text(text)
if parsed is None:
return None
explicit_name: Any = parsed.get("name") or parsed.get("tool")
fn = parsed.get("function")
if explicit_name is None and isinstance(fn, dict):
explicit_name = fn.get("name")
if explicit_name is not None and str(explicit_name) != forced_tool_name:
return None
tool_input: Any = None
if "input" in parsed:
tool_input = parsed.get("input")
elif "arguments" in parsed:
args = parsed.get("arguments")
if isinstance(args, str):
try:
tool_input = json.loads(args)
except Exception:
return None
else:
tool_input = args
elif isinstance(fn, dict) and "arguments" in fn:
args = fn.get("arguments")
if isinstance(args, str):
try:
tool_input = json.loads(args)
except Exception:
return None
else:
tool_input = args
else:
reserved = {"name", "tool", "function", "arguments", "input", "result"}
tool_input = {k: v for k, v in parsed.items() if k not in reserved}
event: dict[str, Any] = {
"name": forced_tool_name,
"input": tool_input if tool_input is not None else {},
}
if "result" in parsed:
event["result"] = parsed.get("result")
return event
def _openai_tool_call(tool: dict[str, Any], *, forced_id: str | None = None) -> dict[str, Any]:
return {
"id": str(tool.get("id") or forced_id or f"call_{uuid.uuid4().hex}"),
"type": "function",
"function": {
"name": str(tool.get("name") or "tool"),
"arguments": _json_string(tool.get("input")),
},
}
def _anthropic_tool_use_block(
tool: dict[str, Any], *, forced_id: str | None = None
) -> dict[str, Any]:
return {
"type": "tool_use",
"id": str(tool.get("id") or forced_id or f"toolu_{uuid.uuid4().hex}"),
"name": str(tool.get("name") or "tool"),
"input": tool.get("input") if tool.get("input") is not None else {},
}
def _anthropic_tool_result_block(
tool: dict[str, Any], *, forced_id: str | None = None
) -> dict[str, Any] | None:
if "result" not in tool:
return None
result = tool.get("result")
if isinstance(result, str):
content: Any = result
else:
content = _json_string(result)
return {
"type": "tool_result",
"tool_use_id": str(tool.get("id") or forced_id or ""),
"content": content,
}
@app.post("/v1/chat/completions", dependencies=[Depends(auth_guard)]) @app.post("/v1/chat/completions", dependencies=[Depends(auth_guard)])
async def v1_chat_completions(req: ChatCompletionsRequest, request: Request): async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
p = _require_pool() p = _require_pool()
@@ -363,22 +591,26 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
# 1. Reuse the upstream sessionId so Lingma/Qwen hits its KV cache. # 1. Reuse the upstream sessionId so Lingma/Qwen hits its KV cache.
# 2. Send only the new user message instead of the whole history. # 2. Send only the new user message instead of the whole history.
# 3. Stick the request to the pool instance that originally served it. # 3. Stick the request to the pool instance that originally served it.
tool_config = _openai_tool_config(req)
has_tooling_context = _openai_has_tooling_context(req, messages_dump)
ask_mode = settings.default_ask_mode ask_mode = settings.default_ask_mode
if req.model.lower() in {"lingma-agent", "agent"}: if req.model.lower() in {"lingma-agent", "agent"} or has_tooling_context:
ask_mode = "agent" ask_mode = "agent"
reuse_eligible = ( reuse_eligible = (
session_cache.enabled session_cache.enabled
and ask_mode == "chat" and ask_mode == "chat"
and len(messages_dump) >= 2 and len(messages_dump) >= 2
and not has_tooling_context
) )
lookup_key: str | None = None lookup_key: str | None = None
write_key: str | None = None write_key: str | None = None
cached_session_id: str | None = None cached_session_id: str | None = None
cached_instance_name: str | None = None cached_instance_name: str | None = None
if reuse_eligible: if reuse_eligible:
lookup_key = session_cache.build_key(api_key, messages_dump[:-1]) lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config)
write_key = session_cache.build_key(api_key, messages_dump) write_key = session_cache.build_key(api_key, messages_dump, tool_config=tool_config)
entry = await session_cache.get(lookup_key) entry = await session_cache.get(lookup_key)
if entry is not None: if entry is not None:
cached_session_id = entry.session_id cached_session_id = entry.session_id
@@ -476,6 +708,8 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta): async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
success = False success = False
tool_call_indexes: dict[str, int] = {}
saw_tool_call = False
try: try:
async for chunk in _inst.client.chat_stream( async for chunk in _inst.client.chat_stream(
prompt, prompt,
@@ -483,9 +717,48 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
ask_mode, ask_mode,
session_id=cached_session_id, session_id=cached_session_id,
is_reply=is_reply, is_reply=is_reply,
tool_config=tool_config,
out_meta=_meta, out_meta=_meta,
): ):
completion_tokens_holder["n"] += estimate_tokens(chunk) if _stream_event_type(chunk) == "tool":
tool = _stream_tool_event(chunk)
if not tool:
continue
tool_id = str(tool.get("id") or "")
if not tool_id:
tool_id = f"call_{len(tool_call_indexes)}"
idx = tool_call_indexes.get(tool_id)
if idx is None:
idx = len(tool_call_indexes)
tool_call_indexes[tool_id] = idx
saw_tool_call = True
payload = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [
{
"index": 0,
"delta": {
"tool_calls": [
{
"index": idx,
**_openai_tool_call(tool, forced_id=tool_id),
}
]
},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
continue
text = _stream_text(chunk)
if not text:
continue
completion_tokens_holder["n"] += estimate_tokens(text)
payload = { payload = {
"id": completion_id, "id": completion_id,
"object": "chat.completion.chunk", "object": "chat.completion.chunk",
@@ -494,7 +767,7 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
"choices": [ "choices": [
{ {
"index": 0, "index": 0,
"delta": {"content": chunk}, "delta": {"content": text},
"finish_reason": None, "finish_reason": None,
} }
], ],
@@ -506,10 +779,17 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
"object": "chat.completion.chunk", "object": "chat.completion.chunk",
"created": created, "created": created,
"model": model, "model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], "choices": [
{
"index": 0,
"delta": {},
"finish_reason": "tool_calls" if saw_tool_call else "stop",
}
],
} }
yield f"data: {json.dumps(done_payload, ensure_ascii=False)}\n\n" yield f"data: {json.dumps(done_payload, ensure_ascii=False)}\n\n"
if include_usage: if include_usage:
usage_payload = { usage_payload = {
"id": completion_id, "id": completion_id,
@@ -567,6 +847,7 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
ask_mode, ask_mode,
session_id=cached_session_id, session_id=cached_session_id,
is_reply=is_reply, is_reply=is_reply,
tool_config=tool_config,
) )
except Exception as exc: except Exception as exc:
logger.warning("chat.complete error (inst=%s): %s", inst.name, exc) logger.warning("chat.complete error (inst=%s): %s", inst.name, exc)
@@ -596,6 +877,24 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
sid = result.get("sessionId") sid = result.get("sessionId")
if sid: if sid:
await session_cache.put(write_key, sid, inst.name) await session_cache.put(write_key, sid, inst.name)
tool_events = result.get("toolEvents") or []
message_content = result.get("text") or ""
tool_calls: list[dict[str, Any]] = []
saw_tool_call = False
if isinstance(tool_events, list):
for idx, item in enumerate(tool_events):
if isinstance(item, dict):
tool_id = str(item.get("id") or f"call_{idx}")
tool_calls.append(_openai_tool_call(item, forced_id=tool_id))
saw_tool_call = True
if not saw_tool_call:
forced_tool_name = _openai_forced_tool_name(req.tool_choice)
if forced_tool_name:
fallback_event = _forced_tool_event_from_text(message_content, forced_tool_name)
if fallback_event is not None:
tool_calls.append(_openai_tool_call(fallback_event, forced_id="call_fallback_0"))
saw_tool_call = True
message_content = ""
response = ChatCompletionResponse( response = ChatCompletionResponse(
id=f"chatcmpl-{uuid.uuid4().hex}", id=f"chatcmpl-{uuid.uuid4().hex}",
created=int(time.time()), created=int(time.time()),
@@ -603,11 +902,17 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
choices=[ choices=[
ChatCompletionChoice( ChatCompletionChoice(
index=0, index=0,
finish_reason="stop", finish_reason="tool_calls" if saw_tool_call else "stop",
message={"role": "assistant", "content": result.get("text") or ""}, message={
"role": "assistant",
"content": message_content,
"tool_calls": tool_calls or None,
},
) )
], ],
) )
data = response.model_dump() data = response.model_dump()
data["latency"] = { data["latency"] = {
"first_token_ms": result.get("firstTokenLatencyMs"), "first_token_ms": result.get("firstTokenLatencyMs"),
@@ -634,13 +939,15 @@ def _anthropic_error(status_code: int, error_type: str, message: str) -> JSONRes
) )
def _anthropic_stop_reason(completion_tokens: int, max_tokens: int) -> str: def _anthropic_stop_reason(
"""Approximate Anthropic `stop_reason`. completion_tokens: int,
max_tokens: int,
Lingma doesn't expose a `max_tokens` knob, so we can't truly enforce it; *,
we report `max_tokens` only when the generated length meets or exceeds has_pending_tool_use: bool = False,
the caller's stated ceiling. Everything else is `end_turn`. ) -> str:
""" """Approximate Anthropic `stop_reason`."""
if has_pending_tool_use:
return "tool_use"
if max_tokens and completion_tokens >= max_tokens: if max_tokens and completion_tokens >= max_tokens:
return "max_tokens" return "max_tokens"
return "end_turn" return "end_turn"
@@ -700,19 +1007,23 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
) )
# ------------------------------------------------------------- session reuse # ------------------------------------------------------------- session reuse
# Anthropic clients don't expose an ask_mode, so we always run in "chat". tool_config = _anthropic_tool_config(req)
ask_mode = "chat" has_tooling_context = _anthropic_has_tooling_context(req)
ask_mode = settings.default_ask_mode
if req.model.lower() in {"lingma-agent", "agent"} or has_tooling_context:
ask_mode = "agent"
reuse_eligible = ( reuse_eligible = (
session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2 session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2 and not has_tooling_context
) )
lookup_key: str | None = None lookup_key: str | None = None
write_key: str | None = None write_key: str | None = None
cached_session_id: str | None = None cached_session_id: str | None = None
cached_instance_name: str | None = None cached_instance_name: str | None = None
if reuse_eligible: if reuse_eligible:
lookup_key = session_cache.build_key(api_key, messages_dump[:-1]) lookup_key = session_cache.build_key(api_key, messages_dump[:-1], tool_config=tool_config)
write_key = session_cache.build_key(api_key, messages_dump) write_key = session_cache.build_key(api_key, messages_dump, tool_config=tool_config)
entry = await session_cache.get(lookup_key) entry = await session_cache.get(lookup_key)
if entry is not None: if entry is not None:
cached_session_id = entry.session_id cached_session_id = entry.session_id
@@ -760,7 +1071,6 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
return _anthropic_error(400, "invalid_request_error", "messages is empty") return _anthropic_error(400, "invalid_request_error", "messages is empty")
prompt_tokens = estimate_tokens(prompt) prompt_tokens = estimate_tokens(prompt)
# ------------------------------------------------------------- backpressure # ------------------------------------------------------------- backpressure
try: try:
ticket = await chat_guard.try_acquire() ticket = await chat_guard.try_acquire()
@@ -810,6 +1120,9 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta): async def event_stream(_ticket=ticket, _inst=inst, _meta=stream_meta):
success = False success = False
block_index = 0
text_block_open = False
saw_pending_tool_use = False
try: try:
# 1) message_start — Anthropic SDKs read this first to get # 1) message_start — Anthropic SDKs read this first to get
# the message envelope (id/model/initial usage). # the message envelope (id/model/initial usage).
@@ -833,47 +1146,99 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
} }
yield _sse("message_start", start_payload) yield _sse("message_start", start_payload)
# 2) content_block_start for a single text block (index 0).
yield _sse(
"content_block_start",
{
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""},
},
)
# 3) content_block_delta stream of text tokens.
async for chunk in _inst.client.chat_stream( async for chunk in _inst.client.chat_stream(
prompt, prompt,
model, model,
ask_mode, ask_mode,
session_id=cached_session_id, session_id=cached_session_id,
is_reply=is_reply, is_reply=is_reply,
tool_config=tool_config,
out_meta=_meta, out_meta=_meta,
): ):
if not chunk: if _stream_event_type(chunk) == "tool":
if text_block_open:
yield _sse(
"content_block_stop",
{"type": "content_block_stop", "index": block_index},
)
block_index += 1
text_block_open = False
tool = _stream_tool_event(chunk)
if not tool:
continue
tool_id = str(tool.get("id") or f"toolu_stream_{block_index}")
tool_use_block = _anthropic_tool_use_block(tool, forced_id=tool_id)
yield _sse(
"content_block_start",
{
"type": "content_block_start",
"index": block_index,
"content_block": tool_use_block,
},
)
yield _sse(
"content_block_stop",
{"type": "content_block_stop", "index": block_index},
)
block_index += 1
tool_result_block = _anthropic_tool_result_block(tool, forced_id=tool_id)
if tool_result_block is not None:
yield _sse(
"content_block_start",
{
"type": "content_block_start",
"index": block_index,
"content_block": tool_result_block,
},
)
yield _sse(
"content_block_stop",
{"type": "content_block_stop", "index": block_index},
)
block_index += 1
else:
saw_pending_tool_use = True
continue continue
completion_tokens_holder["n"] += estimate_tokens(chunk)
text = _stream_text(chunk)
if not text:
continue
completion_tokens_holder["n"] += estimate_tokens(text)
if not text_block_open:
yield _sse(
"content_block_start",
{
"type": "content_block_start",
"index": block_index,
"content_block": {"type": "text", "text": ""},
},
)
text_block_open = True
yield _sse( yield _sse(
"content_block_delta", "content_block_delta",
{ {
"type": "content_block_delta", "type": "content_block_delta",
"index": 0, "index": block_index,
"delta": {"type": "text_delta", "text": chunk}, "delta": {"type": "text_delta", "text": text},
}, },
) )
# 4) content_block_stop closes the single text block. if text_block_open:
yield _sse( yield _sse(
"content_block_stop", "content_block_stop",
{"type": "content_block_stop", "index": 0}, {"type": "content_block_stop", "index": block_index},
) )
# 5) message_delta carries the terminal stop_reason and # 5) message_delta carries the terminal stop_reason and
# the final cumulative output_tokens count. # the final cumulative output_tokens count.
stop_reason = _anthropic_stop_reason( stop_reason = _anthropic_stop_reason(
completion_tokens_holder["n"], max_tokens completion_tokens_holder["n"],
max_tokens,
has_pending_tool_use=saw_pending_tool_use,
) )
yield _sse( yield _sse(
"message_delta", "message_delta",
@@ -887,6 +1252,7 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
}, },
) )
# 6) message_stop — terminal event, no [DONE] sentinel. # 6) message_stop — terminal event, no [DONE] sentinel.
yield _sse("message_stop", {"type": "message_stop"}) yield _sse("message_stop", {"type": "message_stop"})
success = True success = True
@@ -946,6 +1312,7 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
ask_mode, ask_mode,
session_id=cached_session_id, session_id=cached_session_id,
is_reply=is_reply, is_reply=is_reply,
tool_config=tool_config,
) )
except Exception as exc: except Exception as exc:
logger.warning("anthropic.complete error (inst=%s): %s", inst.name, exc) logger.warning("anthropic.complete error (inst=%s): %s", inst.name, exc)
@@ -972,13 +1339,50 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
if sid: if sid:
await session_cache.put(write_key, sid, inst.name) await session_cache.put(write_key, sid, inst.name)
content_blocks: list[dict[str, Any]] = []
if text:
content_blocks.append({"type": "text", "text": text})
tool_events = result.get("toolEvents") or []
saw_pending_tool_use = False
saw_tool_event = False
if isinstance(tool_events, list):
for idx, item in enumerate(tool_events):
if not isinstance(item, dict):
continue
saw_tool_event = True
tool_id = str(item.get("id") or f"toolu_nonstream_{idx}")
content_blocks.append(_anthropic_tool_use_block(item, forced_id=tool_id))
tool_result = _anthropic_tool_result_block(item, forced_id=tool_id)
if tool_result is not None:
content_blocks.append(tool_result)
else:
saw_pending_tool_use = True
if not saw_tool_event:
forced_tool_name = _anthropic_forced_tool_name(req.tool_choice)
if forced_tool_name:
fallback_event = _forced_tool_event_from_text(text, forced_tool_name)
if fallback_event is not None:
content_blocks = []
tool_id = "toolu_fallback_0"
content_blocks.append(_anthropic_tool_use_block(fallback_event, forced_id=tool_id))
tool_result = _anthropic_tool_result_block(fallback_event, forced_id=tool_id)
saw_pending_tool_use = tool_result is None
if tool_result is not None:
content_blocks.append(tool_result)
response_body: dict = { response_body: dict = {
"id": message_id, "id": message_id,
"type": "message", "type": "message",
"role": "assistant", "role": "assistant",
"model": model, "model": model,
"content": [{"type": "text", "text": text}], "content": content_blocks,
"stop_reason": _anthropic_stop_reason(completion_tokens, req.max_tokens), "stop_reason": _anthropic_stop_reason(
completion_tokens,
req.max_tokens,
has_pending_tool_use=saw_pending_tool_use,
),
"stop_sequence": None, "stop_sequence": None,
"usage": { "usage": {
"input_tokens": prompt_tokens, "input_tokens": prompt_tokens,

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio import asyncio
import hashlib import hashlib
import json
import time import time
from collections import OrderedDict from collections import OrderedDict
from dataclasses import dataclass from dataclasses import dataclass
@@ -42,6 +43,16 @@ def hash_user_context(messages: list[dict]) -> str:
return h.hexdigest() return h.hexdigest()
def _tool_fingerprint(tool_config: dict | None) -> str:
if not isinstance(tool_config, dict):
return "-"
try:
canonical = json.dumps(tool_config, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
except Exception:
canonical = str(tool_config)
return hashlib.sha1(canonical.encode("utf-8")).hexdigest()[:16]
class SessionCache: class SessionCache:
"""LRU + TTL cache: conversation-prefix hash -> upstream Lingma sessionId. """LRU + TTL cache: conversation-prefix hash -> upstream Lingma sessionId.
@@ -79,11 +90,11 @@ class SessionCache:
def enabled(self) -> bool: def enabled(self) -> bool:
return self.max > 0 return self.max > 0
def build_key(self, api_key: str, messages: list[dict]) -> str: def build_key(self, api_key: str, messages: list[dict], *, tool_config: dict | None = None) -> str:
# API key scoping prevents cross-tenant session leakage even when # API key scoping prevents cross-tenant session leakage even when
# different clients happen to produce identical histories. # different clients happen to produce identical histories.
key_scope = hashlib.sha1((api_key or "-").encode("utf-8")).hexdigest()[:12] key_scope = hashlib.sha1((api_key or "-").encode("utf-8")).hexdigest()[:12]
return f"{key_scope}:{hash_user_context(messages)}" return f"{key_scope}:{hash_user_context(messages)}:{_tool_fingerprint(tool_config)}"
async def get(self, key: str) -> SessionEntry | None: async def get(self, key: str) -> SessionEntry | None:
if not self.enabled: if not self.enabled:

View File

@@ -0,0 +1,981 @@
from __future__ import annotations
import json
import sys
import types
import unittest
from unittest.mock import AsyncMock, patch
class _FakeSessionCache:
def __init__(self) -> None:
self.enabled = True
self.keys: list[str] = []
self.get_calls: list[str] = []
self.put_calls: list[tuple[str, str, str]] = []
self.invalidate_calls: list[str] = []
def build_key(self, api_key: str, messages: list[dict], *, tool_config=None) -> str:
marker = "with_tool" if tool_config is not None else "no_tool"
key = f"{api_key}:{len(messages)}:{marker}"
self.keys.append(key)
return key
async def get(self, key: str):
self.get_calls.append(key)
return None
async def put(self, key: str, session_id: str, instance_name: str = "") -> None:
self.put_calls.append((key, session_id, instance_name))
async def invalidate(self, key: str) -> None:
self.invalidate_calls.append(key)
# app.main imports playwright via auto_login; tests don't exercise that path.
# Inject a lightweight stub so unit tests run without installing playwright.
_playwright = types.ModuleType("playwright")
_playwright_async = types.ModuleType("playwright.async_api")
class _StubPlaywrightTimeoutError(Exception):
pass
async def _stub_async_playwright():
raise RuntimeError("playwright is stubbed in unit tests")
_playwright_async.TimeoutError = _StubPlaywrightTimeoutError
_playwright_async.async_playwright = _stub_async_playwright
sys.modules.setdefault("playwright", _playwright)
sys.modules.setdefault("playwright.async_api", _playwright_async)
from starlette.requests import Request
from app.anthropic_schema import AnthropicMessagesRequest
from app.openai_schema import ChatCompletionsRequest
import app.main as main
class _FakeTicket:
def __init__(self) -> None:
self.released = False
def release(self) -> None:
self.released = True
class _FakeGuard:
def __init__(self) -> None:
self.in_flight = 0
async def try_acquire(self) -> _FakeTicket:
return _FakeTicket()
class _FakeClient:
def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
self._stream_events = stream_events
self._complete_result = complete_result
async def query_models(self) -> dict:
return {
"chat": [
{
"key": "org_auto",
"displayName": "Auto",
}
]
}
async def chat_complete(self, *args, **kwargs) -> dict:
return self._complete_result
async def chat_stream(self, *args, **kwargs):
out_meta = kwargs.get("out_meta")
if isinstance(out_meta, dict):
out_meta["session_id"] = "sess-stream"
for event in self._stream_events:
yield event
class _FakeInstance:
def __init__(self, client: _FakeClient) -> None:
self.name = "inst-test"
self.client = client
self.in_flight = 0
class _FakePool:
def __init__(self, inst: _FakeInstance) -> None:
self._inst = inst
def pick(self, affinity_key: str | None = None) -> _FakeInstance:
return self._inst
def _make_request(path: str, headers: dict[str, str] | None = None) -> Request:
header_pairs = []
for k, v in (headers or {}).items():
header_pairs.append((k.lower().encode("latin-1"), v.encode("latin-1")))
scope = {
"type": "http",
"http_version": "1.1",
"method": "POST",
"scheme": "http",
"path": path,
"raw_path": path.encode("latin-1"),
"query_string": b"",
"headers": header_pairs,
"client": ("testclient", 12345),
"server": ("testserver", 80),
"root_path": "",
}
return Request(scope)
async def _collect_stream(response) -> str:
chunks: list[str] = []
async for part in response.body_iterator:
if isinstance(part, bytes):
chunks.append(part.decode("utf-8"))
else:
chunks.append(str(part))
return "".join(chunks)
class _SpyClient(_FakeClient):
def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
super().__init__(stream_events=stream_events, complete_result=complete_result)
self.last_complete_args: tuple = ()
self.last_stream_args: tuple = ()
self.last_complete_kwargs: dict = {}
self.last_stream_kwargs: dict = {}
async def chat_complete(self, *args, **kwargs) -> dict:
self.last_complete_args = tuple(args)
self.last_complete_kwargs = dict(kwargs)
return await super().chat_complete(*args, **kwargs)
async def chat_stream(self, *args, **kwargs):
self.last_stream_args = tuple(args)
self.last_stream_kwargs = dict(kwargs)
async for event in super().chat_stream(*args, **kwargs):
yield event
class _SettingsPatch:
def __init__(self, **kwargs) -> None:
self._kwargs = kwargs
def __enter__(self):
self._patchers = [patch.object(main.settings, k, v) for k, v in self._kwargs.items()]
for p in self._patchers:
p.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
for p in reversed(self._patchers):
p.stop()
return False
class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
async def test_openai_non_stream_bridges_tool_calls(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "done",
"toolEvents": [
{
"id": "call_123",
"name": "search_docs",
"input": {"query": "gateway"},
"result": {"ok": True},
}
],
"sessionId": "sess-1",
"firstTokenLatencyMs": 12,
"totalLatencyMs": 34,
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
):
response = await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(message["content"], "done")
self.assertIsInstance(message["tool_calls"], list)
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["tool_calls"][0]["function"]["name"], "search_docs")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_non_stream_fallbacks_to_structured_tool_call_for_forced_tool(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "```json\n{\"arguments\": {\"query\": \"gateway\"}}\n```",
"toolEvents": [],
"sessionId": "sess-fallback-openai",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
):
response = await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertIsInstance(message["tool_calls"], list)
self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_stream_bridges_tool_and_text_events(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "call_stream_1",
"name": "read_file",
"input": {"path": "README.md"},
},
},
{"type": "text", "text": "hello"},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
stream_options={"include_usage": True},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
):
response = await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
body = await _collect_stream(response)
self.assertIn('"tool_calls"', body)
self.assertIn('"content": "hello"', body)
self.assertIn('"finish_reason": "tool_calls"', body)
self.assertIn('"usage"', body)
self.assertIn("data: [DONE]", body)
async def test_anthropic_non_stream_bridges_tool_blocks(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "ok",
"toolEvents": [
{
"id": "toolu_1",
"name": "lookup",
"input": {"k": "v"},
"result": {"value": 1},
}
],
"sessionId": "sess-2",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
payload = json.loads(response.body)
types = [item["type"] for item in payload["content"]]
self.assertEqual(types, ["text", "tool_use", "tool_result"])
self.assertEqual(payload["stop_reason"], "end_turn")
self.assertEqual(payload["content"][1]["name"], "lookup")
self.assertEqual(payload["content"][2]["tool_use_id"], "toolu_1")
async def test_anthropic_non_stream_fallbacks_to_structured_tool_blocks_for_forced_tool(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "{\"input\": {\"k\": \"v\"}, \"result\": {\"value\": 1}}",
"toolEvents": [],
"sessionId": "sess-fallback-anthropic",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
payload = json.loads(response.body)
types = [item["type"] for item in payload["content"]]
self.assertEqual(types, ["tool_use", "tool_result"])
self.assertEqual(payload["stop_reason"], "end_turn")
self.assertEqual(payload["content"][0]["name"], "lookup")
self.assertEqual(payload["content"][1]["tool_use_id"], "toolu_fallback_0")
async def test_openai_stream_tool_call_indices_are_stable(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "call_a",
"name": "read_file",
"input": {"path": "README.md"},
},
},
{
"type": "tool",
"tool": {
"id": "call_b",
"name": "search_docs",
"input": {"query": "gateway"},
},
},
],
complete_result={},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
):
response = await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
body = await _collect_stream(response)
self.assertIn('"id": "call_a"', body)
self.assertIn('"id": "call_b"', body)
self.assertIn('"index": 0', body)
self.assertIn('"index": 1', body)
async def test_anthropic_non_stream_returns_tool_use_stop_reason_when_result_missing(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "",
"toolEvents": [
{
"name": "lookup",
"input": {"k": "v"},
}
],
"sessionId": "sess-2",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
payload = json.loads(response.body)
self.assertEqual(payload["stop_reason"], "tool_use")
self.assertEqual(len(payload["content"]), 1)
self.assertEqual(payload["content"][0]["type"], "tool_use")
async def test_anthropic_stream_returns_tool_use_stop_reason_when_result_missing(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"name": "read",
"input": {"file": "a.txt"},
},
}
],
complete_result={},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
body = await _collect_stream(response)
self.assertIn('"type": "tool_use"', body)
self.assertIn('"stop_reason": "tool_use"', body)
async def test_anthropic_stream_bridges_tool_and_text_events(self) -> None:
fake_client = _FakeClient(
stream_events=[
{
"type": "tool",
"tool": {
"id": "toolu_stream_1",
"name": "read",
"input": {"file": "a.txt"},
"result": "done",
},
},
{"type": "text", "text": "world"},
],
complete_result={},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=True,
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
body = await _collect_stream(response)
self.assertIn("event: message_start", body)
self.assertIn('"type": "tool_use"', body)
self.assertIn('"type": "tool_result"', body)
self.assertIn('"stop_reason": "end_turn"', body)
self.assertIn('"type": "text_delta"', body)
self.assertIn("event: message_stop", body)
async def test_openai_non_stream_forwards_tool_config_when_enabled(self) -> None:
spy_client = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
_SettingsPatch(tool_forward_enabled=True),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
self.assertIn("tool_config", spy_client.last_complete_kwargs)
cfg = spy_client.last_complete_kwargs["tool_config"]
self.assertEqual(cfg["provider"], "openai")
self.assertEqual(len(cfg["tools"]), 1)
self.assertIsInstance(cfg["tool_choice"], dict)
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_openai_non_stream_does_not_forward_tool_config_when_disabled(self) -> None:
spy_client = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
_SettingsPatch(tool_forward_enabled=False),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
self.assertIn("tool_config", spy_client.last_complete_kwargs)
self.assertIsNone(spy_client.last_complete_kwargs["tool_config"])
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_openai_tooling_context_disables_session_reuse_cache(self) -> None:
fake_cache = _FakeSessionCache()
fake_client = _FakeClient(
stream_events=[],
complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-3"},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[
{"role": "user", "content": "turn-1"},
{"role": "user", "content": "turn-2"},
],
stream=False,
tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "session_cache", fake_cache),
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
_SettingsPatch(tool_forward_enabled=True),
):
await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
self.assertEqual(fake_cache.keys, [])
self.assertEqual(fake_cache.get_calls, [])
self.assertEqual(fake_cache.put_calls, [])
async def test_anthropic_non_stream_with_tools_uses_agent_mode(self) -> None:
spy_client = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"name": "write_file", "input_schema": {"type": "object", "properties": {}}}],
tool_choice={"type": "auto"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
self.assertIn("tool_config", spy_client.last_complete_kwargs)
cfg = spy_client.last_complete_kwargs["tool_config"]
self.assertEqual(cfg["provider"], "anthropic")
self.assertEqual(len(cfg["tools"]), 1)
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_anthropic_tooling_context_disables_session_reuse_cache(self) -> None:
fake_cache = _FakeSessionCache()
fake_client = _FakeClient(
stream_events=[],
complete_result={"text": "ok", "toolEvents": [], "sessionId": "sess-4"},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[
{"role": "user", "content": "turn-1"},
{"role": "user", "content": "turn-2"},
],
stream=False,
tools=[{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}],
tool_choice={"type": "auto"},
)
with (
patch.object(main, "session_cache", fake_cache),
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
self.assertEqual(fake_cache.keys, [])
self.assertEqual(fake_cache.get_calls, [])
self.assertEqual(fake_cache.put_calls, [])
class SessionCacheToolFingerprintTests(unittest.TestCase):
def test_build_key_changes_with_tool_config(self) -> None:
from app.session_cache import SessionCache
cache = SessionCache(max_entries=8, ttl_sec=60)
messages = [
{"role": "system", "content": "sys"},
{"role": "user", "content": "hello"},
]
cfg_a = {
"provider": "openai",
"tools": [{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
"tool_choice": {"type": "function", "function": {"name": "lookup"}},
}
cfg_a_reordered = {
"tool_choice": {"function": {"name": "lookup"}, "type": "function"},
"tools": [{"function": {"parameters": {}, "name": "lookup"}, "type": "function"}],
"provider": "openai",
}
cfg_b = {
"provider": "openai",
"tools": [{"type": "function", "function": {"name": "lookup_v2", "parameters": {}}}],
"tool_choice": {"type": "function", "function": {"name": "lookup_v2"}},
}
key_no_tool = cache.build_key("api-key", messages)
key_a = cache.build_key("api-key", messages, tool_config=cfg_a)
key_a_reordered = cache.build_key("api-key", messages, tool_config=cfg_a_reordered)
key_b = cache.build_key("api-key", messages, tool_config=cfg_b)
self.assertNotEqual(key_no_tool, key_a)
self.assertEqual(key_a, key_a_reordered)
self.assertNotEqual(key_a, key_b)
def test_handle_server_message_drops_unroutable_tool_event_without_request_id(self) -> None:
from app.lingma_client import LspWsRpcClient
rpc = LspWsRpcClient("ws://127.0.0.1:1")
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/invoke",
"params": {
"name": "lookup",
"parameters": {"q": "x"},
},
}
)
stream = rpc._chat_streams["req-1"]
self.assertEqual(stream["tool_order"], [])
self.assertEqual(stream["tool_states"], {})
self.assertTrue(stream["chunks"].empty())
import asyncio
asyncio.run(run())
def test_handle_server_message_routes_by_tool_map_without_request_id(self) -> None:
from app.lingma_client import LspWsRpcClient
rpc = LspWsRpcClient("ws://127.0.0.1:1")
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/invoke",
"params": {
"requestId": "req-1",
"toolCallId": "call-1",
"name": "lookup",
"parameters": {"q": "a"},
},
}
)
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/invokeResult",
"params": {
"toolCallId": "call-1",
"result": {"ok": True},
},
}
)
result = rpc.get_stream_result("req-1")
self.assertEqual(len(result["toolEvents"]), 1)
self.assertEqual(result["toolEvents"][0]["id"], "call-1")
self.assertEqual(result["toolEvents"][0]["input"], {"q": "a"})
self.assertEqual(result["toolEvents"][0]["result"], {"ok": True})
import asyncio
asyncio.run(run())
def test_handle_server_message_dedupes_identical_repeated_tool_events(self) -> None:
from app.lingma_client import LspWsRpcClient
rpc = LspWsRpcClient("ws://127.0.0.1:1")
async def run() -> None:
rpc.create_stream("req-1")
msg = {
"jsonrpc": "2.0",
"method": "tool/invoke",
"params": {
"requestId": "req-1",
"toolCallId": "call-dup",
"name": "lookup",
"parameters": {"q": "dup"},
},
}
await rpc._handle_server_message(msg)
await rpc._handle_server_message(msg)
stream = rpc._chat_streams["req-1"]
self.assertEqual(stream["tool_order"], ["call-dup"])
self.assertEqual(stream["chunks"].qsize(), 1)
import asyncio
asyncio.run(run())
def test_extracts_tool_event_from_results_and_parameters(self) -> None:
from app.lingma_client import LspWsRpcClient
event = LspWsRpcClient._extract_tool_event(
{
"toolCallId": "call_sync_1",
"parameters": {"path": "README.md"},
"results": [
{
"toolCallId": "call_sync_1",
"name": "read_file",
"result": {"ok": True},
}
],
}
)
self.assertEqual(
event,
{
"id": "call_sync_1",
"name": "read_file",
"input": {"path": "README.md"},
"result": {"ok": True},
},
)
def test_extracts_tool_event_from_invoke_result_payload(self) -> None:
from app.lingma_client import LspWsRpcClient
event = LspWsRpcClient._extract_tool_event(
{
"toolCallId": "call_inv_1",
"name": "search_docs",
"parameters": {"query": "gateway"},
"result": {"hits": 3},
}
)
self.assertEqual(
event,
{
"id": "call_inv_1",
"name": "search_docs",
"input": {"query": "gateway"},
"result": {"hits": 3},
},
)
def test_tool_sync_triggers_approve_and_invoke_result_requests(self) -> None:
from app.lingma_client import LspWsRpcClient
class _WsStub:
def __init__(self) -> None:
self.frames: list[bytes] = []
async def send(self, data: bytes) -> None:
self.frames.append(data)
def _decode(frame: bytes) -> dict:
body = frame.split(b"\r\n\r\n", 1)[1]
return json.loads(body.decode("utf-8"))
ws = _WsStub()
rpc = LspWsRpcClient(ws)
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/call/sync",
"params": {
"sessionId": "sess-1",
"requestId": "req-1",
"toolCallId": "call-1",
"name": "run_in_terminal",
"parameters": {"command": "pwd"},
},
}
)
decoded = [_decode(frame) for frame in ws.frames]
methods = [item.get("method") for item in decoded]
self.assertIn("tool/call/approve", methods)
self.assertIn("tool/invokeResult", methods)
approve = next(item for item in decoded if item.get("method") == "tool/call/approve")
self.assertEqual(
approve["params"],
{
"type": "tool_call",
"sessionId": "sess-1",
"requestId": "req-1",
"toolCallId": "call-1",
"approval": True,
},
)
invoke_result = next(item for item in decoded if item.get("method") == "tool/invokeResult")
self.assertEqual(invoke_result["params"]["toolCallId"], "call-1")
self.assertEqual(invoke_result["params"]["name"], "run_in_terminal")
self.assertTrue(invoke_result["params"]["success"])
self.assertEqual(invoke_result["params"]["errorMessage"], "")
import asyncio
asyncio.run(run())
def test_tool_sync_does_not_emit_roundtrip_without_request_id(self) -> None:
from app.lingma_client import LspWsRpcClient
class _WsStub:
def __init__(self) -> None:
self.frames: list[bytes] = []
async def send(self, data: bytes) -> None:
self.frames.append(data)
ws = _WsStub()
rpc = LspWsRpcClient(ws)
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/call/sync",
"params": {
"sessionId": "sess-1",
"toolCallId": "call-1",
"name": "run_in_terminal",
"parameters": {"command": "pwd"},
},
}
)
self.assertEqual(ws.frames, [])
import asyncio
asyncio.run(run())