"""Tests for ``run_deployment`` driven entirely by in-memory fakes.

The fakes below replace the subprocess runner, the Cloudflare API client, the
HTTP client used by the deployment module and the application admin client, so
the tests never spawn a process or touch the network.
"""

from __future__ import annotations

from pathlib import Path

import httpx

from cf_temp_email_deploy import app_admin as app_admin_module
from cf_temp_email_deploy import deployment as deployment_module
from cf_temp_email_deploy.deployment import run_deployment
from cf_temp_email_deploy.errors import CloudflareAPIError
from cf_temp_email_deploy.models import DeploymentConfig, DeploymentState
from cf_temp_email_deploy.source import PreparedSource
from cf_temp_email_deploy.subprocess_runner import CommandResult, CommandSpec


def _authentication_error() -> CloudflareAPIError:
    """Build the 403 error raised by the auth-limited fake clients."""
    return CloudflareAPIError("Cloudflare API 返回错误: Authentication error", status_code=403)


def _script_not_found_error() -> CloudflareAPIError:
    """Build the 404 error raised while the fake catch-all update keeps failing."""
    return CloudflareAPIError(
        "Cloudflare API 返回错误: Workers Script Info not found",
        status_code=404,
    )


class FakeRunner:
    """Command runner double that records every command and fakes its effects."""

    # Canned stdout for the tool version probes.
    _VERSION_OUTPUT = {
        ("git", "--version"): "git version 2.43.0\n",
        ("node", "--version"): "v24.14.0\n",
        ("npm", "--version"): "11.9.0\n",
    }
    # Both build scripts produce the same fake ``dist/index.html``.
    _BUILD_COMMANDS = (
        ("npm", "run", "build:pages"),
        ("npm", "run", "build:pages:nopwa"),
    )

    def __init__(self) -> None:
        # One ``(args, cwd, input_text)`` entry per dispatched command, in call order.
        self.commands: list[tuple[tuple[str, ...], Path | None, str | None]] = []

    def run(self, spec: CommandSpec) -> CommandResult:
        """Dispatch *spec* and return the result regardless of its return code."""
        return self._dispatch(spec)

    def run_checked(self, spec: CommandSpec) -> CommandResult:
        """Dispatch *spec* and raise ``AssertionError`` on a non-zero return code."""
        result = self._dispatch(spec)
        if result.returncode != 0:  # pragma: no cover
            raise AssertionError(f"unexpected failure: {result.args}")
        return result

    @staticmethod
    def _result(
        args: tuple[str, ...],
        *,
        returncode: int = 0,
        stdout: str = "",
    ) -> CommandResult:
        """Build a ``CommandResult`` with empty stderr and zero duration."""
        return CommandResult(
            args=args,
            returncode=returncode,
            stdout=stdout,
            stderr="",
            duration_seconds=0.0,
        )

    def _dispatch(self, spec: CommandSpec) -> CommandResult:
        """Record the command and emulate the handful of commands that matter.

        Anything not recognised below succeeds with empty output.
        """
        args = tuple(spec.args)
        cwd = spec.cwd
        self.commands.append((args, cwd, spec.input_text))

        version_output = self._VERSION_OUTPUT.get(args)
        if version_output is not None:
            return self._result(args, stdout=version_output)

        if args[:2] == ("npm", "install") and cwd is not None:
            # Create the files an install would leave behind in each package.
            bin_dir = cwd / "node_modules" / ".bin"
            bin_dir.mkdir(parents=True, exist_ok=True)
            if cwd.name == "worker":
                (bin_dir / "wrangler").write_text("", encoding="utf-8")
                (cwd / "node_modules" / "telegraf").mkdir(parents=True, exist_ok=True)
            if cwd.name == "frontend":
                (bin_dir / "vite").write_text("", encoding="utf-8")
            if cwd.name == "pages":
                (bin_dir / "wrangler").write_text("", encoding="utf-8")
            return self._result(args)

        if args[:3] in self._BUILD_COMMANDS and cwd is not None:
            dist_dir = cwd / "dist"
            dist_dir.mkdir(parents=True, exist_ok=True)
            (dist_dir / "index.html").write_text("ok", encoding="utf-8")
            return self._result(args)

        if args[:3] == ("git", "apply", "--check"):
            return self._result(args)
        if args[:4] == ("git", "apply", "--reverse", "--check"):
            # The reverse check is the only command this fake reports as failing.
            return self._result(args, returncode=1)

        return self._result(args)


class FakeCloudflareClient:
    """Happy-path Cloudflare client double; subclasses override single behaviours."""

    def __init__(self, config: object) -> None:
        self.config = config
        # file_name -> sha256, filled by the fake ``__deploy_history`` INSERT.
        self.history: dict[str, str] = {}
        self.tables: list[str] = []
        self.catch_all_attempts = 0
        # Existence flags that the "missing resource" subclasses flip to False.
        self.pages_project_exists = True
        self.worker_exists = True
        self.d1_exists = True
        self.catch_all = self._worker_rule("email-api")

    def __enter__(self) -> "FakeCloudflareClient":
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        return None

    @staticmethod
    def _worker_rule(script_name: str) -> dict[str, object]:
        """Return an enabled catch-all rule whose action targets *script_name*."""
        return {
            "id": "rule-1",
            "enabled": True,
            "actions": [{"type": "worker", "value": [script_name]}],
        }

    def verify_token(self) -> dict[str, str]:
        return {"status": "active"}

    def resolve_account(self, *, account_id: str = "", account_name: str = "") -> dict[str, str]:
        return {"id": account_id or "acc-1", "name": account_name or "demo-account"}

    def resolve_zone(self, *, zone_name: str, account_id: str = "") -> dict[str, object]:
        return {
            "id": "zone-1",
            "name": zone_name,
            "account": {"id": account_id or "acc-1", "name": "demo-account"},
        }

    def ensure_pages_project(
        self,
        *,
        account_id: str,
        project_name: str,
        production_branch: str,
    ) -> dict[str, str]:
        """Mark the Pages project as existing and return its description."""
        self.pages_project_exists = True
        return {"id": "proj-1", "name": project_name, "subdomain": f"{project_name}.pages.dev"}

    def get_pages_project(self, *, account_id: str, project_name: str) -> dict[str, str] | None:
        """Return the Pages project, or ``None`` while it is flagged as missing."""
        if not self.pages_project_exists:
            return None
        return {"id": "proj-1", "name": project_name, "subdomain": f"{project_name}.pages.dev"}

    def ensure_cname_record(
        self,
        *,
        zone_id: str,
        name: str,
        content: str,
        proxied: bool = True,
        ttl: int = 1,
    ) -> dict[str, str]:
        return {"id": "dns-cname", "name": name, "content": content}

    def ensure_d1_database(
        self,
        *,
        account_id: str,
        database_name: str,
        jurisdiction: str = "",
    ) -> dict[str, str]:
        """Mark the D1 database as existing and return its description."""
        self.d1_exists = True
        return {"uuid": "db-1", "name": database_name}

    def list_d1_databases(
        self,
        *,
        account_id: str,
        database_name: str = "",
    ) -> list[dict[str, str]]:
        """Return the single fake database, or nothing while it is flagged as missing."""
        if not self.d1_exists:
            return []
        return [{"uuid": "db-1", "name": database_name or "temp-email"}]

    def query_d1(
        self,
        *,
        account_id: str,
        database_id: str,
        sql: str,
        params: list[object] | None = None,
    ) -> list[dict[str, str]]:
        """Answer the queries recognised by substring match; return ``[]`` otherwise."""
        if "SELECT file_name, sha256 FROM __deploy_history" in sql:
            return [
                {"file_name": file_name, "sha256": sha256}
                for file_name, sha256 in sorted(self.history.items())
            ]
        if "SELECT name FROM sqlite_master" in sql:
            return [{"name": name} for name in self.tables]
        if "INSERT OR REPLACE INTO __deploy_history" in sql:
            assert params is not None
            self.history[str(params[0])] = str(params[1])
        return []

    def get_workers_subdomain(self, *, account_id: str) -> dict[str, str]:
        return {"subdomain": "acct-workers"}

    def get_worker_script(self, *, account_id: str, script_name: str) -> dict[str, object] | None:
        """Return the worker script, or ``None`` while it is flagged as missing."""
        if not self.worker_exists:
            return None
        return {"id": script_name, "handlers": ["fetch", "email", "scheduled"]}

    def list_email_routing_addresses(self, *, account_id: str) -> list[dict[str, str]]:
        return [{"email": "verified@example.com", "status": "verified"}]

    def email_address_ready(self, address: dict[str, str], target: str) -> bool:
        return address["email"] == target and address["status"] == "verified"

    def get_email_routing_dns(self, *, zone_id: str) -> list[dict[str, object]]:
        return [
            {
                "type": "MX",
                "name": "email.example.com",
                "content": "route1.mx.cloudflare.net",
                "priority": 10,
            },
            {
                "type": "TXT",
                "name": "email.example.com",
                "content": "v=spf1 include:_spf.mx.cloudflare.net ~all",
            },
        ]

    def ensure_dns_record(
        self,
        *,
        zone_id: str,
        record_type: str,
        name: str,
        content: str,
        proxied: bool | None = None,
        ttl: int | None = 1,
        priority: int | None = None,
    ) -> dict[str, str]:
        """Return a record whose id encodes the record type and priority."""
        suffix = priority if priority is not None else "default"
        return {"id": f"{record_type.lower()}-{suffix}", "name": name, "content": content}

    def ensure_catch_all_worker(self, *, zone_id: str, script_name: str) -> dict[str, object]:
        """Count the attempt and point the catch-all rule at *script_name*."""
        self.catch_all_attempts += 1
        self.catch_all = self._worker_rule(script_name)
        return self.catch_all

    def ensure_pages_domain(
        self,
        *,
        account_id: str,
        project_name: str,
        domain_name: str,
    ) -> dict[str, str]:
        return {"name": domain_name, "status": "pending"}

    def wait_for_pages_domain_active(
        self,
        *,
        account_id: str,
        project_name: str,
        domain_name: str,
        timeout_seconds: float = 300.0,
        poll_interval_seconds: float = 5.0,
    ) -> dict[str, str]:
        return {"name": domain_name, "status": "active"}

    def get_catch_all(self, *, zone_id: str) -> dict[str, object]:
        return self.catch_all

    def catch_all_points_to_worker(self, rule: dict[str, object], script_name: str) -> bool:
        """Return True when any ``worker`` action of *rule* lists *script_name*."""
        actions = rule.get("actions", [])
        if not isinstance(actions, list):
            return False
        return any(
            isinstance(action, dict)
            and action.get("type") == "worker"
            and script_name in action.get("value", [])
            for action in actions
        )

    @staticmethod
    def is_authentication_error(error: Exception) -> bool:
        return isinstance(error, CloudflareAPIError) and "Authentication error" in str(error)

    @staticmethod
    def worker_script_supports_email(script: dict[str, object]) -> bool:
        handlers = script.get("handlers")
        return isinstance(handlers, list) and "email" in handlers


class FakeCloudflareClientAuthLimited(FakeCloudflareClient):
    """Client whose email-routing reads fail with 403 but whose DNS listing works."""

    def __init__(self, config: object) -> None:
        super().__init__(config)
        self.existing_records = [
            {
                "id": "mx-1",
                "type": "MX",
                "name": "email.example.com",
                "content": "route1.mx.cloudflare.net",
            },
            {
                "id": "txt-1",
                "type": "TXT",
                "name": "email.example.com",
                "content": "v=spf1 include:_spf.mx.cloudflare.net ~all",
            },
        ]

    def list_email_routing_addresses(self, *, account_id: str) -> list[dict[str, str]]:
        raise _authentication_error()

    def get_email_routing_dns(self, *, zone_id: str) -> list[dict[str, object]]:
        raise _authentication_error()

    def list_dns_records(
        self,
        zone_id: str,
        *,
        name: str = "",
        record_type: str = "",
    ) -> list[dict[str, str]]:
        """Return the existing records, filtered by *name* and *record_type* when given."""
        return [
            record
            for record in self.existing_records
            if (not name or record["name"] == name)
            and (not record_type or record["type"] == record_type)
        ]


class FakeCloudflareClientAuthLimitedNoDnsTemplate(FakeCloudflareClient):
    """Like the auth-limited client, but the zone has no matching DNS records."""

    def list_email_routing_addresses(self, *, account_id: str) -> list[dict[str, str]]:
        raise _authentication_error()

    def get_email_routing_dns(self, *, zone_id: str) -> list[dict[str, object]]:
        raise _authentication_error()

    def list_dns_records(
        self,
        zone_id: str,
        *,
        name: str = "",
        record_type: str = "",
    ) -> list[dict[str, str]]:
        return []


class FakeCloudflareClientCatchAllFlaky(FakeCloudflareClient):
    """Client whose catch-all update fails with 404 on the first two attempts."""

    def ensure_catch_all_worker(self, *, zone_id: str, script_name: str) -> dict[str, object]:
        self.catch_all_attempts += 1
        if self.catch_all_attempts < 3:
            raise _script_not_found_error()
        self.catch_all = self._worker_rule(script_name)
        return self.catch_all


class FakeCloudflareClientCatchAllDeferred(FakeCloudflareClient):
    """Client that starts with a ``drop`` rule and fails the first three updates."""

    def __init__(self, config: object) -> None:
        super().__init__(config)
        self.catch_all = {
            "id": "rule-1",
            "enabled": True,
            "actions": [{"type": "drop"}],
        }

    def ensure_catch_all_worker(self, *, zone_id: str, script_name: str) -> dict[str, object]:
        self.catch_all_attempts += 1
        if self.catch_all_attempts <= 3:
            raise _script_not_found_error()
        self.catch_all = self._worker_rule(script_name)
        return self.catch_all


class FakeCloudflareClientMissingPagesProject(FakeCloudflareClient):
    """Client that reports no Pages project until one is ensured."""

    def __init__(self, config: object) -> None:
        super().__init__(config)
        self.pages_project_exists = False


class FakeCloudflareClientMissingWorkerScript(FakeCloudflareClient):
    """Client whose ``get_worker_script`` returns ``None``."""

    def __init__(self, config: object) -> None:
        super().__init__(config)
        self.worker_exists = False


class FakeCloudflareClientMissingD1(FakeCloudflareClient):
    """Client that lists no D1 database until one is ensured."""

    def __init__(self, config: object) -> None:
        super().__init__(config)
        self.d1_exists = False


class FakeHTTPClient:
    """Stand-in for ``httpx.Client`` that answers every GET with HTTP 200."""

    def __init__(self, *args: object, **kwargs: object) -> None:
        """Accept and ignore any constructor arguments."""

    def __enter__(self) -> "FakeHTTPClient":
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        return None

    def get(self, url: str) -> httpx.Response:
        """Return a 200 response; the body is ``OK`` for ``/health_check``, else ``ok``."""
        body = "OK" if url.endswith("/health_check") else "ok"
        return httpx.Response(200, text=body, request=httpx.Request("GET", url))


class FakeApplicationAdminClient:
    """Admin client double that keeps its settings in class-level state.

    The state lives on the class so tests can inspect it after the deployment
    has created and discarded its own instance; ``reset`` clears it.
    """

    instances: list["FakeApplicationAdminClient"] = []
    user_settings: dict[str, object] = {}
    oauth2_settings: list[dict[str, object]] = []

    def __init__(
        self,
        base_url: str,
        admin_password: str,
        *,
        timeout: float = 30.0,
        transport: object = None,
    ) -> None:
        _ = timeout
        _ = transport
        self.base_url = base_url
        self.admin_password = admin_password
        type(self).instances.append(self)

    @classmethod
    def reset(cls) -> None:
        """Rebind all class-level state to fresh empty containers."""
        cls.instances = []
        cls.user_settings = {}
        cls.oauth2_settings = []

    def __enter__(self) -> "FakeApplicationAdminClient":
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        return None

    def wait_until_ready(
        self,
        *,
        timeout_seconds: float = 180.0,
        poll_interval_seconds: float = 5.0,
    ) -> dict[str, object]:
        """Return the current user settings immediately, without waiting."""
        _ = timeout_seconds
        _ = poll_interval_seconds
        return self.get_user_settings()

    def get_user_settings(self) -> dict[str, object]:
        """Return a shallow copy of the stored user settings."""
        return dict(type(self).user_settings)

    def sync_user_settings(self, *, allow_user_register: bool) -> dict[str, object]:
        """Merge via ``app_admin.merge_user_settings`` and store the result."""
        type(self).user_settings = app_admin_module.merge_user_settings(
            type(self).user_settings,
            allow_user_register,
        )
        return self.get_user_settings()

    def get_user_oauth2_settings(self) -> list[dict[str, object]]:
        """Return shallow copies of the stored OAuth2 settings."""
        return [dict(item) for item in type(self).oauth2_settings]

    def sync_linuxdo_oauth2(
        self,
        *,
        pages_domain: str,
        client_id: str,
        client_secret: str,
    ) -> list[dict[str, object]]:
        """Merge via ``app_admin.merge_linuxdo_oauth2_settings`` and store the result."""
        type(self).oauth2_settings = app_admin_module.merge_linuxdo_oauth2_settings(
            type(self).oauth2_settings,
            pages_domain=pages_domain,
            client_id=client_id,
            client_secret=client_secret,
        )
        return self.get_user_oauth2_settings()


def write_file(path: Path, content: str) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories first."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def create_source_tree(root: Path) -> None:
    """Create the source checkout files these tests rely on under *root*."""
    write_file(
        root / "worker" / "package.json",
        '{"devDependencies":{"wrangler":"^4.0.0"},"dependencies":{"telegraf":"4.16.3"}}\n',
    )
    write_file(
        root / "worker" / "wrangler.toml.template",
        '\n'.join(
            [
                'name = "cloudflare_temp_email"',
                'main = "src/worker.ts"',
                'compatibility_date = "2025-04-01"',
                'compatibility_flags = ["nodejs_compat"]',
                "keep_vars = true",
            ]
        )
        + "\n",
    )
    write_file(root / "worker" / "patches" / "telegraf@4.16.3.patch", "diff --git a/a b/a\n")
    write_file(root / "frontend" / "package.json", '{"devDependencies":{"vite":"^7.0.0"}}\n')
    write_file(root / "pages" / "package.json", '{"devDependencies":{"wrangler":"^4.0.0"}}\n')
    write_file(
        root / "pages" / "wrangler.toml",
        '\n'.join(
            [
                'name = "temp-email-pages"',
                'pages_build_output_dir = "../frontend/dist"',
                'compatibility_date = "2024-05-13"',
            ]
        )
        + "\n",
    )
    write_file(root / "db" / "schema.sql", "CREATE TABLE mails(id INTEGER PRIMARY KEY);\n")
    write_file(root / "db" / "2024-01-13-patch.sql", "ALTER TABLE mails ADD COLUMN subject TEXT;\n")
    write_file(root / "db" / "2024-04-03-patch.sql", "ALTER TABLE mails ADD COLUMN sender TEXT;\n")


def build_config(source_dir: Path) -> DeploymentConfig:
    """Return the baseline deployment config, using *source_dir* as a local source."""
    return DeploymentConfig.model_validate(
        {
            "source": {"mode": "local", "local_path": str(source_dir)},
            "cloudflare": {"zone_name": "example.com", "api_token": "token-value"},
            "mail": {
                "domains": ["email.example.com"],
                "verified_destination_address": "verified@example.com",
            },
            "d1": {"database_name": "temp-email"},
            "user_access": {
                "require_login_to_create": True,
                "allow_user_register": False,
            },
            "linuxdo": {"linuxdo_oauth": False},
            "worker": {
                "script_name": "email-api",
                "custom_domain": "email-api.example.com",
                "vars": {
                    "PREFIX": "tmp",
                    "ENABLE_USER_CREATE_EMAIL": True,
                    "ENABLE_USER_DELETE_EMAIL": True,
                    "DEFAULT_LANG": "zh",
                    "ADMIN_PASSWORDS": ["admin-secret"],
                },
                "secrets": {"JWT_SECRET": "secret-value"},
            },
            "pages": {
                "project_name": "demo-pages",
                "custom_domain": "email.example.com",
                "build_mode": "pages",
                "production_branch": "production",
            },
        }
    )


def patch_deployment_runtime(
    monkeypatch,
    source_dir: Path,
    *,
    admin_client_class: type[FakeApplicationAdminClient] = FakeApplicationAdminClient,
) -> None:
    """Swap the deployment module's source, admin-client and HTTP collaborators for fakes.

    Also resets *admin_client_class* so class-level state from an earlier test
    does not leak into this one.
    """
    admin_client_class.reset()
    monkeypatch.setattr(
        deployment_module,
        "prepare_source",
        lambda config, runner: PreparedSource(source_dir=source_dir, commit_sha="abc123"),
    )
    monkeypatch.setattr(deployment_module, "ApplicationAdminClient", admin_client_class)
    monkeypatch.setattr(deployment_module.httpx, "Client", FakeHTTPClient)


def _resume_state(
    config: DeploymentConfig,
    source_dir: Path,
    *,
    project_id: str = "proj-1",
    subdomain: str = "demo-pages.pages.dev",
    cname_record_id: str = "dns-cname",
    database_id: str = "db-1",
) -> DeploymentState:
    """Return a state checkpointed at ``worker_completed`` for the resume tests.

    The keyword arguments let a test seed stale identifiers for the resource
    it wants the deployment to refresh.
    """
    state = DeploymentState(checkpoint="worker_completed")
    state.source.source_dir = str(source_dir)
    state.source.commit_sha = "abc123"
    state.pages.project_id = project_id
    state.pages.project_name = config.pages.project_name
    state.pages.subdomain = subdomain
    state.pages.custom_domain = config.pages.custom_domain
    state.pages.cname_record_id = cname_record_id
    state.d1.database_id = database_id
    state.d1.database_name = config.d1.database_name
    state.worker.script_name = config.worker.script_name
    state.worker.workers_dev_url = "https://email-api.acct-workers.workers.dev"
    return state


def _run_with_fakes(
    monkeypatch,
    tmp_path: Path,
    source_dir: Path,
    config: DeploymentConfig,
    state: DeploymentState,
    client_factory: type[FakeCloudflareClient] = FakeCloudflareClient,
    **run_kwargs: object,
) -> tuple[DeploymentState, FakeRunner, list[int]]:
    """Run ``run_deployment`` against the fakes.

    Returns ``(result, runner, sleep_calls)``. ``time.sleep`` in the deployment
    module is replaced by ``sleep_calls.append`` so no test actually sleeps.
    Extra keyword arguments (e.g. ``is_resume=True``) are forwarded untouched,
    so a test that omits them relies on ``run_deployment``'s own defaults.
    """
    runner = FakeRunner()
    sleep_calls: list[int] = []
    patch_deployment_runtime(monkeypatch, source_dir)
    monkeypatch.setattr(deployment_module.time, "sleep", sleep_calls.append)
    result = run_deployment(
        config_path=tmp_path / "config.toml",
        config=config,
        state_path=tmp_path / ".deploy" / "state.toml",
        state=state,
        runner=runner,
        cloudflare_client_factory=client_factory,
        **run_kwargs,
    )
    return result, runner, sleep_calls


def test_run_deployment_executes_full_pipeline(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)

    result, runner, sleep_calls = _run_with_fakes(
        monkeypatch, tmp_path, source_dir, config, DeploymentState()
    )

    assert result.checkpoint == "acceptance_completed"
    assert result.pages.project_id == "proj-1"
    assert result.pages.subdomain == "demo-pages.pages.dev"
    assert result.d1.database_id == "db-1"
    assert result.worker.script_name == "email-api"
    assert result.worker.workers_dev_url == "https://email-api.acct-workers.workers.dev"
    assert result.application.configured is True
    assert result.application.allow_user_register is False
    assert result.application.require_login_to_create is True
    assert result.email_routing.catch_all_enabled is True
    assert [migration.file_name for migration in result.d1.migrations] == [
        "2024-01-13-patch.sql",
        "2024-04-03-patch.sql",
    ]

    worker_wrangler = (source_dir / "worker" / "wrangler.toml").read_text(encoding="utf-8")
    pages_wrangler = (source_dir / "pages" / "wrangler.toml").read_text(encoding="utf-8")
    assert 'name = "email-api"' in worker_wrangler
    assert 'database_id = "db-1"' in worker_wrangler
    assert 'service = "email-api"' in pages_wrangler

    command_args = [args for args, _, _ in runner.commands]
    assert ("npm", "install") in [args[:2] for args in command_args]
    assert ("npm", "exec", "--", "wrangler", "deploy", "--minify") in command_args
    assert (
        "npm", "exec", "--", "wrangler", "pages", "deploy", "--branch", "production"
    ) in command_args
    assert any(
        args[:6] == ("npm", "exec", "--", "wrangler", "d1", "execute") for args in command_args
    )
    # The secret value must reach wrangler through stdin.
    assert any(
        args[:7] == ("npm", "exec", "--", "wrangler", "secret", "put", "JWT_SECRET")
        and input_text == "secret-value\n"
        for args, _, input_text in runner.commands
    )
    assert result.events[0].status == "started"
    assert result.events[-1].status == "completed"
    assert FakeApplicationAdminClient.user_settings["enable"] is False
    assert sleep_calls == [30]


def test_run_deployment_removes_generated_wrangler_redirects(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    root_redirect = source_dir / ".wrangler" / "deploy" / "config.json"
    pages_redirect = source_dir / "pages" / ".wrangler" / "deploy" / "config.json"
    write_file(root_redirect, '{"configPath":"dist/wrangler.jsonc"}\n')
    write_file(pages_redirect, '{"configPath":"dist/wrangler.jsonc"}\n')
    config = build_config(source_dir)

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch, tmp_path, source_dir, config, DeploymentState()
    )

    assert result.checkpoint == "acceptance_completed"
    assert not root_redirect.exists()
    assert not pages_redirect.exists()
    assert sleep_calls == [30]


def test_run_deployment_generates_jwt_secret_when_empty(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    config.worker.secrets["JWT_SECRET"] = ""
    config_path = tmp_path / "config.toml"
    config_path.write_text(
        '\n'.join(
            [
                '[source]',
                'mode = "local"',
                # POSIX separators: a backslash starts an escape in a TOML basic string.
                f'local_path = "{source_dir.as_posix()}"',
                "",
                "[cloudflare]",
                'zone_name = "example.com"',
                'api_token = "token-value"',
                "",
                "[mail]",
                'domains = ["email.example.com"]',
                'verified_destination_address = "verified@example.com"',
                "",
                "[d1]",
                'database_name = "temp-email"',
                "",
                "[worker]",
                'script_name = "email-api"',
                'custom_domain = "email-api.example.com"',
                "",
                "[worker.vars]",
                'PREFIX = "tmp"',
                "",
                "[worker.secrets]",
                'JWT_SECRET = ""',
                "",
                "[pages]",
                'project_name = "demo-pages"',
                'custom_domain = "email.example.com"',
                'build_mode = "pages"',
                'production_branch = "production"',
                "",
            ]
        ),
        encoding="utf-8",
    )

    result, runner, sleep_calls = _run_with_fakes(
        monkeypatch, tmp_path, source_dir, config, DeploymentState()
    )

    generated_secret = config.worker.secrets["JWT_SECRET"]
    config_text = config_path.read_text(encoding="utf-8")
    assert result.checkpoint == "acceptance_completed"
    assert result.worker.script_name == "email-api"
    assert isinstance(generated_secret, str)
    assert generated_secret
    assert len(generated_secret) >= 64
    # The generated value must be written back to the config file on disk.
    assert generated_secret in config_text
    assert 'JWT_SECRET = ""' not in config_text
    assert any(
        args[:7] == ("npm", "exec", "--", "wrangler", "secret", "put", "JWT_SECRET")
        and input_text == f"{generated_secret}\n"
        for args, _, input_text in runner.commands
    )
    assert sleep_calls == [30]


def test_run_deployment_resume_skips_completed_stages(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    state = _resume_state(config, source_dir)

    result, runner, sleep_calls = _run_with_fakes(
        monkeypatch, tmp_path, source_dir, config, state, is_resume=True
    )

    command_args = [args for args, _, _ in runner.commands]
    assert ("npm", "exec", "--", "wrangler", "deploy", "--minify") not in command_args
    assert not any(
        args[:6] == ("npm", "exec", "--", "wrangler", "d1", "execute") for args in command_args
    )
    assert (
        "npm", "exec", "--", "wrangler", "pages", "deploy", "--branch", "production"
    ) in command_args
    assert result.checkpoint == "acceptance_completed"
    assert result.email_routing.catch_all_worker == "email-api"
    assert sleep_calls == [30]


def test_run_deployment_can_update_catch_all_when_email_routing_auth_is_limited(
    monkeypatch,
    tmp_path: Path,
) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    state = _resume_state(config, source_dir)

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientAuthLimited,
        is_resume=True,
    )

    assert result.checkpoint == "acceptance_completed"
    assert result.email_routing.catch_all_enabled is True
    assert result.email_routing.catch_all_worker == "email-api"
    assert result.email_routing.dns_record_ids == []
    assert sleep_calls == [30]


def test_run_deployment_can_fallback_to_documented_email_routing_dns_records(
    monkeypatch,
    tmp_path: Path,
) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    state = _resume_state(config, source_dir)

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientAuthLimitedNoDnsTemplate,
        is_resume=True,
    )

    assert result.checkpoint == "acceptance_completed"
    # Ids come from the fake ``ensure_dns_record``: "<type>-<priority or 'default'>".
    assert sorted(result.email_routing.dns_record_ids) == [
        "mx-13",
        "mx-24",
        "mx-86",
        "txt-default",
    ]
    assert result.email_routing.catch_all_worker == "email-api"
    assert sleep_calls == [30]


def test_run_deployment_retries_catch_all_before_success(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    state = _resume_state(config, source_dir)

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientCatchAllFlaky,
        is_resume=True,
    )

    assert result.checkpoint == "acceptance_completed"
    assert result.email_routing.catch_all_worker == "email-api"
    assert sleep_calls == [30, 10, 10]


def test_run_deployment_can_defer_catch_all_until_acceptance(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    state = _resume_state(config, source_dir)

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientCatchAllDeferred,
        is_resume=True,
    )

    assert result.checkpoint == "acceptance_completed"
    assert result.email_routing.catch_all_enabled is True
    assert result.email_routing.catch_all_worker == "email-api"
    assert sleep_calls == [30, 10, 10]


def test_run_deployment_resume_recreates_missing_pages_project(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    # Seed stale Pages identifiers that the run is expected to replace.
    state = _resume_state(
        config,
        source_dir,
        project_id="missing-project",
        subdomain="missing.pages.dev",
        cname_record_id="stale-cname",
    )

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientMissingPagesProject,
        is_resume=True,
    )

    assert result.checkpoint == "acceptance_completed"
    assert result.pages.project_id == "proj-1"
    assert result.pages.subdomain == "demo-pages.pages.dev"
    assert result.pages.cname_record_id == "dns-cname"
    assert sleep_calls == [30]


def test_run_deployment_resume_redeploys_missing_worker(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    state = _resume_state(config, source_dir)

    result, runner, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientMissingWorkerScript,
        is_resume=True,
    )

    command_args = [args for args, _, _ in runner.commands]
    assert ("npm", "exec", "--", "wrangler", "deploy", "--minify") in command_args
    assert result.checkpoint == "acceptance_completed"
    assert result.worker.script_name == "email-api"
    assert sleep_calls == [30]


def test_run_deployment_resume_recreates_missing_d1(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    # Seed a stale database id that the run is expected to replace.
    state = _resume_state(config, source_dir, database_id="stale-db")

    result, runner, sleep_calls = _run_with_fakes(
        monkeypatch,
        tmp_path,
        source_dir,
        config,
        state,
        FakeCloudflareClientMissingD1,
        is_resume=True,
    )

    command_args = [args for args, _, _ in runner.commands]
    assert any(
        args[:6] == ("npm", "exec", "--", "wrangler", "d1", "execute") for args in command_args
    )
    assert result.checkpoint == "acceptance_completed"
    assert result.d1.database_id == "db-1"
    worker_wrangler = (source_dir / "worker" / "wrangler.toml").read_text(encoding="utf-8")
    assert 'database_id = "db-1"' in worker_wrangler
    assert sleep_calls == [30]


def test_run_deployment_syncs_linuxdo_oauth_settings(monkeypatch, tmp_path: Path) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    config.linuxdo.linuxdo_oauth = True
    config.linuxdo.client_id = "linuxdo-client-id"
    config.linuxdo.client_secret = "linuxdo-client-secret"
    config.user_access.require_login_to_create = False
    config_path = tmp_path / "config.toml"
    config_path.write_text(
        '\n'.join(
            [
                "[source]",
                # POSIX separators: a backslash starts an escape in a TOML basic string.
                f'local_path = "{source_dir.as_posix()}"',
                'mode = "local"',
                "",
                "[cloudflare]",
                'zone_name = "example.com"',
                'api_token = "token-value"',
                "",
                "[mail]",
                'domains = ["email.example.com"]',
                'verified_destination_address = "verified@example.com"',
                "",
                "[d1]",
                'database_name = "temp-email"',
                "",
                "[user_access]",
                "require_login_to_create = false",
                "allow_user_register = false",
                "",
                "[linuxdo]",
                "linuxdo_oauth = true",
                'client_id = "linuxdo-client-id"',
                'client_secret = "linuxdo-client-secret"',
                "",
                "[worker]",
                'script_name = "email-api"',
                'custom_domain = "email-api.example.com"',
                "",
                "[worker.vars]",
                'PREFIX = "tmp"',
                'ENABLE_USER_CREATE_EMAIL = true',
                'ENABLE_USER_DELETE_EMAIL = true',
                'DEFAULT_LANG = "zh"',
                'ADMIN_PASSWORDS = ["admin-secret"]',
                "",
                "[worker.secrets]",
                'JWT_SECRET = "secret-value"',
                "",
                "[pages]",
                'project_name = "demo-pages"',
                'custom_domain = "email.example.com"',
                'build_mode = "pages"',
                'production_branch = "production"',
                "",
            ]
        ),
        encoding="utf-8",
    )

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch, tmp_path, source_dir, config, DeploymentState()
    )

    config_text = config_path.read_text(encoding="utf-8")
    oauth2_settings = FakeApplicationAdminClient.oauth2_settings
    assert result.checkpoint == "acceptance_completed"
    assert result.application.linuxdo_oauth_enabled is True
    # The config asked for False; the run is expected to report and persist True.
    assert result.application.require_login_to_create is True
    assert (
        result.application.linuxdo_redirect_url
        == "https://email.example.com/user/oauth2/callback"
    )
    assert "require_login_to_create = true" in config_text
    assert FakeApplicationAdminClient.user_settings["enable"] is False
    assert len(oauth2_settings) == 1
    assert oauth2_settings[0]["clientID"] == "linuxdo-client-id"
    assert oauth2_settings[0]["redirectURL"] == "https://email.example.com/user/oauth2/callback"
    assert sleep_calls == [30]


def test_run_deployment_allows_empty_verified_destination_address(
    monkeypatch,
    tmp_path: Path,
) -> None:
    source_dir = tmp_path / "source"
    create_source_tree(source_dir)
    config = build_config(source_dir)
    config.mail.verified_destination_address = ""

    result, _, sleep_calls = _run_with_fakes(
        monkeypatch, tmp_path, source_dir, config, DeploymentState()
    )

    assert result.checkpoint == "acceptance_completed"
    assert result.email_routing.destination_address == ""
    assert result.email_routing.catch_all_enabled is True
    assert result.email_routing.catch_all_worker == "email-api"
    assert sleep_calls == [30]