Add PostgreSQL-backed DuckMail GUI

This commit is contained in:
52tv
2026-03-20 17:24:48 +08:00
commit 3fe6390250
7 changed files with 1165 additions and 0 deletions

15
.gitignore vendored Normal file
View File

@@ -0,0 +1,15 @@
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
.venv/
venv/
build/
dist/
config_gui.json
accounts.json
.DS_Store
Thumbs.db

76
README.md Normal file
View File

@@ -0,0 +1,76 @@
# DuckMail GUI (PySide6)
一个基于 PySide6 的 DuckMail 邮箱创建与收信工具。
当前版本改为纯 PostgreSQL 账号源:
- 账号列表直接从 PostgreSQL `registered_accounts` 读取
- 创建邮箱后直接写入 PostgreSQL
- 导入账号并获取 Token 后直接写入 PostgreSQL
- 界面采用分页布局,日常使用时只看账号页即可
## 安装依赖
```bash
pip install -r requirements.txt
```
## 启动
```bash
python gui.py
```
## 页面说明
### 账号页
- 搜索账号
- 查看账号列表
- 查看账号详情
- 获取邮件
- 查看邮件正文
- 查看日志
### 设置页
- DuckMail API 配置
- PostgreSQL 连接配置
- 自动刷新配置
- 导入账号并写入数据库
- 创建邮箱并写入数据库
## PostgreSQL 表结构
默认读取并写入表:`registered_accounts`
默认字段:
- `email`
- `mail_password`
- `mail_token`
- `chatgpt_password`
- `name`
- `birthdate`
- `created_at`
其中 `email` 需要能作为唯一键或主键,以支持 `on conflict (email) do update`
## 配置说明
程序会将界面配置保存到 `config_gui.json`,包括:
- DuckMail API Base
- Domain
- Bearer
- Proxy
- 自动刷新开关与间隔
- PostgreSQL Host / Port / DB / User / Password
## requirements
当前依赖:
- `PySide6`
- `requests`
- `psycopg[binary]`

157
account_store.py Normal file
View File

@@ -0,0 +1,157 @@
import json
import os
import threading
try:
import psycopg
except ImportError:
psycopg = None
try:
import psycopg2
except ImportError:
psycopg2 = None
_LOCK = threading.Lock()
def _safe_read_json(path):
if not os.path.exists(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _safe_write_json(path, data):
tmp_path = f"{path}.tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp_path, path)
def save_config(path, config):
with _LOCK:
_safe_write_json(path, config)
def load_config(path):
data = _safe_read_json(path)
if not isinstance(data, dict):
return {}
return data
def _to_text(value):
if value is None:
return ""
return str(value)
def _normalize_optional_text(value):
text = _to_text(value).strip()
return text or None
def _get_pg_driver():
if psycopg is not None:
return "psycopg"
if psycopg2 is not None:
return "psycopg2"
raise RuntimeError("未安装 PostgreSQL 驱动,请先安装 psycopg[binary] 或 psycopg2-binary")
def _get_pg_connection(config):
driver = _get_pg_driver()
host = str(config.get("pg_host", "")).strip()
port = int(config.get("pg_port", 5432) or 5432)
dbname = str(config.get("pg_db", "")).strip()
user = str(config.get("pg_user", "")).strip()
password = str(config.get("pg_password", "")).strip()
connect_timeout = int(config.get("pg_connect_timeout", 10) or 10)
if not host or not dbname or not user:
raise ValueError("PostgreSQL 配置不完整,请填写 Host、DB、User")
kwargs = {
"host": host,
"port": port,
"dbname": dbname,
"user": user,
"password": password,
"connect_timeout": connect_timeout,
}
if driver == "psycopg":
return psycopg.connect(**kwargs)
return psycopg2.connect(**kwargs)
def load_accounts(config):
query = (
"select email, mail_password, mail_token, chatgpt_password, name, birthdate, created_at "
"from registered_accounts order by created_at desc nulls last, email asc"
)
accounts = []
with _get_pg_connection(config) as conn:
with conn.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
for row in rows:
email, mail_password, mail_token, chatgpt_password, name, birthdate, created_at = row
if not email:
continue
accounts.append(
{
"email": _to_text(email).strip(),
"mail_password": _to_text(mail_password).strip(),
"mail_token": _to_text(mail_token).strip(),
"chatgpt_password": _to_text(chatgpt_password).strip(),
"name": _to_text(name).strip(),
"birthdate": _to_text(birthdate).strip(),
"created_at": _to_text(created_at).strip(),
"source": "postgres",
}
)
return accounts
def save_account(
config,
email,
mail_password,
mail_token,
chatgpt_password=None,
name=None,
birthdate=None,
):
query = (
"insert into registered_accounts "
"(email, mail_password, mail_token, chatgpt_password, name, birthdate) "
"values (%s, %s, %s, %s, %s, %s) "
"on conflict (email) do update set "
"mail_password = excluded.mail_password, "
"mail_token = excluded.mail_token, "
"chatgpt_password = excluded.chatgpt_password, "
"name = excluded.name, "
"birthdate = excluded.birthdate"
)
params = (
email,
mail_password,
mail_token,
_normalize_optional_text(chatgpt_password),
_normalize_optional_text(name),
_normalize_optional_text(birthdate),
)
with _LOCK:
with _get_pg_connection(config) as conn:
with conn.cursor() as cur:
cur.execute(query, params)
conn.commit()

286
duckmail_client.py Normal file
View File

@@ -0,0 +1,286 @@
import random
import string
import time
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def _generate_email_local_part():
letters1 = "".join(random.choices(string.ascii_lowercase, k=5))
numbers = "".join(random.choices(string.digits, k=random.randint(1, 3)))
letters2 = "".join(random.choices(string.ascii_lowercase, k=random.randint(1, 3)))
return letters1 + numbers + letters2
def _generate_password(length=14):
lower = string.ascii_lowercase
upper = string.ascii_uppercase
digits = string.digits
special = "!@#$%&*"
pwd = [
random.choice(lower),
random.choice(upper),
random.choice(digits),
random.choice(special),
]
all_chars = lower + upper + digits + special
pwd += [random.choice(all_chars) for _ in range(length - 4)]
random.shuffle(pwd)
return "".join(pwd)
class DuckMailClient:
def __init__(self, api_base, domain, bearer, proxy=None, timeout=120):
self.api_base = (api_base or "").rstrip("/")
self.domain = domain or ""
self.bearer = (bearer or "").strip()
if self.bearer.lower().startswith("bearer "):
self.bearer = self.bearer[7:].strip()
self.proxy = proxy or ""
self.timeout = timeout
self.session = self._create_session()
def _create_session(self):
session = requests.Session()
retry_strategy = Retry(
total=5,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "POST", "OPTIONS"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
session.headers.update(
{
"User-Agent": "DuckMailGUI/1.0",
"Accept": "application/json",
"Content-Type": "application/json",
}
)
if self.proxy:
session.proxies = {"http": self.proxy, "https": self.proxy}
return session
def _auth_headers(self):
if not self.bearer:
return {}
return {
"Authorization": f"Bearer {self.bearer}",
"X-Api-Key": self.bearer,
}
def _duckmail_api_bases(self):
base = self.api_base
if not base:
return []
candidates = [base]
if base.endswith("/api"):
candidates.append(base[:-4])
else:
candidates.append(f"{base}/api")
unique = []
for item in candidates:
if item and item not in unique:
unique.append(item)
return unique
def create_email(self):
if not self.bearer:
raise ValueError("DUCKMAIL_BEARER 未设置")
if not self.domain:
raise ValueError("DUCKMAIL_DOMAIN 未设置")
api_bases = self._duckmail_api_bases()
if not api_bases:
raise ValueError("DUCKMAIL_API_BASE 未设置")
is_worker = "workers.dev" in self.api_base or "temp-email" in self.api_base
max_retries = 5
for attempt in range(max_retries):
email_local_part = _generate_email_local_part()
email = f"{email_local_part}@{self.domain}"
password = _generate_password()
if is_worker:
payload = {
"admin_password": self.bearer,
"name": email_local_part,
"domain": self.domain,
}
res = None
for idx, api_base in enumerate(api_bases):
res = self.session.post(
f"{api_base}/new_address", json=payload, timeout=self.timeout
)
if res.status_code == 404 and idx < len(api_bases) - 1:
continue
break
if res.status_code == 200:
result = res.json()
if result.get("jwt"):
return result.get("address"), password, result.get("jwt")
raise RuntimeError(f"Worker获取邮件 Token 失败: {res.text}")
if res.status_code == 403:
raise RuntimeError(
f"Worker创建邮箱返回 403可能是 Bearer 无效或权限不足: {res.text[:200]}"
)
if res.status_code == 400 and "already exists" in res.text:
continue
raise RuntimeError(
f"Worker创建邮箱失败: {res.status_code} - {res.text[:200]}"
)
headers = {"Authorization": f"Bearer {self.bearer}"}
headers = self._auth_headers()
res = None
used_api_base = None
for idx, api_base in enumerate(api_bases):
used_api_base = api_base
res = self.session.post(
f"{api_base}/accounts",
json={"address": email, "password": password},
headers=headers,
timeout=self.timeout,
)
if res.status_code == 404 and idx < len(api_bases) - 1:
continue
break
if res.status_code in (200, 201):
result = res.json()
if result.get("address"):
time.sleep(0.5)
token_res = self.session.post(
f"{used_api_base}/token",
json={"address": email, "password": password},
headers=headers,
timeout=self.timeout,
)
if token_res.status_code == 200:
mail_token = token_res.json().get("token")
if mail_token:
return email, password, mail_token
if token_res.status_code == 403:
raise RuntimeError(
"获取邮件 Token 返回 403可能是 Bearer 无效或权限不足"
)
raise RuntimeError(f"获取邮件 Token 失败: {token_res.status_code}")
if res.status_code == 403:
raise RuntimeError(
f"创建邮箱返回 403可能是 Bearer 无效或权限不足: {res.text[:200]}"
)
if res.status_code == 422 and "already exists" in res.text:
continue
raise RuntimeError(
f"创建邮箱失败: {res.status_code} - {res.text[:200]}"
)
raise RuntimeError("DuckMail 创建邮箱失败: 超过最大重试次数")
def fetch_emails(self, mail_token):
if not mail_token:
return []
api_bases = self._duckmail_api_bases()
if not api_bases:
return []
is_worker = "workers.dev" in self.api_base or "temp-email" in self.api_base
if is_worker:
res = None
for idx, api_base in enumerate(api_bases):
res = self.session.get(
f"{api_base}/mails",
params={"limit": 20, "offset": 0},
headers={"Authorization": f"Bearer {mail_token}"},
timeout=self.timeout,
)
if res.status_code == 404 and idx < len(api_bases) - 1:
continue
break
if res.status_code == 200:
data = res.json()
if isinstance(data, dict) and "results" in data:
return data.get("results", [])
if isinstance(data, list):
return data
return []
res = None
for idx, api_base in enumerate(api_bases):
res = self.session.get(
f"{api_base}/messages",
params={"page": 1},
headers={"Authorization": f"Bearer {mail_token}"},
timeout=self.timeout,
)
if res.status_code == 404 and idx < len(api_bases) - 1:
continue
break
if res.status_code == 200:
data = res.json()
if isinstance(data, dict) and "hydra:member" in data:
return data.get("hydra:member", [])
if isinstance(data, list):
return data
return []
def get_token(self, email, mail_password):
if not email or not mail_password:
raise ValueError("邮箱或邮箱密码为空")
api_bases = self._duckmail_api_bases()
if not api_bases:
raise ValueError("DUCKMAIL_API_BASE 未设置")
headers = self._auth_headers()
if not headers:
raise ValueError("DUCKMAIL_BEARER 未设置")
res = None
used_api_base = None
for idx, api_base in enumerate(api_bases):
used_api_base = api_base
res = self.session.post(
f"{api_base}/token",
json={"address": email, "password": mail_password},
headers=headers,
timeout=self.timeout,
)
if res.status_code == 404 and idx < len(api_bases) - 1:
continue
break
if res.status_code == 200:
data = res.json()
token = data.get("token")
if token:
return token
if res.status_code == 403:
raise RuntimeError(
f"获取邮件 Token 返回 403可能是 API Key 无效或无权限: {res.text[:200]}"
)
raise RuntimeError(
f"获取邮件 Token 失败: {res.status_code} - {res.text[:200]}"
)
def fetch_message_detail(self, mail_token, msg_id):
if not mail_token or not msg_id:
return None
if isinstance(msg_id, str) and msg_id.startswith("/messages/"):
msg_id = msg_id.split("/")[-1]
msg_id = str(msg_id).strip()
api_bases = self._duckmail_api_bases()
if not api_bases:
return None
res = None
for idx, api_base in enumerate(api_bases):
res = self.session.get(
f"{api_base}/messages/{msg_id}",
headers={"Authorization": f"Bearer {mail_token}"},
timeout=self.timeout,
)
if res.status_code == 404 and idx < len(api_bases) - 1:
continue
break
if res.status_code == 200:
return res.json()
return None

628
gui.py Normal file
View File

@@ -0,0 +1,628 @@
import sys
import time
from PySide6.QtCore import QTimer, Qt
from PySide6.QtGui import QFont
from PySide6.QtWidgets import (
QApplication,
QCheckBox,
QFormLayout,
QFrame,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QMainWindow,
QMessageBox,
QPushButton,
QSpinBox,
QSplitter,
QTabWidget,
QTextEdit,
QVBoxLayout,
QWidget,
)
from duckmail_client import DuckMailClient
from account_store import load_accounts, save_account, load_config, save_config
CONFIG_PATH = "config_gui.json"
def _format_sender(sender_value):
if isinstance(sender_value, dict):
return sender_value.get("address") or sender_value.get("name") or ""
return str(sender_value or "")
def _email_text_from_detail(detail):
if not isinstance(detail, dict):
return ""
text = detail.get("text") or ""
if text:
return str(text)
html_val = detail.get("html")
if isinstance(html_val, list):
return "\n".join(str(item) for item in html_val)
return str(html_val or "")
def _generate_password(length=14):
import random
import string
lower = string.ascii_lowercase
upper = string.ascii_uppercase
digits = string.digits
special = "!@#$%&*"
pwd = [
random.choice(lower),
random.choice(upper),
random.choice(digits),
random.choice(special),
]
all_chars = lower + upper + digits + special
pwd += [random.choice(all_chars) for _ in range(length - 4)]
random.shuffle(pwd)
return "".join(pwd)
def _random_name():
import random
first_names = [
"James",
"Emma",
"Liam",
"Olivia",
"Noah",
"Ava",
"Ethan",
"Sophia",
"Lucas",
"Mia",
"Mason",
"Isabella",
"Logan",
"Charlotte",
"Alexander",
"Amelia",
"Benjamin",
"Harper",
"William",
"Evelyn",
]
last_names = [
"Smith",
"Johnson",
"Brown",
"Davis",
"Wilson",
"Moore",
"Taylor",
"Clark",
"Hall",
"Young",
"Anderson",
"Thomas",
"Jackson",
"White",
"Harris",
"Martin",
"Thompson",
"Garcia",
"Robinson",
"Lewis",
]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def _random_birthdate():
import random
y = random.randint(1980, 2002)
m = random.randint(1, 12)
d = random.randint(1, 28)
return f"{y}-{m:02d}-{d:02d}"
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("DuckMail GUI")
self.resize(1280, 780)
self.current_mail_token = ""
self.current_emails = []
self.all_accounts = []
self._build_ui()
self._setup_timer()
self._load_config()
self._load_accounts()
def _build_ui(self):
root = QWidget()
root_layout = QVBoxLayout(root)
self.tabs = QTabWidget()
self.tabs.addTab(self._build_accounts_page(), "账号")
self.tabs.addTab(self._build_settings_page(), "设置")
root_layout.addWidget(self.tabs)
self.setCentralWidget(root)
self.save_config_btn.clicked.connect(self._on_save_config)
self.create_email_btn.clicked.connect(self._on_create_email)
self.fetch_emails_btn.clicked.connect(self._on_fetch_emails)
self.clear_log_btn.clicked.connect(self.log_box.clear)
self.accounts_list.itemSelectionChanged.connect(self._on_account_selected)
self.emails_list.itemSelectionChanged.connect(self._on_email_selected)
self.auto_refresh_check.toggled.connect(self._on_auto_refresh_toggle)
self.refresh_interval.valueChanged.connect(self._on_refresh_interval_changed)
self.import_btn.clicked.connect(self._on_import_account)
self.reload_accounts_btn.clicked.connect(self._on_reload_accounts)
self.search_input.textChanged.connect(self._apply_account_filter)
def _build_accounts_page(self):
page = QWidget()
layout = QVBoxLayout(page)
toolbar = QHBoxLayout()
self.search_input = QLineEdit()
self.search_input.setPlaceholderText("搜索邮箱 / 姓名 / 创建时间")
self.fetch_emails_btn = QPushButton("获取邮件")
self.reload_accounts_btn = QPushButton("刷新账号")
self.clear_log_btn = QPushButton("清空日志")
toolbar.addWidget(QLabel("账号搜索"))
toolbar.addWidget(self.search_input, 1)
toolbar.addWidget(self.fetch_emails_btn)
toolbar.addWidget(self.reload_accounts_btn)
toolbar.addWidget(self.clear_log_btn)
splitter = QSplitter(Qt.Horizontal)
left_frame = QFrame()
left_layout = QVBoxLayout(left_frame)
left_layout.addWidget(QLabel("账号列表"))
self.accounts_list = QListWidget()
left_layout.addWidget(self.accounts_list)
details_group = QGroupBox("账号详情")
details_layout = QFormLayout(details_group)
self.detail_email = QLabel("-")
self.detail_mail_password = QLabel("-")
self.detail_chatgpt_password = QLabel("-")
self.detail_name = QLabel("-")
self.detail_birthdate = QLabel("-")
self.detail_mail_token = QLabel("-")
self.detail_source = QLabel("postgres")
details_layout.addRow("邮箱", self.detail_email)
details_layout.addRow("邮箱密码", self.detail_mail_password)
details_layout.addRow("Mail Token", self.detail_mail_token)
details_layout.addRow("ChatGPT密码", self.detail_chatgpt_password)
details_layout.addRow("姓名", self.detail_name)
details_layout.addRow("生日", self.detail_birthdate)
details_layout.addRow("来源", self.detail_source)
left_layout.addWidget(details_group)
right_frame = QFrame()
right_layout = QVBoxLayout(right_frame)
right_layout.addWidget(QLabel("邮件列表"))
self.emails_list = QListWidget()
right_layout.addWidget(self.emails_list)
right_layout.addWidget(QLabel("邮件内容"))
self.email_content = QTextEdit()
self.email_content.setReadOnly(True)
self.email_content.setFont(QFont("Consolas", 10))
right_layout.addWidget(self.email_content)
splitter.addWidget(left_frame)
splitter.addWidget(right_frame)
splitter.setSizes([360, 900])
self.log_box = QTextEdit()
self.log_box.setReadOnly(True)
self.log_box.setFixedHeight(150)
layout.addLayout(toolbar)
layout.addWidget(splitter)
layout.addWidget(QLabel("日志"))
layout.addWidget(self.log_box)
return page
def _build_settings_page(self):
page = QWidget()
layout = QVBoxLayout(page)
config_group = QGroupBox("DuckMail 配置")
config_layout = QFormLayout(config_group)
self.api_base_input = QLineEdit()
self.domain_input = QLineEdit()
self.bearer_input = QLineEdit()
self.bearer_input.setEchoMode(QLineEdit.Password)
self.proxy_input = QLineEdit()
config_layout.addRow("API Base", self.api_base_input)
config_layout.addRow("Domain", self.domain_input)
config_layout.addRow("Bearer", self.bearer_input)
config_layout.addRow("Proxy", self.proxy_input)
auto_layout = QHBoxLayout()
self.auto_refresh_check = QCheckBox("自动刷新")
self.refresh_interval = QSpinBox()
self.refresh_interval.setRange(10, 300)
self.refresh_interval.setValue(30)
self.refresh_interval.setSuffix("")
auto_layout.addWidget(self.auto_refresh_check)
auto_layout.addWidget(QLabel("间隔"))
auto_layout.addWidget(self.refresh_interval)
auto_layout.addStretch(1)
config_layout.addRow("刷新", auto_layout)
pg_group = QGroupBox("PostgreSQL 配置")
pg_layout = QFormLayout(pg_group)
self.pg_enabled_check = QCheckBox("启用 PostgreSQL")
self.pg_host_input = QLineEdit()
self.pg_port_input = QSpinBox()
self.pg_port_input.setRange(1, 65535)
self.pg_port_input.setValue(5432)
self.pg_db_input = QLineEdit()
self.pg_user_input = QLineEdit()
self.pg_password_input = QLineEdit()
self.pg_password_input.setEchoMode(QLineEdit.Password)
pg_layout.addRow("开关", self.pg_enabled_check)
pg_layout.addRow("Host", self.pg_host_input)
pg_layout.addRow("Port", self.pg_port_input)
pg_layout.addRow("DB", self.pg_db_input)
pg_layout.addRow("User", self.pg_user_input)
pg_layout.addRow("Password", self.pg_password_input)
import_group = QGroupBox("账号写入数据库")
import_layout = QFormLayout(import_group)
self.import_email_input = QLineEdit()
self.import_mail_password_input = QLineEdit()
self.import_mail_password_input.setEchoMode(QLineEdit.Password)
self.import_chatgpt_password_input = QLineEdit()
self.import_name_input = QLineEdit()
self.import_birthdate_input = QLineEdit()
self.import_btn = QPushButton("导入并获取 Token")
self.create_email_btn = QPushButton("创建邮箱并写库")
import_actions = QHBoxLayout()
import_actions.addWidget(self.import_btn)
import_actions.addWidget(self.create_email_btn)
self.save_config_btn = QPushButton("保存设置")
import_layout.addRow("邮箱", self.import_email_input)
import_layout.addRow("邮箱密码", self.import_mail_password_input)
import_layout.addRow("ChatGPT密码(可选)", self.import_chatgpt_password_input)
import_layout.addRow("姓名(可选)", self.import_name_input)
import_layout.addRow("生日(可选)", self.import_birthdate_input)
import_layout.addRow("操作", import_actions)
layout.addWidget(config_group)
layout.addWidget(pg_group)
layout.addWidget(import_group)
layout.addWidget(self.save_config_btn)
layout.addStretch(1)
return page
def _setup_timer(self):
self.timer = QTimer(self)
self.timer.setInterval(self.refresh_interval.value() * 1000)
self.timer.timeout.connect(self._auto_refresh)
def _log(self, message):
ts = time.strftime("%H:%M:%S")
self.log_box.append(f"[{ts}] {message}")
def _get_config(self):
return {
"api_base": self.api_base_input.text().strip(),
"domain": self.domain_input.text().strip(),
"bearer": self.bearer_input.text().strip(),
"proxy": self.proxy_input.text().strip(),
"auto_refresh": self.auto_refresh_check.isChecked(),
"refresh_interval": self.refresh_interval.value(),
"pg_enabled": self.pg_enabled_check.isChecked(),
"pg_host": self.pg_host_input.text().strip(),
"pg_port": self.pg_port_input.value(),
"pg_db": self.pg_db_input.text().strip(),
"pg_user": self.pg_user_input.text().strip(),
"pg_password": self.pg_password_input.text().strip(),
}
def _client_from_inputs(self):
config = self._get_config()
return DuckMailClient(
config.get("api_base"),
config.get("domain"),
config.get("bearer"),
proxy=config.get("proxy"),
)
def _load_config(self):
data = load_config(CONFIG_PATH)
self.api_base_input.setText(data.get("api_base", ""))
self.domain_input.setText(data.get("domain", ""))
self.bearer_input.setText(data.get("bearer", ""))
self.proxy_input.setText(data.get("proxy", ""))
self.auto_refresh_check.setChecked(bool(data.get("auto_refresh", False)))
self.refresh_interval.setValue(max(10, min(300, int(data.get("refresh_interval", 30) or 30))))
self.pg_enabled_check.setChecked(bool(data.get("pg_enabled", True)))
self.pg_host_input.setText(data.get("pg_host", ""))
self.pg_port_input.setValue(int(data.get("pg_port", 5432) or 5432))
self.pg_db_input.setText(data.get("pg_db", "mail_accounts_db"))
self.pg_user_input.setText(data.get("pg_user", ""))
self.pg_password_input.setText(data.get("pg_password", ""))
def _save_config(self):
data = self._get_config()
save_config(CONFIG_PATH, data)
return data
def _account_matches_search(self, account, keyword):
if not keyword:
return True
fields = [
account.get("email", ""),
account.get("name", ""),
account.get("created_at", ""),
]
haystack = " ".join(str(item or "") for item in fields).lower()
return keyword in haystack
def _render_accounts_list(self, accounts):
self.accounts_list.clear()
for item in accounts:
email = item.get("email", "")
created_at = item.get("created_at", "")
label = f"{email} ({created_at})" if created_at else email
list_item = QListWidgetItem(label)
list_item.setData(Qt.UserRole, item)
self.accounts_list.addItem(list_item)
def _apply_account_filter(self):
keyword = self.search_input.text().strip().lower()
filtered = [item for item in self.all_accounts if self._account_matches_search(item, keyword)]
self._render_accounts_list(filtered)
if not filtered:
self.current_mail_token = ""
self._render_account_detail(None)
def _load_accounts(self):
config = self._save_config()
if not config.get("pg_enabled"):
self.all_accounts = []
self._apply_account_filter()
self._log("PostgreSQL 未启用")
return
try:
self.all_accounts = load_accounts(config)
self._apply_account_filter()
self._log(f"已加载 PostgreSQL 账号: {len(self.all_accounts)}")
except Exception as e:
self.all_accounts = []
self._apply_account_filter()
self._log(f"加载 PostgreSQL 账号失败: {e}")
def _on_reload_accounts(self):
self._load_accounts()
self._log("账号列表已刷新")
def _get_selected_account(self):
items = self.accounts_list.selectedItems()
if not items:
return None
return items[0].data(Qt.UserRole)
def _on_save_config(self):
self._save_config()
self._log("设置已保存")
def _on_create_email(self):
try:
config = self._save_config()
client = self._client_from_inputs()
email, password, mail_token = client.create_email()
chatgpt_password = _generate_password()
name = _random_name()
birthdate = _random_birthdate()
save_account(
config,
email,
password,
mail_token,
chatgpt_password=chatgpt_password,
name=name,
birthdate=birthdate,
)
self._load_accounts()
self._log(
f"创建邮箱成功并写入数据库: {email} | ChatGPT密码: {chatgpt_password} | 姓名: {name} | 生日: {birthdate}"
)
self.tabs.setCurrentIndex(0)
except Exception as e:
QMessageBox.critical(self, "错误", str(e))
self._log(f"创建邮箱失败: {e}")
def _on_import_account(self):
email = self.import_email_input.text().strip()
mail_password = self.import_mail_password_input.text().strip()
chatgpt_password = self.import_chatgpt_password_input.text().strip()
name = self.import_name_input.text().strip()
birthdate = self.import_birthdate_input.text().strip()
if not email or not mail_password:
QMessageBox.information(self, "提示", "请输入邮箱和邮箱密码")
return
try:
config = self._save_config()
client = self._client_from_inputs()
mail_token = client.get_token(email, mail_password)
save_account(
config,
email,
mail_password,
mail_token,
chatgpt_password=chatgpt_password,
name=name,
birthdate=birthdate,
)
self._load_accounts()
self._log(f"导入账号成功并写入数据库: {email}")
self.tabs.setCurrentIndex(0)
except Exception as e:
QMessageBox.critical(self, "错误", str(e))
self._log(f"导入账号失败: {e}")
def _on_account_selected(self):
account = self._get_selected_account()
if not account:
self.current_mail_token = ""
self._render_account_detail(None)
return
self.current_mail_token = account.get("mail_token", "")
self._render_account_detail(account)
self._log(f"选中账号: {account.get('email', '')}")
def _render_account_detail(self, account):
if not account:
self.detail_email.setText("-")
self.detail_mail_password.setText("-")
self.detail_mail_token.setText("-")
self.detail_chatgpt_password.setText("-")
self.detail_name.setText("-")
self.detail_birthdate.setText("-")
self.detail_source.setText("-")
return
token_status = "已存在" if account.get("mail_token", "") else "缺失"
self.detail_email.setText(account.get("email", ""))
self.detail_mail_password.setText(account.get("mail_password", ""))
self.detail_mail_token.setText(token_status)
self.detail_chatgpt_password.setText(account.get("chatgpt_password", ""))
self.detail_name.setText(account.get("name", ""))
self.detail_birthdate.setText(account.get("birthdate", ""))
self.detail_source.setText(account.get("source", "postgres"))
def _update_account_token(self, account, mail_token):
if not account or not mail_token:
return
config = self._get_config()
updated = dict(account)
updated["mail_token"] = mail_token
save_account(
config,
updated.get("email", ""),
updated.get("mail_password", ""),
mail_token,
chatgpt_password=updated.get("chatgpt_password", ""),
name=updated.get("name", ""),
birthdate=updated.get("birthdate", ""),
)
account["mail_token"] = mail_token
self.current_mail_token = mail_token
self._render_account_detail(account)
def _ensure_mail_token(self, account):
mail_token = str(account.get("mail_token", "")).strip()
if mail_token:
return mail_token
email = str(account.get("email", "")).strip()
mail_password = str(account.get("mail_password", "")).strip()
if not mail_password:
raise RuntimeError("当前账号缺少 mail_token且没有 mail_password 可用于换取 token")
self._log(f"账号 {email} 缺少 mail_token正在自动获取")
client = self._client_from_inputs()
mail_token = client.get_token(email, mail_password)
if not mail_token:
raise RuntimeError("自动获取 mail_token 失败")
self._update_account_token(account, mail_token)
self._log(f"已自动补全 mail_token: {email}")
return mail_token
def _on_fetch_emails(self):
account = self._get_selected_account()
if not account:
QMessageBox.information(self, "提示", "请先选择账号")
return
try:
mail_token = self._ensure_mail_token(account)
client = self._client_from_inputs()
emails = client.fetch_emails(mail_token)
self.current_emails = emails
self._render_emails_list(emails)
self._log(f"已获取邮件: {len(emails)}")
except Exception as e:
QMessageBox.critical(self, "错误", str(e))
self._log(f"获取邮件失败: {e}")
def _render_emails_list(self, emails):
self.emails_list.clear()
for item in emails:
subject = str(item.get("subject", ""))
sender = _format_sender(item.get("from"))
date = str(item.get("createdAt", ""))
label = f"{subject} | {sender} | {date}"
list_item = QListWidgetItem(label)
list_item.setData(Qt.UserRole, item)
self.emails_list.addItem(list_item)
def _on_email_selected(self):
items = self.emails_list.selectedItems()
if not items:
return
email_item = items[0].data(Qt.UserRole)
msg_id = email_item.get("id") or email_item.get("@id") or email_item.get("messageId")
body = email_item.get("text") or email_item.get("html") or email_item.get("intro")
if not body and msg_id and self.current_mail_token:
try:
client = self._client_from_inputs()
detail = client.fetch_message_detail(self.current_mail_token, msg_id)
body = _email_text_from_detail(detail)
except Exception as e:
body = f"(获取邮件详情失败: {e})"
self.email_content.setPlainText(str(body or "(无正文)"))
def _on_auto_refresh_toggle(self, checked):
if checked:
self.timer.start()
self._log("自动刷新已开启")
else:
self.timer.stop()
self._log("自动刷新已关闭")
self._save_config()
def _on_refresh_interval_changed(self, value):
self.timer.setInterval(value * 1000)
if self.auto_refresh_check.isChecked():
self.timer.start()
self._save_config()
def _auto_refresh(self):
account = self._get_selected_account()
if not account:
return
try:
mail_token = self._ensure_mail_token(account)
client = self._client_from_inputs()
emails = client.fetch_emails(mail_token)
self.current_emails = emails
self._render_emails_list(emails)
self._log(f"自动刷新: {len(emails)}")
except Exception as e:
self._log(f"自动刷新失败: {e}")
def main():
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
PySide6
requests
psycopg[binary]

Binary file not shown.

After

Width:  |  Height:  |  Size: 97 KiB