from pathlib import Path import sys import time from PySide6.QtCore import QTimer, Qt from PySide6.QtGui import QFont from PySide6.QtWidgets import ( QApplication, QCheckBox, QComboBox, QFormLayout, QFrame, QGroupBox, QHBoxLayout, QLabel, QLineEdit, QListWidget, QListWidgetItem, QMainWindow, QMessageBox, QPushButton, QSpinBox, QSplitter, QTabWidget, QTextEdit, QVBoxLayout, QWidget, ) from duckmail_client import DuckMailClient from account_store import ( ensure_database_and_table, load_accounts, save_account, load_config, save_config, test_connection, ) BASE_DIR = Path(__file__).resolve().parent CONFIG_PATH = BASE_DIR / "config_gui.json" LEGACY_CONFIG_PATH = BASE_DIR.parent / "config_gui.json" def _format_sender(sender_value): if isinstance(sender_value, dict): return sender_value.get("address") or sender_value.get("name") or "" return str(sender_value or "") def _email_text_from_detail(detail): if not isinstance(detail, dict): return "" text = detail.get("text") or "" if text: return str(text) message = detail.get("message") or "" if message: return str(message) html_val = detail.get("html") if isinstance(html_val, list): return "\n".join(str(item) for item in html_val) if html_val: return str(html_val) return str(detail.get("raw") or "") def _generate_password(length=14): import random import string lower = string.ascii_lowercase upper = string.ascii_uppercase digits = string.digits special = "!@#$%&*" pwd = [ random.choice(lower), random.choice(upper), random.choice(digits), random.choice(special), ] all_chars = lower + upper + digits + special pwd += [random.choice(all_chars) for _ in range(length - 4)] random.shuffle(pwd) return "".join(pwd) def _random_name(): import random first_names = [ "James", "Emma", "Liam", "Olivia", "Noah", "Ava", "Ethan", "Sophia", "Lucas", "Mia", "Mason", "Isabella", "Logan", "Charlotte", "Alexander", "Amelia", "Benjamin", "Harper", "William", "Evelyn", ] last_names = [ "Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", "Clark", "Hall", "Young", "Anderson", "Thomas", "Jackson", "White", "Harris", "Martin", "Thompson", "Garcia", "Robinson", "Lewis", ] return f"{random.choice(first_names)} {random.choice(last_names)}" def _random_birthdate(): import random y = random.randint(1980, 2002) m = random.randint(1, 12) d = random.randint(1, 28) return f"{y}-{m:02d}-{d:02d}" class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Temp Mail GUI") self.resize(1280, 780) self.current_mail_token = "" self.current_emails = [] self.all_accounts = [] self._migrate_legacy_config_if_needed() self._build_ui() self._setup_timer() self._load_config() self._load_accounts() def _build_ui(self): root = QWidget() root_layout = QVBoxLayout(root) self.tabs = QTabWidget() self.tabs.addTab(self._build_accounts_page(), "账号") self.tabs.addTab(self._build_settings_page(), "设置") root_layout.addWidget(self.tabs) self.setCentralWidget(root) self.save_config_btn.clicked.connect(self._on_save_config) self.create_email_btn.clicked.connect(self._on_create_email) self.fetch_emails_btn.clicked.connect(self._on_fetch_emails) self.clear_log_btn.clicked.connect(self.log_box.clear) self.accounts_list.itemSelectionChanged.connect(self._on_account_selected) self.emails_list.itemSelectionChanged.connect(self._on_email_selected) self.auto_refresh_check.toggled.connect(self._on_auto_refresh_toggle) self.refresh_interval.valueChanged.connect(self._on_refresh_interval_changed) self.import_btn.clicked.connect(self._on_import_account) self.reload_accounts_btn.clicked.connect(self._on_reload_accounts) self.search_input.textChanged.connect(self._apply_account_filter) self.show_passwords_check.toggled.connect(self._on_toggle_password_visibility) self.backend_type_input.currentTextChanged.connect(self._on_backend_type_changed) self.db_type_input.currentTextChanged.connect(self._on_db_type_changed) self.test_db_btn.clicked.connect(self._on_test_db_connection) self.init_db_btn.clicked.connect(self._on_init_db) def _migrate_legacy_config_if_needed(self): if CONFIG_PATH.exists(): return if not LEGACY_CONFIG_PATH.exists(): return try: CONFIG_PATH.write_text( LEGACY_CONFIG_PATH.read_text(encoding="utf-8"), encoding="utf-8", ) except Exception: pass def _build_accounts_page(self): page = QWidget() layout = QVBoxLayout(page) toolbar = QHBoxLayout() self.search_input = QLineEdit() self.search_input.setPlaceholderText("搜索邮箱 / 姓名 / 创建时间") self.fetch_emails_btn = QPushButton("获取邮件") self.reload_accounts_btn = QPushButton("刷新账号") self.clear_log_btn = QPushButton("清空日志") toolbar.addWidget(QLabel("账号搜索")) toolbar.addWidget(self.search_input, 1) toolbar.addWidget(self.fetch_emails_btn) toolbar.addWidget(self.reload_accounts_btn) toolbar.addWidget(self.clear_log_btn) splitter = QSplitter(Qt.Horizontal) left_frame = QFrame() left_layout = QVBoxLayout(left_frame) left_layout.addWidget(QLabel("账号列表")) self.accounts_list = QListWidget() left_layout.addWidget(self.accounts_list) details_group = QGroupBox("账号详情") details_layout = QFormLayout(details_group) self.detail_email = QLabel("-") self.detail_mail_password = QLabel("-") self.detail_chatgpt_password = QLabel("-") self.detail_name = QLabel("-") self.detail_birthdate = QLabel("-") self.detail_mail_token = QLabel("-") self.detail_source = QLabel("postgres") details_layout.addRow("邮箱", self.detail_email) details_layout.addRow("邮箱密码", self.detail_mail_password) details_layout.addRow("Mail Token", self.detail_mail_token) details_layout.addRow("ChatGPT密码", self.detail_chatgpt_password) details_layout.addRow("姓名", self.detail_name) details_layout.addRow("生日", self.detail_birthdate) details_layout.addRow("来源", self.detail_source) left_layout.addWidget(details_group) right_frame = QFrame() right_layout = QVBoxLayout(right_frame) right_layout.addWidget(QLabel("邮件列表")) self.emails_list = QListWidget() right_layout.addWidget(self.emails_list) right_layout.addWidget(QLabel("邮件内容")) self.email_content = QTextEdit() self.email_content.setReadOnly(True) self.email_content.setFont(QFont("Consolas", 10)) right_layout.addWidget(self.email_content) splitter.addWidget(left_frame) splitter.addWidget(right_frame) splitter.setSizes([360, 900]) self.log_box = QTextEdit() self.log_box.setReadOnly(True) self.log_box.setFixedHeight(150) layout.addLayout(toolbar) layout.addWidget(splitter) layout.addWidget(QLabel("日志")) layout.addWidget(self.log_box) return page def _build_settings_page(self): page = QWidget() layout = QVBoxLayout(page) config_group = QGroupBox("Cloudflare Temp Mail 配置") config_layout = QFormLayout(config_group) self.api_base_input = QLineEdit() self.domain_input = QLineEdit() self.backend_type_input = QComboBox() self.backend_type_input.addItems(["cloudflare_temp_email"]) self.bearer_input = QLineEdit() self.bearer_input.setEchoMode(QLineEdit.Password) self.proxy_input = QLineEdit() self.show_passwords_check = QCheckBox("显示密码") config_layout.addRow("API Base", self.api_base_input) config_layout.addRow("Domain", self.domain_input) config_layout.addRow("Backend Type", self.backend_type_input) config_layout.addRow("Admin Password", self.bearer_input) config_layout.addRow("Proxy", self.proxy_input) config_layout.addRow("密码显示", self.show_passwords_check) auto_layout = QHBoxLayout() self.auto_refresh_check = QCheckBox("自动刷新") self.refresh_interval = QSpinBox() self.refresh_interval.setRange(10, 300) self.refresh_interval.setValue(30) self.refresh_interval.setSuffix(" 秒") auto_layout.addWidget(self.auto_refresh_check) auto_layout.addWidget(QLabel("间隔")) auto_layout.addWidget(self.refresh_interval) auto_layout.addStretch(1) config_layout.addRow("刷新", auto_layout) pg_group = QGroupBox("数据库配置") pg_layout = QFormLayout(pg_group) self.pg_enabled_check = QCheckBox("启用数据库") self.db_type_input = QComboBox() self.db_type_input.addItems(["postgresql", "mysql"]) self.pg_host_input = QLineEdit() self.pg_port_input = QSpinBox() self.pg_port_input.setRange(1, 65535) self.pg_port_input.setValue(5432) self.pg_db_input = QLineEdit() self.db_table_input = QLineEdit() self.pg_user_input = QLineEdit() self.pg_password_input = QLineEdit() self.pg_password_input.setEchoMode(QLineEdit.Password) self.db_auto_create_check = QCheckBox("数据库或表不存在时允许创建") self.test_db_btn = QPushButton("测试连接") self.init_db_btn = QPushButton("初始化库表") db_actions = QHBoxLayout() db_actions.addWidget(self.test_db_btn) db_actions.addWidget(self.init_db_btn) pg_layout.addRow("开关", self.pg_enabled_check) pg_layout.addRow("类型", self.db_type_input) pg_layout.addRow("Host", self.pg_host_input) pg_layout.addRow("Port", self.pg_port_input) pg_layout.addRow("DB", self.pg_db_input) pg_layout.addRow("Table", self.db_table_input) pg_layout.addRow("User", self.pg_user_input) pg_layout.addRow("Password", self.pg_password_input) pg_layout.addRow("自动创建", self.db_auto_create_check) pg_layout.addRow("操作", db_actions) import_group = QGroupBox("账号写入数据库") import_layout = QFormLayout(import_group) self.import_email_input = QLineEdit() self.import_mail_password_input = QLineEdit() self.import_mail_password_input.setEchoMode(QLineEdit.Password) self.import_chatgpt_password_input = QLineEdit() self.import_name_input = QLineEdit() self.import_birthdate_input = QLineEdit() self.import_btn = QPushButton("导入账号并补 Token") self.create_email_btn = QPushButton("创建邮箱并写库") import_actions = QHBoxLayout() import_actions.addWidget(self.import_btn) import_actions.addWidget(self.create_email_btn) self.save_config_btn = QPushButton("保存设置") import_layout.addRow("邮箱", self.import_email_input) import_layout.addRow("邮箱密码(可选)", self.import_mail_password_input) import_layout.addRow("ChatGPT密码(可选)", self.import_chatgpt_password_input) import_layout.addRow("姓名(可选)", self.import_name_input) import_layout.addRow("生日(可选)", self.import_birthdate_input) import_layout.addRow("操作", import_actions) layout.addWidget(config_group) layout.addWidget(pg_group) layout.addWidget(import_group) layout.addWidget(self.save_config_btn) layout.addStretch(1) return page def _setup_timer(self): self.timer = QTimer(self) self.timer.setInterval(self.refresh_interval.value() * 1000) self.timer.timeout.connect(self._auto_refresh) def _log(self, message): ts = time.strftime("%H:%M:%S") self.log_box.append(f"[{ts}] {message}") def _get_config(self): return { "api_base": self.api_base_input.text().strip(), "domain": self.domain_input.text().strip(), "backend_type": self.backend_type_input.currentText(), "bearer": self.bearer_input.text().strip(), "proxy": self.proxy_input.text().strip(), "show_passwords": self.show_passwords_check.isChecked(), "auto_refresh": self.auto_refresh_check.isChecked(), "refresh_interval": self.refresh_interval.value(), "db_enabled": self.pg_enabled_check.isChecked(), "db_type": self.db_type_input.currentText(), "db_host": self.pg_host_input.text().strip(), "db_port": self.pg_port_input.value(), "db_name": self.pg_db_input.text().strip(), "db_table": self.db_table_input.text().strip(), "db_user": self.pg_user_input.text().strip(), "db_password": self.pg_password_input.text().strip(), "db_auto_create": self.db_auto_create_check.isChecked(), "pg_enabled": self.pg_enabled_check.isChecked(), "pg_host": self.pg_host_input.text().strip(), "pg_port": self.pg_port_input.value(), "pg_db": self.pg_db_input.text().strip(), "pg_user": self.pg_user_input.text().strip(), "pg_password": self.pg_password_input.text().strip(), } def _client_from_inputs(self): config = self._get_config() return DuckMailClient( config.get("api_base"), config.get("domain"), config.get("bearer"), proxy=config.get("proxy"), backend_type=config.get("backend_type"), ) def _load_config(self): data = load_config(CONFIG_PATH) self.api_base_input.setText(data.get("api_base", "https://temp-email-api.example.com")) self.domain_input.setText(data.get("domain", "example.com")) backend_type = str(data.get("backend_type", "cloudflare_temp_email")).strip().lower() or "cloudflare_temp_email" backend_index = self.backend_type_input.findText(backend_type) if backend_index >= 0: self.backend_type_input.setCurrentIndex(backend_index) self.bearer_input.setText(data.get("bearer", "")) self.proxy_input.setText(data.get("proxy", "")) self.show_passwords_check.setChecked(bool(data.get("show_passwords", False))) self.auto_refresh_check.setChecked(bool(data.get("auto_refresh", False))) self.refresh_interval.setValue(max(10, min(300, int(data.get("refresh_interval", 30) or 30)))) self.pg_enabled_check.setChecked(bool(data.get("db_enabled", data.get("pg_enabled", True)))) db_type = str(data.get("db_type", "postgresql")).strip().lower() or "postgresql" index = self.db_type_input.findText(db_type) if index >= 0: self.db_type_input.setCurrentIndex(index) self.pg_host_input.setText(data.get("db_host", data.get("pg_host", ""))) self.pg_port_input.setValue(int(data.get("db_port", data.get("pg_port", 5432 if db_type != "mysql" else 3306)) or (3306 if db_type == "mysql" else 5432))) self.pg_db_input.setText(data.get("db_name", data.get("pg_db", "mail_accounts_db"))) self.db_table_input.setText(data.get("db_table", "registered_accounts")) self.pg_user_input.setText(data.get("db_user", data.get("pg_user", ""))) self.pg_password_input.setText(data.get("db_password", data.get("pg_password", ""))) self.db_auto_create_check.setChecked(bool(data.get("db_auto_create", False))) self._on_toggle_password_visibility(self.show_passwords_check.isChecked()) def _save_config(self): data = self._get_config() save_config(CONFIG_PATH, data) return data def _mask_secret(self, value, placeholder="(空)"): text = str(value or "") if not text: return placeholder if self.show_passwords_check.isChecked(): return text if len(text) <= 4: return "*" * len(text) return f"{text[:2]}{'*' * max(len(text) - 4, 2)}{text[-2:]}" def _on_toggle_password_visibility(self, checked): mode = QLineEdit.Normal if checked else QLineEdit.Password self.bearer_input.setEchoMode(mode) self.pg_password_input.setEchoMode(mode) self.import_mail_password_input.setEchoMode(mode) self._render_account_detail(self._get_selected_account()) def _on_backend_type_changed(self, backend_type): self._save_config() def _on_db_type_changed(self, db_type): db_type = (db_type or "").strip().lower() default_port = 3306 if db_type == "mysql" else 5432 current_port = self.pg_port_input.value() if current_port in (3306, 5432): self.pg_port_input.setValue(default_port) def _db_config_for_action(self): config = self._save_config() if not config.get("db_enabled"): raise RuntimeError("请先启用数据库") return config def _on_test_db_connection(self): try: config = self._db_config_for_action() result = test_connection(config) self._log( f"数据库连接成功: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}" ) QMessageBox.information( self, "数据库连接成功", f"类型: {result['db_type']}\n数据库存在: {result['database_exists']}\n表存在: {result['table_exists']}\n管理权限: {result['admin_access']}", ) except Exception as e: QMessageBox.critical(self, "数据库连接失败", str(e)) self._log(f"数据库连接失败: {e}") def _on_init_db(self): try: config = self._db_config_for_action() result = ensure_database_and_table(config) self._log( f"数据库初始化完成: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}" ) QMessageBox.information(self, "初始化成功", "数据库和表已经就绪") except Exception as e: QMessageBox.critical(self, "初始化失败", str(e)) self._log(f"初始化数据库失败: {e}") def _account_matches_search(self, account, keyword): if not keyword: return True fields = [ account.get("email", ""), account.get("name", ""), account.get("created_at", ""), ] haystack = " ".join(str(item or "") for item in fields).lower() return keyword in haystack def _render_accounts_list(self, accounts): self.accounts_list.clear() for item in accounts: email = item.get("email", "") created_at = item.get("created_at", "") label = f"{email} ({created_at})" if created_at else email list_item = QListWidgetItem(label) list_item.setData(Qt.UserRole, item) self.accounts_list.addItem(list_item) def _apply_account_filter(self): keyword = self.search_input.text().strip().lower() filtered = [item for item in self.all_accounts if self._account_matches_search(item, keyword)] self._render_accounts_list(filtered) if not filtered: self.current_mail_token = "" self._render_account_detail(None) def _load_accounts(self): config = self._save_config() if not config.get("db_enabled", config.get("pg_enabled")): self.all_accounts = [] self._apply_account_filter() self._log("数据库未启用") return try: self.all_accounts = load_accounts(config) self._apply_account_filter() self._log(f"已加载 {config.get('db_type', 'postgresql')} 账号: {len(self.all_accounts)} 个") except Exception as e: self.all_accounts = [] self._apply_account_filter() self._log(f"加载数据库账号失败: {e}") def _on_reload_accounts(self): self._load_accounts() self._log("账号列表已刷新") def _get_selected_account(self): items = self.accounts_list.selectedItems() if not items: return None return items[0].data(Qt.UserRole) def _on_save_config(self): self._save_config() self._log("设置已保存") def _on_create_email(self): try: config = self._save_config() client = self._client_from_inputs() email, password, mail_token = client.create_email() chatgpt_password = _generate_password() name = _random_name() birthdate = _random_birthdate() save_account( config, email, password, mail_token, chatgpt_password=chatgpt_password, name=name, birthdate=birthdate, ) self._load_accounts() self._log( f"创建邮箱成功并写入数据库: {email} | ChatGPT密码: {chatgpt_password} | 姓名: {name} | 生日: {birthdate}" ) self.tabs.setCurrentIndex(0) except Exception as e: QMessageBox.critical(self, "错误", str(e)) self._log(f"创建邮箱失败: {e}") def _on_import_account(self): email = self.import_email_input.text().strip() mail_password = self.import_mail_password_input.text().strip() chatgpt_password = self.import_chatgpt_password_input.text().strip() name = self.import_name_input.text().strip() birthdate = self.import_birthdate_input.text().strip() if not email: QMessageBox.information(self, "提示", "请输入邮箱") return try: config = self._save_config() client = self._client_from_inputs() mail_token = client.get_token(email, mail_password) save_account( config, email, mail_password, mail_token, chatgpt_password=chatgpt_password, name=name, birthdate=birthdate, ) self._load_accounts() self._log(f"导入账号成功并写入数据库: {email}") self.tabs.setCurrentIndex(0) except Exception as e: QMessageBox.critical(self, "错误", str(e)) self._log(f"导入账号失败: {e}") def _on_account_selected(self): account = self._get_selected_account() if not account: self.current_mail_token = "" self._render_account_detail(None) return self.current_mail_token = account.get("mail_token", "") self._render_account_detail(account) self._log(f"选中账号: {account.get('email', '')}") def _render_account_detail(self, account): if not account: self.detail_email.setText("-") self.detail_mail_password.setText("-") self.detail_mail_token.setText("-") self.detail_chatgpt_password.setText("-") self.detail_name.setText("-") self.detail_birthdate.setText("-") self.detail_source.setText("-") return token_status = "已存在" if account.get("mail_token", "") else "缺失" self.detail_email.setText(account.get("email", "")) self.detail_mail_password.setText(self._mask_secret(account.get("mail_password", ""), "(管理员模式下可留空)")) self.detail_mail_token.setText(token_status) self.detail_chatgpt_password.setText(self._mask_secret(account.get("chatgpt_password", ""))) self.detail_name.setText(account.get("name", "")) self.detail_birthdate.setText(account.get("birthdate", "")) self.detail_source.setText(account.get("source", "postgres")) def _update_account_token(self, account, mail_token): if not account or not mail_token: return config = self._get_config() updated = dict(account) updated["mail_token"] = mail_token save_account( config, updated.get("email", ""), updated.get("mail_password", ""), mail_token, chatgpt_password=updated.get("chatgpt_password", ""), name=updated.get("name", ""), birthdate=updated.get("birthdate", ""), ) account["mail_token"] = mail_token self.current_mail_token = mail_token self._render_account_detail(account) def _ensure_mail_token(self, account): mail_token = str(account.get("mail_token", "")).strip() if mail_token: return mail_token email = str(account.get("email", "")).strip() mail_password = str(account.get("mail_password", "")).strip() self._log(f"账号 {email} 缺少 mail_token,正在通过管理员接口补全") client = self._client_from_inputs() mail_token = client.get_token(email, mail_password) if not mail_token: raise RuntimeError("自动获取 mail_token 失败") self._update_account_token(account, mail_token) self._log(f"已自动补全 mail_token: {email}") return mail_token def _on_fetch_emails(self): account = self._get_selected_account() if not account: QMessageBox.information(self, "提示", "请先选择账号") return try: mail_token = self._ensure_mail_token(account) client = self._client_from_inputs() emails = client.fetch_emails(mail_token) self.current_emails = emails self._render_emails_list(emails) self._log(f"已获取邮件: {len(emails)} 封") except Exception as e: QMessageBox.critical(self, "错误", str(e)) self._log(f"获取邮件失败: {e}") def _render_emails_list(self, emails): self.emails_list.clear() for item in emails: subject = str(item.get("subject", "") or "(无主题)") sender = _format_sender(item.get("from") or item.get("source")) date = str(item.get("createdAt", "")) label = f"{subject} | {sender} | {date}" list_item = QListWidgetItem(label) list_item.setData(Qt.UserRole, item) self.emails_list.addItem(list_item) def _on_email_selected(self): items = self.emails_list.selectedItems() if not items: return email_item = items[0].data(Qt.UserRole) msg_id = email_item.get("id") or email_item.get("@id") or email_item.get("messageId") body = ( email_item.get("text") or email_item.get("message") or email_item.get("html") or email_item.get("intro") or email_item.get("raw") ) if not body and msg_id and self.current_mail_token: try: client = self._client_from_inputs() detail = client.fetch_message_detail(self.current_mail_token, msg_id) body = _email_text_from_detail(detail) except Exception as e: body = f"(获取邮件详情失败: {e})" self.email_content.setPlainText(str(body or "(无正文)")) def _on_auto_refresh_toggle(self, checked): if checked: self.timer.start() self._log("自动刷新已开启") else: self.timer.stop() self._log("自动刷新已关闭") self._save_config() def _on_refresh_interval_changed(self, value): self.timer.setInterval(value * 1000) if self.auto_refresh_check.isChecked(): self.timer.start() self._save_config() def _auto_refresh(self): account = self._get_selected_account() if not account: return try: mail_token = self._ensure_mail_token(account) client = self._client_from_inputs() emails = client.fetch_emails(mail_token) self.current_emails = emails self._render_emails_list(emails) self._log(f"自动刷新: {len(emails)} 封") except Exception as e: self._log(f"自动刷新失败: {e}") def main(): app = QApplication(sys.argv) win = MainWindow() win.show() sys.exit(app.exec()) if __name__ == "__main__": main()