"""
Lingma session bundle: pack/unpack the minimal set of cache files that
represent a logged-in state, so it can be injected as an env var to skip
Playwright auto-login entirely.

Lingma stores its auth state in `<workDir>/cache/`. Experimentation shows the
following files are necessary and sufficient to restore a session on a fresh
workDir:

    cache/id           -> stable client UUID
    cache/user         -> encrypted user/token blob
    cache/quota        -> quota metadata referenced at startup
    cache/config.json  -> endpoint / env config (mostly non-sensitive)

Volatile artifacts (db/, logs/, tmp/, index/, diagnosis.bin, .lock, .info)
are *not* included: they are rebuilt by Lingma on first run.
"""

from __future__ import annotations

import base64
import io
import os
import tarfile
from pathlib import Path

from .logging_config import get_logger

logger = get_logger("lingma_gateway.bundle")

# Whitelist of archive member names, relative to the work directory. Both the
# packer and the extractor use this exact tuple.
BUNDLE_FILES: tuple[str, ...] = (
    "cache/id",
    "cache/user",
    "cache/quota",
    "cache/config.json",
)

# Hard safety cap so a malformed bundle (or a /tmp full of junk) can't blow up
# memory or disk. 4 MiB is ~1000x the real payload. Applied to the compressed
# bundle as a whole and to every individual file inside it.
MAX_BUNDLE_BYTES = 4 * 1024 * 1024


def _mode_for(rel: str) -> int:
    """Return the permission bits for a bundle member: 0600 for `user`, else 0644."""
    return 0o600 if rel.endswith("/user") else 0o644


def is_logged_in_workdir(work_dir: str | os.PathLike) -> bool:
    """Heuristic: a non-empty `cache/user` means someone logged in here.

    Returns False (never raises) when the file is missing, empty, or cannot
    be stat'ed.
    """
    p = Path(work_dir) / "cache" / "user"
    try:
        return p.is_file() and p.stat().st_size > 0
    except OSError:
        return False


def pack_workdir(work_dir: str | os.PathLike) -> bytes:
    """Create a tar.gz of the session-relevant subset of `work_dir/cache`.

    Missing files are silently skipped (e.g. `quota` isn't always present on
    fresh logins), but `cache/user` MUST exist or we raise -- exporting an
    empty bundle would just corrupt the remote side.

    Returns:
        The gzip-compressed tar archive as bytes.

    Raises:
        RuntimeError: if `cache/user` is missing/empty, if any single file
            exceeds MAX_BUNDLE_BYTES, or if the compressed bundle does.
        OSError: if a whitelisted file exists but cannot be read.
    """
    base = Path(work_dir)
    if not is_logged_in_workdir(base):
        raise RuntimeError(
            f"workDir {base} has no login state (cache/user missing or empty); "
            "cannot export a session bundle"
        )

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tf:
        for rel in BUNDLE_FILES:
            src = base / rel
            if not src.is_file():
                continue
            # Read the content ourselves instead of `tf.add()`: that would
            # store a symlinked cache file as a symlink member (which the
            # extractor rejects) and embed the local uid/gid/user name.
            # Reading one byte past the cap bounds memory use.
            with src.open("rb") as fh:
                payload = fh.read(MAX_BUNDLE_BYTES + 1)
                mtime = int(os.fstat(fh.fileno()).st_mtime)
            if len(payload) > MAX_BUNDLE_BYTES:
                raise RuntimeError(
                    f"session bundle too large: {rel} exceeds "
                    f"{MAX_BUNDLE_BYTES} bytes"
                )
            info = tarfile.TarInfo(name=rel)
            info.size = len(payload)
            info.mode = _mode_for(rel)
            info.mtime = mtime
            tf.addfile(info, io.BytesIO(payload))

    data = buf.getvalue()
    if len(data) > MAX_BUNDLE_BYTES:
        raise RuntimeError(
            f"session bundle too large: {len(data)} bytes (limit {MAX_BUNDLE_BYTES})"
        )
    return data


def encode_bundle(raw: bytes) -> str:
    """Return `raw` as a single-line standard base64 ASCII string."""
    return base64.b64encode(raw).decode("ascii")


def decode_bundle(b64: str) -> bytes:
    """Decode a base64 bundle string back into raw tar.gz bytes.

    All whitespace is removed before strict validation, so line-wrapped
    base64 is accepted as well as the single-line form.

    Raises:
        ValueError: if the input is empty, is not valid base64, or decodes to
            more than MAX_BUNDLE_BYTES.
    """
    compact = "".join((b64 or "").split())
    if not compact:
        raise ValueError("empty bundle")
    try:
        raw = base64.b64decode(compact, validate=True)
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise ValueError(f"invalid base64: {exc}") from exc
    if len(raw) > MAX_BUNDLE_BYTES:
        raise ValueError(f"bundle too large: {len(raw)} bytes")
    return raw


def _is_safe_member(member: tarfile.TarInfo) -> bool:
    """Reject anything that isn't one of our whitelisted relative files.

    Guards against path traversal (CVE-2007-4559 class), symlink tricks and
    decompression bombs.
    """
    name = member.name
    if name not in BUNDLE_FILES:
        return False
    # `isfile()` is true only for regular-file entries, so directories,
    # symlinks, hard links and device nodes are all rejected here.
    if not member.isfile():
        return False
    # Explicit absolute-path / traversal check. The whitelist already implies
    # it; kept as defence in depth in case BUNDLE_FILES is ever edited.
    if name.startswith("/") or ".." in Path(name).parts:
        return False
    # The compressed bundle is capped, but a tiny archive can still declare a
    # huge member; bound the declared size before it is read into memory.
    return 0 <= member.size <= MAX_BUNDLE_BYTES


def _write_file(dest: Path, data: bytes, mode: int) -> None:
    """Write `data` to `dest`, applying `mode` before any content lands on disk."""
    # O_BINARY only exists on Windows, where it disables newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    # `mode` here only affects newly created files; the chmod below covers a
    # pre-existing file with looser permissions.
    fd = os.open(dest, flags, mode)
    with os.fdopen(fd, "wb") as fh:
        try:
            os.chmod(dest, mode)
        except OSError as exc:
            # Best effort: still restore the file, but leave a trace.
            logger.warning("bundle: cannot set mode %o on %s: %s", mode, dest, exc)
        fh.write(data)


def apply_bundle_to_workdir(work_dir: str | os.PathLike, raw: bytes) -> list[str]:
    """Extract the bundle into `work_dir`, creating parents as needed.

    Members that fail `_is_safe_member` are skipped with a warning. Files are
    written with mode 0600 for the sensitive `user` file and 0644 otherwise;
    mode bits stored in the archive are ignored.

    Returns:
        The list of files actually restored, each name once, in archive order.

    Raises:
        tarfile.TarError / OSError: if `raw` is not a readable tar.gz archive
            or a destination file cannot be written.
    """
    base = Path(work_dir)
    base.mkdir(parents=True, exist_ok=True)

    restored: list[str] = []
    with tarfile.open(fileobj=io.BytesIO(raw), mode="r:gz") as tf:
        for member in tf.getmembers():
            if not _is_safe_member(member):
                logger.warning("bundle: skipping unsafe member %r", member.name)
                continue
            src = tf.extractfile(member)
            if src is None:
                continue
            with src:
                data = src.read()
            dest = base / member.name
            dest.parent.mkdir(parents=True, exist_ok=True)
            _write_file(dest, data, _mode_for(member.name))
            # A repeated member overwrites the earlier copy on disk; report
            # the file once.
            if member.name not in restored:
                restored.append(member.name)
    return restored


def resolve_bundle_b64(
    *,
    inline: str | None,
    file_path: str | None,
) -> str | None:
    """Pick a bundle from either an inline base64 string or a file path.

    Inline wins if both are set. Returns None if neither is configured, or if
    the file is missing, unreadable, not ASCII, or blank. Never logs the raw
    material.
    """
    if inline and inline.strip():
        return inline.strip()
    if file_path and file_path.strip():
        path = Path(file_path.strip()).expanduser()
        if not path.is_file():
            logger.warning("bundle: file %s not found, ignoring", path)
            return None
        try:
            text = path.read_text(encoding="ascii", errors="strict").strip()
        except (OSError, ValueError) as exc:  # I/O failure or non-ASCII content
            logger.warning("bundle: cannot read %s: %s", path, exc)
            return None
        return text or None
    return None