from __future__ import annotations

import hmac

from fastapi import HTTPException, Request, status


def _unauthorized(message: str) -> HTTPException:
    """Build the 401 error used by every bearer-token check in this module."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={
            "error": {
                "message": message,
                "type": "invalid_request_error",
                "code": "invalid_api_key",
            }
        },
    )


def _unavailable(message: str, code: str) -> HTTPException:
    """Build the 503 error raised when a gate has no credential configured."""
    return HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail={
            "error": {
                "message": message,
                "type": "service_unavailable",
                "code": code,
            }
        },
    )


def _extract_bearer(request: Request) -> str:
    """Return the token from the request's ``Authorization: Bearer ...`` header.

    The text after the ``"Bearer "`` prefix is returned with surrounding
    whitespace stripped (so it may be an empty string).

    Raises:
        HTTPException: 401 when the header is missing or does not start with
            the exact prefix ``"Bearer "``.
    """
    auth = request.headers.get("authorization", "")
    if not auth.startswith("Bearer "):
        raise _unauthorized("Missing or invalid Authorization header")
    return auth[len("Bearer ") :].strip()


def _match_any(token: str, candidates: list[str]) -> bool:
    """Return True if *token* equals any non-empty entry of *candidates*.

    Each comparison uses ``hmac.compare_digest``. Empty candidates are
    skipped, so an empty token can never match.
    """
    # compare_digest() raises TypeError when given a str containing non-ASCII
    # characters, which would turn a malformed client token into an unhandled
    # error. Comparing the encoded bytes accepts any text. "surrogatepass"
    # keeps encode() itself from raising on lone surrogates.
    token_bytes = token.encode("utf-8", "surrogatepass")

    matched = False
    # No early exit: every candidate is compared so the amount of work does
    # not depend on which entry (if any) matched.
    for candidate in candidates:
        if not candidate:
            continue
        candidate_bytes = candidate.encode("utf-8", "surrogatepass")
        if hmac.compare_digest(token_bytes, candidate_bytes):
            matched = True
    return matched


def require_bearer(request: Request, api_keys: list[str]) -> None:
    """Require a bearer token matching one of *api_keys*.

    Raises:
        HTTPException: 401 when the Authorization header is missing or
            malformed, or when the token matches none of *api_keys*.
    """
    # Empty api_keys means auth is disabled (kept for local dev). The startup
    # logger warns loudly in that case so it can't go unnoticed in prod.
    if not api_keys:
        return
    token = _extract_bearer(request)
    if not _match_any(token, api_keys):
        raise _unauthorized("Invalid API key")


def require_metrics_access(
    request: Request,
    api_keys: list[str],
    metrics_token: str,
    *,
    public: bool = False,
) -> None:
    """Gate /metrics.

    Resolution order:
      1. `public=True` (METRICS_PUBLIC) — wide open, explicit opt-in for
         sidecar scrapers on a private network.
      2. `METRICS_TOKEN` configured — must match.
      3. `API_KEYS` configured — any configured API key works.
      4. Nothing configured at all — 503 (scraping disabled) so we don't
         silently leak the pool topology on an un-hardened deployment.

    When both a metrics token and API keys are configured, a bearer token
    matching either is accepted.

    Raises:
        HTTPException: 503 when neither *metrics_token* nor *api_keys* is
            set; 401 when the bearer token is missing, malformed, or does
            not match an accepted value.
    """
    if public:
        return

    accepted: list[str] = []
    if metrics_token:
        accepted.append(metrics_token)
    accepted.extend(api_keys)

    if not accepted:
        raise _unavailable(
            "metrics scraping is disabled: set METRICS_TOKEN, "
            "API_KEYS, or METRICS_PUBLIC=true",
            "metrics_disabled",
        )

    token = _extract_bearer(request)
    if not _match_any(token, accepted):
        raise _unauthorized("Invalid metrics token")


def require_admin_access(
    request: Request,
    api_keys: list[str],
    admin_token: str,
) -> None:
    """Gate /internal/* admin endpoints.

    Resolution order:
      1. `ADMIN_TOKEN` configured — must match exactly.
      2. Otherwise fall back to the regular API_KEYS (single-tenant deploys).
      3. If nothing is configured — 503 so we never expose auto-login /
         session-export on an unauthenticated gateway.

    Backwards compat: existing deployments that only set `API_KEYS` keep
    working; add ADMIN_TOKEN in .env when you want a dedicated split.

    Raises:
        HTTPException: 503 when neither *admin_token* nor *api_keys* is set;
            401 when the bearer token is missing, malformed, or does not
            match an accepted value.
    """
    accepted: list[str] = []
    if admin_token:
        # A dedicated admin token replaces the API keys; they are not
        # accepted alongside it.
        accepted.append(admin_token)
    else:
        accepted.extend(api_keys)

    if not accepted:
        raise _unavailable(
            "admin endpoints disabled: configure ADMIN_TOKEN "
            "(recommended) or API_KEYS",
            "admin_disabled",
        )

    token = _extract_bearer(request)
    if not _match_any(token, accepted):
        raise _unauthorized("Invalid admin token")