Files
standalone-openai-pool-cli/support.py
2026-03-19 07:36:14 +08:00

855 lines
32 KiB
Python

from __future__ import annotations
import copy
import json
import os
import shutil
import tempfile
import time
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
from openai_pool_orchestrator import CONFIG_FILE, STATE_FILE, TOKENS_DIR
from openai_pool_orchestrator.pool_maintainer import PoolMaintainer, Sub2ApiMaintainer
SUB2API_MAINTAIN_ACTION_DEFAULTS: Dict[str, bool] = {
"refresh_abnormal_accounts": True,
"delete_abnormal_accounts": True,
"dedupe_duplicate_accounts": True,
}
UPLOAD_PLATFORMS = ("cpa", "sub2api")
DEFAULT_CONFIG: Dict[str, Any] = {
"base_url": "",
"bearer_token": "",
"account_name": "AutoReg",
"auto_sync": False,
"cpa_base_url": "",
"cpa_token": "",
"min_candidates": 800,
"used_percent_threshold": 95,
"auto_maintain": False,
"maintain_interval_minutes": 30,
"upload_mode": "snapshot",
"mail_provider": "mailtm",
"mail_config": {"api_base": "https://api.mail.tm", "api_key": "", "bearer_token": ""},
"sub2api_min_candidates": 200,
"sub2api_auto_maintain": False,
"sub2api_maintain_interval_minutes": 30,
"sub2api_maintain_actions": copy.deepcopy(SUB2API_MAINTAIN_ACTION_DEFAULTS),
"proxy": "",
"auto_register": False,
"proxy_pool_enabled": True,
"proxy_pool_api_url": "https://zenproxy.top/api/fetch",
"proxy_pool_auth_mode": "query",
"proxy_pool_api_key": "19c0ec43-8f76-4c97-81bc-bcda059eeba4",
"proxy_pool_count": 1,
"proxy_pool_country": "US",
}
def _as_bool(value: Any, default: bool = False) -> bool:
if isinstance(value, bool):
return value
if value is None:
return default
if isinstance(value, (int, float)):
return bool(value)
text = str(value).strip().lower()
if text in ("1", "true", "yes", "on"):
return True
if text in ("0", "false", "no", "off", ""):
return False
return default
def _normalize_service_url(value: Any) -> str:
text = str(value or "").strip()
if not text:
return ""
if not text.startswith(("http://", "https://")):
return ""
return text.rstrip("/")
def normalize_sub2api_maintain_actions(raw: Any) -> Dict[str, bool]:
source = raw if isinstance(raw, dict) else {}
return {
key: _as_bool(source.get(key, default), default=default)
for key, default in SUB2API_MAINTAIN_ACTION_DEFAULTS.items()
}
def _write_json_atomic(path: Path, payload: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(prefix=f".{path.stem}_", suffix=path.suffix, dir=str(path.parent))
try:
with os.fdopen(fd, "w", encoding="utf-8") as handle:
json.dump(payload, handle, ensure_ascii=False, indent=2)
handle.flush()
os.fsync(handle.fileno())
os.replace(tmp_path, path)
finally:
try:
if os.path.exists(tmp_path):
os.remove(tmp_path)
except OSError:
pass
def load_sync_config() -> Dict[str, Any]:
if CONFIG_FILE.exists():
try:
return normalize_config(json.loads(CONFIG_FILE.read_text(encoding="utf-8")))
except Exception:
pass
return normalize_config(copy.deepcopy(DEFAULT_CONFIG))
def normalize_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
cfg = copy.deepcopy(cfg or {})
legacy = str(cfg.get("mail_provider", "mailtm") or "mailtm").strip().lower()
legacy_cfg = cfg.get("mail_config") or {}
if not isinstance(legacy_cfg, dict):
legacy_cfg = {}
raw_providers = cfg.get("mail_providers")
providers = raw_providers if isinstance(raw_providers, list) else []
providers = [str(name).strip().lower() for name in providers if str(name).strip()]
if not providers:
providers = [legacy]
raw_cfgs = cfg.get("mail_provider_configs")
provider_cfgs = raw_cfgs if isinstance(raw_cfgs, dict) else {}
for name in providers:
if name not in provider_cfgs or not isinstance(provider_cfgs.get(name), dict):
provider_cfgs[name] = {}
if legacy in provider_cfgs:
for key, value in legacy_cfg.items():
provider_cfgs[legacy].setdefault(key, value)
strategy = str(cfg.get("mail_strategy", "round_robin") or "round_robin").strip().lower()
if strategy not in ("round_robin", "random", "failover"):
strategy = "round_robin"
upload_mode = str(cfg.get("upload_mode", "snapshot") or "snapshot").strip().lower()
if upload_mode not in ("snapshot", "decoupled"):
upload_mode = "snapshot"
cfg["mail_providers"] = providers
cfg["mail_provider_configs"] = provider_cfgs
cfg["mail_strategy"] = strategy
cfg["mail_provider"] = providers[0]
cfg["upload_mode"] = upload_mode
cfg["auto_sync"] = _as_bool(cfg.get("auto_sync", False), default=False)
cfg["auto_maintain"] = _as_bool(cfg.get("auto_maintain", False), default=False)
cfg["sub2api_auto_maintain"] = _as_bool(cfg.get("sub2api_auto_maintain", False), default=False)
cfg["sub2api_maintain_actions"] = normalize_sub2api_maintain_actions(cfg.get("sub2api_maintain_actions"))
cfg["multithread"] = _as_bool(cfg.get("multithread", False), default=False)
cfg["auto_register"] = _as_bool(cfg.get("auto_register", False), default=False)
try:
cfg["thread_count"] = max(1, min(int(cfg.get("thread_count", 3)), 10))
except (TypeError, ValueError):
cfg["thread_count"] = 3
cfg["proxy_pool_enabled"] = _as_bool(cfg.get("proxy_pool_enabled", True), default=True)
proxy_pool_api_url = str(cfg.get("proxy_pool_api_url", DEFAULT_CONFIG["proxy_pool_api_url"]) or "").strip()
cfg["proxy_pool_api_url"] = proxy_pool_api_url or DEFAULT_CONFIG["proxy_pool_api_url"]
proxy_pool_auth_mode = str(cfg.get("proxy_pool_auth_mode", "query") or "").strip().lower()
if proxy_pool_auth_mode not in ("header", "query"):
proxy_pool_auth_mode = "query"
cfg["proxy_pool_auth_mode"] = proxy_pool_auth_mode
cfg["proxy_pool_api_key"] = str(cfg.get("proxy_pool_api_key", DEFAULT_CONFIG["proxy_pool_api_key"]) or "").strip()
try:
cfg["proxy_pool_count"] = max(1, min(int(cfg.get("proxy_pool_count", 1)), 20))
except (TypeError, ValueError):
cfg["proxy_pool_count"] = 1
cfg["proxy_pool_country"] = str(cfg.get("proxy_pool_country", "US") or "US").strip().upper() or "US"
cpa_base_url = str(cfg.get("cpa_base_url", "") or "").strip().rstrip("/")
if cpa_base_url.lower().endswith("/v0"):
cpa_base_url = cpa_base_url[:-3].rstrip("/")
cfg["cpa_base_url"] = cpa_base_url
return {**copy.deepcopy(DEFAULT_CONFIG), **cfg}
def save_sync_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
normalized = normalize_config(cfg)
_write_json_atomic(CONFIG_FILE, normalized)
return normalized
def init_config_from_example(project_root: Path) -> Path:
example_path = project_root / "config" / "sync_config.example.json"
CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
if CONFIG_FILE.exists():
return CONFIG_FILE
if example_path.exists():
shutil.copyfile(example_path, CONFIG_FILE)
else:
_write_json_atomic(CONFIG_FILE, copy.deepcopy(DEFAULT_CONFIG))
return CONFIG_FILE
def load_state() -> Dict[str, int]:
if STATE_FILE.exists():
try:
data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
if isinstance(data, dict):
return {
"success": int(data.get("success", 0) or 0),
"fail": int(data.get("fail", 0) or 0),
}
except Exception:
pass
return {"success": 0, "fail": 0}
def iter_token_files() -> Iterable[Path]:
if not TOKENS_DIR.exists():
return []
return sorted(TOKENS_DIR.glob("*.json"), key=lambda path: path.name, reverse=True)
def read_token_file(path: Path) -> Dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"token file is not a JSON object: {path.name}")
return data
def extract_uploaded_platforms(token_data: Dict[str, Any]) -> List[str]:
platforms = set()
raw_platforms = token_data.get("uploaded_platforms")
if isinstance(raw_platforms, list):
for item in raw_platforms:
name = str(item).strip().lower()
if name in UPLOAD_PLATFORMS:
platforms.add(name)
if token_data.get("cpa_uploaded") or token_data.get("cpa_synced"):
platforms.add("cpa")
if token_data.get("sub2api_uploaded") or token_data.get("sub2api_synced") or token_data.get("synced"):
platforms.add("sub2api")
return [name for name in UPLOAD_PLATFORMS if name in platforms]
def is_sub2api_uploaded(token_data: Dict[str, Any]) -> bool:
return "sub2api" in extract_uploaded_platforms(token_data)
def mark_token_uploaded_platform(file_path: Path, platform: str) -> bool:
platform_name = str(platform).strip().lower()
if platform_name not in UPLOAD_PLATFORMS:
return False
try:
token_data = read_token_file(file_path)
platforms = extract_uploaded_platforms(token_data)
if platform_name not in platforms:
platforms.append(platform_name)
token_data["uploaded_platforms"] = [name for name in UPLOAD_PLATFORMS if name in set(platforms)]
token_data[f"{platform_name}_uploaded"] = True
token_data[f"{platform_name}_synced"] = True
if platform_name == "sub2api":
token_data["synced"] = True
uploaded_at = token_data.get("uploaded_at")
if not isinstance(uploaded_at, dict):
uploaded_at = {}
uploaded_at[platform_name] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
token_data["uploaded_at"] = uploaded_at
_write_json_atomic(file_path, token_data)
return True
except Exception:
return False
def get_pool_maintainer(cfg: Optional[Dict[str, Any]] = None) -> Optional[PoolMaintainer]:
config = cfg or load_sync_config()
base_url = _normalize_service_url(config.get("cpa_base_url", ""))
token = str(config.get("cpa_token", "")).strip()
if not base_url or not token:
return None
return PoolMaintainer(
cpa_base_url=base_url,
cpa_token=token,
min_candidates=int(config.get("min_candidates", 800)),
used_percent_threshold=int(config.get("used_percent_threshold", 95)),
)
def get_sub2api_maintainer(cfg: Optional[Dict[str, Any]] = None) -> Optional[Sub2ApiMaintainer]:
config = cfg or load_sync_config()
base_url = _normalize_service_url(config.get("base_url", ""))
bearer = str(config.get("bearer_token", "")).strip()
email = str(config.get("email", "")).strip()
password = str(config.get("password", "")).strip()
if not base_url:
return None
if not bearer and not (email and password):
return None
return Sub2ApiMaintainer(
base_url=base_url,
bearer_token=bearer,
min_candidates=int(config.get("sub2api_min_candidates", 200)),
email=email,
password=password,
)
def verify_sub2api_login(base_url: str, email: str, password: str) -> Dict[str, Any]:
from curl_cffi import requests as cffi_req
url = base_url.strip()
if not url.startswith(("http://", "https://")):
url = "https://" + url
login_url = url.rstrip("/") + "/api/v1/auth/login"
try:
resp = cffi_req.post(
login_url,
json={"email": email, "password": password},
impersonate="chrome",
timeout=15,
)
raw_body = resp.text
if resp.status_code != 200:
try:
err_body = json.loads(raw_body)
err_msg = err_body.get("message") or err_body.get("error") or raw_body[:200]
except json.JSONDecodeError:
err_msg = raw_body[:200]
return {"ok": False, "error": f"登录失败(HTTP {resp.status_code}): {err_msg}"}
body = json.loads(raw_body)
token = (
body.get("token")
or body.get("access_token")
or (body.get("data") or {}).get("token")
or (body.get("data") or {}).get("access_token")
or ""
)
return {"ok": True, "token": token}
except Exception as exc:
return {"ok": False, "error": f"请求异常: {exc}"}
def verify_sub2api_token(base_url: str, bearer_token: str) -> Dict[str, Any]:
from curl_cffi import requests as cffi_req
url = base_url.strip()
if not url.startswith(("http://", "https://")):
url = "https://" + url
verify_url = url.rstrip("/") + "/api/v1/admin/dashboard/stats"
try:
resp = cffi_req.get(
verify_url,
headers={"Authorization": f"Bearer {bearer_token}", "Accept": "application/json"},
params={"timezone": "Asia/Shanghai"},
impersonate="chrome",
timeout=15,
)
if resp.status_code != 200:
return {"ok": False, "error": f"Bearer Token 验证失败: HTTP {resp.status_code}"}
return {"ok": True}
except Exception as exc:
return {"ok": False, "error": f"Bearer Token 验证异常: {exc}"}
def save_sub2api_credentials(
*,
base_url: str,
bearer_token: str = "",
email: str = "",
password: str = "",
account_name: Optional[str] = None,
auto_sync: Optional[bool] = None,
) -> Dict[str, Any]:
cfg = load_sync_config()
normalized_base_url = base_url.strip()
if normalized_base_url and not normalized_base_url.startswith(("http://", "https://")):
normalized_base_url = "https://" + normalized_base_url
if not normalized_base_url:
raise ValueError("请填写平台地址")
saved_email = email.strip() or str(cfg.get("email", "") or "").strip()
saved_password = password.strip() if password else str(cfg.get("password", "") or "").strip()
saved_bearer = bearer_token.strip() or str(cfg.get("bearer_token", "") or "").strip()
verified_token = saved_bearer
if saved_email and saved_password:
verify = verify_sub2api_login(normalized_base_url, saved_email, saved_password)
if not verify.get("ok"):
raise ValueError(str(verify.get("error") or "登录校验失败"))
verified_token = str(verify.get("token") or "").strip() or saved_bearer
elif saved_bearer:
verify = verify_sub2api_token(normalized_base_url, saved_bearer)
if not verify.get("ok"):
raise ValueError(str(verify.get("error") or "Token 校验失败"))
else:
raise ValueError("请填写 Bearer Token 或邮箱和密码")
cfg["base_url"] = normalized_base_url
cfg["bearer_token"] = verified_token
cfg["email"] = saved_email
cfg["password"] = saved_password
if account_name is not None:
cfg["account_name"] = account_name.strip()
if auto_sync is not None:
cfg["auto_sync"] = bool(auto_sync)
return save_sync_config(cfg)
def build_account_payload(email: str, token_data: Dict[str, Any]) -> Dict[str, Any]:
access_token = token_data.get("access_token", "")
refresh_token = token_data.get("refresh_token", "")
id_token = token_data.get("id_token", "")
at_payload = decode_jwt_payload(access_token) if access_token else {}
at_auth = at_payload.get("https://api.openai.com/auth") or {}
chatgpt_account_id = at_auth.get("chatgpt_account_id", "") or token_data.get("account_id", "")
chatgpt_user_id = at_auth.get("chatgpt_user_id", "")
exp_timestamp = at_payload.get("exp", 0)
expires_at = exp_timestamp if isinstance(exp_timestamp, int) and exp_timestamp > 0 else int(time.time()) + 863999
it_payload = decode_jwt_payload(id_token) if id_token else {}
it_auth = it_payload.get("https://api.openai.com/auth") or {}
organization_id = it_auth.get("organization_id", "")
if not organization_id:
orgs = it_auth.get("organizations") or []
if orgs:
organization_id = (orgs[0] or {}).get("id", "")
return {
"name": email,
"notes": "",
"platform": "openai",
"type": "oauth",
"credentials": {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": 863999,
"expires_at": expires_at,
"chatgpt_account_id": chatgpt_account_id,
"chatgpt_user_id": chatgpt_user_id,
"organization_id": organization_id,
},
"extra": {"email": email},
"proxy_id": None,
"concurrency": 10,
"priority": 1,
"rate_multiplier": 1,
"group_ids": [2, 4],
"expires_at": None,
"auto_pause_on_expired": True,
}
def decode_jwt_payload(token: str) -> Dict[str, Any]:
try:
import base64
parts = token.split(".")
if len(parts) != 3:
return {}
payload = parts[1]
pad = 4 - len(payload) % 4
if pad != 4:
payload += "=" * pad
decoded = base64.urlsafe_b64decode(payload.encode("ascii"))
return json.loads(decoded.decode("utf-8"))
except Exception:
return {}
def push_account_api(base_url: str, bearer: str, email: str, token_data: Dict[str, Any]) -> Dict[str, Any]:
from curl_cffi import requests as cffi_req
url = base_url.rstrip("/") + "/api/v1/admin/accounts"
payload = build_account_payload(email, token_data)
try:
resp = cffi_req.post(
url,
json=payload,
headers={
"Authorization": f"Bearer {bearer}",
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"Referer": base_url.rstrip("/") + "/admin/accounts",
},
impersonate="chrome",
timeout=20,
)
return {"ok": resp.status_code in (200, 201), "status": resp.status_code, "body": resp.text[:300]}
except Exception as exc:
return {"ok": False, "status": 0, "body": str(exc)}
def update_sub2api_account_api(
base_url: str,
bearer: str,
account_id: int,
email: str,
token_data: Dict[str, Any],
) -> Dict[str, Any]:
from curl_cffi import requests as cffi_req
url = base_url.rstrip("/") + f"/api/v1/admin/accounts/{int(account_id)}"
create_payload = build_account_payload(email, token_data)
payload = {
"name": str(email or "").strip(),
"credentials": create_payload.get("credentials") if isinstance(create_payload.get("credentials"), dict) else {},
"extra": create_payload.get("extra") if isinstance(create_payload.get("extra"), dict) else {},
"concurrency": create_payload.get("concurrency", 10),
"priority": create_payload.get("priority", 1),
"status": "active",
"auto_pause_on_expired": True,
}
try:
resp = cffi_req.put(
url,
json=payload,
headers={
"Authorization": f"Bearer {bearer}",
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"Referer": base_url.rstrip("/") + "/admin/accounts",
},
impersonate="chrome",
timeout=20,
)
return {"ok": resp.status_code in (200, 201), "status": resp.status_code, "body": resp.text[:300]}
except Exception as exc:
return {"ok": False, "status": 0, "body": str(exc)}
def _extract_sub2api_page_payload(body: Any) -> Dict[str, Any]:
if isinstance(body, dict):
data = body.get("data")
if isinstance(data, dict):
return data
return body
return {}
def _sub2api_item_matches_identity(item: Dict[str, Any], email: str, refresh_token: str) -> bool:
email_norm = str(email or "").strip().lower()
refresh_token_norm = str(refresh_token or "").strip()
name = str(item.get("name") or "").strip().lower()
extra = item.get("extra") if isinstance(item.get("extra"), dict) else {}
credentials = item.get("credentials") if isinstance(item.get("credentials"), dict) else {}
item_email = str(extra.get("email") or "").strip().lower()
item_refresh_token = str(credentials.get("refresh_token") or "").strip()
if refresh_token_norm and item_refresh_token and item_refresh_token == refresh_token_norm:
return True
if email_norm and (name == email_norm or item_email == email_norm):
return True
return False
def find_existing_sub2api_account(
base_url: str,
bearer: str,
email: str,
refresh_token: str,
max_pages: int = 8,
) -> Optional[Dict[str, Any]]:
from curl_cffi import requests as cffi_req
url = base_url.rstrip("/") + "/api/v1/admin/accounts"
email_norm = str(email or "").strip().lower()
refresh_token_norm = str(refresh_token or "").strip()
if not email_norm and not refresh_token_norm:
return None
headers = {"Authorization": f"Bearer {bearer}", "Accept": "application/json, text/plain, */*"}
page_size = 100
page = 1
scanned_without_search = 0
while page <= max_pages:
params: Dict[str, Any] = {"page": page, "page_size": page_size, "platform": "openai", "type": "oauth"}
if email_norm:
params["search"] = email_norm
try:
resp = cffi_req.get(url, params=params, headers=headers, impersonate="chrome", timeout=15)
if resp.status_code != 200:
return None
body = resp.json()
except Exception:
return None
data = _extract_sub2api_page_payload(body)
items = data.get("items") if isinstance(data.get("items"), list) else []
for item in items:
if isinstance(item, dict) and _sub2api_item_matches_identity(item, email_norm, refresh_token_norm):
return item
total_raw = data.get("total")
try:
total = int(total_raw) if total_raw is not None else 0
except (TypeError, ValueError):
total = 0
if len(items) < page_size or (total > 0 and page * page_size >= total):
break
page += 1
if refresh_token_norm:
page = 1
while page <= 3:
params = {"page": page, "page_size": page_size, "platform": "openai", "type": "oauth"}
try:
resp = cffi_req.get(url, params=params, headers=headers, impersonate="chrome", timeout=15)
if resp.status_code != 200:
return None
body = resp.json()
except Exception:
return None
data = _extract_sub2api_page_payload(body)
items = data.get("items") if isinstance(data.get("items"), list) else []
for item in items:
if isinstance(item, dict) and _sub2api_item_matches_identity(item, "", refresh_token_norm):
return item
scanned_without_search += len(items)
if len(items) < page_size or scanned_without_search >= 300:
break
page += 1
return None
def push_account_api_with_dedupe(
base_url: str,
bearer: str,
email: str,
token_data: Dict[str, Any],
check_before: bool = True,
check_after: bool = True,
) -> Dict[str, Any]:
refresh_token = str(token_data.get("refresh_token") or "").strip()
existing: Optional[Dict[str, Any]] = None
if check_before:
existing = find_existing_sub2api_account(base_url, bearer, email, refresh_token)
if existing is not None:
existing_id = existing.get("id")
try:
existing_int = int(existing_id)
except (TypeError, ValueError):
existing_int = None
if existing_int is not None and existing_int > 0:
update_result = update_sub2api_account_api(base_url, bearer, existing_int, email, token_data)
if update_result.get("ok"):
return {
"ok": True,
"status": int(update_result.get("status") or 200),
"body": "existing account updated",
"skipped": False,
"reason": "updated_existing_before_create",
"existing_id": existing_int,
}
return {
"ok": False,
"status": int(update_result.get("status") or 0),
"body": "existing account update failed",
"skipped": False,
"reason": "exists_before_create_update_failed",
"existing_id": existing_int,
}
return {
"ok": True,
"status": 200,
"body": "account already exists",
"skipped": True,
"reason": "exists_before_create",
"existing_id": existing_id,
}
result = push_account_api(base_url, bearer, email, token_data)
if result.get("ok"):
result["skipped"] = False
return result
if check_after:
existing = find_existing_sub2api_account(base_url, bearer, email, refresh_token)
if existing is not None:
return {
"ok": True,
"status": int(result.get("status") or 200),
"body": "request failed but account exists",
"skipped": True,
"reason": "exists_after_create",
"existing_id": existing.get("id"),
}
result.setdefault("skipped", False)
return result
def sync_token_to_sub2api(file_path: Path, cfg: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
config = cfg or load_sync_config()
base_url = str(config.get("base_url", "") or "").strip()
bearer = str(config.get("bearer_token", "") or "").strip()
if not base_url or not bearer:
raise ValueError("请先配置 Sub2Api 平台地址和 Bearer Token")
token_data = read_token_file(file_path)
email = str(token_data.get("email") or file_path.name)
result = push_account_api_with_dedupe(base_url, bearer, email, token_data, check_before=True, check_after=True)
if result.get("ok"):
mark_token_uploaded_platform(file_path, "sub2api")
return {"file": file_path.name, "email": email, **result}
def sync_all_tokens_to_sub2api(cfg: Optional[Dict[str, Any]] = None, skip_uploaded: bool = True) -> Dict[str, Any]:
config = cfg or load_sync_config()
results = []
for path in iter_token_files():
try:
token_data = read_token_file(path)
if skip_uploaded and is_sub2api_uploaded(token_data):
results.append({"file": path.name, "email": token_data.get("email", path.name), "ok": True, "skipped": True})
continue
results.append(sync_token_to_sub2api(path, config))
except Exception as exc:
results.append({"file": path.name, "ok": False, "error": str(exc)})
return summarize_results(results)
def upload_all_tokens_to_cpa(cfg: Optional[Dict[str, Any]] = None, skip_uploaded: bool = True) -> Dict[str, Any]:
config = cfg or load_sync_config()
maintainer = get_pool_maintainer(config)
if not maintainer:
raise ValueError("请先配置 CPA 地址和 Token")
proxy = str(config.get("proxy") or "").strip()
results = []
for path in iter_token_files():
try:
token_data = read_token_file(path)
if skip_uploaded and "cpa" in extract_uploaded_platforms(token_data):
results.append({"file": path.name, "email": token_data.get("email", path.name), "ok": True, "skipped": True})
continue
ok = maintainer.upload_token(path.name, token_data, proxy=proxy)
if ok:
mark_token_uploaded_platform(path, "cpa")
results.append({"file": path.name, "email": token_data.get("email", path.name), "ok": ok, "skipped": False})
except Exception as exc:
results.append({"file": path.name, "ok": False, "error": str(exc)})
return summarize_results(results)
def summarize_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
ok_count = sum(1 for item in results if item.get("ok") and not item.get("skipped"))
skip_count = sum(1 for item in results if item.get("skipped"))
fail_count = sum(1 for item in results if not item.get("ok"))
return {"total": len(results), "ok": ok_count, "skipped": skip_count, "fail": fail_count, "results": results}
def print_json(data: Any) -> None:
print(json.dumps(data, ensure_ascii=False, indent=2))
def print_status_block(title: str, data: Dict[str, Any]) -> None:
print(title)
for key, value in data.items():
print(f"- {key}: {value}")
def sub2api_actions_description(actions: Dict[str, bool]) -> str:
labels: List[str] = []
if actions.get("refresh_abnormal_accounts"):
labels.append("异常测活")
if actions.get("delete_abnormal_accounts"):
labels.append("异常清理")
if actions.get("dedupe_duplicate_accounts"):
labels.append("重复清理")
return "".join(labels) if labels else "无动作"
def save_runtime_proxy(proxy: str, auto_register: Optional[bool] = None) -> Dict[str, Any]:
cfg = load_sync_config()
cfg["proxy"] = proxy.strip()
if auto_register is not None:
cfg["auto_register"] = bool(auto_register)
return save_sync_config(cfg)
def check_proxy(proxy: str) -> Dict[str, Any]:
from curl_cffi import requests as cffi_req
import re
proxy_text = proxy.strip()
proxies = {"http": proxy_text, "https": proxy_text} if proxy_text else None
try:
try:
resp = cffi_req.get(
"https://cloudflare.com/cdn-cgi/trace",
proxies=proxies,
http_version="v2",
impersonate="chrome",
timeout=8,
)
except Exception as exc:
if "HTTP/3 is not supported over an HTTP proxy" not in str(exc):
raise
resp = cffi_req.get(
"https://cloudflare.com/cdn-cgi/trace",
proxies=proxies,
http_version="v1",
impersonate="chrome",
timeout=8,
)
text = resp.text
loc_match = re.search(r"^loc=(.+)$", text, re.MULTILINE)
loc = loc_match.group(1) if loc_match else "?"
supported = loc not in ("CN", "HK")
return {"ok": supported, "loc": loc, "error": None if supported else "所在地不支持"}
except Exception as exc:
return {"ok": False, "loc": None, "error": str(exc)}
def login_sub2api_once(base_url: str, email: str, password: str) -> Dict[str, Any]:
url = base_url.strip()
if not url:
raise ValueError("请填写平台地址")
if not url.startswith(("http://", "https://")):
url = "https://" + url
login_url = url.rstrip("/") + "/api/v1/auth/login"
payload = json.dumps({"email": email, "password": password}).encode("utf-8")
request = urllib.request.Request(
login_url,
data=payload,
method="POST",
headers={"Content-Type": "application/json", "Accept": "application/json"},
)
try:
with urllib.request.urlopen(request, timeout=15) as resp:
raw_body = resp.read().decode("utf-8")
body = json.loads(raw_body)
except urllib.error.HTTPError as exc:
raw = exc.read().decode("utf-8", "replace")
try:
err_body = json.loads(raw)
err_msg = err_body.get("message") or err_body.get("error") or raw[:200]
except json.JSONDecodeError:
err_msg = raw[:200]
raise ValueError(f"登录失败: {err_msg}") from exc
except Exception as exc:
raise ValueError(f"请求异常: {exc}") from exc
token = (
body.get("token")
or body.get("access_token")
or (body.get("data") or {}).get("token")
or (body.get("data") or {}).get("access_token")
or ""
)
if not token:
raise ValueError(f"响应中未找到 token 字段: {str(body)[:300]}")
cfg = load_sync_config()
cfg["base_url"] = url
cfg["bearer_token"] = token
cfg["email"] = email
cfg["password"] = password
save_sync_config(cfg)
return {"ok": True, "token_preview": token[:16] + "..."}