From 6a250fe6a1818ad6674effb3ebfeaa906785f52a Mon Sep 17 00:00:00 2001 From: mmc Date: Thu, 19 Mar 2026 13:12:34 +0800 Subject: [PATCH] feat: add daemon refill controls --- CONFIG_GUIDE.md | 5 +- README.md | 21 ++- config/sync_config.example.json | 3 +- docker-compose.yml | 5 +- main.py | 264 +++++++++++++++++++++++++------- support.py | 6 +- 6 files changed, 234 insertions(+), 70 deletions(-) diff --git a/CONFIG_GUIDE.md b/CONFIG_GUIDE.md index 2fb6a3b..1a97282 100644 --- a/CONFIG_GUIDE.md +++ b/CONFIG_GUIDE.md @@ -403,8 +403,9 @@ ocxxxxxxx@cursors.online ```json { - "proxy": "http://127.0.0.1:7897", + "proxy": "http://127.0.0.1:17891", "auto_register": false, + "auto_register_max_per_loop": 1, "mail_providers": ["mailtm"], "mail_provider_configs": { "mailtm": { @@ -412,8 +413,6 @@ ocxxxxxxx@cursors.online } }, "mail_strategy": "round_robin", - "multithread": false, - "thread_count": 3, "base_url": "", "bearer_token": "", "email": "", diff --git a/README.md b/README.md index dcb629c..c648fad 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,26 @@ cd /root/standalone_cli docker compose up --build ``` -当前 compose 默认执行一次性命令 `--json config show`,用于快速验证镜像和配置挂载是否正常,不会常驻运行。 +当前 compose 默认执行 `daemon` 常驻模式,会一直运行直到你手动停止。 + +在这个模式下: + +- 按配置周期检查号池状态 +- 号池不足且 `auto_register = true` 时自动补号 +- 按 `threshold - candidates` 估算补号差值,并受 `auto_register_max_per_loop` 限制 +- 号池满足阈值时不执行注册 +- 按 `maintain_interval_minutes` / `sub2api_maintain_interval_minutes` 自动维护 + +由于当前代理是宿主机 `127.0.0.1:17891`,compose 已使用 `host` 网络模式。 + +常用命令: + +```bash +cd /root/standalone_cli +docker compose up --build -d +docker compose logs -f +docker compose down +``` ## 验证命令 diff --git a/config/sync_config.example.json b/config/sync_config.example.json index 323ff14..5916d00 100755 --- a/config/sync_config.example.json +++ b/config/sync_config.example.json @@ -1,6 +1,7 @@ { "proxy": "http://127.0.0.1:17891", "auto_register": false, + "auto_register_max_per_loop": 1, "mail_providers": [ "mailtm" ], @@ -22,8 +23,6 @@ } }, "mail_strategy": "round_robin", - "multithread": false, - "thread_count": 3, "base_url": "https://your-sub2api.example.com", "bearer_token": "", "email": "admin@example.com", diff --git a/docker-compose.yml b/docker-compose.yml index d0000ff..c634e18 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,8 @@ services: build: . image: openai-pool-standalone:latest container_name: openai-pool-standalone - restart: "no" + restart: unless-stopped + network_mode: host volumes: - /root/standalone_cli/data:/app/data - command: ["--json", "config", "show"] + command: ["daemon"] diff --git a/main.py b/main.py index 35100e9..ad80b0b 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ import os import sys import time from pathlib import Path -from typing import Any, List +from typing import Any, List, Optional PROJECT_ROOT = Path(__file__).resolve().parent if str(PROJECT_ROOT) not in sys.path: @@ -77,6 +77,11 @@ def build_parser() -> argparse.ArgumentParser: register_parser.add_argument("--sleep-max", type=int, default=30, help="循环模式最长等待秒数") register_parser.set_defaults(handler=handle_register) + daemon_parser = subparsers.add_parser("daemon", help="常驻运行,按池状态自动补号和定时维护") + daemon_parser.add_argument("--interval-minutes", type=int, default=0, help="检查周期分钟数,0 表示按维护间隔自动推导") + daemon_parser.add_argument("--max-register-per-loop", type=int, default=0, help="每轮最多补号数量,0 表示使用配置中的 auto_register_max_per_loop") + daemon_parser.set_defaults(handler=handle_daemon) + config_parser = subparsers.add_parser("config", help="配置管理") config_subparsers = config_parser.add_subparsers(dest="config_command") @@ -185,75 +190,214 @@ def _print_result(args: argparse.Namespace, result: Any) -> int: return 0 +def _perform_registration_once(cfg: dict[str, Any], proxy: Optional[str]) -> dict[str, Any]: + os.makedirs(TOKENS_DIR, exist_ok=True) + mail_router = MultiMailRouter(cfg) + provider_name, provider = mail_router.next_provider() + print(f"[*] 本次使用邮箱提供商: {provider_name}") + print("[*] 邮箱接口默认直连,不使用代理") + + try: + token_json = register_run( + proxy, + mail_provider=provider, + proxy_pool_config={ + "enabled": bool(cfg.get("proxy_pool_enabled", False)), + "api_url": cfg.get("proxy_pool_api_url", ""), + "auth_mode": cfg.get("proxy_pool_auth_mode", "query"), + "api_key": cfg.get("proxy_pool_api_key", ""), + "count": cfg.get("proxy_pool_count", 1), + "country": cfg.get("proxy_pool_country", "US"), + }, + ) + mail_router.report_success(provider_name) + except Exception as exc: + try: + mail_router.report_failure(provider_name) + except Exception: + pass + print(f"[Error] 发生未捕获异常: {exc}") + return {"ok": False, "error": str(exc)} + + if not token_json: + print("[-] 本次注册失败。") + return {"ok": False, "error": "本次注册失败"} + + token_data = json.loads(token_json) + email = str(token_data.get("email") or "unknown") + file_name = f"token_{email.replace('@', '_')}_{time.time_ns()}.json" + file_path = Path(TOKENS_DIR) / file_name + _write_text_atomic(str(file_path), token_json) + print(f"[*] 成功! Token 已保存至: {file_path}") + + run_result: dict[str, Any] = {"ok": True, "file": file_name, "email": email} + cpa = get_pool_maintainer(cfg) + if cpa: + cpa_ok = cpa.upload_token(file_name, token_data, proxy="") + run_result["cpa_uploaded"] = cpa_ok + print(f"[{'+' if cpa_ok else '-'}] CPA {'上传成功' if cpa_ok else '上传失败'}: {email}") + if cfg.get("auto_sync"): + try: + sync_result = sync_token_to_sub2api(file_path, cfg) + run_result["sub2api_sync"] = sync_result + if sync_result.get("ok"): + print(f"[+] Sub2Api 同步成功: {email}") + else: + print(f"[-] Sub2Api 同步失败: {email}") + except Exception as exc: + run_result["sub2api_sync"] = {"ok": False, "error": str(exc)} + print(f"[-] Sub2Api 同步异常: {exc}") + return run_result + + +def _derive_daemon_interval_minutes(cfg: dict[str, Any], explicit_minutes: int) -> int: + if explicit_minutes and explicit_minutes > 0: + return explicit_minutes + candidates: list[int] = [] + if bool(cfg.get("auto_maintain", False)): + candidates.append(max(1, int(cfg.get("maintain_interval_minutes", 30) or 30))) + if bool(cfg.get("sub2api_auto_maintain", False)): + candidates.append(max(1, int(cfg.get("sub2api_maintain_interval_minutes", 30) or 30))) + return min(candidates) if candidates else 30 + + +def _derive_register_cap(cfg: dict[str, Any], explicit_cap: int) -> int: + if explicit_cap and explicit_cap > 0: + return explicit_cap + try: + return max(1, int(cfg.get("auto_register_max_per_loop", 1) or 1)) + except (TypeError, ValueError): + return 1 + + +def _collect_pool_health(cfg: dict[str, Any]) -> list[dict[str, Any]]: + results: list[dict[str, Any]] = [] + + cpa = get_pool_maintainer(cfg) + if cpa: + status = cpa.get_pool_status() + results.append({"name": "CPA", "configured": True, "status": status, "healthy": bool(status.get("healthy", False))}) + + sub2api = get_sub2api_maintainer(cfg) + if sub2api: + status = sub2api.get_pool_status() + results.append({"name": "Sub2Api", "configured": True, "status": status, "healthy": bool(status.get("healthy", False))}) + + return results + + +def _calculate_register_deficit(pool_results: list[dict[str, Any]]) -> int: + deficits: list[int] = [] + for item in pool_results: + status = item.get("status") or {} + try: + threshold = max(0, int(status.get("threshold", 0) or 0)) + candidates = max(0, int(status.get("candidates", 0) or 0)) + except (TypeError, ValueError): + continue + deficits.append(max(0, threshold - candidates)) + return max(deficits) if deficits else 0 + + +def handle_daemon(args: argparse.Namespace) -> dict[str, Any]: + print("[*] 启动常驻编排模式,按 Ctrl+C 停止") + last_cpa_maintain = 0.0 + last_sub2api_maintain = 0.0 + loops = 0 + + try: + while True: + loops += 1 + cfg = load_sync_config() + proxy = str(cfg.get("proxy") or "").strip() or None + interval_minutes = _derive_daemon_interval_minutes(cfg, args.interval_minutes) + interval_seconds = max(60, interval_minutes * 60) + register_cap = _derive_register_cap(cfg, args.max_register_per_loop) + + print(f"\n[{time.strftime('%H:%M:%S')}] >>> Daemon 第 {loops} 轮检查 <<<") + print(f"[*] 当前检查周期: {interval_minutes} 分钟") + print(f"[*] 每轮最大补号数量: {register_cap}") + + pool_results = _collect_pool_health(cfg) + if not pool_results: + print("[!] 未配置可检查的号池,当前仅等待下一轮") + else: + for item in pool_results: + status = item["status"] + print( + f"[*] {item['name']} 池: candidates={status.get('candidates', 0)} " + f"threshold={status.get('threshold', 0)} healthy={item['healthy']}" + ) + + if bool(cfg.get("auto_register", False)) and pool_results: + unhealthy = [item for item in pool_results if not item.get("healthy", False)] + if unhealthy: + names = ", ".join(item["name"] for item in unhealthy) + deficit = _calculate_register_deficit(unhealthy) + planned = max(1, min(register_cap, deficit or 1)) + print(f"[!] 号池不足,开始自动补号: {names},差值约 {deficit},本轮计划补 {planned} 个") + for index in range(planned): + print(f"[*] 自动补号进度: {index + 1}/{planned}") + try: + result = _perform_registration_once(cfg, proxy) + if not result.get("ok"): + print("[!] 本次自动补号失败,停止本轮剩余补号") + break + except Exception as exc: + print(f"[Error] 自动补号失败: {exc}") + break + else: + print("[+] 当前号池满足阈值,本轮不执行注册") + elif bool(cfg.get("auto_register", False)): + print("[!] 已启用 auto_register,但未配置可检查的号池") + + now = time.time() + + cpa = get_pool_maintainer(cfg) + if cpa and bool(cfg.get("auto_maintain", False)): + maintain_seconds = max(60, int(cfg.get("maintain_interval_minutes", 30) or 30) * 60) + if now - last_cpa_maintain >= maintain_seconds: + print("[*] 开始执行 CPA 自动维护") + try: + result = cpa.probe_and_clean_sync() + print_json({"cpa_maintain": result}) + except Exception as exc: + print(f"[Error] CPA 自动维护失败: {exc}") + last_cpa_maintain = now + + sub2api = get_sub2api_maintainer(cfg) + if sub2api and bool(cfg.get("sub2api_auto_maintain", False)): + maintain_seconds = max(60, int(cfg.get("sub2api_maintain_interval_minutes", 30) or 30) * 60) + if now - last_sub2api_maintain >= maintain_seconds: + print("[*] 开始执行 Sub2Api 自动维护") + actions = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions")) + try: + result = sub2api.probe_and_clean_sync(actions=actions) + print_json({"sub2api_maintain": result}) + except Exception as exc: + print(f"[Error] Sub2Api 自动维护失败: {exc}") + last_sub2api_maintain = now + + print(f"[*] 休眠 {interval_minutes} 分钟,等待下一轮检查...") + time.sleep(interval_seconds) + except KeyboardInterrupt: + print("\n[*] 已停止常驻编排模式") + return {"ok": True, "stopped": True, "loops": loops} + + def handle_register(args: argparse.Namespace) -> dict[str, Any]: cfg = load_sync_config() os.makedirs(TOKENS_DIR, exist_ok=True) sleep_min = max(1, args.sleep_min) sleep_max = max(sleep_min, args.sleep_max) proxy = args.proxy if args.proxy is not None else str(cfg.get("proxy") or "").strip() or None - mail_router = MultiMailRouter(cfg) - count = 0 runs: List[dict[str, Any]] = [] while True: count += 1 print(f"\n[{time.strftime('%H:%M:%S')}] >>> 开始第 {count} 次注册流程 <<<") - try: - provider_name, provider = mail_router.next_provider() - print(f"[*] 本次使用邮箱提供商: {provider_name}") - print("[*] 邮箱接口默认直连,不使用代理") - token_json = register_run( - proxy, - mail_provider=provider, - proxy_pool_config={ - "enabled": bool(cfg.get("proxy_pool_enabled", False)), - "api_url": cfg.get("proxy_pool_api_url", ""), - "auth_mode": cfg.get("proxy_pool_auth_mode", "query"), - "api_key": cfg.get("proxy_pool_api_key", ""), - "count": cfg.get("proxy_pool_count", 1), - "country": cfg.get("proxy_pool_country", "US"), - }, - ) - mail_router.report_success(provider_name) - except Exception as exc: - token_json = None - try: - mail_router.report_failure(provider_name) - except Exception: - pass - runs.append({"ok": False, "error": str(exc)}) - print(f"[Error] 发生未捕获异常: {exc}") - - if token_json: - token_data = json.loads(token_json) - email = str(token_data.get("email") or "unknown") - file_name = f"token_{email.replace('@', '_')}_{time.time_ns()}.json" - file_path = Path(TOKENS_DIR) / file_name - _write_text_atomic(str(file_path), token_json) - print(f"[*] 成功! Token 已保存至: {file_path}") - - run_result: dict[str, Any] = {"ok": True, "file": file_name, "email": email} - cpa = get_pool_maintainer(cfg) - if cpa: - cpa_ok = cpa.upload_token(file_name, token_data, proxy="") - run_result["cpa_uploaded"] = cpa_ok - print(f"[{'+' if cpa_ok else '-'}] CPA {'上传成功' if cpa_ok else '上传失败'}: {email}") - if cfg.get("auto_sync"): - try: - sync_result = sync_token_to_sub2api(file_path, cfg) - run_result["sub2api_sync"] = sync_result - if sync_result.get("ok"): - print(f"[+] Sub2Api 同步成功: {email}") - else: - print(f"[-] Sub2Api 同步失败: {email}") - except Exception as exc: - run_result["sub2api_sync"] = {"ok": False, "error": str(exc)} - print(f"[-] Sub2Api 同步异常: {exc}") - runs.append(run_result) - else: - if not runs or runs[-1].get("ok") is not False: - runs.append({"ok": False, "error": "本次注册失败"}) - print("[-] 本次注册失败。") + run_result = _perform_registration_once(cfg, proxy) + runs.append(run_result) if args.once: break @@ -333,6 +477,8 @@ def handle_config_setup(args: argparse.Namespace) -> dict[str, Any]: cfg["proxy"] = _prompt_text("1) 注册代理地址", str(cfg.get("proxy") or "http://127.0.0.1:17891")) cfg["auto_register"] = _prompt_bool("2) 池不足时自动注册", bool(cfg.get("auto_register", False))) + cfg["auto_register_max_per_loop"] = _prompt_int(" 每轮最多自动补号数量", int(cfg.get("auto_register_max_per_loop", 1) or 1)) + print(" 提示: 这个值越大,补号越快,但也会更激进。一般先用 1 或 2。") provider_options = ["mailtm", "duckmail", "moemail", "cloudflare_temp_email"] current_provider = str((cfg.get("mail_providers") or ["mailtm"])[0]).strip().lower() or "mailtm" diff --git a/support.py b/support.py index 4e95187..c0a843a 100644 --- a/support.py +++ b/support.py @@ -42,6 +42,7 @@ DEFAULT_CONFIG: Dict[str, Any] = { "sub2api_maintain_actions": copy.deepcopy(SUB2API_MAINTAIN_ACTION_DEFAULTS), "proxy": "", "auto_register": False, + "auto_register_max_per_loop": 1, "proxy_pool_enabled": True, "proxy_pool_api_url": "https://zenproxy.top/api/fetch", "proxy_pool_auth_mode": "query", @@ -173,12 +174,11 @@ def normalize_config(cfg: Dict[str, Any]) -> Dict[str, Any]: cfg["auto_maintain"] = _as_bool(cfg.get("auto_maintain", False), default=False) cfg["sub2api_auto_maintain"] = _as_bool(cfg.get("sub2api_auto_maintain", False), default=False) cfg["sub2api_maintain_actions"] = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions")) - cfg["multithread"] = _as_bool(cfg.get("multithread", False), default=False) cfg["auto_register"] = _as_bool(cfg.get("auto_register", False), default=False) try: - cfg["thread_count"] = max(1, min(int(cfg.get("thread_count", 3)), 10)) + cfg["auto_register_max_per_loop"] = max(1, min(int(cfg.get("auto_register_max_per_loop", 1)), 20)) except (TypeError, ValueError): - cfg["thread_count"] = 3 + cfg["auto_register_max_per_loop"] = 1 cfg["proxy_pool_enabled"] = _as_bool(cfg.get("proxy_pool_enabled", True), default=True) proxy_pool_api_url = str(cfg.get("proxy_pool_api_url", DEFAULT_CONFIG["proxy_pool_api_url"]) or "").strip() cfg["proxy_pool_api_url"] = proxy_pool_api_url or DEFAULT_CONFIG["proxy_pool_api_url"]