- authz: a new ADMIN_TOKEN gates /internal/*; METRICS_PUBLIC defaults to false, so /metrics returns 503 when neither METRICS_TOKEN nor API_KEYS is set (previously it leaked the pool topology). Startup logs a loud warning if API_KEYS is empty or if the admin check falls back to the chat keys.
- lingma_client: keep a Popen handle instead of orphaning Lingma via start_new_session, drain its stderr to the logger at DEBUG, and on shutdown send SIGTERM, wait a 5 s grace period, then SIGKILL. Fixes the zombie-process leak on container reload.
- pool: use asyncio.gather to start the N instances concurrently; with N=2 this shaves roughly one startup_timeout off boot time.
- Dockerfile: the HEALTHCHECK hits /healthz and greps for pool_ready>0, so Docker / compose orchestrators see "stuck on login" as unhealthy.

Made-with: Cursor
334 lines
11 KiB
Python
334 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from .auto_login import AutoLoginManager
|
|
from .config import LingmaAccount
|
|
from .lingma_client import LingmaGatewayClient
|
|
from .logging_config import get_logger
|
|
from .session_bundle import (
|
|
apply_bundle_to_workdir,
|
|
decode_bundle,
|
|
is_logged_in_workdir,
|
|
resolve_bundle_b64,
|
|
)
|
|
|
|
|
|
# Named logger shared by the pool code in this module.
logger = get_logger("lingma_gateway.pool")
|
|
|
|
|
|
@dataclass()
class InstanceConfig:
    """Per-instance settings produced by ``LingmaPool.build``.

    Fields are positional in the order declared below.
    """

    # 0-based position of the instance inside the pool.
    index: int
    # Human-readable label (``build`` uses ``inst-<index>``).
    work_dir_label_placeholder_removed = None if False else None  # noqa: E501  (never assigned as a field)
    name: str
    # Lingma working directory for this instance.
    work_dir: str
    # Socket port handed to the Lingma client (``build`` passes 0 in pool mode).
    socket_port: int
    # Credentials and optional session bundle used by this instance.
    account: LingmaAccount
|
|
|
|
|
|
class PoolInstance:
    """One pool member: a Lingma client, its AutoLoginManager and an
    in-flight request counter consulted by ``LingmaPool.pick``."""

    __slots__ = ("cfg", "client", "auto_login", "in_flight")

    def __init__(
        self,
        cfg: InstanceConfig,
        client: LingmaGatewayClient,
        auto_login: AutoLoginManager,
    ):
        # Starts at zero; presumably incremented/decremented by request
        # handlers outside this file — confirm against callers.
        self.in_flight = 0
        self.auto_login = auto_login
        self.client = client
        self.cfg = cfg

    @property
    def name(self) -> str:
        """Instance label taken from its config (e.g. ``inst-0``)."""
        cfg = self.cfg
        return cfg.name

    @property
    def healthy(self) -> bool:
        """True when the underlying client reports the READY state."""
        current = self.client.state
        return current == LingmaGatewayClient.STATE_READY
|
|
|
|
|
|
class LingmaPool:
    """N-Lingma process pool with least-in-flight + affinity routing.

    For N=1 this degenerates into the original single-client setup, preserving
    backwards compatibility with `LINGMA_USERNAME/LINGMA_PASSWORD`-only deploys.
    """

    def __init__(self, instances: list[PoolInstance]):
        """Wrap a non-empty list of pool members.

        The list is copied so later changes to the caller's list cannot alter
        routing. `build` passes members in index order.

        Raises:
            RuntimeError: if `instances` is empty.
        """
        if not instances:
            raise RuntimeError("LingmaPool requires at least 1 instance")
        self._instances: list[PoolInstance] = list(instances)
        # Cursor for the round-robin fallback `pick` uses when nothing is healthy.
        self._rr_counter = 0

    @classmethod
    def build(
        cls,
        *,
        lingma_bin: str,
        base_work_dir: str,
        legacy_socket_port: int,
        startup_timeout: int,
        rpc_timeout: int,
        default_model: str,
        default_ask_mode: str,
        accounts: list[LingmaAccount],
        instance_count: int,
        auto_login_headless: bool,
        auto_login_timeout: int,
        auto_login_max_retry: int,
        verify_timeout_sec: int | None = None,
    ) -> "LingmaPool":
        """Materialize N PoolInstances.

        Single-instance (N=1) uses the legacy workDir and LINGMA_SOCKET_PORT so
        existing deployments keep their state after upgrade. N>1 places each
        instance at `<pool_root>/inst-<i>` (see `_resolve_pool_root`) and passes
        socket port 0 so every instance uses a dynamic port.

        Accounts are assigned round-robin (reused when there are fewer accounts
        than instances). With no accounts, each instance gets an empty
        username/password. An `instance_count` below 1 is clamped to 1.
        """
        instance_count = max(instance_count, 1)

        if accounts:
            resolved_accounts = [
                accounts[i % len(accounts)] for i in range(instance_count)
            ]
            if instance_count > len(accounts):
                logger.warning(
                    "instance_count=%d exceeds unique accounts=%d; accounts will be reused",
                    instance_count,
                    len(accounts),
                )
        else:
            resolved_accounts = [
                LingmaAccount(username="", password="")
                for _ in range(instance_count)
            ]

        base_dir = Path(base_work_dir)
        pool_root = cls._resolve_pool_root(base_dir)
        # `or` also maps an explicit 0 to the derived default, which clamps
        # auto_login_timeout into [30, 180] seconds.
        effective_verify_timeout = verify_timeout_sec or max(
            30, min(180, auto_login_timeout)
        )

        instances: list[PoolInstance] = []
        for i, acc in enumerate(resolved_accounts):
            name = f"inst-{i}"
            if instance_count == 1:
                work_dir = str(base_dir)
                socket_port = legacy_socket_port
                extra_info: list[Path] | None = None
            else:
                work_dir = str(pool_root / name)
                socket_port = 0
                # In pool mode each instance reads only its own workDir .info to
                # avoid the shared ~/.lingma/.info race between instances.
                extra_info = []

            client = LingmaGatewayClient(
                lingma_bin=lingma_bin,
                work_dir=work_dir,
                socket_port=socket_port,
                startup_timeout=startup_timeout,
                rpc_timeout=rpc_timeout,
                default_model=default_model,
                default_ask_mode=default_ask_mode,
                name=name,
                extra_info_paths=extra_info,
            )

            auto_login = AutoLoginManager(
                username=acc.username,
                password=acc.password,
                headless=auto_login_headless,
                timeout_sec=auto_login_timeout,
                max_retry=auto_login_max_retry,
                verify_logged_in=cls._make_verify(client),
                verify_timeout_sec=effective_verify_timeout,
                debug_dir=f"/tmp/lingma-auto-login/{name}",
            )

            cfg = InstanceConfig(
                index=i,
                name=name,
                work_dir=work_dir,
                socket_port=socket_port,
                account=acc,
            )
            instances.append(PoolInstance(cfg, client, auto_login))

        return cls(instances)

    @staticmethod
    def _resolve_pool_root(base_dir: Path) -> Path:
        """Return the directory that holds per-instance workDirs (`.../pool`).

        Walks up at most three levels from `base_dir`, stopping at a `.lingma`
        directory or at the filesystem root. For example,
        `<data>/.lingma/vscode/sharedClientCache` resolves to `<data>/.lingma/pool`.
        """
        root = base_dir
        for _ in range(3):
            if root.name == ".lingma" or root.parent == root:
                break
            root = root.parent
        # NOTE(review): with no `.lingma` ancestor within three levels this
        # still lands three levels up (possibly `/pool`) rather than next to
        # base_work_dir. This is kept as-is so existing pool workDirs do not
        # move; confirm that it is intended.
        return root / "pool"

    @staticmethod
    def _make_verify(client: LingmaGatewayClient):
        """Build the `verify_logged_in` probe handed to AutoLoginManager.

        The probe returns True only when `client.auth_status()` yields a truthy
        result whose `.get("id")` is truthy. Any exception counts as False.
        Using a factory binds each probe to its own client, so the build loop
        does not hit late-binding surprises.
        """

        async def _verify() -> bool:
            try:
                st = await client.auth_status()
            except Exception:
                return False
            return bool(st and st.get("id"))

        return _verify

    # -------------------------------------------------------------- lifecycle

    async def start(self) -> None:
        """Boot every pool instance in parallel.

        Bundle restore is still sequential (cheap, filesystem-level) and logged
        per instance. Only the expensive `client.start()` path, which waits on
        the Lingma socket and an LSP initialize round-trip, runs concurrently.

        Any one instance failing is non-fatal: per-instance reconnect loops
        take over once their first `ensure_ready()` fires.
        """
        for inst in self._instances:
            self._maybe_apply_session_bundle(inst)
            logger.info(
                "pool starting %s (workDir=%s port=%d account=%s bundle=%s logged_in=%s)",
                inst.name,
                inst.cfg.work_dir,
                inst.cfg.socket_port,
                inst.cfg.account.username or "<empty>",
                bool(
                    inst.cfg.account.session_bundle_b64
                    or inst.cfg.account.session_bundle_file
                ),
                is_logged_in_workdir(inst.cfg.work_dir),
            )

        async def _start_one(inst: PoolInstance) -> None:
            try:
                await inst.client.start()
            except Exception as exc:
                logger.warning("pool start %s failed: %s", inst.name, exc)

        # `_start_one` swallows Exception, so gather only propagates
        # BaseException such as cancellation.
        await asyncio.gather(*(_start_one(inst) for inst in self._instances))

    @staticmethod
    def _maybe_apply_session_bundle(inst: "PoolInstance") -> None:
        """Restore an exported Lingma session into inst.work_dir, if needed.

        Skipped when:
          - the workDir already looks logged in (persistent volume case);
          - no bundle is configured.
        A bundle that fails to decode or apply is logged and ignored, leaving
        auto-login to handle authentication.
        """
        acc = inst.cfg.account
        if is_logged_in_workdir(inst.cfg.work_dir):
            return

        b64 = resolve_bundle_b64(
            inline=acc.session_bundle_b64 or None,
            file_path=acc.session_bundle_file or None,
        )
        if not b64:
            return

        try:
            raw = decode_bundle(b64)
            restored = apply_bundle_to_workdir(inst.cfg.work_dir, raw)
        except Exception as exc:
            logger.warning(
                "pool %s: failed to apply session bundle, will fall back to auto-login: %s",
                inst.name,
                exc,
            )
            return

        logger.info(
            "pool %s: applied session bundle (%d files: %s)",
            inst.name,
            len(restored),
            ",".join(restored),
        )

    async def close(self) -> None:
        """Close every instance's client concurrently.

        This is best-effort. A failing client is logged and does not stop the
        others from closing.
        """
        results = await asyncio.gather(
            *(inst.client.close() for inst in self._instances),
            return_exceptions=True,
        )
        for inst, result in zip(self._instances, results):
            if isinstance(result, BaseException):
                logger.warning("pool close %s failed: %s", inst.name, result)

    # -------------------------------------------------------------- inspection

    @property
    def instances(self) -> list[PoolInstance]:
        """Shallow copy of the pool members."""
        return list(self._instances)

    def size(self) -> int:
        """Number of pool members."""
        return len(self._instances)

    def stats(self) -> list[dict]:
        """Per-instance status snapshot (state, load, config, auto-login)."""
        return [
            {
                "index": inst.cfg.index,
                "name": inst.name,
                "state": inst.client.state,
                "last_error": inst.client.last_error,
                "in_flight": inst.in_flight,
                "work_dir": inst.cfg.work_dir,
                "socket_port": inst.cfg.socket_port,
                "username": inst.cfg.account.username,
                "auto_login": inst.auto_login.status(),
            }
            for inst in self._instances
        ]

    def prometheus_lines(self) -> list[str]:
        """Render per-instance gauges in Prometheus text exposition format.

        Each metric family is emitted as one contiguous group: its TYPE line
        followed by all of its samples. The exposition format requires this,
        and strict parsers reject interleaved families.
        """
        labelled = [
            (f'name="{inst.name}",idx="{inst.cfg.index}"', inst)
            for inst in self._instances
        ]
        lines: list[str] = ["# TYPE gateway_pool_instance_in_flight gauge"]
        lines.extend(
            f"gateway_pool_instance_in_flight{{{lbl}}} {inst.in_flight}"
            for lbl, inst in labelled
        )
        lines.append("# TYPE gateway_pool_instance_ready gauge")
        lines.extend(
            f"gateway_pool_instance_ready{{{lbl}}} {1 if inst.healthy else 0}"
            for lbl, inst in labelled
        )
        return lines

    # -------------------------------------------------------------- selection

    def pick(self, affinity_key: str | None = None) -> PoolInstance:
        """Pick an instance for a request.

        Preference order:
          1. Sticky affinity if `affinity_key` is provided and the bucket is healthy.
          2. Least-in-flight among healthy instances (ties go to the lower index).
          3. Round-robin fallback when nothing is healthy (lazy-start will kick in).

        Raises:
            RuntimeError: if the pool has no instances.
        """
        instances = self._instances
        if not instances:
            raise RuntimeError("lingma pool is empty")

        if affinity_key:
            bucket = instances[self._affinity_index(affinity_key, len(instances))]
            if bucket.healthy:
                return bucket

        healthy = [inst for inst in instances if inst.healthy]
        if healthy:
            return min(healthy, key=lambda x: (x.in_flight, x.cfg.index))

        # Nothing healthy. Fall back to round-robin so every instance gets a
        # chance to reconnect via ensure_ready().
        idx = self._rr_counter % len(instances)
        self._rr_counter += 1
        return instances[idx]

    @staticmethod
    def _affinity_index(key: str, size: int) -> int:
        """Map `key` to a stable bucket index in `range(size)`.

        Uses CRC-32 of the UTF-8 bytes instead of built-in `hash()`. `hash()`
        salts `str` values per interpreter process (PYTHONHASHSEED), so the
        same key would land in a different bucket after every restart.
        """
        import zlib  # local import: only needed for affinity routing

        data = str(key).encode("utf-8", errors="surrogatepass")
        return zlib.crc32(data) % size
|