Add a proxy-side tool emulation layer so that Lingma requests can surface stable OpenAI tool_calls and Anthropic tool_use blocks even when upstream tool events are missing or inconsistent.

Constraint: Keep native Lingma tool event bridging as the first path, and layer emulation on top as a fallback.
Rejected: Depending exclusively on Lingma native tool/invoke events, because tool visibility remains inconsistent across models and transports.
Confidence: high
Scope-risk: moderate
272 lines
10 KiB
Python
272 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import types
|
|
import unittest
|
|
from types import SimpleNamespace
|
|
from unittest.mock import patch
|
|
import zipfile
|
|
|
|
# app.lingma_pool imports auto_login; tests here don't execute Playwright paths.
|
|
# Stub module import so test environments without playwright can import pool code.
|
|
# Bare module objects that stand in for the ``playwright`` package and its
# ``playwright.async_api`` submodule.
_playwright, _playwright_async = (
    types.ModuleType(module_name)
    for module_name in ("playwright", "playwright.async_api")
)
|
|
|
|
|
|
class _StubPlaywrightTimeoutError(Exception):
    """Placeholder exception published as ``TimeoutError`` on the stub async_api module."""
|
|
|
|
|
|
def _stub_async_playwright():
    """Fail loudly as soon as code under test tries to start Playwright.

    The real ``async_playwright`` is an ordinary callable whose return value
    is used with ``async with`` (or ``.start()``).  When this stub was a
    coroutine function, calling it merely created an un-awaited coroutine:
    the message below never surfaced, and the caller instead hit an unrelated
    error from the coroutine object plus a "never awaited" warning.  Raising
    at call time makes an accidental Playwright code path obvious.

    Raises:
        RuntimeError: always; Playwright is not available in these tests.
    """
    raise RuntimeError("playwright is stubbed in unit tests")
|
|
|
|
|
|
# Publish the stub symbols on the fake ``playwright.async_api`` module.
setattr(_playwright_async, "TimeoutError", _StubPlaywrightTimeoutError)
setattr(_playwright_async, "async_playwright", _stub_async_playwright)

# ``setdefault`` leaves any module already registered under these names
# untouched; the fakes are only installed when the slot is empty.
sys.modules.setdefault("playwright", _playwright)
sys.modules.setdefault("playwright.async_api", _playwright_async)
|
|
|
|
from app.config import _parse_accounts, load_settings
|
|
from app.bootstrap_lingma import bootstrap_from_vsix
|
|
from app.lingma_pool import LingmaPool
|
|
from app.stats import StatsCollector, estimate_tokens
|
|
|
|
|
|
def _affinity_key_for_bucket(pool_size: int, bucket_index: int) -> str:
    """Return the first ``k-<n>`` key whose hash falls into *bucket_index*.

    Candidates ``k-0`` .. ``k-19999`` are tried in order; a candidate matches
    when ``abs(hash(key)) % pool_size`` equals *bucket_index*.
    NOTE(review): presumably this mirrors how the pool maps affinity keys to
    instances -- confirm against ``app.lingma_pool``.

    Raises:
        RuntimeError: if none of the candidates lands in the bucket.
    """
    candidates = (f"k-{n}" for n in range(20000))
    found = next(
        (
            candidate
            for candidate in candidates
            if abs(hash(candidate)) % pool_size == bucket_index
        ),
        None,
    )
    if found is None:
        raise RuntimeError("failed to find affinity key")
    return found
|
|
|
|
|
|
class _FakeInstance:
    """Lightweight stand-in for a pool member used by the routing tests.

    Exposes ``name`` (``inst-<idx>``), ``cfg.index``, ``in_flight`` and a
    read-only ``healthy`` flag.
    """

    def __init__(self, idx: int, *, healthy: bool, in_flight: int):
        # Health is stored privately and surfaced through the property below.
        self._healthy = healthy
        self.in_flight = in_flight
        self.cfg = SimpleNamespace(index=idx)
        self.name = f"inst-{idx}"

    @property
    def healthy(self) -> bool:
        """Return the health flag supplied at construction time."""
        return self._healthy
|
|
|
|
|
|
class LingmaPoolRoutingTests(unittest.TestCase):
    """Exercise ``LingmaPool.pick`` and ``LingmaPool.prometheus_lines`` with fake instances."""

    def test_pool_pick_prefers_healthy_affinity_bucket(self) -> None:
        """A key hashing to bucket 1 selects instance 1 even though it is busier."""
        idle = _FakeInstance(0, healthy=True, in_flight=0)
        busy = _FakeInstance(1, healthy=True, in_flight=9)
        pool = LingmaPool([idle, busy])

        chosen = pool.pick(affinity_key=_affinity_key_for_bucket(2, 1))

        self.assertIs(chosen, busy)

    def test_pool_pick_falls_back_to_least_in_flight_when_affinity_unhealthy(self) -> None:
        """With the bucket-1 instance unhealthy, the first healthy instance is expected."""
        first_healthy = _FakeInstance(0, healthy=True, in_flight=1)
        unhealthy = _FakeInstance(1, healthy=False, in_flight=0)
        second_healthy = _FakeInstance(2, healthy=True, in_flight=1)
        pool = LingmaPool([first_healthy, unhealthy, second_healthy])

        bucket_one_key = _affinity_key_for_bucket(3, 1)
        chosen = pool.pick(affinity_key=bucket_one_key)

        self.assertIs(chosen, first_healthy)

    def test_pool_pick_round_robin_when_all_unhealthy(self) -> None:
        """Without an affinity key and with no healthy instance, picks cycle in order."""
        instances = [
            _FakeInstance(position, healthy=False, in_flight=0)
            for position in range(3)
        ]
        pool = LingmaPool(list(instances))

        # Four picks: one full cycle, then wrap around to the first instance.
        for expected in (*instances, instances[0]):
            self.assertIs(pool.pick(), expected)

    def test_pool_prometheus_lines_include_required_metrics(self) -> None:
        """The joined metric lines contain both gauge headers and per-instance samples."""
        ready = _FakeInstance(0, healthy=True, in_flight=2)
        not_ready = _FakeInstance(1, healthy=False, in_flight=5)
        pool = LingmaPool([ready, not_ready])

        rendered = "\n".join(pool.prometheus_lines())

        required_fragments = (
            "# TYPE gateway_pool_instance_in_flight gauge",
            "# TYPE gateway_pool_instance_ready gauge",
            'gateway_pool_instance_in_flight{name="inst-0",idx="0"} 2',
            'gateway_pool_instance_ready{name="inst-0",idx="0"} 1',
            'gateway_pool_instance_ready{name="inst-1",idx="1"} 0',
        )
        for fragment in required_fragments:
            self.assertIn(fragment, rendered)
|
|
|
|
|
|
class StatsCollectorTests(unittest.IsolatedAsyncioTestCase):
    """Cover ``estimate_tokens`` and the ``StatsCollector`` counters and text export."""

    def test_estimate_tokens_empty_short_utf8(self) -> None:
        """Empty text yields 0, one character at least 1, and a 4-character CJK string 3."""
        self.assertEqual(estimate_tokens(""), 0)
        self.assertGreaterEqual(estimate_tokens("a"), 1)
        self.assertEqual(estimate_tokens("你好世界"), 3)

    async def test_record_chat_updates_counters_and_clamps_negative_tokens(self) -> None:
        """Two recorded chats update every counter; negative token counts add nothing."""
        collector = StatsCollector()

        await collector.record_chat(
            stream=True, success=True, prompt_tokens=-3, completion_tokens=5
        )
        await collector.record_chat(
            stream=False, success=False, prompt_tokens=2, completion_tokens=-7
        )
        snapshot = await collector.snapshot()

        expected_counters = {
            "chat_requests_total": 2,
            "chat_requests_success": 1,
            "chat_requests_error": 1,
            "chat_stream_requests": 1,
            "chat_non_stream_requests": 1,
            "prompt_tokens_estimated_total": 2,
            "completion_tokens_estimated_total": 5,
        }
        for counter_name, expected_value in expected_counters.items():
            self.assertEqual(snapshot[counter_name], expected_value)

    async def test_snapshot_and_prometheus_text_consistency(self) -> None:
        """The snapshot total and the Prometheus text report the same numbers."""
        collector = StatsCollector()

        await collector.record_chat(
            stream=True, success=True, prompt_tokens=3, completion_tokens=4
        )
        snapshot = await collector.snapshot()
        exported = await collector.prometheus_text()

        self.assertEqual(snapshot["total_tokens_estimated"], 7)
        for metric_line in (
            "gateway_total_tokens_estimated 7",
            "gateway_chat_requests_total 1",
        ):
            self.assertIn(metric_line, exported)
        # The exported text must be newline-terminated.
        self.assertTrue(exported.endswith("\n"))
|
|
|
|
|
|
class ConfigParsingTests(unittest.TestCase):
    """Cover ``_parse_accounts`` input formats and ``load_settings`` environment handling."""

    @staticmethod
    def _settings_for_env(env):
        """Return ``load_settings()`` evaluated while ``os.environ`` holds exactly *env*."""
        with patch.dict(os.environ, env, clear=True):
            return load_settings()

    def test_parse_accounts_accepts_json_csv_newline_formats(self) -> None:
        """JSON lists, comma-separated pairs and newline-separated pairs all parse."""
        json_payload = json.dumps(
            [
                {"username": "u1", "password": "p1"},
                {"username": "u2", "password": "p2"},
            ]
        )
        cases = (
            (json_payload, ["u1", "u2"]),
            ("u3:p3,u4:p4", ["u3", "u4"]),
            ("u5:p5\nu6:p6", ["u5", "u6"]),
        )
        for raw, expected_usernames in cases:
            accounts = _parse_accounts(raw)
            self.assertEqual(
                [account.username for account in accounts], expected_usernames
            )

    def test_parse_accounts_allows_bundle_only_in_json(self) -> None:
        """A JSON entry with only ``session_bundle`` gets empty credentials."""
        accounts = _parse_accounts(json.dumps([{"session_bundle": "abc"}]))

        self.assertEqual(len(accounts), 1)
        only = accounts[0]
        self.assertEqual(only.username, "")
        self.assertEqual(only.password, "")
        self.assertEqual(only.session_bundle_b64, "abc")

    def test_parse_accounts_csv_splits_only_first_colon(self) -> None:
        """Everything after the first colon belongs to the password."""
        accounts = _parse_accounts("u:p:with:colon")

        self.assertEqual(len(accounts), 1)
        only = accounts[0]
        self.assertEqual(only.username, "u")
        self.assertEqual(only.password, "p:with:colon")

    def test_load_settings_creates_bundle_only_account_without_credentials(self) -> None:
        """``LINGMA_SESSION_BUNDLE`` alone yields one account with empty credentials."""
        settings = self._settings_for_env({"LINGMA_SESSION_BUNDLE": "abc"})

        self.assertEqual(len(settings.accounts), 1)
        only = settings.accounts[0]
        self.assertEqual(only.username, "")
        self.assertEqual(only.password, "")
        self.assertEqual(only.session_bundle_b64, "abc")

    def test_load_settings_invalid_instance_count_fallback(self) -> None:
        """A non-numeric instance count yields 2 with two accounts and 1 with none."""
        settings_with_accounts = self._settings_for_env(
            {"LINGMA_ACCOUNTS": "u1:p1,u2:p2", "LINGMA_INSTANCE_COUNT": "not-a-number"}
        )
        self.assertEqual(settings_with_accounts.instance_count, 2)

        settings_without_accounts = self._settings_for_env(
            {"LINGMA_INSTANCE_COUNT": "not-a-number"}
        )
        self.assertEqual(settings_without_accounts.instance_count, 1)

    def test_load_settings_parses_tool_allowlist_csv(self) -> None:
        """Entries are stripped of whitespace and empty items are dropped."""
        settings = self._settings_for_env(
            {"TOOL_ALLOWLIST": " lookup , write_file ,,search_docs "}
        )

        self.assertEqual(
            settings.tool_allowlist, ["lookup", "write_file", "search_docs"]
        )

    def test_load_settings_defaults_tool_forward_enabled_true(self) -> None:
        """With an empty environment, tool forwarding is enabled."""
        settings = self._settings_for_env({})

        self.assertTrue(settings.tool_forward_enabled)

    def test_load_settings_respects_tool_forward_enabled_false(self) -> None:
        """``TOOL_FORWARD_ENABLED=false`` switches tool forwarding off."""
        settings = self._settings_for_env({"TOOL_FORWARD_ENABLED": "false"})

        self.assertFalse(settings.tool_forward_enabled)

    def test_load_settings_empty_tool_allowlist(self) -> None:
        """An allowlist made only of separators and blanks parses to an empty list."""
        settings = self._settings_for_env({"TOOL_ALLOWLIST": " , , "})

        self.assertEqual(settings.tool_allowlist, [])
|
|
|
|
|
|
class BootstrapLingmaTests(unittest.TestCase):
    """Run ``bootstrap_from_vsix`` against a synthetic VSIX archive built on disk."""

    def _make_test_vsix(self, root: str) -> str:
        """Create ``test.vsix`` under *root* and return its path.

        The VSIX holds a single entry, ``extension/dist/bin/lingma-2.5.20.zip``.
        That entry is itself a zip (also left on disk as ``nested.zip``) with a
        fake Lingma binary and an extension ``main.js`` under ``2.5.20/``.
        """
        inner_path = os.path.join(root, "nested.zip")
        inner_entries = (
            ("2.5.20/x86_64_linux/Lingma", b"new-binary"),
            ("2.5.20/extension/main.js", b"console.log('ok')"),
        )
        with zipfile.ZipFile(inner_path, "w") as inner_zip:
            for arcname, payload in inner_entries:
                inner_zip.writestr(arcname, payload)

        outer_path = os.path.join(root, "test.vsix")
        with zipfile.ZipFile(outer_path, "w") as outer_zip:
            # Embed the finished inner archive byte-for-byte.
            with open(inner_path, "rb") as inner_file:
                inner_bytes = inner_file.read()
            outer_zip.writestr("extension/dist/bin/lingma-2.5.20.zip", inner_bytes)
        return outer_path

    def test_bootstrap_refreshes_when_extension_assets_missing(self) -> None:
        """An install with a marker but no extension assets is refreshed.

        The fixture has an old binary, a bootstrap marker for 2.5.20 and an
        empty release directory.  After bootstrapping, the binary must match
        the VSIX payload and ``extension/main.js`` must exist.
        """
        with tempfile.TemporaryDirectory() as workdir:
            bin_dir = os.path.join(workdir, "data", "bin")
            release_dir = os.path.join(bin_dir, "2.5.20")
            os.makedirs(release_dir, exist_ok=True)

            lingma_bin = os.path.join(bin_dir, "Lingma")
            with open(lingma_bin, "wb") as binary_file:
                binary_file.write(b"old-binary")

            marker_path = os.path.join(bin_dir, ".lingma-bootstrap.json")
            with open(marker_path, "w", encoding="utf-8") as marker_file:
                json.dump(
                    {"version": "2.5.20", "release_root": "2.5.20"},
                    marker_file,
                )

            vsix_path = self._make_test_vsix(workdir)

            overrides = {
                "LINGMA_BIN": lingma_bin,
                "LINGMA_SOURCE_TYPE": "vsix",
                "LINGMA_VSIX_URL": f"file://{vsix_path}",
                "LINGMA_BOOTSTRAP_ALWAYS": "false",
                "LINGMA_FORCE_REFRESH": "false",
            }
            with patch.dict(os.environ, overrides, clear=False):
                bootstrap_from_vsix()

            with open(lingma_bin, "rb") as binary_file:
                self.assertEqual(binary_file.read(), b"new-binary")
            extracted_asset = os.path.join(release_dir, "extension", "main.js")
            self.assertTrue(os.path.exists(extracted_asset))
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|