Compare commits

..

7 Commits

Author SHA1 Message Date
GitHub Actions
56c57a4901 docs: sync DESIGN with current tooling behavior
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 08:31:45 +08:00
GitHub Actions
df80a86310 docs: refocus README on quickstart and runbook flow
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 08:11:00 +08:00
GitHub Actions
15cd5e8770 fix: close forced tool-choice with structured fallback
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 07:18:01 +08:00
GitHub Actions
63583712a8 fix: fallback agent payload source to numeric value
Keep Lingma chat/ask payload source as numeric 1 for agent mode A/B validation against remote upstream timeout behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 06:36:07 +08:00
GitHub Actions
c67a9c3d61 fix: align agent payload semantics with VSCode tool flow
Force OpenAI tooling-context requests into agent mode and align Lingma ask payload fields for agent requests so server-side tool path matches VSCode semantics.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 23:19:52 +08:00
GitHub Actions
e208025f35 fix: emit Lingma tool approve/invoke roundtrip
Forward tool/call/sync and tool/invoke events to Lingma with auto-approve and invokeResult so tool calls can complete end-to-end.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 21:35:05 +08:00
GitHub Actions
3498b81fa2 fix: enable anthropic agent mode for tooling requests
Use agent ask_mode for Anthropic messages with tooling context so tool/write flows are executed, and add regression coverage plus docs/env updates for TOOL_FORWARD_ENABLED.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 20:15:14 +08:00
6 changed files with 543 additions and 346 deletions

View File

@@ -46,6 +46,9 @@ DEFAULT_MODEL=org_auto
# 默认模式chat 或 agent
DEFAULT_ASK_MODE=chat
# 请求侧 tools/tool_choice 透传到 Lingma默认关闭开启后可支持工具写文件等场景
TOOL_FORWARD_ENABLED=false
# 专属域(可选)
DEDICATED_DOMAIN_URL=

View File

@@ -47,9 +47,9 @@
- **逆向 Lingma 后端协议**:之前评估过(曾经的"B1 终极方案"),需要反编译二进制,维护成本高、政策风险大,放弃。
- **多租户 / 水平扩缩**:单容器即可;真要大规模部署 → 套层反代 + N 个网关副本就够,不在进程内解决。
- **请求侧完整 function calling / tools 透传**OpenAI schema 里保留了字段,但目前不会把 `tools`/`tool_choice` 透传给 Lingma上游无等价输入协议)。
- **请求侧完整 function calling / tools 语义**仍不是当前目标;现阶段仅支持 `tools`/`tool_choice``TOOL_FORWARD_ENABLED` 开关下灰度透传(默认关闭)。
- **响应侧工具事件桥接**:若 Lingma 上游产出 tool 事件,网关会向 OpenAI 输出 `tool_calls`,向 Anthropic 输出 `tool_use` / `tool_result`stream + non-stream
- **多模态**:请求里的 image/audio 会被降级成占位符 `[image]` / `[audio]`,因为 Lingma chat 不支持
- **强制工具回退闭环non-stream**:当上游未返回 tool 事件且请求为强制 `tool_choice` 时,网关会从文本里解析严格 JSON合成 OpenAI `tool_calls` 与 Anthropic `tool_use` / `tool_result`
---
@@ -592,7 +592,7 @@ FastAPI `lifespan` 退出 → `pool.close()` → 每个 `client.close()` → 进
| 需求 | 改哪些文件 | 关键入口 |
|---|---|---|
| 加一个新的 OpenAI 端点(如 embeddings | `main.py`, `openai_schema.py` | 仿照 `v1_models``@app.post("/v1/embeddings", dependencies=[Depends(auth_guard)])` |
| 扩展 Anthropic 端点(如 count_tokens / tool_use 相关能力) | `main.py::v1_messages`, `anthropic_schema.py` | count_tokens 只读:复用 `estimate_tokens`;响应侧 `tool_use/tool_result` 桥接已支持,若要请求侧 tools 透传仍需改 `lingma_client.py` payload |
| 扩展 Anthropic 端点(如 count_tokens / tool_use 相关能力) | `main.py::v1_messages`, `anthropic_schema.py` | count_tokens 只读:复用 `estimate_tokens`;响应侧 `tool_use/tool_result` 桥接已支持请求侧 `tools/tool_choice` 透传由 `TOOL_FORWARD_ENABLED` 控制并经 `lingma_client.py` payload 下发 |
| 加一种新的实例调度策略(如加权轮询) | `lingma_pool.py::pick()` | 当前是 affinity → least-in-flight → round-robin |
| 改认证为 JWT / OAuth | `auth.py` | 三个 `require_*` 函数是全部入口;`main.py` 里只有 `*_guard` 代理 |
| 增加限流(按 api_key 配额) | `concurrency.py``PerKeyGuard``main.py``chat_guard.try_acquire()` 后再来一层 | 注意 ticket 释放顺序(内层先释放) |
@@ -600,7 +600,7 @@ FastAPI `lifespan` 退出 → `pool.close()` → 每个 `client.close()` → 进
| 改 Prometheus 指标名 | 所有 `prometheus_lines()``prometheus_text()` | 注意生态兼容;更名要在 README 留 alias |
| 接入 Jaeger / OpenTelemetry | `logging_config.py` 加 OTel instrumentation`main.py::request_id_middleware` 注入 traceid | request_id 可以复用为 span_id |
| 加一个 Lingma 新方法调用(比如 code/complete | `lingma_client.py` 仿照 `query_models``await self.ensure_ready(); return await self.rpc.request("code/complete", ...)` | 原始上游响应形态需抓包确认 |
| 支持 function calling假设 Lingma 将来支持) | `openai_schema.py` 已保留 `tools` / `tool_choice` 字段;`lingma_client.py::_build_payload``extra.tools` | 上游协议 TBD |
| 支持 function calling假设 Lingma 将来支持) | `openai_schema.py` / `anthropic_schema.py` / `main.py` / `lingma_client.py` | 当前仅支持请求侧 `tools/tool_choice` 在开关控制下透传与响应侧桥接;若要完整 function calling 语义仍需按上游协议补齐 |
| 多模态穿透 | `openai_schema.py::flatten_content` 不再降级;`lingma_client.py` payload 传 url | 前提Lingma 支持(目前不支持) |
| 换 session_cache 后端(如 Redis | 实现同样接口的 `RedisSessionCache``main.py` 初始化换实现 | 接口是 `get / put / invalidate / stats / prometheus_lines / build_key / enabled`,内存换远端成本不高 |
| 多容器副本(水平扩) | 外面套反代 + sticky session根据 `Authorization``x-user` 做 hashsession cache 改 Redis | 或直接接受多副本 cache 独立,轻微浪费 KV cache 命中率 |
@@ -612,7 +612,8 @@ pip install -r requirements.txt
# 在容器外跑,需要自己准备 Lingma 二进制
export LINGMA_BIN=/path/to/Lingma
export API_KEYS=sk-dev
uvicorn app.main:app --reload --port 8317
export PORT=8317
uvicorn app.main:app --reload --port ${PORT}
```
主要断点位置:

473
README.md
View File

@@ -1,396 +1,215 @@
# Lingma OpenAI Gateway
把本地 Lingma 插件封装 OpenAI 兼容接口。任何能调 OpenAI 的客户端Cursor、Dify、LangChain、curl…都能直接接入。
Lingma 封装 OpenAI / Anthropic 兼容网关,便于现有客户端直接接入。
**支持:**
- OpenAI 兼容:`GET /v1/models` / `POST /v1/chat/completions`(含 SSE 流式) / Bearer 鉴权
- **Anthropic 兼容**`POST /v1/messages`(含 Anthropic SSE 事件流) / `x-api-key` 鉴权
- Prometheus / 多账号实例池 / 会话复用(跨两种协议共享) / 免浏览器登录态注入
- OpenAI`/v1/models``/v1/chat/completions`(含 stream
- Anthropic`/v1/messages``/v1/messages/count_tokens`(含 stream
- 内置多实例池、会话复用、Prometheus 指标、登录态 bundle 注入
> 想看架构、模块划分、设计决策、二开路线图 → 直接读 [`DESIGN.md`](./DESIGN.md)。
> 架构设计与二开细节请看 [`DESIGN.md`](./DESIGN.md)。
---
## 架构速览
## 目录
```
┌─────────────┐ OpenAI 协议 ┌─────────────────────────────────────────┐
│ 任意客户端 │ ───────────▶ │ FastAPI (app/main.py) │
│ (curl/ │ │ ├─ auth_guard / admin_guard │
│ Cursor/ │ │ ├─ chat_guard (InFlightGuard 背压) │
│ Dify…) │ │ ├─ SessionCache (LRU+TTL, KV 复用) │
└─────────────┘ │ └─ StatsCollector + Prometheus │
└────────────────┬────────────────────────┘
│ 选实例 (least-in-flight + affinity)
┌────────────────▼────────────────────────┐
│ LingmaPool (app/lingma_pool.py) │
│ ├─ inst-0 inst-1 inst-N … │
│ └─ 启动前自动 restore session bundle │
└────────────────┬────────────────────────┘
┌───────────────────────┼───────────────────────┐
▼ ▼ ▼
┌────────────────────┐ ┌────────────────────┐ ┌────────────────────┐
│ LingmaGatewayClient│ │ … │ │ … │
│ (LSP over WS) │ │ │ │ │
│ ├─ Popen (PID管理) │ │ │ │ │
│ ├─ reconnect loop │ │ │ │ │
│ └─ ws://:PORT │ │ │ │ │
└──────────┬─────────┘ └────────────────────┘ └────────────────────┘
│ spawn + ws
┌──────────▼─────────┐
│ Lingma 二进制 │
│ --workDir /… │
└────────────────────┘
```
1. [5 分钟启动](#5-分钟启动)
2. [常用命令](#常用命令)
3. [最小 API 示例](#最小-api-示例)
4. [部署与更新](#部署与更新)
5. [排障速查](#排障速查)
6. [文档入口](#文档入口)
---
## 一、快速开始
## 5 分钟启动
### 1) 准备配置
```bash
git clone <repo>
cd lingma-openai-gateway
cp .env.example .env
# 至少填 API_KEYS + LINGMA_USERNAME + LINGMA_PASSWORD或 session bundle
```
至少配置这些变量(在 `.env`
- `API_KEYS`
- `LINGMA_USERNAME` / `LINGMA_PASSWORD`(或 `LINGMA_SESSION_BUNDLE(_FILE)`
### 2) Docker 启动(推荐)
```bash
mkdir -p data secrets
docker compose up -d --build
docker compose logs -f # 看到 "Uvicorn running on..." 就 OK
docker compose logs -f
```
冒烟测试:
### 3) 冒烟检查
```bash
PORT=$(grep '^PORT=' .env | cut -d= -f2)
API_KEY=$(grep '^API_KEYS=' .env | cut -d= -f2 | cut -d, -f1)
curl -s http://127.0.0.1:8317/healthz
curl -s http://127.0.0.1:8317/v1/models -H "Authorization: Bearer $API_KEY"
curl -s http://127.0.0.1:8317/v1/chat/completions \
-H "Authorization: Bearer $API_KEY" \
curl -s "http://127.0.0.1:${PORT}/healthz"
curl -s "http://127.0.0.1:${PORT}/v1/models" \
-H "Authorization: Bearer ${API_KEY}"
```
---
## 常用命令
### 本地开发运行
```bash
pip install -r requirements.txt
uvicorn app.main:app --reload --port 8317
```
### Docker 常用
```bash
docker compose up -d --build
docker compose logs -f
docker compose ps
docker compose down
```
### 测试
```bash
# 重点回归套件
python3 -m unittest tests/test_tool_call_bridge.py
# 全量 unittest
python3 -m unittest discover -s tests -p "test_*.py"
```
---
## 最小 API 示例
先取 key
```bash
PORT=$(grep '^PORT=' .env | cut -d= -f2)
API_KEY=$(grep '^API_KEYS=' .env | cut -d= -f2 | cut -d, -f1)
```
### OpenAI非流式
```bash
curl -s "http://127.0.0.1:${PORT}/v1/chat/completions" \
-H "Authorization: Bearer ${API_KEY}" \
-H "Content-Type: application/json" \
-d '{"model":"org_auto","messages":[{"role":"user","content":"hi"}]}'
```
---
## 二、配置参考
`.env.example` 是权威说明,这里按主题分组。
### 2.1 核心
| 变量 | 默认 | 说明 |
|---|---|---|
| `HOST` / `PORT` | `0.0.0.0` / `8317` | 网关监听地址与端口 |
| `API_KEYS` | — | Bearer key多个逗号分隔**留空则 /v1/\* 无鉴权**,启动会 warn |
| `LOG_LEVEL` | `INFO` | `DEBUG`/`INFO`/`WARNING`/`ERROR`,日志为结构化 JSON`request_id` |
| `DEFAULT_MODEL` | `org_auto` | 模型无法映射时兜底 |
| `DEFAULT_ASK_MODE` | `chat` | `chat``agent`(传 `model: "agent"` 时自动切) |
| `DEDICATED_DOMAIN_URL` | — | 企业专属域(可空) |
### 2.2 权限分层(生产建议全配)
| 变量 | 默认 | 说明 |
|---|---|---|
| `ADMIN_TOKEN` | — | `/internal/*` 专属 token未配置时 fallback 到 `API_KEYS`(兼容);都为空 → 503 |
| `METRICS_TOKEN` | — | `/metrics` 专属 token未配置时 fallback 到 `API_KEYS` |
| `METRICS_PUBLIC` | `false` | 显式公开 `/metrics`(仅用于私网采集器) |
> `ADMIN_TOKEN` / `METRICS_TOKEN` / `API_KEYS` 三者都为空时,`/metrics` 和 `/internal/*` 会返回 503拒绝裸奔
### 2.3 并发与背压
| 变量 | 默认 | 说明 |
|---|---|---|
| `GATEWAY_MAX_IN_FLIGHT` | `4` | 并发上限;`<=0` 表示不限 |
| `GATEWAY_QUEUE_TIMEOUT_SEC` | `30` | 排队超时;超时直接返回 `429 + Retry-After` |
### 2.4 Lingma 进程
| 变量 | 默认 | 说明 |
|---|---|---|
| `LINGMA_BIN` | `/app/data/bin/Lingma` | 容器内二进制路径 |
| `LINGMA_SOURCE_TYPE` | `marketplace` | `marketplace``vsix` |
| `LINGMA_MARKETPLACE_PUBLISHER` | `Alibaba-Cloud` | Marketplace 发布者 |
| `LINGMA_MARKETPLACE_EXTENSION` | `tongyi-lingma` | Marketplace 扩展名 |
| `LINGMA_VSIX_URL` | 官方地址 | 兜底 VSIX 下载地址 |
| `LINGMA_BOOTSTRAP_ALWAYS` | `true` | 启动时总是尝试刷新二进制 |
| `LINGMA_FORCE_REFRESH` | `false` | 强制忽略本地缓存重新下载 |
| `LINGMA_WORK_DIR` | `/app/data/.lingma/vscode/sharedClientCache` | 登录态/缓存所在目录 |
| `LINGMA_SOCKET_PORT` | `36510` | 单实例模式下的 Lingma WS 端口 |
| `LINGMA_STARTUP_TIMEOUT` | `40` | 启动超时秒 |
| `LINGMA_RPC_TIMEOUT` | `30` | 单次 RPC 超时秒 |
### 2.5 多账号 / 多实例池
| 变量 | 默认 | 说明 |
|---|---|---|
| `LINGMA_ACCOUNTS` | — | `u1:p1,u2:p2` 或 JSON 数组;配置后每个账号 = 一个独立 Lingma 子进程 |
| `LINGMA_INSTANCE_COUNT` | 账号数 | 显式指定实例数;不足账号循环复用并打 warn |
| `LINGMA_USERNAME` / `LINGMA_PASSWORD` | — | 单实例兼容模式(仅 `LINGMA_ACCOUNTS` 为空时生效) |
### 2.6 会话复用KV cache 优化)
| 变量 | 默认 | 说明 |
|---|---|---|
| `SESSION_REUSE_ENABLED` | `true` | 多轮对话命中时只发增量 user 消息 + 复用上游 `sessionId` |
| `SESSION_CACHE_MAX_ENTRIES` | `256` | LRU 容量 |
| `SESSION_CACHE_TTL_SEC` | `1800` | TTL避免命中已回收的 session |
### 2.7 登录态注入(跳过 Playwright
| 变量 | 默认 | 说明 |
|---|---|---|
| `LINGMA_SESSION_BUNDLE` | — | base64 格式的 bundleinline适合短字符串 |
| `LINGMA_SESSION_BUNDLE_FILE` | — | bundle 文件路径(推荐,避免 env 过长) |
### 2.8 自动登录
| 变量 | 默认 | 说明 |
|---|---|---|
| `AUTO_LOGIN_ENABLED` | `true` | 未登录时自动启 Playwright |
| `AUTO_LOGIN_HEADLESS` | `true` | 无头浏览器 |
| `AUTO_LOGIN_TIMEOUT` | `180` | 登录超时秒 |
| `AUTO_LOGIN_MAX_RETRY` | `2` | 登录失败重试次数 |
---
## 三、API 参考
### 3.1 公共(`API_KEYS`
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | `/healthz` | 免鉴权;返回 `ok` / `pool_size` / `pool_ready` / 每实例状态 |
| GET | `/v1/models` | OpenAI 兼容;`id` 是 Lingma 原 key`name` 是可读名 |
| POST | `/v1/chat/completions` | OpenAI 兼容;`stream=true` 走 SSE`model: "agent"` 切 agent 模式 |
| POST | `/v1/messages` | **Anthropic Messages 兼容**`x-api-key``Authorization: Bearer``stream=true` 走 Anthropic 命名事件 SSE |
**chat 请求示例(非流式)**
```bash
curl -s http://127.0.0.1:8317/v1/chat/completions \
-H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" \
-d '{"model":"dashscope_qmodel","messages":[{"role":"user","content":"你好"}]}'
```
**chat 请求示例(流式 + usage**
```bash
curl -N http://127.0.0.1:8317/v1/chat/completions \
-H "Authorization: Bearer $API_KEY" -H "Content-Type: application/json" \
-d '{
"model":"dashscope_qmodel",
"stream":true,
"stream_options":{"include_usage":true},
"messages":[{"role":"user","content":"介绍一下你自己"}]
"model": "org_auto",
"messages": [{"role": "user", "content": "hi"}],
"stream": false
}'
```
**Anthropic Messages 示例(非流式)**
### OpenAI流式
```bash
curl -s http://127.0.0.1:8317/v1/messages \
-H "x-api-key: $API_KEY" \
curl -N "http://127.0.0.1:${PORT}/v1/chat/completions" \
-H "Authorization: Bearer ${API_KEY}" \
-H "Content-Type: application/json" \
-d '{
"model": "org_auto",
"messages": [{"role": "user", "content": "say hi"}],
"stream": true
}'
```
### Anthropic非流式
```bash
curl -s "http://127.0.0.1:${PORT}/v1/messages" \
-H "x-api-key: ${API_KEY}" \
-H "anthropic-version: 2023-06-01" \
-H "Content-Type: application/json" \
-d '{
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 256,
"system":"你是一个简洁的助手",
"messages":[{"role":"user","content":"你好"}]
"messages": [{"role": "user", "content": "hi"}],
"stream": false
}'
```
**Anthropic Messages 示例(流式)**
### Anthropic:流式
```bash
curl -N http://127.0.0.1:8317/v1/messages \
-H "x-api-key: $API_KEY" \
curl -N "http://127.0.0.1:${PORT}/v1/messages" \
-H "x-api-key: ${API_KEY}" \
-H "anthropic-version: 2023-06-01" \
-H "Content-Type: application/json" \
-d '{
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 256,
"stream":true,
"messages":[{"role":"user","content":"写一首四行诗"}]
"messages": [{"role": "user", "content": "say hi"}],
"stream": true
}'
# 返回 message_start / content_block_start / content_block_delta* /
# content_block_stop / message_delta / message_stop
```
说明:
- **模型名兼容**:客户端可以继续传 `claude-3-*` 等名字;未识别的 model 会回退到 `DEFAULT_MODEL` 对应的 Lingma key后端实际仍由 Lingma 提供Qwen 系列)。如需显式选模型,直接传 Lingma key`dashscope_qmodel` 等)。
- **会话复用共享**Anthropic 与 OpenAI 两个端点共用同一 `SessionCache`,只要 API key 相同、对话前缀相同,就会命中同一上游 `sessionId`
- **多模态**`image` 块会被降级为 `[image]` 占位符Lingma 不支持 vision
- **工具事件桥接**:当 Lingma 上游返回 `tool` 事件时,网关会输出为 OpenAI `tool_calls`(含 stream/non-stream和 Anthropic `tool_use`/`tool_result` blocks含 stream/non-stream但请求侧 `tools`/`tool_choice` 仍不会透传到 Lingma。
- **鉴权**:优先 `x-api-key`Anthropic 官方 SDK 默认),回退 `Authorization: Bearer`(方便 curl / OpenAI 风格客户端)。
### 3.2 观测(`METRICS_TOKEN` 或 `API_KEYS`
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | `/metrics` | Prometheus 文本;含池每实例 gauge、并发、session cache 命中率、token 计数 |
### 3.3 管理(`ADMIN_TOKEN` 或 fallback 到 `API_KEYS`
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | `/internal/stats` | JSON`stats` + `concurrency` + `pool` + `session_cache` |
| GET | `/internal/auto-login/status` | 每实例登录态与 auto_login 状态 |
| POST | `/internal/auto-login/start?instance=inst-0` | 主动触发某实例登录(可不传,由 pool.pick 选) |
| POST | `/internal/session/export?instance=inst-0` | 把已登录实例的 cache 打包成 base64 bundle |
| GET | `/internal/models/raw?instance=inst-0` | Lingma 原始 `config/queryModels` 响应displayName / isReasoning / isVl 等) |
---
## 四、常用场景
### 4.1 多账号池
```env
LINGMA_ACCOUNTS=user1:pass1,user2:pass2,user3:pass3
# LINGMA_INSTANCE_COUNT=3 # 不写默认=账号数
```
- 每个账号一个独立 Lingma 子进程 + 独立 `workDir``data/.lingma/pool/inst-<i>/`)。
- 路由:同 `user` 字段或同 system prompt 的请求**粘性**分到同一实例;其他按**最小在途**分配。
- 一个实例挂掉不影响整体,`/healthz.pool_ready` 下降,自动重连。
### 4.2 跳过 Playwrightsession bundle
**从已登录实例导出:**
### Anthropiccount_tokens
```bash
curl -sS -X POST \
-H "Authorization: Bearer $ADMIN_TOKEN" \
"http://host:port/internal/session/export" \
| jq -r '.bundle_b64' > secrets/lingma-session.b64
chmod 600 secrets/lingma-session.b64
curl -s "http://127.0.0.1:${PORT}/v1/messages/count_tokens" \
-H "x-api-key: ${API_KEY}" \
-H "anthropic-version: 2023-06-01" \
-H "Content-Type: application/json" \
-d '{
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 64,
"messages": [{"role": "user", "content": "count me"}]
}'
```
**在新部署注入(选一种):**
---
```env
# 文件注入(推荐)—— 需要在 docker-compose.yml 挂载 secrets 目录
LINGMA_SESSION_BUNDLE_FILE=/secrets/lingma-session.b64
## 部署与更新
# 或 inline适合小 bundle
LINGMA_SESSION_BUNDLE=H4sIAAAA...
### 服务器更新到最新 main
# 多账号 JSON 模式,每账号独立 bundle
LINGMA_ACCOUNTS=[
{"username":"u1","password":"p1","session_bundle_file":"/secrets/u1.b64"},
{"username":"u2","password":"p2","session_bundle":"H4sIAAAA..."}
]
```bash
cd /root/lingma-openai-gateway
git fetch origin
git checkout -B main origin/main
git reset --hard origin/main
git clean -fd
docker compose up -d --build
docker compose ps
```
**行为保证:**
### 健康检查
- 只在目标 `workDir` 空(`cache/user` 不存在或 empty时才注入不会覆盖活跃登录态。
- 注入失败(损坏/权限)自动 fallback 到 Playwright。
- bundle 只含 `cache/{id,user,quota,config.json}` 4 个文件;大小上限 4 MiB实际通常 < 10 KB。
- **bundle 等同于密钥**,落盘需 `chmod 600`,不要进 git。
### 4.3 Prometheus 接入
```yaml
# prometheus scrape_configs 片段
- job_name: lingma-gateway
bearer_token: <METRICS_TOKEN>
static_configs: [{targets: ['host:8317']}]
metrics_path: /metrics
```bash
PORT=$(grep '^PORT=' .env | cut -d= -f2)
curl -s "http://127.0.0.1:${PORT}/healthz"
```
关键指标:
---
| 指标 | 类型 | 意义 |
## 排障速查
| 现象 | 常见原因 | 处理 |
|---|---|---|
| `gateway_in_flight` / `gateway_queued` | gauge | 并发 / 排队 |
| `gateway_rejected_total` | counter | 背压拒绝429累计 |
| `gateway_pool_instance_ready{name}` | gauge | 每实例是否就绪0/1 |
| `gateway_pool_instance_in_flight{name}` | gauge | 每实例在途 |
| `gateway_session_cache_hit_total` / `_miss_total` | counter | 会话复用命中率原料 |
| `gateway_chat_requests_success` / `_error` | counter | chat 成功率 |
| `/v1/*` 返回 401 | 缺失或错误 API key | 检查 `Authorization: Bearer``x-api-key` |
| `healthz` 正常但请求失败 | 用错端口 | 以 `.env``PORT` 为准,`docker compose ps` 再确认 |
| `git pull` 提示 not on a branch | 处于 detached HEAD | 执行 `git checkout -B main origin/main` |
| 自动登录不稳定 | 浏览器流程波动 | 优先使用 `LINGMA_SESSION_BUNDLE(_FILE)` |
| 工具调用未触发 | 模型未选择工具 | 使用 `tool_choice` 强制,必要时约束输出 JSON |
---
## 五、升级注意事项
## 文档入口
从旧版本升级时注意**破坏性变更**(每一项都有 fallback默认不会炸但建议显式配置
| 版本 | 变更 | 应对 |
|---|---|---|
| v0.3 | `/metrics` 裸奔时(无 token / 无 key由公开改为 503 | 显式配 `METRICS_PUBLIC=true``METRICS_TOKEN` |
| v0.3 | `/internal/*` 引入 `ADMIN_TOKEN` | 未配置自动 fallback 到 `API_KEYS`,生产建议单独配 |
| v0.2 | 默认会话复用(多轮对话只发增量) | 如果你的客户端裁剪了历史导致语义不连续,设 `SESSION_REUSE_ENABLED=false` |
| v0.2 | Chat 请求走 JSON-RPC `notify` 而非 `request`(修复 30s TTFB bug | 无需行动 |
| v0.2 | 多实例池(`LINGMA_ACCOUNTS` 存在时启用) | 不配则保持单实例行为 |
---
## 六、故障排查FAQ
| 症状 | 排查方向 |
|---|---|
| `/healthz` 返回 `ok=false` / `pool_ready=0` | 查 `docker logs`,关键字 `lingma spawned` / `state ... -> ready`;若卡在 `starting` → Lingma 二进制或 workDir 权限问题 |
| 返回 `401` 且带 `Invalid admin token` | 你用了 `API_KEYS` 去打 `/internal/*`,但服务端已设了 `ADMIN_TOKEN`;用 `ADMIN_TOKEN` 或清空 `ADMIN_TOKEN` |
| 返回 `503 metrics scraping disabled` | 三个 env 全空,按 "权限分层" 章节配任一 |
| 返回 `429 Too many in-flight` | 并发超过 `GATEWAY_MAX_IN_FLIGHT`;增大或客户端加重试 |
| 首 token 延迟 2-3 秒 | Lingma 侧常态;多轮对话第二轮起,会话复用命中后 TTFB 明显降低(看 `gateway_session_cache_hit_total` |
| Playwright 登录失败 | 导出一个已登录 bundle 注入(见 4.2),彻底跳过浏览器 |
| 容器重启后 Lingma 要重新登录 | `data/` 没挂在卷上或被清过;确认 `./data:/app/data` 挂载 + bundle fallback |
| 升级后 `/metrics` 返回 503 | v0.3 默认严格;按表格 5.1 配置 |
`LOG_LEVEL=DEBUG` 可以看到 Lingma 子进程的 stderr 输出,便于定位 native 崩溃。
---
## 七、开发与二开
项目本身是单仓 FastAPI3400 行 Python。推荐阅读路径
1. **先读 [`DESIGN.md`](./DESIGN.md)** —— 架构、模块职责、关键设计决策、二开指引。
2. 再按需读对应模块:
- 想改请求入口 / 路由 → `app/main.py`
- 想加实例调度策略 → `app/lingma_pool.py::pick()`
- 想改 Lingma 通信协议 → `app/lingma_client.py`
- 想扩展会话复用 → `app/session_cache.py` + `main.py` 的 reuse 块
- 想做认证改造 → `app/auth.py` + `main.py::*_guard`
3. 本地跑:`pip install -r requirements.txt && uvicorn app.main:app --reload`
---
## 八、目录结构
```
lingma-openai-gateway/
├── app/ # 主代码(见 DESIGN.md 模块一览)
│ ├── main.py # FastAPI 入口 + 路由
│ ├── lingma_pool.py # N 实例池
│ ├── lingma_client.py # LSP over WS + 子进程管理
│ ├── session_cache.py # 多轮对话 sessionId 复用
│ ├── session_bundle.py # 登录态 export/import
│ ├── concurrency.py # InFlightGuard 背压
│ ├── auto_login.py # Playwright 登录
│ ├── auth.py # Bearer / admin / metrics 三档鉴权
│ ├── config.py # 环境变量 → dataclass
│ ├── model_map.py # 模型 key ↔ displayName
│ ├── openai_schema.py # OpenAI 请求/响应 Pydantic
│ ├── stats.py # StatsCollector + Prometheus
│ ├── logging_config.py # 结构化 JSON log + request_id 上下文
│ └── bootstrap_lingma.py # 启动时下载/提取 Lingma 二进制
├── data/ # 持久化Lingma 二进制 + workDir不进 git
├── secrets/ # 注入的 bundle 等敏感文件,不进 git
├── Dockerfile # Playwright base + HEALTHCHECK
├── docker-compose.yml
├── .env.example # 配置权威文档
├── requirements.txt
├── README.md # 本文件
└── DESIGN.md # 架构与二开手册
```
---
- 配置权威:[`/.env.example`](./.env.example)
- 架构/模块边界/设计决策:[`/DESIGN.md`](./DESIGN.md)
- 主要入口代码:[`/app/main.py`](./app/main.py)
- 测试:[`/tests/test_tool_call_bridge.py`](./tests/test_tool_call_bridge.py)
## License
内部使用,按需调整。
MIT

View File

@@ -101,6 +101,7 @@ class LspWsRpcClient:
self._rx_buffer = b""
self._chat_streams: dict[str, dict] = {}
self._tool_stream_map: dict[str, str] = {}
self._tool_roundtrip_done: set[str] = set()
self._on_disconnect = on_disconnect
self._closed = False
@@ -204,6 +205,7 @@ class LspWsRpcClient:
stream["chunks"].put_nowait(None)
self._chat_streams.clear()
self._tool_stream_map.clear()
self._tool_roundtrip_done.clear()
async def _send(self, payload: dict):
async with self._send_lock:
@@ -320,6 +322,55 @@ class LspWsRpcClient:
return merged, changed
@staticmethod
def _is_tool_roundtrip_method(method: str | None) -> bool:
return method in {"tool/call/sync", "tool/invoke"}
@staticmethod
def _build_tool_approve_params(params: dict[str, Any], tool_id: str) -> dict[str, Any] | None:
req_id = params.get("requestId")
session_id = params.get("sessionId")
if not isinstance(req_id, str) or not req_id.strip():
return None
if not isinstance(session_id, str) or not session_id.strip():
return None
return {
"type": "tool_call",
"sessionId": session_id,
"requestId": req_id,
"toolCallId": tool_id,
"approval": True,
}
@staticmethod
def _build_tool_invoke_result_params(params: dict[str, Any], tool_event: dict[str, Any], tool_id: str) -> dict[str, Any]:
return {
"toolCallId": tool_id,
"name": str(tool_event.get("name") or params.get("name") or "tool"),
"success": True,
"errorMessage": "",
"result": tool_event.get("result") if "result" in tool_event else {},
}
async def _maybe_emit_tool_roundtrip(self, method: str, params: dict[str, Any], tool_event: dict[str, Any]) -> None:
if not self._is_tool_roundtrip_method(method):
return
tool_id = self._normalize_tool_id(method, params, tool_event)
if not tool_id:
return
if tool_id in self._tool_roundtrip_done:
return
approve_params = self._build_tool_approve_params(params, tool_id)
if approve_params is None:
return
self._tool_roundtrip_done.add(tool_id)
await self.notify("tool/call/approve", approve_params)
invoke_result_params = self._build_tool_invoke_result_params(params, tool_event, tool_id)
await self.notify("tool/invokeResult", invoke_result_params)
def _resolve_tool_stream(self, method: str, params: dict[str, Any], tool_event: dict[str, Any] | None) -> dict | None:
req_id = params.get("requestId")
if isinstance(req_id, str) and req_id.strip():
@@ -363,6 +414,7 @@ class LspWsRpcClient:
if not tool_id:
logger.warning("drop unroutable tool event: method=%s missing tool id", method)
else:
await self._maybe_emit_tool_roundtrip(method, params, tool_event)
tool_states = stream["tool_states"]
order = stream["tool_order"]
existing = tool_states.get(tool_id)
@@ -431,6 +483,7 @@ class LspWsRpcClient:
for tool_id, mapped_req in list(self._tool_stream_map.items()):
if mapped_req == request_id:
self._tool_stream_map.pop(tool_id, None)
self._tool_roundtrip_done.discard(tool_id)
# Drain queue so no stray future gets stuck if the consumer bailed early.
if not stream["done"].is_set():
stream["done"].set()
@@ -843,12 +896,12 @@ class LingmaGatewayClient:
is_reply: bool = False,
tool_config: dict[str, Any] | None = None,
):
session_type = "developer" if ask_mode == "agent" else "chat"
session_type = "ask" if ask_mode == "agent" else "chat"
payload = {
"requestId": request_id,
"sessionId": session_id,
"sessionType": session_type,
"chatTask": "FREE_INPUT",
"chatTask": "chat" if ask_mode == "agent" else "FREE_INPUT",
"mode": ask_mode,
"stream": True,
"source": 1,

View File

@@ -452,6 +452,93 @@ def _json_string(value: Any) -> str:
return "{}"
def _openai_forced_tool_name(tool_choice: Any) -> str | None:
if not isinstance(tool_choice, dict):
return None
fn = tool_choice.get("function")
if isinstance(fn, dict):
name = fn.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
return None
def _anthropic_forced_tool_name(tool_choice: Any) -> str | None:
if not isinstance(tool_choice, dict):
return None
if tool_choice.get("type") == "tool":
name = tool_choice.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
fn = tool_choice.get("function")
if isinstance(fn, dict):
name = fn.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
return None
def _json_object_from_text(text: str) -> dict[str, Any] | None:
raw = text.strip()
if not raw:
return None
if raw.startswith("```") and raw.endswith("```"):
raw = raw[3:-3].strip()
if raw.lower().startswith("json"):
raw = raw[4:].strip()
try:
parsed = json.loads(raw)
except Exception:
return None
return parsed if isinstance(parsed, dict) else None
def _forced_tool_event_from_text(text: str, forced_tool_name: str) -> dict[str, Any] | None:
parsed = _json_object_from_text(text)
if parsed is None:
return None
explicit_name: Any = parsed.get("name") or parsed.get("tool")
fn = parsed.get("function")
if explicit_name is None and isinstance(fn, dict):
explicit_name = fn.get("name")
if explicit_name is not None and str(explicit_name) != forced_tool_name:
return None
tool_input: Any = None
if "input" in parsed:
tool_input = parsed.get("input")
elif "arguments" in parsed:
args = parsed.get("arguments")
if isinstance(args, str):
try:
tool_input = json.loads(args)
except Exception:
return None
else:
tool_input = args
elif isinstance(fn, dict) and "arguments" in fn:
args = fn.get("arguments")
if isinstance(args, str):
try:
tool_input = json.loads(args)
except Exception:
return None
else:
tool_input = args
else:
reserved = {"name", "tool", "function", "arguments", "input", "result"}
tool_input = {k: v for k, v in parsed.items() if k not in reserved}
event: dict[str, Any] = {
"name": forced_tool_name,
"input": tool_input if tool_input is not None else {},
}
if "result" in parsed:
event["result"] = parsed.get("result")
return event
def _openai_tool_call(tool: dict[str, Any], *, forced_id: str | None = None) -> dict[str, Any]:
return {
"id": str(tool.get("id") or forced_id or f"call_{uuid.uuid4().hex}"),
@@ -504,13 +591,13 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
# 1. Reuse the upstream sessionId so Lingma/Qwen hits its KV cache.
# 2. Send only the new user message instead of the whole history.
# 3. Stick the request to the pool instance that originally served it.
ask_mode = settings.default_ask_mode
if req.model.lower() in {"lingma-agent", "agent"}:
ask_mode = "agent"
tool_config = _openai_tool_config(req)
has_tooling_context = _openai_has_tooling_context(req, messages_dump)
ask_mode = settings.default_ask_mode
if req.model.lower() in {"lingma-agent", "agent"} or has_tooling_context:
ask_mode = "agent"
reuse_eligible = (
session_cache.enabled
and ask_mode == "chat"
@@ -800,6 +887,14 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request):
tool_id = str(item.get("id") or f"call_{idx}")
tool_calls.append(_openai_tool_call(item, forced_id=tool_id))
saw_tool_call = True
if not saw_tool_call:
forced_tool_name = _openai_forced_tool_name(req.tool_choice)
if forced_tool_name:
fallback_event = _forced_tool_event_from_text(message_content, forced_tool_name)
if fallback_event is not None:
tool_calls.append(_openai_tool_call(fallback_event, forced_id="call_fallback_0"))
saw_tool_call = True
message_content = ""
response = ChatCompletionResponse(
id=f"chatcmpl-{uuid.uuid4().hex}",
created=int(time.time()),
@@ -912,12 +1007,13 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
)
# ------------------------------------------------------------- session reuse
# Anthropic clients don't expose an ask_mode, so we always run in "chat".
ask_mode = "chat"
tool_config = _anthropic_tool_config(req)
has_tooling_context = _anthropic_has_tooling_context(req)
ask_mode = settings.default_ask_mode
if req.model.lower() in {"lingma-agent", "agent"} or has_tooling_context:
ask_mode = "agent"
reuse_eligible = (
session_cache.enabled and ask_mode == "chat" and len(messages_dump) >= 2 and not has_tooling_context
)
@@ -1248,10 +1344,12 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
content_blocks.append({"type": "text", "text": text})
tool_events = result.get("toolEvents") or []
saw_pending_tool_use = False
saw_tool_event = False
if isinstance(tool_events, list):
for idx, item in enumerate(tool_events):
if not isinstance(item, dict):
continue
saw_tool_event = True
tool_id = str(item.get("id") or f"toolu_nonstream_{idx}")
content_blocks.append(_anthropic_tool_use_block(item, forced_id=tool_id))
tool_result = _anthropic_tool_result_block(item, forced_id=tool_id)
@@ -1260,7 +1358,21 @@ async def v1_messages(req: AnthropicMessagesRequest, request: Request):
else:
saw_pending_tool_use = True
if not saw_tool_event:
forced_tool_name = _anthropic_forced_tool_name(req.tool_choice)
if forced_tool_name:
fallback_event = _forced_tool_event_from_text(text, forced_tool_name)
if fallback_event is not None:
content_blocks = []
tool_id = "toolu_fallback_0"
content_blocks.append(_anthropic_tool_use_block(fallback_event, forced_id=tool_id))
tool_result = _anthropic_tool_result_block(fallback_event, forced_id=tool_id)
saw_pending_tool_use = tool_result is None
if tool_result is not None:
content_blocks.append(tool_result)
response_body: dict = {
"id": message_id,
"type": "message",
"role": "assistant",

View File

@@ -147,14 +147,18 @@ async def _collect_stream(response) -> str:
class _SpyClient(_FakeClient):
def __init__(self, *, stream_events: list[dict], complete_result: dict) -> None:
super().__init__(stream_events=stream_events, complete_result=complete_result)
self.last_complete_args: tuple = ()
self.last_stream_args: tuple = ()
self.last_complete_kwargs: dict = {}
self.last_stream_kwargs: dict = {}
async def chat_complete(self, *args, **kwargs) -> dict:
self.last_complete_args = tuple(args)
self.last_complete_kwargs = dict(kwargs)
return await super().chat_complete(*args, **kwargs)
async def chat_stream(self, *args, **kwargs):
self.last_stream_args = tuple(args)
self.last_stream_kwargs = dict(kwargs)
async for event in super().chat_stream(*args, **kwargs):
yield event
@@ -220,6 +224,42 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
{"query": "gateway"},
)
async def test_openai_non_stream_fallbacks_to_structured_tool_call_for_forced_tool(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "```json\n{\"arguments\": {\"query\": \"gateway\"}}\n```",
"toolEvents": [],
"sessionId": "sess-fallback-openai",
},
)
req = ChatCompletionsRequest(
model="org_auto",
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"type": "function", "function": {"name": "lookup", "parameters": {}}}],
tool_choice={"type": "function", "function": {"name": "lookup"}},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
):
response = await main.v1_chat_completions(req, _make_request("/v1/chat/completions"))
payload = json.loads(response.body)
message = payload["choices"][0]["message"]
self.assertEqual(payload["choices"][0]["finish_reason"], "tool_calls")
self.assertEqual(message["content"], "")
self.assertIsInstance(message["tool_calls"], list)
self.assertEqual(message["tool_calls"][0]["function"]["name"], "lookup")
self.assertEqual(
json.loads(message["tool_calls"][0]["function"]["arguments"]),
{"query": "gateway"},
)
async def test_openai_stream_bridges_tool_and_text_events(self) -> None:
fake_client = _FakeClient(
stream_events=[
@@ -302,6 +342,46 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(payload["content"][1]["name"], "lookup")
self.assertEqual(payload["content"][2]["tool_use_id"], "toolu_1")
async def test_anthropic_non_stream_fallbacks_to_structured_tool_blocks_for_forced_tool(self) -> None:
fake_client = _FakeClient(
stream_events=[],
complete_result={
"text": "{\"input\": {\"k\": \"v\"}, \"result\": {\"value\": 1}}",
"toolEvents": [],
"sessionId": "sess-fallback-anthropic",
},
)
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=256,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"name": "lookup", "input_schema": {"type": "object", "properties": {}}}],
tool_choice={"type": "tool", "name": "lookup"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(fake_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
):
response = await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
payload = json.loads(response.body)
types = [item["type"] for item in payload["content"]]
self.assertEqual(types, ["tool_use", "tool_result"])
self.assertEqual(payload["stop_reason"], "end_turn")
self.assertEqual(payload["content"][0]["name"], "lookup")
self.assertEqual(payload["content"][1]["tool_use_id"], "toolu_fallback_0")
async def test_openai_stream_tool_call_indices_are_stable(self) -> None:
fake_client = _FakeClient(
stream_events=[
@@ -496,6 +576,7 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(cfg["provider"], "openai")
self.assertEqual(len(cfg["tools"]), 1)
self.assertIsInstance(cfg["tool_choice"], dict)
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_openai_non_stream_does_not_forward_tool_config_when_disabled(self) -> None:
spy_client = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
@@ -518,6 +599,7 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("tool_config", spy_client.last_complete_kwargs)
self.assertIsNone(spy_client.last_complete_kwargs["tool_config"])
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_openai_tooling_context_disables_session_reuse_cache(self) -> None:
@@ -551,6 +633,40 @@ class ToolCallBridgeTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(fake_cache.get_calls, [])
self.assertEqual(fake_cache.put_calls, [])
async def test_anthropic_non_stream_with_tools_uses_agent_mode(self) -> None:
spy_client = _SpyClient(stream_events=[], complete_result={"text": "ok", "toolEvents": []})
req = AnthropicMessagesRequest(
model="claude-3-5-sonnet-20241022",
max_tokens=128,
messages=[{"role": "user", "content": "hi"}],
stream=False,
tools=[{"name": "write_file", "input_schema": {"type": "object", "properties": {}}}],
tool_choice={"type": "auto"},
)
with (
patch.object(main, "pool", _FakePool(_FakeInstance(spy_client))),
patch.object(main, "chat_guard", _FakeGuard()),
patch.object(main, "_ensure_instance_logged_in", AsyncMock(return_value={"id": "u"})),
patch.object(main.stats_collector, "record_chat", AsyncMock(return_value=None)),
patch.object(main.settings, "api_keys", ["test-key"]),
_SettingsPatch(tool_forward_enabled=True, default_ask_mode="chat"),
):
await main.v1_messages(
req,
_make_request(
"/v1/messages",
headers={"x-api-key": "test-key", "anthropic-version": "2023-06-01"},
),
)
self.assertIn("tool_config", spy_client.last_complete_kwargs)
cfg = spy_client.last_complete_kwargs["tool_config"]
self.assertEqual(cfg["provider"], "anthropic")
self.assertEqual(len(cfg["tools"]), 1)
self.assertEqual(spy_client.last_complete_args[2], "agent")
async def test_anthropic_tooling_context_disables_session_reuse_cache(self) -> None:
fake_cache = _FakeSessionCache()
fake_client = _FakeClient(
@@ -760,7 +876,6 @@ class SessionCacheToolFingerprintTests(unittest.TestCase):
"result": {"hits": 3},
}
)
self.assertEqual(
event,
{
@@ -770,3 +885,97 @@ class SessionCacheToolFingerprintTests(unittest.TestCase):
"result": {"hits": 3},
},
)
def test_tool_sync_triggers_approve_and_invoke_result_requests(self) -> None:
from app.lingma_client import LspWsRpcClient
class _WsStub:
def __init__(self) -> None:
self.frames: list[bytes] = []
async def send(self, data: bytes) -> None:
self.frames.append(data)
def _decode(frame: bytes) -> dict:
body = frame.split(b"\r\n\r\n", 1)[1]
return json.loads(body.decode("utf-8"))
ws = _WsStub()
rpc = LspWsRpcClient(ws)
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/call/sync",
"params": {
"sessionId": "sess-1",
"requestId": "req-1",
"toolCallId": "call-1",
"name": "run_in_terminal",
"parameters": {"command": "pwd"},
},
}
)
decoded = [_decode(frame) for frame in ws.frames]
methods = [item.get("method") for item in decoded]
self.assertIn("tool/call/approve", methods)
self.assertIn("tool/invokeResult", methods)
approve = next(item for item in decoded if item.get("method") == "tool/call/approve")
self.assertEqual(
approve["params"],
{
"type": "tool_call",
"sessionId": "sess-1",
"requestId": "req-1",
"toolCallId": "call-1",
"approval": True,
},
)
invoke_result = next(item for item in decoded if item.get("method") == "tool/invokeResult")
self.assertEqual(invoke_result["params"]["toolCallId"], "call-1")
self.assertEqual(invoke_result["params"]["name"], "run_in_terminal")
self.assertTrue(invoke_result["params"]["success"])
self.assertEqual(invoke_result["params"]["errorMessage"], "")
import asyncio
asyncio.run(run())
def test_tool_sync_does_not_emit_roundtrip_without_request_id(self) -> None:
from app.lingma_client import LspWsRpcClient
class _WsStub:
def __init__(self) -> None:
self.frames: list[bytes] = []
async def send(self, data: bytes) -> None:
self.frames.append(data)
ws = _WsStub()
rpc = LspWsRpcClient(ws)
async def run() -> None:
rpc.create_stream("req-1")
await rpc._handle_server_message(
{
"jsonrpc": "2.0",
"method": "tool/call/sync",
"params": {
"sessionId": "sess-1",
"toolCallId": "call-1",
"name": "run_in_terminal",
"parameters": {"command": "pwd"},
},
}
)
self.assertEqual(ws.frames, [])
import asyncio
asyncio.run(run())