chore: initialize clean history without secrets
Some checks failed
CI / lint-and-compile (push) Has been cancelled
Some checks failed
CI / lint-and-compile (push) Has been cancelled
This commit is contained in:
386
app/lingma_client.py
Normal file
386
app/lingma_client.py
Normal file
@@ -0,0 +1,386 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import AsyncIterator
|
||||
|
||||
import websockets
|
||||
|
||||
|
||||
def _is_port_open(host: str, port: int, timeout_sec: float = 0.5) -> bool:
    """Report whether a TCP connection to ``host:port`` can be established.

    Args:
        host: Host name or address to connect to.
        port: TCP port to probe.
        timeout_sec: Connection timeout in seconds.

    Returns:
        ``True`` if the connection succeeded, ``False`` on any ``OSError``.
    """
    address = (host, port)
    try:
        probe = socket.create_connection(address, timeout=timeout_sec)
        # The connection is only a probe; release it straight away.
        probe.close()
    except OSError:
        return False
    return True
|
||||
|
||||
|
||||
def _read_info_file(info_path: Path) -> tuple[int | None, int | None]:
    """Parse the first two lines of an info file as integers.

    ``_wait_info_any`` interprets the returned pair as ``(port, pid)``.

    Args:
        info_path: Location of the info file.

    Returns:
        The two integers from the first two lines of the stripped file
        content, or ``(None, None)`` when the file is missing, unreadable,
        has fewer than two lines, or either line is not an integer.
    """
    try:
        # Read directly instead of checking exists() first: the file may be
        # removed or replaced between the check and the read while polling.
        text = info_path.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        return None, None

    lines = text.strip().splitlines()
    if len(lines) < 2:
        return None, None
    try:
        return int(lines[0].strip()), int(lines[1].strip())
    except ValueError:
        return None, None
|
||||
|
||||
|
||||
def _wait_info_any(info_paths: list[Path], timeout_sec: int) -> tuple[int, int, Path]:
    """Poll several info files until one yields a usable ``(port, pid)`` pair.

    This function blocks the calling thread with ``time.sleep`` between polls.

    Args:
        info_paths: Candidate info file locations, checked in order each round.
        timeout_sec: Maximum time to keep polling, in seconds.

    Returns:
        ``(port, pid, path)`` for the first file whose two values are both
        truthy.

    Raises:
        TimeoutError: If no file became ready within ``timeout_sec``.
    """
    # A monotonic deadline is immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_sec
    while True:
        # Poll before checking the deadline so a zero timeout still looks once.
        for path in info_paths:
            port, pid = _read_info_file(path)
            if port and pid:
                return port, pid, path
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(".info not ready")
        # Never sleep past the deadline.
        time.sleep(min(0.2, remaining))
|
||||
|
||||
|
||||
def _encode_lsp_frame(payload_obj: dict) -> bytes:
    """Serialize ``payload_obj`` to JSON and prefix a ``Content-Length`` header.

    Args:
        payload_obj: JSON-serializable message.

    Returns:
        ``Content-Length: <n>\\r\\n\\r\\n`` followed by the UTF-8 JSON body,
        where ``<n>`` is the body length in bytes.
    """
    encoded = json.dumps(payload_obj, ensure_ascii=False).encode("utf-8")
    # The length counts encoded bytes, not characters.
    return b"Content-Length: %d\r\n\r\n" % len(encoded) + encoded
|
||||
|
||||
|
||||
def _parse_lsp_frames(buf: bytes) -> tuple[list[str], bytes]:
    """Split a receive buffer into complete ``Content-Length`` framed bodies.

    Args:
        buf: Bytes received so far, possibly ending in a partial frame.

    Returns:
        ``(frames, rest)``: the decoded bodies of every complete frame, in
        order, and the unconsumed tail to prepend to the next chunk.

    A header block without a usable ``Content-Length`` (missing, non-numeric
    or negative) is dropped and parsing resumes after it, so one malformed
    header cannot raise or stall the parser.
    """
    frames: list[str] = []
    pos = 0  # offset of the first unconsumed byte; avoids re-slicing buf
    while True:
        header_end = buf.find(b"\r\n\r\n", pos)
        if header_end < 0:
            break
        body_start = header_end + 4

        content_length = None
        for line in buf[pos:header_end].split(b"\r\n"):
            if line.lower().startswith(b"content-length:"):
                value = line.split(b":", 1)[1].strip()
                # Accept plain decimal digits only; anything else (including a
                # sign) is treated as a malformed header.
                if value.isdigit():
                    content_length = int(value)
                break

        if content_length is None:
            pos = body_start
            continue

        body_end = body_start + content_length
        if len(buf) < body_end:
            # Body not fully received yet; keep header and partial body.
            break
        frames.append(buf[body_start:body_end].decode("utf-8", errors="ignore"))
        pos = body_end
    return frames, buf[pos:]
|
||||
|
||||
|
||||
class LspWsRpcClient:
    """JSON-RPC 2.0 client exchanging ``Content-Length`` framed messages over a socket.

    Outgoing payloads are framed with ``_encode_lsp_frame``; incoming data is
    buffered and split with ``_parse_lsp_frames``.  Replies are matched to
    waiting callers by ``id``.  Server-initiated ``chat/answer`` and
    ``chat/finish`` messages are routed to per-request stream records created
    with ``create_stream``.
    """

    def __init__(self, ws):
        """Wrap *ws*, an object exposing awaitable ``send()`` and ``recv()``."""
        self.ws = ws
        self._id = 1  # id assigned to the next outgoing request
        self._pending: dict[int, asyncio.Future] = {}  # request id -> reply future
        self._send_lock = asyncio.Lock()  # serializes writes to the socket
        self._reader_task = None
        self._rx_buffer = b""  # bytes received but not yet parsed into frames
        self._chat_streams: dict[str, dict] = {}  # requestId -> stream record

    async def start(self):
        """Launch the background task that reads and dispatches messages."""
        self._reader_task = asyncio.create_task(self._reader_loop())

    async def close(self):
        """Cancel the reader task and wait for it to finish."""
        task = self._reader_task
        if not task:
            return
        task.cancel()
        # A task cancelled before its first step finishes with CancelledError,
        # which is not an Exception subclass; suppress it explicitly.
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await task

    async def _send(self, payload: dict):
        """Frame *payload* and write it to the socket, one writer at a time."""
        async with self._send_lock:
            await self.ws.send(_encode_lsp_frame(payload))

    async def _reader_loop(self):
        """Receive data forever, dispatching each complete message.

        On an unexpected error every pending request future receives that
        exception and the pending table is cleared.
        """
        try:
            while True:
                raw = await self.ws.recv()
                chunk = raw if isinstance(raw, bytes) else raw.encode("utf-8", errors="ignore")
                self._rx_buffer += chunk
                bodies, self._rx_buffer = _parse_lsp_frames(self._rx_buffer)
                for body in bodies:
                    try:
                        msg = json.loads(body)
                    except (ValueError, RecursionError):
                        continue
                    # Only JSON objects are valid messages; a bare list or
                    # scalar must not crash the reader.
                    if not isinstance(msg, dict):
                        continue

                    if "method" in msg and "result" not in msg and "error" not in msg:
                        await self._handle_server_message(msg)
                        continue

                    rid = msg.get("id")
                    if rid is None:
                        continue
                    try:
                        fut = self._pending.pop(rid, None)
                    except TypeError:
                        # Unhashable id (list/dict) cannot match any request.
                        continue
                    if fut and not fut.done():
                        fut.set_result(msg)
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            for fut in self._pending.values():
                if not fut.done():
                    fut.set_exception(exc)
            self._pending.clear()

    def _stream_for(self, params: dict):
        """Return the stream record named by ``params['requestId']``, or ``None``."""
        try:
            return self._chat_streams.get(params.get("requestId"))
        except TypeError:
            # Unhashable requestId cannot name a stream.
            return None

    async def _handle_server_message(self, msg: dict):
        """Process a server-initiated request or notification.

        ``chat/answer`` appends text to the matching stream, ``chat/finish``
        marks it done and enqueues the ``None`` end marker.  Any message that
        carries an ``id`` is acknowledged with an empty result.
        """
        method = msg.get("method")
        params = msg.get("params")
        if not isinstance(params, dict):
            # Non-object params carry no fields this client understands.
            params = {}

        if method == "chat/answer":
            stream = self._stream_for(params)
            if stream is not None:
                text = params.get("text") or params.get("content") or ""
                if text:
                    stream["parts"].append(text)
                    if stream["first_chunk_at"] is None:
                        stream["first_chunk_at"] = time.monotonic()
                    stream["chunks"].put_nowait(text)
        elif method == "chat/finish":
            stream = self._stream_for(params)
            if stream is not None and not stream["done"].is_set():
                stream["finish"] = params
                stream["finish_at"] = time.monotonic()
                stream["done"].set()
                stream["chunks"].put_nowait(None)

        if "id" in msg:
            await self._send({"jsonrpc": "2.0", "id": msg.get("id"), "result": {}})

    async def request(self, method, params=None, timeout=20):
        """Send a request and wait for its reply.

        Args:
            method: JSON-RPC method name.
            params: Parameters object; ``{}`` is sent when falsy.
            timeout: Seconds to wait for the reply.

        Returns:
            The ``result`` member of the reply (``None`` if absent).

        Raises:
            TimeoutError: No reply arrived within *timeout*.
            RuntimeError: The reply carried an ``error`` member.
        """
        rid = self._id
        self._id += 1
        payload = {"jsonrpc": "2.0", "id": rid, "method": method, "params": params or {}}
        fut = asyncio.get_running_loop().create_future()
        self._pending[rid] = fut
        try:
            await self._send(payload)
            try:
                msg = await asyncio.wait_for(fut, timeout=timeout)
            except (TimeoutError, asyncio.TimeoutError) as exc:
                # asyncio.TimeoutError is a distinct class before Python 3.11.
                raise TimeoutError(f"RPC timeout: {method}") from exc
        finally:
            # Always drop the bookkeeping entry, including when sending failed.
            self._pending.pop(rid, None)
        if "error" in msg:
            raise RuntimeError(f"RPC {method} error: {msg['error']}")
        return msg.get("result")

    async def notify(self, method, params=None):
        """Send a notification (a message without ``id``); no reply is awaited."""
        await self._send({"jsonrpc": "2.0", "method": method, "params": params or {}})

    def create_stream(self, request_id: str):
        """Register a fresh stream record for *request_id*, replacing any old one."""
        self._chat_streams[request_id] = {
            "parts": [],  # every text chunk received so far
            "chunks": asyncio.Queue(),  # chunks for consume_stream; None marks the end
            "done": asyncio.Event(),  # set once chat/finish was seen
            "finish": None,  # params of the chat/finish message
            "started_at": time.monotonic(),
            "first_chunk_at": None,
            "finish_at": None,
        }

    async def consume_stream(self, request_id: str, timeout: float) -> AsyncIterator[str]:
        """Yield text chunks for *request_id* until the end marker arrives.

        *timeout* bounds the whole stream, not each chunk.

        Raises:
            KeyError: *request_id* was never registered with ``create_stream``.
            TimeoutError: The budget ran out between chunks; while waiting on
                a chunk the timeout error comes from ``asyncio.wait_for``.
        """
        stream = self._chat_streams[request_id]
        start = time.monotonic()
        while True:
            remain = timeout - (time.monotonic() - start)
            if remain <= 0:
                raise TimeoutError("chat stream timeout")
            chunk = await asyncio.wait_for(stream["chunks"].get(), timeout=remain)
            if chunk is None:
                break
            yield chunk

    def get_stream_result(self, request_id: str) -> dict:
        """Summarize a stream: joined text, finish params and latencies in ms.

        Latencies are ``None`` when the corresponding event has not happened;
        an unknown *request_id* yields an empty summary.
        """
        stream = self._chat_streams.get(request_id) or {}
        first_ms = None
        total_ms = None
        if stream.get("first_chunk_at") is not None:
            first_ms = int((stream["first_chunk_at"] - stream["started_at"]) * 1000)
        if stream.get("finish_at") is not None:
            total_ms = int((stream["finish_at"] - stream["started_at"]) * 1000)
        return {
            "text": "".join(stream.get("parts") or []),
            "finish": stream.get("finish") or {},
            "firstTokenLatencyMs": first_ms,
            "totalLatencyMs": total_ms,
        }
|
||||
|
||||
|
||||
class LingmaGatewayClient:
    """Client for a local Lingma process reached over a loopback WebSocket.

    ``start`` launches the Lingma binary when nothing is listening on the
    configured port, connects to ``ws://127.0.0.1:<port>`` and performs the
    ``initialize`` / ``initialized`` handshake.  The remaining methods are
    thin wrappers around RPC calls plus chat helpers that collect or stream
    ``chat/ask`` answers.
    """

    def __init__(
        self,
        lingma_bin: str,
        work_dir: str,
        socket_port: int,
        startup_timeout: int,
        rpc_timeout: int,
        default_model: str,
        default_ask_mode: str,
    ):
        """Store configuration; no process is started and no socket is opened.

        Args:
            lingma_bin: Path to the Lingma executable.
            work_dir: Directory passed to Lingma as ``--workDir``.
            socket_port: Port probed first; replaced by the port read from
                the info file when the process has to be started.
            startup_timeout: Seconds allowed for each startup wait.
            rpc_timeout: Seconds allowed for each RPC reply.
            default_model: Stored as-is; not read by this class.
            default_ask_mode: Stored as-is; not read by this class.
        """
        self.lingma_bin = Path(lingma_bin)
        self.work_dir = Path(work_dir)
        self.socket_port = socket_port
        self.startup_timeout = startup_timeout
        self.rpc_timeout = rpc_timeout
        self.default_model = default_model
        self.default_ask_mode = default_ask_mode
        self._rpc: LspWsRpcClient | None = None
        self._ws = None

    async def start(self):
        """Ensure Lingma is running, connect to it and complete the handshake.

        Raises:
            FileNotFoundError: The Lingma binary does not exist.
            TimeoutError: The info file or the socket did not become ready
                within ``startup_timeout``, or ``initialize`` timed out.
        """
        if not self.lingma_bin.exists():
            raise FileNotFoundError(f"Lingma not found: {self.lingma_bin}")

        if not _is_port_open("127.0.0.1", self.socket_port):
            self.work_dir.mkdir(parents=True, exist_ok=True)
            info_paths = [self.work_dir / ".info", Path.home() / ".lingma" / ".info"]
            # Remove stale info files from host-mounted workspace before boot.
            for stale in info_paths:
                with contextlib.suppress(OSError):
                    stale.unlink()
            subprocess.Popen(
                [str(self.lingma_bin), "start", "--workDir", str(self.work_dir)],
                cwd=str(self.lingma_bin.parent),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True,
            )
            # _wait_info_any polls with time.sleep; run it in a worker thread
            # so the event loop keeps serving other tasks meanwhile.
            loop = asyncio.get_running_loop()
            port, _pid, _path = await loop.run_in_executor(
                None, _wait_info_any, info_paths, self.startup_timeout
            )
            self.socket_port = port

        # Wait for socket to actually become connectable.
        deadline = time.monotonic() + self.startup_timeout
        while not _is_port_open("127.0.0.1", self.socket_port, timeout_sec=0.3):
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Lingma socket not open on port {self.socket_port}")
            await asyncio.sleep(0.2)

        ws_url = f"ws://127.0.0.1:{self.socket_port}"
        self._ws = await websockets.connect(ws_url, max_size=10 * 1024 * 1024)
        self._rpc = LspWsRpcClient(self._ws)
        await self._rpc.start()
        try:
            await self._rpc.request(
                "initialize",
                {
                    "processId": os.getpid(),
                    "clientInfo": {"name": "lingma-openai-gateway", "version": "0.1.0"},
                    "capabilities": {},
                    "workspaceFolders": [],
                    "rootUri": None,
                },
                timeout=self.rpc_timeout,
            )
            await self._rpc.notify("initialized", {})
        except BaseException:
            # Handshake failed or was cancelled: do not leak the socket and
            # reader task, and do not expose a half-initialized client.
            with contextlib.suppress(Exception):
                await self.close()
            self._rpc = None
            self._ws = None
            raise

    async def close(self):
        """Stop the RPC reader and close the WebSocket, if they exist."""
        try:
            if self._rpc:
                await self._rpc.close()
        finally:
            # Close the socket even when stopping the reader raised.
            if self._ws:
                await self._ws.close()

    @property
    def rpc(self) -> LspWsRpcClient:
        """The connected RPC client.

        Raises:
            RuntimeError: ``start`` has not completed successfully.
        """
        if self._rpc is None:
            raise RuntimeError("Lingma RPC not initialized")
        return self._rpc

    async def auth_status(self):
        """Return the result of the ``auth/status`` RPC."""
        return await self.rpc.request("auth/status", {}, timeout=self.rpc_timeout)

    async def query_models(self):
        """Return the result of the ``config/queryModels`` RPC."""
        return await self.rpc.request("config/queryModels", {}, timeout=self.rpc_timeout)

    async def get_endpoint(self):
        """Return the result of the ``config/getEndpoint`` RPC."""
        return await self.rpc.request("config/getEndpoint", {}, timeout=self.rpc_timeout)

    async def update_endpoint(self, endpoint: str):
        """Call ``config/updateEndpoint`` with *endpoint* and return its result."""
        return await self.rpc.request("config/updateEndpoint", {"endpoint": endpoint}, timeout=self.rpc_timeout)

    async def generate_login_url(self):
        """Call ``login/generateUrl`` and extract the URL from its result.

        Returns:
            ``(url, raw)``.  ``url`` is the result itself when it is a string,
            else the first string found under ``loginUrl``, ``url`` or
            ``login_url``, else ``""``.  ``raw`` is the result dict, or
            ``{"raw": result}`` for non-dict results.
        """
        result = await self.rpc.request("login/generateUrl", {}, timeout=self.rpc_timeout)
        if isinstance(result, str):
            return result, {"raw": result}
        if isinstance(result, dict):
            for key in ("loginUrl", "url", "login_url"):
                if isinstance(result.get(key), str):
                    return result[key], result
            return "", result
        return "", {"raw": result}

    def _build_payload(self, prompt: str, model_key: str, ask_mode: str, session_id: str, request_id: str):
        """Build the ``chat/ask`` parameters for one prompt.

        The prompt is repeated under several keys (``content``, ``text``,
        ``message``, ``questionText`` and ``chatContext.text``).
        NOTE(review): presumably for compatibility with what the server
        reads - confirm before trimming any of them.
        """
        session_type = "developer" if ask_mode == "agent" else "chat"
        return {
            "requestId": request_id,
            "sessionId": session_id,
            "sessionType": session_type,
            "chatTask": "FREE_INPUT",
            "mode": ask_mode,
            "stream": True,
            "source": 1,
            "isReply": False,
            "taskDefinitionType": "system",
            "content": prompt,
            "text": prompt,
            "message": prompt,
            "questionText": prompt,
            "extra": {
                "modelConfig": {"key": model_key},
                "workspacePath": str(Path.cwd()),
            },
            "pluginPayloadConfig": {
                "isEnableAskAgent": ask_mode == "agent",
                "isEnableAutoMemory": True,
            },
            "chatContext": {
                "text": prompt,
                "features": [],
                "preferredLanguage": "zh-CN",
                "localeLang": "zh-CN",
            },
        }

    async def _ask(self, prompt: str, model_key: str, ask_mode: str, session_id: str, request_id: str):
        """Register the answer stream and submit ``chat/ask``.

        A timeout of the RPC reply is ignored on purpose: the caller goes on
        to consume the stream, which has its own timeout.
        """
        payload = self._build_payload(prompt, model_key, ask_mode, session_id, request_id)
        self.rpc.create_stream(request_id)
        try:
            await self.rpc.request("chat/ask", payload, timeout=self.rpc_timeout)
        except (TimeoutError, asyncio.TimeoutError):
            pass

    def _drop_stream(self, request_id: str):
        """Release the stream record of a finished request."""
        rpc = self._rpc
        if rpc is not None:
            # The RPC client never removes records itself; without this every
            # chat would keep its full text in memory for the process lifetime.
            rpc._chat_streams.pop(request_id, None)

    async def chat_complete(self, prompt: str, model_key: str, ask_mode: str) -> dict:
        """Send *prompt* and wait for the complete answer.

        Returns:
            The stream summary from ``get_stream_result`` extended with
            ``requestId``, ``sessionId`` (taken from the finish params when
            present), ``model`` and ``mode``.
        """
        request_id = str(uuid.uuid4())
        session_id = str(uuid.uuid4())
        try:
            await self._ask(prompt, model_key, ask_mode, session_id, request_id)
            async for _ in self.rpc.consume_stream(request_id, timeout=max(20.0, self.rpc_timeout + 20.0)):
                pass
            result = self.rpc.get_stream_result(request_id)
        finally:
            self._drop_stream(request_id)
        finish = result.get("finish") or {}
        result["requestId"] = request_id
        result["sessionId"] = finish.get("sessionId") or session_id
        result["model"] = model_key
        result["mode"] = ask_mode
        return result

    async def chat_stream(self, prompt: str, model_key: str, ask_mode: str) -> AsyncIterator[str]:
        """Send *prompt* and yield answer chunks as they arrive."""
        request_id = str(uuid.uuid4())
        session_id = str(uuid.uuid4())
        try:
            await self._ask(prompt, model_key, ask_mode, session_id, request_id)
            async for chunk in self.rpc.consume_stream(request_id, timeout=max(20.0, self.rpc_timeout + 40.0)):
                yield chunk
        finally:
            # Also runs when the consumer stops iterating early.
            self._drop_stream(request_id)
|
||||
Reference in New Issue
Block a user