Behavior hardening (M1):
- Fix `_chat_streams` memory leak: pop_stream on completion, error, and
client disconnect.
- Add WebSocket reconnect with state machine (stopped/starting/ready/
reconnecting/failed/closed) and exponential backoff, so a Lingma
restart no longer requires restarting the gateway.
- Lazy initialization: startup failure is non-fatal, first real request
triggers retry, `/healthz` reflects readiness.
- Migrate FastAPI on_event to lifespan.
- Structured JSON logging with request_id ContextVar; `x-request-id`
propagated to responses.
- SSE now sets `Cache-Control: no-cache`, `X-Accel-Buffering: no` to
defeat proxy buffering.
- OpenAI schema compatibility: `content` accepts str | list[parts] | None;
added `developer`/`function` roles and the `tools`, `tool_choice`,
`stream_options`, `user`, and `max_tokens` fields;
`stream_options.include_usage` emits a final usage chunk.
- `require_bearer` uses `hmac.compare_digest`; `/metrics` now requires a
Bearer token when `METRICS_TOKEN` or `API_KEYS` is set.
- Unify `TimeoutError` and `asyncio.TimeoutError` handling across Python
3.10/3.11 (the two are the same class only from 3.11 onward).
- Error responses no longer leak `auto_login.status()` details.
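The reconnect item above could be sketched roughly as follows. This is a minimal illustration, not the gateway's code: `ConnState`, `backoff_delays`, `reconnect`, and the default base/factor/cap values are hypothetical names and numbers chosen for the example. Only the six state names come from the notes above.

```python
import asyncio
import enum
import itertools
from typing import Awaitable, Callable, Iterator


class ConnState(enum.Enum):
    # State names taken from the commit notes; the enum itself is hypothetical.
    STOPPED = "stopped"
    STARTING = "starting"
    READY = "ready"
    RECONNECTING = "reconnecting"
    FAILED = "failed"
    CLOSED = "closed"


def backoff_delays(
    base: float = 0.5, factor: float = 2.0, cap: float = 30.0
) -> Iterator[float]:
    """Yield base, base*factor, base*factor**2, ... never exceeding cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


async def reconnect(
    connect: Callable[[], Awaitable[None]],
    max_attempts: int = 5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> ConnState:
    """Retry connect() with growing delays; report the resulting state."""
    for _attempt, delay in zip(range(max_attempts), backoff_delays()):
        try:
            await connect()
        except OSError:
            # Assumed failure mode: socket-level errors while the peer restarts.
            await sleep(delay)
        else:
            return ConnState.READY
    return ConnState.FAILED


print(list(itertools.islice(backoff_delays(), 5)))  # [0.5, 1.0, 2.0, 4.0, 8.0]
```

Passing `sleep` as a parameter keeps the loop testable without real waiting. A production version would likely add jitter and track the intermediate states.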
Backpressure (M2 / A2):
- New `InFlightGuard` with per-request ticket and queue + rejection
accounting; a request still queued after `GATEWAY_QUEUE_TIMEOUT_SEC`
raises `BackpressureRejected`, returned as 429 + `Retry-After`.
- Streaming ticket ownership transfers to the generator so CancelledError
from client disconnect still releases the slot.
- `/internal/stats.concurrency` and `/metrics` expose in_flight/queued/
accepted_total/rejected_total/max_in_flight.
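A rough sketch of the queue-then-reject behaviour described above, assuming a plain `asyncio.Semaphore` underneath. The class and counter names mirror the notes, but the body is illustrative only: the real `InFlightGuard` hands out tickets and transfers ownership to streaming generators, which this sketch does not model.

```python
import asyncio
from typing import Optional


class BackpressureRejected(Exception):
    """No slot freed up in time; an HTTP layer would turn this into a 429."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"queue timeout, retry after {retry_after}s")
        self.retry_after = retry_after


class InFlightGuard:
    def __init__(self, max_in_flight: int, queue_timeout: float) -> None:
        self._sem = asyncio.Semaphore(max_in_flight)
        self._timeout = queue_timeout
        self.in_flight = 0
        self.queued = 0
        self.accepted_total = 0
        self.rejected_total = 0

    async def acquire(self) -> None:
        self.queued += 1
        try:
            # Wait for a free slot, but only for the configured queue timeout.
            await asyncio.wait_for(self._sem.acquire(), timeout=self._timeout)
        except asyncio.TimeoutError:
            self.rejected_total += 1
            raise BackpressureRejected(self._timeout) from None
        finally:
            self.queued -= 1
        self.in_flight += 1
        self.accepted_total += 1

    def release(self) -> None:
        self.in_flight -= 1
        self._sem.release()


async def demo() -> dict:
    guard = InFlightGuard(max_in_flight=1, queue_timeout=0.05)
    retry_after: Optional[float] = None
    await guard.acquire()  # takes the only slot
    try:
        await guard.acquire()  # queues, then times out
    except BackpressureRejected as exc:
        retry_after = exc.retry_after
    guard.release()
    return {
        "in_flight": guard.in_flight,
        "queued": guard.queued,
        "accepted_total": guard.accepted_total,
        "rejected_total": guard.rejected_total,
        "retry_after": retry_after,
    }


print(asyncio.run(demo()))
```

For streaming responses, the release would have to run in the generator's `finally` block so that a `CancelledError` from a client disconnect still frees the slot.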
Multi-instance pool (M2 / A1 + B3):
- New `LingmaPool` with N processes, each with its own workDir, socket
port (dynamic when N>1), and `AutoLoginManager`.
- Account parser supports CSV (`u1:p1,u2:p2`) and JSON formats via
`LINGMA_ACCOUNTS`; falls back to `LINGMA_USERNAME/LINGMA_PASSWORD` for
backwards compatibility (N=1 keeps legacy paths/ports).
- Routing: sticky affinity by `user` / system-prompt hash, then
least-in-flight, and finally a round-robin fallback when the pool is
unhealthy.
- `/healthz` reports per-instance state and ready count.
- `/internal/stats.pool` and `/metrics` expose per-instance
`gateway_pool_instance_in_flight{name}` / `gateway_pool_instance_ready{name}`.
- `/internal/auto-login/start?instance=inst-N` targets a specific instance;
`/internal/auto-login/status` lists all instances.
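One possible reading of the routing order above, as a sketch. `Instance`, `pick_instance`, and the choice of SHA-256 for the affinity hash are assumptions made for illustration. The notes do not say how `LingmaPool` hashes keys or what it does when the sticky instance is not ready.

```python
import hashlib
import itertools
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Instance:
    # Hypothetical stand-in for a pool member; names follow the inst-N pattern.
    name: str
    ready: bool = True
    in_flight: int = 0


_round_robin = itertools.count()


def pick_instance(
    instances: Sequence[Instance], affinity_key: Optional[str] = None
) -> Instance:
    ready = [inst for inst in instances if inst.ready]
    if not ready:
        # Unhealthy pool: rotate over every instance so each one gets retried.
        return instances[next(_round_robin) % len(instances)]
    if affinity_key:
        # Stable digest (not the per-process salted hash()) keeps routing sticky.
        digest = hashlib.sha256(affinity_key.encode("utf-8")).digest()
        preferred = instances[int.from_bytes(digest[:8], "big") % len(instances)]
        if preferred.ready:
            return preferred
    # No usable affinity target: choose the least-loaded ready instance.
    return min(ready, key=lambda inst: inst.in_flight)


pool = [
    Instance("inst-0", in_flight=3),
    Instance("inst-1", in_flight=1),
    Instance("inst-2", in_flight=2),
]
print(pick_instance(pool).name)  # inst-1
```

The affinity key would be the request's `user` field or a hash of the system prompt, per the routing note.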
Compat notes:
- `.env.example` adds `METRICS_TOKEN`, `LOG_LEVEL`, `GATEWAY_MAX_IN_FLIGHT`,
`GATEWAY_QUEUE_TIMEOUT_SEC`, `LINGMA_ACCOUNTS`, `LINGMA_INSTANCE_COUNT`.
- `.gitignore`: removed duplicated `data/` entries.
- Existing single-instance deployments keep working without config change.
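To illustrate how the `LINGMA_ACCOUNTS` fallback could keep single-instance setups working, here is a hedged sketch of an account parser. `parse_accounts` is a hypothetical name, and the JSON shape (a list of objects with `username` and `password` keys) is an assumption. The notes only give the CSV form.

```python
import json
import os
from typing import List, Mapping, Tuple


def parse_accounts(env: Mapping[str, str] = os.environ) -> List[Tuple[str, str]]:
    raw = env.get("LINGMA_ACCOUNTS", "").strip()
    if raw.startswith("["):
        # Assumed JSON shape: [{"username": "...", "password": "..."}, ...]
        return [(item["username"], item["password"]) for item in json.loads(raw)]
    if raw:
        # CSV form: u1:p1,u2:p2 (split on the first colon only, so a
        # password may itself contain colons).
        pairs = (entry.split(":", 1) for entry in raw.split(",") if entry.strip())
        return [(user.strip(), password) for user, password in pairs]
    # Legacy single-account variables, used when LINGMA_ACCOUNTS is unset.
    user = env.get("LINGMA_USERNAME", "")
    password = env.get("LINGMA_PASSWORD", "")
    return [(user, password)] if user else []


print(parse_accounts({"LINGMA_ACCOUNTS": "u1:p1,u2:p2"}))
```

A CSV password containing a comma cannot be expressed in this form, which is one reason a JSON variant is useful.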
Made-with: Cursor
57 lines
1.7 KiB
Python
from __future__ import annotations

import contextvars
import json
import logging
import sys
import time


request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


class _JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        ts = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created))
        data: dict = {
            "ts": f"{ts}.{int(record.msecs):03d}Z",
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
            "request_id": request_id_var.get(),
        }
        if record.exc_info:
            data["exc"] = self.formatException(record.exc_info)
        for key, val in record.__dict__.items():
            if key.startswith("ctx_"):
                data[key[4:]] = val
        return json.dumps(data, ensure_ascii=False)


def configure_logging(level: str = "INFO") -> None:
    level = (level or "INFO").upper()
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(_JsonFormatter())

    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    root.setLevel(level)

    # Align uvicorn access/error logs with our JSON formatter.
    for name in ("uvicorn", "uvicorn.error", "uvicorn.access", "fastapi"):
        lg = logging.getLogger(name)
        lg.handlers.clear()
        lg.propagate = True
        lg.setLevel(level)

    # Trim noisy libraries.
    logging.getLogger("websockets").setLevel("WARNING")
    logging.getLogger("websockets.client").setLevel("WARNING")


def get_logger(name: str = "lingma_gateway") -> logging.Logger:
    return logging.getLogger(name)
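A caller might use the module above roughly as in the sketch below. So that the example runs on its own, it inlines a condensed copy of the formatter (timestamp, logger name, and exception fields omitted) and writes to an in-memory buffer instead of stdout. The logger name `demo_gateway` and the `ctx_latency_ms` field are made up for the demonstration.

```python
import contextvars
import io
import json
import logging

request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


class JsonFormatter(logging.Formatter):
    """Condensed stand-in for the module's _JsonFormatter."""

    def format(self, record: logging.LogRecord) -> str:
        data = {
            "level": record.levelname,
            "msg": record.getMessage(),
            "request_id": request_id_var.get(),
        }
        # Keys passed via extra= with a "ctx_" prefix are flattened into the
        # JSON payload with the prefix stripped.
        for key, val in record.__dict__.items():
            if key.startswith("ctx_"):
                data[key[4:]] = val
        return json.dumps(data, ensure_ascii=False)


buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(JsonFormatter())

log = logging.getLogger("demo_gateway")
log.handlers.clear()
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

# Bind a request id for the duration of one request, then restore the default.
token = request_id_var.set("req-123")
try:
    log.info("upstream call finished", extra={"ctx_latency_ms": 42})
finally:
    request_id_var.reset(token)

entry = json.loads(buffer.getvalue())
print(entry)
```

Because the id lives in a `ContextVar`, each asyncio task sees its own value, so concurrent requests do not overwrite each other's `request_id`.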