from __future__ import annotations import io import json import os import time import urllib.request import zipfile from pathlib import Path def _bool_env(name: str, default: bool) -> bool: raw = os.getenv(name) if raw is None: return default return raw.strip().lower() in {"1", "true", "yes", "on"} def _pick_nested_zip(vsix_zip: zipfile.ZipFile) -> str: candidates = [ n for n in vsix_zip.namelist() if n.startswith("extension/dist/bin/") and n.endswith(".zip") and "lingma-" in n ] if not candidates: raise RuntimeError("No lingma-*.zip found in VSIX") candidates.sort() return candidates[-1] def _pick_lingma_binary_path(inner_zip: zipfile.ZipFile) -> str: # Prefer linux amd64 binary path. names = inner_zip.namelist() preferred = [n for n in names if n.endswith("x86_64_linux/Lingma")] if preferred: return preferred[0] fallback = [n for n in names if n.endswith("/Lingma") or n == "Lingma"] if fallback: return fallback[0] raise RuntimeError("Lingma binary not found inside nested zip") def _infer_release_root(member_path: str) -> str: parts = [p for p in member_path.split("/") if p] if "x86_64_linux" in parts: idx = parts.index("x86_64_linux") if idx > 0: return "/".join(parts[:idx]) if len(parts) > 1: return parts[0] return "" def _extract_release_tree( inner_zip: zipfile.ZipFile, release_root: str, out_dir: Path ) -> None: prefix = f"{release_root}/" if release_root else "" for info in inner_zip.infolist(): name = info.filename if not name or name.endswith("/"): continue if prefix and not name.startswith(prefix): continue rel = name[len(prefix) :] if prefix else name if not rel: continue dest = out_dir / rel dest.parent.mkdir(parents=True, exist_ok=True) with inner_zip.open(info, "r") as src, dest.open("wb") as dst: dst.write(src.read()) def _query_marketplace_latest_vsix( publisher: str, extension: str ) -> tuple[str, str, dict]: api = "https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery" payload = { "filters": [ { "criteria": [ {"filterType": 7, "value": f"{publisher}.{extension}"}, {"filterType": 8, "value": "Microsoft.VisualStudio.Code"}, ], "pageNumber": 1, "pageSize": 1, "sortBy": 0, "sortOrder": 0, } ], "assetTypes": [], "flags": 950, } req = urllib.request.Request( api, data=json.dumps(payload).encode("utf-8"), method="POST" ) req.add_header("accept", "application/json;api-version=3.0-preview.1") req.add_header("content-type", "application/json") req.add_header("x-market-client-id", "VSCode 1.115.0") with urllib.request.urlopen(req, timeout=30) as r: body = json.loads(r.read().decode("utf-8", errors="ignore")) exts = body.get("results", [{}])[0].get("extensions", []) if not exts: raise RuntimeError("No extension found from marketplace") ver_obj = exts[0].get("versions", [{}])[0] version = ver_obj.get("version", "") files = ver_obj.get("files", []) vsix_url = "" for f in files: if f.get("assetType") == "Microsoft.VisualStudio.Services.VSIXPackage": vsix_url = f.get("source", "") break if not vsix_url: if not version: raise RuntimeError("No version/vsix url found from marketplace") vsix_url = ( "https://marketplace.visualstudio.com/_apis/public/gallery/" f"publishers/{publisher}/vsextensions/{extension}/{version}/vspackage" ) return ( vsix_url, version, {"publisher": publisher, "extension": extension, "version": version}, ) def bootstrap_from_vsix() -> None: lingma_bin = Path(os.getenv("LINGMA_BIN", "/app/data/bin/Lingma")) source_type = os.getenv("LINGMA_SOURCE_TYPE", "marketplace").strip().lower() vsix_url = os.getenv( "LINGMA_VSIX_URL", "https://tongyi-code.oss-cn-hangzhou.aliyuncs.com/vscode/tongyi-lingma-latest.vsix", ).strip() mp_publisher = os.getenv("LINGMA_MARKETPLACE_PUBLISHER", "Alibaba-Cloud").strip() mp_extension = os.getenv("LINGMA_MARKETPLACE_EXTENSION", "tongyi-lingma").strip() always_refresh = _bool_env("LINGMA_BOOTSTRAP_ALWAYS", True) force_refresh = _bool_env("LINGMA_FORCE_REFRESH", False) if source_type not in {"vsix", "marketplace"}: print(f"[bootstrap] skip: LINGMA_SOURCE_TYPE={source_type}") return marker_path = lingma_bin.parent / ".lingma-bootstrap.json" old_marker = {} if marker_path.exists(): try: old_marker = json.loads( marker_path.read_text(encoding="utf-8", errors="ignore") ) except Exception: old_marker = {} resolved_url = vsix_url resolved_version = "" source_meta = {"source": source_type} if source_type == "marketplace": try: resolved_url, resolved_version, source_meta = ( _query_marketplace_latest_vsix(mp_publisher, mp_extension) ) print( f"[bootstrap] marketplace latest: {mp_publisher}.{mp_extension} " f"version={resolved_version}" ) except Exception as exc: print( f"[bootstrap] marketplace query failed, fallback to LINGMA_VSIX_URL: {exc}" ) resolved_url = vsix_url if ( lingma_bin.exists() and not force_refresh and ( (not always_refresh) or (resolved_version and old_marker.get("version") == resolved_version) ) ): os.chmod(lingma_bin, 0o755) print(f"[bootstrap] reuse existing Lingma: {lingma_bin}") return tmp_dir = Path("/tmp/lingma-bootstrap") tmp_dir.mkdir(parents=True, exist_ok=True) vsix_path = tmp_dir / "tongyi-lingma-latest.vsix" print(f"[bootstrap] downloading VSIX: {resolved_url}") try: with ( urllib.request.urlopen(resolved_url, timeout=30) as r, vsix_path.open("wb") as f, ): total = 0 while True: chunk = r.read(1024 * 1024) if not chunk: break f.write(chunk) total += len(chunk) print(f"[bootstrap] VSIX downloaded bytes={total}") except Exception as exc: if lingma_bin.exists(): print(f"[bootstrap] download failed, fallback to existing Lingma: {exc}") os.chmod(lingma_bin, 0o755) return raise RuntimeError(f"Failed to download VSIX: {exc}") try: with zipfile.ZipFile(vsix_path, "r") as vsix_zip: nested_zip_name = _pick_nested_zip(vsix_zip) nested_zip_bytes = vsix_zip.read(nested_zip_name) with zipfile.ZipFile(io.BytesIO(nested_zip_bytes), "r") as inner_zip: lingma_member = _pick_lingma_binary_path(inner_zip) lingma_bytes = inner_zip.read(lingma_member) release_root = _infer_release_root(lingma_member) lingma_bin.parent.mkdir(parents=True, exist_ok=True) release_dir = lingma_bin.parent / (release_root or "2.5.20") _extract_release_tree(inner_zip, release_root, release_dir) lingma_bin.write_bytes(lingma_bytes) os.chmod(lingma_bin, 0o755) extension_main = release_dir / "extension" / "main.js" if extension_main.exists(): print(f"[bootstrap] extension ready: {extension_main}") else: print(f"[bootstrap] extension missing under: {release_dir}") marker = { "source": source_type, "url": resolved_url, "version": resolved_version, "downloaded_at": int(time.time()), "nested_zip": nested_zip_name, "member": lingma_member, "release_root": release_root, "size": len(lingma_bytes), } marker.update(source_meta) (lingma_bin.parent / ".lingma-bootstrap.json").write_text( json.dumps(marker, ensure_ascii=False, indent=2), encoding="utf-8", ) print(f"[bootstrap] Lingma ready: {lingma_bin} ({len(lingma_bytes)} bytes)") except Exception as exc: if lingma_bin.exists(): print(f"[bootstrap] extraction failed, fallback to existing Lingma: {exc}") os.chmod(lingma_bin, 0o755) return raise RuntimeError(f"Failed to extract Lingma from VSIX: {exc}") def main() -> int: bootstrap_from_vsix() return 0 if __name__ == "__main__": raise SystemExit(main())