from __future__ import annotations import hmac from fastapi import HTTPException, Request, status def _extract_bearer(request: Request) -> str: auth = request.headers.get("authorization", "") if not auth.startswith("Bearer "): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={ "error": { "message": "Missing or invalid Authorization header", "type": "invalid_request_error", "code": "invalid_api_key", } }, ) return auth[len("Bearer ") :].strip() def _match_any(token: str, candidates: list[str]) -> bool: for c in candidates: if c and hmac.compare_digest(token, c): return True return False def require_bearer(request: Request, api_keys: list[str]) -> None: # Empty api_keys means auth is disabled (keeps the old behavior). if not api_keys: return token = _extract_bearer(request) if not _match_any(token, api_keys): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={ "error": { "message": "Invalid API key", "type": "invalid_request_error", "code": "invalid_api_key", } }, ) def require_metrics_access( request: Request, api_keys: list[str], metrics_token: str ) -> None: """Allow metrics if any of: METRICS_TOKEN matches, or any API_KEYS match. If neither METRICS_TOKEN nor API_KEYS are configured, metrics is public (backwards compatible default). """ accepted: list[str] = [] if metrics_token: accepted.append(metrics_token) accepted.extend(api_keys) if not accepted: return token = _extract_bearer(request) if not _match_any(token, accepted): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={ "error": { "message": "Invalid metrics token", "type": "invalid_request_error", "code": "invalid_api_key", } }, )