from __future__ import annotations import asyncio import contextlib import os import time from pathlib import Path from typing import Awaitable, Callable from playwright.async_api import TimeoutError as PlaywrightTimeoutError from playwright.async_api import async_playwright class AutoLoginManager: def __init__( self, username: str, password: str, headless: bool, timeout_sec: int, max_retry: int, verify_logged_in: Callable[[], Awaitable[bool]] | None = None, verify_timeout_sec: int = 60, debug_dir: str = "/tmp/lingma-auto-login", ): self.username = username self.password = password self.headless = headless self.timeout_sec = timeout_sec self.max_retry = max_retry self.verify_logged_in = verify_logged_in self.verify_timeout_sec = verify_timeout_sec self.debug_dir = debug_dir self._lock = asyncio.Lock() self._task: asyncio.Task | None = None self._state = "idle" self._last_error = "" self._last_started_at = 0.0 self._last_finished_at = 0.0 @property def state(self) -> str: return self._state def status(self) -> dict: return { "state": self._state, "last_error": self._last_error, "last_started_at": self._last_started_at, "last_finished_at": self._last_finished_at, "running": self._task is not None and not self._task.done(), } async def ensure_started(self, login_url: str): async with self._lock: if self._task and not self._task.done(): return False self._task = asyncio.create_task(self._run(login_url)) return True async def wait_done(self, timeout: float): if not self._task: return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout) async def _run(self, login_url: str): self._state = "running" self._last_error = "" self._last_started_at = time.time() try: for attempt in range(1, self.max_retry + 1): try: await self._auto_login_once(login_url) await self._wait_logged_in_after_browser() self._state = "success" self._last_finished_at = time.time() return except Exception as exc: self._last_error = f"attempt {attempt}: {exc}" if attempt >= self.max_retry: raise await asyncio.sleep(1.5) except Exception: self._state = "failed" self._last_finished_at = time.time() async def _auto_login_once(self, login_url: str): if not self.username or not self.password: raise RuntimeError("LINGMA_USERNAME/LINGMA_PASSWORD not configured") deadline = time.time() + self.timeout_sec async with async_playwright() as p: browser = await p.chromium.launch(headless=self.headless) context = await browser.new_context(ignore_https_errors=True) page = await context.new_page() try: await page.goto(login_url, wait_until="domcontentloaded", timeout=30000) # Try common login selectors. user_filled = await self._fill_if_visible(page, [ 'input[type="email"]', 'input[name="loginId"]', 'input[name="username"]', 'input[name="account"]', 'input[placeholder*="账号"]', 'input[placeholder*="邮箱"]', ], self.username) pwd_filled = await self._fill_if_visible(page, [ 'input[type="password"]', 'input[name="password"]', 'input[placeholder*="密码"]', ], self.password) clicked = await self._click_if_visible(page, [ 'button:has-text("登录")', 'button:has-text("登 录")', 'button:has-text("Login")', 'button[type="submit"]', ]) if not (user_filled and pwd_filled and clicked): debug_file = await self._dump_debug_page(page) raise RuntimeError( "login form selectors not matched " f"(user={user_filled}, password={pwd_filled}, click={clicked}), " f"debug={debug_file}" ) # Wait and validate the core login API response. login_resp = None loop = asyncio.get_running_loop() login_resp_future: asyncio.Future = loop.create_future() def on_response(resp): try: if "/users/ajax/login" in resp.url and resp.request.method == "POST": if not login_resp_future.done(): login_resp_future.set_result(resp) except Exception: pass page.on("response", on_response) try: login_resp = await asyncio.wait_for(login_resp_future, timeout=20) except asyncio.TimeoutError: login_resp = None finally: with contextlib.suppress(Exception): page.remove_listener("response", on_response) if login_resp is None: debug_file = await self._dump_debug_page(page) raise RuntimeError(f"login API not captured, debug={debug_file}") try: payload = await login_resp.json() except Exception: payload = None api_ok = bool( login_resp.status == 200 and isinstance(payload, dict) and payload.get("need_login") is False ) if not api_ok: debug_file = await self._dump_debug_page(page) raise RuntimeError( "login API did not return success " f"(status={login_resp.status}, payload={payload}), debug={debug_file}" ) # Wait for redirect / callback activity. while time.time() < deadline: url = page.url if "lingma" in url and ("callback" in url or "tokenString=" in url): break await asyncio.sleep(1.0) except PlaywrightTimeoutError as exc: raise RuntimeError(f"playwright timeout: {exc}") from exc finally: with contextlib.suppress(Exception): await context.close() with contextlib.suppress(Exception): await browser.close() async def _fill_if_visible(self, page, selectors: list[str], value: str): for sel in selectors: locator = page.locator(sel).first try: if await locator.is_visible(timeout=1500): await locator.fill(value) return True except Exception: continue return False async def _click_if_visible(self, page, selectors: list[str]): for sel in selectors: locator = page.locator(sel).first try: if await locator.is_visible(timeout=1500): await locator.click() return True except Exception: continue return False async def _wait_logged_in_after_browser(self): if self.verify_logged_in is None: return start = time.time() while time.time() - start < self.verify_timeout_sec: try: ok = await self.verify_logged_in() if ok: return except Exception: pass await asyncio.sleep(2.0) raise RuntimeError("auth status still not logged in after browser flow") async def _dump_debug_page(self, page) -> str: Path(self.debug_dir).mkdir(parents=True, exist_ok=True) ts = int(time.time()) png = Path(self.debug_dir) / f"login-{ts}.png" html = Path(self.debug_dir) / f"login-{ts}.html" with contextlib.suppress(Exception): await page.screenshot(path=str(png), full_page=True) with contextlib.suppress(Exception): content = await page.content() html.write_text(content, encoding="utf-8") return str(png if png.exists() else html)