Adds a lightweight way to pre-seed a Lingma workDir with an existing logged-in session:
- New module session_bundle.py packs/unpacks only the four cache files that make up a Lingma login (id, user, quota, config.json). Everything else (db, logs, index, diagnosis) stays local so bundles stay tiny and never leak session-specific artefacts.
- Safety: path-traversal/symlink members are rejected; size is capped; refuses to export from a workDir that isn't actually logged in; sensitive cache/user is chmod'd 0600 on restore.
- LingmaAccount gains optional session_bundle_b64 / session_bundle_file; the LINGMA_SESSION_BUNDLE[_FILE] env vars provide the singleton fallback. Credentials become optional when a bundle is supplied.
- LingmaPool.start() restores the bundle into each instance workDir only if it isn't already logged in, so persistent volumes aren't clobbered and a corrupt bundle falls back to Playwright gracefully.
- POST /internal/session/export returns the bundle as base64; ?instance= selects a specific pool instance. Requires an authed, already-logged-in instance to prevent exporting empties.
- README + .env.example document the end-to-end flow.
Made-with: Cursor
176 lines
5.6 KiB
Python
176 lines
5.6 KiB
Python
"""
Lingma session bundle: pack/unpack the minimal set of cache files that
represent a logged-in state, so it can be injected as an env var to skip
Playwright auto-login entirely.

Lingma stores its auth state in `<workDir>/cache/`. Experimentation shows the
following files are necessary and sufficient to restore a session on a fresh
workDir:

    cache/id          -> stable client UUID
    cache/user        -> encrypted user/token blob
    cache/quota       -> quota metadata referenced at startup
    cache/config.json -> endpoint / env config (mostly non-sensitive)

Volatile artifacts (db/, logs/, tmp/, index/, diagnosis.bin, .lock, .info)
are *not* included: they are rebuilt by Lingma on first run.
"""

# NOTE: the docstring must be the first statement of the module to become
# `__doc__`; a future import is allowed to follow it.
from __future__ import annotations
|
|
|
|
import base64
|
|
import io
|
|
import os
|
|
import tarfile
|
|
from pathlib import Path
|
|
|
|
from .logging_config import get_logger
|
|
|
|
|
|
# Module-level logger obtained from the project's `get_logger` helper; every
# warning emitted by this module goes out under the "lingma_gateway.bundle" name.
logger = get_logger("lingma_gateway.bundle")
|
|
|
|
|
|
# Whitelist of workDir-relative paths that make up a session bundle. Both the
# packing and the restoring side only ever touch these names.
BUNDLE_FILES: tuple[str, ...] = tuple(
    f"cache/{leaf}" for leaf in ("id", "user", "quota", "config.json")
)

# Hard upper bound (4 MiB) on the size of a bundle, so that a malformed bundle
# (or a /tmp full of junk) cannot exhaust memory or disk. This is roughly
# 1000x the size of a real payload.
MAX_BUNDLE_BYTES = 4 << 20
|
|
|
|
|
|
def is_logged_in_workdir(work_dir: str | os.PathLike) -> bool:
    """Tell whether `work_dir` looks like it holds a logged-in session.

    The check is a heuristic: `<work_dir>/cache/user` must be a regular file
    with at least one byte in it. Any `OSError` raised while probing the
    file system is treated as "not logged in".
    """
    marker = Path(work_dir).joinpath("cache", "user")
    try:
        if not marker.is_file():
            return False
        return marker.stat().st_size > 0
    except OSError:
        return False
|
|
|
|
|
|
def pack_workdir(work_dir: str | os.PathLike) -> bytes:
    """Create a tar.gz of the session-relevant subset of `work_dir/cache`.

    Only the paths listed in `BUNDLE_FILES` are considered. Missing files are
    silently skipped (e.g. `quota` isn't always present on fresh logins), but
    `cache/user` MUST exist and be non-empty or we raise -- exporting an
    empty bundle would just corrupt the remote side.

    Symlinked cache files are archived by *content* (as regular files): the
    restore side rejects link members, so storing the link itself would
    produce a bundle that silently loses that file. Owner ids/names are
    scrubbed from the tar headers so the bundle does not carry the local
    account name around.

    Returns:
        The gzip-compressed tar archive as bytes.

    Raises:
        RuntimeError: if `work_dir` holds no login state, or if the resulting
            archive is larger than `MAX_BUNDLE_BYTES`.
    """
    base = Path(work_dir)
    user_file = base / "cache" / "user"
    if not user_file.is_file() or user_file.stat().st_size == 0:
        raise RuntimeError(
            f"workDir {base} has no login state (cache/user missing or empty); "
            "cannot export a session bundle"
        )

    def _scrub_owner(info: tarfile.TarInfo) -> tarfile.TarInfo:
        # Restore never uses ownership, so there is no reason to ship it.
        info.uid = info.gid = 0
        info.uname = info.gname = ""
        return info

    buf = io.BytesIO()
    # dereference=True matches `Path.is_file()`, which also follows symlinks:
    # whatever passes the check below is stored as a plain regular file.
    with tarfile.open(fileobj=buf, mode="w:gz", dereference=True) as tf:
        for rel in BUNDLE_FILES:
            src = base / rel
            if not src.is_file():
                continue
            tf.add(str(src), arcname=rel, recursive=False, filter=_scrub_owner)

    data = buf.getvalue()
    if len(data) > MAX_BUNDLE_BYTES:
        raise RuntimeError(
            f"session bundle too large: {len(data)} bytes (limit {MAX_BUNDLE_BYTES})"
        )
    return data
|
|
|
|
|
|
def encode_bundle(raw: bytes) -> str:
    """Return `raw` encoded as a standard base64 ASCII string."""
    encoded = base64.b64encode(raw)
    return str(encoded, "ascii")
|
|
|
|
|
|
def decode_bundle(b64: str) -> bytes:
    """Decode a base64 session bundle back into raw tar.gz bytes.

    All whitespace is removed before decoding -- not just leading/trailing --
    so line-wrapped base64 (for example the default output of the `base64`
    command-line tool) is accepted. Everything else is decoded strictly:
    any character outside the base64 alphabet is an error.

    Args:
        b64: base64 text; `None` or blank input is treated as empty.

    Returns:
        The decoded bytes.

    Raises:
        ValueError: if the input is empty, is not valid base64, or decodes to
            more than `MAX_BUNDLE_BYTES` bytes.
    """
    compact = "".join((b64 or "").split())
    if not compact:
        raise ValueError("empty bundle")
    try:
        raw = base64.b64decode(compact, validate=True)
    except ValueError as exc:
        # binascii.Error (bad alphabet / padding) is a ValueError subclass, and
        # non-ASCII text raises a plain ValueError, so this covers both.
        raise ValueError(f"invalid base64: {exc}") from exc
    if len(raw) > MAX_BUNDLE_BYTES:
        raise ValueError(f"bundle too large: {len(raw)} bytes")
    return raw
|
|
|
|
|
|
def _is_safe_member(member: tarfile.TarInfo) -> bool:
    """Accept a tar member only if it is one of our whitelisted regular files.

    Protects against path traversal (CVE-2007-4559 class) and symlink tricks:
    the member name must be listed verbatim in `BUNDLE_FILES`, and the member
    must be a regular file -- directories, symbolic links, hard links and any
    other special entry are refused.
    """
    name = member.name
    if name not in BUNDLE_FILES:
        return False

    is_link_or_dir = member.isdir() or member.issym() or member.islnk()
    if is_link_or_dir or not member.isfile():
        return False

    # Explicit absolute-path / traversal check. The whitelist already rules
    # these out, but spell it out rather than rely on that.
    escapes = name.startswith("/") or ".." in Path(name).parts
    return not escapes
|
|
|
|
|
|
def apply_bundle_to_workdir(work_dir: str | os.PathLike, raw: bytes) -> list[str]:
    """Extract the bundle into `<work_dir>/`, creating parents as needed.

    Members that fail `_is_safe_member`, or whose declared size exceeds
    `MAX_BUNDLE_BYTES`, are skipped with a warning rather than extracted. The
    size check matters because `raw` is compressed: a small bundle could
    otherwise expand into an arbitrarily large file in memory and on disk.

    Files are created with their final permissions (0600 for the sensitive
    `user` file, 0644 otherwise) instead of being written first and
    restricted afterwards, so the token blob is never briefly readable by
    other users. Modes stored in the archive are ignored.

    Args:
        work_dir: target workDir; created if it does not exist.
        raw: gzip-compressed tar archive bytes.

    Returns:
        The list of member names actually restored.

    Raises:
        tarfile.TarError: if `raw` is not a readable tar.gz archive.
        OSError: if a directory or file cannot be created or written.
    """
    base = Path(work_dir)
    base.mkdir(parents=True, exist_ok=True)
    restored: list[str] = []
    # O_BINARY only exists on Windows, where it prevents newline translation.
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)

    with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tf:
        for member in tf.getmembers():
            if not _is_safe_member(member):
                logger.warning("bundle: skipping unsafe member %r", member.name)
                continue
            if member.size > MAX_BUNDLE_BYTES:
                logger.warning(
                    "bundle: skipping oversized member %r (%d bytes)",
                    member.name,
                    member.size,
                )
                continue
            dest = base / member.name
            dest.parent.mkdir(parents=True, exist_ok=True)
            src = tf.extractfile(member)
            if src is None:
                continue
            data = src.read()

            mode = 0o600 if member.name.endswith("/user") else 0o644
            fd = os.open(dest, open_flags, mode)
            with os.fdopen(fd, "wb") as out:
                out.write(data)
            # The mode passed to os.open() only applies to newly created
            # files (and is filtered by the umask), so set it explicitly too.
            # Best effort: some file systems do not support chmod.
            try:
                os.chmod(dest, mode)
            except OSError as exc:
                logger.warning("bundle: could not chmod %s: %s", dest, exc)
            restored.append(member.name)

    return restored
|
|
|
|
|
|
def resolve_bundle_b64(
    *,
    inline: str | None,
    file_path: str | None,
) -> str | None:
    """Select the base64 bundle text from an inline value or from a file.

    A non-blank `inline` value takes precedence over `file_path`. When only
    `file_path` is given, the file (with `~` expanded) is read as strict
    ASCII and stripped. A missing or unreadable file is logged as a warning
    and treated as "no bundle". The bundle content itself is never logged.

    Returns:
        The stripped base64 text, or None when nothing usable is configured.
    """
    inline_value = inline.strip() if inline else ""
    if inline_value:
        return inline_value

    location = file_path.strip() if file_path else ""
    if not location:
        return None

    bundle_path = Path(location).expanduser()
    if not bundle_path.is_file():
        logger.warning("bundle: file %s not found, ignoring", bundle_path)
        return None

    try:
        contents = bundle_path.read_text(encoding="ascii", errors="strict").strip()
    except Exception as exc:
        logger.warning("bundle: cannot read %s: %s", bundle_path, exc)
        return None

    return contents if contents else None
|