diff --git a/README.md b/README.md index 6745089..9c9ab31 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,13 @@ -# DuckMail GUI (PySide6) +# Temp Mail GUI (PySide6) -一个基于 PySide6 的 DuckMail 邮箱创建与收信工具。 +一个基于 PySide6 的临时邮箱创建与收信工具。 + +当前版本已经适配 `cloudflare_temp_email` 后端,推荐配置: + +- API Base: `https://temp-email-api.example.com` +- Domain: `example.com` +- Backend Type: `cloudflare_temp_email` +- Bearer: 填 `cloudflare_temp_email` 的管理员密码 当前版本改为纯 PostgreSQL 账号源: @@ -9,6 +16,31 @@ - 导入账号并获取 Token 后直接写入 PostgreSQL - 界面采用分页布局,日常使用时只看账号页即可 +现在也支持: + +- PostgreSQL / MySQL 二选一 +- 设置页显示或隐藏密码 +- 数据库连接测试 +- 一键初始化数据库和表 +- 勾选后在数据库或表不存在时自动创建 +- 通过脚本把 PostgreSQL 里的某个域名邮箱补档到 D1 + +## 补档脚本 + +把 PostgreSQL 中某个域名的邮箱导出成 Cloudflare D1 可执行的 SQL: + +```bash +python sync_pg_to_d1.py \ + --pg-host 127.0.0.1 \ + --pg-port 5432 \ + --pg-db mail_accounts_db \ + --pg-user postgres \ + --pg-password 'your-password' \ + --table registered_accounts \ + --domain example.com \ + --output /tmp/kotei_asia_import.sql +``` + ## 安装依赖 ```bash @@ -40,7 +72,7 @@ python gui.py - 导入账号并写入数据库 - 创建邮箱并写入数据库 -## PostgreSQL 表结构 +## 数据表结构 默认读取并写入表:`registered_accounts` @@ -60,9 +92,9 @@ python gui.py 程序会将界面配置保存到 `config_gui.json`,包括: -- DuckMail API Base +- Cloudflare Temp Mail API Base - Domain -- Bearer +- Admin Password - Proxy - 自动刷新开关与间隔 - PostgreSQL Host / Port / DB / User / Password @@ -74,3 +106,4 @@ python gui.py - `PySide6` - `requests` - `psycopg[binary]` +- `PyMySQL` diff --git a/account_store.py b/account_store.py index fff8ffc..ee0b020 100644 --- a/account_store.py +++ b/account_store.py @@ -12,9 +12,16 @@ try: except ImportError: psycopg2 = None +try: + import pymysql +except ImportError: + pymysql = None + _LOCK = threading.Lock() +DEFAULT_TABLE_NAME = "registered_accounts" + def _safe_read_json(path): if not os.path.exists(path): @@ -56,48 +63,291 @@ def _normalize_optional_text(value): return text or None -def _get_pg_driver(): - if psycopg is not None: - return "psycopg" - if psycopg2 is not None: - return "psycopg2" - raise RuntimeError("未安装 PostgreSQL 驱动,请先安装 psycopg[binary] 或 psycopg2-binary") +def _normalize_config(config): + data = dict(config or {}) + db_type = str(data.get("db_type") or ("postgresql" if data.get("pg_enabled", True) else "")).strip().lower() + return { + "db_enabled": bool(data.get("db_enabled", data.get("pg_enabled", True))), + "db_type": db_type or "postgresql", + "db_host": str(data.get("db_host", data.get("pg_host", ""))).strip(), + "db_port": int(data.get("db_port", data.get("pg_port", 5432 if db_type != "mysql" else 3306)) or (3306 if db_type == "mysql" else 5432)), + "db_name": str(data.get("db_name", data.get("pg_db", "mail_accounts_db"))).strip(), + "db_user": str(data.get("db_user", data.get("pg_user", ""))).strip(), + "db_password": str(data.get("db_password", data.get("pg_password", ""))).strip(), + "db_table": str(data.get("db_table", DEFAULT_TABLE_NAME)).strip() or DEFAULT_TABLE_NAME, + "db_auto_create": bool(data.get("db_auto_create", False)), + "db_connect_timeout": int(data.get("db_connect_timeout", data.get("pg_connect_timeout", 10)) or 10), + } -def _get_pg_connection(config): - driver = _get_pg_driver() - host = str(config.get("pg_host", "")).strip() - port = int(config.get("pg_port", 5432) or 5432) - dbname = str(config.get("pg_db", "")).strip() - user = str(config.get("pg_user", "")).strip() - password = str(config.get("pg_password", "")).strip() - connect_timeout = int(config.get("pg_connect_timeout", 10) or 10) +def _validate_identifier(value, label): + text = str(value or "").strip() + if not text: + raise ValueError(f"{label} 不能为空") + if not text.replace("_", "").isalnum(): + raise ValueError(f"{label} 只能包含字母、数字和下划线") + return text - if not host or not dbname or not user: - raise ValueError("PostgreSQL 配置不完整,请填写 Host、DB、User") + +def _get_db_driver(db_type): + if db_type == "postgresql": + if psycopg is not None: + return "psycopg" + if psycopg2 is not None: + return "psycopg2" + raise RuntimeError("未安装 PostgreSQL 驱动,请安装 psycopg[binary] 或 psycopg2-binary") + if db_type == "mysql": + if pymysql is not None: + return "pymysql" + raise RuntimeError("未安装 MySQL 驱动,请安装 PyMySQL") + raise ValueError("仅支持 PostgreSQL 或 MySQL") + + +def _connection_kwargs(db_config, include_database=True): + host = db_config["db_host"] + port = int(db_config["db_port"]) + dbname = db_config["db_name"] + user = db_config["db_user"] + password = db_config["db_password"] + connect_timeout = int(db_config["db_connect_timeout"]) + + if not host or not user: + raise ValueError("数据库配置不完整,请填写 Host 和 User") + if include_database and not dbname: + raise ValueError("数据库名不能为空") + + if db_config["db_type"] == "postgresql": + kwargs = { + "host": host, + "port": port, + "user": user, + "password": password, + "connect_timeout": connect_timeout, + } + if include_database: + kwargs["dbname"] = dbname + return kwargs kwargs = { "host": host, "port": port, - "dbname": dbname, "user": user, "password": password, "connect_timeout": connect_timeout, + "charset": "utf8mb4", + "autocommit": False, + } + if include_database: + kwargs["database"] = dbname + return kwargs + + +def _connect(db_config, include_database=True): + db_type = db_config["db_type"] + driver = _get_db_driver(db_type) + kwargs = _connection_kwargs(db_config, include_database=include_database) + + if db_type == "postgresql": + if driver == "psycopg": + conn = psycopg.connect(**kwargs) + conn.autocommit = False + return conn + conn = psycopg2.connect(**kwargs) + conn.autocommit = False + return conn + + conn = pymysql.connect(**kwargs) + conn.autocommit(False) + return conn + + +def _connect_admin(db_config): + if db_config["db_type"] == "postgresql": + admin_config = dict(db_config) + admin_config["db_name"] = "postgres" + return _connect(admin_config, include_database=True) + return _connect(db_config, include_database=False) + + +def _can_connect_to_configured_database(db_config): + try: + with _connect(db_config) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.fetchone() + return True + except Exception: + return False + + +def _try_database_exists_via_configured_db(db_config): + if not _can_connect_to_configured_database(db_config): + return False + return True + + +def _has_admin_database_access(db_config): + try: + with _connect_admin(db_config) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.fetchone() + return True + except Exception: + return False + + +def _database_exists(db_config): + db_name = _validate_identifier(db_config["db_name"], "数据库名") + if _try_database_exists_via_configured_db(db_config): + return True + with _connect_admin(db_config) as conn: + with conn.cursor() as cur: + if db_config["db_type"] == "postgresql": + cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (db_name,)) + else: + cur.execute("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = %s", (db_name,)) + return cur.fetchone() is not None + + +def _table_exists(db_config): + table_name = _validate_identifier(db_config["db_table"], "表名") + with _connect(db_config) as conn: + with conn.cursor() as cur: + if db_config["db_type"] == "postgresql": + cur.execute( + "SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = %s", + (table_name,), + ) + else: + cur.execute( + "SELECT 1 FROM information_schema.tables WHERE table_schema = %s AND table_name = %s", + (db_config["db_name"], table_name), + ) + return cur.fetchone() is not None + + +def _create_database(db_config): + db_name = _validate_identifier(db_config["db_name"], "数据库名") + with _connect_admin(db_config) as conn: + if db_config["db_type"] == "postgresql": + conn.autocommit = True + with conn.cursor() as cur: + if db_config["db_type"] == "postgresql": + cur.execute(f'CREATE DATABASE "{db_name}"') + else: + cur.execute(f"CREATE DATABASE `{db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci") + if db_config["db_type"] != "postgresql": + conn.commit() + + +def _create_table(db_config): + table_name = _validate_identifier(db_config["db_table"], "表名") + if db_config["db_type"] == "postgresql": + sql = f""" + CREATE TABLE IF NOT EXISTS "{table_name}" ( + email TEXT PRIMARY KEY, + mail_password TEXT, + mail_token TEXT, + chatgpt_password TEXT, + name TEXT, + birthdate TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ + else: + sql = f""" + CREATE TABLE IF NOT EXISTS `{table_name}` ( + email VARCHAR(255) PRIMARY KEY, + mail_password TEXT NULL, + mail_token TEXT NULL, + chatgpt_password TEXT NULL, + name VARCHAR(255) NULL, + birthdate VARCHAR(64) NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci + """ + with _connect(db_config) as conn: + with conn.cursor() as cur: + cur.execute(sql) + conn.commit() + + +def test_connection(config): + db_config = _normalize_config(config) + can_connect_directly = _can_connect_to_configured_database(db_config) + admin_access = _has_admin_database_access(db_config) + database_exists = can_connect_directly or (admin_access and _database_exists(db_config)) + if database_exists: + if can_connect_directly: + with _connect(db_config) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.fetchone() + elif admin_access: + with _connect_admin(db_config) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.fetchone() + elif admin_access: + with _connect_admin(db_config) as conn: + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.fetchone() + else: + raise RuntimeError("无法连接到配置的数据库,且当前账号也没有数据库管理权限") + return { + "success": True, + "db_type": db_config["db_type"], + "database_exists": database_exists, + "table_exists": can_connect_directly and _table_exists(db_config), + "admin_access": admin_access, } - if driver == "psycopg": - return psycopg.connect(**kwargs) - return psycopg2.connect(**kwargs) + +def ensure_database_and_table(config): + db_config = _normalize_config(config) + can_connect_directly = _can_connect_to_configured_database(db_config) + admin_access = _has_admin_database_access(db_config) + database_exists = can_connect_directly or (admin_access and _database_exists(db_config)) + if not database_exists: + if not admin_access: + raise RuntimeError("当前账号无法创建数据库,请先手动创建数据库,或改用具备管理权限的账号") + _create_database(db_config) + database_exists = True + table_exists = _table_exists(db_config) if database_exists else False + if not table_exists: + _create_table(db_config) + table_exists = True + return { + "success": True, + "db_type": db_config["db_type"], + "database_exists": database_exists, + "table_exists": table_exists, + "admin_access": admin_access, + } + + +def _ensure_ready_if_needed(config): + db_config = _normalize_config(config) + if db_config["db_auto_create"]: + ensure_database_and_table(db_config) + return db_config def load_accounts(config): + db_config = _ensure_ready_if_needed(config) + table_name = _validate_identifier(db_config["db_table"], "表名") query = ( - "select email, mail_password, mail_token, chatgpt_password, name, birthdate, created_at " - "from registered_accounts order by created_at desc nulls last, email asc" + f"select email, mail_password, mail_token, chatgpt_password, name, birthdate, created_at " + f'from "{table_name}" order by created_at desc, email asc' + if db_config["db_type"] == "postgresql" + else f"select email, mail_password, mail_token, chatgpt_password, name, birthdate, created_at " + f"from `{table_name}` order by created_at desc, email asc" ) accounts = [] - with _get_pg_connection(config) as conn: + with _connect(db_config) as conn: with conn.cursor() as cur: cur.execute(query) rows = cur.fetchall() @@ -115,7 +365,7 @@ def load_accounts(config): "name": _to_text(name).strip(), "birthdate": _to_text(birthdate).strip(), "created_at": _to_text(created_at).strip(), - "source": "postgres", + "source": db_config["db_type"], } ) return accounts @@ -130,17 +380,8 @@ def save_account( name=None, birthdate=None, ): - query = ( - "insert into registered_accounts " - "(email, mail_password, mail_token, chatgpt_password, name, birthdate) " - "values (%s, %s, %s, %s, %s, %s) " - "on conflict (email) do update set " - "mail_password = excluded.mail_password, " - "mail_token = excluded.mail_token, " - "chatgpt_password = excluded.chatgpt_password, " - "name = excluded.name, " - "birthdate = excluded.birthdate" - ) + db_config = _ensure_ready_if_needed(config) + table_name = _validate_identifier(db_config["db_table"], "表名") params = ( email, mail_password, @@ -150,8 +391,33 @@ def save_account( _normalize_optional_text(birthdate), ) + if db_config["db_type"] == "postgresql": + query = ( + f'insert into "{table_name}" ' + "(email, mail_password, mail_token, chatgpt_password, name, birthdate) " + "values (%s, %s, %s, %s, %s, %s) " + "on conflict (email) do update set " + "mail_password = excluded.mail_password, " + "mail_token = excluded.mail_token, " + "chatgpt_password = excluded.chatgpt_password, " + "name = excluded.name, " + "birthdate = excluded.birthdate" + ) + else: + query = ( + f"insert into `{table_name}` " + "(email, mail_password, mail_token, chatgpt_password, name, birthdate) " + "values (%s, %s, %s, %s, %s, %s) " + "on duplicate key update " + "mail_password = values(mail_password), " + "mail_token = values(mail_token), " + "chatgpt_password = values(chatgpt_password), " + "name = values(name), " + "birthdate = values(birthdate)" + ) + with _LOCK: - with _get_pg_connection(config) as conn: + with _connect(db_config) as conn: with conn.cursor() as cur: cur.execute(query, params) conn.commit() diff --git a/config_gui.example.jsonc b/config_gui.example.jsonc new file mode 100644 index 0000000..c27ee59 --- /dev/null +++ b/config_gui.example.jsonc @@ -0,0 +1,52 @@ +{ + // Cloudflare Temp Mail 后端 API 地址 + "api_base": "https://temp-email-api.example.com", + + // 默认创建邮箱使用的域名 + "domain": "example.com", + + // 当前仅支持 cloudflare_temp_email + "backend_type": "cloudflare_temp_email", + + // cloudflare_temp_email 的管理员密码 + "bearer": "replace-with-your-admin-password", + + // 可选代理,例如 http://127.0.0.1:7890 + "proxy": "", + + // 是否在界面中显示密码明文 + "show_passwords": false, + + // 是否自动刷新邮件列表 + "auto_refresh": false, + + // 自动刷新间隔,单位秒 + "refresh_interval": 30, + + // 是否启用数据库 + "db_enabled": true, + + // 数据库类型:postgresql 或 mysql + "db_type": "postgresql", + + // 数据库主机 + "db_host": "127.0.0.1", + + // 数据库端口 + "db_port": 5432, + + // 数据库名称 + "db_name": "mail_accounts_db", + + // 存放邮箱账号的表名 + "db_table": "registered_accounts", + + // 数据库用户名 + "db_user": "postgres", + + // 数据库密码 + "db_password": "replace-with-your-db-password", + + // 勾选后,数据库或表不存在时允许自动创建 + "db_auto_create": false +} diff --git a/duckmail_client.py b/duckmail_client.py index 933c40a..5ed615f 100644 --- a/duckmail_client.py +++ b/duckmail_client.py @@ -1,6 +1,7 @@ +import email import random import string -import time +from email import policy import requests from requests.adapters import HTTPAdapter @@ -14,32 +15,74 @@ def _generate_email_local_part(): return letters1 + numbers + letters2 -def _generate_password(length=14): - lower = string.ascii_lowercase - upper = string.ascii_uppercase - digits = string.digits - special = "!@#$%&*" - pwd = [ - random.choice(lower), - random.choice(upper), - random.choice(digits), - random.choice(special), - ] - all_chars = lower + upper + digits + special - pwd += [random.choice(all_chars) for _ in range(length - 4)] - random.shuffle(pwd) - return "".join(pwd) +def _extract_mail_bodies(raw_text): + if not raw_text: + return "", "" + try: + msg = email.message_from_string(raw_text, policy=policy.default) + except Exception: + return "", "" + + text_parts = [] + html_parts = [] + + for part in msg.walk(): + if part.get_content_maintype() == "multipart": + continue + if part.get_filename(): + continue + content_type = part.get_content_type() + try: + content = part.get_content() + except Exception: + payload = part.get_payload(decode=True) or b"" + charset = part.get_content_charset() or "utf-8" + content = payload.decode(charset, errors="replace") + if not isinstance(content, str): + content = str(content or "") + if content_type == "text/plain": + text_parts.append(content) + elif content_type == "text/html": + html_parts.append(content) + + return "\n\n".join(text_parts).strip(), "\n\n".join(html_parts).strip() + + +def _normalize_cloudflare_mail(item): + normalized = dict(item or {}) + raw_text = str(normalized.get("raw") or "") + text_body, html_body = _extract_mail_bodies(raw_text) + if raw_text: + try: + msg = email.message_from_string(raw_text, policy=policy.default) + normalized["subject"] = str(msg.get("subject") or normalized.get("subject") or "").strip() + normalized["from"] = str(msg.get("from") or normalized.get("source") or "").strip() + except Exception: + normalized["subject"] = str(normalized.get("subject") or "").strip() + normalized["from"] = str(normalized.get("source") or "").strip() + else: + normalized["subject"] = str(normalized.get("subject") or "").strip() + normalized["from"] = str(normalized.get("source") or "").strip() + + normalized["createdAt"] = str( + normalized.get("createdAt") or normalized.get("created_at") or "" + ).strip() + normalized["text"] = text_body + normalized["html"] = html_body + normalized["intro"] = text_body[:200] if text_body else "" + return normalized class DuckMailClient: - def __init__(self, api_base, domain, bearer, proxy=None, timeout=120): + def __init__(self, api_base, domain, bearer, proxy=None, timeout=120, backend_type="cloudflare_temp_email"): self.api_base = (api_base or "").rstrip("/") - self.domain = domain or "" + self.domain = (domain or "").strip() self.bearer = (bearer or "").strip() if self.bearer.lower().startswith("bearer "): self.bearer = self.bearer[7:].strip() self.proxy = proxy or "" self.timeout = timeout + self.backend_type = (backend_type or "cloudflare_temp_email").strip().lower() self.session = self._create_session() def _create_session(self): @@ -64,15 +107,10 @@ class DuckMailClient: session.proxies = {"http": self.proxy, "https": self.proxy} return session - def _auth_headers(self): - if not self.bearer: - return {} - return { - "Authorization": f"Bearer {self.bearer}", - "X-Api-Key": self.bearer, - } + def _is_cloudflare_temp_mail(self): + return self.backend_type == "cloudflare_temp_email" - def _duckmail_api_bases(self): + def _api_bases(self): base = self.api_base if not base: return [] @@ -87,200 +125,163 @@ class DuckMailClient: unique.append(item) return unique + def _auth_headers(self): + if not self.bearer: + return {} + return { + "Authorization": f"Bearer {self.bearer}", + "X-Api-Key": self.bearer, + } + + def _admin_headers(self): + if not self.bearer: + return {} + return {"x-admin-auth": self.bearer} + + def _request(self, method, path, *, headers=None, **kwargs): + last_response = None + for idx, api_base in enumerate(self._api_bases()): + response = self.session.request( + method, + f"{api_base}{path}", + headers=headers, + timeout=self.timeout, + **kwargs, + ) + last_response = response + if response.status_code == 404 and idx < len(self._api_bases()) - 1: + continue + return response + return last_response + def create_email(self): if not self.bearer: - raise ValueError("DUCKMAIL_BEARER 未设置") + raise ValueError("管理员密码未设置") if not self.domain: - raise ValueError("DUCKMAIL_DOMAIN 未设置") - api_bases = self._duckmail_api_bases() - if not api_bases: - raise ValueError("DUCKMAIL_API_BASE 未设置") + raise ValueError("邮箱域名未设置") + if not self._api_bases(): + raise ValueError("API Base 未设置") - is_worker = "workers.dev" in self.api_base or "temp-email" in self.api_base - max_retries = 5 - - for attempt in range(max_retries): - email_local_part = _generate_email_local_part() - email = f"{email_local_part}@{self.domain}" - password = _generate_password() - - if is_worker: - payload = { - "admin_password": self.bearer, - "name": email_local_part, - "domain": self.domain, - } - res = None - for idx, api_base in enumerate(api_bases): - res = self.session.post( - f"{api_base}/new_address", json=payload, timeout=self.timeout - ) - if res.status_code == 404 and idx < len(api_bases) - 1: - continue - break + if self._is_cloudflare_temp_mail(): + for _ in range(5): + email_local_part = _generate_email_local_part() + res = self._request( + "POST", + "/admin/new_address", + headers=self._admin_headers(), + json={ + "name": email_local_part, + "domain": self.domain, + "enablePrefix": True, + }, + ) if res.status_code == 200: result = res.json() - if result.get("jwt"): - return result.get("address"), password, result.get("jwt") - raise RuntimeError(f"Worker获取邮件 Token 失败: {res.text}") - if res.status_code == 403: - raise RuntimeError( - f"Worker创建邮箱返回 403,可能是 Bearer 无效或权限不足: {res.text[:200]}" - ) - if res.status_code == 400 and "already exists" in res.text: + jwt = result.get("jwt") + address = result.get("address") + if jwt and address: + return address, "", jwt + raise RuntimeError(f"创建邮箱成功但未返回 JWT: {res.text[:200]}") + if res.status_code == 400 and "exists" in res.text.lower(): continue - raise RuntimeError( - f"Worker创建邮箱失败: {res.status_code} - {res.text[:200]}" - ) + if res.status_code in (401, 403): + raise RuntimeError(f"管理员密码无效或权限不足: {res.text[:200]}") + raise RuntimeError(f"创建邮箱失败: {res.status_code} - {res.text[:200]}") + raise RuntimeError("创建邮箱失败: 超过最大重试次数") - headers = {"Authorization": f"Bearer {self.bearer}"} - headers = self._auth_headers() - res = None - used_api_base = None - for idx, api_base in enumerate(api_bases): - used_api_base = api_base - res = self.session.post( - f"{api_base}/accounts", - json={"address": email, "password": password}, - headers=headers, - timeout=self.timeout, - ) - if res.status_code == 404 and idx < len(api_bases) - 1: - continue - break - if res.status_code in (200, 201): - result = res.json() - if result.get("address"): - time.sleep(0.5) - token_res = self.session.post( - f"{used_api_base}/token", - json={"address": email, "password": password}, - headers=headers, - timeout=self.timeout, - ) - if token_res.status_code == 200: - mail_token = token_res.json().get("token") - if mail_token: - return email, password, mail_token - if token_res.status_code == 403: - raise RuntimeError( - "获取邮件 Token 返回 403,可能是 Bearer 无效或权限不足" - ) - raise RuntimeError(f"获取邮件 Token 失败: {token_res.status_code}") - if res.status_code == 403: - raise RuntimeError( - f"创建邮箱返回 403,可能是 Bearer 无效或权限不足: {res.text[:200]}" - ) - if res.status_code == 422 and "already exists" in res.text: - continue - raise RuntimeError( - f"创建邮箱失败: {res.status_code} - {res.text[:200]}" - ) - - raise RuntimeError("DuckMail 创建邮箱失败: 超过最大重试次数") + raise RuntimeError("当前版本只适配 cloudflare_temp_email 后端") def fetch_emails(self, mail_token): if not mail_token: return [] - api_bases = self._duckmail_api_bases() - if not api_bases: + if not self._api_bases(): return [] - is_worker = "workers.dev" in self.api_base or "temp-email" in self.api_base - if is_worker: - res = None - for idx, api_base in enumerate(api_bases): - res = self.session.get( - f"{api_base}/mails", - params={"limit": 20, "offset": 0}, - headers={"Authorization": f"Bearer {mail_token}"}, - timeout=self.timeout, - ) - if res.status_code == 404 and idx < len(api_bases) - 1: - continue - break + if self._is_cloudflare_temp_mail(): + res = self._request( + "GET", + "/api/mails", + headers={"Authorization": f"Bearer {mail_token}"}, + params={"limit": 20, "offset": 0}, + ) if res.status_code == 200: data = res.json() - if isinstance(data, dict) and "results" in data: - return data.get("results", []) - if isinstance(data, list): - return data - return [] + items = data.get("results", []) if isinstance(data, dict) else data + return [_normalize_cloudflare_mail(item) for item in (items or [])] + if res.status_code == 401: + raise RuntimeError("邮箱 JWT 已失效,请重新补全 Token") + raise RuntimeError(f"获取邮件失败: {res.status_code} - {res.text[:200]}") - res = None - for idx, api_base in enumerate(api_bases): - res = self.session.get( - f"{api_base}/messages", - params={"page": 1}, - headers={"Authorization": f"Bearer {mail_token}"}, - timeout=self.timeout, - ) - if res.status_code == 404 and idx < len(api_bases) - 1: - continue - break - if res.status_code == 200: - data = res.json() - if isinstance(data, dict) and "hydra:member" in data: - return data.get("hydra:member", []) - if isinstance(data, list): - return data - return [] + raise RuntimeError("当前版本只适配 cloudflare_temp_email 后端") - def get_token(self, email, mail_password): - if not email or not mail_password: - raise ValueError("邮箱或邮箱密码为空") - api_bases = self._duckmail_api_bases() - if not api_bases: - raise ValueError("DUCKMAIL_API_BASE 未设置") - headers = self._auth_headers() - if not headers: - raise ValueError("DUCKMAIL_BEARER 未设置") + def get_token(self, email_address, mail_password): + if not email_address: + raise ValueError("邮箱为空") + if not self._api_bases(): + raise ValueError("API Base 未设置") + if not self.bearer: + raise ValueError("管理员密码未设置") - res = None - used_api_base = None - for idx, api_base in enumerate(api_bases): - used_api_base = api_base - res = self.session.post( - f"{api_base}/token", - json={"address": email, "password": mail_password}, - headers=headers, - timeout=self.timeout, + if self._is_cloudflare_temp_mail(): + res = self._request( + "GET", + "/admin/address", + headers=self._admin_headers(), + params={"query": email_address, "limit": 20, "offset": 0}, ) - if res.status_code == 404 and idx < len(api_bases) - 1: - continue - break - if res.status_code == 200: - data = res.json() - token = data.get("token") - if token: - return token - if res.status_code == 403: - raise RuntimeError( - f"获取邮件 Token 返回 403,可能是 API Key 无效或无权限: {res.text[:200]}" + if res.status_code in (401, 403): + raise RuntimeError(f"管理员密码无效或权限不足: {res.text[:200]}") + if res.status_code != 200: + raise RuntimeError(f"查询邮箱失败: {res.status_code} - {res.text[:200]}") + + data = res.json() if res.content else {} + results = data.get("results", []) if isinstance(data, dict) else [] + matched = None + for item in results: + if str(item.get("name") or "").strip().lower() == email_address.strip().lower(): + matched = item + break + if not matched: + raise RuntimeError( + f"未找到对应邮箱: {email_address}。请确认这个地址已经由当前这套 cloudflare_temp_email 创建," + "并且该域名已经加入当前 Worker 的 DOMAINS 配置。旧系统里的同名地址不会自动出现在新系统里。" + ) + + address_id = matched.get("id") + if not address_id: + raise RuntimeError("已找到邮箱,但缺少 address_id,无法补全 JWT") + + show_password_res = self._request( + "GET", + f"/admin/show_password/{address_id}", + headers=self._admin_headers(), ) - raise RuntimeError( - f"获取邮件 Token 失败: {res.status_code} - {res.text[:200]}" - ) + if show_password_res.status_code != 200: + raise RuntimeError( + f"获取邮箱 JWT 失败: {show_password_res.status_code} - {show_password_res.text[:200]}" + ) + token_data = show_password_res.json() + jwt = token_data.get("jwt") + if not jwt: + raise RuntimeError("管理员接口未返回邮箱 JWT") + return jwt + + raise RuntimeError("当前版本只适配 cloudflare_temp_email 后端") def fetch_message_detail(self, mail_token, msg_id): if not mail_token or not msg_id: return None - if isinstance(msg_id, str) and msg_id.startswith("/messages/"): - msg_id = msg_id.split("/")[-1] - msg_id = str(msg_id).strip() - api_bases = self._duckmail_api_bases() - if not api_bases: + if not self._api_bases(): return None - res = None - for idx, api_base in enumerate(api_bases): - res = self.session.get( - f"{api_base}/messages/{msg_id}", + + if self._is_cloudflare_temp_mail(): + res = self._request( + "GET", + f"/api/mail/{msg_id}", headers={"Authorization": f"Bearer {mail_token}"}, - timeout=self.timeout, ) - if res.status_code == 404 and idx < len(api_bases) - 1: - continue - break - if res.status_code == 200: - return res.json() - return None + if res.status_code == 200: + return _normalize_cloudflare_mail(res.json()) + return None + + raise RuntimeError("当前版本只适配 cloudflare_temp_email 后端") diff --git a/gui.py b/gui.py index 37aec96..c34dabe 100644 --- a/gui.py +++ b/gui.py @@ -1,3 +1,4 @@ +from pathlib import Path import sys import time @@ -6,6 +7,7 @@ from PySide6.QtGui import QFont from PySide6.QtWidgets import ( QApplication, QCheckBox, + QComboBox, QFormLayout, QFrame, QGroupBox, @@ -26,10 +28,18 @@ from PySide6.QtWidgets import ( ) from duckmail_client import DuckMailClient -from account_store import load_accounts, save_account, load_config, save_config +from account_store import ( + ensure_database_and_table, + load_accounts, + save_account, + load_config, + save_config, + test_connection, +) - -CONFIG_PATH = "config_gui.json" +BASE_DIR = Path(__file__).resolve().parent +CONFIG_PATH = BASE_DIR / "config_gui.json" +LEGACY_CONFIG_PATH = BASE_DIR.parent / "config_gui.json" def _format_sender(sender_value): @@ -44,10 +54,15 @@ def _email_text_from_detail(detail): text = detail.get("text") or "" if text: return str(text) + message = detail.get("message") or "" + if message: + return str(message) html_val = detail.get("html") if isinstance(html_val, list): return "\n".join(str(item) for item in html_val) - return str(html_val or "") + if html_val: + return str(html_val) + return str(detail.get("raw") or "") def _generate_password(length=14): @@ -132,13 +147,14 @@ def _random_birthdate(): class MainWindow(QMainWindow): def __init__(self): super().__init__() - self.setWindowTitle("DuckMail GUI") + self.setWindowTitle("Temp Mail GUI") self.resize(1280, 780) self.current_mail_token = "" self.current_emails = [] self.all_accounts = [] + self._migrate_legacy_config_if_needed() self._build_ui() self._setup_timer() self._load_config() @@ -166,6 +182,24 @@ class MainWindow(QMainWindow): self.import_btn.clicked.connect(self._on_import_account) self.reload_accounts_btn.clicked.connect(self._on_reload_accounts) self.search_input.textChanged.connect(self._apply_account_filter) + self.show_passwords_check.toggled.connect(self._on_toggle_password_visibility) + self.backend_type_input.currentTextChanged.connect(self._on_backend_type_changed) + self.db_type_input.currentTextChanged.connect(self._on_db_type_changed) + self.test_db_btn.clicked.connect(self._on_test_db_connection) + self.init_db_btn.clicked.connect(self._on_init_db) + + def _migrate_legacy_config_if_needed(self): + if CONFIG_PATH.exists(): + return + if not LEGACY_CONFIG_PATH.exists(): + return + try: + CONFIG_PATH.write_text( + LEGACY_CONFIG_PATH.read_text(encoding="utf-8"), + encoding="utf-8", + ) + except Exception: + pass def _build_accounts_page(self): page = QWidget() @@ -238,17 +272,22 @@ class MainWindow(QMainWindow): page = QWidget() layout = QVBoxLayout(page) - config_group = QGroupBox("DuckMail 配置") + config_group = QGroupBox("Cloudflare Temp Mail 配置") config_layout = QFormLayout(config_group) self.api_base_input = QLineEdit() self.domain_input = QLineEdit() + self.backend_type_input = QComboBox() + self.backend_type_input.addItems(["cloudflare_temp_email"]) self.bearer_input = QLineEdit() self.bearer_input.setEchoMode(QLineEdit.Password) self.proxy_input = QLineEdit() + self.show_passwords_check = QCheckBox("显示密码") config_layout.addRow("API Base", self.api_base_input) config_layout.addRow("Domain", self.domain_input) - config_layout.addRow("Bearer", self.bearer_input) + config_layout.addRow("Backend Type", self.backend_type_input) + config_layout.addRow("Admin Password", self.bearer_input) config_layout.addRow("Proxy", self.proxy_input) + config_layout.addRow("密码显示", self.show_passwords_check) auto_layout = QHBoxLayout() self.auto_refresh_check = QCheckBox("自动刷新") @@ -262,23 +301,36 @@ class MainWindow(QMainWindow): auto_layout.addStretch(1) config_layout.addRow("刷新", auto_layout) - pg_group = QGroupBox("PostgreSQL 配置") + pg_group = QGroupBox("数据库配置") pg_layout = QFormLayout(pg_group) - self.pg_enabled_check = QCheckBox("启用 PostgreSQL") + self.pg_enabled_check = QCheckBox("启用数据库") + self.db_type_input = QComboBox() + self.db_type_input.addItems(["postgresql", "mysql"]) self.pg_host_input = QLineEdit() self.pg_port_input = QSpinBox() self.pg_port_input.setRange(1, 65535) self.pg_port_input.setValue(5432) self.pg_db_input = QLineEdit() + self.db_table_input = QLineEdit() self.pg_user_input = QLineEdit() self.pg_password_input = QLineEdit() self.pg_password_input.setEchoMode(QLineEdit.Password) + self.db_auto_create_check = QCheckBox("数据库或表不存在时允许创建") + self.test_db_btn = QPushButton("测试连接") + self.init_db_btn = QPushButton("初始化库表") + db_actions = QHBoxLayout() + db_actions.addWidget(self.test_db_btn) + db_actions.addWidget(self.init_db_btn) pg_layout.addRow("开关", self.pg_enabled_check) + pg_layout.addRow("类型", self.db_type_input) pg_layout.addRow("Host", self.pg_host_input) pg_layout.addRow("Port", self.pg_port_input) pg_layout.addRow("DB", self.pg_db_input) + pg_layout.addRow("Table", self.db_table_input) pg_layout.addRow("User", self.pg_user_input) pg_layout.addRow("Password", self.pg_password_input) + pg_layout.addRow("自动创建", self.db_auto_create_check) + pg_layout.addRow("操作", db_actions) import_group = QGroupBox("账号写入数据库") import_layout = QFormLayout(import_group) @@ -288,14 +340,14 @@ class MainWindow(QMainWindow): self.import_chatgpt_password_input = QLineEdit() self.import_name_input = QLineEdit() self.import_birthdate_input = QLineEdit() - self.import_btn = QPushButton("导入并获取 Token") + self.import_btn = QPushButton("导入账号并补 Token") self.create_email_btn = QPushButton("创建邮箱并写库") import_actions = QHBoxLayout() import_actions.addWidget(self.import_btn) import_actions.addWidget(self.create_email_btn) self.save_config_btn = QPushButton("保存设置") import_layout.addRow("邮箱", self.import_email_input) - import_layout.addRow("邮箱密码", self.import_mail_password_input) + import_layout.addRow("邮箱密码(可选)", self.import_mail_password_input) import_layout.addRow("ChatGPT密码(可选)", self.import_chatgpt_password_input) import_layout.addRow("姓名(可选)", self.import_name_input) import_layout.addRow("生日(可选)", self.import_birthdate_input) @@ -321,10 +373,21 @@ class MainWindow(QMainWindow): return { "api_base": self.api_base_input.text().strip(), "domain": self.domain_input.text().strip(), + "backend_type": self.backend_type_input.currentText(), "bearer": self.bearer_input.text().strip(), "proxy": self.proxy_input.text().strip(), + "show_passwords": self.show_passwords_check.isChecked(), "auto_refresh": self.auto_refresh_check.isChecked(), "refresh_interval": self.refresh_interval.value(), + "db_enabled": self.pg_enabled_check.isChecked(), + "db_type": self.db_type_input.currentText(), + "db_host": self.pg_host_input.text().strip(), + "db_port": self.pg_port_input.value(), + "db_name": self.pg_db_input.text().strip(), + "db_table": self.db_table_input.text().strip(), + "db_user": self.pg_user_input.text().strip(), + "db_password": self.pg_password_input.text().strip(), + "db_auto_create": self.db_auto_create_check.isChecked(), "pg_enabled": self.pg_enabled_check.isChecked(), "pg_host": self.pg_host_input.text().strip(), "pg_port": self.pg_port_input.value(), @@ -340,28 +403,102 @@ class MainWindow(QMainWindow): config.get("domain"), config.get("bearer"), proxy=config.get("proxy"), + backend_type=config.get("backend_type"), ) def _load_config(self): data = load_config(CONFIG_PATH) - self.api_base_input.setText(data.get("api_base", "")) - self.domain_input.setText(data.get("domain", "")) + self.api_base_input.setText(data.get("api_base", "https://temp-email-api.example.com")) + self.domain_input.setText(data.get("domain", "example.com")) + backend_type = str(data.get("backend_type", "cloudflare_temp_email")).strip().lower() or "cloudflare_temp_email" + backend_index = self.backend_type_input.findText(backend_type) + if backend_index >= 0: + self.backend_type_input.setCurrentIndex(backend_index) self.bearer_input.setText(data.get("bearer", "")) self.proxy_input.setText(data.get("proxy", "")) + self.show_passwords_check.setChecked(bool(data.get("show_passwords", False))) self.auto_refresh_check.setChecked(bool(data.get("auto_refresh", False))) self.refresh_interval.setValue(max(10, min(300, int(data.get("refresh_interval", 30) or 30)))) - self.pg_enabled_check.setChecked(bool(data.get("pg_enabled", True))) - self.pg_host_input.setText(data.get("pg_host", "")) - self.pg_port_input.setValue(int(data.get("pg_port", 5432) or 5432)) - self.pg_db_input.setText(data.get("pg_db", "mail_accounts_db")) - self.pg_user_input.setText(data.get("pg_user", "")) - self.pg_password_input.setText(data.get("pg_password", "")) + self.pg_enabled_check.setChecked(bool(data.get("db_enabled", data.get("pg_enabled", True)))) + db_type = str(data.get("db_type", "postgresql")).strip().lower() or "postgresql" + index = self.db_type_input.findText(db_type) + if index >= 0: + self.db_type_input.setCurrentIndex(index) + self.pg_host_input.setText(data.get("db_host", data.get("pg_host", ""))) + self.pg_port_input.setValue(int(data.get("db_port", data.get("pg_port", 5432 if db_type != "mysql" else 3306)) or (3306 if db_type == "mysql" else 5432))) + self.pg_db_input.setText(data.get("db_name", data.get("pg_db", "mail_accounts_db"))) + self.db_table_input.setText(data.get("db_table", "registered_accounts")) + self.pg_user_input.setText(data.get("db_user", data.get("pg_user", ""))) + self.pg_password_input.setText(data.get("db_password", data.get("pg_password", ""))) + self.db_auto_create_check.setChecked(bool(data.get("db_auto_create", False))) + self._on_toggle_password_visibility(self.show_passwords_check.isChecked()) def _save_config(self): data = self._get_config() save_config(CONFIG_PATH, data) return data + def _mask_secret(self, value, placeholder="(空)"): + text = str(value or "") + if not text: + return placeholder + if self.show_passwords_check.isChecked(): + return text + if len(text) <= 4: + return "*" * len(text) + return f"{text[:2]}{'*' * max(len(text) - 4, 2)}{text[-2:]}" + + def _on_toggle_password_visibility(self, checked): + mode = QLineEdit.Normal if checked else QLineEdit.Password + self.bearer_input.setEchoMode(mode) + self.pg_password_input.setEchoMode(mode) + self.import_mail_password_input.setEchoMode(mode) + self._render_account_detail(self._get_selected_account()) + + def _on_backend_type_changed(self, backend_type): + self._save_config() + + def _on_db_type_changed(self, db_type): + db_type = (db_type or "").strip().lower() + default_port = 3306 if db_type == "mysql" else 5432 + current_port = self.pg_port_input.value() + if current_port in (3306, 5432): + self.pg_port_input.setValue(default_port) + + def _db_config_for_action(self): + config = self._save_config() + if not config.get("db_enabled"): + raise RuntimeError("请先启用数据库") + return config + + def _on_test_db_connection(self): + try: + config = self._db_config_for_action() + result = test_connection(config) + self._log( + f"数据库连接成功: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}" + ) + QMessageBox.information( + self, + "数据库连接成功", + f"类型: {result['db_type']}\n数据库存在: {result['database_exists']}\n表存在: {result['table_exists']}\n管理权限: {result['admin_access']}", + ) + except Exception as e: + QMessageBox.critical(self, "数据库连接失败", str(e)) + self._log(f"数据库连接失败: {e}") + + def _on_init_db(self): + try: + config = self._db_config_for_action() + result = ensure_database_and_table(config) + self._log( + f"数据库初始化完成: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}" + ) + QMessageBox.information(self, "初始化成功", "数据库和表已经就绪") + except Exception as e: + QMessageBox.critical(self, "初始化失败", str(e)) + self._log(f"初始化数据库失败: {e}") + def _account_matches_search(self, account, keyword): if not keyword: return True @@ -393,19 +530,19 @@ class MainWindow(QMainWindow): def _load_accounts(self): config = self._save_config() - if not config.get("pg_enabled"): + if not config.get("db_enabled", config.get("pg_enabled")): self.all_accounts = [] self._apply_account_filter() - self._log("PostgreSQL 未启用") + self._log("数据库未启用") return try: self.all_accounts = load_accounts(config) self._apply_account_filter() - self._log(f"已加载 PostgreSQL 账号: {len(self.all_accounts)} 个") + self._log(f"已加载 {config.get('db_type', 'postgresql')} 账号: {len(self.all_accounts)} 个") except Exception as e: self.all_accounts = [] self._apply_account_filter() - self._log(f"加载 PostgreSQL 账号失败: {e}") + self._log(f"加载数据库账号失败: {e}") def _on_reload_accounts(self): self._load_accounts() @@ -453,8 +590,8 @@ class MainWindow(QMainWindow): chatgpt_password = self.import_chatgpt_password_input.text().strip() name = self.import_name_input.text().strip() birthdate = self.import_birthdate_input.text().strip() - if not email or not mail_password: - QMessageBox.information(self, "提示", "请输入邮箱和邮箱密码") + if not email: + QMessageBox.information(self, "提示", "请输入邮箱") return try: config = self._save_config() @@ -498,9 +635,9 @@ class MainWindow(QMainWindow): return token_status = "已存在" if account.get("mail_token", "") else "缺失" self.detail_email.setText(account.get("email", "")) - self.detail_mail_password.setText(account.get("mail_password", "")) + self.detail_mail_password.setText(self._mask_secret(account.get("mail_password", ""), "(管理员模式下可留空)")) self.detail_mail_token.setText(token_status) - self.detail_chatgpt_password.setText(account.get("chatgpt_password", "")) + self.detail_chatgpt_password.setText(self._mask_secret(account.get("chatgpt_password", ""))) self.detail_name.setText(account.get("name", "")) self.detail_birthdate.setText(account.get("birthdate", "")) self.detail_source.setText(account.get("source", "postgres")) @@ -531,10 +668,7 @@ class MainWindow(QMainWindow): email = str(account.get("email", "")).strip() mail_password = str(account.get("mail_password", "")).strip() - if not mail_password: - raise RuntimeError("当前账号缺少 mail_token,且没有 mail_password 可用于换取 token") - - self._log(f"账号 {email} 缺少 mail_token,正在自动获取") + self._log(f"账号 {email} 缺少 mail_token,正在通过管理员接口补全") client = self._client_from_inputs() mail_token = client.get_token(email, mail_password) if not mail_token: @@ -563,8 +697,8 @@ class MainWindow(QMainWindow): def _render_emails_list(self, emails): self.emails_list.clear() for item in emails: - subject = str(item.get("subject", "")) - sender = _format_sender(item.get("from")) + subject = str(item.get("subject", "") or "(无主题)") + sender = _format_sender(item.get("from") or item.get("source")) date = str(item.get("createdAt", "")) label = f"{subject} | {sender} | {date}" list_item = QListWidgetItem(label) @@ -577,7 +711,13 @@ class MainWindow(QMainWindow): return email_item = items[0].data(Qt.UserRole) msg_id = email_item.get("id") or email_item.get("@id") or email_item.get("messageId") - body = email_item.get("text") or email_item.get("html") or email_item.get("intro") + body = ( + email_item.get("text") + or email_item.get("message") + or email_item.get("html") + or email_item.get("intro") + or email_item.get("raw") + ) if not body and msg_id and self.current_mail_token: try: client = self._client_from_inputs() diff --git a/requirements.txt b/requirements.txt index b887e5f..f33308e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ PySide6 requests psycopg[binary] +PyMySQL diff --git a/sync_pg_to_d1.py b/sync_pg_to_d1.py new file mode 100644 index 0000000..b344ea1 --- /dev/null +++ b/sync_pg_to_d1.py @@ -0,0 +1,65 @@ +import argparse +from pathlib import Path + +import psycopg + + +def fetch_emails(pg_host, pg_port, pg_db, pg_user, pg_password, table_name, domain): + conn = psycopg.connect( + host=pg_host, + port=pg_port, + dbname=pg_db, + user=pg_user, + password=pg_password, + connect_timeout=10, + ) + try: + with conn, conn.cursor() as cur: + cur.execute( + f"select distinct lower(email) from {table_name} where lower(email) like %s order by lower(email)", + (f"%@{domain.lower()}",), + ) + return [row[0] for row in cur.fetchall()] + finally: + conn.close() + + +def write_sql(output_path, emails, source_meta): + lines = [] + for email in emails: + safe = email.replace("'", "''") + source = source_meta.replace("'", "''") + lines.append( + f"INSERT OR IGNORE INTO address(name, source_meta) VALUES('{safe}', '{source}');" + ) + Path(output_path).write_text("\n".join(lines), encoding="utf-8") + + +def main(): + parser = argparse.ArgumentParser(description="Export domain mailboxes from PostgreSQL into Cloudflare D1 import SQL.") + parser.add_argument("--pg-host", required=True) + parser.add_argument("--pg-port", type=int, required=True) + parser.add_argument("--pg-db", required=True) + parser.add_argument("--pg-user", required=True) + parser.add_argument("--pg-password", required=True) + parser.add_argument("--table", default="registered_accounts") + parser.add_argument("--domain", required=True) + parser.add_argument("--source-meta", default="pg_import") + parser.add_argument("--output", required=True) + args = parser.parse_args() + + emails = fetch_emails( + args.pg_host, + args.pg_port, + args.pg_db, + args.pg_user, + args.pg_password, + args.table, + args.domain, + ) + write_sql(args.output, emails, args.source_meta) + print(f"exported={len(emails)} output={args.output}") + + +if __name__ == "__main__": + main()