from __future__ import annotations

import asyncio
import zlib
from dataclasses import dataclass
from pathlib import Path

from .auto_login import AutoLoginManager
from .config import LingmaAccount
from .lingma_client import LingmaGatewayClient
from .logging_config import get_logger
from .session_bundle import (
    apply_bundle_to_workdir,
    decode_bundle,
    is_logged_in_workdir,
    resolve_bundle_b64,
)

logger = get_logger("lingma_gateway.pool")


@dataclass
class InstanceConfig:
    """Static, per-instance settings resolved by `LingmaPool.build`."""

    index: int  # position of the instance inside the pool
    name: str  # "inst-<index>"
    work_dir: str  # workDir handed to the Lingma client
    socket_port: int  # legacy port for N=1, 0 in pool mode
    account: LingmaAccount  # account assigned to this instance


class PoolInstance:
    """A single Lingma process + its auto_login + in-flight counter."""

    __slots__ = ("cfg", "client", "auto_login", "in_flight")

    def __init__(
        self,
        cfg: InstanceConfig,
        client: LingmaGatewayClient,
        auto_login: AutoLoginManager,
    ):
        self.cfg = cfg
        self.client = client
        self.auto_login = auto_login
        # Starts at zero; the pool only reads it (routing and stats).
        self.in_flight = 0

    @property
    def name(self) -> str:
        """Return the instance name taken from its config."""
        return self.cfg.name

    @property
    def healthy(self) -> bool:
        """Return True when the underlying client reports STATE_READY."""
        return self.client.state == LingmaGatewayClient.STATE_READY


def _make_verify_logged_in(client: LingmaGatewayClient):
    """Build the `verify_logged_in` callback bound to one client.

    The returned coroutine function calls `client.auth_status()` and reports
    True only when the status is truthy and carries a truthy "id". Any
    exception raised by the status call is treated as "not logged in".
    """

    async def _verify() -> bool:
        try:
            status = await client.auth_status()
        except Exception:
            return False
        return bool(status and status.get("id"))

    return _verify


class LingmaPool:
    """N-Lingma process pool with least-in-flight + affinity routing.

    For N=1 this degenerates into the original single-client setup, preserving
    backwards compatibility with `LINGMA_USERNAME/LINGMA_PASSWORD`-only deploys.
    """

    def __init__(self, instances: list[PoolInstance]):
        """Wrap an already-built, non-empty list of instances.

        Raises:
            RuntimeError: if `instances` is empty.
        """
        if not instances:
            raise RuntimeError("LingmaPool requires at least 1 instance")
        self._instances: list[PoolInstance] = instances
        # Cursor for the round-robin fallback used by pick().
        self._rr_counter = 0

    @classmethod
    def build(
        cls,
        *,
        lingma_bin: str,
        base_work_dir: str,
        legacy_socket_port: int,
        startup_timeout: int,
        rpc_timeout: int,
        default_model: str,
        default_ask_mode: str,
        accounts: list[LingmaAccount],
        instance_count: int,
        auto_login_headless: bool,
        auto_login_timeout: int,
        auto_login_max_retry: int,
        verify_timeout_sec: int | None = None,
    ) -> "LingmaPool":
        """Materialize N PoolInstances.

        Single-instance (N=1) uses the legacy workDir and LINGMA_SOCKET_PORT so
        existing deployments keep their state after upgrade.

        N>1 derives per-instance workDirs under `<pool_root>/inst-<i>` and
        passes socket port 0 for each instance. `pool_root` is `<anchor>/pool`,
        where `<anchor>` is found by walking up from `base_work_dir` at most
        three levels, stopping early at a directory named `.lingma` or at the
        filesystem root.

        Accounts are assigned round-robin; with no accounts every instance
        gets an empty-credential `LingmaAccount`. An `instance_count` below 1
        is clamped to 1. A falsy `verify_timeout_sec` falls back to
        `auto_login_timeout` clamped into the range 30..180.
        """
        instance_count = max(1, instance_count)

        if accounts:
            resolved_accounts: list[LingmaAccount] = [
                accounts[i % len(accounts)] for i in range(instance_count)
            ]
        else:
            resolved_accounts = [
                LingmaAccount(username="", password="")
                for _ in range(instance_count)
            ]

        if instance_count > len(accounts) and accounts:
            logger.warning(
                "instance_count=%d exceeds unique accounts=%d; accounts will be reused",
                instance_count,
                len(accounts),
            )

        base_dir = Path(base_work_dir)
        # Put per-instance workDirs under `<...>/.lingma/pool/inst-<i>`.
        # Walk up past the vscode/sharedClientCache layout if present.
        pool_root = base_dir
        for _ in range(3):
            if pool_root.name == ".lingma":
                break
            if pool_root.parent == pool_root:
                # Reached the filesystem root; nothing further to climb.
                break
            pool_root = pool_root.parent
        pool_root = pool_root / "pool"

        effective_verify_timeout = verify_timeout_sec or max(
            30, min(180, auto_login_timeout)
        )

        instances: list[PoolInstance] = []
        for i, acc in enumerate(resolved_accounts):
            if instance_count == 1:
                work_dir = str(base_dir)
                socket_port = legacy_socket_port
                extra_info: list[Path] | None = None
            else:
                work_dir = str(pool_root / f"inst-{i}")
                socket_port = 0
                # In pool mode each instance reads only its own workDir .info to
                # avoid the shared ~/.lingma/.info race between instances.
                extra_info = []

            name = f"inst-{i}"
            client = LingmaGatewayClient(
                lingma_bin=lingma_bin,
                work_dir=work_dir,
                socket_port=socket_port,
                startup_timeout=startup_timeout,
                rpc_timeout=rpc_timeout,
                default_model=default_model,
                default_ask_mode=default_ask_mode,
                name=name,
                extra_info_paths=extra_info,
            )
            auto_login = AutoLoginManager(
                username=acc.username,
                password=acc.password,
                headless=auto_login_headless,
                timeout_sec=auto_login_timeout,
                max_retry=auto_login_max_retry,
                verify_logged_in=_make_verify_logged_in(client),
                verify_timeout_sec=effective_verify_timeout,
                debug_dir=f"/tmp/lingma-auto-login/{name}",
            )
            cfg = InstanceConfig(
                index=i,
                name=name,
                work_dir=work_dir,
                socket_port=socket_port,
                account=acc,
            )
            instances.append(PoolInstance(cfg, client, auto_login))

        return cls(instances)

    # -------------------------------------------------------------- lifecycle

    async def start(self) -> None:
        """Start all instances sequentially.

        Sequential startup avoids racing on the shared ~/.lingma/.info file
        (for pool-mode we skip it anyway, but Lingma may still write there
        internally) and keeps docker logs readable. Failures are non-fatal;
        per-instance reconnect loops will take over.

        Before spawning each Lingma process we optionally restore a
        pre-captured session bundle into the workDir, which lets us skip
        Playwright login entirely on a fresh volume.
        """
        for inst in self._instances:
            self._maybe_apply_session_bundle(inst)
            account = inst.cfg.account
            logger.info(
                "pool starting %s (workDir=%s port=%d account=%s bundle=%s logged_in=%s)",
                inst.name,
                inst.cfg.work_dir,
                inst.cfg.socket_port,
                account.username or "",
                bool(account.session_bundle_b64 or account.session_bundle_file),
                is_logged_in_workdir(inst.cfg.work_dir),
            )
            try:
                await inst.client.start()
            except Exception as exc:
                # Deliberately non-fatal: log and move on to the next instance.
                logger.warning("pool start %s failed: %s", inst.name, exc)

    @staticmethod
    def _maybe_apply_session_bundle(inst: "PoolInstance") -> None:
        """Restore an exported Lingma session into inst.work_dir, if needed.

        Skipped when:
        - the workDir already looks logged in (persistent volume case);
        - no bundle is configured.

        Decode/apply errors are logged and swallowed so that startup can fall
        back to auto-login.
        """
        if is_logged_in_workdir(inst.cfg.work_dir):
            return

        acc = inst.cfg.account
        b64 = resolve_bundle_b64(
            inline=acc.session_bundle_b64 or None,
            file_path=acc.session_bundle_file or None,
        )
        if not b64:
            return

        try:
            raw = decode_bundle(b64)
            restored = apply_bundle_to_workdir(inst.cfg.work_dir, raw)
        except Exception as exc:
            logger.warning(
                "pool %s: failed to apply session bundle, will fall back to auto-login: %s",
                inst.name,
                exc,
            )
            return

        logger.info(
            "pool %s: applied session bundle (%d files: %s)",
            inst.name,
            len(restored),
            ",".join(restored),
        )

    async def close(self) -> None:
        """Close every instance's client concurrently, best-effort.

        All close() calls are scheduled up front so they overlap; a failure of
        one instance is logged and never prevents the others from closing.
        """
        pending = [
            (inst, asyncio.create_task(inst.client.close()))
            for inst in self._instances
        ]
        for inst, task in pending:
            try:
                await task
            except Exception as exc:
                # Shutdown stays best-effort, but the failure is now visible.
                logger.warning("pool close %s failed: %s", inst.name, exc)

    # -------------------------------------------------------------- inspection

    @property
    def instances(self) -> list[PoolInstance]:
        """Return a shallow copy of the instance list."""
        return list(self._instances)

    def size(self) -> int:
        """Return the number of instances in the pool."""
        return len(self._instances)

    def stats(self) -> list[dict]:
        """Return one status dict per instance, in pool order."""
        return [
            {
                "index": inst.cfg.index,
                "name": inst.name,
                "state": inst.client.state,
                "last_error": inst.client.last_error,
                "in_flight": inst.in_flight,
                "work_dir": inst.cfg.work_dir,
                "socket_port": inst.cfg.socket_port,
                "username": inst.cfg.account.username,
                "auto_login": inst.auto_login.status(),
            }
            for inst in self._instances
        ]

    def prometheus_lines(self) -> list[str]:
        """Render per-instance gauges in the Prometheus text format.

        Each metric family is emitted as one contiguous group (its `# TYPE`
        line followed by all of its samples), as the exposition format
        requires; samples of different metrics are not interleaved.
        """
        in_flight_lines: list[str] = [
            "# TYPE gateway_pool_instance_in_flight gauge",
        ]
        ready_lines: list[str] = [
            "# TYPE gateway_pool_instance_ready gauge",
        ]
        for inst in self._instances:
            lbl = f'name="{inst.name}",idx="{inst.cfg.index}"'
            in_flight_lines.append(
                f"gateway_pool_instance_in_flight{{{lbl}}} {inst.in_flight}"
            )
            ready_lines.append(
                f"gateway_pool_instance_ready{{{lbl}}} {1 if inst.healthy else 0}"
            )
        return in_flight_lines + ready_lines

    # -------------------------------------------------------------- selection

    def pick(self, affinity_key: str | None = None) -> PoolInstance:
        """Pick an instance for a request.

        Preference order:
        1. Sticky affinity if `affinity_key` is provided and the bucket is healthy.
        2. Least-in-flight among healthy instances.
        3. Round-robin fallback when nothing is healthy (lazy-start will kick in).

        Raises:
            RuntimeError: if the pool holds no instances.
        """
        if not self._instances:
            raise RuntimeError("lingma pool is empty")

        if affinity_key:
            # The builtin hash() of a str is salted per interpreter process,
            # so it would map the same key to different buckets after a
            # restart or in another worker. CRC32 is stable everywhere.
            digest = zlib.crc32(affinity_key.encode("utf-8", "surrogatepass"))
            bucket = self._instances[digest % len(self._instances)]
            if bucket.healthy:
                return bucket

        healthy = [inst for inst in self._instances if inst.healthy]
        if healthy:
            # Ties on in_flight are broken by the lowest index.
            return min(healthy, key=lambda x: (x.in_flight, x.cfg.index))

        # Nothing healthy. Fall back to round-robin so every instance gets a
        # chance to reconnect via ensure_ready().
        idx = self._rr_counter % len(self._instances)
        self._rr_counter += 1
        return self._instances[idx]