Files
2026-03-19 16:58:18 +08:00

824 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import random
import sys
import time
from pathlib import Path
from typing import Any, List, Optional
PROJECT_ROOT = Path(__file__).resolve().parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from openai_pool_orchestrator import TOKENS_DIR, __version__
from openai_pool_orchestrator.mail_providers import MultiMailRouter
from openai_pool_orchestrator.register import _write_text_atomic, run as register_run
from account_store import save_registered_account
try:
from .support import (
check_proxy,
get_pool_maintainer,
get_sub2api_maintainer,
init_config_from_example,
iter_token_files,
load_state,
load_sync_config,
login_sub2api_once,
normalize_sub2api_maintain_actions,
print_json,
print_status_block,
read_token_file,
save_runtime_proxy,
save_sync_config,
save_sub2api_credentials,
sub2api_actions_description,
sync_all_tokens_to_sub2api,
sync_token_to_sub2api,
upload_all_tokens_to_cpa,
)
except ImportError:
from support import (
check_proxy,
get_pool_maintainer,
get_sub2api_maintainer,
init_config_from_example,
iter_token_files,
load_state,
load_sync_config,
login_sub2api_once,
normalize_sub2api_maintain_actions,
print_json,
print_status_block,
read_token_file,
save_runtime_proxy,
save_sync_config,
save_sub2api_credentials,
sub2api_actions_description,
sync_all_tokens_to_sub2api,
sync_token_to_sub2api,
upload_all_tokens_to_cpa,
)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="OpenAI Pool Orchestrator CLI")
parser.add_argument("--json", action="store_true", help="以 JSON 输出结果")
subparsers = parser.add_subparsers(dest="command")
register_parser = subparsers.add_parser("register", help="执行注册流程")
register_parser.add_argument("--proxy", default=None, help="代理地址,如 http://127.0.0.1:7897")
register_parser.add_argument("--once", action="store_true", help="只运行一次")
register_parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
register_parser.add_argument("--sleep-max", type=int, default=30, help="循环模式最长等待秒数")
register_parser.set_defaults(handler=handle_register)
daemon_parser = subparsers.add_parser("daemon", help="常驻运行,按池状态自动补号和定时维护")
daemon_parser.add_argument("--interval-minutes", type=int, default=0, help="检查周期分钟数0 表示按维护间隔自动推导")
daemon_parser.add_argument("--max-register-per-loop", type=int, default=0, help="每轮最多补号数量0 表示使用配置中的 auto_register_max_per_loop")
daemon_parser.set_defaults(handler=handle_daemon)
config_parser = subparsers.add_parser("config", help="配置管理")
config_subparsers = config_parser.add_subparsers(dest="config_command")
config_init = config_subparsers.add_parser("init", help="初始化配置文件到 data/")
config_init.set_defaults(handler=handle_config_init)
config_setup = config_subparsers.add_parser("setup", help="交互式配置引导")
config_setup.set_defaults(handler=handle_config_setup)
config_show = config_subparsers.add_parser("show", help="显示当前配置")
config_show.set_defaults(handler=handle_config_show)
config_proxy = config_subparsers.add_parser("proxy", help="保存运行代理")
config_proxy.add_argument("proxy", help="代理地址")
config_proxy.add_argument("--auto-register", action="store_true", help="同时启用池不足时自动注册")
config_proxy.set_defaults(handler=handle_config_proxy)
config_check_proxy = config_subparsers.add_parser("check-proxy", help="检测代理可用性")
config_check_proxy.add_argument("proxy", help="代理地址")
config_check_proxy.set_defaults(handler=handle_check_proxy)
config_sub2api = config_subparsers.add_parser("sub2api", help="保存 Sub2Api 配置并校验")
config_sub2api.add_argument("--base-url", required=True, help="Sub2Api 平台地址")
config_sub2api.add_argument("--bearer-token", default="", help="Bearer Token")
config_sub2api.add_argument("--email", default="", help="管理员邮箱")
config_sub2api.add_argument("--password", default="", help="管理员密码")
config_sub2api.add_argument("--account-name", default=None, help="默认账号名称")
config_sub2api.add_argument("--auto-sync", action="store_true", help="注册成功后自动同步 Sub2Api")
config_sub2api.set_defaults(handler=handle_config_sub2api)
config_login = config_subparsers.add_parser("sub2api-login", help="登录 Sub2Api 并保存 Bearer Token")
config_login.add_argument("--base-url", required=True, help="Sub2Api 平台地址")
config_login.add_argument("--email", required=True, help="管理员邮箱")
config_login.add_argument("--password", required=True, help="管理员密码")
config_login.set_defaults(handler=handle_sub2api_login)
tokens_parser = subparsers.add_parser("tokens", help="查看本地 token 文件")
tokens_parser.add_argument("--limit", type=int, default=20, help="最多显示数量")
tokens_parser.set_defaults(handler=handle_tokens)
cpa_parser = subparsers.add_parser("cpa", help="CPA 账号池命令")
cpa_subparsers = cpa_parser.add_subparsers(dest="cpa_command")
cpa_status = cpa_subparsers.add_parser("status", help="查看 CPA 池状态")
cpa_status.set_defaults(handler=handle_cpa_status)
cpa_check = cpa_subparsers.add_parser("check", help="测试 CPA 连接")
cpa_check.set_defaults(handler=handle_cpa_check)
cpa_maintain = cpa_subparsers.add_parser("maintain", help="执行 CPA 维护")
cpa_maintain.set_defaults(handler=handle_cpa_maintain)
cpa_upload = cpa_subparsers.add_parser("upload-all", help="上传本地全部 token 到 CPA")
cpa_upload.add_argument("--include-uploaded", action="store_true", help="包含已标记上传的 token")
cpa_upload.set_defaults(handler=handle_cpa_upload_all)
sub2api_parser = subparsers.add_parser("sub2api", help="Sub2Api 命令")
sub2api_subparsers = sub2api_parser.add_subparsers(dest="sub2api_command")
sub2api_status = sub2api_subparsers.add_parser("status", help="查看 Sub2Api 池状态")
sub2api_status.set_defaults(handler=handle_sub2api_status)
sub2api_check = sub2api_subparsers.add_parser("check", help="测试 Sub2Api 连接")
sub2api_check.set_defaults(handler=handle_sub2api_check)
sub2api_sync = sub2api_subparsers.add_parser("sync-all", help="同步本地全部 token 到 Sub2Api")
sub2api_sync.add_argument("--include-uploaded", action="store_true", help="包含已标记同步的 token")
sub2api_sync.set_defaults(handler=handle_sub2api_sync_all)
sub2api_sync_one = sub2api_subparsers.add_parser("sync-one", help="同步单个 token 到 Sub2Api")
sub2api_sync_one.add_argument("file", help="token 文件名或路径")
sub2api_sync_one.set_defaults(handler=handle_sub2api_sync_one)
sub2api_dedupe = sub2api_subparsers.add_parser("dedupe", help="Sub2Api 重复账号清理")
sub2api_dedupe.add_argument("--apply", action="store_true", help="实际执行删除,不仅预览")
sub2api_dedupe.set_defaults(handler=handle_sub2api_dedupe)
sub2api_handle = sub2api_subparsers.add_parser("handle-exception", help="处理异常账号")
sub2api_handle.add_argument("ids", nargs="*", type=int, help="指定账号 ID为空时处理全部异常账号")
sub2api_handle.add_argument("--no-delete", action="store_true", help="刷新后不删除仍异常账号")
sub2api_handle.set_defaults(handler=handle_sub2api_handle_exception)
sub2api_maintain = sub2api_subparsers.add_parser("maintain", help="执行 Sub2Api 综合维护")
sub2api_maintain.add_argument("--refresh-abnormal", action="store_true", help="仅显式开启异常测活")
sub2api_maintain.add_argument("--delete-abnormal", action="store_true", help="仅显式开启异常删除")
sub2api_maintain.add_argument("--dedupe-duplicate", action="store_true", help="仅显式开启重复清理")
sub2api_maintain.add_argument("--no-refresh-abnormal", action="store_true", help="关闭异常测活")
sub2api_maintain.add_argument("--no-delete-abnormal", action="store_true", help="关闭异常删除")
sub2api_maintain.add_argument("--no-dedupe-duplicate", action="store_true", help="关闭重复清理")
sub2api_maintain.set_defaults(handler=handle_sub2api_maintain)
stats_parser = subparsers.add_parser("stats", help="显示本地累计统计")
stats_parser.set_defaults(handler=handle_stats)
return parser
def _print_result(args: argparse.Namespace, result: Any) -> int:
if args.json:
print_json(result)
return 0
if isinstance(result, dict):
print_json(result)
return 0
print(result)
return 0
def _perform_registration_once(cfg: dict[str, Any], proxy: Optional[str]) -> dict[str, Any]:
os.makedirs(TOKENS_DIR, exist_ok=True)
mail_router = MultiMailRouter(cfg)
provider_name, provider = mail_router.next_provider()
print(f"[*] 本次使用邮箱提供商: {provider_name}")
print("[*] 邮箱接口默认直连,不使用代理")
try:
token_json = register_run(
proxy,
mail_provider=provider,
proxy_pool_config={
"enabled": bool(cfg.get("proxy_pool_enabled", False)),
"api_url": cfg.get("proxy_pool_api_url", ""),
"auth_mode": cfg.get("proxy_pool_auth_mode", "query"),
"api_key": cfg.get("proxy_pool_api_key", ""),
"count": cfg.get("proxy_pool_count", 1),
"country": cfg.get("proxy_pool_country", "US"),
},
)
mail_router.report_success(provider_name)
except Exception as exc:
try:
mail_router.report_failure(provider_name)
except Exception:
pass
print(f"[Error] 发生未捕获异常: {exc}")
return {"ok": False, "error": str(exc)}
if not token_json:
print("[-] 本次注册失败。")
return {"ok": False, "error": "本次注册失败"}
token_data = json.loads(token_json)
email = str(token_data.get("email") or "unknown")
oauth_status = str(token_data.get("oauth_status") or "oauth=ok").strip() or "oauth=ok"
if oauth_status != "oauth=ok":
run_result: dict[str, Any] = {"ok": False, "email": email, "oauth_status": oauth_status}
if cfg.get("db_enabled"):
try:
db_result = save_registered_account(
cfg,
{
"email": token_data.get("email", ""),
"chatgpt_password": token_data.get("account_password", ""),
"mail_password": token_data.get("mail_password", ""),
"oauth_status": oauth_status,
"mail_token": token_data.get("mail_token", ""),
"name": token_data.get("name", ""),
"birthdate": token_data.get("birthdate", ""),
"source": cfg.get("db_source", "standalone_cli"),
},
)
run_result["db_saved"] = bool(db_result.get("ok", False))
print(f"[{'+' if db_result.get('ok') else '-'}] 失败记录{'写入成功' if db_result.get('ok') else '写入失败'}: {email}")
except Exception as exc:
run_result["db_saved"] = False
run_result["db_error"] = str(exc)
print(f"[-] 失败记录写入异常: {exc}")
print("[-] 本次注册失败。")
return run_result
file_name = f"token_{email.replace('@', '_')}_{time.time_ns()}.json"
file_path = Path(TOKENS_DIR) / file_name
_write_text_atomic(str(file_path), token_json)
print(f"[*] 成功! Token 已保存至: {file_path}")
run_result: dict[str, Any] = {"ok": True, "file": file_name, "email": email}
if cfg.get("db_enabled"):
try:
db_result = save_registered_account(
cfg,
{
"email": token_data.get("email", ""),
"chatgpt_password": token_data.get("account_password", ""),
"mail_password": token_data.get("mail_password", ""),
"oauth_status": "oauth=ok",
"mail_token": token_data.get("mail_token", ""),
"name": token_data.get("name", ""),
"birthdate": token_data.get("birthdate", ""),
"source": cfg.get("db_source", "standalone_cli"),
},
)
run_result["db_saved"] = bool(db_result.get("ok", False))
print(f"[{'+' if db_result.get('ok') else '-'}] 数据库{'写入成功' if db_result.get('ok') else '写入失败'}: {email}")
except Exception as exc:
run_result["db_saved"] = False
run_result["db_error"] = str(exc)
print(f"[-] 数据库写入异常: {exc}")
cpa = get_pool_maintainer(cfg)
if cpa:
cpa_ok = cpa.upload_token(file_name, token_data, proxy="")
run_result["cpa_uploaded"] = cpa_ok
print(f"[{'+' if cpa_ok else '-'}] CPA {'上传成功' if cpa_ok else '上传失败'}: {email}")
if cfg.get("auto_sync"):
try:
sync_result = sync_token_to_sub2api(file_path, cfg)
run_result["sub2api_sync"] = sync_result
if sync_result.get("ok"):
print(f"[+] Sub2Api 同步成功: {email}")
else:
print(f"[-] Sub2Api 同步失败: {email}")
except Exception as exc:
run_result["sub2api_sync"] = {"ok": False, "error": str(exc)}
print(f"[-] Sub2Api 同步异常: {exc}")
return run_result
def _derive_daemon_interval_minutes(cfg: dict[str, Any], explicit_minutes: int) -> int:
if explicit_minutes and explicit_minutes > 0:
return explicit_minutes
candidates: list[int] = []
if bool(cfg.get("auto_maintain", False)):
candidates.append(max(1, int(cfg.get("maintain_interval_minutes", 30) or 30)))
if bool(cfg.get("sub2api_auto_maintain", False)):
candidates.append(max(1, int(cfg.get("sub2api_maintain_interval_minutes", 30) or 30)))
return min(candidates) if candidates else 30
def _derive_register_cap(cfg: dict[str, Any], explicit_cap: int) -> int:
if explicit_cap and explicit_cap > 0:
return explicit_cap
try:
return max(0, int(cfg.get("auto_register_max_per_loop", 1) or 0))
except (TypeError, ValueError):
return 1
def _collect_pool_health(cfg: dict[str, Any]) -> list[dict[str, Any]]:
results: list[dict[str, Any]] = []
cpa = get_pool_maintainer(cfg)
if cpa:
status = cpa.get_pool_status()
results.append({"name": "CPA", "configured": True, "status": status, "healthy": bool(status.get("healthy", False))})
sub2api = get_sub2api_maintainer(cfg)
if sub2api:
status = sub2api.get_pool_status()
results.append({"name": "Sub2Api", "configured": True, "status": status, "healthy": bool(status.get("healthy", False))})
return results
def _calculate_register_deficit(pool_results: list[dict[str, Any]]) -> int:
deficits: list[int] = []
for item in pool_results:
status = item.get("status") or {}
try:
threshold = max(0, int(status.get("threshold", 0) or 0))
candidates = max(0, int(status.get("candidates", 0) or 0))
except (TypeError, ValueError):
continue
deficits.append(max(0, threshold - candidates))
return max(deficits) if deficits else 0
def handle_daemon(args: argparse.Namespace) -> dict[str, Any]:
print("[*] 启动常驻编排模式,按 Ctrl+C 停止")
last_cpa_maintain = 0.0
last_sub2api_maintain = 0.0
loops = 0
try:
while True:
loops += 1
cfg = load_sync_config()
proxy = str(cfg.get("proxy") or "").strip() or None
interval_minutes = _derive_daemon_interval_minutes(cfg, args.interval_minutes)
interval_seconds = max(60, interval_minutes * 60)
register_cap = _derive_register_cap(cfg, args.max_register_per_loop)
print(f"\n[{time.strftime('%H:%M:%S')}] >>> Daemon 第 {loops} 轮检查 <<<")
print(f"[*] 当前检查周期: {interval_minutes} 分钟")
print(f"[*] 每轮最大补号数量: {register_cap}")
pool_results = _collect_pool_health(cfg)
if not pool_results:
print("[!] 未配置可检查的号池,当前仅等待下一轮")
else:
for item in pool_results:
status = item["status"]
print(
f"[*] {item['name']} 池: candidates={status.get('candidates', 0)} "
f"threshold={status.get('threshold', 0)} healthy={item['healthy']}"
)
if bool(cfg.get("auto_register", False)) and pool_results:
unhealthy = [item for item in pool_results if not item.get("healthy", False)]
if unhealthy:
names = ", ".join(item["name"] for item in unhealthy)
deficit = _calculate_register_deficit(unhealthy)
planned = max(1, deficit or 1) if register_cap == 0 else max(1, min(register_cap, deficit or 1))
if register_cap == 0:
print(f"[!] 号池不足,开始自动补号: {names},差值约 {deficit},本轮按差值动态补 {planned}")
else:
print(f"[!] 号池不足,开始自动补号: {names},差值约 {deficit},本轮计划补 {planned}")
for index in range(planned):
print(f"[*] 自动补号进度: {index + 1}/{planned}")
try:
result = _perform_registration_once(cfg, proxy)
if not result.get("ok"):
email = str(result.get("email") or "").strip()
status = str(result.get("oauth_status") or result.get("error") or "failed").strip()
if email:
print(f"[!] 本次自动补号失败,继续下一个: {email} ({status})")
else:
print(f"[!] 本次自动补号失败,继续下一个: {status}")
continue
if index < planned - 1:
wait_seconds = random.randint(30, 60)
print(f"[*] 本次注册成功,等待 {wait_seconds} 秒后继续下一个账号...")
time.sleep(wait_seconds)
except Exception as exc:
print(f"[Error] 自动补号异常,继续下一个: {exc}")
continue
else:
print("[+] 当前号池满足阈值,本轮不执行注册")
elif bool(cfg.get("auto_register", False)):
print("[!] 已启用 auto_register但未配置可检查的号池")
now = time.time()
cpa = get_pool_maintainer(cfg)
if cpa and bool(cfg.get("auto_maintain", False)):
maintain_seconds = max(60, int(cfg.get("maintain_interval_minutes", 30) or 30) * 60)
if now - last_cpa_maintain >= maintain_seconds:
print("[*] 开始执行 CPA 自动维护")
try:
result = cpa.probe_and_clean_sync()
print_json({"cpa_maintain": result})
except Exception as exc:
print(f"[Error] CPA 自动维护失败: {exc}")
last_cpa_maintain = now
sub2api = get_sub2api_maintainer(cfg)
if sub2api and bool(cfg.get("sub2api_auto_maintain", False)):
maintain_seconds = max(60, int(cfg.get("sub2api_maintain_interval_minutes", 30) or 30) * 60)
if now - last_sub2api_maintain >= maintain_seconds:
print("[*] 开始执行 Sub2Api 自动维护")
actions = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions"))
try:
result = sub2api.probe_and_clean_sync(actions=actions)
print_json({"sub2api_maintain": result})
except Exception as exc:
print(f"[Error] Sub2Api 自动维护失败: {exc}")
last_sub2api_maintain = now
print(f"[*] 休眠 {interval_minutes} 分钟,等待下一轮检查...")
time.sleep(interval_seconds)
except KeyboardInterrupt:
print("\n[*] 已停止常驻编排模式")
return {"ok": True, "stopped": True, "loops": loops}
def handle_register(args: argparse.Namespace) -> dict[str, Any]:
cfg = load_sync_config()
os.makedirs(TOKENS_DIR, exist_ok=True)
sleep_min = max(1, args.sleep_min)
sleep_max = max(sleep_min, args.sleep_max)
proxy = args.proxy if args.proxy is not None else str(cfg.get("proxy") or "").strip() or None
count = 0
runs: List[dict[str, Any]] = []
while True:
count += 1
print(f"\n[{time.strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<")
run_result = _perform_registration_once(cfg, proxy)
runs.append(run_result)
if args.once:
break
wait_time = sleep_min if sleep_min == sleep_max else __import__("random").randint(sleep_min, sleep_max)
print(f"[*] 休息 {wait_time} 秒...")
time.sleep(wait_time)
return {"runs": runs}
def handle_config_init(args: argparse.Namespace) -> dict[str, Any]:
path = init_config_from_example(PROJECT_ROOT)
return {"ok": True, "config_file": str(path)}
def _is_interactive() -> bool:
try:
return os.isatty(sys.stdin.fileno())
except Exception:
return False
def _prompt_text(prompt: str, default: str = "") -> str:
suffix = f" [{default}]" if default else ""
try:
value = input(f"{prompt}{suffix}: ").strip()
except EOFError:
value = ""
except KeyboardInterrupt:
print("\n已取消配置。")
raise SystemExit(130)
return value or default
def _prompt_bool(prompt: str, default: bool = False) -> bool:
suffix = "(Y/n)" if default else "(y/N)"
try:
value = input(f"{prompt} {suffix}: ").strip().lower()
except EOFError:
value = ""
except KeyboardInterrupt:
print("\n已取消配置。")
raise SystemExit(130)
if not value:
return default
return value in {"y", "yes", "1", "true"}
def _prompt_int(prompt: str, default: int) -> int:
while True:
value = _prompt_text(prompt, str(default)).strip()
try:
return int(value)
except ValueError:
print("请输入整数。")
def _prompt_non_negative_int(prompt: str, default: int) -> int:
while True:
value = _prompt_text(prompt, str(default)).strip()
try:
parsed = int(value)
except ValueError:
print("请输入大于等于 0 的整数。")
continue
if parsed < 0:
print("请输入大于等于 0 的整数。")
continue
return parsed
def _prompt_choice(prompt: str, options: list[str], default: str) -> str:
option_text = "/".join(options)
default_value = default if default in options else options[0]
while True:
value = _prompt_text(f"{prompt} ({option_text})", default_value).strip().lower()
if value in options:
return value
print(f"请输入以下选项之一: {option_text}")
def handle_config_setup(args: argparse.Namespace) -> dict[str, Any]:
if not _is_interactive():
raise ValueError("`config setup` 需要交互式终端,请直接编辑 data/sync_config.json 或先使用 config init")
init_config_from_example(PROJECT_ROOT)
cfg = load_sync_config()
print("开始交互式配置,直接回车表示使用当前值。\n")
print("说明: OpenAI 注册流程使用 `proxy`,邮箱接口默认直连,不使用代理。\n")
cfg["proxy"] = _prompt_text("1) 注册代理地址", str(cfg.get("proxy") or "http://127.0.0.1:17891"))
cfg["auto_register"] = _prompt_bool("2) 池不足时自动注册", bool(cfg.get("auto_register", False)))
cfg["auto_register_max_per_loop"] = _prompt_non_negative_int(" 每轮最多自动补号数量(0 表示按差值动态补齐)", int(cfg.get("auto_register_max_per_loop", 1) or 1))
print(" 提示: 0 表示按差值动态补齐;值越大补号越快,也会更激进。一般先用 1、2 或 0。")
cfg["db_enabled"] = _prompt_bool(" 是否启用 PostgreSQL 注册信息入库", bool(cfg.get("db_enabled", False)))
if cfg["db_enabled"]:
cfg["db_host"] = _prompt_text(" DB 主机", str(cfg.get("db_host") or "150.158.105.6"))
cfg["db_port"] = _prompt_int(" DB 端口", int(cfg.get("db_port", 54321) or 54321))
cfg["db_name"] = _prompt_text(" DB 名称", str(cfg.get("db_name") or "mail_accounts_db"))
cfg["db_user"] = _prompt_text(" DB 用户名", str(cfg.get("db_user") or "postgres"))
cfg["db_password"] = _prompt_text(" DB 密码", str(cfg.get("db_password") or ""))
cfg["db_table"] = _prompt_text(" DB 表名", str(cfg.get("db_table") or "registered_accounts"))
cfg["db_source"] = _prompt_text(" source 字段值", str(cfg.get("db_source") or "standalone_cli"))
provider_options = ["mailtm", "duckmail", "moemail", "cloudflare_temp_email"]
current_provider = str((cfg.get("mail_providers") or ["mailtm"])[0]).strip().lower() or "mailtm"
provider_name = _prompt_choice("3) 邮箱提供商", provider_options, current_provider)
cfg["mail_providers"] = [provider_name]
cfg["mail_strategy"] = "round_robin"
provider_cfgs = dict(cfg.get("mail_provider_configs") or {})
provider_cfg = dict(provider_cfgs.get(provider_name) or {})
if provider_name == "mailtm":
provider_cfg["api_base"] = _prompt_text(" Mail.tm API 地址", str(provider_cfg.get("api_base") or "https://api.mail.tm"))
elif provider_name == "duckmail":
provider_cfg["api_base"] = _prompt_text(" DuckMail API 地址", str(provider_cfg.get("api_base") or "https://api.duckmail.sbs"))
provider_cfg["bearer_token"] = _prompt_text(" DuckMail Bearer Token", str(provider_cfg.get("bearer_token") or ""))
provider_cfg["domain"] = _prompt_text(" DuckMail 域名(填写后将固定使用该域名,不再随机选域)", str(provider_cfg.get("domain") or ""))
if provider_cfg.get("domain"):
print(f" 已固定 DuckMail 域名: {provider_cfg['domain']},后续邮箱会使用该域名。")
elif provider_name == "moemail":
provider_cfg["api_base"] = _prompt_text(" MoeMail API 地址", str(provider_cfg.get("api_base") or ""))
provider_cfg["api_key"] = _prompt_text(" MoeMail API Key", str(provider_cfg.get("api_key") or ""))
elif provider_name == "cloudflare_temp_email":
provider_cfg["api_base"] = _prompt_text(" Cloudflare Worker API 地址", str(provider_cfg.get("api_base") or ""))
provider_cfg["admin_password"] = _prompt_text(" Worker 管理密码", str(provider_cfg.get("admin_password") or ""))
provider_cfg["domain"] = _prompt_text(" 邮箱域名后缀", str(provider_cfg.get("domain") or ""))
provider_cfgs[provider_name] = provider_cfg
cfg["mail_provider_configs"] = provider_cfgs
enable_sub2api = _prompt_bool("4) 是否配置 Sub2Api", bool(str(cfg.get("base_url") or "").strip()))
if enable_sub2api:
cfg["base_url"] = _prompt_text(" Sub2Api 地址", str(cfg.get("base_url") or "https://your-sub2api.example.com"))
auth_mode = _prompt_choice(
" Sub2Api 认证方式",
["token", "password"],
"token" if str(cfg.get("bearer_token") or "").strip() else "password",
)
if auth_mode == "token":
cfg["bearer_token"] = _prompt_text(" Bearer Token", str(cfg.get("bearer_token") or ""))
cfg["email"] = _prompt_text(" 管理员邮箱(可留空)", str(cfg.get("email") or ""))
cfg["password"] = ""
else:
cfg["email"] = _prompt_text(" 管理员邮箱", str(cfg.get("email") or "admin@example.com"))
cfg["password"] = _prompt_text(" 管理员密码", str(cfg.get("password") or ""))
cfg["bearer_token"] = ""
cfg["auto_sync"] = _prompt_bool(" 注册成功后自动同步到 Sub2Api", bool(cfg.get("auto_sync", False)))
cfg["sub2api_min_candidates"] = _prompt_int(" Sub2Api 候选池阈值", int(cfg.get("sub2api_min_candidates", 200)))
cfg["sub2api_auto_maintain"] = _prompt_bool(" 启用 Sub2Api 自动维护", bool(cfg.get("sub2api_auto_maintain", False)))
else:
cfg["base_url"] = ""
cfg["bearer_token"] = ""
cfg["email"] = ""
cfg["password"] = ""
cfg["auto_sync"] = False
enable_cpa = _prompt_bool("5) 是否配置 CPA 平台", bool(str(cfg.get("cpa_base_url") or "").strip()))
if enable_cpa:
cfg["cpa_base_url"] = _prompt_text(" CPA 地址", str(cfg.get("cpa_base_url") or "https://your-cpa.example.com"))
cfg["cpa_token"] = _prompt_text(" CPA Token", str(cfg.get("cpa_token") or ""))
cfg["min_candidates"] = _prompt_int(" CPA 候选池阈值", int(cfg.get("min_candidates", 1000)))
cfg["used_percent_threshold"] = _prompt_int(" CPA 已使用比例阈值", int(cfg.get("used_percent_threshold", 95)))
cfg["auto_maintain"] = _prompt_bool(" 启用 CPA 自动维护", bool(cfg.get("auto_maintain", True)))
cfg["maintain_interval_minutes"] = _prompt_int(" CPA 自动维护间隔(分钟)", int(cfg.get("maintain_interval_minutes", 30)))
else:
cfg["cpa_base_url"] = ""
cfg["cpa_token"] = ""
cfg["auto_maintain"] = False
enable_proxy_pool = _prompt_bool("6) 是否启用代理池", bool(cfg.get("proxy_pool_enabled", False)))
cfg["proxy_pool_enabled"] = enable_proxy_pool
if enable_proxy_pool:
cfg["proxy_pool_api_url"] = _prompt_text(" 代理池 API 地址", str(cfg.get("proxy_pool_api_url") or "https://zenproxy.top/api/fetch"))
cfg["proxy_pool_auth_mode"] = _prompt_choice(
" 代理池鉴权方式",
["header", "query"],
str(cfg.get("proxy_pool_auth_mode") or "header"),
)
cfg["proxy_pool_api_key"] = _prompt_text(" 代理池 API Key", str(cfg.get("proxy_pool_api_key") or ""))
cfg["proxy_pool_count"] = _prompt_int(" 每次取代理数量", int(cfg.get("proxy_pool_count", 1)))
cfg["proxy_pool_country"] = _prompt_text(" 代理国家代码", str(cfg.get("proxy_pool_country") or "US")).upper()
cfg["account_name"] = _prompt_text("7) 默认账号名称", str(cfg.get("account_name") or "AutoReg"))
cfg["upload_mode"] = _prompt_choice("8) 上传模式", ["snapshot", "decoupled"], str(cfg.get("upload_mode") or "snapshot"))
saved = save_sync_config(cfg)
print("\n配置已保存。建议下一步执行:")
print("- python3 run.py --json config show")
print("- python3 run.py register --once")
return {"ok": True, "config_file": str((PROJECT_ROOT / 'data' / 'sync_config.json')), "config": saved}
def handle_config_show(args: argparse.Namespace) -> dict[str, Any]:
return load_sync_config()
def handle_config_proxy(args: argparse.Namespace) -> dict[str, Any]:
cfg = save_runtime_proxy(args.proxy, auto_register=args.auto_register)
return {"ok": True, "proxy": cfg.get("proxy", ""), "auto_register": cfg.get("auto_register", False)}
def handle_check_proxy(args: argparse.Namespace) -> dict[str, Any]:
return check_proxy(args.proxy)
def handle_config_sub2api(args: argparse.Namespace) -> dict[str, Any]:
cfg = save_sub2api_credentials(
base_url=args.base_url,
bearer_token=args.bearer_token,
email=args.email,
password=args.password,
account_name=args.account_name,
auto_sync=args.auto_sync,
)
return {"ok": True, "base_url": cfg.get("base_url", ""), "auto_sync": cfg.get("auto_sync", False)}
def handle_sub2api_login(args: argparse.Namespace) -> dict[str, Any]:
return login_sub2api_once(args.base_url, args.email, args.password)
def handle_tokens(args: argparse.Namespace) -> dict[str, Any]:
items = []
for index, path in enumerate(iter_token_files()):
if index >= max(1, args.limit):
break
try:
token_data = read_token_file(path)
items.append(
{
"file": path.name,
"email": token_data.get("email", ""),
"uploaded_platforms": token_data.get("uploaded_platforms", []),
}
)
except Exception as exc:
items.append({"file": path.name, "error": str(exc)})
return {"total_shown": len(items), "items": items}
def handle_cpa_status(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_pool_maintainer()
return maintainer.get_pool_status()
def handle_cpa_check(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_pool_maintainer()
return maintainer.test_connection()
def handle_cpa_maintain(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_pool_maintainer()
return maintainer.probe_and_clean_sync()
def handle_cpa_upload_all(args: argparse.Namespace) -> dict[str, Any]:
return upload_all_tokens_to_cpa(skip_uploaded=not args.include_uploaded)
def handle_sub2api_status(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
status = maintainer.get_pool_status()
status["dashboard"] = maintainer.get_dashboard_stats()
return status
def handle_sub2api_check(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
return maintainer.test_connection()
def handle_sub2api_sync_all(args: argparse.Namespace) -> dict[str, Any]:
return sync_all_tokens_to_sub2api(skip_uploaded=not args.include_uploaded)
def handle_sub2api_sync_one(args: argparse.Namespace) -> dict[str, Any]:
raw_path = Path(args.file)
file_path = raw_path if raw_path.is_absolute() else (Path(TOKENS_DIR) / args.file)
if not file_path.exists():
raise ValueError(f"token 文件不存在: {args.file}")
return sync_token_to_sub2api(file_path)
def handle_sub2api_dedupe(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
return maintainer.dedupe_duplicate_accounts(dry_run=not args.apply)
def handle_sub2api_handle_exception(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
return maintainer.handle_exception_accounts(account_ids=args.ids, delete_unresolved=not args.no_delete)
def handle_sub2api_maintain(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
cfg = load_sync_config()
actions = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions"))
overrides = {
"refresh_abnormal_accounts": (args.refresh_abnormal, args.no_refresh_abnormal),
"delete_abnormal_accounts": (args.delete_abnormal, args.no_delete_abnormal),
"dedupe_duplicate_accounts": (args.dedupe_duplicate, args.no_dedupe_duplicate),
}
for key, (enable, disable) in overrides.items():
if enable:
actions[key] = True
if disable:
actions[key] = False
result = maintainer.probe_and_clean_sync(actions=actions)
result["actions_text"] = sub2api_actions_description(actions)
return result
def handle_stats(args: argparse.Namespace) -> dict[str, Any]:
return load_state()
def require_pool_maintainer():
maintainer = get_pool_maintainer()
if not maintainer:
raise ValueError("CPA 未配置,请先在 data/sync_config.json 中填写 cpa_base_url 和 cpa_token")
return maintainer
def require_sub2api_maintainer():
maintainer = get_sub2api_maintainer()
if not maintainer:
raise ValueError("Sub2Api 未配置,请先在 data/sync_config.json 中填写 base_url 与认证信息")
return maintainer
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if not getattr(args, "command", None):
parser.print_help()
return 0
try:
result = args.handler(args)
except Exception as exc:
if getattr(args, "json", False):
print_json({"ok": False, "error": str(exc)})
else:
print(f"错误: {exc}", file=sys.stderr)
return 1
if not getattr(args, "json", False) and args.command in {"cpa", "sub2api"} and isinstance(result, dict):
if args.command == "cpa" and args.cpa_command == "status":
print_status_block(f"OpenAI Pool Orchestrator v{__version__} - CPA 状态", result)
return 0
if args.command == "sub2api" and args.sub2api_command == "status":
print_status_block(f"OpenAI Pool Orchestrator v{__version__} - Sub2Api 状态", result)
return 0
return _print_result(args, result)
if __name__ == "__main__":
raise SystemExit(main())