feat: add Lingma OpenAI-compatible gateway service
This commit is contained in:
149
app/auto_login.py
Normal file
149
app/auto_login.py
Normal file
@@ -0,0 +1,149 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import time
|
||||
|
||||
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
|
||||
class AutoLoginManager:
|
||||
def __init__(
|
||||
self,
|
||||
username: str,
|
||||
password: str,
|
||||
headless: bool,
|
||||
timeout_sec: int,
|
||||
max_retry: int,
|
||||
):
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.headless = headless
|
||||
self.timeout_sec = timeout_sec
|
||||
self.max_retry = max_retry
|
||||
self._lock = asyncio.Lock()
|
||||
self._task: asyncio.Task | None = None
|
||||
self._state = "idle"
|
||||
self._last_error = ""
|
||||
self._last_started_at = 0.0
|
||||
self._last_finished_at = 0.0
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
return self._state
|
||||
|
||||
def status(self) -> dict:
|
||||
return {
|
||||
"state": self._state,
|
||||
"last_error": self._last_error,
|
||||
"last_started_at": self._last_started_at,
|
||||
"last_finished_at": self._last_finished_at,
|
||||
"running": self._task is not None and not self._task.done(),
|
||||
}
|
||||
|
||||
async def ensure_started(self, login_url: str):
|
||||
async with self._lock:
|
||||
if self._task and not self._task.done():
|
||||
return False
|
||||
self._task = asyncio.create_task(self._run(login_url))
|
||||
return True
|
||||
|
||||
async def wait_done(self, timeout: float):
|
||||
if not self._task:
|
||||
return
|
||||
await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
|
||||
|
||||
async def _run(self, login_url: str):
|
||||
self._state = "running"
|
||||
self._last_error = ""
|
||||
self._last_started_at = time.time()
|
||||
|
||||
try:
|
||||
for attempt in range(1, self.max_retry + 1):
|
||||
try:
|
||||
await self._auto_login_once(login_url)
|
||||
self._state = "success"
|
||||
self._last_finished_at = time.time()
|
||||
return
|
||||
except Exception as exc:
|
||||
self._last_error = f"attempt {attempt}: {exc}"
|
||||
if attempt >= self.max_retry:
|
||||
raise
|
||||
await asyncio.sleep(1.5)
|
||||
except Exception:
|
||||
self._state = "failed"
|
||||
self._last_finished_at = time.time()
|
||||
|
||||
async def _auto_login_once(self, login_url: str):
|
||||
if not self.username or not self.password:
|
||||
raise RuntimeError("LINGMA_USERNAME/LINGMA_PASSWORD not configured")
|
||||
|
||||
deadline = time.time() + self.timeout_sec
|
||||
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=self.headless)
|
||||
context = await browser.new_context(ignore_https_errors=True)
|
||||
page = await context.new_page()
|
||||
try:
|
||||
await page.goto(login_url, wait_until="domcontentloaded", timeout=30000)
|
||||
|
||||
# Try common login selectors.
|
||||
await self._fill_if_visible(page, [
|
||||
'input[type="email"]',
|
||||
'input[name="loginId"]',
|
||||
'input[name="username"]',
|
||||
'input[name="account"]',
|
||||
'input[placeholder*="账号"]',
|
||||
'input[placeholder*="邮箱"]',
|
||||
], self.username)
|
||||
|
||||
await self._fill_if_visible(page, [
|
||||
'input[type="password"]',
|
||||
'input[name="password"]',
|
||||
'input[placeholder*="密码"]',
|
||||
], self.password)
|
||||
|
||||
await self._click_if_visible(page, [
|
||||
'button:has-text("登录")',
|
||||
'button:has-text("登 录")',
|
||||
'button:has-text("Login")',
|
||||
'button[type="submit"]',
|
||||
])
|
||||
|
||||
# Wait for redirect / callback activity.
|
||||
while time.time() < deadline:
|
||||
url = page.url
|
||||
if "lingma" in url and ("callback" in url or "tokenString=" in url):
|
||||
break
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
except PlaywrightTimeoutError as exc:
|
||||
raise RuntimeError(f"playwright timeout: {exc}") from exc
|
||||
finally:
|
||||
with contextlib.suppress(Exception):
|
||||
await context.close()
|
||||
with contextlib.suppress(Exception):
|
||||
await browser.close()
|
||||
|
||||
async def _fill_if_visible(self, page, selectors: list[str], value: str):
|
||||
for sel in selectors:
|
||||
locator = page.locator(sel).first
|
||||
try:
|
||||
if await locator.is_visible(timeout=1500):
|
||||
await locator.fill(value)
|
||||
return True
|
||||
except Exception:
|
||||
continue
|
||||
return False
|
||||
|
||||
async def _click_if_visible(self, page, selectors: list[str]):
|
||||
for sel in selectors:
|
||||
locator = page.locator(sel).first
|
||||
try:
|
||||
if await locator.is_visible(timeout=1500):
|
||||
await locator.click()
|
||||
return True
|
||||
except Exception:
|
||||
continue
|
||||
return False
|
||||
Reference in New Issue
Block a user