#!/usr/bin/env python3 from __future__ import annotations import argparse import json import os import sys import time from pathlib import Path from typing import Any, List, Optional PROJECT_ROOT = Path(__file__).resolve().parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from openai_pool_orchestrator import TOKENS_DIR, __version__ from openai_pool_orchestrator.mail_providers import MultiMailRouter from openai_pool_orchestrator.register import _write_text_atomic, run as register_run from account_store import save_registered_account try: from .support import ( check_proxy, get_pool_maintainer, get_sub2api_maintainer, init_config_from_example, iter_token_files, load_state, load_sync_config, login_sub2api_once, normalize_sub2api_maintain_actions, print_json, print_status_block, read_token_file, save_runtime_proxy, save_sync_config, save_sub2api_credentials, sub2api_actions_description, sync_all_tokens_to_sub2api, sync_token_to_sub2api, upload_all_tokens_to_cpa, ) except ImportError: from support import ( check_proxy, get_pool_maintainer, get_sub2api_maintainer, init_config_from_example, iter_token_files, load_state, load_sync_config, login_sub2api_once, normalize_sub2api_maintain_actions, print_json, print_status_block, read_token_file, save_runtime_proxy, save_sync_config, save_sub2api_credentials, sub2api_actions_description, sync_all_tokens_to_sub2api, sync_token_to_sub2api, upload_all_tokens_to_cpa, ) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="OpenAI Pool Orchestrator CLI") parser.add_argument("--json", action="store_true", help="以 JSON 输出结果") subparsers = parser.add_subparsers(dest="command") register_parser = subparsers.add_parser("register", help="执行注册流程") register_parser.add_argument("--proxy", default=None, help="代理地址,如 http://127.0.0.1:7897") register_parser.add_argument("--once", action="store_true", help="只运行一次") register_parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数") register_parser.add_argument("--sleep-max", type=int, default=30, help="循环模式最长等待秒数") register_parser.set_defaults(handler=handle_register) daemon_parser = subparsers.add_parser("daemon", help="常驻运行,按池状态自动补号和定时维护") daemon_parser.add_argument("--interval-minutes", type=int, default=0, help="检查周期分钟数,0 表示按维护间隔自动推导") daemon_parser.add_argument("--max-register-per-loop", type=int, default=0, help="每轮最多补号数量,0 表示使用配置中的 auto_register_max_per_loop") daemon_parser.set_defaults(handler=handle_daemon) config_parser = subparsers.add_parser("config", help="配置管理") config_subparsers = config_parser.add_subparsers(dest="config_command") config_init = config_subparsers.add_parser("init", help="初始化配置文件到 data/") config_init.set_defaults(handler=handle_config_init) config_setup = config_subparsers.add_parser("setup", help="交互式配置引导") config_setup.set_defaults(handler=handle_config_setup) config_show = config_subparsers.add_parser("show", help="显示当前配置") config_show.set_defaults(handler=handle_config_show) config_proxy = config_subparsers.add_parser("proxy", help="保存运行代理") config_proxy.add_argument("proxy", help="代理地址") config_proxy.add_argument("--auto-register", action="store_true", help="同时启用池不足时自动注册") config_proxy.set_defaults(handler=handle_config_proxy) config_check_proxy = config_subparsers.add_parser("check-proxy", help="检测代理可用性") config_check_proxy.add_argument("proxy", help="代理地址") config_check_proxy.set_defaults(handler=handle_check_proxy) config_sub2api = config_subparsers.add_parser("sub2api", help="保存 Sub2Api 配置并校验") config_sub2api.add_argument("--base-url", required=True, help="Sub2Api 平台地址") config_sub2api.add_argument("--bearer-token", default="", help="Bearer Token") config_sub2api.add_argument("--email", default="", help="管理员邮箱") config_sub2api.add_argument("--password", default="", help="管理员密码") config_sub2api.add_argument("--account-name", default=None, help="默认账号名称") config_sub2api.add_argument("--auto-sync", action="store_true", help="注册成功后自动同步 Sub2Api") config_sub2api.set_defaults(handler=handle_config_sub2api) config_login = config_subparsers.add_parser("sub2api-login", help="登录 Sub2Api 并保存 Bearer Token") config_login.add_argument("--base-url", required=True, help="Sub2Api 平台地址") config_login.add_argument("--email", required=True, help="管理员邮箱") config_login.add_argument("--password", required=True, help="管理员密码") config_login.set_defaults(handler=handle_sub2api_login) tokens_parser = subparsers.add_parser("tokens", help="查看本地 token 文件") tokens_parser.add_argument("--limit", type=int, default=20, help="最多显示数量") tokens_parser.set_defaults(handler=handle_tokens) cpa_parser = subparsers.add_parser("cpa", help="CPA 账号池命令") cpa_subparsers = cpa_parser.add_subparsers(dest="cpa_command") cpa_status = cpa_subparsers.add_parser("status", help="查看 CPA 池状态") cpa_status.set_defaults(handler=handle_cpa_status) cpa_check = cpa_subparsers.add_parser("check", help="测试 CPA 连接") cpa_check.set_defaults(handler=handle_cpa_check) cpa_maintain = cpa_subparsers.add_parser("maintain", help="执行 CPA 维护") cpa_maintain.set_defaults(handler=handle_cpa_maintain) cpa_upload = cpa_subparsers.add_parser("upload-all", help="上传本地全部 token 到 CPA") cpa_upload.add_argument("--include-uploaded", action="store_true", help="包含已标记上传的 token") cpa_upload.set_defaults(handler=handle_cpa_upload_all) sub2api_parser = subparsers.add_parser("sub2api", help="Sub2Api 命令") sub2api_subparsers = sub2api_parser.add_subparsers(dest="sub2api_command") sub2api_status = sub2api_subparsers.add_parser("status", help="查看 Sub2Api 池状态") sub2api_status.set_defaults(handler=handle_sub2api_status) sub2api_check = sub2api_subparsers.add_parser("check", help="测试 Sub2Api 连接") sub2api_check.set_defaults(handler=handle_sub2api_check) sub2api_sync = sub2api_subparsers.add_parser("sync-all", help="同步本地全部 token 到 Sub2Api") sub2api_sync.add_argument("--include-uploaded", action="store_true", help="包含已标记同步的 token") sub2api_sync.set_defaults(handler=handle_sub2api_sync_all) sub2api_sync_one = sub2api_subparsers.add_parser("sync-one", help="同步单个 token 到 Sub2Api") sub2api_sync_one.add_argument("file", help="token 文件名或路径") sub2api_sync_one.set_defaults(handler=handle_sub2api_sync_one) sub2api_dedupe = sub2api_subparsers.add_parser("dedupe", help="Sub2Api 重复账号清理") sub2api_dedupe.add_argument("--apply", action="store_true", help="实际执行删除,不仅预览") sub2api_dedupe.set_defaults(handler=handle_sub2api_dedupe) sub2api_handle = sub2api_subparsers.add_parser("handle-exception", help="处理异常账号") sub2api_handle.add_argument("ids", nargs="*", type=int, help="指定账号 ID;为空时处理全部异常账号") sub2api_handle.add_argument("--no-delete", action="store_true", help="刷新后不删除仍异常账号") sub2api_handle.set_defaults(handler=handle_sub2api_handle_exception) sub2api_maintain = sub2api_subparsers.add_parser("maintain", help="执行 Sub2Api 综合维护") sub2api_maintain.add_argument("--refresh-abnormal", action="store_true", help="仅显式开启异常测活") sub2api_maintain.add_argument("--delete-abnormal", action="store_true", help="仅显式开启异常删除") sub2api_maintain.add_argument("--dedupe-duplicate", action="store_true", help="仅显式开启重复清理") sub2api_maintain.add_argument("--no-refresh-abnormal", action="store_true", help="关闭异常测活") sub2api_maintain.add_argument("--no-delete-abnormal", action="store_true", help="关闭异常删除") sub2api_maintain.add_argument("--no-dedupe-duplicate", action="store_true", help="关闭重复清理") sub2api_maintain.set_defaults(handler=handle_sub2api_maintain) stats_parser = subparsers.add_parser("stats", help="显示本地累计统计") stats_parser.set_defaults(handler=handle_stats) return parser def _print_result(args: argparse.Namespace, result: Any) -> int: if args.json: print_json(result) return 0 if isinstance(result, dict): print_json(result) return 0 print(result) return 0 def _perform_registration_once(cfg: dict[str, Any], proxy: Optional[str]) -> dict[str, Any]: os.makedirs(TOKENS_DIR, exist_ok=True) mail_router = MultiMailRouter(cfg) provider_name, provider = mail_router.next_provider() print(f"[*] 本次使用邮箱提供商: {provider_name}") print("[*] 邮箱接口默认直连,不使用代理") try: token_json = register_run( proxy, mail_provider=provider, proxy_pool_config={ "enabled": bool(cfg.get("proxy_pool_enabled", False)), "api_url": cfg.get("proxy_pool_api_url", ""), "auth_mode": cfg.get("proxy_pool_auth_mode", "query"), "api_key": cfg.get("proxy_pool_api_key", ""), "count": cfg.get("proxy_pool_count", 1), "country": cfg.get("proxy_pool_country", "US"), }, ) mail_router.report_success(provider_name) except Exception as exc: try: mail_router.report_failure(provider_name) except Exception: pass print(f"[Error] 发生未捕获异常: {exc}") return {"ok": False, "error": str(exc)} if not token_json: print("[-] 本次注册失败。") return {"ok": False, "error": "本次注册失败"} token_data = json.loads(token_json) email = str(token_data.get("email") or "unknown") file_name = f"token_{email.replace('@', '_')}_{time.time_ns()}.json" file_path = Path(TOKENS_DIR) / file_name _write_text_atomic(str(file_path), token_json) print(f"[*] 成功! Token 已保存至: {file_path}") run_result: dict[str, Any] = {"ok": True, "file": file_name, "email": email} if cfg.get("db_enabled"): try: db_result = save_registered_account( cfg, { "email": token_data.get("email", ""), "chatgpt_password": token_data.get("account_password", ""), "mail_password": token_data.get("mail_password", ""), "oauth_status": "oauth=ok", "mail_token": token_data.get("mail_token", ""), "name": token_data.get("name", ""), "birthdate": token_data.get("birthdate", ""), "source": cfg.get("db_source", "standalone_cli"), }, ) run_result["db_saved"] = bool(db_result.get("ok", False)) print(f"[{'+' if db_result.get('ok') else '-'}] 数据库{'写入成功' if db_result.get('ok') else '写入失败'}: {email}") except Exception as exc: run_result["db_saved"] = False run_result["db_error"] = str(exc) print(f"[-] 数据库写入异常: {exc}") cpa = get_pool_maintainer(cfg) if cpa: cpa_ok = cpa.upload_token(file_name, token_data, proxy="") run_result["cpa_uploaded"] = cpa_ok print(f"[{'+' if cpa_ok else '-'}] CPA {'上传成功' if cpa_ok else '上传失败'}: {email}") if cfg.get("auto_sync"): try: sync_result = sync_token_to_sub2api(file_path, cfg) run_result["sub2api_sync"] = sync_result if sync_result.get("ok"): print(f"[+] Sub2Api 同步成功: {email}") else: print(f"[-] Sub2Api 同步失败: {email}") except Exception as exc: run_result["sub2api_sync"] = {"ok": False, "error": str(exc)} print(f"[-] Sub2Api 同步异常: {exc}") return run_result def _derive_daemon_interval_minutes(cfg: dict[str, Any], explicit_minutes: int) -> int: if explicit_minutes and explicit_minutes > 0: return explicit_minutes candidates: list[int] = [] if bool(cfg.get("auto_maintain", False)): candidates.append(max(1, int(cfg.get("maintain_interval_minutes", 30) or 30))) if bool(cfg.get("sub2api_auto_maintain", False)): candidates.append(max(1, int(cfg.get("sub2api_maintain_interval_minutes", 30) or 30))) return min(candidates) if candidates else 30 def _derive_register_cap(cfg: dict[str, Any], explicit_cap: int) -> int: if explicit_cap and explicit_cap > 0: return explicit_cap try: return max(0, int(cfg.get("auto_register_max_per_loop", 1) or 0)) except (TypeError, ValueError): return 1 def _collect_pool_health(cfg: dict[str, Any]) -> list[dict[str, Any]]: results: list[dict[str, Any]] = [] cpa = get_pool_maintainer(cfg) if cpa: status = cpa.get_pool_status() results.append({"name": "CPA", "configured": True, "status": status, "healthy": bool(status.get("healthy", False))}) sub2api = get_sub2api_maintainer(cfg) if sub2api: status = sub2api.get_pool_status() results.append({"name": "Sub2Api", "configured": True, "status": status, "healthy": bool(status.get("healthy", False))}) return results def _calculate_register_deficit(pool_results: list[dict[str, Any]]) -> int: deficits: list[int] = [] for item in pool_results: status = item.get("status") or {} try: threshold = max(0, int(status.get("threshold", 0) or 0)) candidates = max(0, int(status.get("candidates", 0) or 0)) except (TypeError, ValueError): continue deficits.append(max(0, threshold - candidates)) return max(deficits) if deficits else 0 def handle_daemon(args: argparse.Namespace) -> dict[str, Any]: print("[*] 启动常驻编排模式,按 Ctrl+C 停止") last_cpa_maintain = 0.0 last_sub2api_maintain = 0.0 loops = 0 try: while True: loops += 1 cfg = load_sync_config() proxy = str(cfg.get("proxy") or "").strip() or None interval_minutes = _derive_daemon_interval_minutes(cfg, args.interval_minutes) interval_seconds = max(60, interval_minutes * 60) register_cap = _derive_register_cap(cfg, args.max_register_per_loop) print(f"\n[{time.strftime('%H:%M:%S')}] >>> Daemon 第 {loops} 轮检查 <<<") print(f"[*] 当前检查周期: {interval_minutes} 分钟") print(f"[*] 每轮最大补号数量: {register_cap}") pool_results = _collect_pool_health(cfg) if not pool_results: print("[!] 未配置可检查的号池,当前仅等待下一轮") else: for item in pool_results: status = item["status"] print( f"[*] {item['name']} 池: candidates={status.get('candidates', 0)} " f"threshold={status.get('threshold', 0)} healthy={item['healthy']}" ) if bool(cfg.get("auto_register", False)) and pool_results: unhealthy = [item for item in pool_results if not item.get("healthy", False)] if unhealthy: names = ", ".join(item["name"] for item in unhealthy) deficit = _calculate_register_deficit(unhealthy) planned = max(1, deficit or 1) if register_cap == 0 else max(1, min(register_cap, deficit or 1)) if register_cap == 0: print(f"[!] 号池不足,开始自动补号: {names},差值约 {deficit},本轮按差值动态补 {planned} 个") else: print(f"[!] 号池不足,开始自动补号: {names},差值约 {deficit},本轮计划补 {planned} 个") for index in range(planned): print(f"[*] 自动补号进度: {index + 1}/{planned}") try: result = _perform_registration_once(cfg, proxy) if not result.get("ok"): print("[!] 本次自动补号失败,停止本轮剩余补号") break except Exception as exc: print(f"[Error] 自动补号失败: {exc}") break else: print("[+] 当前号池满足阈值,本轮不执行注册") elif bool(cfg.get("auto_register", False)): print("[!] 已启用 auto_register,但未配置可检查的号池") now = time.time() cpa = get_pool_maintainer(cfg) if cpa and bool(cfg.get("auto_maintain", False)): maintain_seconds = max(60, int(cfg.get("maintain_interval_minutes", 30) or 30) * 60) if now - last_cpa_maintain >= maintain_seconds: print("[*] 开始执行 CPA 自动维护") try: result = cpa.probe_and_clean_sync() print_json({"cpa_maintain": result}) except Exception as exc: print(f"[Error] CPA 自动维护失败: {exc}") last_cpa_maintain = now sub2api = get_sub2api_maintainer(cfg) if sub2api and bool(cfg.get("sub2api_auto_maintain", False)): maintain_seconds = max(60, int(cfg.get("sub2api_maintain_interval_minutes", 30) or 30) * 60) if now - last_sub2api_maintain >= maintain_seconds: print("[*] 开始执行 Sub2Api 自动维护") actions = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions")) try: result = sub2api.probe_and_clean_sync(actions=actions) print_json({"sub2api_maintain": result}) except Exception as exc: print(f"[Error] Sub2Api 自动维护失败: {exc}") last_sub2api_maintain = now print(f"[*] 休眠 {interval_minutes} 分钟,等待下一轮检查...") time.sleep(interval_seconds) except KeyboardInterrupt: print("\n[*] 已停止常驻编排模式") return {"ok": True, "stopped": True, "loops": loops} def handle_register(args: argparse.Namespace) -> dict[str, Any]: cfg = load_sync_config() os.makedirs(TOKENS_DIR, exist_ok=True) sleep_min = max(1, args.sleep_min) sleep_max = max(sleep_min, args.sleep_max) proxy = args.proxy if args.proxy is not None else str(cfg.get("proxy") or "").strip() or None count = 0 runs: List[dict[str, Any]] = [] while True: count += 1 print(f"\n[{time.strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<") run_result = _perform_registration_once(cfg, proxy) runs.append(run_result) if args.once: break wait_time = sleep_min if sleep_min == sleep_max else __import__("random").randint(sleep_min, sleep_max) print(f"[*] 休息 {wait_time} 秒...") time.sleep(wait_time) return {"runs": runs} def handle_config_init(args: argparse.Namespace) -> dict[str, Any]: path = init_config_from_example(PROJECT_ROOT) return {"ok": True, "config_file": str(path)} def _is_interactive() -> bool: try: return os.isatty(sys.stdin.fileno()) except Exception: return False def _prompt_text(prompt: str, default: str = "") -> str: suffix = f" [{default}]" if default else "" try: value = input(f"{prompt}{suffix}: ").strip() except EOFError: value = "" except KeyboardInterrupt: print("\n已取消配置。") raise SystemExit(130) return value or default def _prompt_bool(prompt: str, default: bool = False) -> bool: suffix = "(Y/n)" if default else "(y/N)" try: value = input(f"{prompt} {suffix}: ").strip().lower() except EOFError: value = "" except KeyboardInterrupt: print("\n已取消配置。") raise SystemExit(130) if not value: return default return value in {"y", "yes", "1", "true"} def _prompt_int(prompt: str, default: int) -> int: while True: value = _prompt_text(prompt, str(default)).strip() try: return int(value) except ValueError: print("请输入整数。") def _prompt_non_negative_int(prompt: str, default: int) -> int: while True: value = _prompt_text(prompt, str(default)).strip() try: parsed = int(value) except ValueError: print("请输入大于等于 0 的整数。") continue if parsed < 0: print("请输入大于等于 0 的整数。") continue return parsed def _prompt_choice(prompt: str, options: list[str], default: str) -> str: option_text = "/".join(options) default_value = default if default in options else options[0] while True: value = _prompt_text(f"{prompt} ({option_text})", default_value).strip().lower() if value in options: return value print(f"请输入以下选项之一: {option_text}") def handle_config_setup(args: argparse.Namespace) -> dict[str, Any]: if not _is_interactive(): raise ValueError("`config setup` 需要交互式终端,请直接编辑 data/sync_config.json 或先使用 config init") init_config_from_example(PROJECT_ROOT) cfg = load_sync_config() print("开始交互式配置,直接回车表示使用当前值。\n") print("说明: OpenAI 注册流程使用 `proxy`,邮箱接口默认直连,不使用代理。\n") cfg["proxy"] = _prompt_text("1) 注册代理地址", str(cfg.get("proxy") or "http://127.0.0.1:17891")) cfg["auto_register"] = _prompt_bool("2) 池不足时自动注册", bool(cfg.get("auto_register", False))) cfg["auto_register_max_per_loop"] = _prompt_non_negative_int(" 每轮最多自动补号数量(0 表示按差值动态补齐)", int(cfg.get("auto_register_max_per_loop", 1) or 1)) print(" 提示: 0 表示按差值动态补齐;值越大补号越快,也会更激进。一般先用 1、2 或 0。") cfg["db_enabled"] = _prompt_bool(" 是否启用 PostgreSQL 注册信息入库", bool(cfg.get("db_enabled", False))) if cfg["db_enabled"]: cfg["db_host"] = _prompt_text(" DB 主机", str(cfg.get("db_host") or "150.158.105.6")) cfg["db_port"] = _prompt_int(" DB 端口", int(cfg.get("db_port", 54321) or 54321)) cfg["db_name"] = _prompt_text(" DB 名称", str(cfg.get("db_name") or "mail_accounts_db")) cfg["db_user"] = _prompt_text(" DB 用户名", str(cfg.get("db_user") or "postgres")) cfg["db_password"] = _prompt_text(" DB 密码", str(cfg.get("db_password") or "")) cfg["db_table"] = _prompt_text(" DB 表名", str(cfg.get("db_table") or "registered_accounts")) cfg["db_source"] = _prompt_text(" source 字段值", str(cfg.get("db_source") or "standalone_cli")) provider_options = ["mailtm", "duckmail", "moemail", "cloudflare_temp_email"] current_provider = str((cfg.get("mail_providers") or ["mailtm"])[0]).strip().lower() or "mailtm" provider_name = _prompt_choice("3) 邮箱提供商", provider_options, current_provider) cfg["mail_providers"] = [provider_name] cfg["mail_strategy"] = "round_robin" provider_cfgs = dict(cfg.get("mail_provider_configs") or {}) provider_cfg = dict(provider_cfgs.get(provider_name) or {}) if provider_name == "mailtm": provider_cfg["api_base"] = _prompt_text(" Mail.tm API 地址", str(provider_cfg.get("api_base") or "https://api.mail.tm")) elif provider_name == "duckmail": provider_cfg["api_base"] = _prompt_text(" DuckMail API 地址", str(provider_cfg.get("api_base") or "https://api.duckmail.sbs")) provider_cfg["bearer_token"] = _prompt_text(" DuckMail Bearer Token", str(provider_cfg.get("bearer_token") or "")) provider_cfg["domain"] = _prompt_text(" DuckMail 域名(填写后将固定使用该域名,不再随机选域)", str(provider_cfg.get("domain") or "")) if provider_cfg.get("domain"): print(f" 已固定 DuckMail 域名: {provider_cfg['domain']},后续邮箱会使用该域名。") elif provider_name == "moemail": provider_cfg["api_base"] = _prompt_text(" MoeMail API 地址", str(provider_cfg.get("api_base") or "")) provider_cfg["api_key"] = _prompt_text(" MoeMail API Key", str(provider_cfg.get("api_key") or "")) elif provider_name == "cloudflare_temp_email": provider_cfg["api_base"] = _prompt_text(" Cloudflare Worker API 地址", str(provider_cfg.get("api_base") or "")) provider_cfg["admin_password"] = _prompt_text(" Worker 管理密码", str(provider_cfg.get("admin_password") or "")) provider_cfg["domain"] = _prompt_text(" 邮箱域名后缀", str(provider_cfg.get("domain") or "")) provider_cfgs[provider_name] = provider_cfg cfg["mail_provider_configs"] = provider_cfgs enable_sub2api = _prompt_bool("4) 是否配置 Sub2Api", bool(str(cfg.get("base_url") or "").strip())) if enable_sub2api: cfg["base_url"] = _prompt_text(" Sub2Api 地址", str(cfg.get("base_url") or "https://your-sub2api.example.com")) auth_mode = _prompt_choice( " Sub2Api 认证方式", ["token", "password"], "token" if str(cfg.get("bearer_token") or "").strip() else "password", ) if auth_mode == "token": cfg["bearer_token"] = _prompt_text(" Bearer Token", str(cfg.get("bearer_token") or "")) cfg["email"] = _prompt_text(" 管理员邮箱(可留空)", str(cfg.get("email") or "")) cfg["password"] = "" else: cfg["email"] = _prompt_text(" 管理员邮箱", str(cfg.get("email") or "admin@example.com")) cfg["password"] = _prompt_text(" 管理员密码", str(cfg.get("password") or "")) cfg["bearer_token"] = "" cfg["auto_sync"] = _prompt_bool(" 注册成功后自动同步到 Sub2Api", bool(cfg.get("auto_sync", False))) cfg["sub2api_min_candidates"] = _prompt_int(" Sub2Api 候选池阈值", int(cfg.get("sub2api_min_candidates", 200))) cfg["sub2api_auto_maintain"] = _prompt_bool(" 启用 Sub2Api 自动维护", bool(cfg.get("sub2api_auto_maintain", False))) else: cfg["base_url"] = "" cfg["bearer_token"] = "" cfg["email"] = "" cfg["password"] = "" cfg["auto_sync"] = False enable_cpa = _prompt_bool("5) 是否配置 CPA 平台", bool(str(cfg.get("cpa_base_url") or "").strip())) if enable_cpa: cfg["cpa_base_url"] = _prompt_text(" CPA 地址", str(cfg.get("cpa_base_url") or "https://your-cpa.example.com")) cfg["cpa_token"] = _prompt_text(" CPA Token", str(cfg.get("cpa_token") or "")) cfg["min_candidates"] = _prompt_int(" CPA 候选池阈值", int(cfg.get("min_candidates", 1000))) cfg["used_percent_threshold"] = _prompt_int(" CPA 已使用比例阈值", int(cfg.get("used_percent_threshold", 95))) cfg["auto_maintain"] = _prompt_bool(" 启用 CPA 自动维护", bool(cfg.get("auto_maintain", True))) cfg["maintain_interval_minutes"] = _prompt_int(" CPA 自动维护间隔(分钟)", int(cfg.get("maintain_interval_minutes", 30))) else: cfg["cpa_base_url"] = "" cfg["cpa_token"] = "" cfg["auto_maintain"] = False enable_proxy_pool = _prompt_bool("6) 是否启用代理池", bool(cfg.get("proxy_pool_enabled", False))) cfg["proxy_pool_enabled"] = enable_proxy_pool if enable_proxy_pool: cfg["proxy_pool_api_url"] = _prompt_text(" 代理池 API 地址", str(cfg.get("proxy_pool_api_url") or "https://zenproxy.top/api/fetch")) cfg["proxy_pool_auth_mode"] = _prompt_choice( " 代理池鉴权方式", ["header", "query"], str(cfg.get("proxy_pool_auth_mode") or "header"), ) cfg["proxy_pool_api_key"] = _prompt_text(" 代理池 API Key", str(cfg.get("proxy_pool_api_key") or "")) cfg["proxy_pool_count"] = _prompt_int(" 每次取代理数量", int(cfg.get("proxy_pool_count", 1))) cfg["proxy_pool_country"] = _prompt_text(" 代理国家代码", str(cfg.get("proxy_pool_country") or "US")).upper() cfg["account_name"] = _prompt_text("7) 默认账号名称", str(cfg.get("account_name") or "AutoReg")) cfg["upload_mode"] = _prompt_choice("8) 上传模式", ["snapshot", "decoupled"], str(cfg.get("upload_mode") or "snapshot")) saved = save_sync_config(cfg) print("\n配置已保存。建议下一步执行:") print("- python3 run.py --json config show") print("- python3 run.py register --once") return {"ok": True, "config_file": str((PROJECT_ROOT / 'data' / 'sync_config.json')), "config": saved} def handle_config_show(args: argparse.Namespace) -> dict[str, Any]: return load_sync_config() def handle_config_proxy(args: argparse.Namespace) -> dict[str, Any]: cfg = save_runtime_proxy(args.proxy, auto_register=args.auto_register) return {"ok": True, "proxy": cfg.get("proxy", ""), "auto_register": cfg.get("auto_register", False)} def handle_check_proxy(args: argparse.Namespace) -> dict[str, Any]: return check_proxy(args.proxy) def handle_config_sub2api(args: argparse.Namespace) -> dict[str, Any]: cfg = save_sub2api_credentials( base_url=args.base_url, bearer_token=args.bearer_token, email=args.email, password=args.password, account_name=args.account_name, auto_sync=args.auto_sync, ) return {"ok": True, "base_url": cfg.get("base_url", ""), "auto_sync": cfg.get("auto_sync", False)} def handle_sub2api_login(args: argparse.Namespace) -> dict[str, Any]: return login_sub2api_once(args.base_url, args.email, args.password) def handle_tokens(args: argparse.Namespace) -> dict[str, Any]: items = [] for index, path in enumerate(iter_token_files()): if index >= max(1, args.limit): break try: token_data = read_token_file(path) items.append( { "file": path.name, "email": token_data.get("email", ""), "uploaded_platforms": token_data.get("uploaded_platforms", []), } ) except Exception as exc: items.append({"file": path.name, "error": str(exc)}) return {"total_shown": len(items), "items": items} def handle_cpa_status(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_pool_maintainer() return maintainer.get_pool_status() def handle_cpa_check(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_pool_maintainer() return maintainer.test_connection() def handle_cpa_maintain(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_pool_maintainer() return maintainer.probe_and_clean_sync() def handle_cpa_upload_all(args: argparse.Namespace) -> dict[str, Any]: return upload_all_tokens_to_cpa(skip_uploaded=not args.include_uploaded) def handle_sub2api_status(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_sub2api_maintainer() status = maintainer.get_pool_status() status["dashboard"] = maintainer.get_dashboard_stats() return status def handle_sub2api_check(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_sub2api_maintainer() return maintainer.test_connection() def handle_sub2api_sync_all(args: argparse.Namespace) -> dict[str, Any]: return sync_all_tokens_to_sub2api(skip_uploaded=not args.include_uploaded) def handle_sub2api_sync_one(args: argparse.Namespace) -> dict[str, Any]: raw_path = Path(args.file) file_path = raw_path if raw_path.is_absolute() else (Path(TOKENS_DIR) / args.file) if not file_path.exists(): raise ValueError(f"token 文件不存在: {args.file}") return sync_token_to_sub2api(file_path) def handle_sub2api_dedupe(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_sub2api_maintainer() return maintainer.dedupe_duplicate_accounts(dry_run=not args.apply) def handle_sub2api_handle_exception(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_sub2api_maintainer() return maintainer.handle_exception_accounts(account_ids=args.ids, delete_unresolved=not args.no_delete) def handle_sub2api_maintain(args: argparse.Namespace) -> dict[str, Any]: maintainer = require_sub2api_maintainer() cfg = load_sync_config() actions = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions")) overrides = { "refresh_abnormal_accounts": (args.refresh_abnormal, args.no_refresh_abnormal), "delete_abnormal_accounts": (args.delete_abnormal, args.no_delete_abnormal), "dedupe_duplicate_accounts": (args.dedupe_duplicate, args.no_dedupe_duplicate), } for key, (enable, disable) in overrides.items(): if enable: actions[key] = True if disable: actions[key] = False result = maintainer.probe_and_clean_sync(actions=actions) result["actions_text"] = sub2api_actions_description(actions) return result def handle_stats(args: argparse.Namespace) -> dict[str, Any]: return load_state() def require_pool_maintainer(): maintainer = get_pool_maintainer() if not maintainer: raise ValueError("CPA 未配置,请先在 data/sync_config.json 中填写 cpa_base_url 和 cpa_token") return maintainer def require_sub2api_maintainer(): maintainer = get_sub2api_maintainer() if not maintainer: raise ValueError("Sub2Api 未配置,请先在 data/sync_config.json 中填写 base_url 与认证信息") return maintainer def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) if not getattr(args, "command", None): parser.print_help() return 0 try: result = args.handler(args) except Exception as exc: if getattr(args, "json", False): print_json({"ok": False, "error": str(exc)}) else: print(f"错误: {exc}", file=sys.stderr) return 1 if not getattr(args, "json", False) and args.command in {"cpa", "sub2api"} and isinstance(result, dict): if args.command == "cpa" and args.cpa_command == "status": print_status_block(f"OpenAI Pool Orchestrator v{__version__} - CPA 状态", result) return 0 if args.command == "sub2api" and args.sub2api_command == "status": print_status_block(f"OpenAI Pool Orchestrator v{__version__} - Sub2Api 状态", result) return 0 return _print_result(args, result) if __name__ == "__main__": raise SystemExit(main())