242 lines
9.0 KiB
Python
242 lines
9.0 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Awaitable, Callable
|
|
|
|
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
|
|
from playwright.async_api import async_playwright
|
|
|
|
|
|
class AutoLoginManager:
|
|
def __init__(
|
|
self,
|
|
username: str,
|
|
password: str,
|
|
headless: bool,
|
|
timeout_sec: int,
|
|
max_retry: int,
|
|
verify_logged_in: Callable[[], Awaitable[bool]] | None = None,
|
|
verify_timeout_sec: int = 60,
|
|
debug_dir: str = "/tmp/lingma-auto-login",
|
|
):
|
|
self.username = username
|
|
self.password = password
|
|
self.headless = headless
|
|
self.timeout_sec = timeout_sec
|
|
self.max_retry = max_retry
|
|
self.verify_logged_in = verify_logged_in
|
|
self.verify_timeout_sec = verify_timeout_sec
|
|
self.debug_dir = debug_dir
|
|
self._lock = asyncio.Lock()
|
|
self._task: asyncio.Task | None = None
|
|
self._state = "idle"
|
|
self._last_error = ""
|
|
self._last_started_at = 0.0
|
|
self._last_finished_at = 0.0
|
|
|
|
@property
|
|
def state(self) -> str:
|
|
return self._state
|
|
|
|
def status(self) -> dict:
|
|
return {
|
|
"state": self._state,
|
|
"last_error": self._last_error,
|
|
"last_started_at": self._last_started_at,
|
|
"last_finished_at": self._last_finished_at,
|
|
"running": self._task is not None and not self._task.done(),
|
|
}
|
|
|
|
async def ensure_started(self, login_url: str):
|
|
async with self._lock:
|
|
if self._task and not self._task.done():
|
|
return False
|
|
self._task = asyncio.create_task(self._run(login_url))
|
|
return True
|
|
|
|
async def wait_done(self, timeout: float):
|
|
if not self._task:
|
|
return
|
|
await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
|
|
|
|
async def _run(self, login_url: str):
|
|
self._state = "running"
|
|
self._last_error = ""
|
|
self._last_started_at = time.time()
|
|
|
|
try:
|
|
for attempt in range(1, self.max_retry + 1):
|
|
try:
|
|
await self._auto_login_once(login_url)
|
|
await self._wait_logged_in_after_browser()
|
|
self._state = "success"
|
|
self._last_finished_at = time.time()
|
|
return
|
|
except Exception as exc:
|
|
self._last_error = f"attempt {attempt}: {exc}"
|
|
if attempt >= self.max_retry:
|
|
raise
|
|
await asyncio.sleep(1.5)
|
|
except Exception:
|
|
self._state = "failed"
|
|
self._last_finished_at = time.time()
|
|
|
|
async def _auto_login_once(self, login_url: str):
|
|
if not self.username or not self.password:
|
|
raise RuntimeError("LINGMA_USERNAME/LINGMA_PASSWORD not configured")
|
|
|
|
deadline = time.time() + self.timeout_sec
|
|
|
|
async with async_playwright() as p:
|
|
browser = await p.chromium.launch(headless=self.headless)
|
|
context = await browser.new_context(ignore_https_errors=True)
|
|
page = await context.new_page()
|
|
try:
|
|
await page.goto(login_url, wait_until="domcontentloaded", timeout=30000)
|
|
|
|
# Try common login selectors.
|
|
user_filled = await self._fill_if_visible(page, [
|
|
'input[type="email"]',
|
|
'input[name="loginId"]',
|
|
'input[name="username"]',
|
|
'input[name="account"]',
|
|
'input[placeholder*="账号"]',
|
|
'input[placeholder*="邮箱"]',
|
|
], self.username)
|
|
|
|
pwd_filled = await self._fill_if_visible(page, [
|
|
'input[type="password"]',
|
|
'input[name="password"]',
|
|
'input[placeholder*="密码"]',
|
|
], self.password)
|
|
|
|
clicked = await self._click_if_visible(page, [
|
|
'button:has-text("登录")',
|
|
'button:has-text("登 录")',
|
|
'button:has-text("Login")',
|
|
'button[type="submit"]',
|
|
])
|
|
|
|
if not (user_filled and pwd_filled and clicked):
|
|
debug_file = await self._dump_debug_page(page)
|
|
raise RuntimeError(
|
|
"login form selectors not matched "
|
|
f"(user={user_filled}, password={pwd_filled}, click={clicked}), "
|
|
f"debug={debug_file}"
|
|
)
|
|
|
|
# Wait and validate the core login API response.
|
|
login_resp = None
|
|
loop = asyncio.get_running_loop()
|
|
login_resp_future: asyncio.Future = loop.create_future()
|
|
|
|
def on_response(resp):
|
|
try:
|
|
if "/users/ajax/login" in resp.url and resp.request.method == "POST":
|
|
if not login_resp_future.done():
|
|
login_resp_future.set_result(resp)
|
|
except Exception:
|
|
pass
|
|
|
|
page.on("response", on_response)
|
|
try:
|
|
login_resp = await asyncio.wait_for(login_resp_future, timeout=20)
|
|
except asyncio.TimeoutError:
|
|
login_resp = None
|
|
finally:
|
|
with contextlib.suppress(Exception):
|
|
page.remove_listener("response", on_response)
|
|
|
|
if login_resp is None:
|
|
debug_file = await self._dump_debug_page(page)
|
|
raise RuntimeError(f"login API not captured, debug={debug_file}")
|
|
|
|
try:
|
|
payload = await login_resp.json()
|
|
except Exception:
|
|
payload = None
|
|
|
|
api_ok = False
|
|
if login_resp.status == 200:
|
|
# Some environments return non-JSON/empty body for ajax login;
|
|
# if status is 200, let auth-status polling decide final success.
|
|
if payload is None:
|
|
api_ok = True
|
|
elif isinstance(payload, dict):
|
|
api_ok = payload.get("need_login") is False
|
|
if not api_ok:
|
|
debug_file = await self._dump_debug_page(page)
|
|
raise RuntimeError(
|
|
"login API did not return success "
|
|
f"(status={login_resp.status}, payload={payload}), debug={debug_file}"
|
|
)
|
|
|
|
# Wait for redirect / callback activity.
|
|
while time.time() < deadline:
|
|
url = page.url
|
|
if "lingma" in url and ("callback" in url or "tokenString=" in url):
|
|
break
|
|
await asyncio.sleep(1.0)
|
|
|
|
except PlaywrightTimeoutError as exc:
|
|
raise RuntimeError(f"playwright timeout: {exc}") from exc
|
|
finally:
|
|
with contextlib.suppress(Exception):
|
|
await context.close()
|
|
with contextlib.suppress(Exception):
|
|
await browser.close()
|
|
|
|
async def _fill_if_visible(self, page, selectors: list[str], value: str):
|
|
for sel in selectors:
|
|
locator = page.locator(sel).first
|
|
try:
|
|
if await locator.is_visible(timeout=1500):
|
|
await locator.fill(value)
|
|
return True
|
|
except Exception:
|
|
continue
|
|
return False
|
|
|
|
async def _click_if_visible(self, page, selectors: list[str]):
|
|
for sel in selectors:
|
|
locator = page.locator(sel).first
|
|
try:
|
|
if await locator.is_visible(timeout=1500):
|
|
await locator.click()
|
|
return True
|
|
except Exception:
|
|
continue
|
|
return False
|
|
|
|
async def _wait_logged_in_after_browser(self):
|
|
if self.verify_logged_in is None:
|
|
return
|
|
|
|
start = time.time()
|
|
while time.time() - start < self.verify_timeout_sec:
|
|
try:
|
|
ok = await self.verify_logged_in()
|
|
if ok:
|
|
return
|
|
except Exception:
|
|
pass
|
|
await asyncio.sleep(2.0)
|
|
|
|
raise RuntimeError("auth status still not logged in after browser flow")
|
|
|
|
async def _dump_debug_page(self, page) -> str:
|
|
Path(self.debug_dir).mkdir(parents=True, exist_ok=True)
|
|
ts = int(time.time())
|
|
png = Path(self.debug_dir) / f"login-{ts}.png"
|
|
html = Path(self.debug_dir) / f"login-{ts}.html"
|
|
with contextlib.suppress(Exception):
|
|
await page.screenshot(path=str(png), full_page=True)
|
|
with contextlib.suppress(Exception):
|
|
content = await page.content()
|
|
html.write_text(content, encoding="utf-8")
|
|
return str(png if png.exists() else html)
|