Files
standalone-openai-pool-cli/openai_pool_orchestrator/pool_maintainer.py
2026-03-19 07:36:14 +08:00

1062 lines
43 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
账号池维护模块
支持 CPA 平台和 Sub2Api 平台的探测、清理、计数和补号
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import quote
import requests as _requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
try:
import aiohttp
except ImportError:
aiohttp = None
logger = logging.getLogger(__name__)
DEFAULT_MGMT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"
def _mgmt_headers(token: str) -> Dict[str, str]:
    """Build the HTTP headers used for CPA management API calls.

    Args:
        token: Bearer token placed in the ``Authorization`` header.

    Returns:
        A new dict holding the ``Authorization`` and ``Accept`` headers.
    """
    headers: Dict[str, str] = {}
    headers["Authorization"] = "Bearer {}".format(token)
    headers["Accept"] = "application/json"
    return headers
def _build_session(proxy: str = "") -> _requests.Session:
    """Create a ``requests`` session with retrying adapters mounted.

    The session retries up to three times (backoff factor 1) on the HTTP
    status codes 429, 500, 502, 503 and 504 for both ``http://`` and
    ``https://`` URLs.

    Args:
        proxy: Optional proxy URL; when non-empty it is used for both the
            ``http`` and ``https`` schemes.

    Returns:
        The configured session. The caller is responsible for closing it.
    """
    session = _requests.Session()
    retry_policy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    if proxy:
        session.proxies = dict.fromkeys(("http", "https"), proxy)
    return session
def _get_item_type(item: Dict[str, Any]) -> str:
    """Return the account type recorded on an auth-file entry.

    The ``type`` key is preferred; ``typo`` is consulted as a fallback key.
    An empty string is returned when neither key holds a truthy value.
    """
    for key in ("type", "typo"):
        value = item.get(key)
        if value:
            return str(value)
    return ""
def _safe_json(text: str) -> Dict[str, Any]:
    """Parse ``text`` as a JSON object, never raising.

    Args:
        text: Raw JSON text (typically an HTTP response body).

    Returns:
        The decoded object when ``text`` is valid JSON whose top-level value
        is an object; otherwise an empty dict. Valid JSON that is not an
        object (a list, string, number, ``null`` ...) also yields ``{}`` so
        that callers can always use ``.get()`` on the result.
    """
    try:
        parsed = json.loads(text)
    except (TypeError, ValueError, RecursionError):
        # TypeError: input is not str/bytes; ValueError: malformed JSON or
        # undecodable bytes; RecursionError: pathologically nested input.
        return {}
    return parsed if isinstance(parsed, dict) else {}
def _extract_account_id(item: Dict[str, Any]) -> Optional[str]:
    """Return the ChatGPT account id stored on an auth-file entry, if any.

    The keys ``chatgpt_account_id``, ``chatgptAccountId``, ``account_id`` and
    ``accountId`` are checked in that order; the first truthy value wins and
    is returned as a string. ``None`` is returned when none is set.
    """
    id_keys = ("chatgpt_account_id", "chatgptAccountId", "account_id", "accountId")
    for value in map(item.get, id_keys):
        if value:
            return str(value)
    return None
def _parse_time_to_epoch(raw: Any) -> float:
    """Convert a timestamp-like value to seconds since the Unix epoch.

    Parsing is attempted with ``datetime.fromisoformat`` first (a trailing
    ``Z`` is rewritten to ``+00:00``), then with the explicit formats
    ``%Y-%m-%d %H:%M:%S`` and ``%Y-%m-%dT%H:%M:%S``.

    Args:
        raw: Any value; falsy values are treated as an empty string.

    Returns:
        The epoch timestamp, or ``0.0`` when the value is empty or cannot be
        parsed by any of the attempts.
    """
    text = str(raw or "").strip()
    if not text:
        return 0.0

    iso_text = f"{text[:-1]}+00:00" if text.endswith("Z") else text
    # Each parser is tried in order; the first one that succeeds wins.
    parsers = (
        lambda: datetime.fromisoformat(iso_text),
        lambda: datetime.strptime(text, "%Y-%m-%d %H:%M:%S"),
        lambda: datetime.strptime(text, "%Y-%m-%dT%H:%M:%S"),
    )
    for parse in parsers:
        try:
            return parse().timestamp()
        except Exception:
            continue
    return 0.0
class PoolMaintainer:
def __init__(
    self,
    cpa_base_url: str,
    cpa_token: str,
    target_type: str = "codex",
    min_candidates: int = 800,
    used_percent_threshold: int = 95,
    user_agent: str = DEFAULT_MGMT_UA,
):
    """Store the CPA connection settings and pool thresholds.

    Args:
        cpa_base_url: Base URL of the CPA service; trailing slashes are
            stripped before it is stored.
        cpa_token: Bearer token for the management API.
        target_type: Account type that counts as a pool candidate.
        min_candidates: Candidate count at which the pool is healthy.
        used_percent_threshold: Usage percentage at or above which a probed
            account is flagged as invalid.
        user_agent: ``User-Agent`` sent with proxied usage probes.
    """
    # Connection settings.
    self.base_url = cpa_base_url.rstrip("/")
    self.token = cpa_token
    self.user_agent = user_agent
    # Pool policy.
    self.target_type = target_type
    self.min_candidates = min_candidates
    self.used_percent_threshold = used_percent_threshold
def fetch_auth_files(self, timeout: int = 15) -> List[Dict[str, Any]]:
    """Fetch the list of auth files from the CPA management API.

    Args:
        timeout: Request timeout in seconds.

    Returns:
        The ``files`` list of the JSON response, or an empty list when the
        response is not an object or ``files`` is not a list.

    Raises:
        requests.HTTPError: When the server answers with an error status.
    """
    endpoint = f"{self.base_url}/v0/management/auth-files"
    response = _requests.get(
        endpoint,
        headers=_mgmt_headers(self.token),
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()
    if not isinstance(payload, dict):
        return []
    entries = payload.get("files", [])
    if isinstance(entries, list):
        return entries
    return []
def get_pool_status(self, timeout: int = 15) -> Dict[str, Any]:
    """Summarise the current state of the CPA account pool.

    Never raises: any failure while fetching or counting is reported through
    the ``error`` field of the returned dict, with all counters zeroed.

    Args:
        timeout: Request timeout in seconds for the auth-file listing.

    Returns:
        A dict with ``total``, ``candidates``, ``error_count``,
        ``threshold``, ``healthy``, ``percent``, ``last_checked`` and
        ``error`` (``None`` on success).
    """
    try:
        files = self.fetch_auth_files(timeout)
        # Candidates are entries whose type matches target_type, ignoring case.
        matching = sum(
            1
            for entry in files
            if _get_item_type(entry).lower() == self.target_type.lower()
        )
        total = len(files)
        return {
            "total": total,
            "candidates": matching,
            "error_count": max(0, total - matching),
            "threshold": self.min_candidates,
            "healthy": matching >= self.min_candidates,
            "percent": round(matching / self.min_candidates * 100, 1) if self.min_candidates > 0 else 100,
            "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
            "error": None,
        }
    except Exception as exc:
        failure: Dict[str, Any] = dict.fromkeys(("total", "candidates", "error_count"), 0)
        failure["threshold"] = self.min_candidates
        failure["healthy"] = False
        failure["percent"] = 0
        failure["last_checked"] = time.strftime("%Y-%m-%d %H:%M:%S")
        failure["error"] = str(exc)
        return failure
def test_connection(self, timeout: int = 10) -> Dict[str, Any]:
    """Check that the CPA management API is reachable.

    Args:
        timeout: Request timeout in seconds.

    Returns:
        A dict with ``ok``, ``total``, ``candidates`` and a human-readable
        ``message``. On any exception ``ok`` is ``False`` and the message
        carries the error text.
    """
    try:
        files = self.fetch_auth_files(timeout)
        matched = sum(
            1
            for entry in files
            if _get_item_type(entry).lower() == self.target_type.lower()
        )
        total = len(files)
        return {
            "ok": True,
            "total": total,
            "candidates": matched,
            "message": f"连接成功,共 {total} 个账号,{matched}{self.target_type} 账号",
        }
    except Exception as exc:
        return {"ok": False, "total": 0, "candidates": 0, "message": f"连接失败: {exc}"}
async def probe_accounts_async(
    self, workers: int = 20, timeout: int = 10, retries: int = 1,
) -> Dict[str, Any]:
    """Probe every candidate account through the CPA ``api-call`` endpoint.

    For each auth file whose type matches ``target_type`` a usage request is
    proxied through the management API. An account is reported as invalid
    when the proxied call returns status 401, or when its primary-window
    ``used_percent`` is at or above ``used_percent_threshold``.

    Args:
        workers: Maximum number of concurrent probe requests (at least 1).
        timeout: Timeout in seconds for the listing and for each probe
            (probes use at least 1 second).
        retries: Extra attempts per account after a failed probe.

    Returns:
        A dict with ``total`` (all auth files), ``candidates`` (matching
        files), ``invalid`` (probe results flagged invalid) and ``files``
        (the raw listing).

    Raises:
        RuntimeError: When ``aiohttp`` is not installed.
    """
    if aiohttp is None:
        raise RuntimeError("需要安装 aiohttp: pip install aiohttp")

    # fetch_auth_files performs a blocking `requests` call; run it in the
    # default executor so the event loop is not stalled while it waits.
    loop = asyncio.get_running_loop()
    files = await loop.run_in_executor(None, self.fetch_auth_files, timeout)
    candidates = [f for f in files if _get_item_type(f).lower() == self.target_type.lower()]
    if not candidates:
        return {"total": len(files), "candidates": 0, "invalid": [], "files": files}

    concurrency = max(1, workers)
    semaphore = asyncio.Semaphore(concurrency)
    connector = aiohttp.TCPConnector(limit=concurrency)
    client_timeout = aiohttp.ClientTimeout(total=max(1, timeout))
    api_url = f"{self.base_url}/v0/management/api-call"
    request_headers = {**_mgmt_headers(self.token), "Content-Type": "application/json"}

    async def probe_one(session: aiohttp.ClientSession, item: Dict[str, Any]) -> Dict[str, Any]:
        """Probe a single auth file and describe the outcome."""
        auth_index = item.get("auth_index")
        result: Dict[str, Any] = {
            "name": item.get("name") or item.get("id"),
            "auth_index": auth_index,
            "invalid_401": False,
            "invalid_used_percent": False,
            "used_percent": None,
            "error": None,
        }
        if not auth_index:
            result["error"] = "missing auth_index"
            return result

        call_header = {
            "Authorization": "Bearer $TOKEN$",
            "Content-Type": "application/json",
            "User-Agent": self.user_agent,
        }
        account_id = _extract_account_id(item)
        if account_id:
            call_header["Chatgpt-Account-Id"] = account_id
        payload = {
            "authIndex": auth_index,
            "method": "GET",
            "url": "https://chatgpt.com/backend-api/wham/usage",
            "header": call_header,
        }

        for attempt in range(retries + 1):
            try:
                async with semaphore:
                    async with session.post(
                        api_url,
                        headers=request_headers,
                        json=payload,
                        timeout=client_timeout,
                    ) as resp:
                        text = await resp.text()
                        if resp.status >= 400:
                            raise RuntimeError(f"HTTP {resp.status}: {text[:200]}")
            except Exception as exc:
                result["error"] = str(exc)
                if attempt >= retries:
                    return result
                continue

            data = _safe_json(text)
            if not isinstance(data, dict):
                data = {}
            status_code = data.get("status_code")
            result["invalid_401"] = status_code == 401
            if status_code == 200:
                body_data = _safe_json(data.get("body", ""))
                rate_limit = body_data.get("rate_limit") if isinstance(body_data, dict) else None
                window = rate_limit.get("primary_window") if isinstance(rate_limit, dict) else None
                used_pct = window.get("used_percent") if isinstance(window, dict) else None
                if used_pct is not None:
                    result["used_percent"] = used_pct
                    # Only numeric values can be compared with the threshold.
                    if isinstance(used_pct, (int, float)):
                        result["invalid_used_percent"] = used_pct >= self.used_percent_threshold
            # A successful attempt supersedes errors from earlier attempts.
            result["error"] = None
            return result
        return result

    invalid_list: List[Dict[str, Any]] = []
    async with aiohttp.ClientSession(connector=connector, timeout=client_timeout, trust_env=True) as session:
        tasks = [asyncio.create_task(probe_one(session, item)) for item in candidates]
        for task in asyncio.as_completed(tasks):
            result = await task
            if result.get("invalid_401") or result.get("invalid_used_percent"):
                invalid_list.append(result)
    return {
        "total": len(files),
        "candidates": len(candidates),
        "invalid": invalid_list,
        "files": files,
    }
async def clean_invalid_async(self, workers: int = 20, timeout: int = 10, retries: int = 1) -> Dict[str, Any]:
    """Probe the pool and delete every account that was flagged invalid.

    Args:
        workers: Maximum number of concurrent requests (at least 1).
        timeout: Timeout in seconds for probe and delete requests.
        retries: Extra probe attempts per account.

    Returns:
        A dict with ``total``, ``candidates``, ``invalid_count``,
        ``deleted_ok`` and ``deleted_fail``.

    Raises:
        RuntimeError: When ``aiohttp`` is not installed.
    """
    if aiohttp is None:
        raise RuntimeError("需要安装 aiohttp: pip install aiohttp")

    probe_result = await self.probe_accounts_async(workers, timeout, retries)
    invalid = probe_result["invalid"]
    # Results without a usable name cannot be addressed by the delete API.
    names = [str(entry["name"]) for entry in invalid if entry.get("name")]

    outcomes: List[bool] = []
    if names:
        semaphore = asyncio.Semaphore(max(1, workers))
        connector = aiohttp.TCPConnector(limit=max(1, workers))
        client_timeout = aiohttp.ClientTimeout(total=max(1, timeout))

        async with aiohttp.ClientSession(connector=connector, timeout=client_timeout, trust_env=True) as session:

            async def do_delete(name: str) -> bool:
                """Delete one auth file; report whether the server confirmed it."""
                target = f"{self.base_url}/v0/management/auth-files?name={quote(name, safe='')}"
                try:
                    async with semaphore:
                        async with session.delete(
                            target,
                            headers=_mgmt_headers(self.token),
                            timeout=timeout,
                        ) as resp:
                            body = _safe_json(await resp.text())
                            return resp.status == 200 and body.get("status") == "ok"
                except Exception:
                    return False

            pending = [asyncio.create_task(do_delete(name)) for name in names]
            outcomes = [await finished for finished in asyncio.as_completed(pending)]

    deleted_ok = sum(1 for succeeded in outcomes if succeeded)
    return {
        "total": probe_result["total"],
        "candidates": probe_result["candidates"],
        "invalid_count": len(invalid),
        "deleted_ok": deleted_ok,
        "deleted_fail": len(outcomes) - deleted_ok,
    }
def probe_and_clean_sync(self, workers: int = 20, timeout: int = 10, retries: int = 1) -> Dict[str, Any]:
    """Run ``clean_invalid_async`` to completion from synchronous code.

    A fresh event loop is created via ``asyncio.run``; the arguments are
    forwarded unchanged and the coroutine's result is returned.
    """
    cleanup = self.clean_invalid_async(workers, timeout, retries)
    return asyncio.run(cleanup)
def calculate_gap(self, current_candidates: Optional[int] = None) -> int:
    """Return how many accounts are missing to reach ``min_candidates``.

    Args:
        current_candidates: Known candidate count. When ``None`` the count is
            obtained from ``get_pool_status``.

    Returns:
        The non-negative shortfall (``0`` when the pool is full enough).

    Raises:
        RuntimeError: When the pool status lookup reports an error.
    """
    if current_candidates is not None:
        return max(0, self.min_candidates - current_candidates)

    status = self.get_pool_status()
    if status.get("error"):
        raise RuntimeError(f"CPA 池状态查询失败: {status['error']}")
    return max(0, self.min_candidates - status["candidates"])
def upload_token(self, filename: str, token_data: Dict[str, Any], proxy: str = "") -> bool:
    """Upload a token document to the CPA pool as an auth file.

    The payload is serialised as UTF-8 JSON and posted as a multipart file.
    Up to three attempts are made, sleeping 1s and then 2s between them.
    Failures are logged rather than raised.

    Args:
        filename: File name to register the token under.
        token_data: JSON-serialisable token document.
        proxy: Optional proxy URL used for the upload.

    Returns:
        ``True`` when the server answered 200, 201 or 204; ``False`` when
        the maintainer has no base URL or token, or every attempt failed.
    """
    if not self.base_url or not self.token:
        return False

    content = json.dumps(token_data, ensure_ascii=False).encode("utf-8")
    files = {"file": (filename, content, "application/json")}
    headers = {"Authorization": f"Bearer {self.token}"}
    url = f"{self.base_url}/v0/management/auth-files"
    max_attempts = 3

    with _build_session(proxy) as session:
        for attempt in range(max_attempts):
            try:
                # NOTE(review): verify=False disables TLS certificate checks.
                # Kept for compatibility with existing deployments (presumably
                # self-signed CPA endpoints) — confirm this is intended.
                resp = session.post(
                    url, files=files, headers=headers, verify=False, timeout=30,
                )
            except Exception as exc:
                logger.warning(
                    "upload of %s failed (attempt %d/%d): %s",
                    filename, attempt + 1, max_attempts, exc,
                )
            else:
                if resp.status_code in (200, 201, 204):
                    return True
                logger.warning(
                    "upload of %s rejected with HTTP %s (attempt %d/%d)",
                    filename, resp.status_code, attempt + 1, max_attempts,
                )
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    return False
class Sub2ApiMaintainer:
"""Sub2Api 平台池维护 — 通过 Admin API 管理账号池"""
def __init__(
    self,
    base_url: str,
    bearer_token: str,
    min_candidates: int = 200,
    email: str = "",
    password: str = "",
):
    """Store the Sub2Api connection settings.

    Args:
        base_url: Base URL of the Sub2Api service; trailing slashes are
            stripped before it is stored.
        bearer_token: Admin bearer token used for API calls.
        min_candidates: Normal-account count at which the pool is healthy.
        email: Login e-mail used to obtain a new token after a 401.
        password: Login password paired with ``email``.
    """
    # Lock taken while the bearer token is being refreshed via login.
    self._auth_lock = threading.Lock()
    # Credentials.
    self.bearer_token = bearer_token
    self.email = email
    self.password = password
    # Endpoint and pool policy.
    self.base_url = base_url.rstrip("/")
    self.min_candidates = min_candidates
def _headers(self) -> Dict[str, str]:
    """Return the JSON request headers carrying the current bearer token."""
    json_mime = "application/json"
    headers: Dict[str, str] = {"Authorization": "Bearer {}".format(self.bearer_token)}
    headers["Accept"] = json_mime
    headers["Content-Type"] = json_mime
    return headers
def _login(self) -> str:
    """Log in with the stored e-mail/password and adopt the returned token.

    The token is looked up under ``token`` or ``access_token``, first at the
    top level of the response and then inside its ``data`` object.

    Returns:
        The token found in the login response, or ``""`` when none is
        present. ``self.bearer_token`` is only replaced by a non-empty token.

    Raises:
        requests.HTTPError: When the login endpoint answers with an error.
    """
    credentials = {"email": self.email, "password": self.password}
    with _build_session() as session:
        response = session.post(
            f"{self.base_url}/api/v1/auth/login",
            json=credentials,
            timeout=15,
        )
        response.raise_for_status()
        payload = response.json()

    token = payload.get("token") or payload.get("access_token")
    if not token:
        # Fall back to the nested ``data`` envelope.
        nested = payload.get("data") or {}
        token = nested.get("token") or nested.get("access_token") or ""
    if token:
        self.bearer_token = token
    return token
def _request(self, method: str, path: str, **kwargs) -> _requests.Response:
    """Send an authenticated request, re-logging in once on HTTP 401.

    The request is sent exactly once unless the server answers 401 and
    login credentials are configured; in that case the token is refreshed
    (at most one thread performs the login) and the request is re-sent a
    single time.

    Args:
        method: HTTP method name.
        path: Path appended to ``base_url``.
        **kwargs: Extra arguments forwarded to ``Session.request``;
            ``timeout`` defaults to 15 seconds.

    Returns:
        The response of the last request that was sent.
    """
    kwargs.setdefault("timeout", 15)
    url = f"{self.base_url}{path}"
    with _build_session() as session:
        # Remember which token this request is sent with, so that a refresh
        # already performed by another thread is detected and not repeated.
        sent_token = self.bearer_token
        resp = session.request(method, url, headers=self._headers(), **kwargs)
        if resp.status_code == 401 and self.email and self.password:
            with self._auth_lock:
                if self.bearer_token == sent_token:
                    self._login()
                refreshed_token = self.bearer_token
            if refreshed_token or self.bearer_token != sent_token:
                resp = session.request(method, url, headers=self._headers(), **kwargs)
        return resp
def get_dashboard_stats(self, timeout: int = 15) -> Dict[str, Any]:
    """Fetch the admin dashboard statistics.

    Args:
        timeout: Request timeout in seconds.

    Returns:
        The ``data`` object of the response when it is a dict, otherwise the
        whole decoded response.

    Raises:
        requests.HTTPError: When the server answers with an error status.
    """
    response = self._request(
        "GET",
        "/api/v1/admin/dashboard/stats",
        params={"timezone": "Asia/Shanghai"},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()
    envelope = payload.get("data")
    if isinstance(envelope, dict):
        return envelope
    return payload
def list_accounts(
    self, page: int = 1, page_size: int = 100, timeout: int = 15,
) -> Dict[str, Any]:
    """Fetch one page of OpenAI OAuth accounts from the admin API.

    Args:
        page: Page number to request.
        page_size: Number of accounts per page.
        timeout: Request timeout in seconds.

    Returns:
        The ``data`` object of the response when it is a dict, otherwise the
        whole decoded response.

    Raises:
        requests.HTTPError: When the server answers with an error status.
    """
    query = dict(page=page, page_size=page_size, platform="openai", type="oauth")
    response = self._request("GET", "/api/v1/admin/accounts", params=query, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    envelope = payload.get("data")
    if isinstance(envelope, dict):
        return envelope
    return payload
def _list_all_accounts(self, timeout: int = 15, page_size: int = 100) -> List[Dict[str, Any]]:
    """Collect every account by walking the paginated listing.

    Paging stops at the first empty or short page, or once the number of
    collected accounts reaches a positive integer ``total`` reported by the
    server. Non-dict entries are skipped.

    Args:
        timeout: Per-request timeout in seconds.
        page_size: Number of accounts requested per page.

    Returns:
        All dict entries gathered across pages, in listing order.
    """
    collected: List[Dict[str, Any]] = []
    page = 1
    while True:
        data = self.list_accounts(page=page, page_size=page_size, timeout=timeout)
        items = data.get("items") or []
        if not isinstance(items, list):
            items = []
        collected += [entry for entry in items if isinstance(entry, dict)]

        page_is_full = bool(items) and len(items) >= page_size
        if not page_is_full:
            break
        total = data.get("total")
        reached_total = isinstance(total, int) and total > 0 and len(collected) >= total
        if reached_total:
            break
        page += 1
    return collected
def _account_identity(self, item: Dict[str, Any]) -> Dict[str, str]:
    """Extract the identity fields used to detect duplicate accounts.

    The e-mail comes from ``extra.email``; when that is empty, the account
    ``name`` is used if it contains ``@``. E-mails are trimmed and
    lower-cased. The refresh token comes from ``credentials.refresh_token``.

    Returns:
        A dict with ``email`` and ``refresh_token``; each is ``""`` when the
        corresponding value is unavailable.
    """
    extra = item.get("extra")
    email = str(extra.get("email") or "").strip().lower() if isinstance(extra, dict) else ""
    if not email:
        # Some accounts only carry their e-mail in the display name.
        candidate = str(item.get("name") or "").strip().lower()
        if "@" in candidate:
            email = candidate

    credentials = item.get("credentials")
    if isinstance(credentials, dict):
        refresh_token = str(credentials.get("refresh_token") or "").strip()
    else:
        refresh_token = ""
    return {"email": email, "refresh_token": refresh_token}
@staticmethod
def _account_sort_key(item: Dict[str, Any]) -> tuple[float, int]:
    """Return the ``(updated-at epoch, numeric id)`` ordering key of an account.

    The timestamp is read from ``updated_at`` (or ``updatedAt``); an id that
    is missing or cannot be converted to ``int`` counts as ``0``.
    """
    updated_epoch = _parse_time_to_epoch(item.get("updated_at") or item.get("updatedAt"))
    raw_id = item.get("id") or 0
    try:
        numeric_id = int(raw_id)
    except (TypeError, ValueError):
        numeric_id = 0
    return (updated_epoch, numeric_id)
@staticmethod
def _normalize_account_id(raw: Any) -> Optional[int]:
try:
account_id = int(raw)
except (TypeError, ValueError):
return None
if account_id <= 0:
return None
return account_id
@staticmethod
def _is_abnormal_status(status: Any) -> bool:
return str(status or "").strip().lower() in ("error", "disabled")
def _build_dedupe_plan(self, all_accounts: List[Dict[str, Any]], details_limit: int = 120) -> Dict[str, Any]:
    """Group duplicate accounts and decide which ones to delete.

    Two accounts are linked when they share an e-mail or a refresh token;
    links are transitive, so groups are the connected components (computed
    with a union-find over account ids). In every group of two or more the
    account with the greatest ``_account_sort_key`` is kept and the others
    are scheduled for deletion.

    Args:
        all_accounts: Raw account dicts; entries without a valid id are
            ignored.
        details_limit: Maximum number of groups described in
            ``groups_preview``.

    Returns:
        A dict with ``duplicate_groups``, ``duplicate_accounts``,
        ``delete_ids``, ``groups_preview`` and ``truncated_groups``.
    """
    accounts_by_id: Dict[int, Dict[str, Any]] = {}
    parent: Dict[int, int] = {}
    ids_by_key: Dict[str, List[int]] = {}

    for account in all_accounts:
        account_id = self._normalize_account_id(account.get("id"))
        if account_id is None:
            continue
        accounts_by_id[account_id] = account
        parent[account_id] = account_id
        identity = self._account_identity(account)
        for prefix, value in (("email", identity["email"]), ("rt", identity["refresh_token"])):
            if value:
                ids_by_key.setdefault(f"{prefix}:{value}", []).append(account_id)

    def find(node: int) -> int:
        """Return the representative of ``node``, compressing the path."""
        root = node
        while parent[root] != root:
            root = parent[root]
        while node != root:
            parent[node], node = root, parent[node]
        return root

    def union(left: int, right: int) -> None:
        """Merge the sets containing ``left`` and ``right``."""
        left_root, right_root = find(left), find(right)
        if left_root != right_root:
            parent[right_root] = left_root

    # Every account sharing a key is linked to the first account of that key.
    for linked_ids in ids_by_key.values():
        if len(linked_ids) > 1:
            first, *others = linked_ids
            for other in others:
                union(first, other)

    components: Dict[int, List[int]] = {}
    for account_id in accounts_by_id:
        components.setdefault(find(account_id), []).append(account_id)
    duplicate_groups = [members for members in components.values() if len(members) > 1]

    delete_ids: List[int] = []
    group_details: List[Dict[str, Any]] = []
    for members in duplicate_groups:
        member_accounts = [accounts_by_id[member] for member in members]
        keeper = max(member_accounts, key=self._account_sort_key)
        keep_id = self._normalize_account_id(keeper.get("id")) or 0
        group_delete_ids = sorted((member for member in members if member != keep_id), reverse=True)
        delete_ids.extend(group_delete_ids)

        if len(group_details) >= details_limit:
            continue
        group_emails = {self._account_identity(account)["email"] for account in member_accounts}
        group_emails.discard("")
        group_details.append({
            "keep_id": keep_id,
            "delete_ids": group_delete_ids,
            "size": len(members),
            "emails": sorted(group_emails),
        })

    return {
        "duplicate_groups": len(duplicate_groups),
        "duplicate_accounts": sum(map(len, duplicate_groups)),
        "delete_ids": delete_ids,
        "groups_preview": group_details,
        "truncated_groups": max(0, len(duplicate_groups) - len(group_details)),
    }
def list_account_inventory(self, timeout: int = 15) -> Dict[str, Any]:
    """List every account together with its status and duplicate markers.

    Accounts are returned newest first (descending ``_account_sort_key``);
    entries without a valid id are left out.

    Args:
        timeout: Per-request timeout in seconds for the account listing.

    Returns:
        A dict with ``total``, ``error_count`` (accounts in an abnormal
        status), ``duplicate_groups``, ``duplicate_accounts`` and ``items``
        (one summary dict per account).
    """
    accounts = self._list_all_accounts(timeout=timeout, page_size=100)
    # Ask for details on every group so that no duplicate goes unmarked.
    plan = self._build_dedupe_plan(accounts, details_limit=max(1, len(accounts)))

    deletable_ids = set()
    for candidate in plan.get("delete_ids") or []:
        if isinstance(candidate, int):
            deletable_ids.add(int(candidate))

    duplicate_map: Dict[int, Dict[str, Any]] = {}
    for group in plan.get("groups_preview") or []:
        keep_id = self._normalize_account_id(group.get("keep_id"))
        member_ids = [] if keep_id is None else [keep_id]
        for raw_id in group.get("delete_ids") or []:
            member_id = self._normalize_account_id(raw_id)
            if member_id is not None:
                member_ids.append(member_id)
        group_size = max(1, int(group.get("size") or len(member_ids) or 1))
        emails = []
        for email in group.get("emails") or []:
            cleaned = str(email).strip()
            if cleaned:
                emails.append(cleaned.lower())
        for member_id in member_ids:
            duplicate_map[member_id] = {
                "group_size": group_size,
                "keep_id": keep_id,
                "delete_candidate": member_id in deletable_ids,
                "emails": emails,
            }

    items: List[Dict[str, Any]] = []
    abnormal_count = 0
    newest_first = sorted(accounts, key=self._account_sort_key, reverse=True)
    for account in newest_first:
        account_id = self._normalize_account_id(account.get("id"))
        if account_id is None:
            continue
        identity = self._account_identity(account)
        status = str(account.get("status") or "").strip().lower() or "unknown"
        if self._is_abnormal_status(status):
            abnormal_count += 1
        info = duplicate_map.get(account_id) or {}
        display_name = str(account.get("name") or "").strip()
        items.append({
            "id": account_id,
            "name": display_name,
            "email": identity.get("email") or display_name,
            "status": status,
            "updated_at": account.get("updated_at") or account.get("updatedAt") or "",
            "created_at": account.get("created_at") or account.get("createdAt") or "",
            "is_duplicate": bool(info),
            "duplicate_group_size": int(info.get("group_size") or 0),
            "duplicate_keep": info.get("keep_id") == account_id,
            "duplicate_delete_candidate": bool(info.get("delete_candidate")),
            "duplicate_emails": info.get("emails") or [],
        })

    return {
        "total": len(items),
        "error_count": abnormal_count,
        "duplicate_groups": int(plan.get("duplicate_groups", 0)),
        "duplicate_accounts": int(plan.get("duplicate_accounts", 0)),
        "items": items,
    }
def _refresh_accounts_parallel(self, account_ids: List[int], timeout: int = 30, workers: int = 8) -> Dict[str, List[int]]:
    """Refresh several accounts concurrently on a thread pool.

    Only positive ``int`` ids are processed. A refresh counts as failed when
    ``refresh_account`` returns a falsy value or raises.

    Args:
        account_ids: Ids of the accounts to refresh.
        timeout: Per-request timeout in seconds.
        workers: Requested thread count; capped at 16 and at the number of
            ids, and never below 1.

    Returns:
        A dict with ``success_ids`` and ``failed_ids`` (completion order).
    """
    succeeded: List[int] = []
    failed: List[int] = []
    valid_ids = [candidate for candidate in account_ids if isinstance(candidate, int) and candidate > 0]
    if valid_ids:
        pool_size = max(1, min(workers, 16, len(valid_ids)))
        with ThreadPoolExecutor(max_workers=pool_size) as executor:
            pending = {}
            for account_id in valid_ids:
                job = executor.submit(self.refresh_account, account_id, timeout=timeout)
                pending[job] = account_id
            for job in as_completed(pending):
                try:
                    refreshed = bool(job.result())
                except Exception:
                    refreshed = False
                bucket = succeeded if refreshed else failed
                bucket.append(pending[job])
    return {"success_ids": succeeded, "failed_ids": failed}
def _delete_accounts_parallel(self, account_ids: List[int], timeout: int = 15, workers: int = 12) -> Dict[str, Any]:
    """Delete several accounts concurrently on a thread pool.

    Ids are de-duplicated and restricted to positive ``int`` values. A
    deletion counts as failed when ``delete_account`` returns a falsy value
    or raises.

    Args:
        account_ids: Ids of the accounts to delete.
        timeout: Per-request timeout in seconds.
        workers: Requested thread count; capped at 24 and at the number of
            unique ids, and never below 1.

    Returns:
        A dict with the counts ``deleted_ok`` / ``deleted_fail`` and the id
        lists ``deleted_ok_ids`` / ``failed_ids`` (completion order).
    """
    removed: List[int] = []
    failed: List[int] = []
    # Highest ids are submitted first.
    targets = sorted(
        {candidate for candidate in account_ids if isinstance(candidate, int) and candidate > 0},
        reverse=True,
    )
    if targets:
        pool_size = max(1, min(workers, 24, len(targets)))
        with ThreadPoolExecutor(max_workers=pool_size) as executor:
            pending = {}
            for account_id in targets:
                job = executor.submit(self.delete_account, account_id, timeout=timeout)
                pending[job] = account_id
            for job in as_completed(pending):
                try:
                    deleted = bool(job.result())
                except Exception:
                    deleted = False
                bucket = removed if deleted else failed
                bucket.append(pending[job])
    return {
        "deleted_ok": len(removed),
        "deleted_fail": len(failed),
        "deleted_ok_ids": removed,
        "failed_ids": failed,
    }
def dedupe_duplicate_accounts(self, timeout: int = 15, dry_run: bool = True, details_limit: int = 120) -> Dict[str, Any]:
    """Remove duplicate OpenAI OAuth accounts from Sub2Api.

    Accounts are duplicates when they share an e-mail or a refresh token.
    Within each connected group of duplicates the newest account is kept
    (``updated_at`` first, then the largest id).

    Args:
        timeout: Per-request timeout in seconds.
        dry_run: When ``True`` only the plan is reported; nothing is deleted.
        details_limit: Maximum number of groups listed in ``groups_preview``.

    Returns:
        A summary dict with the plan (``duplicate_groups``,
        ``duplicate_accounts``, ``to_delete``, ``groups_preview``,
        ``truncated_groups``) and the deletion outcome (``deleted_ok``,
        ``deleted_fail``, ``failed_delete_ids`` capped at 200 entries).
    """
    accounts = self._list_all_accounts(timeout=timeout, page_size=100)
    plan = self._build_dedupe_plan(accounts, details_limit=details_limit)
    delete_ids = plan["delete_ids"]

    outcome: Dict[str, Any] = {}
    if delete_ids and not dry_run:
        outcome = self._delete_accounts_parallel(delete_ids, timeout=timeout, workers=12)

    summary: Dict[str, Any] = {"dry_run": dry_run, "total": len(accounts)}
    summary["duplicate_groups"] = int(plan["duplicate_groups"])
    summary["duplicate_accounts"] = int(plan["duplicate_accounts"])
    summary["to_delete"] = len(delete_ids)
    summary["deleted_ok"] = int(outcome.get("deleted_ok", 0))
    summary["deleted_fail"] = int(outcome.get("deleted_fail", 0))
    summary["failed_delete_ids"] = list(outcome.get("failed_ids") or [])[:200]
    summary["groups_preview"] = plan["groups_preview"]
    summary["truncated_groups"] = int(plan["truncated_groups"])
    return summary
def probe_accounts(self, account_ids: List[int], timeout: int = 30) -> Dict[str, Any]:
    """Refresh the given accounts and report how their status changed.

    The status of each account is read before and after the refresh. When at
    least one refresh succeeded, the method waits two seconds before reading
    the statuses again.

    Args:
        account_ids: Ids to probe; invalid ids are dropped and duplicates
            merged.
        timeout: Timeout in seconds for status lookups; refresh calls use at
            least 30 seconds.

    Returns:
        A dict with ``requested``, ``refreshed_ok``, ``refreshed_fail``,
        ``recovered``, ``still_abnormal`` and ``details`` (at most 200
        per-account entries).
    """
    normalized = (self._normalize_account_id(raw) for raw in (account_ids or []))
    ids = sorted({account_id for account_id in normalized if account_id is not None})
    if not ids:
        report: Dict[str, Any] = dict.fromkeys(
            ("requested", "refreshed_ok", "refreshed_fail", "recovered", "still_abnormal"), 0,
        )
        report["details"] = []
        return report

    before_status = self._list_accounts_by_ids(ids, timeout=timeout)
    refresh_result = self._refresh_accounts_parallel(ids, timeout=max(30, timeout), workers=8)
    success_ids = set(refresh_result.get("success_ids") or [])
    failed_ids = set(refresh_result.get("failed_ids") or [])
    if success_ids:
        time.sleep(2)
    after_status = self._list_accounts_by_ids(ids, timeout=timeout)

    recovered = 0
    still_abnormal = 0
    details: List[Dict[str, Any]] = []
    for account_id in ids:
        before = str(before_status.get(account_id) or "unknown").strip().lower()
        # An account missing from the second lookup keeps its earlier status.
        after = str(after_status.get(account_id) or before or "unknown").strip().lower()
        abnormal_now = self._is_abnormal_status(after)
        if abnormal_now:
            still_abnormal += 1
        elif self._is_abnormal_status(before):
            recovered += 1
        if len(details) < 200:
            details.append({
                "id": account_id,
                "before_status": before,
                "after_status": after,
                "refresh_ok": account_id in success_ids,
            })

    return {
        "requested": len(ids),
        "refreshed_ok": len(success_ids),
        "refreshed_fail": len(failed_ids),
        "recovered": recovered,
        "still_abnormal": still_abnormal,
        "details": details,
    }
def delete_accounts_batch(self, account_ids: List[int], timeout: int = 15) -> Dict[str, Any]:
    """Delete the given accounts and summarise the outcome.

    Args:
        account_ids: Ids to delete; values that do not normalise to a
            positive integer are ignored.
        timeout: Per-request timeout in seconds.

    Returns:
        A dict with ``requested`` (number of distinct valid ids),
        ``deleted_ok``, ``deleted_fail``, ``deleted_ok_ids`` and
        ``failed_ids``.
    """
    ids: List[int] = []
    for raw in account_ids or []:
        normalized = self._normalize_account_id(raw)
        if normalized is not None:
            ids.append(normalized)

    outcome = self._delete_accounts_parallel(ids, timeout=timeout, workers=12)
    return {
        "requested": len(set(ids)),
        "deleted_ok": int(outcome.get("deleted_ok", 0)),
        "deleted_fail": int(outcome.get("deleted_fail", 0)),
        "deleted_ok_ids": list(outcome.get("deleted_ok_ids") or []),
        "failed_ids": list(outcome.get("failed_ids") or []),
    }
def handle_exception_accounts(
    self,
    account_ids: Optional[List[int]] = None,
    timeout: int = 30,
    delete_unresolved: bool = True,
) -> Dict[str, Any]:
    """Try to recover abnormal accounts and optionally delete the rest.

    Target accounts are refreshed; when at least one refresh succeeded the
    method waits two seconds before re-reading their status. Accounts that
    are still abnormal afterwards are deleted when ``delete_unresolved`` is
    set.

    Args:
        account_ids: Ids to handle. Only those currently in an abnormal
            status are targeted. When empty or ``None``, every abnormal
            account of the pool is targeted.
        timeout: Timeout in seconds for listing and delete calls; refresh
            calls use at least 30 seconds.
        delete_unresolved: Whether accounts that stay abnormal are deleted.

    Returns:
        A dict with ``requested``, ``targeted``, ``refreshed_ok``,
        ``refreshed_fail``, ``recovered``, ``remaining_abnormal``,
        ``deleted_ok``, ``deleted_fail``, ``deleted_ok_ids``, ``failed_ids``
        and ``skipped_non_abnormal`` (distinct requested ids that were not
        abnormal).
    """
    requested_ids = [
        account_id
        for account_id in (
            self._normalize_account_id(item)
            for item in (account_ids or [])
        )
        if account_id is not None
    ]
    requested_unique = set(requested_ids)

    if requested_ids:
        current_status = self._list_accounts_by_ids(requested_ids, timeout=timeout)
        target_set = {
            account_id
            for account_id in requested_unique
            if self._is_abnormal_status(current_status.get(account_id))
        }
        # Both sides count distinct ids, so repeated ids in the request do
        # not hide accounts that were skipped.
        skipped_non_abnormal = len(requested_unique) - len(target_set)
    else:
        all_accounts = self._list_all_accounts(timeout=timeout, page_size=100)
        target_set = {
            account_id
            for account_id in (
                self._normalize_account_id(item.get("id"))
                for item in all_accounts
                if self._is_abnormal_status(item.get("status"))
            )
            if account_id is not None
        }
        skipped_non_abnormal = 0

    unique_target_ids = sorted(target_set)
    if not unique_target_ids:
        return {
            "requested": len(requested_unique),
            "targeted": 0,
            "refreshed_ok": 0,
            "refreshed_fail": 0,
            "recovered": 0,
            "remaining_abnormal": 0,
            "deleted_ok": 0,
            "deleted_fail": 0,
            "deleted_ok_ids": [],
            "failed_ids": [],
            "skipped_non_abnormal": skipped_non_abnormal,
        }

    refresh_result = self._refresh_accounts_parallel(unique_target_ids, timeout=max(30, timeout), workers=8)
    if refresh_result.get("success_ids"):
        time.sleep(2)
    after_status = self._list_accounts_by_ids(unique_target_ids, timeout=timeout)

    remaining_abnormal_ids = [
        account_id
        for account_id in unique_target_ids
        if self._is_abnormal_status(after_status.get(account_id))
    ]
    recovered_count = len(unique_target_ids) - len(remaining_abnormal_ids)

    delete_result: Dict[str, Any] = {
        "deleted_ok": 0,
        "deleted_fail": 0,
        "deleted_ok_ids": [],
        "failed_ids": [],
    }
    if delete_unresolved and remaining_abnormal_ids:
        delete_result = self._delete_accounts_parallel(remaining_abnormal_ids, timeout=timeout, workers=12)

    return {
        "requested": len(requested_unique) if requested_ids else len(unique_target_ids),
        "targeted": len(unique_target_ids),
        "refreshed_ok": len(refresh_result.get("success_ids") or []),
        "refreshed_fail": len(refresh_result.get("failed_ids") or []),
        "recovered": recovered_count,
        "remaining_abnormal": len(remaining_abnormal_ids),
        "deleted_ok": int(delete_result.get("deleted_ok", 0)),
        "deleted_fail": int(delete_result.get("deleted_fail", 0)),
        "deleted_ok_ids": list(delete_result.get("deleted_ok_ids") or []),
        "failed_ids": list(delete_result.get("failed_ids") or []),
        "skipped_non_abnormal": skipped_non_abnormal,
    }
def refresh_account(self, account_id: int, timeout: int = 30) -> bool:
    """Ask Sub2Api to refresh one account.

    Args:
        account_id: Id of the account to refresh.
        timeout: Request timeout in seconds.

    Returns:
        ``True`` when the server answered 200 or 201; ``False`` on any other
        status or when the request raised.
    """
    endpoint = f"/api/v1/admin/accounts/{account_id}/refresh"
    try:
        status = self._request("POST", endpoint, timeout=timeout).status_code
    except Exception:
        return False
    return status in (200, 201)
def delete_account(self, account_id: int, timeout: int = 15) -> bool:
    """Delete one account from Sub2Api.

    Args:
        account_id: Id of the account to delete.
        timeout: Request timeout in seconds.

    Returns:
        ``True`` when the server answered 200 or 204; ``False`` on any other
        status or when the request raised.
    """
    endpoint = f"/api/v1/admin/accounts/{account_id}"
    try:
        status = self._request("DELETE", endpoint, timeout=timeout).status_code
    except Exception:
        return False
    return status in (200, 204)
def get_pool_status(self, timeout: int = 15) -> Dict[str, Any]:
    """Summarise the current state of the Sub2Api account pool.

    Never raises: any failure while listing accounts is reported through the
    ``error`` field of the returned dict, with all counters zeroed.

    Args:
        timeout: Per-request timeout in seconds for the account listing.

    Returns:
        A dict with ``total``, ``candidates`` (accounts not in an abnormal
        status), ``error_count``, ``threshold``, ``healthy``, ``percent``,
        ``last_checked`` and ``error`` (``None`` on success).
    """
    try:
        accounts = self._list_all_accounts(timeout=timeout, page_size=100)
        abnormal = len([
            account for account in accounts
            if self._is_abnormal_status(account.get("status"))
        ])
        total = len(accounts)
        normal = max(0, total - abnormal)
        return {
            "total": total,
            "candidates": normal,
            "error_count": abnormal,
            "threshold": self.min_candidates,
            "healthy": normal >= self.min_candidates,
            "percent": round(normal / self.min_candidates * 100, 1) if self.min_candidates > 0 else 100,
            "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
            "error": None,
        }
    except Exception as exc:
        failure: Dict[str, Any] = dict.fromkeys(("total", "candidates", "error_count"), 0)
        failure["threshold"] = self.min_candidates
        failure["healthy"] = False
        failure["percent"] = 0
        failure["last_checked"] = time.strftime("%Y-%m-%d %H:%M:%S")
        failure["error"] = str(exc)
        return failure
def test_connection(self, timeout: int = 10) -> Dict[str, Any]:
try:
status = self.get_pool_status(timeout)
total = int(status.get("total", 0))
normal = int(status.get("candidates", 0))
error = int(status.get("error_count", 0))
return {
"ok": True,
"total": total,
"normal": normal,
"error": error,
"message": f"连接成功,共 {total} 个账号,{normal} 正常,{error} 异常",
}
except Exception as e:
return {"ok": False, "total": 0, "normal": 0, "error": 0,
"message": f"连接失败: {e}"}
def _list_accounts_by_ids(
    self, ids: List[int], timeout: int = 15,
) -> Dict[int, str]:
    """Look up the current status of the given account ids.

    The paginated account listing is scanned until every requested id has
    been found or the listing is exhausted. Listed ids are normalised to
    ``int`` before matching, so ids delivered as strings are matched too. A
    missing or invalid ``total`` does not end the scan early; only an empty
    or short page, or a positive integer ``total`` that has been reached,
    does.

    Args:
        ids: Account ids to look up.
        timeout: Per-request timeout in seconds.

    Returns:
        ``{id: status}`` for every requested id present in the listing.
    """
    page_size = 100
    result: Dict[int, str] = {}
    pending = set(ids)
    page = 1
    while pending:
        data = self.list_accounts(page=page, page_size=page_size, timeout=timeout)
        items = data.get("items") or []
        if not isinstance(items, list) or not items:
            break
        for item in items:
            if not isinstance(item, dict):
                continue
            account_id = self._normalize_account_id(item.get("id"))
            if account_id in pending:
                result[account_id] = str(item.get("status", ""))
                pending.discard(account_id)
        if len(items) < page_size:
            break
        total = data.get("total")
        if isinstance(total, int) and total > 0 and page * page_size >= total:
            break
        page += 1
    return result
def probe_and_clean_sync(self, timeout: int = 15, actions: Optional[Dict[str, bool]] = None) -> Dict[str, Any]:
    """Run one maintenance pass over the Sub2Api pool.

    Depending on ``actions`` the pass refreshes abnormal accounts, deletes
    the accounts that are still abnormal afterwards, and deletes duplicate
    accounts. When at least one refresh succeeded the method waits two
    seconds before re-listing the pool.

    Args:
        timeout: Timeout in seconds for listing and delete calls (refresh
            calls use 30 seconds).
        actions: Optional switches ``refresh_abnormal_accounts``,
            ``delete_abnormal_accounts`` and ``dedupe_duplicate_accounts``;
            each defaults to enabled when absent.

    Returns:
        A dict with the effective ``actions`` plus counters describing the
        pool, the refresh step, the deletions and ``duration_ms``.
    """
    requested = actions or {}
    action_flags = {
        name: bool(requested.get(name, True))
        for name in (
            "refresh_abnormal_accounts",
            "delete_abnormal_accounts",
            "dedupe_duplicate_accounts",
        )
    }
    do_refresh = action_flags["refresh_abnormal_accounts"]
    do_delete_abnormal = action_flags["delete_abnormal_accounts"]
    do_dedupe = action_flags["dedupe_duplicate_accounts"]

    started = time.time()
    all_accounts = self._list_all_accounts(timeout=timeout, page_size=100)

    error_ids = []
    for account in all_accounts:
        if not self._is_abnormal_status(account.get("status")):
            continue
        normalized = self._normalize_account_id(account.get("id"))
        if normalized is not None:
            error_ids.append(normalized)
    initial_error_ids = set(error_ids)

    refresh_result: Dict[str, Any] = {"success_ids": [], "failed_ids": []}
    if do_refresh and error_ids:
        refresh_result = self._refresh_accounts_parallel(error_ids, timeout=30, workers=8)
    refreshed_ids = list(refresh_result.get("success_ids") or [])
    refresh_failed_ids = list(refresh_result.get("failed_ids") or [])

    current_accounts = all_accounts
    current_error_ids = set(initial_error_ids)
    if refreshed_ids:
        time.sleep(2)
    if do_refresh and (error_ids or refreshed_ids):
        # Re-read the pool so the counts reflect the post-refresh state.
        current_accounts = self._list_all_accounts(timeout=timeout, page_size=100)
        current_error_ids = set()
        for account in current_accounts:
            if not self._is_abnormal_status(account.get("status")):
                continue
            normalized = self._normalize_account_id(account.get("id"))
            if isinstance(normalized, int):
                current_error_ids.add(int(normalized))
    recovered = len(initial_error_ids - current_error_ids)

    dedupe_plan: Dict[str, Any] = {
        "duplicate_groups": 0,
        "duplicate_accounts": 0,
        "delete_ids": [],
        "groups_preview": [],
        "truncated_groups": 0,
    }
    duplicate_delete_ids: List[int] = []
    if do_dedupe:
        dedupe_plan = self._build_dedupe_plan(current_accounts, details_limit=120)
        duplicate_delete_ids = [int(i) for i in dedupe_plan["delete_ids"] if isinstance(i, int)]

    normal_count = len(current_accounts) - len(current_error_ids)

    delete_targets: set[int] = set()
    if do_delete_abnormal:
        delete_targets |= current_error_ids
    if do_dedupe:
        delete_targets |= set(duplicate_delete_ids)
    delete_result = self._delete_accounts_parallel(
        sorted(delete_targets, reverse=True), timeout=timeout, workers=12,
    )
    deleted_ok_ids = {int(i) for i in (delete_result.get("deleted_ok_ids") or []) if isinstance(i, int)}

    summary: Dict[str, Any] = {"actions": action_flags}
    summary["total"] = len(current_accounts)
    summary["normal"] = normal_count
    summary["initial_error_count"] = len(initial_error_ids)
    summary["error_count"] = len(current_error_ids)
    summary["refreshed"] = recovered
    summary["refresh_attempted"] = len(error_ids) if do_refresh else 0
    summary["refresh_failed"] = len(refresh_failed_ids)
    summary["deleted_ok"] = int(delete_result.get("deleted_ok", 0))
    summary["deleted_fail"] = int(delete_result.get("deleted_fail", 0))
    summary["duplicate_groups"] = int(dedupe_plan["duplicate_groups"])
    summary["duplicate_accounts"] = int(dedupe_plan["duplicate_accounts"])
    summary["duplicate_to_delete"] = len(duplicate_delete_ids)
    summary["deleted_from_error"] = len(deleted_ok_ids & set(current_error_ids))
    summary["deleted_from_duplicate"] = len(deleted_ok_ids & set(duplicate_delete_ids))
    summary["duration_ms"] = int((time.time() - started) * 1000)
    return summary
def calculate_gap(self, current_candidates: Optional[int] = None) -> int:
    """Return how many accounts are missing to reach ``min_candidates``.

    Args:
        current_candidates: Known count of normal accounts. When ``None`` the
            count is obtained from ``get_pool_status``.

    Returns:
        The non-negative shortfall (``0`` when the pool is full enough).

    Raises:
        RuntimeError: When the pool status lookup reports an error.
    """
    available = current_candidates
    if available is None:
        status = self.get_pool_status()
        if status.get("error"):
            raise RuntimeError(f"Sub2Api 池状态查询失败: {status['error']}")
        available = status["candidates"]
    shortfall = self.min_candidates - available
    return shortfall if shortfall > 0 else 0