# Source listing metadata (file-viewer residue): 287 lines, 10 KiB, Python.
import random
import secrets
import string
import time

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
|
||
|
||
|
||
def _generate_email_local_part():
    """Return a random mailbox name (the part before the ``@``).

    The name is built from three consecutive segments:
    five lowercase ASCII letters, then one to three digits, then one to
    three lowercase ASCII letters, giving 7 to 11 characters in total.
    """
    prefix = random.choices(string.ascii_lowercase, k=5)

    digit_count = random.randint(1, 3)
    middle = random.choices(string.digits, k=digit_count)

    suffix_count = random.randint(1, 3)
    suffix = random.choices(string.ascii_lowercase, k=suffix_count)

    return "".join(prefix + middle + suffix)
|
||
|
||
|
||
def _generate_password(length=14):
    """Return a random password containing every required character class.

    The password always includes at least one lowercase letter, one
    uppercase letter, one digit and one character from ``!@#$%&*``; the
    remaining positions are drawn from the union of those four pools and
    the result is shuffled so the guaranteed characters are not at fixed
    positions.

    Randomness comes from ``secrets.SystemRandom`` (the OS CSPRNG) rather
    than the ``random`` module, whose generator is not suitable for
    producing credentials.

    Args:
        length: Desired password length. Values below 4 still yield a
            4-character password, because one character per class is
            always emitted.

    Returns:
        The generated password as a ``str``.
    """
    lower = string.ascii_lowercase
    upper = string.ascii_uppercase
    digits = string.digits
    special = "!@#$%&*"

    required_pools = (lower, upper, digits, special)
    alphabet = "".join(required_pools)

    rng = secrets.SystemRandom()

    # One guaranteed character per class, then fill up to the requested length.
    # range() of a negative number is empty, so short lengths add nothing here.
    chars = [rng.choice(pool) for pool in required_pools]
    chars.extend(rng.choice(alphabet) for _ in range(length - len(required_pools)))

    rng.shuffle(chars)
    return "".join(chars)
|
||
|
||
|
||
class DuckMailClient:
    """HTTP client for a DuckMail-style mail API.

    Two endpoint families are supported and selected from the configured
    base URL (see ``_is_worker``):

    * the standard API: ``/accounts``, ``/token``, ``/messages``;
    * the worker API: ``/new_address``, ``/mails``.

    Every request is first sent to the configured base URL and, if that
    answers 404, retried against the same URL with the ``/api`` suffix
    added or removed (see ``_duckmail_api_bases``).
    """

    # Substrings of the API base URL that select the worker-style endpoints.
    _WORKER_MARKERS = ("workers.dev", "temp-email")
    # Number of freshly generated addresses tried before create_email gives up.
    _MAX_CREATE_ATTEMPTS = 5

    def __init__(self, api_base, domain, bearer, proxy=None, timeout=120):
        """Store the configuration and build the HTTP session.

        Args:
            api_base: Base URL of the mail API; trailing slashes are removed.
            domain: Mail domain appended to generated mailbox names.
            bearer: API credential; a leading ``Bearer `` prefix is stripped.
            proxy: Optional proxy URL used for both HTTP and HTTPS traffic.
            timeout: Value passed as ``timeout`` to every request.
        """
        self.api_base = (api_base or "").rstrip("/")
        self.domain = domain or ""
        self.bearer = self._normalize_bearer(bearer)
        self.proxy = proxy or ""
        self.timeout = timeout
        self.session = self._create_session()

    @staticmethod
    def _normalize_bearer(bearer):
        """Return *bearer* trimmed and without a case-insensitive ``Bearer `` prefix."""
        token = (bearer or "").strip()
        if token.lower().startswith("bearer "):
            token = token[7:].strip()
        return token

    def _create_session(self):
        """Build a ``requests.Session`` with retries, default headers and proxy."""
        session = requests.Session()
        # NOTE(review): POST is listed in allowed_methods, so non-idempotent
        # requests (e.g. account creation) are also retried on these statuses.
        retry_strategy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "POST", "OPTIONS"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        session.headers.update(
            {
                "User-Agent": "DuckMailGUI/1.0",
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )
        if self.proxy:
            session.proxies = {"http": self.proxy, "https": self.proxy}
        return session

    def _auth_headers(self):
        """Return the API-credential headers, or ``{}`` when no bearer is set.

        The same credential is sent both as ``Authorization: Bearer ...`` and
        as ``X-Api-Key`` (presumably so either scheme a deployment expects is
        satisfied -- confirm against the server).
        """
        if not self.bearer:
            return {}
        return {
            "Authorization": f"Bearer {self.bearer}",
            "X-Api-Key": self.bearer,
        }

    def _duckmail_api_bases(self):
        """Return the base URLs to try, in order, without duplicates.

        The configured base comes first, followed by the variant with the
        ``/api`` suffix toggled (removed if present, appended otherwise).
        Returns an empty list when no base URL is configured.
        """
        base = self.api_base
        if not base:
            return []
        alternate = base[:-4] if base.endswith("/api") else f"{base}/api"
        candidates = [base]
        # The alternate can be empty when the base is exactly "/api".
        if alternate and alternate not in candidates:
            candidates.append(alternate)
        return candidates

    def _is_worker(self):
        """Return True when the base URL selects the worker-style endpoints."""
        return any(marker in self.api_base for marker in self._WORKER_MARKERS)

    def _request_with_fallback(self, method, path, api_bases, **kwargs):
        """Send a request to each candidate base until one does not answer 404.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path starting with ``/``, appended to each base.
            api_bases: Non-empty list of base URLs to try in order.
            **kwargs: Extra arguments forwarded to ``Session.request``.

        Returns:
            ``(response, api_base)``: the last response received and the base
            URL that produced it. A 404 from the final candidate is returned
            as-is so the caller can report it.
        """
        response = None
        used_api_base = None
        last_index = len(api_bases) - 1
        for idx, api_base in enumerate(api_bases):
            used_api_base = api_base
            response = self.session.request(
                method, f"{api_base}{path}", timeout=self.timeout, **kwargs
            )
            if response.status_code == 404 and idx < last_index:
                continue
            break
        return response, used_api_base

    def create_email(self):
        """Create a new mailbox with a random address and password.

        Up to ``_MAX_CREATE_ATTEMPTS`` random addresses are tried; a new one
        is generated whenever the server reports that the address already
        exists.

        Returns:
            ``(address, password, token)`` for the created mailbox.

        Raises:
            ValueError: If the bearer, domain or API base is not configured.
            RuntimeError: If the server rejects the request, no token can be
                obtained, or every attempt hit an existing address.
        """
        if not self.bearer:
            raise ValueError("DUCKMAIL_BEARER 未设置")
        if not self.domain:
            raise ValueError("DUCKMAIL_DOMAIN 未设置")
        api_bases = self._duckmail_api_bases()
        if not api_bases:
            raise ValueError("DUCKMAIL_API_BASE 未设置")

        if self._is_worker():
            create = self._create_worker_mailbox
        else:
            create = self._create_standard_mailbox

        for _ in range(self._MAX_CREATE_ATTEMPTS):
            email_local_part = _generate_email_local_part()
            password = _generate_password()
            created = create(api_bases, email_local_part, password)
            if created is not None:
                return created

        raise RuntimeError("DuckMail 创建邮箱失败: 超过最大重试次数")

    def _create_worker_mailbox(self, api_bases, email_local_part, password):
        """Try once to create a mailbox through the worker ``/new_address`` API.

        Returns ``(address, password, jwt)`` on success, or ``None`` when the
        address already exists and the caller should retry with a new name.
        Raises ``RuntimeError`` for every other outcome.
        """
        # The worker API takes the credential in the body, not in headers.
        payload = {
            "admin_password": self.bearer,
            "name": email_local_part,
            "domain": self.domain,
        }
        res, _ = self._request_with_fallback(
            "POST", "/new_address", api_bases, json=payload
        )
        if res.status_code == 200:
            result = res.json()
            if result.get("jwt"):
                return result.get("address"), password, result.get("jwt")
            raise RuntimeError(f"Worker获取邮件 Token 失败: {res.text}")
        if res.status_code == 403:
            raise RuntimeError(
                f"Worker创建邮箱返回 403,可能是 Bearer 无效或权限不足: {res.text[:200]}"
            )
        if res.status_code == 400 and "already exists" in res.text:
            return None
        raise RuntimeError(
            f"Worker创建邮箱失败: {res.status_code} - {res.text[:200]}"
        )

    def _create_standard_mailbox(self, api_bases, email_local_part, password):
        """Try once to create a mailbox through ``/accounts`` and fetch its token.

        Returns ``(email, password, token)`` on success, or ``None`` when the
        address already exists and the caller should retry with a new name.
        Raises ``RuntimeError`` for every other outcome.
        """
        email = f"{email_local_part}@{self.domain}"
        headers = self._auth_headers()
        credentials = {"address": email, "password": password}

        res, used_api_base = self._request_with_fallback(
            "POST", "/accounts", api_bases, json=credentials, headers=headers
        )
        if res.status_code in (200, 201):
            result = res.json()
            if result.get("address"):
                # Short pause before requesting the token (presumably to let
                # the server finish provisioning the account -- confirm).
                time.sleep(0.5)
                # Reuse the base URL that accepted the account creation.
                token_res = self.session.post(
                    f"{used_api_base}/token",
                    json=credentials,
                    headers=headers,
                    timeout=self.timeout,
                )
                if token_res.status_code == 200:
                    mail_token = token_res.json().get("token")
                    if mail_token:
                        return email, password, mail_token
                if token_res.status_code == 403:
                    raise RuntimeError(
                        "获取邮件 Token 返回 403,可能是 Bearer 无效或权限不足"
                    )
                raise RuntimeError(f"获取邮件 Token 失败: {token_res.status_code}")
        if res.status_code == 403:
            raise RuntimeError(
                f"创建邮箱返回 403,可能是 Bearer 无效或权限不足: {res.text[:200]}"
            )
        if res.status_code == 422 and "already exists" in res.text:
            return None
        raise RuntimeError(
            f"创建邮箱失败: {res.status_code} - {res.text[:200]}"
        )

    def fetch_emails(self, mail_token):
        """Return the list of messages visible to *mail_token*.

        The worker API is queried at ``/mails`` and its list is read from the
        ``results`` key; the standard API is queried at ``/messages`` and its
        list is read from ``hydra:member``. A bare JSON list is returned as-is.

        Returns:
            A list of message objects; ``[]`` when the token or base URL is
            missing, the response is not 200, or the body has no usable list.
        """
        if not mail_token:
            return []
        api_bases = self._duckmail_api_bases()
        if not api_bases:
            return []

        if self._is_worker():
            path, params, list_key = "/mails", {"limit": 20, "offset": 0}, "results"
        else:
            path, params, list_key = "/messages", {"page": 1}, "hydra:member"

        res, _ = self._request_with_fallback(
            "GET",
            path,
            api_bases,
            params=params,
            headers={"Authorization": f"Bearer {mail_token}"},
        )
        if res.status_code != 200:
            return []

        data = res.json()
        if isinstance(data, dict) and list_key in data:
            # A null list field is normalised so callers always get a list.
            return data.get(list_key) or []
        if isinstance(data, list):
            return data
        return []

    def get_token(self, email, mail_password):
        """Request a mail token for an existing mailbox via ``/token``.

        Args:
            email: Full mailbox address.
            mail_password: Password of that mailbox.

        Returns:
            The token string from the response.

        Raises:
            ValueError: If the address, password, API base or bearer is missing.
            RuntimeError: If the server does not return a token.
        """
        if not email or not mail_password:
            raise ValueError("邮箱或邮箱密码为空")
        api_bases = self._duckmail_api_bases()
        if not api_bases:
            raise ValueError("DUCKMAIL_API_BASE 未设置")
        headers = self._auth_headers()
        if not headers:
            raise ValueError("DUCKMAIL_BEARER 未设置")

        res, _ = self._request_with_fallback(
            "POST",
            "/token",
            api_bases,
            json={"address": email, "password": mail_password},
            headers=headers,
        )
        if res.status_code == 200:
            token = res.json().get("token")
            if token:
                return token
        if res.status_code == 403:
            raise RuntimeError(
                f"获取邮件 Token 返回 403,可能是 API Key 无效或无权限: {res.text[:200]}"
            )
        raise RuntimeError(
            f"获取邮件 Token 失败: {res.status_code} - {res.text[:200]}"
        )

    def fetch_message_detail(self, mail_token, msg_id):
        """Return the JSON body of one message, or ``None`` if unavailable.

        Args:
            mail_token: Token authorising access to the mailbox.
            msg_id: Message identifier; a value of the form
                ``/messages/<id>`` is reduced to ``<id>``.

        Returns:
            The decoded JSON response on HTTP 200; ``None`` when an argument
            or the base URL is missing, or the response is not 200.
        """
        if not mail_token or not msg_id:
            return None
        if isinstance(msg_id, str) and msg_id.startswith("/messages/"):
            msg_id = msg_id.split("/")[-1]
        msg_id = str(msg_id).strip()

        api_bases = self._duckmail_api_bases()
        if not api_bases:
            return None

        res, _ = self._request_with_fallback(
            "GET",
            f"/messages/{msg_id}",
            api_bases,
            headers={"Authorization": f"Bearer {mail_token}"},
        )
        if res.status_code == 200:
            return res.json()
        return None
|