Initial import of cf-temp-email deploy CLI

This commit is contained in:
mmc
2026-03-26 08:06:02 +08:00
commit 4100e9cf72
29 changed files with 6703 additions and 0 deletions

101
tests/test_app_admin.py Normal file
View File

@@ -0,0 +1,101 @@
from __future__ import annotations
from cf_temp_email_deploy.app_admin import (
LINUXDO_ACCESS_TOKEN_URL,
LINUXDO_AUTHORIZATION_URL,
LINUXDO_SCOPE,
LINUXDO_USER_INFO_URL,
build_linuxdo_oauth2_setting,
merge_linuxdo_oauth2_settings,
merge_user_settings,
)
def test_merge_user_settings_preserves_existing_fields_and_updates_enable() -> None:
current = {
"enable": True,
"enableMailVerify": True,
"verifyMailSender": "noreply@example.com",
"enableMailAllowList": True,
"mailAllowList": ["allowed@example.com"],
"maxAddressCount": 9,
"enableEmailCheckRegex": True,
"emailCheckRegex": ".*",
}
merged = merge_user_settings(current, False)
assert merged["enable"] is False
assert merged["enableMailVerify"] is True
assert merged["verifyMailSender"] == "noreply@example.com"
assert merged["enableMailAllowList"] is True
assert merged["mailAllowList"] == ["allowed@example.com"]
assert merged["maxAddressCount"] == 9
assert merged["enableEmailCheckRegex"] is True
assert merged["emailCheckRegex"] == ".*"
def test_build_linuxdo_oauth2_setting_matches_expected_fields() -> None:
setting = build_linuxdo_oauth2_setting(
pages_domain="email.example.com",
client_id="linuxdo-client-id",
client_secret="linuxdo-client-secret",
)
assert setting["name"] == "LINUX DO"
assert setting["clientID"] == "linuxdo-client-id"
assert setting["clientSecret"] == "linuxdo-client-secret"
assert setting["authorizationURL"] == LINUXDO_AUTHORIZATION_URL
assert setting["accessTokenURL"] == LINUXDO_ACCESS_TOKEN_URL
assert setting["userInfoURL"] == LINUXDO_USER_INFO_URL
assert setting["redirectURL"] == "https://email.example.com/user/oauth2/callback"
assert setting["scope"] == LINUXDO_SCOPE
assert setting["enableEmailFormat"] is True
assert setting["userEmailReplace"] == "linux_do_$1@oauth.linux.do"
def test_merge_linuxdo_oauth2_settings_appends_when_missing() -> None:
merged = merge_linuxdo_oauth2_settings(
[{"name": "GitHub", "clientID": "github-client"}],
pages_domain="email.example.com",
client_id="linuxdo-client-id",
client_secret="linuxdo-client-secret",
)
assert len(merged) == 2
assert merged[0]["name"] == "GitHub"
assert merged[1]["name"] == "LINUX DO"
assert merged[1]["redirectURL"] == "https://email.example.com/user/oauth2/callback"
def test_merge_linuxdo_oauth2_settings_replaces_existing_and_preserves_selected_fields() -> None:
current = [
{
"name": "linux do",
"clientID": "old-client-id",
"clientSecret": "old-client-secret",
"authorizationURL": LINUXDO_AUTHORIZATION_URL,
"accessTokenURL": LINUXDO_ACCESS_TOKEN_URL,
"userInfoURL": LINUXDO_USER_INFO_URL,
"redirectURL": "https://old.example.com/user/oauth2/callback",
"logoutURL": "https://connect.linux.do/logout",
"enableMailAllowList": True,
"mailAllowList": ["allowed@example.com"],
}
]
merged = merge_linuxdo_oauth2_settings(
current,
pages_domain="email.example.com",
client_id="linuxdo-client-id",
client_secret="linuxdo-client-secret",
)
assert len(merged) == 1
assert merged[0]["name"] == "LINUX DO"
assert merged[0]["clientID"] == "linuxdo-client-id"
assert merged[0]["clientSecret"] == "linuxdo-client-secret"
assert merged[0]["redirectURL"] == "https://email.example.com/user/oauth2/callback"
assert merged[0]["logoutURL"] == "https://connect.linux.do/logout"
assert merged[0]["enableMailAllowList"] is True
assert merged[0]["mailAllowList"] == ["allowed@example.com"]

275
tests/test_cli.py Normal file
View File

@@ -0,0 +1,275 @@
from __future__ import annotations
from pathlib import Path
from cf_temp_email_deploy import cli
from cf_temp_email_deploy.environment import ToolVersion
from cf_temp_email_deploy.wrangler import build_wrangler_command
class FakeCloudflareClient:
def __init__(self, config: object) -> None:
self.config = config
self.resolve_account_called = False
def __enter__(self) -> "FakeCloudflareClient":
return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
return None
def verify_token(self) -> dict[str, str]:
return {"status": "active"}
def resolve_account(self, *, account_id: str = "", account_name: str = "") -> dict[str, str]:
self.resolve_account_called = True
return {"id": account_id or "acc-1", "name": account_name or "demo-account"}
def resolve_zone(self, *, zone_name: str, account_id: str = "") -> dict[str, str]:
payload = {"id": "zone-1", "name": zone_name}
if not account_id:
payload["account"] = {"id": "acc-from-zone", "name": "zone-account"}
return payload
def list_dns_records(self, zone_id: str, *, name: str = "", record_type: str = "") -> list[dict[str, str]]:
_ = zone_id
_ = name
_ = record_type
return [
{"type": "MX", "name": "mail.kotei.us.ci", "content": "route1.mx.cloudflare.net"},
{"type": "MX", "name": "maila.kotei.us.ci", "content": "route2.mx.cloudflare.net"},
{"type": "CNAME", "name": "email.kotei.us.ci", "content": "cf-temp-email-pages.pages.dev"},
]
def list_email_routing_addresses(self, *, account_id: str) -> list[dict[str, str]]:
_ = account_id
return [{"email": "verified@kotei.us.ci", "status": "verified"}]
def email_address_ready(self, address: dict[str, str], target: str) -> bool:
return address["email"] == target and address["status"] == "verified"
def list_d1_databases(self, *, account_id: str, database_name: str = "") -> list[dict[str, str]]:
_ = account_id
_ = database_name
return [{"name": "cf-temp-email-kotei"}]
def list_pages_projects(self, *, account_id: str) -> list[dict[str, str]]:
_ = account_id
return [{"name": "cf-temp-email-pages", "subdomain": "cf-temp-email-pages.pages.dev"}]
def get_catch_all(self, *, zone_id: str) -> dict[str, object]:
_ = zone_id
return {"actions": [{"type": "worker", "value": ["temp-email-api"]}]}
def extract_worker_targets(self, value: object) -> set[str]:
if isinstance(value, list):
return {str(item) for item in value}
return set()
def list_worker_scripts(self, *, account_id: str) -> list[dict[str, object]]:
_ = account_id
return [{"id": "temp-email-api", "handlers": ["fetch", "email"]}]
def worker_script_supports_email(self, script: dict[str, object]) -> bool:
handlers = script.get("handlers", [])
return isinstance(handlers, list) and "email" in handlers
def is_authentication_error(self, error: Exception) -> bool:
_ = error
return False
def test_init_config_and_check_command(monkeypatch, tmp_path: Path) -> None:
config_path = tmp_path / "config.toml"
monkeypatch.setattr(
cli,
"check_required_tools",
lambda runner: {
"git": ToolVersion(name="git", version="git version 2.43.0"),
"node": ToolVersion(name="node", version="v24.14.0"),
"npm": ToolVersion(name="npm", version="11.9.0"),
},
)
monkeypatch.setattr(cli, "CloudflareClient", FakeCloudflareClient)
assert cli.main(["init-config", "--config", str(config_path)]) == 0
assert (
cli.main(
[
"check",
"--config",
str(config_path),
"--api-token",
"token-value",
"--account-id",
"acc-1",
"--zone-name",
"example.com",
]
)
== 0
)
state_path = tmp_path / ".deploy" / "state.toml"
assert state_path.exists()
state_text = state_path.read_text(encoding="utf-8")
config_text = config_path.read_text(encoding="utf-8")
assert "zone-1" in state_text
assert 'api_token = "token-value"' in config_text
def test_build_wrangler_command_uses_npm_exec() -> None:
assert build_wrangler_command("deploy", "--minify") == (
"npm",
"exec",
"--",
"wrangler",
"deploy",
"--minify",
)
def test_build_parser_uses_cf_temp_email_program_name() -> None:
parser = cli.build_parser()
assert parser.prog == "cf-temp-email"
def test_resolve_cli_argv_defaults_to_deploy() -> None:
assert cli.resolve_cli_argv([]) == ["deploy"]
assert cli.resolve_cli_argv(["--config", "config.toml"]) == ["deploy", "--config", "config.toml"]
assert cli.resolve_cli_argv(["deploy", "--config", "config.toml"]) == ["deploy", "--config", "config.toml"]
def test_main_defaults_to_deploy_when_subcommand_is_omitted(monkeypatch, tmp_path: Path) -> None:
config_path = tmp_path / "config.toml"
captured: dict[str, object] = {}
def fake_handle_deploy(args: object) -> int:
captured["command"] = getattr(args, "command", "")
captured["config"] = getattr(args, "config", None)
return 0
monkeypatch.setattr(cli, "handle_deploy", fake_handle_deploy)
assert cli.main(["--config", str(config_path)]) == 0
assert captured["command"] == "deploy"
assert captured["config"] == config_path
def test_check_command_can_resolve_zone_without_explicit_account(monkeypatch, tmp_path: Path) -> None:
config_path = tmp_path / "config.toml"
monkeypatch.setattr(
cli,
"check_required_tools",
lambda runner: {
"git": ToolVersion(name="git", version="git version 2.43.0"),
"node": ToolVersion(name="node", version="v24.14.0"),
"npm": ToolVersion(name="npm", version="11.9.0"),
},
)
monkeypatch.setattr(cli, "CloudflareClient", FakeCloudflareClient)
assert cli.main(["init-config", "--config", str(config_path)]) == 0
assert (
cli.main(
[
"check",
"--config",
str(config_path),
"--api-token",
"token-value",
"--zone-name",
"osozos.top",
]
)
== 0
)
state_path = tmp_path / ".deploy" / "state.toml"
state_text = state_path.read_text(encoding="utf-8")
assert "acc-from-zone" in state_text
assert "osozos.top" in state_text
def test_check_command_supports_profile_scoped_config_and_state(monkeypatch, tmp_path: Path) -> None:
config_path = tmp_path / "config.toml"
monkeypatch.setattr(
cli,
"check_required_tools",
lambda runner: {
"git": ToolVersion(name="git", version="git version 2.43.0"),
"node": ToolVersion(name="node", version="v24.14.0"),
"npm": ToolVersion(name="npm", version="11.9.0"),
},
)
monkeypatch.setattr(cli, "CloudflareClient", FakeCloudflareClient)
assert cli.main(["init-config", "--config", str(config_path)]) == 0
assert (
cli.main(
[
"check",
"--config",
str(config_path),
"--profile",
"account_b",
"--api-token",
"token-b",
"--account-id",
"acc-b",
"--zone-name",
"kotei.asia",
"--pages-domain",
"email.kotei.asia",
"--set",
'mail.domains=["mail.kotei.asia","maila.kotei.asia"]',
]
)
== 0
)
profile_state_path = tmp_path / ".deploy" / "profiles" / "account_b" / "state.toml"
assert profile_state_path.exists()
config_text = config_path.read_text(encoding="utf-8")
state_text = profile_state_path.read_text(encoding="utf-8")
assert '[profiles.account_b.cloudflare]' in config_text
assert 'api_token = "token-b"' in config_text
assert 'domains = ["mail.kotei.asia", "maila.kotei.asia"]' in config_text
assert "acc-b" in state_text
def test_discover_config_writes_detected_cloudflare_resources(monkeypatch, tmp_path: Path) -> None:
config_path = tmp_path / "config.toml"
monkeypatch.setattr(cli, "CloudflareClient", FakeCloudflareClient)
assert cli.main(["init-config", "--config", str(config_path)]) == 0
assert (
cli.main(
[
"discover-config",
"--config",
str(config_path),
"--profile",
"kotei",
"--api-token",
"token-value",
"--zone-name",
"kotei.us.ci",
]
)
== 0
)
config_text = config_path.read_text(encoding="utf-8")
assert '[profiles.kotei.cloudflare]' in config_text
assert 'zone_name = "kotei.us.ci"' in config_text
assert 'account_id = "acc-from-zone"' in config_text
assert 'verified_destination_address = "verified@kotei.us.ci"' in config_text
assert 'domains = ["mail.kotei.us.ci", "maila.kotei.us.ci"]' in config_text
assert 'project_name = "cf-temp-email-pages"' in config_text
assert 'custom_domain = "email.kotei.us.ci"' in config_text
assert 'script_name = "temp-email-api"' in config_text

295
tests/test_cloudflare.py Normal file
View File

@@ -0,0 +1,295 @@
from __future__ import annotations
import httpx
import respx
from cf_temp_email_deploy.cloudflare import CloudflareClient
from cf_temp_email_deploy.errors import CloudflareAPIError, ConfigError
from cf_temp_email_deploy.models import CloudflareConfig
def build_config() -> CloudflareConfig:
return CloudflareConfig(
account_name="demo-account",
zone_name="example.com",
api_token="token-value",
api_email="demo@example.com",
global_api_key="global-key",
)
@respx.mock
def test_verify_token_uses_bearer_token() -> None:
route = respx.get("https://api.cloudflare.com/client/v4/user/tokens/verify").mock(
return_value=httpx.Response(
200,
json={"success": True, "result": {"status": "active"}},
)
)
with CloudflareClient(build_config()) as client:
result = client.verify_token()
assert route.called is True
assert route.calls[0].request.headers["Authorization"] == "Bearer token-value"
assert result["status"] == "active"
@respx.mock
def test_resolve_account_uses_global_key_headers() -> None:
route = respx.get("https://api.cloudflare.com/client/v4/accounts").mock(
return_value=httpx.Response(
200,
json={
"success": True,
"result": [{"id": "acc-1", "name": "demo-account"}],
"result_info": {"page": 1, "per_page": 50, "count": 1, "total_pages": 1},
},
)
)
with CloudflareClient(build_config()) as client:
result = client.resolve_account(account_name="demo-account")
request = route.calls[0].request
assert request.headers["X-Auth-Email"] == "demo@example.com"
assert request.headers["X-Auth-Key"] == "global-key"
assert result["id"] == "acc-1"
@respx.mock
def test_resolve_zone_uses_account_filter() -> None:
route = respx.get("https://api.cloudflare.com/client/v4/zones").mock(
return_value=httpx.Response(
200,
json={
"success": True,
"result": [{"id": "zone-1", "name": "example.com"}],
"result_info": {"page": 1, "per_page": 50, "count": 1, "total_pages": 1},
},
)
)
with CloudflareClient(build_config()) as client:
result = client.resolve_zone(zone_name="example.com", account_id="acc-1")
request = route.calls[0].request
assert "account.id=acc-1" in str(request.url)
assert request.headers["Authorization"] == "Bearer token-value"
assert result["id"] == "zone-1"
@respx.mock
def test_email_routing_request_falls_back_to_global_key() -> None:
route = respx.get("https://api.cloudflare.com/client/v4/zones/zone-1/email/routing/rules/catch_all").mock(
side_effect=[
httpx.Response(
403,
json={
"success": False,
"errors": [{"message": "Use X-Auth-Email and X-Auth-Key for this endpoint."}],
},
),
httpx.Response(
200,
json={"success": True, "result": {"tag": "catch_all"}},
),
]
)
with CloudflareClient(build_config()) as client:
result = client.request_email_routing("GET", "/zones/zone-1/email/routing/rules/catch_all")
assert len(route.calls) == 2
assert route.calls[0].request.headers["Authorization"] == "Bearer token-value"
assert route.calls[1].request.headers["X-Auth-Key"] == "global-key"
assert result["result"]["tag"] == "catch_all"
@respx.mock
def test_request_raises_cloudflare_error_for_unsuccessful_payload() -> None:
respx.get("https://api.cloudflare.com/client/v4/user/tokens/verify").mock(
return_value=httpx.Response(
403,
json={"success": False, "errors": [{"message": "forbidden"}]},
)
)
with CloudflareClient(build_config()) as client:
try:
client.verify_token()
except CloudflareAPIError as exc:
assert "forbidden" in str(exc)
else: # pragma: no cover
raise AssertionError("expected CloudflareAPIError")
@respx.mock
def test_ensure_pages_project_creates_when_missing() -> None:
get_route = respx.get(
"https://api.cloudflare.com/client/v4/accounts/acc-1/pages/projects/demo-pages"
).mock(return_value=httpx.Response(404, json={"success": False, "errors": [{"message": "not found"}]}))
post_route = respx.post(
"https://api.cloudflare.com/client/v4/accounts/acc-1/pages/projects"
).mock(
return_value=httpx.Response(
200,
json={
"success": True,
"result": {"id": "proj-1", "name": "demo-pages", "subdomain": "demo-pages.pages.dev"},
},
)
)
with CloudflareClient(build_config()) as client:
result = client.ensure_pages_project(
account_id="acc-1",
project_name="demo-pages",
production_branch="production",
)
assert get_route.called is True
assert post_route.called is True
assert result["subdomain"] == "demo-pages.pages.dev"
@respx.mock
def test_ensure_cname_record_updates_existing_record() -> None:
list_route = respx.get("https://api.cloudflare.com/client/v4/zones/zone-1/dns_records").mock(
return_value=httpx.Response(
200,
json={
"success": True,
"result": [
{
"id": "dns-1",
"type": "CNAME",
"name": "email.example.com",
"content": "old.pages.dev",
"proxied": True,
"ttl": 1,
}
],
"result_info": {"page": 1, "per_page": 50, "count": 1, "total_pages": 1},
},
)
)
update_route = respx.put(
"https://api.cloudflare.com/client/v4/zones/zone-1/dns_records/dns-1"
).mock(
return_value=httpx.Response(
200,
json={
"success": True,
"result": {
"id": "dns-1",
"type": "CNAME",
"name": "email.example.com",
"content": "new.pages.dev",
"proxied": True,
"ttl": 1,
},
},
)
)
with CloudflareClient(build_config()) as client:
result = client.ensure_cname_record(
zone_id="zone-1",
name="email.example.com",
content="new.pages.dev",
)
assert list_route.called is True
assert update_route.called is True
assert result["content"] == "new.pages.dev"
@respx.mock
def test_ensure_catch_all_worker_skips_when_already_targeted() -> None:
get_route = respx.get(
"https://api.cloudflare.com/client/v4/zones/zone-1/email/routing/rules/catch_all"
).mock(
return_value=httpx.Response(
200,
json={
"success": True,
"result": {
"id": "rule-1",
"enabled": True,
"actions": [{"type": "worker", "value": ["email-api"]}],
},
},
)
)
put_route = respx.put(
"https://api.cloudflare.com/client/v4/zones/zone-1/email/routing/rules/catch_all"
).mock(
return_value=httpx.Response(
200,
json={"success": True, "result": {"id": "rule-1", "enabled": True}},
)
)
with CloudflareClient(build_config()) as client:
result = client.ensure_catch_all_worker(zone_id="zone-1", script_name="email-api")
assert get_route.called is True
assert put_route.called is False
assert result["id"] == "rule-1"
@respx.mock
def test_wait_for_pages_domain_active_polls_until_ready() -> None:
route = respx.get(
"https://api.cloudflare.com/client/v4/accounts/acc-1/pages/projects/demo-pages/domains/email.example.com"
).mock(
side_effect=[
httpx.Response(
200,
json={"success": True, "result": {"name": "email.example.com", "status": "pending"}},
),
httpx.Response(
200,
json={"success": True, "result": {"name": "email.example.com", "status": "active"}},
),
]
)
with CloudflareClient(build_config()) as client:
result = client.wait_for_pages_domain_active(
account_id="acc-1",
project_name="demo-pages",
domain_name="email.example.com",
timeout_seconds=1.0,
poll_interval_seconds=0.0,
)
assert len(route.calls) == 2
assert result["status"] == "active"
def test_catch_all_points_to_worker_accepts_nested_value_shapes() -> None:
rule = {
"enabled": True,
"actions": [
{
"type": "worker",
"value": [
{
"service": {
"name": "email-api",
}
}
],
}
],
}
assert CloudflareClient.catch_all_points_to_worker(rule, "email-api") is True
def test_is_authentication_error_accepts_global_key_config_errors() -> None:
error = ConfigError("旧式鉴权需要同时提供 api_email 与 global_api_key。")
assert CloudflareClient.is_authentication_error(error) is True

160
tests/test_config.py Normal file
View File

@@ -0,0 +1,160 @@
from __future__ import annotations
from pathlib import Path
import pytest
from cf_temp_email_deploy.config import (
apply_overrides,
default_config_document,
load_config,
parse_toml_value,
save_toml_document,
save_state,
set_dotted_value,
)
from cf_temp_email_deploy.errors import ConfigError
from cf_temp_email_deploy.models import DeploymentState
def test_parse_toml_value_supports_structured_literals() -> None:
assert parse_toml_value('"abc"') == "abc"
assert parse_toml_value("true") is True
assert parse_toml_value("[1, 2, 3]") == [1, 2, 3]
assert parse_toml_value("plain-text") == "plain-text"
def test_apply_overrides_and_load_config(tmp_path: Path) -> None:
document = default_config_document()
apply_overrides(
document,
{
"cloudflare.zone_name": "example.org",
"mail.domains": ["mail.example.org"],
"worker.vars.PREFIX": "custom",
},
)
config_path = tmp_path / "config.toml"
save_toml_document(config_path, document)
config = load_config(config_path)
assert config.cloudflare.zone_name == "example.org"
assert config.mail.domains == ["mail.example.org"]
assert config.worker.vars["PREFIX"] == "custom"
assert config.derived_worker_vars()["DOMAINS"] == ["mail.example.org"]
assert config.derived_worker_vars()["DISABLE_ANONYMOUS_USER_CREATE_EMAIL"] is True
assert config.source.repo_ref == ""
def test_default_config_marks_jwt_secret_optional() -> None:
document = default_config_document()
assert document["worker"]["secrets"]["JWT_SECRET"] == ""
def test_default_config_includes_user_access_and_linuxdo_sections() -> None:
document = default_config_document()
assert document["user_access"]["require_login_to_create"] is True
assert document["user_access"]["allow_user_register"] is False
assert document["linuxdo"]["linuxdo_oauth"] is False
assert document["linuxdo"]["client_id"] == ""
assert document["linuxdo"]["client_secret"] == ""
def test_load_config_requires_linuxdo_credentials_when_oauth_enabled(tmp_path: Path) -> None:
document = default_config_document()
document["linuxdo"]["linuxdo_oauth"] = True
document["linuxdo"]["client_id"] = ""
document["linuxdo"]["client_secret"] = ""
config_path = tmp_path / "config.toml"
save_toml_document(config_path, document)
with pytest.raises(ConfigError, match="linuxdo.client_id"):
load_config(config_path)
def test_load_config_requires_admin_passwords(tmp_path: Path) -> None:
document = default_config_document()
document["worker"]["vars"]["ADMIN_PASSWORDS"] = []
config_path = tmp_path / "config.toml"
save_toml_document(config_path, document)
with pytest.raises(ConfigError, match="ADMIN_PASSWORDS"):
load_config(config_path)
def test_load_config_requires_worker_domain_or_workers_dev(tmp_path: Path) -> None:
document = default_config_document()
document["worker"]["use_workers_dev"] = False
document["worker"]["custom_domain"] = ""
config_path = tmp_path / "config.toml"
save_toml_document(config_path, document)
with pytest.raises(ConfigError, match="worker.custom_domain"):
load_config(config_path)
def test_load_config_merges_selected_profile(tmp_path: Path) -> None:
document = default_config_document()
document["cloudflare"]["zone_name"] = "base.example.com"
document["mail"]["domains"] = ["base.example.com"]
document["pages"]["custom_domain"] = "app.base.example.com"
document["profiles"] = {
"account_b": {
"cloudflare": {
"account_id": "acc-b",
"zone_name": "kotei.asia",
"api_token": "token-b",
},
"mail": {"domains": ["mail.kotei.asia", "maila.kotei.asia"]},
"pages": {"custom_domain": "email.kotei.asia"},
}
}
config_path = tmp_path / "config.toml"
save_toml_document(config_path, document)
config = load_config(config_path, profile="account_b")
assert config.cloudflare.account_id == "acc-b"
assert config.cloudflare.zone_name == "kotei.asia"
assert config.mail.domains == ["mail.kotei.asia", "maila.kotei.asia"]
assert config.pages.custom_domain == "email.kotei.asia"
def test_load_config_rejects_missing_profile(tmp_path: Path) -> None:
config_path = tmp_path / "config.toml"
save_toml_document(config_path, default_config_document())
with pytest.raises(ConfigError, match="未找到 profile"):
load_config(config_path, profile="missing")
def test_set_dotted_value_rejects_scalar_parent() -> None:
document = default_config_document()
set_dotted_value(document, "cloudflare.account_id", "abc")
try:
set_dotted_value(document, "cloudflare.account_id.value", "x")
except Exception as exc: # pragma: no cover - narrow assertion below
assert "父节点不是表" in str(exc)
else: # pragma: no cover
raise AssertionError("expected parent table validation failure")
def test_save_state_roundtrip(tmp_path: Path) -> None:
state_path = tmp_path / ".deploy" / "state.toml"
state = DeploymentState()
state.mark_checkpoint("environment_checked")
state.worker.script_name = "worker-a"
state.worker.workers_dev_url = "https://worker-a.example.workers.dev"
save_state(state_path, state)
loaded_text = state_path.read_text(encoding="utf-8")
assert "environment_checked" in loaded_text
assert "worker-a" in loaded_text

1143
tests/test_deployment.py Normal file

File diff suppressed because it is too large Load Diff

16
tests/test_environment.py Normal file
View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import pytest
from cf_temp_email_deploy.environment import parse_semver
from cf_temp_email_deploy.errors import EnvironmentCheckError
def test_parse_semver_accepts_prefixed_versions() -> None:
assert parse_semver("v24.14.0") == (24, 14, 0)
def test_parse_semver_rejects_invalid_versions() -> None:
with pytest.raises(EnvironmentCheckError):
parse_semver("unknown")

102
tests/test_source.py Normal file
View File

@@ -0,0 +1,102 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from cf_temp_email_deploy.models import DeploymentConfig, DeploymentState
from cf_temp_email_deploy.source import prepare_source
from cf_temp_email_deploy.subprocess_runner import CommandRunner
def run_git(args: tuple[str, ...], cwd: Path) -> str:
completed = subprocess.run(args, cwd=cwd, check=True, capture_output=True, text=True)
return completed.stdout.strip()
def create_git_repository(path: Path) -> str:
path.mkdir(parents=True, exist_ok=True)
run_git(("git", "init", "-b", "main"), path)
run_git(("git", "config", "user.name", "Codex"), path)
run_git(("git", "config", "user.email", "codex@example.com"), path)
(path / "README.md").write_text("demo\n", encoding="utf-8")
run_git(("git", "add", "README.md"), path)
run_git(("git", "commit", "-m", "initial"), path)
return run_git(("git", "rev-parse", "HEAD"), path)
def test_prepare_local_source_records_commit(tmp_path: Path) -> None:
repo_dir = tmp_path / "local-repo"
expected_sha = create_git_repository(repo_dir)
config = DeploymentConfig.model_validate(
{
"source": {
"mode": "local",
"local_path": str(repo_dir),
},
"cloudflare": {"zone_name": "example.com"},
"mail": {"domains": ["mail.example.com"], "verified_destination_address": "target@example.com"},
"pages": {"custom_domain": "mail.example.com"},
}
)
prepared = prepare_source(config, CommandRunner())
assert prepared.source_dir == repo_dir.resolve()
assert prepared.commit_sha == expected_sha
state = DeploymentState()
prepared.apply_to_state(state)
assert state.source.source_dir == str(repo_dir.resolve())
assert state.source.commit_sha == expected_sha
def test_prepare_clone_source_clones_and_checks_out_ref(tmp_path: Path) -> None:
origin_dir = tmp_path / "origin-repo"
expected_sha = create_git_repository(origin_dir)
workspace_dir = tmp_path / "workspace"
config = DeploymentConfig.model_validate(
{
"source": {
"mode": "clone",
"repo_url": str(origin_dir),
"repo_ref": "main",
"workspace_dir": str(workspace_dir),
},
"cloudflare": {"zone_name": "example.com"},
"mail": {"domains": ["mail.example.com"], "verified_destination_address": "target@example.com"},
"pages": {"custom_domain": "mail.example.com"},
}
)
prepared = prepare_source(config, CommandRunner())
assert prepared.source_dir == (workspace_dir / "source").resolve()
assert prepared.commit_sha == expected_sha
assert (prepared.source_dir / "README.md").read_text(encoding="utf-8") == "demo\n"
def test_prepare_clone_source_uses_latest_default_branch_when_repo_ref_empty(tmp_path: Path) -> None:
origin_dir = tmp_path / "origin-repo"
expected_sha = create_git_repository(origin_dir)
workspace_dir = tmp_path / "workspace"
config = DeploymentConfig.model_validate(
{
"source": {
"mode": "clone",
"repo_url": str(origin_dir),
"repo_ref": "",
"workspace_dir": str(workspace_dir),
},
"cloudflare": {"zone_name": "example.com"},
"mail": {"domains": ["example.com"], "verified_destination_address": "target@example.com"},
"pages": {"custom_domain": "email.example.com"},
}
)
prepared = prepare_source(config, CommandRunner())
assert prepared.source_dir == (workspace_dir / "source").resolve()
assert prepared.commit_sha == expected_sha

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
import pytest
from cf_temp_email_deploy.errors import CommandExecutionError
from cf_temp_email_deploy.subprocess_runner import CommandRunner, CommandSpec
def test_command_runner_captures_stdout() -> None:
runner = CommandRunner()
result = runner.run_checked(CommandSpec(args=("python3", "-c", "print('ok')")))
assert result.stdout.strip() == "ok"
def test_command_runner_raises_for_non_zero_exit() -> None:
runner = CommandRunner()
with pytest.raises(CommandExecutionError):
runner.run_checked(CommandSpec(args=("python3", "-c", "raise SystemExit(3)")))