From 8139a5e97baa895912ab8db64a6cb4d4f3403566 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 17 Apr 2026 09:42:23 +0800 Subject: [PATCH] feat: add Lingma OpenAI-compatible gateway service --- .env.example | 22 +++ .gitignore | 4 + Dockerfile | 15 ++ README.md | 155 +++++++++++++++++ app/__init__.py | 0 app/auth.py | 34 ++++ app/auto_login.py | 149 +++++++++++++++++ app/config.py | 54 ++++++ app/lingma_client.py | 386 +++++++++++++++++++++++++++++++++++++++++++ app/main.py | 283 +++++++++++++++++++++++++++++++ app/model_map.py | 84 ++++++++++ app/openai_schema.py | 45 +++++ app/stats.py | 85 ++++++++++ docker-compose.yml | 13 ++ requirements.txt | 5 + 15 files changed, 1334 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 app/__init__.py create mode 100644 app/auth.py create mode 100644 app/auto_login.py create mode 100644 app/config.py create mode 100644 app/lingma_client.py create mode 100644 app/main.py create mode 100644 app/model_map.py create mode 100644 app/openai_schema.py create mode 100644 app/stats.py create mode 100644 docker-compose.yml create mode 100644 requirements.txt diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..36de347 --- /dev/null +++ b/.env.example @@ -0,0 +1,22 @@ +HOST=0.0.0.0 +PORT=8317 +API_KEYS=sk-DXKFFEL0A1bN8Teqz + +LINGMA_BIN=/app/bin/Lingma +LINGMA_WORK_DIR=/root/.lingma/vscode/sharedClientCache +LINGMA_SOCKET_PORT=36510 +LINGMA_STARTUP_TIMEOUT=40 +LINGMA_RPC_TIMEOUT=30 + +DEFAULT_MODEL=org_auto +DEFAULT_ASK_MODE=chat + +DEDICATED_DOMAIN_URL= + +AUTO_LOGIN_ENABLED=true +AUTO_LOGIN_HEADLESS=true +AUTO_LOGIN_TIMEOUT=180 +AUTO_LOGIN_MAX_RETRY=2 + +LINGMA_USERNAME= +LINGMA_PASSWORD= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0230b89 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.env +__pycache__/ +*.pyc +bin/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 
0000000..96e9a2f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM mcr.microsoft.com/playwright/python:v1.52.0-jammy + +WORKDIR /app + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt + +COPY app /app/app + +EXPOSE 8317 + +CMD ["sh", "-c", "uvicorn app.main:app --host ${HOST:-0.0.0.0} --port ${PORT:-8317}"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..e0baff0 --- /dev/null +++ b/README.md @@ -0,0 +1,155 @@ +# Lingma OpenAI Gateway + +把本地 Lingma 能力封装为 OpenAI 兼容接口,支持: + +- `GET /v1/models` +- `POST /v1/chat/completions` +- `stream=true`(SSE) +- Bearer API Key 鉴权 + +## 1. 准备目录 + +```bash +mkdir -p bin +cp ../Lingma bin/Lingma +chmod +x bin/Lingma +``` + +## 2. 配置环境变量 + +```bash +cp .env.example .env +``` + +至少修改: + +- `API_KEYS` +- `LINGMA_USERNAME` +- `LINGMA_PASSWORD` + +如果你的 Lingma 路径不同,修改: + +- `LINGMA_BIN` + +可选(企业专属域): + +- `DEDICATED_DOMAIN_URL` + +## 3. Docker 运行 + +```bash +docker compose up -d --build +``` + +查看日志: + +```bash +docker compose logs -f +``` + +## 4. 调用示例 + +### 模型列表 + +```bash +curl -s http://127.0.0.1:8317/v1/models \ + -H "Authorization: Bearer sk-DXKFFEL0A1bN8Teqz" +``` + +说明: + +- `id` 保持 Lingma 原始模型 key(兼容 OpenAI 客户端) +- `name` 提供可读名称(如 `qwen3.6-plus`) +- 调用 `/v1/chat/completions` 时,`model` 既可传 `id`,也可直接传 `name` + +### 非流式聊天 + +```bash +curl -s http://127.0.0.1:8317/v1/chat/completions \ + -H "Authorization: Bearer sk-DXKFFEL0A1bN8Teqz" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "dashscope_qmodel", + "messages": [ + {"role": "user", "content": "写一个 python hello world"} + ] + }' +``` + +### 流式聊天 + +```bash +curl -N http://127.0.0.1:8317/v1/chat/completions \ + -H "Authorization: Bearer sk-DXKFFEL0A1bN8Teqz" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "dashscope_qmodel", + "stream": true, + "messages": [ + {"role": "user", "content": "介绍一下你自己"} + ] + }' +``` + +## 5. 
def require_bearer(request: Request, api_keys: list[str]) -> None:
    """Reject the request unless it carries one of the configured API keys.

    Authentication is skipped entirely when ``api_keys`` is empty.

    Args:
        request: Incoming request; only its ``authorization`` header is read.
        api_keys: Accepted API keys.

    Raises:
        HTTPException: 401 with an OpenAI-style error body when the header is
            missing, uses a scheme other than Bearer, or carries an unknown key.
    """
    import hmac  # function-scope import: the module header does not import it

    def unauthorized(message: str) -> HTTPException:
        # Single place that builds the OpenAI-compatible 401 payload.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": {
                    "message": message,
                    "type": "invalid_request_error",
                    "code": "invalid_api_key",
                }
            },
        )

    if not api_keys:
        return

    header = request.headers.get("authorization", "")
    scheme, separator, credentials = header.partition(" ")
    # HTTP auth schemes are case-insensitive, so "bearer <key>" is valid too.
    if not separator or scheme.lower() != "bearer":
        raise unauthorized("Missing or invalid Authorization header")

    token = credentials.strip().encode("utf-8")
    # Compare against every key with a constant-time primitive and without
    # short-circuiting, so response timing does not leak key contents.
    matched = False
    for key in api_keys:
        matched |= hmac.compare_digest(token, key.encode("utf-8"))
    if not matched:
        raise unauthorized("Invalid API key")
async def _auto_login_once(self, login_url: str):
    """Run a single browser-driven login attempt against ``login_url``.

    Opens Chromium through Playwright, fills the first visible account and
    password inputs, clicks the first visible login button, then polls the
    page URL once per second until it looks like a Lingma callback.

    Args:
        login_url: Login page URL to open.

    Raises:
        RuntimeError: Credentials are not configured, a Playwright operation
            timed out, or no callback URL was seen within ``timeout_sec``.
    """
    if not self.username or not self.password:
        raise RuntimeError("LINGMA_USERNAME/LINGMA_PASSWORD not configured")

    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + self.timeout_sec

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=self.headless)
        context = None
        try:
            context = await browser.new_context(ignore_https_errors=True)
            page = await context.new_page()
            await page.goto(login_url, wait_until="domcontentloaded", timeout=30000)

            # Try common login selectors.
            await self._fill_if_visible(page, [
                'input[type="email"]',
                'input[name="loginId"]',
                'input[name="username"]',
                'input[name="account"]',
                'input[placeholder*="账号"]',
                'input[placeholder*="邮箱"]',
            ], self.username)

            await self._fill_if_visible(page, [
                'input[type="password"]',
                'input[name="password"]',
                'input[placeholder*="密码"]',
            ], self.password)

            await self._click_if_visible(page, [
                'button:has-text("登录")',
                'button:has-text("登 录")',
                'button:has-text("Login")',
                'button[type="submit"]',
            ])

            # Wait for redirect / callback activity.
            while time.monotonic() < deadline:
                url = page.url
                if "lingma" in url and ("callback" in url or "tokenString=" in url):
                    break
                await asyncio.sleep(1.0)
            else:
                # Running out of time used to fall through silently, which made
                # the caller record the attempt as a success and skip retries.
                raise RuntimeError(
                    f"login callback not observed within {self.timeout_sec}s"
                )
        except PlaywrightTimeoutError as exc:
            raise RuntimeError(f"playwright timeout: {exc}") from exc
        finally:
            # Best-effort teardown; a close failure must not mask the real outcome.
            if context is not None:
                with contextlib.suppress(Exception):
                    await context.close()
            with contextlib.suppress(Exception):
                await browser.close()
def _read_info_file(info_path: Path) -> tuple[int | None, int | None]:
    """Parse a Lingma ``.info`` file into ``(port, pid)``.

    The first two lines of the file are read as integers: the port first,
    then the pid.

    Args:
        info_path: Location of the ``.info`` file.

    Returns:
        ``(port, pid)`` on success, or ``(None, None)`` when the file is
        missing, unreadable, shorter than two lines, or not numeric.
    """
    # Read directly instead of checking exists() first: the file can appear or
    # vanish between the two calls while another process rewrites it, and any
    # OSError here would otherwise abort the caller's polling loop.
    try:
        text = info_path.read_text(encoding="utf-8", errors="ignore").strip()
    except OSError:
        return None, None

    lines = text.splitlines()
    if len(lines) < 2:
        return None, None
    try:
        return int(lines[0].strip()), int(lines[1].strip())
    except ValueError:
        return None, None
def _parse_lsp_frames(buf: bytes):
    """Split complete ``Content-Length``-framed messages off the front of ``buf``.

    Args:
        buf: Bytes received so far; may hold zero or more complete frames
            followed by a partial one.

    Returns:
        ``(frames, remainder)``: ``frames`` is the list of decoded message
        bodies in arrival order, ``remainder`` the unconsumed tail to prepend
        to the next chunk.
    """
    frames = []
    while True:
        header_end = buf.find(b"\r\n\r\n")
        if header_end < 0:
            break  # header not fully received yet
        header = buf[:header_end]
        body_start = header_end + 4

        content_length = None
        for line in header.split(b"\r\n"):
            if line.lower().startswith(b"content-length:"):
                # A non-numeric or negative length used to raise ValueError or
                # mis-slice the buffer; treat it like a missing header instead.
                try:
                    parsed = int(line.split(b":", 1)[1].strip())
                except ValueError:
                    parsed = -1
                if parsed >= 0:
                    content_length = parsed
                break

        if content_length is None:
            # Unusable header block: drop it and resynchronise on the next one.
            buf = buf[body_start:]
            continue
        if len(buf) < body_start + content_length:
            break  # body not fully received yet

        body_end = body_start + content_length
        frames.append(buf[body_start:body_end].decode("utf-8", errors="ignore"))
        buf = buf[body_end:]
    return frames, buf
async def request(self, method, params=None, timeout=20):
    """Send a JSON-RPC request and wait for the matching response.

    Args:
        method: RPC method name.
        params: Request parameters; an empty object is sent when falsy.
        timeout: Seconds to wait for the response.

    Returns:
        The ``result`` member of the response (``None`` when absent).

    Raises:
        TimeoutError: No response arrived within ``timeout`` seconds.
        RuntimeError: The response carried an ``error`` member.
    """
    rid = self._id
    self._id += 1
    payload = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params or {}}
    fut = asyncio.get_running_loop().create_future()
    self._pending[rid] = fut
    try:
        await self._send(payload)
        msg = await asyncio.wait_for(fut, timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Before Python 3.11 asyncio.TimeoutError is not the builtin
        # TimeoutError, so catching only the builtin let the raw exception
        # escape without the method name.
        raise TimeoutError(f"RPC timeout: {method}") from exc
    finally:
        # Always forget the future: on success the reader already removed it,
        # on timeout, send failure or cancellation nobody else would.
        self._pending.pop(rid, None)
    if "error" in msg:
        raise RuntimeError(f"RPC {method} error: {msg['error']}")
    return msg.get("result")
async def consume_stream(self, request_id: str, timeout: float) -> AsyncIterator[str]:
    """Yield text chunks queued for ``request_id`` until the end marker arrives.

    Args:
        request_id: Identifier previously registered with ``create_stream``.
        timeout: Overall budget in seconds for the whole stream, not per chunk.

    Yields:
        Each text chunk in arrival order; a ``None`` queue item ends the stream.

    Raises:
        KeyError: ``request_id`` was never registered.
        TimeoutError: The budget ran out before the end marker arrived.
    """
    stream = self._chat_streams[request_id]
    deadline = time.monotonic() + timeout
    while True:
        remain = deadline - time.monotonic()
        if remain <= 0:
            raise TimeoutError("chat stream timeout")
        try:
            chunk = await asyncio.wait_for(stream["chunks"].get(), timeout=remain)
        except asyncio.TimeoutError as exc:
            # Report both timeout paths identically; wait_for's own exception
            # has no message and, before Python 3.11, a different type.
            raise TimeoutError("chat stream timeout") from exc
        if chunk is None:
            break
        yield chunk
async def close(self):
    """Shut down the RPC reader and the websocket, then drop both handles.

    The websocket is closed even if closing the RPC client raises, and the
    handles are cleared first so that ``rpc`` reports "not initialized"
    afterwards instead of handing out a closed client.
    """
    rpc, ws = self._rpc, self._ws
    self._rpc = None
    self._ws = None
    try:
        if rpc:
            await rpc.close()
    finally:
        if ws:
            await ws.close()
async def chat_complete(self, prompt: str, model_key: str, ask_mode: str) -> dict:
    """Send one chat request and return the fully assembled answer.

    Args:
        prompt: Flattened conversation text.
        model_key: Lingma model key to use.
        ask_mode: Lingma mode, e.g. ``"chat"`` or ``"agent"``.

    Returns:
        The stream result dict extended with ``requestId``, ``sessionId``,
        ``model`` and ``mode``.

    Raises:
        TimeoutError: The answer stream did not finish in time.
    """
    request_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())
    payload = self._build_payload(prompt, model_key, ask_mode, session_id, request_id)
    rpc = self.rpc
    rpc.create_stream(request_id)
    try:
        try:
            await rpc.request("chat/ask", payload, timeout=self.rpc_timeout)
        except (TimeoutError, asyncio.TimeoutError):
            # Deliberate best effort: a slow reply to chat/ask is ignored and
            # the answer is still awaited on the stream below.
            pass
        async for _ in rpc.consume_stream(request_id, timeout=max(20.0, self.rpc_timeout + 20.0)):
            pass
        result = rpc.get_stream_result(request_id)
    finally:
        # Release per-request state; it was previously kept forever, so every
        # request leaked its accumulated text and queue.
        rpc._chat_streams.pop(request_id, None)
    finish = result.get("finish") or {}
    result["requestId"] = request_id
    result["sessionId"] = finish.get("sessionId") or session_id
    result["model"] = model_key
    result["mode"] = ask_mode
    return result


async def chat_stream(self, prompt: str, model_key: str, ask_mode: str) -> AsyncIterator[str]:
    """Send one chat request and yield answer chunks as they arrive.

    Args:
        prompt: Flattened conversation text.
        model_key: Lingma model key to use.
        ask_mode: Lingma mode, e.g. ``"chat"`` or ``"agent"``.

    Yields:
        Text chunks in arrival order.

    Raises:
        TimeoutError: The answer stream did not finish in time.
    """
    request_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())
    payload = self._build_payload(prompt, model_key, ask_mode, session_id, request_id)
    rpc = self.rpc
    rpc.create_stream(request_id)
    try:
        try:
            await rpc.request("chat/ask", payload, timeout=self.rpc_timeout)
        except (TimeoutError, asyncio.TimeoutError):
            # Same best-effort handling as chat_complete.
            pass
        async for chunk in rpc.consume_stream(request_id, timeout=max(20.0, self.rpc_timeout + 40.0)):
            yield chunk
    finally:
        # Also runs when the consumer stops early (generator closed), so an
        # abandoned stream no longer stays registered.
        rpc._chat_streams.pop(request_id, None)
rpc_timeout=settings.lingma_rpc_timeout, + default_model=settings.default_model, + default_ask_mode=settings.default_ask_mode, + ) + await lingma.start() + auto_login = AutoLoginManager( + username=settings.lingma_username, + password=settings.lingma_password, + headless=settings.auto_login_headless, + timeout_sec=settings.auto_login_timeout, + max_retry=settings.auto_login_max_retry, + ) + + +@app.on_event("shutdown") +async def on_shutdown(): + if lingma: + await lingma.close() + + +@app.get("/healthz") +async def healthz(): + return {"ok": True, "time": int(time.time())} + + +async def _ensure_logged_in_or_auto_login() -> dict: + assert lingma is not None + status = await lingma.auth_status() + if status and status.get("id"): + return status + + if not settings.auto_login_enabled: + raise HTTPException(status_code=401, detail={"error": {"message": "Lingma not logged in"}}) + + if settings.dedicated_domain_url: + current = await lingma.get_endpoint() + current_ep = (current or {}).get("endpoint", "") if isinstance(current, dict) else "" + if current_ep != settings.dedicated_domain_url: + await lingma.update_endpoint(settings.dedicated_domain_url) + + login_url, login_raw = await lingma.generate_login_url() + if not login_url: + raise HTTPException( + status_code=500, + detail={"error": {"message": f"generate login url failed: {login_raw}"}}, + ) + + assert auto_login is not None + await auto_login.ensure_started(login_url) + try: + await auto_login.wait_done(timeout=settings.auto_login_timeout + 20) + except Exception: + pass + + status = await lingma.auth_status() + if status and status.get("id"): + return status + + raise HTTPException( + status_code=401, + detail={"error": {"message": "Lingma auto login failed", "auto_login": auto_login.status()}}, + ) + + +@app.get("/v1/models", dependencies=[Depends(auth_guard)]) +async def v1_models(): + assert lingma is not None + await _ensure_logged_in_or_auto_login() + await stats_collector.inc_models() + models = await 
def _messages_to_prompt(messages: list[dict]) -> str:
    """Flatten chat messages into one ``[role] content`` line per message.

    Args:
        messages: Message dicts with optional ``role`` and ``content`` keys.

    Returns:
        The lines joined with newlines and stripped at both ends; an empty
        string when ``messages`` is empty.
    """
    # A missing or None role falls back to "user" and None content to "",
    # instead of leaking the literal text "None" into the prompt.
    lines = [
        f"[{m.get('role') or 'user'}] {m.get('content') or ''}"
        for m in messages
    ]
    return "\n".join(lines).strip()
yield f"data: {json.dumps(done_payload, ensure_ascii=False)}\n\n" + yield "data: [DONE]\n\n" + success = True + finally: + await stats_collector.record_chat( + stream=True, + success=success, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens_holder["n"], + ) + + return StreamingResponse(event_stream(), media_type="text/event-stream") + + try: + result = await lingma.chat_complete(prompt, model, ask_mode) + except Exception: + await stats_collector.record_chat( + stream=False, + success=False, + prompt_tokens=prompt_tokens, + completion_tokens=0, + ) + raise + + completion_tokens = estimate_tokens(result.get("text") or "") + await stats_collector.record_chat( + stream=False, + success=True, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + ) + response = ChatCompletionResponse( + id=f"chatcmpl-{uuid.uuid4().hex}", + created=int(time.time()), + model=model, + choices=[ + ChatCompletionChoice( + index=0, + finish_reason="stop", + message={"role": "assistant", "content": result.get("text") or ""}, + ) + ], + ) + data = response.model_dump() + data["latency"] = { + "first_token_ms": result.get("firstTokenLatencyMs"), + "total_ms": result.get("totalLatencyMs"), + } + data["usage"] = { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": prompt_tokens + completion_tokens, + } + return JSONResponse(content=data) + + +@app.post("/internal/auto-login/start", dependencies=[Depends(auth_guard)]) +async def internal_auto_login_start(): + assert lingma is not None + assert auto_login is not None + + status = await lingma.auth_status() + if status and status.get("id"): + return {"ok": True, "state": "already_logged_in", "auth": status} + + if settings.dedicated_domain_url: + current = await lingma.get_endpoint() + current_ep = (current or {}).get("endpoint", "") if isinstance(current, dict) else "" + if current_ep != settings.dedicated_domain_url: + await 
lingma.update_endpoint(settings.dedicated_domain_url) + + login_url, login_raw = await lingma.generate_login_url() + if not login_url: + raise HTTPException(status_code=500, detail={"error": {"message": "generate login url failed", "raw": login_raw}}) + + started = await auto_login.ensure_started(login_url) + return { + "ok": True, + "state": "running" if started else "already_running", + "loginUrl": login_url, + "auto_login": auto_login.status(), + } + + +@app.get("/internal/auto-login/status", dependencies=[Depends(auth_guard)]) +async def internal_auto_login_status(): + assert auto_login is not None + assert lingma is not None + return { + "ok": True, + "auto_login": auto_login.status(), + "auth": await lingma.auth_status(), + } + + +@app.get("/internal/stats", dependencies=[Depends(auth_guard)]) +async def internal_stats(): + return {"ok": True, "stats": await stats_collector.snapshot()} + + +@app.get("/metrics") +async def metrics(): + text = await stats_collector.prometheus_text() + return StreamingResponse(iter([text]), media_type="text/plain; version=0.0.4") diff --git a/app/model_map.py b/app/model_map.py new file mode 100644 index 0000000..ce41960 --- /dev/null +++ b/app/model_map.py @@ -0,0 +1,84 @@ +from __future__ import annotations + + +DEFAULT_MODEL_NAME_MAP = { + "org_auto": "Auto", + "dashscope_qmodel": "qwen3.6-plus", + "dashscope_qwen3_coder": "qwen3-coder", + "dashscope_qwen_plus_20250428_thinking": "qwen3-thinking", + "dashscope_qwen_max_latest": "qwen3-max", +} + + +def build_model_name_map(models: dict) -> dict[str, str]: + name_map = dict(DEFAULT_MODEL_NAME_MAP) + if not isinstance(models, dict): + return name_map + for group in ("chat", "assistant", "developer", "inline"): + items = models.get(group) or [] + if not isinstance(items, list): + continue + for item in items: + if not isinstance(item, dict): + continue + key = item.get("key") + if not isinstance(key, str) or not key: + continue + display_name = item.get("displayName") or 
def reverse_name_map(name_map: dict[str, str]) -> dict[str, str]:
    """Build a display-name -> model-key lookup with lower-case aliases.

    Args:
        name_map: Mapping of model key to human-readable name. Entries whose
            name is not a non-empty string are ignored.

    Returns:
        A dict containing every exact name and its lower-cased alias. An exact
        name always wins over another model's lower-case alias, so a model
        literally named ``"auto"`` is never shadowed by the alias of ``"Auto"``.
        Within each tier the later entry wins.
    """
    usable = [
        (key, name)
        for key, name in name_map.items()
        if isinstance(name, str) and name
    ]
    aliases = {name.lower(): key for key, name in usable}
    exact = {name: key for key, name in usable}
    # Exact names are merged last so they override colliding aliases.
    return {**aliases, **exact}


def flatten_model_keys(models: dict) -> list[str]:
    """Collect the unique model keys from a grouped model listing.

    Args:
        models: Dict with optional ``chat``/``assistant``/``developer``/
            ``inline`` lists of ``{"key": ...}`` items. Anything malformed
            (non-dict payload, non-list group, non-dict item, empty or
            non-string key) is skipped.

    Returns:
        Keys in first-seen order, without duplicates.
    """
    if not isinstance(models, dict):
        return []

    # A dict keeps insertion order and gives O(1) duplicate checks.
    seen: dict[str, None] = {}
    for group in ("chat", "assistant", "developer", "inline"):
        items = models.get(group) or []
        if not isinstance(items, list):
            continue
        for item in items:
            if not isinstance(item, dict):
                continue
            key = item.get("key")
            if isinstance(key, str) and key:
                seen.setdefault(key, None)
    return list(seen)


def resolve_model(
    request_model: str,
    available_keys: list[str],
    default_model: str,
    model_name_map: dict[str, str] | None = None,
) -> str:
    """Map the client-supplied model string to an available Lingma model key.

    Resolution order:
      1. ``request_model`` is itself an available key.
      2. ``request_model`` matches a display name (exact, then lower-cased)
         whose key is available.
      3. ``default_model`` if it is available.
      4. The first available key.
      5. With no available keys at all: ``request_model`` or ``default_model``.

    Unknown names, including OpenAI names such as ``gpt-4o``, take the
    generic fallback in steps 3-5.

    Args:
        request_model: Model string from the request; a falsy value is
            treated as "nothing requested".
        available_keys: Model keys currently offered.
        default_model: Preferred fallback key.
        model_name_map: Optional key -> display-name mapping.

    Returns:
        The resolved model key.
    """
    requested = request_model or ""

    if requested in available_keys:
        return requested

    if model_name_map:
        by_name = reverse_name_map(model_name_map)
        for candidate in (requested, requested.lower()):
            key = by_name.get(candidate)
            if key in available_keys:
                return key

    if default_model in available_keys:
        return default_model
    if available_keys:
        return available_keys[0]
    return request_model or default_model
# Envelope for a non-streaming chat completion reply.
# The chat handler in app/main.py builds it with a "chatcmpl-<uuid hex>" id
# and the current Unix time, dumps it with model_dump(), and then adds the
# "latency" and "usage" keys to the resulting dict. Those two keys are
# therefore not declared as fields here.
class ChatCompletionResponse(BaseModel):
    id: str
    # Fixed object tag for non-streaming completions.
    object: str = "chat.completion"
    # Creation time in whole seconds (the handler passes int(time.time())).
    created: int
    # The handler passes its resolved `model` value here.
    # NOTE(review): presumably the key returned by resolve_model — confirm in the part of the handler above this chunk.
    model: str
    choices: list[ChatCompletionChoice]
class StatsCollector:
    """In-process request and token counters for the gateway.

    All mutations and reads go through one ``asyncio.Lock``. The counter
    names live in a single table so the JSON snapshot and the Prometheus
    exposition cannot drift apart.
    """

    # Counter attributes, in the order they appear in snapshot() and
    # prometheus_text().
    _COUNTER_FIELDS = (
        "models_requests_total",
        "chat_requests_total",
        "chat_requests_success",
        "chat_requests_error",
        "chat_stream_requests",
        "chat_non_stream_requests",
        "prompt_tokens_estimated_total",
        "completion_tokens_estimated_total",
    )
    # Derived value, exported after the stored counters.
    _DERIVED_TOTAL = "total_tokens_estimated"

    def __init__(self):
        self._lock = asyncio.Lock()
        self.started_at = int(time.time())
        self.models_requests_total = 0
        self.chat_requests_total = 0
        self.chat_requests_success = 0
        self.chat_requests_error = 0
        self.chat_stream_requests = 0
        self.chat_non_stream_requests = 0
        self.prompt_tokens_estimated_total = 0
        self.completion_tokens_estimated_total = 0

    @staticmethod
    def _as_count(value) -> int:
        """Coerce a token count to a non-negative int; ``None`` counts as 0."""
        return max(0, int(value or 0))

    async def inc_models(self):
        """Count one model-listing request."""
        async with self._lock:
            self.models_requests_total += 1

    async def record_chat(self, *, stream: bool, success: bool, prompt_tokens: int, completion_tokens: int):
        """Record the outcome of one chat completion request.

        Args:
            stream: Whether the request used SSE streaming.
            success: Whether the request completed without error.
            prompt_tokens: Estimated prompt tokens; negative values and
                ``None`` are counted as 0.
            completion_tokens: Estimated completion tokens, same clamping.
        """
        # Coerce before taking the lock. This method is called from a
        # `finally:` block, so a bad token value must not mask the original
        # error by raising here.
        prompt = self._as_count(prompt_tokens)
        completion = self._as_count(completion_tokens)

        async with self._lock:
            self.chat_requests_total += 1
            if stream:
                self.chat_stream_requests += 1
            else:
                self.chat_non_stream_requests += 1

            if success:
                self.chat_requests_success += 1
            else:
                self.chat_requests_error += 1

            self.prompt_tokens_estimated_total += prompt
            self.completion_tokens_estimated_total += completion

    async def snapshot(self) -> dict:
        """Return a consistent copy of all counters plus the derived total."""
        async with self._lock:
            snap = {"started_at": self.started_at}
            for name in self._COUNTER_FIELDS:
                snap[name] = getattr(self, name)
        snap[self._DERIVED_TOTAL] = (
            snap["prompt_tokens_estimated_total"]
            + snap["completion_tokens_estimated_total"]
        )
        return snap

    async def prometheus_text(self) -> str:
        """Render the counters in Prometheus text exposition format.

        Each metric is emitted as ``gateway_<name>`` with a ``# TYPE ...
        counter`` line before it. ``started_at`` is not exported.
        """
        s = await self.snapshot()
        lines = []
        for name in (*self._COUNTER_FIELDS, self._DERIVED_TOTAL):
            lines.append(f"# TYPE gateway_{name} counter")
            lines.append(f"gateway_{name} {s[name]}")
        return "\n".join(lines) + "\n"