"""OpenAI-compatible HTTP gateway in front of a Lingma client.

Exposes ``/v1/models`` and ``/v1/chat/completions`` plus a few internal
endpoints for auto-login control, statistics and Prometheus metrics.
"""

from __future__ import annotations

import json
import logging
import time
import uuid

from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse

from .auto_login import AutoLoginManager
from .auth import require_bearer
from .config import Settings, load_settings
from .lingma_client import LingmaGatewayClient
from .model_map import build_model_name_map, flatten_model_keys, resolve_model
from .openai_schema import (
    ChatCompletionChoice,
    ChatCompletionResponse,
    ChatCompletionsRequest,
    ModelData,
    ModelsResponse,
)
from .stats import StatsCollector, estimate_tokens

logger = logging.getLogger(__name__)

app = FastAPI(title="Lingma OpenAI Gateway", version="0.1.0")
settings: Settings = load_settings()
# Both are created in ``on_startup``; they stay ``None`` until then.
lingma: LingmaGatewayClient | None = None
auto_login: AutoLoginManager | None = None
stats_collector = StatsCollector()


def auth_guard(request: Request):
    """FastAPI dependency: validate the request's bearer token against ``settings.api_keys``."""
    require_bearer(request, settings.api_keys)


@app.on_event("startup")
async def on_startup():
    """Create and start the Lingma client, then build the auto-login manager."""
    global lingma, auto_login
    lingma = LingmaGatewayClient(
        lingma_bin=settings.lingma_bin,
        work_dir=settings.lingma_work_dir,
        socket_port=settings.lingma_socket_port,
        startup_timeout=settings.lingma_startup_timeout,
        rpc_timeout=settings.lingma_rpc_timeout,
        default_model=settings.default_model,
        default_ask_mode=settings.default_ask_mode,
    )
    await lingma.start()
    auto_login = AutoLoginManager(
        username=settings.lingma_username,
        password=settings.lingma_password,
        headless=settings.auto_login_headless,
        timeout_sec=settings.auto_login_timeout,
        max_retry=settings.auto_login_max_retry,
    )


@app.on_event("shutdown")
async def on_shutdown():
    """Close the Lingma client if it was created."""
    if lingma:
        await lingma.close()


@app.get("/healthz")
async def healthz():
    """Return a liveness payload carrying the current Unix time in seconds."""
    return {"ok": True, "time": int(time.time())}


def _is_logged_in(status) -> bool:
    """Return True when an auth-status payload carries a truthy ``id``."""
    return bool(status and status.get("id"))


async def _prepare_login_url() -> tuple:
    """Point Lingma at the dedicated endpoint if configured, then request a login URL.

    Returns:
        The ``(login_url, login_raw)`` pair produced by
        ``lingma.generate_login_url()``. Callers decide how to report a
        falsy ``login_url``.
    """
    assert lingma is not None
    if settings.dedicated_domain_url:
        current = await lingma.get_endpoint()
        current_ep = current.get("endpoint", "") if isinstance(current, dict) else ""
        # Only issue the update RPC when the endpoint actually differs.
        if current_ep != settings.dedicated_domain_url:
            await lingma.update_endpoint(settings.dedicated_domain_url)
    login_url, login_raw = await lingma.generate_login_url()
    return login_url, login_raw


async def _ensure_logged_in_or_auto_login() -> dict:
    """Return the current auth status, running the auto-login flow if needed.

    Returns:
        The auth-status dict reported by Lingma once it carries an ``id``.

    Raises:
        HTTPException: 401 when not logged in and auto-login is disabled or
            did not result in a logged-in state; 500 when no login URL could
            be generated.
    """
    assert lingma is not None
    status = await lingma.auth_status()
    if _is_logged_in(status):
        return status

    if not settings.auto_login_enabled:
        raise HTTPException(
            status_code=401,
            detail={"error": {"message": "Lingma not logged in"}},
        )

    login_url, login_raw = await _prepare_login_url()
    if not login_url:
        raise HTTPException(
            status_code=500,
            detail={"error": {"message": f"generate login url failed: {login_raw}"}},
        )

    assert auto_login is not None
    await auto_login.ensure_started(login_url)
    try:
        await auto_login.wait_done(timeout=settings.auto_login_timeout + 20)
    except Exception:
        # Best effort: the auth re-check below is the source of truth, but the
        # failure is logged rather than silently discarded.
        logger.warning("auto login wait failed; re-checking auth status", exc_info=True)

    status = await lingma.auth_status()
    if _is_logged_in(status):
        return status
    raise HTTPException(
        status_code=401,
        detail={
            "error": {
                "message": "Lingma auto login failed",
                "auto_login": auto_login.status(),
            }
        },
    )


@app.get("/v1/models", dependencies=[Depends(auth_guard)])
async def v1_models():
    """List the models reported by Lingma as a ``ModelsResponse`` JSON payload."""
    assert lingma is not None
    await _ensure_logged_in_or_auto_login()
    await stats_collector.inc_models()
    models = await lingma.query_models()
    keys = flatten_model_keys(models)
    name_map = build_model_name_map(models)
    resp = ModelsResponse(data=[ModelData(id=k, name=name_map.get(k)) for k in keys])
    return JSONResponse(content=resp.model_dump())


def _messages_to_prompt(messages: list[dict]) -> str:
    """Flatten chat messages into one prompt with a ``[role] content`` line per message.

    A missing or ``None`` role falls back to ``"user"`` and a missing or
    ``None`` content is rendered as an empty string, so the literal text
    ``None`` never leaks into the prompt.
    """
    parts = []
    for m in messages:
        role = m.get("role") or "user"
        content = m.get("content")
        if content is None:
            content = ""
        parts.append(f"[{role}] {content}")
    return "\n".join(parts).strip()


def _sse_chunk(completion_id: str, created: int, model, delta: dict, finish_reason) -> str:
    """Serialize one ``chat.completion.chunk`` object as a server-sent-event frame."""
    payload = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }
        ],
    }
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


@app.post("/v1/chat/completions", dependencies=[Depends(auth_guard)])
async def v1_chat_completions(req: ChatCompletionsRequest):
    """Handle a chat completion request, streaming or non-streaming.

    The request messages are flattened into a single prompt and forwarded to
    Lingma. Token counts come from ``estimate_tokens`` and every outcome is
    recorded in ``stats_collector``.

    Raises:
        HTTPException: 400 when the flattened prompt is empty, plus anything
            raised by ``_ensure_logged_in_or_auto_login``.
    """
    assert lingma is not None
    client = lingma
    await _ensure_logged_in_or_auto_login()

    models = await client.query_models()
    available = flatten_model_keys(models)
    name_map = build_model_name_map(models)
    model = resolve_model(req.model, available, settings.default_model, name_map)

    # Guard against an omitted model: resolve_model already receives a default
    # for that case, so this lookup must not crash on None.
    requested_model = (req.model or "").lower()
    ask_mode = settings.default_ask_mode
    if requested_model in {"lingma-agent", "agent"}:
        ask_mode = "agent"

    prompt = _messages_to_prompt([m.model_dump() for m in req.messages])
    if not prompt:
        raise HTTPException(status_code=400, detail={"error": {"message": "messages is empty"}})

    prompt_tokens = estimate_tokens(prompt)

    if req.stream:
        created = int(time.time())
        completion_id = f"chatcmpl-{uuid.uuid4().hex}"

        async def event_stream():
            completion_tokens = 0
            success = False
            try:
                async for chunk in client.chat_stream(prompt, model, ask_mode):
                    completion_tokens += estimate_tokens(chunk)
                    yield _sse_chunk(completion_id, created, model, {"content": chunk}, None)
                yield _sse_chunk(completion_id, created, model, {}, "stop")
                yield "data: [DONE]\n\n"
                success = True
            finally:
                # Runs on normal completion and on error, so stats are
                # recorded either way.
                await stats_collector.record_chat(
                    stream=True,
                    success=success,
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                )

        return StreamingResponse(event_stream(), media_type="text/event-stream")

    try:
        result = await client.chat_complete(prompt, model, ask_mode)
    except Exception:
        await stats_collector.record_chat(
            stream=False,
            success=False,
            prompt_tokens=prompt_tokens,
            completion_tokens=0,
        )
        raise

    text = result.get("text") or ""
    completion_tokens = estimate_tokens(text)
    await stats_collector.record_chat(
        stream=False,
        success=True,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
    )

    response = ChatCompletionResponse(
        id=f"chatcmpl-{uuid.uuid4().hex}",
        created=int(time.time()),
        model=model,
        choices=[
            ChatCompletionChoice(
                index=0,
                finish_reason="stop",
                message={"role": "assistant", "content": text},
            )
        ],
    )
    data = response.model_dump()
    data["latency"] = {
        "first_token_ms": result.get("firstTokenLatencyMs"),
        "total_ms": result.get("totalLatencyMs"),
    }
    data["usage"] = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }
    return JSONResponse(content=data)


@app.post("/internal/auto-login/start", dependencies=[Depends(auth_guard)])
async def internal_auto_login_start():
    """Start the auto-login flow unless Lingma is already logged in.

    Raises:
        HTTPException: 500 when no login URL could be generated.
    """
    assert lingma is not None
    assert auto_login is not None

    status = await lingma.auth_status()
    if _is_logged_in(status):
        return {"ok": True, "state": "already_logged_in", "auth": status}

    login_url, login_raw = await _prepare_login_url()
    if not login_url:
        raise HTTPException(
            status_code=500,
            detail={"error": {"message": "generate login url failed", "raw": login_raw}},
        )

    started = await auto_login.ensure_started(login_url)
    return {
        "ok": True,
        "state": "running" if started else "already_running",
        "loginUrl": login_url,
        "auto_login": auto_login.status(),
    }


@app.get("/internal/auto-login/status", dependencies=[Depends(auth_guard)])
async def internal_auto_login_status():
    """Report the auto-login manager status together with Lingma's auth status."""
    assert auto_login is not None
    assert lingma is not None
    return {
        "ok": True,
        "auto_login": auto_login.status(),
        "auth": await lingma.auth_status(),
    }


@app.get("/internal/stats", dependencies=[Depends(auth_guard)])
async def internal_stats():
    """Return a snapshot of the collected gateway statistics."""
    return {"ok": True, "stats": await stats_collector.snapshot()}


@app.get("/metrics")
async def metrics():
    """Serve the collected statistics as text with the ``version=0.0.4`` text/plain media type."""
    text = await stats_collector.prometheus_text()
    return StreamingResponse(iter([text]), media_type="text/plain; version=0.0.4")