Files
duckmail_gui/gui.py
GitHub Actions 5c054228ef update
2026-03-25 06:44:13 +08:00

909 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from html import unescape
from pathlib import Path
import re
import sys
import time
from PySide6.QtCore import QTimer, Qt
from PySide6.QtGui import QFont, QTextDocumentFragment
from PySide6.QtWidgets import (
QApplication,
QCheckBox,
QComboBox,
QFormLayout,
QFrame,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QMainWindow,
QMessageBox,
QPushButton,
QSpinBox,
QSplitter,
QTabWidget,
QTextEdit,
QVBoxLayout,
QWidget,
)
from duckmail_client import DuckMailClient
from account_store import (
ensure_database_and_table,
load_accounts,
save_account,
load_config,
save_config,
test_connection,
)
BASE_DIR = Path(__file__).resolve().parent
CONFIG_PATH = BASE_DIR / "config_gui.json"
LEGACY_CONFIG_PATH = BASE_DIR.parent / "config_gui.json"
def _format_sender(sender_value):
if isinstance(sender_value, dict):
return sender_value.get("address") or sender_value.get("name") or ""
return str(sender_value or "")
def _email_text_from_detail(detail):
if not isinstance(detail, dict):
return ""
text = detail.get("text") or ""
if text:
return str(text)
message = detail.get("message") or ""
if message:
return str(message)
html_val = detail.get("html")
if isinstance(html_val, list):
return "\n".join(str(item) for item in html_val)
if html_val:
return str(html_val)
return str(detail.get("raw") or "")
def _looks_like_html(value):
text = str(value or "").strip()
if not text:
return False
if "<html" in text.lower() or "<body" in text.lower():
return True
return bool(re.search(r"<\s*[a-zA-Z][^>]*>", text))
def _normalize_email_body(body):
if isinstance(body, list):
return "\n\n".join(str(item) for item in body if item is not None)
if body is None:
return ""
return str(body)
def _html_to_plain_text(html_text):
text = QTextDocumentFragment.fromHtml(html_text).toPlainText().strip()
if text:
return text
return unescape(re.sub(r"<[^>]+>", " ", html_text)).strip()
def _extract_verification_code(text):
if not text:
return ""
match = re.search(r"(?<!\d)(\d{6})(?!\d)", text)
if match:
return match.group(1)
return ""
def _highlight_verification_code_html(html_text, code):
if not html_text or not code:
return html_text
pattern = re.compile(rf"(?<!\d){re.escape(code)}(?!\d)")
return pattern.sub(
(
'<span style="background:#fff1bf;color:#8a3b12;'
"font-weight:700;padding:2px 6px;border-radius:6px;"
'letter-spacing:1px;">'
f"{code}</span>"
),
html_text,
count=1,
)
def _generate_password(length=14):
import random
import string
lower = string.ascii_lowercase
upper = string.ascii_uppercase
digits = string.digits
special = "!@#$%&*"
pwd = [
random.choice(lower),
random.choice(upper),
random.choice(digits),
random.choice(special),
]
all_chars = lower + upper + digits + special
pwd += [random.choice(all_chars) for _ in range(length - 4)]
random.shuffle(pwd)
return "".join(pwd)
def _random_name():
import random
first_names = [
"James",
"Emma",
"Liam",
"Olivia",
"Noah",
"Ava",
"Ethan",
"Sophia",
"Lucas",
"Mia",
"Mason",
"Isabella",
"Logan",
"Charlotte",
"Alexander",
"Amelia",
"Benjamin",
"Harper",
"William",
"Evelyn",
]
last_names = [
"Smith",
"Johnson",
"Brown",
"Davis",
"Wilson",
"Moore",
"Taylor",
"Clark",
"Hall",
"Young",
"Anderson",
"Thomas",
"Jackson",
"White",
"Harris",
"Martin",
"Thompson",
"Garcia",
"Robinson",
"Lewis",
]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def _random_birthdate():
import random
y = random.randint(1980, 2002)
m = random.randint(1, 12)
d = random.randint(1, 28)
return f"{y}-{m:02d}-{d:02d}"
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Temp Mail GUI")
self.resize(1280, 780)
self.current_mail_token = ""
self.current_emails = []
self.all_accounts = []
self._migrate_legacy_config_if_needed()
self._build_ui()
self._setup_timer()
self._load_config()
self._load_accounts()
def _build_ui(self):
root = QWidget()
root_layout = QVBoxLayout(root)
self.tabs = QTabWidget()
self.tabs.addTab(self._build_accounts_page(), "账号")
self.tabs.addTab(self._build_settings_page(), "设置")
root_layout.addWidget(self.tabs)
self.setCentralWidget(root)
self.save_config_btn.clicked.connect(self._on_save_config)
self.create_email_btn.clicked.connect(self._on_create_email)
self.fetch_emails_btn.clicked.connect(self._on_fetch_emails)
self.clear_log_btn.clicked.connect(self.log_box.clear)
self.accounts_list.itemSelectionChanged.connect(self._on_account_selected)
self.emails_list.itemSelectionChanged.connect(self._on_email_selected)
self.auto_refresh_check.toggled.connect(self._on_auto_refresh_toggle)
self.refresh_interval.valueChanged.connect(self._on_refresh_interval_changed)
self.import_btn.clicked.connect(self._on_import_account)
self.reload_accounts_btn.clicked.connect(self._on_reload_accounts)
self.search_input.textChanged.connect(self._apply_account_filter)
self.show_passwords_check.toggled.connect(self._on_toggle_password_visibility)
self.backend_type_input.currentTextChanged.connect(
self._on_backend_type_changed
)
self.db_type_input.currentTextChanged.connect(self._on_db_type_changed)
self.test_db_btn.clicked.connect(self._on_test_db_connection)
self.init_db_btn.clicked.connect(self._on_init_db)
self.copy_verification_code_btn.clicked.connect(self._copy_verification_code)
def _migrate_legacy_config_if_needed(self):
if CONFIG_PATH.exists():
return
if not LEGACY_CONFIG_PATH.exists():
return
try:
CONFIG_PATH.write_text(
LEGACY_CONFIG_PATH.read_text(encoding="utf-8"),
encoding="utf-8",
)
except Exception:
pass
def _build_accounts_page(self):
page = QWidget()
layout = QVBoxLayout(page)
toolbar = QHBoxLayout()
self.search_input = QLineEdit()
self.search_input.setPlaceholderText("搜索邮箱 / 姓名 / 创建时间")
self.fetch_emails_btn = QPushButton("获取邮件")
self.reload_accounts_btn = QPushButton("刷新账号")
self.clear_log_btn = QPushButton("清空日志")
toolbar.addWidget(QLabel("账号搜索"))
toolbar.addWidget(self.search_input, 1)
toolbar.addWidget(self.fetch_emails_btn)
toolbar.addWidget(self.reload_accounts_btn)
toolbar.addWidget(self.clear_log_btn)
splitter = QSplitter(Qt.Horizontal)
left_frame = QFrame()
left_layout = QVBoxLayout(left_frame)
left_layout.addWidget(QLabel("账号列表"))
self.accounts_list = QListWidget()
left_layout.addWidget(self.accounts_list)
details_group = QGroupBox("账号详情")
details_layout = QFormLayout(details_group)
self.detail_email = self._create_copyable_label("-")
self.detail_mail_password = self._create_copyable_label("-")
self.detail_chatgpt_password = self._create_copyable_label("-")
self.detail_name = self._create_copyable_label("-")
self.detail_birthdate = self._create_copyable_label("-")
self.detail_mail_token = self._create_copyable_label("-")
self.detail_source = self._create_copyable_label("postgres")
details_layout.addRow("邮箱", self.detail_email)
details_layout.addRow("邮箱密码", self.detail_mail_password)
details_layout.addRow("Mail Token", self.detail_mail_token)
details_layout.addRow("ChatGPT密码", self.detail_chatgpt_password)
details_layout.addRow("姓名", self.detail_name)
details_layout.addRow("生日", self.detail_birthdate)
details_layout.addRow("来源", self.detail_source)
left_layout.addWidget(details_group)
right_frame = QFrame()
right_layout = QVBoxLayout(right_frame)
right_layout.addWidget(QLabel("邮件列表"))
self.emails_list = QListWidget()
right_layout.addWidget(self.emails_list)
code_layout = QHBoxLayout()
self.verification_code_label = QLabel("验证码")
self.verification_code_value = QLineEdit()
self.verification_code_value.setReadOnly(True)
self.verification_code_value.setPlaceholderText("自动提取 6 位验证码")
self.verification_code_value.setFocusPolicy(Qt.StrongFocus)
self.copy_verification_code_btn = QPushButton("复制")
self.copy_verification_code_btn.setEnabled(False)
code_layout.addWidget(self.verification_code_label)
code_layout.addWidget(self.verification_code_value)
code_layout.addWidget(self.copy_verification_code_btn)
right_layout.addLayout(code_layout)
right_layout.addWidget(QLabel("邮件内容"))
self.email_content = QTextEdit()
self.email_content.setReadOnly(True)
self.email_content.setFont(QFont("Consolas", 10))
right_layout.addWidget(self.email_content)
splitter.addWidget(left_frame)
splitter.addWidget(right_frame)
splitter.setSizes([360, 900])
self.log_box = QTextEdit()
self.log_box.setReadOnly(True)
self.log_box.setFixedHeight(150)
layout.addLayout(toolbar)
layout.addWidget(splitter)
layout.addWidget(QLabel("日志"))
layout.addWidget(self.log_box)
return page
def _build_settings_page(self):
page = QWidget()
layout = QVBoxLayout(page)
config_group = QGroupBox("Cloudflare Temp Mail 配置")
config_layout = QFormLayout(config_group)
self.api_base_input = QLineEdit()
self.domain_input = QLineEdit()
self.backend_type_input = QComboBox()
self.backend_type_input.addItems(["cloudflare_temp_email"])
self.bearer_input = QLineEdit()
self.bearer_input.setEchoMode(QLineEdit.Password)
self.proxy_input = QLineEdit()
self.show_passwords_check = QCheckBox("显示密码")
config_layout.addRow("API Base", self.api_base_input)
config_layout.addRow("Domain", self.domain_input)
config_layout.addRow("Backend Type", self.backend_type_input)
config_layout.addRow("Admin Password", self.bearer_input)
config_layout.addRow("Proxy", self.proxy_input)
config_layout.addRow("密码显示", self.show_passwords_check)
auto_layout = QHBoxLayout()
self.auto_refresh_check = QCheckBox("自动刷新")
self.refresh_interval = QSpinBox()
self.refresh_interval.setRange(10, 300)
self.refresh_interval.setValue(30)
self.refresh_interval.setSuffix("")
auto_layout.addWidget(self.auto_refresh_check)
auto_layout.addWidget(QLabel("间隔"))
auto_layout.addWidget(self.refresh_interval)
auto_layout.addStretch(1)
config_layout.addRow("刷新", auto_layout)
pg_group = QGroupBox("数据库配置")
pg_layout = QFormLayout(pg_group)
self.pg_enabled_check = QCheckBox("启用数据库")
self.db_type_input = QComboBox()
self.db_type_input.addItems(["postgresql", "mysql"])
self.pg_host_input = QLineEdit()
self.pg_port_input = QSpinBox()
self.pg_port_input.setRange(1, 65535)
self.pg_port_input.setValue(5432)
self.pg_db_input = QLineEdit()
self.db_table_input = QLineEdit()
self.pg_user_input = QLineEdit()
self.pg_password_input = QLineEdit()
self.pg_password_input.setEchoMode(QLineEdit.Password)
self.db_auto_create_check = QCheckBox("数据库或表不存在时允许创建")
self.test_db_btn = QPushButton("测试连接")
self.init_db_btn = QPushButton("初始化库表")
db_actions = QHBoxLayout()
db_actions.addWidget(self.test_db_btn)
db_actions.addWidget(self.init_db_btn)
pg_layout.addRow("开关", self.pg_enabled_check)
pg_layout.addRow("类型", self.db_type_input)
pg_layout.addRow("Host", self.pg_host_input)
pg_layout.addRow("Port", self.pg_port_input)
pg_layout.addRow("DB", self.pg_db_input)
pg_layout.addRow("Table", self.db_table_input)
pg_layout.addRow("User", self.pg_user_input)
pg_layout.addRow("Password", self.pg_password_input)
pg_layout.addRow("自动创建", self.db_auto_create_check)
pg_layout.addRow("操作", db_actions)
import_group = QGroupBox("账号写入数据库")
import_layout = QFormLayout(import_group)
self.import_email_input = QLineEdit()
self.import_mail_password_input = QLineEdit()
self.import_mail_password_input.setEchoMode(QLineEdit.Password)
self.import_chatgpt_password_input = QLineEdit()
self.import_name_input = QLineEdit()
self.import_birthdate_input = QLineEdit()
self.import_btn = QPushButton("导入账号并补 Token")
self.create_email_btn = QPushButton("创建邮箱并写库")
import_actions = QHBoxLayout()
import_actions.addWidget(self.import_btn)
import_actions.addWidget(self.create_email_btn)
self.save_config_btn = QPushButton("保存设置")
import_layout.addRow("邮箱", self.import_email_input)
import_layout.addRow("邮箱密码(可选)", self.import_mail_password_input)
import_layout.addRow("ChatGPT密码(可选)", self.import_chatgpt_password_input)
import_layout.addRow("姓名(可选)", self.import_name_input)
import_layout.addRow("生日(可选)", self.import_birthdate_input)
import_layout.addRow("操作", import_actions)
layout.addWidget(config_group)
layout.addWidget(pg_group)
layout.addWidget(import_group)
layout.addWidget(self.save_config_btn)
layout.addStretch(1)
return page
def _create_copyable_label(self, text=""):
label = QLabel(text)
label.setTextInteractionFlags(Qt.TextSelectableByMouse)
label.setCursor(Qt.IBeamCursor)
label.setWordWrap(True)
return label
def _setup_timer(self):
self.timer = QTimer(self)
self.timer.setInterval(self.refresh_interval.value() * 1000)
self.timer.timeout.connect(self._auto_refresh)
def _copy_verification_code(self):
code = self.verification_code_value.text().strip()
if not code:
return
QApplication.clipboard().setText(code)
self.verification_code_value.setFocus()
self.verification_code_value.selectAll()
self._log(f"验证码已复制: {code}")
def _log(self, message):
ts = time.strftime("%H:%M:%S")
self.log_box.append(f"[{ts}] {message}")
def _get_config(self):
return {
"api_base": self.api_base_input.text().strip(),
"domain": self.domain_input.text().strip(),
"backend_type": self.backend_type_input.currentText(),
"bearer": self.bearer_input.text().strip(),
"proxy": self.proxy_input.text().strip(),
"show_passwords": self.show_passwords_check.isChecked(),
"auto_refresh": self.auto_refresh_check.isChecked(),
"refresh_interval": self.refresh_interval.value(),
"db_enabled": self.pg_enabled_check.isChecked(),
"db_type": self.db_type_input.currentText(),
"db_host": self.pg_host_input.text().strip(),
"db_port": self.pg_port_input.value(),
"db_name": self.pg_db_input.text().strip(),
"db_table": self.db_table_input.text().strip(),
"db_user": self.pg_user_input.text().strip(),
"db_password": self.pg_password_input.text().strip(),
"db_auto_create": self.db_auto_create_check.isChecked(),
"pg_enabled": self.pg_enabled_check.isChecked(),
"pg_host": self.pg_host_input.text().strip(),
"pg_port": self.pg_port_input.value(),
"pg_db": self.pg_db_input.text().strip(),
"pg_user": self.pg_user_input.text().strip(),
"pg_password": self.pg_password_input.text().strip(),
}
def _client_from_inputs(self):
config = self._get_config()
return DuckMailClient(
config.get("api_base"),
config.get("domain"),
config.get("bearer"),
proxy=config.get("proxy"),
backend_type=config.get("backend_type"),
)
def _load_config(self):
data = load_config(CONFIG_PATH)
self.api_base_input.setText(
data.get("api_base", "https://temp-email-api.example.com")
)
self.domain_input.setText(data.get("domain", "example.com"))
backend_type = (
str(data.get("backend_type", "cloudflare_temp_email")).strip().lower()
or "cloudflare_temp_email"
)
backend_index = self.backend_type_input.findText(backend_type)
if backend_index >= 0:
self.backend_type_input.setCurrentIndex(backend_index)
self.bearer_input.setText(data.get("bearer", ""))
self.proxy_input.setText(data.get("proxy", ""))
self.show_passwords_check.setChecked(bool(data.get("show_passwords", False)))
self.auto_refresh_check.setChecked(bool(data.get("auto_refresh", False)))
self.refresh_interval.setValue(
max(10, min(300, int(data.get("refresh_interval", 30) or 30)))
)
self.pg_enabled_check.setChecked(
bool(data.get("db_enabled", data.get("pg_enabled", True)))
)
db_type = str(data.get("db_type", "postgresql")).strip().lower() or "postgresql"
index = self.db_type_input.findText(db_type)
if index >= 0:
self.db_type_input.setCurrentIndex(index)
self.pg_host_input.setText(data.get("db_host", data.get("pg_host", "")))
self.pg_port_input.setValue(
int(
data.get(
"db_port", data.get("pg_port", 5432 if db_type != "mysql" else 3306)
)
or (3306 if db_type == "mysql" else 5432)
)
)
self.pg_db_input.setText(
data.get("db_name", data.get("pg_db", "mail_accounts_db"))
)
self.db_table_input.setText(data.get("db_table", "registered_accounts"))
self.pg_user_input.setText(data.get("db_user", data.get("pg_user", "")))
self.pg_password_input.setText(
data.get("db_password", data.get("pg_password", ""))
)
self.db_auto_create_check.setChecked(bool(data.get("db_auto_create", False)))
self._on_toggle_password_visibility(self.show_passwords_check.isChecked())
def _save_config(self):
data = self._get_config()
save_config(CONFIG_PATH, data)
return data
def _mask_secret(self, value, placeholder="(空)"):
text = str(value or "")
if not text:
return placeholder
if self.show_passwords_check.isChecked():
return text
if len(text) <= 4:
return "*" * len(text)
return f"{text[:2]}{'*' * max(len(text) - 4, 2)}{text[-2:]}"
def _on_toggle_password_visibility(self, checked):
mode = QLineEdit.Normal if checked else QLineEdit.Password
self.bearer_input.setEchoMode(mode)
self.pg_password_input.setEchoMode(mode)
self.import_mail_password_input.setEchoMode(mode)
self._render_account_detail(self._get_selected_account())
def _on_backend_type_changed(self, backend_type):
self._save_config()
def _on_db_type_changed(self, db_type):
db_type = (db_type or "").strip().lower()
default_port = 3306 if db_type == "mysql" else 5432
current_port = self.pg_port_input.value()
if current_port in (3306, 5432):
self.pg_port_input.setValue(default_port)
def _db_config_for_action(self):
config = self._save_config()
if not config.get("db_enabled"):
raise RuntimeError("请先启用数据库")
return config
def _on_test_db_connection(self):
try:
config = self._db_config_for_action()
result = test_connection(config)
self._log(
f"数据库连接成功: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}"
)
QMessageBox.information(
self,
"数据库连接成功",
f"类型: {result['db_type']}\n数据库存在: {result['database_exists']}\n表存在: {result['table_exists']}\n管理权限: {result['admin_access']}",
)
except Exception as e:
QMessageBox.critical(self, "数据库连接失败", str(e))
self._log(f"数据库连接失败: {e}")
def _on_init_db(self):
try:
config = self._db_config_for_action()
result = ensure_database_and_table(config)
self._log(
f"数据库初始化完成: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}"
)
QMessageBox.information(self, "初始化成功", "数据库和表已经就绪")
except Exception as e:
QMessageBox.critical(self, "初始化失败", str(e))
self._log(f"初始化数据库失败: {e}")
def _account_matches_search(self, account, keyword):
if not keyword:
return True
fields = [
account.get("email", ""),
account.get("name", ""),
account.get("created_at", ""),
]
haystack = " ".join(str(item or "") for item in fields).lower()
return keyword in haystack
def _render_accounts_list(self, accounts):
self.accounts_list.clear()
for item in accounts:
email = item.get("email", "")
created_at = item.get("created_at", "")
label = f"{email} ({created_at})" if created_at else email
list_item = QListWidgetItem(label)
list_item.setData(Qt.UserRole, item)
self.accounts_list.addItem(list_item)
def _apply_account_filter(self):
keyword = self.search_input.text().strip().lower()
filtered = [
item
for item in self.all_accounts
if self._account_matches_search(item, keyword)
]
self._render_accounts_list(filtered)
if not filtered:
self.current_mail_token = ""
self._render_account_detail(None)
def _load_accounts(self):
config = self._save_config()
if not config.get("db_enabled", config.get("pg_enabled")):
self.all_accounts = []
self._apply_account_filter()
self._log("数据库未启用")
return
try:
self.all_accounts = load_accounts(config)
self._apply_account_filter()
self._log(
f"已加载 {config.get('db_type', 'postgresql')} 账号: {len(self.all_accounts)}"
)
except Exception as e:
self.all_accounts = []
self._apply_account_filter()
self._log(f"加载数据库账号失败: {e}")
def _on_reload_accounts(self):
self._load_accounts()
self._log("账号列表已刷新")
def _get_selected_account(self):
items = self.accounts_list.selectedItems()
if not items:
return None
return items[0].data(Qt.UserRole)
def _on_save_config(self):
self._save_config()
self._log("设置已保存")
def _on_create_email(self):
try:
config = self._save_config()
client = self._client_from_inputs()
email, password, mail_token = client.create_email()
chatgpt_password = _generate_password()
name = _random_name()
birthdate = _random_birthdate()
save_account(
config,
email,
password,
mail_token,
chatgpt_password=chatgpt_password,
name=name,
birthdate=birthdate,
)
self._load_accounts()
self._log(
f"创建邮箱成功并写入数据库: {email} | ChatGPT密码: {chatgpt_password} | 姓名: {name} | 生日: {birthdate}"
)
self.tabs.setCurrentIndex(0)
except Exception as e:
QMessageBox.critical(self, "错误", str(e))
self._log(f"创建邮箱失败: {e}")
def _on_import_account(self):
email = self.import_email_input.text().strip()
mail_password = self.import_mail_password_input.text().strip()
chatgpt_password = self.import_chatgpt_password_input.text().strip()
name = self.import_name_input.text().strip()
birthdate = self.import_birthdate_input.text().strip()
if not email:
QMessageBox.information(self, "提示", "请输入邮箱")
return
try:
config = self._save_config()
client = self._client_from_inputs()
mail_token = client.get_token(email, mail_password)
save_account(
config,
email,
mail_password,
mail_token,
chatgpt_password=chatgpt_password,
name=name,
birthdate=birthdate,
)
self._load_accounts()
self._log(f"导入账号成功并写入数据库: {email}")
self.tabs.setCurrentIndex(0)
except Exception as e:
QMessageBox.critical(self, "错误", str(e))
self._log(f"导入账号失败: {e}")
def _on_account_selected(self):
account = self._get_selected_account()
if not account:
self.current_mail_token = ""
self._render_account_detail(None)
return
self.current_mail_token = account.get("mail_token", "")
self._render_account_detail(account)
self._log(f"选中账号: {account.get('email', '')}")
def _render_account_detail(self, account):
if not account:
self.detail_email.setText("-")
self.detail_mail_password.setText("-")
self.detail_mail_token.setText("-")
self.detail_chatgpt_password.setText("-")
self.detail_name.setText("-")
self.detail_birthdate.setText("-")
self.detail_source.setText("-")
return
token_status = "已存在" if account.get("mail_token", "") else "缺失"
self.detail_email.setText(account.get("email", ""))
self.detail_mail_password.setText(
self._mask_secret(account.get("mail_password", ""), "(管理员模式下可留空)")
)
self.detail_mail_token.setText(token_status)
self.detail_chatgpt_password.setText(
self._mask_secret(account.get("chatgpt_password", ""))
)
self.detail_name.setText(account.get("name", ""))
self.detail_birthdate.setText(account.get("birthdate", ""))
self.detail_source.setText(account.get("source", "postgres"))
def _update_account_token(self, account, mail_token):
if not account or not mail_token:
return
config = self._get_config()
updated = dict(account)
updated["mail_token"] = mail_token
save_account(
config,
updated.get("email", ""),
updated.get("mail_password", ""),
mail_token,
chatgpt_password=updated.get("chatgpt_password", ""),
name=updated.get("name", ""),
birthdate=updated.get("birthdate", ""),
)
account["mail_token"] = mail_token
self.current_mail_token = mail_token
self._render_account_detail(account)
def _ensure_mail_token(self, account):
mail_token = str(account.get("mail_token", "")).strip()
if mail_token:
return mail_token
email = str(account.get("email", "")).strip()
mail_password = str(account.get("mail_password", "")).strip()
self._log(f"账号 {email} 缺少 mail_token正在通过管理员接口补全")
client = self._client_from_inputs()
mail_token = client.get_token(email, mail_password)
if not mail_token:
raise RuntimeError("自动获取 mail_token 失败")
self._update_account_token(account, mail_token)
self._log(f"已自动补全 mail_token: {email}")
return mail_token
def _on_fetch_emails(self):
account = self._get_selected_account()
if not account:
QMessageBox.information(self, "提示", "请先选择账号")
return
try:
mail_token = self._ensure_mail_token(account)
client = self._client_from_inputs()
emails = client.fetch_emails(mail_token)
self.current_emails = emails
self._render_emails_list(emails)
self._log(f"已获取邮件: {len(emails)}")
except Exception as e:
QMessageBox.critical(self, "错误", str(e))
self._log(f"获取邮件失败: {e}")
def _render_emails_list(self, emails):
self.emails_list.clear()
for item in emails:
subject = str(item.get("subject", "") or "(无主题)")
sender = _format_sender(item.get("from") or item.get("source"))
date = str(item.get("createdAt", ""))
label = f"{subject} | {sender} | {date}"
list_item = QListWidgetItem(label)
list_item.setData(Qt.UserRole, item)
self.emails_list.addItem(list_item)
def _on_email_selected(self):
items = self.emails_list.selectedItems()
if not items:
return
email_item = items[0].data(Qt.UserRole)
msg_id = (
email_item.get("id") or email_item.get("@id") or email_item.get("messageId")
)
body = (
email_item.get("text")
or email_item.get("message")
or email_item.get("html")
or email_item.get("intro")
or email_item.get("raw")
)
if not body and msg_id and self.current_mail_token:
try:
client = self._client_from_inputs()
detail = client.fetch_message_detail(self.current_mail_token, msg_id)
body = _email_text_from_detail(detail)
except Exception as e:
body = f"(获取邮件详情失败: {e})"
content = _normalize_email_body(body)
if not content:
self.verification_code_value.clear()
self.copy_verification_code_btn.setEnabled(False)
self.verification_code_label.setText("验证码")
self.email_content.setToolTip("")
self.email_content.setPlainText("(无正文)")
return
plain_text = (
_html_to_plain_text(content) if _looks_like_html(content) else content
)
verification_code = _extract_verification_code(plain_text)
self.verification_code_value.setText(verification_code)
self.copy_verification_code_btn.setEnabled(bool(verification_code))
self.verification_code_label.setText(
"验证码 (已提取)" if verification_code else "验证码"
)
display_text = plain_text
if verification_code:
display_text = re.sub(
rf"(?<!\d)({re.escape(verification_code)})(?!\d)",
r"[验证码: \1]",
plain_text,
count=1,
)
self.email_content.setToolTip("")
self.email_content.setPlainText(display_text)
def _on_auto_refresh_toggle(self, checked):
if checked:
self.timer.start()
self._log("自动刷新已开启")
else:
self.timer.stop()
self._log("自动刷新已关闭")
self._save_config()
def _on_refresh_interval_changed(self, value):
self.timer.setInterval(value * 1000)
if self.auto_refresh_check.isChecked():
self.timer.start()
self._save_config()
def _auto_refresh(self):
account = self._get_selected_account()
if not account:
return
try:
mail_token = self._ensure_mail_token(account)
client = self._client_from_inputs()
emails = client.fetch_emails(mail_token)
self.current_emails = emails
self._render_emails_list(emails)
self._log(f"自动刷新: {len(emails)}")
except Exception as e:
self._log(f"自动刷新失败: {e}")
def main():
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()