diff --git a/.env.example b/.env.example index d2d7304..6882604 100644 --- a/.env.example +++ b/.env.example @@ -2,10 +2,14 @@ HOST=0.0.0.0 # 网关监听端口 PORT=8317 -# API Key,可配置多个(逗号分隔) +# API Key,可配置多个(逗号分隔)。空 = 不鉴权(启动会打 warning),仅用于本地 dev API_KEYS=sk-your-api-key -# 独立的 /metrics 鉴权 token(留空则退化为 API_KEYS 也可访问;若连 API_KEYS 都没配,/metrics 为公开) +# 独立的 /metrics 鉴权 token(留空则退化为 API_KEYS 亦可访问;若与 API_KEYS 同时为空,/metrics 默认 503) METRICS_TOKEN= +# 显式把 /metrics 设为公开(仅在私网采集器场景使用) +METRICS_PUBLIC=false +# 独立的 /internal/* 管理 token(留空则退化为 API_KEYS);强烈建议生产环境单独配置 +ADMIN_TOKEN= # 日志级别(DEBUG / INFO / WARNING / ERROR) LOG_LEVEL=INFO diff --git a/Dockerfile b/Dockerfile index 4f641c2..915e15f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,4 +17,15 @@ COPY app /app/app EXPOSE 8317 +# Container-level health signal. Docker Compose / orchestrators rely on this +# to stop sending traffic when the pool is wedged, restart unhealthy replicas, +# and drive rolling deploys. /healthz returns ok=true only when at least one +# Lingma instance is in state=ready, so it catches the "stuck on login" case +# that a raw TCP probe would miss. +HEALTHCHECK --interval=30s --timeout=5s --start-period=60s --retries=3 \ + CMD python -c "import os,json,urllib.request,sys; \ +port=os.environ.get('PORT','8317'); \ +r=urllib.request.urlopen(f'http://127.0.0.1:{port}/healthz', timeout=3); \ +sys.exit(0 if json.load(r).get('ok') else 1)" || exit 1 + CMD ["sh", "-c", "python /app/app/bootstrap_lingma.py && uvicorn app.main:app --host ${HOST:-0.0.0.0} --port ${PORT:-8317}"] diff --git a/README.md b/README.md index fc43ec6..97217ea 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,9 @@ cp .env.example .env - `AUTO_LOGIN_MAX_RETRY`:自动登录重试次数 - `LINGMA_USERNAME`:Lingma 登录用户名 - `LINGMA_PASSWORD`:Lingma 登录密码 -- `METRICS_TOKEN`:`/metrics` 独立鉴权 token(留空则 `API_KEYS` 也可访问;两者都留空时 `/metrics` 为公开) +- `METRICS_TOKEN`:`/metrics` 独立鉴权 token(留空则 `API_KEYS` 也可访问;两者皆空时 `/metrics` 默认 503,除非显式开 `METRICS_PUBLIC=true`) +- `METRICS_PUBLIC`:显式把 `/metrics` 设为公开,仅在私网采集器场景使用(默认 `false`) +- `ADMIN_TOKEN`:`/internal/*` 管理端点独立鉴权 token(留空则退化为 `API_KEYS`)。生产环境建议单独配置,这样轮换 `API_KEYS` 不需要重新颁发 session bundle 导出权限 - `LOG_LEVEL`:日志级别(默认 `INFO`,输出结构化 JSON,包含 `request_id`) - `GATEWAY_MAX_IN_FLIGHT`:`/v1/chat/completions` 并发上限(默认 4,`<=0` 表示不限流) - `GATEWAY_QUEUE_TIMEOUT_SEC`:排队等待超时秒数(默认 30,超过后直接 429 + `Retry-After`) @@ -236,7 +238,8 @@ curl -s http://127.0.0.1:8317/metrics \ - `usage.prompt_tokens/completion_tokens` 为估算值(按字节近似换算)。 - 非流式响应里会附带 `usage` 字段。 - 流式响应可传 `stream_options: {"include_usage": true}` 让最后一帧返回 `usage`。 -- `/metrics` 默认需要 Bearer 鉴权:优先匹配 `METRICS_TOKEN`,否则接受 `API_KEYS` 里任意一个;两者都未配置时保持公开。 +- `/metrics` 默认需要 Bearer 鉴权:优先匹配 `METRICS_TOKEN`,否则接受 `API_KEYS` 里任意一个;两者皆未配置时返回 503,显式 `METRICS_PUBLIC=true` 才公开。 +- `/internal/*` 管理端点(auto-login, session export, models/raw, stats)默认走 `ADMIN_TOKEN`,未配置时退化为 `API_KEYS`;两者都未配置则 503。 ## 6. 容器内自动登录 diff --git a/app/auth.py b/app/auth.py index c320933..437d08f 100644 --- a/app/auth.py +++ b/app/auth.py @@ -29,7 +29,8 @@ def _match_any(token: str, candidates: list[str]) -> bool: def require_bearer(request: Request, api_keys: list[str]) -> None: - # Empty api_keys means auth is disabled (keeps the old behavior). + # Empty api_keys means auth is disabled (kept for local dev). The startup + # logger warns loudly in that case so it can't go unnoticed in prod. if not api_keys: return token = _extract_bearer(request) @@ -47,19 +48,42 @@ def require_bearer(request: Request, api_keys: list[str]) -> None: def require_metrics_access( - request: Request, api_keys: list[str], metrics_token: str + request: Request, + api_keys: list[str], + metrics_token: str, + *, + public: bool = False, ) -> None: - """Allow metrics if any of: METRICS_TOKEN matches, or any API_KEYS match. + """Gate /metrics. - If neither METRICS_TOKEN nor API_KEYS are configured, metrics is public - (backwards compatible default). + Resolution order: + 1. `public=True` (METRICS_PUBLIC) — wide open, explicit opt-in for + sidecar scrapers on a private network. + 2. `METRICS_TOKEN` configured — must match. + 3. `API_KEYS` configured — any configured API key works. + 4. Nothing configured at all — 503 (scraping disabled) so we don't + silently leak the pool topology on an un-hardened deployment. """ + if public: + return accepted: list[str] = [] if metrics_token: accepted.append(metrics_token) accepted.extend(api_keys) if not accepted: - return + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "error": { + "message": ( + "metrics scraping is disabled: set METRICS_TOKEN, " + "API_KEYS, or METRICS_PUBLIC=true" + ), + "type": "service_unavailable", + "code": "metrics_disabled", + } + }, + ) token = _extract_bearer(request) if not _match_any(token, accepted): raise HTTPException( @@ -72,3 +96,52 @@ def require_metrics_access( } }, ) + + +def require_admin_access( + request: Request, + api_keys: list[str], + admin_token: str, +) -> None: + """Gate /internal/* admin endpoints. + + Resolution order: + 1. `ADMIN_TOKEN` configured — must match exactly. + 2. Otherwise fall back to the regular API_KEYS (single-tenant deploys). + 3. If nothing is configured — 503 so we never expose auto-login / + session-export on an unauthenticated gateway. + + Backwards compat: existing deployments that only set `API_KEYS` keep + working; add ADMIN_TOKEN in .env when you want a dedicated split. + """ + accepted: list[str] = [] + if admin_token: + accepted.append(admin_token) + else: + accepted.extend(api_keys) + if not accepted: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "error": { + "message": ( + "admin endpoints disabled: configure ADMIN_TOKEN " + "(recommended) or API_KEYS" + ), + "type": "service_unavailable", + "code": "admin_disabled", + } + }, + ) + token = _extract_bearer(request) + if not _match_any(token, accepted): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={ + "error": { + "message": "Invalid admin token", + "type": "invalid_request_error", + "code": "invalid_api_key", + } + }, + ) diff --git a/app/config.py b/app/config.py index c127ff6..12716c0 100644 --- a/app/config.py +++ b/app/config.py @@ -22,6 +22,8 @@ class Settings: port: int api_keys: list[str] metrics_token: str + admin_token: str + metrics_public: bool log_level: str gateway_max_in_flight: int gateway_queue_timeout_sec: float @@ -151,6 +153,8 @@ def load_settings() -> Settings: port=int(os.getenv("PORT", "8317")), api_keys=api_keys, metrics_token=os.getenv("METRICS_TOKEN", "").strip(), + admin_token=os.getenv("ADMIN_TOKEN", "").strip(), + metrics_public=_bool_env("METRICS_PUBLIC", False), log_level=os.getenv("LOG_LEVEL", "INFO").strip() or "INFO", gateway_max_in_flight=int(os.getenv("GATEWAY_MAX_IN_FLIGHT", "4")), gateway_queue_timeout_sec=float(os.getenv("GATEWAY_QUEUE_TIMEOUT_SEC", "30")), diff --git a/app/lingma_client.py b/app/lingma_client.py index 13f293e..2bb503b 100644 --- a/app/lingma_client.py +++ b/app/lingma_client.py @@ -316,6 +316,11 @@ class LingmaGatewayClient: self._ready_event = asyncio.Event() self._reconnect_task: asyncio.Task | None = None self._last_error: str = "" + # Lingma subprocess handle. Kept so we can reap on shutdown and read + # stderr for debugging (pre-v0.4 we forked with DEVNULL + new_session + # which orphaned the process and hid crash logs). + self._proc: subprocess.Popen | None = None + self._stderr_task: asyncio.Task | None = None # ------------------------------------------------------------------ state @@ -359,6 +364,76 @@ class LingmaGatewayClient: if self._ws: with contextlib.suppress(Exception): await self._ws.close() + await self._terminate_proc() + if self._stderr_task and not self._stderr_task.done(): + self._stderr_task.cancel() + with contextlib.suppress(Exception): + await self._stderr_task + + async def _drain_stderr(self, proc: subprocess.Popen) -> None: + """Mirror Lingma stderr to the logger at DEBUG level. + + Running in a worker thread (readline is blocking) and dumping lines + through logger.debug means crashes like native-module load failures + are visible when LOG_LEVEL=DEBUG but don't spam production logs. + """ + if proc.stderr is None: + return + + name = self.name + + def reader() -> None: + try: + for line in iter(proc.stderr.readline, b""): + if not line: + break + text = line.decode("utf-8", errors="replace").rstrip() + if text: + logger.debug("[%s] lingma stderr: %s", name, text) + except Exception as exc: # pragma: no cover -- defensive + logger.debug("[%s] stderr drain aborted: %s", name, exc) + + try: + await asyncio.to_thread(reader) + except asyncio.CancelledError: + pass + + async def _terminate_proc(self) -> None: + """Reap the Lingma subprocess we spawned. + + SIGTERM first with a short grace period, then SIGKILL. Blocking waits + are off-loaded to a thread so they don't stall the FastAPI shutdown + event loop. Idempotent: safe to call even if nothing was spawned. + """ + proc = self._proc + if proc is None: + return + self._proc = None + try: + if proc.poll() is None: + try: + proc.terminate() + except Exception as exc: + logger.warning("[%s] proc.terminate failed: %s", self.name, exc) + try: + await asyncio.wait_for(asyncio.to_thread(proc.wait), timeout=5.0) + except TIMEOUT_EXCEPTIONS: + logger.warning( + "[%s] lingma (pid=%s) didn't exit in 5s, sending SIGKILL", + self.name, + proc.pid, + ) + with contextlib.suppress(Exception): + proc.kill() + with contextlib.suppress(Exception): + await asyncio.wait_for( + asyncio.to_thread(proc.wait), timeout=3.0 + ) + finally: + # Close stderr pipe so the drain thread can exit cleanly. + if proc.stderr is not None: + with contextlib.suppress(Exception): + proc.stderr.close() async def ensure_ready(self, timeout: float | None = None) -> None: """Block until the RPC connection is usable, (re)connecting on demand.""" @@ -413,12 +488,26 @@ class LingmaGatewayClient: self.lingma_bin, self.work_dir, ) - subprocess.Popen( + # Reap any old proc from a previous connect attempt before spawning + # a fresh one so we never accumulate zombie Lingma instances. + await self._terminate_proc() + if self._stderr_task and not self._stderr_task.done(): + self._stderr_task.cancel() + with contextlib.suppress(Exception): + await self._stderr_task + self._stderr_task = None + + self._proc = subprocess.Popen( [str(self.lingma_bin), "start", "--workDir", str(self.work_dir)], cwd=str(self.lingma_bin.parent), stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - start_new_session=True, + stderr=subprocess.PIPE, + ) + logger.info( + "[%s] lingma spawned (pid=%s)", self.name, self._proc.pid + ) + self._stderr_task = asyncio.create_task( + self._drain_stderr(self._proc) ) info, _, _ = _wait_info_any(info_paths, timeout_sec=self.startup_timeout) self.socket_port = info diff --git a/app/lingma_pool.py b/app/lingma_pool.py index 118c60f..e54d209 100644 --- a/app/lingma_pool.py +++ b/app/lingma_pool.py @@ -183,16 +183,14 @@ class LingmaPool: # -------------------------------------------------------------- lifecycle async def start(self) -> None: - """Start all instances sequentially. + """Boot every pool instance in parallel. - Sequential startup avoids racing on the shared ~/.lingma/.info file (for - pool-mode we skip it anyway, but Lingma may still write there internally) - and keeps docker logs readable. Failures are non-fatal; per-instance - reconnect loops will take over. + Bundle restore is still sequential (cheap, filesystem-level) and logged + per instance; only the expensive `client.start()` path — which waits on + the Lingma socket and an LSP initialize round-trip — runs concurrently. - Before spawning each Lingma process we optionally restore a pre-captured - session bundle into the workDir, which lets us skip Playwright login - entirely on a fresh volume. + Any one instance failing is non-fatal: per-instance reconnect loops + take over once their first `ensure_ready()` fires. """ for inst in self._instances: self._maybe_apply_session_bundle(inst) @@ -208,11 +206,18 @@ class LingmaPool: ), is_logged_in_workdir(inst.cfg.work_dir), ) + + async def _start_one(inst: PoolInstance) -> None: try: await inst.client.start() except Exception as exc: logger.warning("pool start %s failed: %s", inst.name, exc) + await asyncio.gather( + *(_start_one(inst) for inst in self._instances), + return_exceptions=False, + ) + @staticmethod def _maybe_apply_session_bundle(inst: "PoolInstance") -> None: """Restore an exported Lingma session into inst.work_dir, if needed. diff --git a/app/main.py b/app/main.py index 3ff6492..f240536 100644 --- a/app/main.py +++ b/app/main.py @@ -10,7 +10,7 @@ from contextlib import asynccontextmanager from fastapi import Depends, FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse -from .auth import require_bearer, require_metrics_access +from .auth import require_admin_access, require_bearer, require_metrics_access from .concurrency import BackpressureRejected, InFlightGuard from .config import Settings, load_settings from .lingma_pool import LingmaPool, PoolInstance @@ -76,6 +76,7 @@ async def lifespan(_app: FastAPI): pool.size(), settings.gateway_max_in_flight, ) + _log_auth_posture() await pool.start() try: yield @@ -121,7 +122,37 @@ def auth_guard(request: Request): def metrics_auth_guard(request: Request): - require_metrics_access(request, settings.api_keys, settings.metrics_token) + require_metrics_access( + request, + settings.api_keys, + settings.metrics_token, + public=settings.metrics_public, + ) + + +def admin_auth_guard(request: Request): + require_admin_access(request, settings.api_keys, settings.admin_token) + + +def _log_auth_posture() -> None: + """Loud warnings on misconfigured auth so ops can't miss them.""" + if not settings.api_keys: + logger.warning( + "AUTH DISABLED: API_KEYS is empty, /v1/* is wide open. " + "Set API_KEYS before exposing this gateway to anything " + "other than localhost." + ) + if not settings.admin_token: + logger.warning( + "ADMIN_TOKEN not set: /internal/* reuses API_KEYS for auth. " + "For production set a dedicated ADMIN_TOKEN so rotating chat " + "keys doesn't require exporting the session bundle." + ) + if settings.metrics_public: + logger.warning( + "METRICS_PUBLIC=true: /metrics is open. Only enable this " + "when the gateway is behind a private-network scraper." + ) @app.get("/healthz") @@ -563,7 +594,7 @@ async def v1_chat_completions(req: ChatCompletionsRequest, request: Request): ticket.release() -@app.post("/internal/auto-login/start", dependencies=[Depends(auth_guard)]) +@app.post("/internal/auto-login/start", dependencies=[Depends(admin_auth_guard)]) async def internal_auto_login_start(instance: str | None = None): p = _require_pool() target = None @@ -614,7 +645,7 @@ async def internal_auto_login_start(instance: str | None = None): } -@app.get("/internal/auto-login/status", dependencies=[Depends(auth_guard)]) +@app.get("/internal/auto-login/status", dependencies=[Depends(admin_auth_guard)]) async def internal_auto_login_status(): p = _require_pool() out = [] @@ -634,7 +665,7 @@ async def internal_auto_login_status(): return {"ok": True, "instances": out} -@app.post("/internal/session/export", dependencies=[Depends(auth_guard)]) +@app.post("/internal/session/export", dependencies=[Depends(admin_auth_guard)]) async def internal_session_export(instance: str | None = None): """Export a logged-in Lingma session as a base64 tar.gz bundle. @@ -693,7 +724,7 @@ async def internal_session_export(instance: str | None = None): } -@app.get("/internal/models/raw", dependencies=[Depends(auth_guard)]) +@app.get("/internal/models/raw", dependencies=[Depends(admin_auth_guard)]) async def internal_models_raw(instance: str | None = None): """Return the raw `config/queryModels` response from Lingma. @@ -723,7 +754,7 @@ async def internal_models_raw(instance: str | None = None): } -@app.get("/internal/stats", dependencies=[Depends(auth_guard)]) +@app.get("/internal/stats", dependencies=[Depends(admin_auth_guard)]) async def internal_stats(): p = _require_pool() return {