909 lines
33 KiB
Python
909 lines
33 KiB
Python
from html import unescape
|
||
from pathlib import Path
|
||
import re
|
||
import sys
|
||
import time
|
||
|
||
from PySide6.QtCore import QTimer, Qt
|
||
from PySide6.QtGui import QFont, QTextDocumentFragment
|
||
from PySide6.QtWidgets import (
|
||
QApplication,
|
||
QCheckBox,
|
||
QComboBox,
|
||
QFormLayout,
|
||
QFrame,
|
||
QGroupBox,
|
||
QHBoxLayout,
|
||
QLabel,
|
||
QLineEdit,
|
||
QListWidget,
|
||
QListWidgetItem,
|
||
QMainWindow,
|
||
QMessageBox,
|
||
QPushButton,
|
||
QSpinBox,
|
||
QSplitter,
|
||
QTabWidget,
|
||
QTextEdit,
|
||
QVBoxLayout,
|
||
QWidget,
|
||
)
|
||
|
||
from duckmail_client import DuckMailClient
|
||
from account_store import (
|
||
ensure_database_and_table,
|
||
load_accounts,
|
||
save_account,
|
||
load_config,
|
||
save_config,
|
||
test_connection,
|
||
)
|
||
|
||
# Directory that contains this module; the config paths below are built from it.
BASE_DIR = Path(__file__).resolve().parent
# Configuration file the GUI loads from and saves to.
CONFIG_PATH = BASE_DIR / "config_gui.json"
# Same file name one directory up; copied to CONFIG_PATH when that file does
# not exist yet (see MainWindow._migrate_legacy_config_if_needed).
LEGACY_CONFIG_PATH = BASE_DIR.parent / "config_gui.json"
|
||
|
||
|
||
def _format_sender(sender_value):
|
||
if isinstance(sender_value, dict):
|
||
return sender_value.get("address") or sender_value.get("name") or ""
|
||
return str(sender_value or "")
|
||
|
||
|
||
def _email_text_from_detail(detail):
|
||
if not isinstance(detail, dict):
|
||
return ""
|
||
text = detail.get("text") or ""
|
||
if text:
|
||
return str(text)
|
||
message = detail.get("message") or ""
|
||
if message:
|
||
return str(message)
|
||
html_val = detail.get("html")
|
||
if isinstance(html_val, list):
|
||
return "\n".join(str(item) for item in html_val)
|
||
if html_val:
|
||
return str(html_val)
|
||
return str(detail.get("raw") or "")
|
||
|
||
|
||
def _looks_like_html(value):
|
||
text = str(value or "").strip()
|
||
if not text:
|
||
return False
|
||
if "<html" in text.lower() or "<body" in text.lower():
|
||
return True
|
||
return bool(re.search(r"<\s*[a-zA-Z][^>]*>", text))
|
||
|
||
|
||
def _normalize_email_body(body):
|
||
if isinstance(body, list):
|
||
return "\n\n".join(str(item) for item in body if item is not None)
|
||
if body is None:
|
||
return ""
|
||
return str(body)
|
||
|
||
|
||
def _html_to_plain_text(html_text):
    """Convert HTML to plain text.

    The text produced by ``QTextDocumentFragment.fromHtml`` is used when it is
    non-empty after stripping. Otherwise tags are replaced with spaces by a
    regex, HTML entities are unescaped and the result is stripped.
    """
    rendered = QTextDocumentFragment.fromHtml(html_text).toPlainText().strip()
    if not rendered:
        without_tags = re.sub(r"<[^>]+>", " ", html_text)
        return unescape(without_tags).strip()
    return rendered
|
||
|
||
|
||
def _extract_verification_code(text):
|
||
if not text:
|
||
return ""
|
||
match = re.search(r"(?<!\d)(\d{6})(?!\d)", text)
|
||
if match:
|
||
return match.group(1)
|
||
return ""
|
||
|
||
|
||
def _highlight_verification_code_html(html_text, code):
|
||
if not html_text or not code:
|
||
return html_text
|
||
pattern = re.compile(rf"(?<!\d){re.escape(code)}(?!\d)")
|
||
return pattern.sub(
|
||
(
|
||
'<span style="background:#fff1bf;color:#8a3b12;'
|
||
"font-weight:700;padding:2px 6px;border-radius:6px;"
|
||
'letter-spacing:1px;">'
|
||
f"{code}</span>"
|
||
),
|
||
html_text,
|
||
count=1,
|
||
)
|
||
|
||
|
||
def _generate_password(length=14):
|
||
import random
|
||
import string
|
||
|
||
lower = string.ascii_lowercase
|
||
upper = string.ascii_uppercase
|
||
digits = string.digits
|
||
special = "!@#$%&*"
|
||
pwd = [
|
||
random.choice(lower),
|
||
random.choice(upper),
|
||
random.choice(digits),
|
||
random.choice(special),
|
||
]
|
||
all_chars = lower + upper + digits + special
|
||
pwd += [random.choice(all_chars) for _ in range(length - 4)]
|
||
random.shuffle(pwd)
|
||
return "".join(pwd)
|
||
|
||
|
||
def _random_name():
|
||
import random
|
||
|
||
first_names = [
|
||
"James",
|
||
"Emma",
|
||
"Liam",
|
||
"Olivia",
|
||
"Noah",
|
||
"Ava",
|
||
"Ethan",
|
||
"Sophia",
|
||
"Lucas",
|
||
"Mia",
|
||
"Mason",
|
||
"Isabella",
|
||
"Logan",
|
||
"Charlotte",
|
||
"Alexander",
|
||
"Amelia",
|
||
"Benjamin",
|
||
"Harper",
|
||
"William",
|
||
"Evelyn",
|
||
]
|
||
last_names = [
|
||
"Smith",
|
||
"Johnson",
|
||
"Brown",
|
||
"Davis",
|
||
"Wilson",
|
||
"Moore",
|
||
"Taylor",
|
||
"Clark",
|
||
"Hall",
|
||
"Young",
|
||
"Anderson",
|
||
"Thomas",
|
||
"Jackson",
|
||
"White",
|
||
"Harris",
|
||
"Martin",
|
||
"Thompson",
|
||
"Garcia",
|
||
"Robinson",
|
||
"Lewis",
|
||
]
|
||
return f"{random.choice(first_names)} {random.choice(last_names)}"
|
||
|
||
|
||
def _random_birthdate():
|
||
import random
|
||
|
||
y = random.randint(1980, 2002)
|
||
m = random.randint(1, 12)
|
||
d = random.randint(1, 28)
|
||
return f"{y}-{m:02d}-{d:02d}"
|
||
|
||
|
||
class MainWindow(QMainWindow):
|
||
def __init__(self):
    """Create the window, build its widgets, then load settings and accounts."""
    super().__init__()
    self.setWindowTitle("Temp Mail GUI")
    self.resize(1280, 780)

    # Mail token of the selected account; used when an email's body has to
    # be fetched separately.
    self.current_mail_token = ""
    # Emails returned by the most recent fetch.
    self.current_emails = []
    # Unfiltered account rows; the search box shows a filtered view of them.
    self.all_accounts = []

    # Copy a legacy config into place first so _load_config() can read it.
    self._migrate_legacy_config_if_needed()
    # Widgets must exist before the timer (it reads the interval spin box)
    # and before the config values are pushed into them.
    self._build_ui()
    self._setup_timer()
    self._load_config()
    self._load_accounts()
|
||
|
||
def _build_ui(self):
    """Build the tabbed central widget and connect every signal handler.

    Both pages are built first because the widgets connected below are
    attributes assigned inside ``_build_accounts_page`` and
    ``_build_settings_page``.
    """
    root = QWidget()
    root_layout = QVBoxLayout(root)

    self.tabs = QTabWidget()
    self.tabs.addTab(self._build_accounts_page(), "账号")
    self.tabs.addTab(self._build_settings_page(), "设置")

    root_layout.addWidget(self.tabs)
    self.setCentralWidget(root)

    # Signal wiring for buttons, lists, check boxes and combo boxes.
    self.save_config_btn.clicked.connect(self._on_save_config)
    self.create_email_btn.clicked.connect(self._on_create_email)
    self.fetch_emails_btn.clicked.connect(self._on_fetch_emails)
    self.clear_log_btn.clicked.connect(self.log_box.clear)
    self.accounts_list.itemSelectionChanged.connect(self._on_account_selected)
    self.emails_list.itemSelectionChanged.connect(self._on_email_selected)
    self.auto_refresh_check.toggled.connect(self._on_auto_refresh_toggle)
    self.refresh_interval.valueChanged.connect(self._on_refresh_interval_changed)
    self.import_btn.clicked.connect(self._on_import_account)
    self.reload_accounts_btn.clicked.connect(self._on_reload_accounts)
    self.search_input.textChanged.connect(self._apply_account_filter)
    self.show_passwords_check.toggled.connect(self._on_toggle_password_visibility)
    self.backend_type_input.currentTextChanged.connect(
        self._on_backend_type_changed
    )
    self.db_type_input.currentTextChanged.connect(self._on_db_type_changed)
    self.test_db_btn.clicked.connect(self._on_test_db_connection)
    self.init_db_btn.clicked.connect(self._on_init_db)
    self.copy_verification_code_btn.clicked.connect(self._copy_verification_code)
|
||
|
||
def _migrate_legacy_config_if_needed(self):
    """Copy the legacy config file to ``CONFIG_PATH`` when none exists yet.

    Nothing happens when ``CONFIG_PATH`` already exists or when there is no
    legacy file. The copy is best-effort: a read, write or decoding failure
    is reported on stderr and start-up continues.
    """
    if CONFIG_PATH.exists() or not LEGACY_CONFIG_PATH.exists():
        return
    try:
        CONFIG_PATH.write_text(
            LEGACY_CONFIG_PATH.read_text(encoding="utf-8"),
            encoding="utf-8",
        )
    except (OSError, UnicodeError) as exc:
        # Reported on stderr rather than through self._log(): this method
        # runs before the widgets are built, so the log box does not exist.
        print(f"Legacy config migration skipped: {exc}", file=sys.stderr)
|
||
|
||
def _build_accounts_page(self):
    """Build and return the accounts tab.

    Layout, top to bottom: a toolbar (search box and action buttons), a
    horizontal splitter (account list and details on the left, email list,
    verification code row and email body on the right), and the log box.
    Widgets that other methods use are stored as attributes on ``self``.
    """
    page = QWidget()
    layout = QVBoxLayout(page)

    # Toolbar: search field plus fetch / reload / clear-log buttons.
    toolbar = QHBoxLayout()
    self.search_input = QLineEdit()
    self.search_input.setPlaceholderText("搜索邮箱 / 姓名 / 创建时间")
    self.fetch_emails_btn = QPushButton("获取邮件")
    self.reload_accounts_btn = QPushButton("刷新账号")
    self.clear_log_btn = QPushButton("清空日志")
    toolbar.addWidget(QLabel("账号搜索"))
    toolbar.addWidget(self.search_input, 1)
    toolbar.addWidget(self.fetch_emails_btn)
    toolbar.addWidget(self.reload_accounts_btn)
    toolbar.addWidget(self.clear_log_btn)

    splitter = QSplitter(Qt.Horizontal)

    # Left pane: account list.
    left_frame = QFrame()
    left_layout = QVBoxLayout(left_frame)
    left_layout.addWidget(QLabel("账号列表"))
    self.accounts_list = QListWidget()
    left_layout.addWidget(self.accounts_list)

    # Left pane: read-only details of the selected account. The labels are
    # created in one order and added to the form in another.
    details_group = QGroupBox("账号详情")
    details_layout = QFormLayout(details_group)
    self.detail_email = self._create_copyable_label("-")
    self.detail_mail_password = self._create_copyable_label("-")
    self.detail_chatgpt_password = self._create_copyable_label("-")
    self.detail_name = self._create_copyable_label("-")
    self.detail_birthdate = self._create_copyable_label("-")
    self.detail_mail_token = self._create_copyable_label("-")
    self.detail_source = self._create_copyable_label("postgres")
    details_layout.addRow("邮箱", self.detail_email)
    details_layout.addRow("邮箱密码", self.detail_mail_password)
    details_layout.addRow("Mail Token", self.detail_mail_token)
    details_layout.addRow("ChatGPT密码", self.detail_chatgpt_password)
    details_layout.addRow("姓名", self.detail_name)
    details_layout.addRow("生日", self.detail_birthdate)
    details_layout.addRow("来源", self.detail_source)
    left_layout.addWidget(details_group)

    # Right pane: email list.
    right_frame = QFrame()
    right_layout = QVBoxLayout(right_frame)
    right_layout.addWidget(QLabel("邮件列表"))
    self.emails_list = QListWidget()
    right_layout.addWidget(self.emails_list)
    # Right pane: extracted verification code with a copy button, which
    # starts disabled.
    code_layout = QHBoxLayout()
    self.verification_code_label = QLabel("验证码")
    self.verification_code_value = QLineEdit()
    self.verification_code_value.setReadOnly(True)
    self.verification_code_value.setPlaceholderText("自动提取 6 位验证码")
    self.verification_code_value.setFocusPolicy(Qt.StrongFocus)
    self.copy_verification_code_btn = QPushButton("复制")
    self.copy_verification_code_btn.setEnabled(False)
    code_layout.addWidget(self.verification_code_label)
    code_layout.addWidget(self.verification_code_value)
    code_layout.addWidget(self.copy_verification_code_btn)
    right_layout.addLayout(code_layout)
    # Right pane: read-only email body.
    right_layout.addWidget(QLabel("邮件内容"))
    self.email_content = QTextEdit()
    self.email_content.setReadOnly(True)
    self.email_content.setFont(QFont("Consolas", 10))
    right_layout.addWidget(self.email_content)

    splitter.addWidget(left_frame)
    splitter.addWidget(right_frame)
    splitter.setSizes([360, 900])

    # Read-only log output with a fixed height, shown below the splitter.
    self.log_box = QTextEdit()
    self.log_box.setReadOnly(True)
    self.log_box.setFixedHeight(150)

    layout.addLayout(toolbar)
    layout.addWidget(splitter)
    layout.addWidget(QLabel("日志"))
    layout.addWidget(self.log_box)
    return page
|
||
|
||
def _build_settings_page(self):
    """Build and return the settings tab.

    The page stacks three group boxes (mail service settings, database
    settings, and the account import form) followed by the save button.
    Input widgets are stored as attributes on ``self`` so the config and
    action methods can read them.
    """
    page = QWidget()
    layout = QVBoxLayout(page)

    # Mail service connection settings.
    config_group = QGroupBox("Cloudflare Temp Mail 配置")
    config_layout = QFormLayout(config_group)
    self.api_base_input = QLineEdit()
    self.domain_input = QLineEdit()
    self.backend_type_input = QComboBox()
    self.backend_type_input.addItems(["cloudflare_temp_email"])
    # Shown in the form as "Admin Password"; stored under the "bearer" key.
    self.bearer_input = QLineEdit()
    self.bearer_input.setEchoMode(QLineEdit.Password)
    self.proxy_input = QLineEdit()
    self.show_passwords_check = QCheckBox("显示密码")
    config_layout.addRow("API Base", self.api_base_input)
    config_layout.addRow("Domain", self.domain_input)
    config_layout.addRow("Backend Type", self.backend_type_input)
    config_layout.addRow("Admin Password", self.bearer_input)
    config_layout.addRow("Proxy", self.proxy_input)
    config_layout.addRow("密码显示", self.show_passwords_check)

    # Auto-refresh toggle and interval (10-300 seconds, default 30).
    auto_layout = QHBoxLayout()
    self.auto_refresh_check = QCheckBox("自动刷新")
    self.refresh_interval = QSpinBox()
    self.refresh_interval.setRange(10, 300)
    self.refresh_interval.setValue(30)
    self.refresh_interval.setSuffix(" 秒")
    auto_layout.addWidget(self.auto_refresh_check)
    auto_layout.addWidget(QLabel("间隔"))
    auto_layout.addWidget(self.refresh_interval)
    auto_layout.addStretch(1)
    config_layout.addRow("刷新", auto_layout)

    # Database settings. The widgets keep their "pg_" attribute names even
    # though the type combo box also offers mysql.
    pg_group = QGroupBox("数据库配置")
    pg_layout = QFormLayout(pg_group)
    self.pg_enabled_check = QCheckBox("启用数据库")
    self.db_type_input = QComboBox()
    self.db_type_input.addItems(["postgresql", "mysql"])
    self.pg_host_input = QLineEdit()
    self.pg_port_input = QSpinBox()
    self.pg_port_input.setRange(1, 65535)
    self.pg_port_input.setValue(5432)
    self.pg_db_input = QLineEdit()
    self.db_table_input = QLineEdit()
    self.pg_user_input = QLineEdit()
    self.pg_password_input = QLineEdit()
    self.pg_password_input.setEchoMode(QLineEdit.Password)
    self.db_auto_create_check = QCheckBox("数据库或表不存在时允许创建")
    self.test_db_btn = QPushButton("测试连接")
    self.init_db_btn = QPushButton("初始化库表")
    db_actions = QHBoxLayout()
    db_actions.addWidget(self.test_db_btn)
    db_actions.addWidget(self.init_db_btn)
    pg_layout.addRow("开关", self.pg_enabled_check)
    pg_layout.addRow("类型", self.db_type_input)
    pg_layout.addRow("Host", self.pg_host_input)
    pg_layout.addRow("Port", self.pg_port_input)
    pg_layout.addRow("DB", self.pg_db_input)
    pg_layout.addRow("Table", self.db_table_input)
    pg_layout.addRow("User", self.pg_user_input)
    pg_layout.addRow("Password", self.pg_password_input)
    pg_layout.addRow("自动创建", self.db_auto_create_check)
    pg_layout.addRow("操作", db_actions)

    # Form for importing an existing account or creating a new mailbox.
    import_group = QGroupBox("账号写入数据库")
    import_layout = QFormLayout(import_group)
    self.import_email_input = QLineEdit()
    self.import_mail_password_input = QLineEdit()
    self.import_mail_password_input.setEchoMode(QLineEdit.Password)
    self.import_chatgpt_password_input = QLineEdit()
    self.import_name_input = QLineEdit()
    self.import_birthdate_input = QLineEdit()
    self.import_btn = QPushButton("导入账号并补 Token")
    self.create_email_btn = QPushButton("创建邮箱并写库")
    import_actions = QHBoxLayout()
    import_actions.addWidget(self.import_btn)
    import_actions.addWidget(self.create_email_btn)
    # Created here but added to the page layout below, not to this group.
    self.save_config_btn = QPushButton("保存设置")
    import_layout.addRow("邮箱", self.import_email_input)
    import_layout.addRow("邮箱密码(可选)", self.import_mail_password_input)
    import_layout.addRow("ChatGPT密码(可选)", self.import_chatgpt_password_input)
    import_layout.addRow("姓名(可选)", self.import_name_input)
    import_layout.addRow("生日(可选)", self.import_birthdate_input)
    import_layout.addRow("操作", import_actions)

    layout.addWidget(config_group)
    layout.addWidget(pg_group)
    layout.addWidget(import_group)
    layout.addWidget(self.save_config_btn)
    layout.addStretch(1)
    return page
|
||
|
||
def _create_copyable_label(self, text=""):
    """Return a word-wrapping ``QLabel`` whose text can be selected with the mouse."""
    copyable = QLabel(text)
    copyable.setTextInteractionFlags(Qt.TextSelectableByMouse)
    # I-beam cursor hints that the text is selectable.
    copyable.setCursor(Qt.IBeamCursor)
    copyable.setWordWrap(True)
    return copyable
|
||
|
||
def _setup_timer(self):
    """Create the auto-refresh timer (not started) from the interval spin box."""
    timer = QTimer(self)
    # The spin box holds seconds; QTimer expects milliseconds.
    interval_ms = self.refresh_interval.value() * 1000
    timer.setInterval(interval_ms)
    timer.timeout.connect(self._auto_refresh)
    self.timer = timer
|
||
|
||
def _copy_verification_code(self):
|
||
code = self.verification_code_value.text().strip()
|
||
if not code:
|
||
return
|
||
QApplication.clipboard().setText(code)
|
||
self.verification_code_value.setFocus()
|
||
self.verification_code_value.selectAll()
|
||
self._log(f"验证码已复制: {code}")
|
||
|
||
def _log(self, message):
|
||
ts = time.strftime("%H:%M:%S")
|
||
self.log_box.append(f"[{ts}] {message}")
|
||
|
||
def _get_config(self):
|
||
return {
|
||
"api_base": self.api_base_input.text().strip(),
|
||
"domain": self.domain_input.text().strip(),
|
||
"backend_type": self.backend_type_input.currentText(),
|
||
"bearer": self.bearer_input.text().strip(),
|
||
"proxy": self.proxy_input.text().strip(),
|
||
"show_passwords": self.show_passwords_check.isChecked(),
|
||
"auto_refresh": self.auto_refresh_check.isChecked(),
|
||
"refresh_interval": self.refresh_interval.value(),
|
||
"db_enabled": self.pg_enabled_check.isChecked(),
|
||
"db_type": self.db_type_input.currentText(),
|
||
"db_host": self.pg_host_input.text().strip(),
|
||
"db_port": self.pg_port_input.value(),
|
||
"db_name": self.pg_db_input.text().strip(),
|
||
"db_table": self.db_table_input.text().strip(),
|
||
"db_user": self.pg_user_input.text().strip(),
|
||
"db_password": self.pg_password_input.text().strip(),
|
||
"db_auto_create": self.db_auto_create_check.isChecked(),
|
||
"pg_enabled": self.pg_enabled_check.isChecked(),
|
||
"pg_host": self.pg_host_input.text().strip(),
|
||
"pg_port": self.pg_port_input.value(),
|
||
"pg_db": self.pg_db_input.text().strip(),
|
||
"pg_user": self.pg_user_input.text().strip(),
|
||
"pg_password": self.pg_password_input.text().strip(),
|
||
}
|
||
|
||
def _client_from_inputs(self):
    """Build a ``DuckMailClient`` from the values currently in the settings form."""
    settings = self._get_config()
    # API base, domain and bearer are passed positionally, in this order.
    positional = [settings.get(key) for key in ("api_base", "domain", "bearer")]
    return DuckMailClient(
        *positional,
        proxy=settings.get("proxy"),
        backend_type=settings.get("backend_type"),
    )
|
||
|
||
def _load_config(self):
    """Load the saved configuration and push it into the settings widgets.

    Every value is parsed before any widget is touched, and the widgets
    whose change handlers write the config back to disk (backend type,
    refresh interval, auto refresh) are updated last. Their handlers call
    ``_save_config()``, so by the time they fire the form already holds the
    complete loaded state and a mid-load save can no longer store blank
    database settings.

    Numeric settings that are missing, empty or not numeric fall back to
    their defaults instead of raising.
    """
    data = load_config(CONFIG_PATH)

    def _to_int(value, fallback):
        # A hand-edited config file may hold blanks or non-numeric text.
        try:
            return int(value or fallback)
        except (TypeError, ValueError):
            return fallback

    backend_type = (
        str(data.get("backend_type", "cloudflare_temp_email")).strip().lower()
        or "cloudflare_temp_email"
    )
    db_type = str(data.get("db_type", "postgresql")).strip().lower() or "postgresql"
    default_port = 3306 if db_type == "mysql" else 5432
    db_port = _to_int(
        data.get("db_port", data.get("pg_port", default_port)), default_port
    )
    refresh_seconds = max(10, min(300, _to_int(data.get("refresh_interval", 30), 30)))

    # Fields whose change handlers do not save the config. The db_* keys win
    # over the older pg_* keys.
    self.api_base_input.setText(
        data.get("api_base", "https://temp-email-api.example.com")
    )
    self.domain_input.setText(data.get("domain", "example.com"))
    self.bearer_input.setText(data.get("bearer", ""))
    self.proxy_input.setText(data.get("proxy", ""))
    self.pg_enabled_check.setChecked(
        bool(data.get("db_enabled", data.get("pg_enabled", True)))
    )
    # Select the database type before setting the port: the type-change
    # handler may swap the default port, and the stored port must win.
    index = self.db_type_input.findText(db_type)
    if index >= 0:
        self.db_type_input.setCurrentIndex(index)
    self.pg_host_input.setText(data.get("db_host", data.get("pg_host", "")))
    self.pg_port_input.setValue(db_port)
    self.pg_db_input.setText(
        data.get("db_name", data.get("pg_db", "mail_accounts_db"))
    )
    self.db_table_input.setText(data.get("db_table", "registered_accounts"))
    self.pg_user_input.setText(data.get("db_user", data.get("pg_user", "")))
    self.pg_password_input.setText(
        data.get("db_password", data.get("pg_password", ""))
    )
    self.db_auto_create_check.setChecked(bool(data.get("db_auto_create", False)))
    self.show_passwords_check.setChecked(bool(data.get("show_passwords", False)))

    # Widgets whose handlers save the config: set them once the form is full.
    backend_index = self.backend_type_input.findText(backend_type)
    if backend_index >= 0:
        self.backend_type_input.setCurrentIndex(backend_index)
    # Interval before the toggle, so the timer starts with the loaded value.
    self.refresh_interval.setValue(refresh_seconds)
    self.auto_refresh_check.setChecked(bool(data.get("auto_refresh", False)))

    # Apply the echo mode even when the check box state did not change.
    self._on_toggle_password_visibility(self.show_passwords_check.isChecked())
|
||
|
||
def _save_config(self):
    """Write the current form values to ``CONFIG_PATH`` and return them."""
    snapshot = self._get_config()
    save_config(CONFIG_PATH, snapshot)
    return snapshot
|
||
|
||
def _mask_secret(self, value, placeholder="(空)"):
|
||
text = str(value or "")
|
||
if not text:
|
||
return placeholder
|
||
if self.show_passwords_check.isChecked():
|
||
return text
|
||
if len(text) <= 4:
|
||
return "*" * len(text)
|
||
return f"{text[:2]}{'*' * max(len(text) - 4, 2)}{text[-2:]}"
|
||
|
||
def _on_toggle_password_visibility(self, checked):
    """Switch the password inputs between plain and masked display.

    The selected account's details are re-rendered afterwards so the masked
    values there follow the same setting.
    """
    if checked:
        mode = QLineEdit.Normal
    else:
        mode = QLineEdit.Password
    secret_fields = (
        self.bearer_input,
        self.pg_password_input,
        self.import_mail_password_input,
    )
    for field in secret_fields:
        field.setEchoMode(mode)
    self._render_account_detail(self._get_selected_account())
|
||
|
||
def _on_backend_type_changed(self, backend_type):
|
||
self._save_config()
|
||
|
||
def _on_db_type_changed(self, db_type):
|
||
db_type = (db_type or "").strip().lower()
|
||
default_port = 3306 if db_type == "mysql" else 5432
|
||
current_port = self.pg_port_input.value()
|
||
if current_port in (3306, 5432):
|
||
self.pg_port_input.setValue(default_port)
|
||
|
||
def _db_config_for_action(self):
|
||
config = self._save_config()
|
||
if not config.get("db_enabled"):
|
||
raise RuntimeError("请先启用数据库")
|
||
return config
|
||
|
||
def _on_test_db_connection(self):
    """Test the database connection and report the outcome.

    Success is logged and shown in an information dialog. Any failure,
    including the database being disabled, is shown in an error dialog and
    logged.
    """
    try:
        result = test_connection(self._db_config_for_action())
        summary = f"数据库连接成功: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}"
        self._log(summary)
        dialog_text = f"类型: {result['db_type']}\n数据库存在: {result['database_exists']}\n表存在: {result['table_exists']}\n管理权限: {result['admin_access']}"
        QMessageBox.information(self, "数据库连接成功", dialog_text)
    except Exception as e:
        QMessageBox.critical(self, "数据库连接失败", str(e))
        self._log(f"数据库连接失败: {e}")
|
||
|
||
def _on_init_db(self):
    """Ensure the database and table exist, then report the outcome.

    Success is logged and confirmed in an information dialog. Any failure is
    shown in an error dialog and logged.
    """
    try:
        result = ensure_database_and_table(self._db_config_for_action())
        summary = f"数据库初始化完成: type={result['db_type']} db_exists={result['database_exists']} table_exists={result['table_exists']} admin_access={result['admin_access']}"
        self._log(summary)
        QMessageBox.information(self, "初始化成功", "数据库和表已经就绪")
    except Exception as e:
        QMessageBox.critical(self, "初始化失败", str(e))
        self._log(f"初始化数据库失败: {e}")
|
||
|
||
def _account_matches_search(self, account, keyword):
|
||
if not keyword:
|
||
return True
|
||
fields = [
|
||
account.get("email", ""),
|
||
account.get("name", ""),
|
||
account.get("created_at", ""),
|
||
]
|
||
haystack = " ".join(str(item or "") for item in fields).lower()
|
||
return keyword in haystack
|
||
|
||
def _render_accounts_list(self, accounts):
|
||
self.accounts_list.clear()
|
||
for item in accounts:
|
||
email = item.get("email", "")
|
||
created_at = item.get("created_at", "")
|
||
label = f"{email} ({created_at})" if created_at else email
|
||
list_item = QListWidgetItem(label)
|
||
list_item.setData(Qt.UserRole, item)
|
||
self.accounts_list.addItem(list_item)
|
||
|
||
def _apply_account_filter(self):
|
||
keyword = self.search_input.text().strip().lower()
|
||
filtered = [
|
||
item
|
||
for item in self.all_accounts
|
||
if self._account_matches_search(item, keyword)
|
||
]
|
||
self._render_accounts_list(filtered)
|
||
if not filtered:
|
||
self.current_mail_token = ""
|
||
self._render_account_detail(None)
|
||
|
||
def _load_accounts(self):
|
||
config = self._save_config()
|
||
if not config.get("db_enabled", config.get("pg_enabled")):
|
||
self.all_accounts = []
|
||
self._apply_account_filter()
|
||
self._log("数据库未启用")
|
||
return
|
||
try:
|
||
self.all_accounts = load_accounts(config)
|
||
self._apply_account_filter()
|
||
self._log(
|
||
f"已加载 {config.get('db_type', 'postgresql')} 账号: {len(self.all_accounts)} 个"
|
||
)
|
||
except Exception as e:
|
||
self.all_accounts = []
|
||
self._apply_account_filter()
|
||
self._log(f"加载数据库账号失败: {e}")
|
||
|
||
def _on_reload_accounts(self):
|
||
self._load_accounts()
|
||
self._log("账号列表已刷新")
|
||
|
||
def _get_selected_account(self):
|
||
items = self.accounts_list.selectedItems()
|
||
if not items:
|
||
return None
|
||
return items[0].data(Qt.UserRole)
|
||
|
||
def _on_save_config(self):
|
||
self._save_config()
|
||
self._log("设置已保存")
|
||
|
||
def _on_create_email(self):
    """Create a mailbox, generate a profile for it and store both.

    The new mailbox is saved with a generated ChatGPT password, name and
    birthdate. The account list is then reloaded, the result is logged and
    the accounts tab is shown. Any failure is shown in an error dialog and
    logged.
    """
    try:
        config = self._save_config()
        mail_client = self._client_from_inputs()
        email, password, mail_token = mail_client.create_email()

        chatgpt_password = _generate_password()
        name = _random_name()
        birthdate = _random_birthdate()
        profile = {
            "chatgpt_password": chatgpt_password,
            "name": name,
            "birthdate": birthdate,
        }
        save_account(config, email, password, mail_token, **profile)

        self._load_accounts()
        self._log(
            f"创建邮箱成功并写入数据库: {email} | ChatGPT密码: {chatgpt_password} | 姓名: {name} | 生日: {birthdate}"
        )
        # Index 0 is the accounts tab.
        self.tabs.setCurrentIndex(0)
    except Exception as e:
        QMessageBox.critical(self, "错误", str(e))
        self._log(f"创建邮箱失败: {e}")
|
||
|
||
def _on_import_account(self):
    """Import the account typed into the import form.

    An email is required. A mail token is requested for the account, which
    is then saved together with the optional fields. Afterwards the account
    list is reloaded and the accounts tab is shown. Any failure is shown in
    an error dialog and logged.
    """
    def read(widget):
        return widget.text().strip()

    email = read(self.import_email_input)
    mail_password = read(self.import_mail_password_input)
    chatgpt_password = read(self.import_chatgpt_password_input)
    name = read(self.import_name_input)
    birthdate = read(self.import_birthdate_input)

    if not email:
        QMessageBox.information(self, "提示", "请输入邮箱")
        return

    try:
        config = self._save_config()
        mail_token = self._client_from_inputs().get_token(email, mail_password)
        optional_fields = {
            "chatgpt_password": chatgpt_password,
            "name": name,
            "birthdate": birthdate,
        }
        save_account(config, email, mail_password, mail_token, **optional_fields)
        self._load_accounts()
        self._log(f"导入账号成功并写入数据库: {email}")
        # Index 0 is the accounts tab.
        self.tabs.setCurrentIndex(0)
    except Exception as e:
        QMessageBox.critical(self, "错误", str(e))
        self._log(f"导入账号失败: {e}")
|
||
|
||
def _on_account_selected(self):
|
||
account = self._get_selected_account()
|
||
if not account:
|
||
self.current_mail_token = ""
|
||
self._render_account_detail(None)
|
||
return
|
||
self.current_mail_token = account.get("mail_token", "")
|
||
self._render_account_detail(account)
|
||
self._log(f"选中账号: {account.get('email', '')}")
|
||
|
||
def _render_account_detail(self, account):
|
||
if not account:
|
||
self.detail_email.setText("-")
|
||
self.detail_mail_password.setText("-")
|
||
self.detail_mail_token.setText("-")
|
||
self.detail_chatgpt_password.setText("-")
|
||
self.detail_name.setText("-")
|
||
self.detail_birthdate.setText("-")
|
||
self.detail_source.setText("-")
|
||
return
|
||
token_status = "已存在" if account.get("mail_token", "") else "缺失"
|
||
self.detail_email.setText(account.get("email", ""))
|
||
self.detail_mail_password.setText(
|
||
self._mask_secret(account.get("mail_password", ""), "(管理员模式下可留空)")
|
||
)
|
||
self.detail_mail_token.setText(token_status)
|
||
self.detail_chatgpt_password.setText(
|
||
self._mask_secret(account.get("chatgpt_password", ""))
|
||
)
|
||
self.detail_name.setText(account.get("name", ""))
|
||
self.detail_birthdate.setText(account.get("birthdate", ""))
|
||
self.detail_source.setText(account.get("source", "postgres"))
|
||
|
||
def _update_account_token(self, account, mail_token):
|
||
if not account or not mail_token:
|
||
return
|
||
config = self._get_config()
|
||
updated = dict(account)
|
||
updated["mail_token"] = mail_token
|
||
save_account(
|
||
config,
|
||
updated.get("email", ""),
|
||
updated.get("mail_password", ""),
|
||
mail_token,
|
||
chatgpt_password=updated.get("chatgpt_password", ""),
|
||
name=updated.get("name", ""),
|
||
birthdate=updated.get("birthdate", ""),
|
||
)
|
||
account["mail_token"] = mail_token
|
||
self.current_mail_token = mail_token
|
||
self._render_account_detail(account)
|
||
|
||
def _ensure_mail_token(self, account):
|
||
mail_token = str(account.get("mail_token", "")).strip()
|
||
if mail_token:
|
||
return mail_token
|
||
|
||
email = str(account.get("email", "")).strip()
|
||
mail_password = str(account.get("mail_password", "")).strip()
|
||
self._log(f"账号 {email} 缺少 mail_token,正在通过管理员接口补全")
|
||
client = self._client_from_inputs()
|
||
mail_token = client.get_token(email, mail_password)
|
||
if not mail_token:
|
||
raise RuntimeError("自动获取 mail_token 失败")
|
||
|
||
self._update_account_token(account, mail_token)
|
||
self._log(f"已自动补全 mail_token: {email}")
|
||
return mail_token
|
||
|
||
def _on_fetch_emails(self):
|
||
account = self._get_selected_account()
|
||
if not account:
|
||
QMessageBox.information(self, "提示", "请先选择账号")
|
||
return
|
||
try:
|
||
mail_token = self._ensure_mail_token(account)
|
||
client = self._client_from_inputs()
|
||
emails = client.fetch_emails(mail_token)
|
||
self.current_emails = emails
|
||
self._render_emails_list(emails)
|
||
self._log(f"已获取邮件: {len(emails)} 封")
|
||
except Exception as e:
|
||
QMessageBox.critical(self, "错误", str(e))
|
||
self._log(f"获取邮件失败: {e}")
|
||
|
||
def _render_emails_list(self, emails):
|
||
self.emails_list.clear()
|
||
for item in emails:
|
||
subject = str(item.get("subject", "") or "(无主题)")
|
||
sender = _format_sender(item.get("from") or item.get("source"))
|
||
date = str(item.get("createdAt", ""))
|
||
label = f"{subject} | {sender} | {date}"
|
||
list_item = QListWidgetItem(label)
|
||
list_item.setData(Qt.UserRole, item)
|
||
self.emails_list.addItem(list_item)
|
||
|
||
def _on_email_selected(self):
|
||
items = self.emails_list.selectedItems()
|
||
if not items:
|
||
return
|
||
email_item = items[0].data(Qt.UserRole)
|
||
msg_id = (
|
||
email_item.get("id") or email_item.get("@id") or email_item.get("messageId")
|
||
)
|
||
body = (
|
||
email_item.get("text")
|
||
or email_item.get("message")
|
||
or email_item.get("html")
|
||
or email_item.get("intro")
|
||
or email_item.get("raw")
|
||
)
|
||
if not body and msg_id and self.current_mail_token:
|
||
try:
|
||
client = self._client_from_inputs()
|
||
detail = client.fetch_message_detail(self.current_mail_token, msg_id)
|
||
body = _email_text_from_detail(detail)
|
||
except Exception as e:
|
||
body = f"(获取邮件详情失败: {e})"
|
||
content = _normalize_email_body(body)
|
||
if not content:
|
||
self.verification_code_value.clear()
|
||
self.copy_verification_code_btn.setEnabled(False)
|
||
self.verification_code_label.setText("验证码")
|
||
self.email_content.setToolTip("")
|
||
self.email_content.setPlainText("(无正文)")
|
||
return
|
||
plain_text = (
|
||
_html_to_plain_text(content) if _looks_like_html(content) else content
|
||
)
|
||
verification_code = _extract_verification_code(plain_text)
|
||
self.verification_code_value.setText(verification_code)
|
||
self.copy_verification_code_btn.setEnabled(bool(verification_code))
|
||
self.verification_code_label.setText(
|
||
"验证码 (已提取)" if verification_code else "验证码"
|
||
)
|
||
display_text = plain_text
|
||
if verification_code:
|
||
display_text = re.sub(
|
||
rf"(?<!\d)({re.escape(verification_code)})(?!\d)",
|
||
r"[验证码: \1]",
|
||
plain_text,
|
||
count=1,
|
||
)
|
||
self.email_content.setToolTip("")
|
||
self.email_content.setPlainText(display_text)
|
||
|
||
def _on_auto_refresh_toggle(self, checked):
|
||
if checked:
|
||
self.timer.start()
|
||
self._log("自动刷新已开启")
|
||
else:
|
||
self.timer.stop()
|
||
self._log("自动刷新已关闭")
|
||
self._save_config()
|
||
|
||
def _on_refresh_interval_changed(self, value):
|
||
self.timer.setInterval(value * 1000)
|
||
if self.auto_refresh_check.isChecked():
|
||
self.timer.start()
|
||
self._save_config()
|
||
|
||
def _auto_refresh(self):
|
||
account = self._get_selected_account()
|
||
if not account:
|
||
return
|
||
try:
|
||
mail_token = self._ensure_mail_token(account)
|
||
client = self._client_from_inputs()
|
||
emails = client.fetch_emails(mail_token)
|
||
self.current_emails = emails
|
||
self._render_emails_list(emails)
|
||
self._log(f"自动刷新: {len(emails)} 封")
|
||
except Exception as e:
|
||
self._log(f"自动刷新失败: {e}")
|
||
|
||
|
||
def main():
    """Start the Qt application, show the main window and exit with the event loop's code."""
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_code = app.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|