Files
standalone-openai-pool-cli/main.py
2026-03-19 11:12:26 +08:00

588 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Any, List
PROJECT_ROOT = Path(__file__).resolve().parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from openai_pool_orchestrator import TOKENS_DIR, __version__
from openai_pool_orchestrator.mail_providers import MultiMailRouter
from openai_pool_orchestrator.register import _write_text_atomic, run as register_run
try:
from .support import (
check_proxy,
get_pool_maintainer,
get_sub2api_maintainer,
init_config_from_example,
iter_token_files,
load_state,
load_sync_config,
login_sub2api_once,
normalize_sub2api_maintain_actions,
print_json,
print_status_block,
read_token_file,
save_runtime_proxy,
save_sync_config,
save_sub2api_credentials,
sub2api_actions_description,
sync_all_tokens_to_sub2api,
sync_token_to_sub2api,
upload_all_tokens_to_cpa,
)
except ImportError:
from support import (
check_proxy,
get_pool_maintainer,
get_sub2api_maintainer,
init_config_from_example,
iter_token_files,
load_state,
load_sync_config,
login_sub2api_once,
normalize_sub2api_maintain_actions,
print_json,
print_status_block,
read_token_file,
save_runtime_proxy,
save_sync_config,
save_sub2api_credentials,
sub2api_actions_description,
sync_all_tokens_to_sub2api,
sync_token_to_sub2api,
upload_all_tokens_to_cpa,
)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="OpenAI Pool Orchestrator CLI")
parser.add_argument("--json", action="store_true", help="以 JSON 输出结果")
subparsers = parser.add_subparsers(dest="command")
register_parser = subparsers.add_parser("register", help="执行注册流程")
register_parser.add_argument("--proxy", default=None, help="代理地址,如 http://127.0.0.1:7897")
register_parser.add_argument("--once", action="store_true", help="只运行一次")
register_parser.add_argument("--sleep-min", type=int, default=5, help="循环模式最短等待秒数")
register_parser.add_argument("--sleep-max", type=int, default=30, help="循环模式最长等待秒数")
register_parser.set_defaults(handler=handle_register)
config_parser = subparsers.add_parser("config", help="配置管理")
config_subparsers = config_parser.add_subparsers(dest="config_command")
config_init = config_subparsers.add_parser("init", help="初始化配置文件到 data/")
config_init.set_defaults(handler=handle_config_init)
config_setup = config_subparsers.add_parser("setup", help="交互式配置引导")
config_setup.set_defaults(handler=handle_config_setup)
config_show = config_subparsers.add_parser("show", help="显示当前配置")
config_show.set_defaults(handler=handle_config_show)
config_proxy = config_subparsers.add_parser("proxy", help="保存运行代理")
config_proxy.add_argument("proxy", help="代理地址")
config_proxy.add_argument("--auto-register", action="store_true", help="同时启用池不足时自动注册")
config_proxy.set_defaults(handler=handle_config_proxy)
config_check_proxy = config_subparsers.add_parser("check-proxy", help="检测代理可用性")
config_check_proxy.add_argument("proxy", help="代理地址")
config_check_proxy.set_defaults(handler=handle_check_proxy)
config_sub2api = config_subparsers.add_parser("sub2api", help="保存 Sub2Api 配置并校验")
config_sub2api.add_argument("--base-url", required=True, help="Sub2Api 平台地址")
config_sub2api.add_argument("--bearer-token", default="", help="Bearer Token")
config_sub2api.add_argument("--email", default="", help="管理员邮箱")
config_sub2api.add_argument("--password", default="", help="管理员密码")
config_sub2api.add_argument("--account-name", default=None, help="默认账号名称")
config_sub2api.add_argument("--auto-sync", action="store_true", help="注册成功后自动同步 Sub2Api")
config_sub2api.set_defaults(handler=handle_config_sub2api)
config_login = config_subparsers.add_parser("sub2api-login", help="登录 Sub2Api 并保存 Bearer Token")
config_login.add_argument("--base-url", required=True, help="Sub2Api 平台地址")
config_login.add_argument("--email", required=True, help="管理员邮箱")
config_login.add_argument("--password", required=True, help="管理员密码")
config_login.set_defaults(handler=handle_sub2api_login)
tokens_parser = subparsers.add_parser("tokens", help="查看本地 token 文件")
tokens_parser.add_argument("--limit", type=int, default=20, help="最多显示数量")
tokens_parser.set_defaults(handler=handle_tokens)
cpa_parser = subparsers.add_parser("cpa", help="CPA 账号池命令")
cpa_subparsers = cpa_parser.add_subparsers(dest="cpa_command")
cpa_status = cpa_subparsers.add_parser("status", help="查看 CPA 池状态")
cpa_status.set_defaults(handler=handle_cpa_status)
cpa_check = cpa_subparsers.add_parser("check", help="测试 CPA 连接")
cpa_check.set_defaults(handler=handle_cpa_check)
cpa_maintain = cpa_subparsers.add_parser("maintain", help="执行 CPA 维护")
cpa_maintain.set_defaults(handler=handle_cpa_maintain)
cpa_upload = cpa_subparsers.add_parser("upload-all", help="上传本地全部 token 到 CPA")
cpa_upload.add_argument("--include-uploaded", action="store_true", help="包含已标记上传的 token")
cpa_upload.set_defaults(handler=handle_cpa_upload_all)
sub2api_parser = subparsers.add_parser("sub2api", help="Sub2Api 命令")
sub2api_subparsers = sub2api_parser.add_subparsers(dest="sub2api_command")
sub2api_status = sub2api_subparsers.add_parser("status", help="查看 Sub2Api 池状态")
sub2api_status.set_defaults(handler=handle_sub2api_status)
sub2api_check = sub2api_subparsers.add_parser("check", help="测试 Sub2Api 连接")
sub2api_check.set_defaults(handler=handle_sub2api_check)
sub2api_sync = sub2api_subparsers.add_parser("sync-all", help="同步本地全部 token 到 Sub2Api")
sub2api_sync.add_argument("--include-uploaded", action="store_true", help="包含已标记同步的 token")
sub2api_sync.set_defaults(handler=handle_sub2api_sync_all)
sub2api_sync_one = sub2api_subparsers.add_parser("sync-one", help="同步单个 token 到 Sub2Api")
sub2api_sync_one.add_argument("file", help="token 文件名或路径")
sub2api_sync_one.set_defaults(handler=handle_sub2api_sync_one)
sub2api_dedupe = sub2api_subparsers.add_parser("dedupe", help="Sub2Api 重复账号清理")
sub2api_dedupe.add_argument("--apply", action="store_true", help="实际执行删除,不仅预览")
sub2api_dedupe.set_defaults(handler=handle_sub2api_dedupe)
sub2api_handle = sub2api_subparsers.add_parser("handle-exception", help="处理异常账号")
sub2api_handle.add_argument("ids", nargs="*", type=int, help="指定账号 ID为空时处理全部异常账号")
sub2api_handle.add_argument("--no-delete", action="store_true", help="刷新后不删除仍异常账号")
sub2api_handle.set_defaults(handler=handle_sub2api_handle_exception)
sub2api_maintain = sub2api_subparsers.add_parser("maintain", help="执行 Sub2Api 综合维护")
sub2api_maintain.add_argument("--refresh-abnormal", action="store_true", help="仅显式开启异常测活")
sub2api_maintain.add_argument("--delete-abnormal", action="store_true", help="仅显式开启异常删除")
sub2api_maintain.add_argument("--dedupe-duplicate", action="store_true", help="仅显式开启重复清理")
sub2api_maintain.add_argument("--no-refresh-abnormal", action="store_true", help="关闭异常测活")
sub2api_maintain.add_argument("--no-delete-abnormal", action="store_true", help="关闭异常删除")
sub2api_maintain.add_argument("--no-dedupe-duplicate", action="store_true", help="关闭重复清理")
sub2api_maintain.set_defaults(handler=handle_sub2api_maintain)
stats_parser = subparsers.add_parser("stats", help="显示本地累计统计")
stats_parser.set_defaults(handler=handle_stats)
return parser
def _print_result(args: argparse.Namespace, result: Any) -> int:
if args.json:
print_json(result)
return 0
if isinstance(result, dict):
print_json(result)
return 0
print(result)
return 0
def handle_register(args: argparse.Namespace) -> dict[str, Any]:
cfg = load_sync_config()
os.makedirs(TOKENS_DIR, exist_ok=True)
sleep_min = max(1, args.sleep_min)
sleep_max = max(sleep_min, args.sleep_max)
proxy = args.proxy if args.proxy is not None else str(cfg.get("proxy") or "").strip() or None
mail_router = MultiMailRouter(cfg)
count = 0
runs: List[dict[str, Any]] = []
while True:
count += 1
print(f"\n[{time.strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<")
try:
provider_name, provider = mail_router.next_provider()
print(f"[*] 本次使用邮箱提供商: {provider_name}")
token_json = register_run(
proxy,
mail_provider=provider,
proxy_pool_config={
"enabled": bool(cfg.get("proxy_pool_enabled", False)),
"api_url": cfg.get("proxy_pool_api_url", ""),
"auth_mode": cfg.get("proxy_pool_auth_mode", "query"),
"api_key": cfg.get("proxy_pool_api_key", ""),
"count": cfg.get("proxy_pool_count", 1),
"country": cfg.get("proxy_pool_country", "US"),
},
)
mail_router.report_success(provider_name)
except Exception as exc:
token_json = None
try:
mail_router.report_failure(provider_name)
except Exception:
pass
runs.append({"ok": False, "error": str(exc)})
print(f"[Error] 发生未捕获异常: {exc}")
if token_json:
token_data = json.loads(token_json)
email = str(token_data.get("email") or "unknown")
file_name = f"token_{email.replace('@', '_')}_{time.time_ns()}.json"
file_path = Path(TOKENS_DIR) / file_name
_write_text_atomic(str(file_path), token_json)
print(f"[*] 成功! Token 已保存至: {file_path}")
run_result: dict[str, Any] = {"ok": True, "file": file_name, "email": email}
cpa = get_pool_maintainer(cfg)
if cpa:
cpa_ok = cpa.upload_token(file_name, token_data, proxy=proxy or "")
run_result["cpa_uploaded"] = cpa_ok
print(f"[{'+' if cpa_ok else '-'}] CPA {'上传成功' if cpa_ok else '上传失败'}: {email}")
if cfg.get("auto_sync"):
try:
sync_result = sync_token_to_sub2api(file_path, cfg)
run_result["sub2api_sync"] = sync_result
if sync_result.get("ok"):
print(f"[+] Sub2Api 同步成功: {email}")
else:
print(f"[-] Sub2Api 同步失败: {email}")
except Exception as exc:
run_result["sub2api_sync"] = {"ok": False, "error": str(exc)}
print(f"[-] Sub2Api 同步异常: {exc}")
runs.append(run_result)
else:
if not runs or runs[-1].get("ok") is not False:
runs.append({"ok": False, "error": "本次注册失败"})
print("[-] 本次注册失败。")
if args.once:
break
wait_time = sleep_min if sleep_min == sleep_max else __import__("random").randint(sleep_min, sleep_max)
print(f"[*] 休息 {wait_time} 秒...")
time.sleep(wait_time)
return {"runs": runs}
def handle_config_init(args: argparse.Namespace) -> dict[str, Any]:
path = init_config_from_example(PROJECT_ROOT)
return {"ok": True, "config_file": str(path)}
def _is_interactive() -> bool:
try:
return os.isatty(sys.stdin.fileno())
except Exception:
return False
def _prompt_text(prompt: str, default: str = "") -> str:
suffix = f" [{default}]" if default else ""
try:
value = input(f"{prompt}{suffix}: ").strip()
except EOFError:
value = ""
except KeyboardInterrupt:
print("\n已取消配置。")
raise SystemExit(130)
return value or default
def _prompt_bool(prompt: str, default: bool = False) -> bool:
suffix = "(Y/n)" if default else "(y/N)"
try:
value = input(f"{prompt} {suffix}: ").strip().lower()
except EOFError:
value = ""
except KeyboardInterrupt:
print("\n已取消配置。")
raise SystemExit(130)
if not value:
return default
return value in {"y", "yes", "1", "true"}
def _prompt_int(prompt: str, default: int) -> int:
while True:
value = _prompt_text(prompt, str(default)).strip()
try:
return int(value)
except ValueError:
print("请输入整数。")
def _prompt_choice(prompt: str, options: list[str], default: str) -> str:
option_text = "/".join(options)
default_value = default if default in options else options[0]
while True:
value = _prompt_text(f"{prompt} ({option_text})", default_value).strip().lower()
if value in options:
return value
print(f"请输入以下选项之一: {option_text}")
def handle_config_setup(args: argparse.Namespace) -> dict[str, Any]:
if not _is_interactive():
raise ValueError("`config setup` 需要交互式终端,请直接编辑 data/sync_config.json 或先使用 config init")
init_config_from_example(PROJECT_ROOT)
cfg = load_sync_config()
print("开始交互式配置,直接回车表示使用当前值。\n")
cfg["proxy"] = _prompt_text("1) 注册代理地址", str(cfg.get("proxy") or "http://127.0.0.1:17891"))
cfg["auto_register"] = _prompt_bool("2) 池不足时自动注册", bool(cfg.get("auto_register", False)))
provider_options = ["mailtm", "duckmail", "moemail", "cloudflare_temp_email"]
current_provider = str((cfg.get("mail_providers") or ["mailtm"])[0]).strip().lower() or "mailtm"
provider_name = _prompt_choice("3) 邮箱提供商", provider_options, current_provider)
cfg["mail_providers"] = [provider_name]
cfg["mail_strategy"] = "round_robin"
provider_cfgs = dict(cfg.get("mail_provider_configs") or {})
provider_cfg = dict(provider_cfgs.get(provider_name) or {})
if provider_name == "mailtm":
provider_cfg["api_base"] = _prompt_text(" Mail.tm API 地址", str(provider_cfg.get("api_base") or "https://api.mail.tm"))
elif provider_name == "duckmail":
provider_cfg["api_base"] = _prompt_text(" DuckMail API 地址", str(provider_cfg.get("api_base") or "https://api.duckmail.sbs"))
provider_cfg["bearer_token"] = _prompt_text(" DuckMail Bearer Token", str(provider_cfg.get("bearer_token") or ""))
provider_cfg["domain"] = _prompt_text(" DuckMail 域名(可留空)", str(provider_cfg.get("domain") or ""))
elif provider_name == "moemail":
provider_cfg["api_base"] = _prompt_text(" MoeMail API 地址", str(provider_cfg.get("api_base") or ""))
provider_cfg["api_key"] = _prompt_text(" MoeMail API Key", str(provider_cfg.get("api_key") or ""))
elif provider_name == "cloudflare_temp_email":
provider_cfg["api_base"] = _prompt_text(" Cloudflare Worker API 地址", str(provider_cfg.get("api_base") or ""))
provider_cfg["admin_password"] = _prompt_text(" Worker 管理密码", str(provider_cfg.get("admin_password") or ""))
provider_cfg["domain"] = _prompt_text(" 邮箱域名后缀", str(provider_cfg.get("domain") or ""))
provider_cfgs[provider_name] = provider_cfg
cfg["mail_provider_configs"] = provider_cfgs
enable_sub2api = _prompt_bool("4) 是否配置 Sub2Api", bool(str(cfg.get("base_url") or "").strip()))
if enable_sub2api:
cfg["base_url"] = _prompt_text(" Sub2Api 地址", str(cfg.get("base_url") or "https://your-sub2api.example.com"))
auth_mode = _prompt_choice(
" Sub2Api 认证方式",
["token", "password"],
"token" if str(cfg.get("bearer_token") or "").strip() else "password",
)
if auth_mode == "token":
cfg["bearer_token"] = _prompt_text(" Bearer Token", str(cfg.get("bearer_token") or ""))
cfg["email"] = _prompt_text(" 管理员邮箱(可留空)", str(cfg.get("email") or ""))
cfg["password"] = ""
else:
cfg["email"] = _prompt_text(" 管理员邮箱", str(cfg.get("email") or "admin@example.com"))
cfg["password"] = _prompt_text(" 管理员密码", str(cfg.get("password") or ""))
cfg["bearer_token"] = ""
cfg["auto_sync"] = _prompt_bool(" 注册成功后自动同步到 Sub2Api", bool(cfg.get("auto_sync", False)))
cfg["sub2api_min_candidates"] = _prompt_int(" Sub2Api 候选池阈值", int(cfg.get("sub2api_min_candidates", 200)))
cfg["sub2api_auto_maintain"] = _prompt_bool(" 启用 Sub2Api 自动维护", bool(cfg.get("sub2api_auto_maintain", False)))
else:
cfg["base_url"] = ""
cfg["bearer_token"] = ""
cfg["email"] = ""
cfg["password"] = ""
cfg["auto_sync"] = False
enable_cpa = _prompt_bool("5) 是否配置 CPA 平台", bool(str(cfg.get("cpa_base_url") or "").strip()))
if enable_cpa:
cfg["cpa_base_url"] = _prompt_text(" CPA 地址", str(cfg.get("cpa_base_url") or "https://your-cpa.example.com"))
cfg["cpa_token"] = _prompt_text(" CPA Token", str(cfg.get("cpa_token") or ""))
cfg["min_candidates"] = _prompt_int(" CPA 候选池阈值", int(cfg.get("min_candidates", 1000)))
cfg["used_percent_threshold"] = _prompt_int(" CPA 已使用比例阈值", int(cfg.get("used_percent_threshold", 95)))
cfg["auto_maintain"] = _prompt_bool(" 启用 CPA 自动维护", bool(cfg.get("auto_maintain", True)))
cfg["maintain_interval_minutes"] = _prompt_int(" CPA 自动维护间隔(分钟)", int(cfg.get("maintain_interval_minutes", 30)))
else:
cfg["cpa_base_url"] = ""
cfg["cpa_token"] = ""
cfg["auto_maintain"] = False
enable_proxy_pool = _prompt_bool("6) 是否启用代理池", bool(cfg.get("proxy_pool_enabled", False)))
cfg["proxy_pool_enabled"] = enable_proxy_pool
if enable_proxy_pool:
cfg["proxy_pool_api_url"] = _prompt_text(" 代理池 API 地址", str(cfg.get("proxy_pool_api_url") or "https://zenproxy.top/api/fetch"))
cfg["proxy_pool_auth_mode"] = _prompt_choice(
" 代理池鉴权方式",
["header", "query"],
str(cfg.get("proxy_pool_auth_mode") or "header"),
)
cfg["proxy_pool_api_key"] = _prompt_text(" 代理池 API Key", str(cfg.get("proxy_pool_api_key") or ""))
cfg["proxy_pool_count"] = _prompt_int(" 每次取代理数量", int(cfg.get("proxy_pool_count", 1)))
cfg["proxy_pool_country"] = _prompt_text(" 代理国家代码", str(cfg.get("proxy_pool_country") or "US")).upper()
cfg["account_name"] = _prompt_text("7) 默认账号名称", str(cfg.get("account_name") or "AutoReg"))
cfg["upload_mode"] = _prompt_choice("8) 上传模式", ["snapshot", "decoupled"], str(cfg.get("upload_mode") or "snapshot"))
saved = save_sync_config(cfg)
print("\n配置已保存。建议下一步执行:")
print("- python3 run.py --json config show")
print("- python3 run.py register --once")
return {"ok": True, "config_file": str((PROJECT_ROOT / 'data' / 'sync_config.json')), "config": saved}
def handle_config_show(args: argparse.Namespace) -> dict[str, Any]:
return load_sync_config()
def handle_config_proxy(args: argparse.Namespace) -> dict[str, Any]:
cfg = save_runtime_proxy(args.proxy, auto_register=args.auto_register)
return {"ok": True, "proxy": cfg.get("proxy", ""), "auto_register": cfg.get("auto_register", False)}
def handle_check_proxy(args: argparse.Namespace) -> dict[str, Any]:
return check_proxy(args.proxy)
def handle_config_sub2api(args: argparse.Namespace) -> dict[str, Any]:
cfg = save_sub2api_credentials(
base_url=args.base_url,
bearer_token=args.bearer_token,
email=args.email,
password=args.password,
account_name=args.account_name,
auto_sync=args.auto_sync,
)
return {"ok": True, "base_url": cfg.get("base_url", ""), "auto_sync": cfg.get("auto_sync", False)}
def handle_sub2api_login(args: argparse.Namespace) -> dict[str, Any]:
return login_sub2api_once(args.base_url, args.email, args.password)
def handle_tokens(args: argparse.Namespace) -> dict[str, Any]:
items = []
for index, path in enumerate(iter_token_files()):
if index >= max(1, args.limit):
break
try:
token_data = read_token_file(path)
items.append(
{
"file": path.name,
"email": token_data.get("email", ""),
"uploaded_platforms": token_data.get("uploaded_platforms", []),
}
)
except Exception as exc:
items.append({"file": path.name, "error": str(exc)})
return {"total_shown": len(items), "items": items}
def handle_cpa_status(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_pool_maintainer()
return maintainer.get_pool_status()
def handle_cpa_check(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_pool_maintainer()
return maintainer.test_connection()
def handle_cpa_maintain(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_pool_maintainer()
return maintainer.probe_and_clean_sync()
def handle_cpa_upload_all(args: argparse.Namespace) -> dict[str, Any]:
return upload_all_tokens_to_cpa(skip_uploaded=not args.include_uploaded)
def handle_sub2api_status(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
status = maintainer.get_pool_status()
status["dashboard"] = maintainer.get_dashboard_stats()
return status
def handle_sub2api_check(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
return maintainer.test_connection()
def handle_sub2api_sync_all(args: argparse.Namespace) -> dict[str, Any]:
return sync_all_tokens_to_sub2api(skip_uploaded=not args.include_uploaded)
def handle_sub2api_sync_one(args: argparse.Namespace) -> dict[str, Any]:
raw_path = Path(args.file)
file_path = raw_path if raw_path.is_absolute() else (Path(TOKENS_DIR) / args.file)
if not file_path.exists():
raise ValueError(f"token 文件不存在: {args.file}")
return sync_token_to_sub2api(file_path)
def handle_sub2api_dedupe(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
return maintainer.dedupe_duplicate_accounts(dry_run=not args.apply)
def handle_sub2api_handle_exception(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
return maintainer.handle_exception_accounts(account_ids=args.ids, delete_unresolved=not args.no_delete)
def handle_sub2api_maintain(args: argparse.Namespace) -> dict[str, Any]:
maintainer = require_sub2api_maintainer()
cfg = load_sync_config()
actions = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions"))
overrides = {
"refresh_abnormal_accounts": (args.refresh_abnormal, args.no_refresh_abnormal),
"delete_abnormal_accounts": (args.delete_abnormal, args.no_delete_abnormal),
"dedupe_duplicate_accounts": (args.dedupe_duplicate, args.no_dedupe_duplicate),
}
for key, (enable, disable) in overrides.items():
if enable:
actions[key] = True
if disable:
actions[key] = False
result = maintainer.probe_and_clean_sync(actions=actions)
result["actions_text"] = sub2api_actions_description(actions)
return result
def handle_stats(args: argparse.Namespace) -> dict[str, Any]:
return load_state()
def require_pool_maintainer():
maintainer = get_pool_maintainer()
if not maintainer:
raise ValueError("CPA 未配置,请先在 data/sync_config.json 中填写 cpa_base_url 和 cpa_token")
return maintainer
def require_sub2api_maintainer():
maintainer = get_sub2api_maintainer()
if not maintainer:
raise ValueError("Sub2Api 未配置,请先在 data/sync_config.json 中填写 base_url 与认证信息")
return maintainer
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if not getattr(args, "command", None):
parser.print_help()
return 0
try:
result = args.handler(args)
except Exception as exc:
if getattr(args, "json", False):
print_json({"ok": False, "error": str(exc)})
else:
print(f"错误: {exc}", file=sys.stderr)
return 1
if not getattr(args, "json", False) and args.command in {"cpa", "sub2api"} and isinstance(result, dict):
if args.command == "cpa" and args.cpa_command == "status":
print_status_block(f"OpenAI Pool Orchestrator v{__version__} - CPA 状态", result)
return 0
if args.command == "sub2api" and args.sub2api_command == "status":
print_status_block(f"OpenAI Pool Orchestrator v{__version__} - Sub2Api 状态", result)
return 0
return _print_result(args, result)
if __name__ == "__main__":
raise SystemExit(main())