Files
lingma-openai-gateway/DESIGN.md
GitHub Actions 0b08dc6573 feat: Anthropic Messages API compat (/v1/messages)
Add a wire-compatible Anthropic endpoint alongside the existing OpenAI one
so Claude Code / anthropic-sdk / Cursor Agent can hit Lingma directly.

- app/anthropic_schema.py (new): request model + content-block flattener
  + internal-messages adapter + affinity key helper. Handles text / image /
  tool_use / tool_result blocks; unknown types degrade gracefully.
- app/auth.py: add require_anthropic_key (x-api-key, Bearer fallback)
  and AnthropicAuthError so auth failures render in Anthropic's error
  envelope instead of FastAPI's {detail:...} wrapper.
- app/main.py: POST /v1/messages. Shares LingmaPool / SessionCache /
  InFlightGuard / StatsCollector with the OpenAI path — same api_key +
  same conversation prefix hits the same upstream sessionId across both
  protocols (KV cache carries over). Streaming emits the named Anthropic
  event sequence (message_start / content_block_start / content_block_delta
  / content_block_stop / message_delta / message_stop). No claude-*
  model mapping table: resolve_model's default fallback handles it.
- README.md / DESIGN.md: document the new endpoint, add decision 5.12,
  iteration history M5, and a 4.3b streaming flow diagram.
- Bump FastAPI app version to 0.4.0.

Made-with: Cursor
2026-04-18 15:40:43 +08:00

39 KiB
Raw Blame History

Lingma OpenAI Gateway — 架构与二开手册

这份文档是项目的"白盒"。读完应该能回答:

  • 为什么项目长这样,而不是别的样子?
  • 每个模块各自的职责和边界是什么?
  • 想加/改一个功能,应该从哪下手?
  • 历史上踩过什么坑,现在的做法解决了什么问题?

代码本身是最权威文档,这里只解释**"为什么""怎么一起工作"**。


目录


1. 项目目标与非目标

目标

  1. OpenAI 协议兼容:任何支持 OpenAI 的客户端curl、Cursor、Dify、LangChain、LiteLLM…不改代码就能接入 Lingma。 1b. Anthropic Messages 协议兼容Claude Code / anthropic-sdk-python / Cursor Agent 等只会说 Anthropic 的客户端也能直接接入,和 OpenAI 共享同一 session cache 与池。
  2. 单节点生产可用:自用场景下能长期跑 7×24包含合理的观测、鉴权、背压、错误恢复。
  3. 最大化利用单账号 / 多账号的配额:通过多实例池 + 会话复用把后端吞吐做到接近原始 VSCode 插件水平。
  4. 降低运维成本:首次登录成功后,可以导出一份 bundle 永久复用,彻底摆脱浏览器自动化的不稳定性。
  5. 保持可读:总代码量控制在数千行,新人(或几个月后的自己)能在一天内理清。

非目标

  • 逆向 Lingma 后端协议:之前评估过(曾经的"B1 终极方案"),需要反编译二进制,维护成本高、政策风险大,放弃。
  • 多租户 / 水平扩缩:单容器即可;真要大规模部署 → 套层反代 + N 个网关副本就够,不在进程内解决。
  • 完整 function calling / toolsOpenAI schema 里保留了字段,但目前不透传给 LingmaLingma 侧没有等价能力)。
  • 多模态:请求里的 image/audio 会被降级成占位符 [image] / [audio],因为 Lingma chat 不支持。

2. 整体架构

组件与数据流

                   HTTP请求                 背压票                      选实例
   客户端 ───▶ FastAPI(main)──▶ auth ──▶ InFlightGuard ──▶ SessionCache ──▶ LingmaPool.pick()
                                                │
                                                ▼
                                            (hit?) ─── yes ───▶ 复用 sessionId + 只发最后一条 user 消息
                                                │  no
                                                ▼
                                            发全量历史 + 新 sessionId

                                 挑到 PoolInstance
                                     │
                                     ▼
                             LingmaGatewayClient.chat_stream
                                     │
                                     ▼ notify("chat/ask", payload)     异步上行
                             LspWsRpcClient (WebSocket)   ◀─── chat/answer / chat/finish ───┐
                                     │                                                       │
                                     ▼ LSP 帧                                                 │
                             Lingma 子进程 (Popen)   ───────── KV cache / Qwen 推理 ──────────┘

关键不变量

  • 每个 Lingma 进程 ↔ 一个独立 workDir。多实例时绝不共用 workDir,避免 .info 互相覆盖。
  • 一个 request → 精确一个 Lingma 实例。中途不迁移(因为上游 session 跟实例绑定)。
  • Ticket 流转InFlightGuard.try_acquire() 发一张 InFlightTicket,由路由代码或 stream 的 finally 负责 release()。release 幂等,多次调用无害。
  • Session 绑定SessionCache 里每个 entry 记 instance_name。命中后路由粘性到同实例;若该实例不再健康,主动失效并重新分配。

3. 模块职责表

文件 行数 职责 被谁调用 调用谁
main.py 777 FastAPI 路由request 级编排;生命周期 (lifespan) 外部 HTTP 所有 app/*.py
lingma_pool.py 333 N 实例池;pick() 负载均衡 + 粘性路由;启动期 bundle 注入 main.py lingma_client.py, auto_login.py, session_bundle.py
lingma_client.py 758 单实例 Lingma 进程 + LSP-over-WS 通信LSP 帧编解码;重连循环;子进程回收 lingma_pool.py websockets 库 + subprocess
session_cache.py 165 LRU+TTL 缓存:会话前缀哈希 → 上游 sessionId;指标暴露 main.py
session_bundle.py 175 Lingma cache 目录 pack/unpacktar.gz + base64路径穿越防护 main.py, lingma_pool.py 纯标准库
concurrency.py 121 InFlightGuard:基于 asyncio.Semaphore 的背压+排队+队列超时ticket 幂等 release main.py
auto_login.py 241 Playwright 无头登录;重试 + 验证钩子 main.py, lingma_pool.py playwright
auth.py 147 三档鉴权:require_bearerchat/ require_metrics_access / require_admin_access main.py
config.py 178 env → Settings dataclassLINGMA_ACCOUNTS 多格式解析bundle 字段归一化 main.py
model_map.py 84 Lingma 模型 key ↔ displayName 双向映射;请求 model 解析(idname 都认) main.py
openai_schema.py 91 OpenAI 请求/响应 Pydantic多模态内容 flatten_content 降级 main.py, session_cache.py
anthropic_schema.py ~140 Anthropic Messages 请求 Pydanticcontent blocks flatten_anthropic_contentanthropic_to_internal_messages 归一化到内部消息;affinity_key_for_anthropic 选池键 main.py
stats.py 85 请求次数 / token 估算 / Prometheus 文本 main.py
logging_config.py 56 结构化 JSON loggerrequest_id 通过 ContextVar 注入每行 所有模块
bootstrap_lingma.py 199 启动时从 Marketplace / VSIX 提取 Lingma 二进制到 data/bin/ 容器启动脚本

4. 核心流程

4.1 启动

Docker ENTRYPOINT
  │
  ▼
bootstrap_lingma.py (按需下载 VSIX → 提取 Lingma 二进制)
  │
  ▼
uvicorn app.main:app
  │
  ▼
FastAPI lifespan.__enter__
  │
  ├─ load_settings()                     # env → Settings
  ├─ LingmaPool.build(...)               # N 个 PoolInstance + InstanceConfig
  │     └─ 为每个账号创建 LingmaGatewayClient + AutoLoginManager
  ├─ _log_auth_posture()                 # 警告裸奔的鉴权配置
  └─ await pool.start()                  # 并行启动 N 个实例
        └─ 每个实例:
              1. _maybe_apply_session_bundle(inst)
                 └─ 如果 workDir 没登录态 且 有 bundle 配置 → 解包到 workDir
              2. client.start() (非阻塞 failure)
                    └─ _connect(initial=True)
                          ├─ 读 .info如果已有预热端口就跳过 spawn
                          ├─ Popen(Lingma, stderr=PIPE)
                          ├─ 启动 _drain_stderr 后台任务
                          ├─ 轮询等 .info 文件写出 → 读取端口
                          ├─ websockets.connect(ws://127.0.0.1:port)
                          ├─ LSP initialize + initialized notify
                          └─ state → ready

关键点:

  • pool.start() 内部用 asyncio.gather 并发起每个实例N=2 时启动时间约等于 max(单实例启动) 而非求和。
  • 任何一个实例启动失败不会lifespan 崩溃;对应 client 进入 failed 状态,ensure_ready() 在第一个请求到来时重试 _connect
  • bundle 注入是幂等的workDir 已登录就跳过;注入失败打 warning 然后 fallback 到 Playwright。

4.2 非流式 chat 请求

POST /v1/chat/completions  stream=false
  │
  ▼
auth_guard(API_KEYS)
  │
  ▼
[构建 messages_dump + api_key 提取]
  │
  ▼
reuse_eligible = session_reuse_enabled AND ask_mode=="chat" AND len(messages) >= 2
  │
  ▼
if reuse_eligible:
    lookup_key  = session_cache.build_key(api_key, messages[:-1])
    write_key   = session_cache.build_key(api_key, messages)
    entry       = await session_cache.get(lookup_key)
    if entry:
        cached_session_id    = entry.session_id
        cached_instance_name = entry.instance_name
  │
  ▼
inst = pool.pick(affinity_key = cached_instance_name or _affinity_key_for(req))
  │                                                        # user > system-hash > first-msg-hash
  ▼
if cached_instance_name AND inst.name != cached_instance_name:
    # 路由迁移:原实例不再 healthy丢弃 cached session另一个进程不认这个 sessionId
    cached_session_id = None
    await session_cache.invalidate(lookup_key)
  │
  ▼
await _ensure_instance_logged_in(inst)
  │
  ▼
models   = await inst.client.query_models()
model    = resolve_model(req.model, available_keys, default, name_map)
prompt   = _last_user_text(messages) if cached_session_id else _messages_to_prompt(messages)
is_reply = bool(cached_session_id)
  │
  ▼
ticket = await chat_guard.try_acquire()            # 超时 → 429 + Retry-After
inst.in_flight += 1
  │
  ▼
try:
    result = await inst.client.chat_complete(
        prompt, model, ask_mode,
        session_id=cached_session_id,
        is_reply=is_reply,
    )
except:
    stats_collector.record(success=False)
    if cached_session_id: await session_cache.invalidate(lookup_key)  # 坏 session 不留
    raise 502
  │
  ▼
stats_collector.record(success=True)
if write_key: await session_cache.put(write_key, result["sessionId"], inst.name)
  │
  ▼
return ChatCompletionResponse(... served_by=inst.name, usage=..., latency=...)
  │
finally:
    inst.in_flight -= 1
    ticket.release()                                # 幂等

路径为什么是这样:

  • reuse_eligible 条件里 len(messages) >= 2 的原因:首轮对话 messages[:-1] 是空,没有"上下文前缀"可缓存。
  • lookup_key = 不含最后一条 user 消息的前缀;write_key = 完整 messages。下一轮请求时它的 messages[:-1] 就是这一轮的完整 messages,天然命中。
  • 失败路径主动 invalidate:避免把坏 session 一直喂给后续请求dead-session 死循环)。

4.3 流式 chat + session cache 命中

POST /v1/chat/completions  stream=true
  │
  ▼
[前半段路由 + session lookup + 选实例 + 构造 prompt 跟 4.2 一致]
  │
  ▼
ticket = await chat_guard.try_acquire()
inst.in_flight += 1
completion_id         = f"chatcmpl-{uuid}"
stream_meta           = {}            # chat_stream 会把 sessionId 写回来
completion_tokens_holder = {"n": 0}
  │
  ▼
ticket_transferred = True             # 所有权移交给 event_stream 的 finally
return StreamingResponse(event_stream(), media_type="text/event-stream")
  │
  ▼  (后台消费)
async def event_stream():
    success = False
    try:
        async for chunk in inst.client.chat_stream(
            prompt, model, ask_mode,
            session_id=cached_session_id,
            is_reply=is_reply,
            out_meta=stream_meta,
        ):
            completion_tokens_holder["n"] += estimate_tokens(chunk)
            yield f"data: {chunk SSE payload}\n\n"
        yield "data: [DONE]\n\n"
        success = True
    except asyncio.CancelledError:     # 客户端断开
        raise
    except:
        logger.warn
    finally:
        # 只有 clean finish 才写回 cache半截流不能复用 session
        if success and write_key:
            sid = stream_meta.get("session_id")
            if sid: await session_cache.put(write_key, sid, inst.name)
        await stats_collector.record_chat(...)
        inst.in_flight -= 1
        ticket.release()

两个细节:

  1. ticket_transferred=True 一旦设成 true外层 finally 就不会 release ticket责任转交给 event_stream() 的 finally。否则会 release 两次(虽然幂等,但会把 in_flight 计成 -1
  2. chat_stream 走的是 JSON-RPC notify 而非 request。早期版本用 request 会等 30s 才下第一个字节(见决策 5.1)。

4.3b 流式 Anthropic Messages/v1/messages

输入输出协议都不同于 OpenAI但中间层完全复用

client ──► POST /v1/messages (x-api-key / Bearer)
   │
   ▼
require_anthropic_key                      # x-api-key 优先;缺了 → AnthropicAuthError
   │
anthropic_to_internal_messages(req)        # system → role="system"content blocks flatten
   │                                        # 结果与 OpenAI 路径完全同构 (role/content dict)
   ▼
session_cache lookup / affinity pick       # 与 OpenAI 共享同一 SessionCache 实例
   │                                        # → 同一用户切协议不丢 KV cache
   ▼
pool.pick(affinity) + ensure_logged_in
   │
   ▼
resolve_model("claude-3-5-sonnet-*")       # 兜底到 default_model
   │
   ▼
chat_guard.try_acquire()                   # 与 OpenAI 路径同一 in-flight 池
   │
   ▼ stream=true
StreamingResponse(event_stream())
   │
   ├─ event: message_start           ← 一次性id / model / usage.input_tokens
   ├─ event: content_block_start     ← index=0, type=text
   ├─ event: content_block_delta     ← 每片 chunk 包一次
   │  ...
   ├─ event: content_block_stop
   ├─ event: message_delta           ← stop_reason (+ output_tokens)
   └─ event: message_stop            ← 终止,无 [DONE]
   │
   ▼ finally
session_cache.put(write_key, upstream_sessionId, inst.name)   # 仅 success
ticket.release() + inst.in_flight--

与 OpenAI 路径的差异点:

环节 OpenAI Anthropic
鉴权 Authorization: Bearer x-api-keyfallback Bearer
系统消息 messages 数组里的 role:"system" 顶层 system 字段
内容结构 str 或 `[{type:"text" "image_url"...}]`
流式帧 data: {delta:{content:"..."}} + [DONE] 命名事件序列 message_start / content_block_* / message_delta / message_stop
usage 语义 prompt_tokens / completion_tokens input_tokens / output_tokens
错误 envelope {"error":{...}} {"type":"error","error":{...}}
finish 语义 finish_reason: "stop"|"length" stop_reason: "end_turn"|"max_tokens"

4.4 Lingma 子进程与 LSP 通信

LingmaGatewayClient._connect(initial: bool)
  │
  ▼
state -> starting
  │
  ▼
port_prewarmed = (socket_port > 0) AND _is_port_open(...)
  │
  ├─ yes: 直接复用(单实例容器重启场景)
  ▼  no:
  clean up 老的 .info
  await _terminate_proc()           # 先杀掉上次残留的 Popen
  self._proc = subprocess.Popen(
      [bin, "start", "--workDir", workdir],
      stderr=PIPE,
  )
  asyncio.create_task(_drain_stderr(self._proc))    # 子线程 readline → logger.debug
  port, pid, info_path = _wait_info_any([...], timeout)
  self.socket_port = port
  │
  ▼
await port open            # 二次确认 TCP listener 起来
  │
  ▼
self._ws  = await websockets.connect(f"ws://127.0.0.1:{port}", max_size=10MB)
self._rpc = LspWsRpcClient(self._ws, on_disconnect=self._on_disconnect)
await self._rpc.start()     # 启动 _reader_loop
  │
  ▼
await self._rpc.request("initialize", {processId, clientInfo, rootUri=None}, timeout=rpc_timeout)
await self._rpc.notify("initialized", {})
state -> ready

LSP 帧结构WebSocket 载荷):

Content-Length: <N>\r\n
\r\n
{"jsonrpc":"2.0", "id":1, "method":"...", "params":{...}}   (N 字节 JSON)

一个 WS 消息可能粘多个帧,_parse_lsp_frames 用一个累加的 _rx_buffer 实现状态机。

request vs notify

  • request(method, params, timeout):分配自增 id,放 _pending[id] = future,收到同 id 响应时 fut.set_result(msg)。超时则 pop 并抛 TimeoutError
  • notify(method, params):不带 id,发出去就忘。chat/ask 必须用 notify见 5.1。

chat/answerchat/finish 处理:

  • server → client 的消息通过 _handle_server_message 分发。
  • chat/answer 里的 text 被塞进 _chat_streams[requestId]["chunks"] 队列;第一个 chunk 来的时候标 first_chunk_at 做 TTFB 统计。
  • chat/finish 触发 done.set() + chunks.put_nowait(None)consume_stream 看到 None 就 break。
  • 这两个消息都会带 params.sessionIdLingma 自己分配的真实 session可能跟 client 传的 hint 不一样),chat_streamfinally 里用 get_stream_result() 取到并写进 out_meta

重连:

_reader_loop 捕获到 WS 断开(异常或 closeon_disconnect(exc),启动 _reconnect_loopbackoff 从 1s 指数增长到 30s最多 20 次。成功重连后不自动重放 in-flight 请求(状态丢失),但后续新请求会正常工作。

子进程生命周期:

close()
  ├─ _reconnect_task.cancel()
  ├─ _rpc.close()                     # 拒绝所有 pending futures, 结束 reader loop
  ├─ _ws.close()
  └─ _terminate_proc()
         └─ proc.terminate() → asyncio.to_thread(proc.wait, 5s)
              └─ TimeoutError → proc.kill() → wait(3s)
              └─ finally: proc.stderr.close()

之前版本用 start_new_session=TrueLingma 会变孤儿进程;容器 stop 后宿主机上还残留着。现在保留 session 归属 + 显式 terminate退出干净。

4.5 Session bundle 导入/导出

导出(POST /internal/session/export

admin_auth_guard ──▶ 选实例 ──▶ auth_status() 必须 logged_in=true
  │
  ▼
pack_workdir(target.work_dir):
    读 cache/{id,user,quota,config.json} → tar.gz BytesIO → bytes
    size cap: 4 MiB
    raise if cache/user empty防止导出空包
  │
  ▼
encode_bundle(raw) → base64 字符串
  │
  ▼
return {instance, account, raw_bytes, bundle_b64}

注入(LingmaPool._maybe_apply_session_bundle

if is_logged_in_workdir(workdir):       # cache/user 存在且非空
    return              # 绝不覆盖活跃登录态
  │
  ▼
b64 = resolve_bundle_b64(inline, file_path)    # inline 优先
if not b64: return
  │
  ▼
raw = decode_bundle(b64)    # base64 + size cap
  │
  ▼
apply_bundle_to_workdir(workdir, raw):
    for member in tar:
        _is_safe_member(member)?   # 白名单 4 个文件 + 非目录 + 非 symlink + 无路径穿越
        写入 workdir/cache/X
        chmod 0600 for "user", 0644 others
    return [restored file names]
  │
  ▼
logger.info("pool X: applied session bundle (4 files: ...)")

安全考量:

  • 白名单:只接受 cache/{id,user,quota,config.json} 4 个文件名,任何其他成员被静默跳过并 warn。
  • 路径穿越:../、绝对路径、symlink、hardlink、非 regular file 全部拒绝。
  • 大小上限encode 前 / decode 后都限 4 MiB实际 payload 通常 < 10 KB
  • bundle 不出现在任何 loglogger.info 只打文件数和字节数,不打内容)。

4.6 自动登录 (Playwright)

仅在没有 bundle 且 workDir 未登录时触发。

_ensure_instance_logged_in(inst)
  │
  ▼
status = await client.auth_status()
if status.id: return status     # 已登录
  │
  ▼
if not auto_login_enabled: raise 401
  │
  ▼
(可选) 切 dedicated_domain_url
  │
  ▼
login_url = await client.generate_login_url()
  │
  ▼
await auto_login.ensure_started(login_url)      # 启 Playwright 后台任务(幂等)
await auto_login.wait_done(timeout)
  │
  ▼
status = await client.auth_status()
if not status.id: raise 401
return status

AutoLoginManager._run() 的细节在 app/auto_login.py。关键点Playwright 配了 headless=True + verify_logged_in 钩子做二次确认(避免误报登录成功)。

4.7 关闭

FastAPI lifespan 退出 → pool.close() → 每个 client.close() → 进程回收链。整个路径在 4.4 末尾。


5. 关键设计决策

每条都写出问题 / 方案 / 权衡 / 为何没选其他,方便二开时评估能不能推翻。

5.1 chat/ask 走 JSON-RPC notify 而非 request

  • 问题:早期版本用 rpc.request("chat/ask", ...)await 响应,但 Lingma 压根不回 response只用 chat/answer + chat/finish 异步推流。导致首字节延迟 = rpc_timeout (30s)。
  • 方案:改用 notify(),发出去不等 response响应完全靠 _reader_loop 里的 _handle_server_message 分发到 _chat_streams[requestId] 的队列。
  • 权衡:放弃"RPC 层超时"的简单性,换到"stream-level 超时"consume_stream 的 idle timeout。值得。
  • 其他方案:伪造 request 响应不行Lingma 侧没有可预期的 id 回执。

5.2 多实例池独立 workDir而不是共享

  • 问题:最初考虑多实例共享 ~/.lingma/.info,让一个 Lingma 服务多个 client。但 Lingma 的 .info 文件每次启动覆写N 个进程会互相踩端口。
  • 方案:每个 PoolInstance 一个独立 data/.lingma/pool/inst-<i>/,各自的 .info 只看自己的目录。
  • 权衡多账号登录态没法共享bundle 机制弥补(可以导出一份用在所有实例)。磁盘占用多出 N 份,单实例约 50 MB实际不是问题。
  • 其他方案:改 Lingma 命令行让它支持独立 info pathLingma 是闭源二进制,不可行。

5.3 session cache 只哈希 user/system/developer 消息

  • 问题OpenAI 客户端常常会规范化 / 裁剪 assistant 消息(例如 trim 末尾空白、去掉思考内容),导致下一轮的 messages[:-1] 跟上一轮的 messages 不完全字节相等。
  • 方案hash_user_context 只对 system / user / developer 三种 role 做 SHA1assistant/tool 不参与。只要用户输入路径稳定,哈希就稳定。
  • 权衡:理论上客户端篡改 assistant 语义比如把模型的回答改成相反的cache 依然命中,但 Lingma 侧自己持有 session 原版历史,下一轮还是按原版继续。对用户意图的偏离不可见。这是 OK 的——客户端本来就不该篡改 assistant 内容。

5.4 session cache 写入用 write_key = hash(messages),查询用 lookup_key = hash(messages[:-1])

  • 问题cache 要能被下一轮命中。
  • 推导:当前轮完成后写入的 key 是 "当前完整 messages";下一轮请求到来时它的 "前缀"messages[:-1],去掉当前这轮新的 user 消息)正是上一轮的完整 messages。所以下一轮 lookup 必中。
  • 权衡:只能命中严格 append-only 的对话模式。客户端如果重写历史就会 miss这是预期行为。

5.5 session 失败路径主动 invalidate

  • 问题:如果 Lingma 侧主动回收了某个 session我们这边还在命中 cache 反复用死 session会造成死循环 502。
  • 方案chat_complete/stream 抛异常 或 stream 未 clean finish 时,不写回当前 session如果是用了 cached_session_id 失败的,直接 invalidate(lookup_key),让下一轮重新开 session。
  • 权衡:偶尔会浪费一轮 KV cache 重建。实测下来单次延迟增加 < 300ms可接受。

5.6 Session bundle 只打 4 个文件

  • 问题Lingma workDir 里文件很多:cache/, db/, logs/, index/, .lock, .info, diagnosis.bin...
  • 方案:实验发现只有 cache/{id,user,quota,config.json} 是恢复登录必要的。其他都是 Lingma 启动时按需重建的。
  • 权衡
    • bundle 小(< 10KB传输友好
    • 如果 Lingma 新版本引入新的必需文件bundle 会 silently 坏掉——加了兼容层(注入失败 fallback 到 Playwright而不是直接崩。
    • _is_safe_member 白名单强制,新版本多写了文件也不会被偷渡进来。

5.7 auth 三档chat / admin / metrics

  • 问题:单一 API_KEY 权限太粗。调用方只需要 chat 能力,不该有 session 导出权。
  • 方案:三把独立 token
    • API_KEYS(多把):只能 chat 和看 models
    • ADMIN_TOKEN(一把):/internal/* 管理面
    • METRICS_TOKEN(一把):/metrics 观测面
  • 权衡(兼容性):三把 token 全配置是理想状态,但单租户用户嫌麻烦,所以保留 fallbackadmin/metrics 未配置时退化到 API_KEYS。启动时 _log_auth_posture() 对"全空"和"admin 回落"发 WARN提示用户显式配。

5.8 /metrics 默认严格v0.3 破坏性改动)

  • 问题:最初 /metrics 在无 token 配置时是公开的。这在单节点加反代时没问题,但容器直接暴露公网就泄露 pool 拓扑(账号名、账号数、实例健康度)。
  • 方案:默认拒绝,要么配 token 要么显式 METRICS_PUBLIC=true 才放开。
  • 权衡:破坏兼容;但通过启动 WARN + README 升级段,升级路径明确。

5.9 子进程 stderr 走 PIPE + 独立线程读

  • 问题DEVNULL 下 Lingma native 崩溃的原因完全黑箱,只能靠堆 strace
  • 方案subprocess.Popen(stderr=PIPE) + asyncio.to_thread(readline loop),逐行 log 到 DEBUG
  • 权衡DEBUG 日志稍增,但不开 debug 不影响。一定要是 to_thread 包装,因为 readline() 阻塞,直接用 asyncio.subprocess 需要整个 _connect 改写,改动太大。

5.10 least-in-flight + affinity 的调度顺序

  • 问题:粘性 affinity 和负载均衡哪个优先?
  • 方案affinity 的 bucket 优先,但要求目标实例 healthyunhealthy 时退到 least-in-flight全不健康时 round-robin 兜底(让 ensure_ready() 驱动重连)。
  • 权衡:粘性优先让 session cache 命中率最大化;只有实例挂了才强制迁移,这时也必然会 miss cachesession 跟着进程死)。

5.11 子进程 handle 保存到 LingmaGatewayClient 而非 pool

  • 问题pool 知道实例,但不知道单进程生命周期;放哪边?
  • 方案client._proc + client._terminate_proc()。pool 只负责 client.start() / client.close() 的调度,进程操作封装在 client 内部。
  • 权衡client 文件变长但边界清晰——pool 只看状态和在途数,具体进程是 client 的事。

5.12 Anthropic Messages 端点独立编排而非内部转发

  • 问题:既要兼容 Anthropic API又不能把 v1_chat_completions 的编排路径搞成大杂烩。
  • 方案:单独写一个 v1_messages 端点前半段auth / 归一化到内部 messages / affinity / session cache lookup / instance pick / prompt 构造 / ticket 获取)与 OpenAI 端点结构对齐但各自实现后半段SSE 事件生成 / 响应包装)按 Anthropic 格式输出。
  • 共享的下沉层LingmaPool / SessionCache / InFlightGuard / StatsCollector / LingmaGatewayClient.chat_stream|chat_complete / resolve_model
  • 为何不用一层统一抽象:两端的输入/输出对象形状差异足够大system 位置、content 类型、SSE 事件名、错误 envelope抽象出来的中间类型反而掩盖差异、增加维护成本。当前重复代码约 150 行,但每条分支读起来直接对应 wire 协议,调试、改协议时都是线性阅读。
  • 会话复用跨协议session_cache.build_key(api_key, messages) 在两端都接收归一化后的 {role, content} 列表——同一用户从 OpenAI 切 Anthropic只要对话前缀一致可直接命中同一上游 sessionId,等于白送 KV cache。
  • 错误路径AnthropicAuthError 专用异常 + @app.exception_handler 渲染 Anthropic envelope端点内部其他错误HTTPException、backpressure_anthropic_error() helper 直接返 JSONResponse,绕过 FastAPI 默认 {"detail":...} 包装。
  • 模型名:不维护 claude-* → dashscope_* 映射表。resolve_model 的末位兜底default_model / first available会把所有陌生 id 退回到实际可用的 Lingma keyAnthropic 客户端继续传 claude-3-5-sonnet-* 即可工作。

6. 扩展指引(要做 X 改哪里)

需求 改哪些文件 关键入口
加一个新的 OpenAI 端点(如 embeddings main.py, openai_schema.py 仿照 v1_models@app.post("/v1/embeddings", dependencies=[Depends(auth_guard)])
扩展 Anthropic 端点(如 count_tokens / tool_use 贯通) main.py::v1_messages, anthropic_schema.py count_tokens 只读:复用 estimate_tokenstool_use 需要 Lingma 上游支持payload 转发点在 chat_stream / chat_complete
加一种新的实例调度策略(如加权轮询) lingma_pool.py::pick() 当前是 affinity → least-in-flight → round-robin
改认证为 JWT / OAuth auth.py 三个 require_* 函数是全部入口;main.py 里只有 *_guard 代理
增加限流(按 api_key 配额) concurrency.pyPerKeyGuardmain.pychat_guard.try_acquire() 后再来一层 注意 ticket 释放顺序(内层先释放)
支持请求级别的 session_id 穿透 main.py(读 req header + lingma_client.py::chat_stream(session_id=...) 已支持 只需把 header 值塞进 cached_session_id 分支
改 Prometheus 指标名 所有 prometheus_lines()prometheus_text() 注意生态兼容;更名要在 README 留 alias
接入 Jaeger / OpenTelemetry logging_config.py 加 OTel instrumentationmain.py::request_id_middleware 注入 traceid request_id 可以复用为 span_id
加一个 Lingma 新方法调用(比如 code/complete lingma_client.py 仿照 query_modelsawait self.ensure_ready(); return await self.rpc.request("code/complete", ...) 原始上游响应形态需抓包确认
支持 function calling假设 Lingma 将来支持) openai_schema.py 已保留 tools / tool_choice 字段;lingma_client.py::_build_payloadextra.tools 上游协议 TBD
多模态穿透 openai_schema.py::flatten_content 不再降级;lingma_client.py payload 传 url 前提Lingma 支持(目前不支持)
换 session_cache 后端(如 Redis 实现同样接口的 RedisSessionCachemain.py 初始化换实现 接口是 get / put / invalidate / stats / prometheus_lines / build_key / enabled,内存换远端成本不高
多容器副本(水平扩) 外面套反代 + sticky session根据 Authorizationx-user 做 hashsession cache 改 Redis 或直接接受多副本 cache 独立,轻微浪费 KV cache 命中率

本地开发调试

pip install -r requirements.txt
# 在容器外跑,需要自己准备 Lingma 二进制
export LINGMA_BIN=/path/to/Lingma
export API_KEYS=sk-dev
uvicorn app.main:app --reload --port 8317

主要断点位置:

  • main.py::v1_chat_completions —— 请求编排
  • lingma_client.py::_connect —— 连接建立过程
  • lingma_client.py::_handle_server_message —— 上游推送
  • session_cache.py::get / put —— 会话复用决策

7. 已知问题 / 未完成项

标签 描述 影响 计划
D1 config.py 还是纯 dataclass + os.getenv,未迁 pydantic-settings 类型校验靠自己 cast 低优,收益有限,有精力再做
D3 无单元测试骨架 重构要靠 deploy 验证 想加 CI 时优先补
Docker non-root 容器还是 root 跑 容器逃逸时影响宿主 需要加 gosu + chown entrypoint涉及数据迁移谨慎推进
ADMIN_TOKEN 轮换 没有过期机制,只能重启 自用场景不影响 接 Vault / sops 时一并做
Lingma 版本漂移 新版 Lingma 改 LSP 方法或新增必需 cache 文件时会无声崩 注入失败会 fallback但 chat 不回话题型的错误不易定位 加一个 /internal/smoke 端点做端到端自检
estimate_tokens 粗略 按字节 / 4 估算,中文误差大 只影响 usage 字段和 Prometheus token 计数 tiktoken 即可,但包体积会涨
Lingma agent 模式未深入验证 model: "agent" 切 ask_mode但 session reuse 被禁用 agent 多轮不享受 KV 复用 agent 语义跟 chat 不同(会触发 tool use需要单独设计

8. 迭代历程

一条时间线,方便理解每层功能的动机。

M1 — 基础生产可用 (A2 + D2)

  • 背压:InFlightGuard + 429 Retry-After + 排队超时
  • 结构化 JSON 日志 + request_id 贯穿
  • auth.py 抽离;require_metrics_access 独立通道
  • Prometheus 文本格式 + /internal/stats
  • 目的:单点网关能稳定跑,能被 Prometheus 抓,能看到 request 级链路。

M2 — 多账号池 + Session bundle (A1 + B3 + C1)

  • LingmaPoolN 个独立 Lingma 子进程,每个独立 workDir
  • 路由:粘性 affinity + least-in-flight + 不健康兜底 round-robin
  • Pool-level /healthz / /internal/stats / Prometheus gauges
  • Session bundlesession_bundle.py 把 Lingma cache/ 打 tar.gz允许从一个已登录实例导出、在任意新实例上一键注入彻底跳过 Playwright。
  • Bundle 机制包含路径穿越防护、4 MiB 大小上限、文件权限规整、注入失败 fallback 到自动登录、不覆盖活跃登录态。
  • 服务器上落盘 secrets/lingma-session.b64 加 docker-compose :ro 挂载。

M3 — 性能优化

问题:用户反馈对话首 token 慢,甚至比原 VSCode 插件明显慢2-3s vs < 500ms

根因 1P0chat/ask 用了 JSON-RPC request,等 result 超时 30s 才下第一个 chat/answer。实际上 Lingma 永远不会回 result。 修复_kick_chat_ask() 改用 notify。TTFB 从 rpc_timeout 下降到 ~2s纯 Lingma 推理)。

根因 2P1:多轮对话每次都拼接完整历史发给 Lingma上游 KV cache 没被利用。 修复SessionCacheLRU + TTL+ chat_complete/stream 增加 session_id + is_reply 参数;命中时只发最后一条 user 消息,服务端识别为增量输入,命中 Qwen 的 prefix caching。

收益:单轮没有显著改变(推理仍然花最多时间),但第 2 轮起 TTFB 降 40%~60%,视 prompt 长度。

M5 — Anthropic Messages 兼容

  • 场景Claude Code / Cursor Agent / anthropic-sdk-python / 各种 agent 框架只会说 Anthropic 协议。
  • 改动
    • 新增 anthropic_schema.pyAnthropicMessagesRequest + anthropic_to_internal_messages + flatten_anthropic_content + affinity_key_for_anthropic
    • auth.py 新增 require_anthropic_keyx-api-key 优先Bearer 回退)+ AnthropicAuthError
    • main.py 新增 /v1/messages 端点:复用 LingmaPool / SessionCache / InFlightGuard;流式按 message_start / content_block_start|delta|stop / message_delta / message_stop Anthropic SSE 协议输出;错误 envelope 改写成 {"type":"error","error":{...}}
    • @app.exception_handler(AnthropicAuthError) 渲染 Anthropic 错误 wire 格式。
  • 关键设计:两端共享同一 SessionCache,同一 api_key 下的会话前缀哈希一致 → 跨协议命中同一上游 sessionId。详见 §5.12。
  • 模型名:不维护 claude-* → dashscope_* 映射表,靠 resolve_model 末位兜底。

M4 — 生产硬化包commit 2febc37

用户代号"选项 A"。

  • 权限分层ADMIN_TOKEN / METRICS_TOKEN / METRICS_PUBLIC 独立;/metrics 和 /internal/* 默认严格(全空 → 503启动 WARN 裸奔配置。
  • 子进程生命周期:不再 start_new_session=True;保存 Popen handlestderr=PIPE 读到 DEBUG log关闭走 SIGTERM → 5s → SIGKILL。杜绝孤儿 Lingma。
  • 并行池启动pool.start()asyncio.gather。N=2 启动省 ~startup_timeout 秒。
  • HEALTHCHECKDockerfile 加 30s 间隔 /healthz 探针,仅当 pool_ready>0 算 healthy。

9. Lingma LSP 协议速查

不是完整文档,只列本项目用到的方法。真实协议通过观察 Lingma 行为 + 抓包逆推。

方向 方法 载荷(精简) 返回 / 说明
initialize {processId, clientInfo, rootUri:null, capabilities:{}, workspaceFolders:[]} LSP 标准握手
initialized (notify) {} LSP 标准
auth/status {} {id, nickname, ...} or {}
config/queryModels {} {chat:[{key,displayName,...}], assistant:[...], developer:[...], inline:[...]}
config/getEndpoint {} {endpoint}
config/updateEndpoint {endpoint} ok
login/generateUrl {} str{loginUrl/url}
chat/ask (notify!) _build_payload 不回 result通过 server push 下推
chat/answer {requestId, text, content} 流式 token
chat/finish {requestId, sessionId, ...其它元数据} 结束信号,含上游真实 sessionId

chat/ask payload 关键字段

requestId              # 我们这边用 uuid4
sessionId              # 我们分配或从 cache 复用Lingma 可能在 chat/finish 里返回不同的
sessionType            # "chat" 或 "developer"agent 模式)
mode                   # "chat" / "agent"
stream                 # true
source                 # 1
isReply                # 会话复用命中时 true让上游走 KV cache 路径
content / text / message / questionText  # 都是 prompt冗余填Lingma 不同版本用不同字段)
extra.modelConfig.key  # 模型 key
pluginPayloadConfig    # {isEnableAskAgent, isEnableAutoMemory}
chatContext            # {text, preferredLanguage: "zh-CN", ...}

发现新方法的方法:开 LOG_LEVEL=DEBUG 并打开 _reader_loop 里的 msg 打印;用真实 VSCode 插件操作一次,就能观察到 Lingma 上行的完整方法列表。


文档版本:对应 main 分支 commit 2febc37 之后。后续大改请同步更新本文件,尤其是决策记录和模块职责表。