from __future__ import annotations import asyncio import contextlib import json import os import socket import subprocess import time import uuid from pathlib import Path from typing import AsyncIterator import websockets def _is_port_open(host: str, port: int, timeout_sec: float = 0.5) -> bool: try: with socket.create_connection((host, port), timeout=timeout_sec): return True except OSError: return False def _read_info_file(info_path: Path): if not info_path.exists(): return None, None txt = info_path.read_text(encoding="utf-8", errors="ignore").strip() if not txt: return None, None lines = txt.splitlines() if len(lines) < 2: return None, None try: return int(lines[0].strip()), int(lines[1].strip()) except ValueError: return None, None def _wait_info_any(info_paths: list[Path], timeout_sec: int): start = time.time() while time.time() - start < timeout_sec: for p in info_paths: port, pid = _read_info_file(p) if port and pid: return port, pid, p time.sleep(0.2) raise TimeoutError(".info not ready") def _encode_lsp_frame(payload_obj: dict) -> bytes: body = json.dumps(payload_obj, ensure_ascii=False).encode("utf-8") header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii") return header + body def _parse_lsp_frames(buf: bytes): frames = [] while True: header_end = buf.find(b"\r\n\r\n") if header_end < 0: break header = buf[:header_end] body_start = header_end + 4 content_length = None for line in header.split(b"\r\n"): if line.lower().startswith(b"content-length:"): content_length = int(line.split(b":", 1)[1].strip()) break if content_length is None: buf = buf[body_start:] continue if len(buf) < body_start + content_length: break body = buf[body_start : body_start + content_length] frames.append(body.decode("utf-8", errors="ignore")) buf = buf[body_start + content_length :] return frames, buf class LspWsRpcClient: def __init__(self, ws): self.ws = ws self._id = 1 self._pending: dict[int, asyncio.Future] = {} self._send_lock = asyncio.Lock() self._reader_task = None self._rx_buffer = b"" self._chat_streams: dict[str, dict] = {} async def start(self): self._reader_task = asyncio.create_task(self._reader_loop()) async def close(self): if self._reader_task: self._reader_task.cancel() with contextlib.suppress(Exception): await self._reader_task async def _send(self, payload: dict): async with self._send_lock: await self.ws.send(_encode_lsp_frame(payload)) async def _reader_loop(self): try: while True: raw = await self.ws.recv() chunk = raw if isinstance(raw, bytes) else raw.encode("utf-8", errors="ignore") self._rx_buffer += chunk bodies, self._rx_buffer = _parse_lsp_frames(self._rx_buffer) for body in bodies: try: msg = json.loads(body) except Exception: continue if "method" in msg and "result" not in msg and "error" not in msg: await self._handle_server_message(msg) continue rid = msg.get("id") if rid is None: continue fut = self._pending.pop(rid, None) if fut and not fut.done(): fut.set_result(msg) except asyncio.CancelledError: pass except Exception as exc: for fut in self._pending.values(): if not fut.done(): fut.set_exception(exc) self._pending.clear() async def _handle_server_message(self, msg: dict): method = msg.get("method") params = msg.get("params") or {} if method == "chat/answer": req_id = params.get("requestId") stream = self._chat_streams.get(req_id) if stream is not None: text = params.get("text") or params.get("content") or "" if text: stream["parts"].append(text) if stream["first_chunk_at"] is None: stream["first_chunk_at"] = time.monotonic() stream["chunks"].put_nowait(text) if method == "chat/finish": req_id = params.get("requestId") stream = self._chat_streams.get(req_id) if stream is not None and not stream["done"].is_set(): stream["finish"] = params stream["finish_at"] = time.monotonic() stream["done"].set() stream["chunks"].put_nowait(None) if "id" in msg: await self._send({"jsonrpc": "2.0", "id": msg.get("id"), "result": {}}) async def request(self, method, params=None, timeout=20): rid = self._id self._id += 1 payload = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params or {}} fut = asyncio.get_running_loop().create_future() self._pending[rid] = fut await self._send(payload) try: msg = await asyncio.wait_for(fut, timeout=timeout) except TimeoutError: self._pending.pop(rid, None) raise TimeoutError(f"RPC timeout: {method}") if "error" in msg: raise RuntimeError(f"RPC {method} error: {msg['error']}") return msg.get("result") async def notify(self, method, params=None): await self._send({"jsonrpc": "2.0", "method": method, "params": params or {}}) def create_stream(self, request_id: str): self._chat_streams[request_id] = { "parts": [], "chunks": asyncio.Queue(), "done": asyncio.Event(), "finish": None, "started_at": time.monotonic(), "first_chunk_at": None, "finish_at": None, } async def consume_stream(self, request_id: str, timeout: float) -> AsyncIterator[str]: stream = self._chat_streams[request_id] start = time.monotonic() while True: remain = timeout - (time.monotonic() - start) if remain <= 0: raise TimeoutError("chat stream timeout") chunk = await asyncio.wait_for(stream["chunks"].get(), timeout=remain) if chunk is None: break yield chunk def get_stream_result(self, request_id: str) -> dict: stream = self._chat_streams.get(request_id) or {} first_ms = None total_ms = None if stream.get("first_chunk_at") is not None: first_ms = int((stream["first_chunk_at"] - stream["started_at"]) * 1000) if stream.get("finish_at") is not None: total_ms = int((stream["finish_at"] - stream["started_at"]) * 1000) return { "text": "".join(stream.get("parts") or []), "finish": stream.get("finish") or {}, "firstTokenLatencyMs": first_ms, "totalLatencyMs": total_ms, } class LingmaGatewayClient: def __init__( self, lingma_bin: str, work_dir: str, socket_port: int, startup_timeout: int, rpc_timeout: int, default_model: str, default_ask_mode: str, ): self.lingma_bin = Path(lingma_bin) self.work_dir = Path(work_dir) self.socket_port = socket_port self.startup_timeout = startup_timeout self.rpc_timeout = rpc_timeout self.default_model = default_model self.default_ask_mode = default_ask_mode self._rpc: LspWsRpcClient | None = None self._ws = None async def start(self): if not self.lingma_bin.exists(): raise FileNotFoundError(f"Lingma not found: {self.lingma_bin}") if not _is_port_open("127.0.0.1", self.socket_port): self.work_dir.mkdir(parents=True, exist_ok=True) # Remove stale info files from host-mounted workspace before boot. for p in [self.work_dir / ".info", Path.home() / ".lingma" / ".info"]: with contextlib.suppress(Exception): if p.exists(): p.unlink() subprocess.Popen( [str(self.lingma_bin), "start", "--workDir", str(self.work_dir)], cwd=str(self.lingma_bin.parent), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) info, _, _ = _wait_info_any( [self.work_dir / ".info", Path.home() / ".lingma" / ".info"], timeout_sec=self.startup_timeout, ) self.socket_port = info # Wait for socket to actually become connectable. deadline = time.time() + self.startup_timeout while time.time() < deadline: if _is_port_open("127.0.0.1", self.socket_port, timeout_sec=0.3): break await asyncio.sleep(0.2) else: raise TimeoutError(f"Lingma socket not open on port {self.socket_port}") ws_url = f"ws://127.0.0.1:{self.socket_port}" self._ws = await websockets.connect(ws_url, max_size=10 * 1024 * 1024) self._rpc = LspWsRpcClient(self._ws) await self._rpc.start() await self._rpc.request( "initialize", { "processId": os.getpid(), "clientInfo": {"name": "lingma-openai-gateway", "version": "0.1.0"}, "capabilities": {}, "workspaceFolders": [], "rootUri": None, }, timeout=self.rpc_timeout, ) await self._rpc.notify("initialized", {}) async def close(self): if self._rpc: await self._rpc.close() if self._ws: await self._ws.close() @property def rpc(self) -> LspWsRpcClient: if self._rpc is None: raise RuntimeError("Lingma RPC not initialized") return self._rpc async def auth_status(self): return await self.rpc.request("auth/status", {}, timeout=self.rpc_timeout) async def query_models(self): return await self.rpc.request("config/queryModels", {}, timeout=self.rpc_timeout) async def get_endpoint(self): return await self.rpc.request("config/getEndpoint", {}, timeout=self.rpc_timeout) async def update_endpoint(self, endpoint: str): return await self.rpc.request("config/updateEndpoint", {"endpoint": endpoint}, timeout=self.rpc_timeout) async def generate_login_url(self): result = await self.rpc.request("login/generateUrl", {}, timeout=self.rpc_timeout) if isinstance(result, str): return result, {"raw": result} if isinstance(result, dict): for key in ("loginUrl", "url", "login_url"): if isinstance(result.get(key), str): return result[key], result return "", result return "", {"raw": result} def _build_payload(self, prompt: str, model_key: str, ask_mode: str, session_id: str, request_id: str): session_type = "developer" if ask_mode == "agent" else "chat" return { "requestId": request_id, "sessionId": session_id, "sessionType": session_type, "chatTask": "FREE_INPUT", "mode": ask_mode, "stream": True, "source": 1, "isReply": False, "taskDefinitionType": "system", "content": prompt, "text": prompt, "message": prompt, "questionText": prompt, "extra": { "modelConfig": {"key": model_key}, "workspacePath": str(Path.cwd()), }, "pluginPayloadConfig": { "isEnableAskAgent": ask_mode == "agent", "isEnableAutoMemory": True, }, "chatContext": { "text": prompt, "features": [], "preferredLanguage": "zh-CN", "localeLang": "zh-CN", }, } async def chat_complete(self, prompt: str, model_key: str, ask_mode: str) -> dict: request_id = str(uuid.uuid4()) session_id = str(uuid.uuid4()) payload = self._build_payload(prompt, model_key, ask_mode, session_id, request_id) self.rpc.create_stream(request_id) try: await self.rpc.request("chat/ask", payload, timeout=self.rpc_timeout) except (TimeoutError, asyncio.TimeoutError): pass async for _ in self.rpc.consume_stream(request_id, timeout=max(20.0, self.rpc_timeout + 20.0)): pass result = self.rpc.get_stream_result(request_id) finish = result.get("finish") or {} result["requestId"] = request_id result["sessionId"] = finish.get("sessionId") or session_id result["model"] = model_key result["mode"] = ask_mode return result async def chat_stream(self, prompt: str, model_key: str, ask_mode: str) -> AsyncIterator[str]: request_id = str(uuid.uuid4()) session_id = str(uuid.uuid4()) payload = self._build_payload(prompt, model_key, ask_mode, session_id, request_id) self.rpc.create_stream(request_id) try: await self.rpc.request("chat/ask", payload, timeout=self.rpc_timeout) except (TimeoutError, asyncio.TimeoutError): pass async for chunk in self.rpc.consume_stream(request_id, timeout=max(20.0, self.rpc_timeout + 40.0)): yield chunk