260 lines
8.8 KiB
Python
260 lines
8.8 KiB
Python
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.request
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
|
|
def _bool_env(name: str, default: bool) -> bool:
    """Interpret the environment variable ``name`` as an on/off flag.

    Returns ``default`` when the variable is not set at all. Otherwise the
    value is trimmed and lower-cased, and only ``1``, ``true``, ``yes`` and
    ``on`` count as true; every other value (the empty string included) is
    false.
    """
    truthy = {"1", "true", "yes", "on"}
    value = os.getenv(name)
    return default if value is None else (value.strip().lower() in truthy)
|
|
|
|
|
|
def _pick_nested_zip(vsix_zip: zipfile.ZipFile) -> str:
    """Choose the nested ``lingma-*.zip`` archive bundled inside the VSIX.

    Only members located under ``extension/dist/bin/`` whose name ends in
    ``.zip`` and contains ``lingma-`` are considered. When several members
    qualify, the one that sorts last in plain string order is returned.

    Raises:
        RuntimeError: if no member qualifies.
    """
    matches = []
    for member in vsix_zip.namelist():
        under_bin = member.startswith("extension/dist/bin/")
        if under_bin and member.endswith(".zip") and "lingma-" in member:
            matches.append(member)

    if not matches:
        raise RuntimeError("No lingma-*.zip found in VSIX")
    # max() on strings picks the same element as sorting and taking the last.
    return max(matches)
|
|
|
|
|
|
def _pick_lingma_binary_path(inner_zip: zipfile.ZipFile) -> str:
    """Locate the ``Lingma`` executable inside the nested archive.

    The first member whose path ends in ``x86_64_linux/Lingma`` wins. If
    there is none, the first member named exactly ``Lingma`` (at the archive
    root or in any directory) is used instead.

    Raises:
        RuntimeError: if the archive contains no such member.
    """
    members = inner_zip.namelist()

    # First choice: the linux amd64 build.
    for member in members:
        if member.endswith("x86_64_linux/Lingma"):
            return member

    # Otherwise accept any file literally called "Lingma".
    for member in members:
        if member == "Lingma" or member.endswith("/Lingma"):
            return member

    raise RuntimeError("Lingma binary not found inside nested zip")
|
|
|
|
|
|
def _infer_release_root(member_path: str) -> str:
    """Derive the release-root directory from the binary's archive path.

    Empty path segments are ignored. If ``x86_64_linux`` appears as a segment
    other than the first, everything before it (joined with ``/``) is the
    root. Otherwise the first segment is the root, provided the path has more
    than one segment; a bare file name (or an empty path) yields ``""``.
    """
    segments = [seg for seg in member_path.split("/") if seg]
    arch_at = segments.index("x86_64_linux") if "x86_64_linux" in segments else -1
    if arch_at > 0:
        return "/".join(segments[:arch_at])
    return segments[0] if len(segments) > 1 else ""
|
|
|
|
|
|
def _extract_release_tree(
    inner_zip: zipfile.ZipFile, release_root: str, out_dir: Path
) -> None:
    """Extract the files below ``release_root`` from ``inner_zip`` into ``out_dir``.

    Args:
        inner_zip: Open archive to read from.
        release_root: Directory prefix inside the archive. Only members
            under ``release_root/`` are extracted and the prefix is stripped
            from their destination path. An empty string extracts every
            file member.
        out_dir: Destination directory; parent directories are created as
            needed.

    Directory entries are skipped. Members whose destination would resolve
    outside ``out_dir`` (``..`` segments, absolute remainders) are skipped
    with a log line instead of being written, because the archive comes from
    a download and must not be able to write to arbitrary locations.
    """
    import shutil  # function-scope import: no new file-level dependency

    prefix = f"{release_root}/" if release_root else ""
    root = out_dir.resolve()

    for info in inner_zip.infolist():
        name = info.filename
        if not name or name.endswith("/"):
            continue  # nameless or directory entry
        if not name.startswith(prefix):
            continue  # outside the requested release root
        rel = name[len(prefix):]
        if not rel:
            continue

        dest = out_dir / rel
        # Zip-slip guard: the resolved target must live strictly below out_dir.
        if root not in dest.resolve().parents:
            print(f"[bootstrap] skip unsafe archive member: {name}")
            continue

        dest.parent.mkdir(parents=True, exist_ok=True)
        with inner_zip.open(info, "r") as src, dest.open("wb") as dst:
            # Stream instead of loading the whole member into memory.
            shutil.copyfileobj(src, dst)
|
|
|
|
|
|
def _query_marketplace_latest_vsix(
    publisher: str, extension: str
) -> tuple[str, str, dict]:
    """Look up ``publisher.extension`` on the VS Marketplace gallery API.

    Sends a single POST to the ``extensionquery`` endpoint (30 second
    timeout) and inspects the first version of the first extension in the
    response. NOTE(review): that entry is presumably the latest release; the
    code simply takes element 0 - confirm against the gallery API.

    Args:
        publisher: Marketplace publisher id.
        extension: Extension name within that publisher.

    Returns:
        ``(vsix_url, version, meta)``. ``vsix_url`` is the ``source`` of the
        ``Microsoft.VisualStudio.Services.VSIXPackage`` asset when the
        response lists one; otherwise it is built from publisher, extension
        and version. ``meta`` is a dict holding publisher, extension and
        version.

    Raises:
        RuntimeError: if the response contains no extension, or neither a
            VSIX URL nor a version.
        Exception: network and JSON decoding errors from ``urllib`` /
            ``json`` propagate unchanged.
    """
    api = "https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery"
    # filterType / flags values are gallery API constants; their meaning is
    # not visible in this file.
    payload = {
        "filters": [
            {
                "criteria": [
                    {"filterType": 7, "value": f"{publisher}.{extension}"},
                    {"filterType": 8, "value": "Microsoft.VisualStudio.Code"},
                ],
                "pageNumber": 1,
                "pageSize": 1,
                "sortBy": 0,
                "sortOrder": 0,
            }
        ],
        "assetTypes": [],
        "flags": 950,
    }
    req = urllib.request.Request(
        api, data=json.dumps(payload).encode("utf-8"), method="POST"
    )
    req.add_header("accept", "application/json;api-version=3.0-preview.1")
    req.add_header("content-type", "application/json")
    req.add_header("x-market-client-id", "VSCode 1.115.0")
    with urllib.request.urlopen(req, timeout=30) as r:
        body = json.loads(r.read().decode("utf-8", errors="ignore"))

    # ``or`` fallbacks cover keys that are present but empty/null, which
    # would otherwise surface as a bare IndexError/TypeError instead of the
    # descriptive RuntimeError below.
    results = body.get("results") or [{}]
    exts = results[0].get("extensions") or []
    if not exts:
        raise RuntimeError("No extension found from marketplace")

    versions = exts[0].get("versions") or [{}]
    ver_obj = versions[0]
    version = ver_obj.get("version") or ""
    files = ver_obj.get("files") or []

    # Source of the first VSIX package asset, if the response lists one.
    vsix_url = next(
        (
            f.get("source") or ""
            for f in files
            if f.get("assetType") == "Microsoft.VisualStudio.Services.VSIXPackage"
        ),
        "",
    )
    if not vsix_url:
        if not version:
            raise RuntimeError("No version/vsix url found from marketplace")
        # No direct asset link: fall back to the versioned vspackage URL.
        vsix_url = (
            "https://marketplace.visualstudio.com/_apis/public/gallery/"
            f"publishers/{publisher}/vsextensions/{extension}/{version}/vspackage"
        )

    return (
        vsix_url,
        version,
        {"publisher": publisher, "extension": extension, "version": version},
    )
|
|
|
|
|
|
def _read_bootstrap_marker(marker_path: Path) -> dict:
    """Return the previous bootstrap marker, or ``{}`` if missing or unusable."""
    try:
        data = json.loads(marker_path.read_text(encoding="utf-8", errors="ignore"))
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON: the marker is only a hint.
        return {}
    # Valid JSON that is not an object (e.g. a list) has no ``.get``.
    return data if isinstance(data, dict) else {}


def _install_binary(target: Path, payload: bytes) -> None:
    """Write ``payload`` to ``target`` as an executable via a staged replace."""
    staged = target.with_name(f"{target.name}.tmp")
    try:
        staged.write_bytes(payload)
        os.chmod(staged, 0o755)
        # Replace in one step so a failed write never leaves a truncated
        # binary where the "fallback to existing Lingma" paths look for one.
        os.replace(staged, target)
    finally:
        staged.unlink(missing_ok=True)


def bootstrap_from_vsix() -> None:
    """Download the Lingma VSIX and install its binary and release tree.

    Configuration is read from environment variables:

    * ``LINGMA_BIN`` - target path of the binary
      (default ``/app/data/bin/Lingma``).
    * ``LINGMA_SOURCE_TYPE`` - ``marketplace`` (default) or ``vsix``; any
      other value skips the bootstrap entirely.
    * ``LINGMA_VSIX_URL`` - direct VSIX URL, used for ``vsix`` and as the
      fallback when the marketplace query fails.
    * ``LINGMA_MARKETPLACE_PUBLISHER`` / ``LINGMA_MARKETPLACE_EXTENSION`` -
      extension to look up on the marketplace.
    * ``LINGMA_BOOTSTRAP_ALWAYS`` (default true) - when false, an existing
      binary is always reused.
    * ``LINGMA_FORCE_REFRESH`` (default false) - when true, never reuse.

    An existing binary is also reused when the marketplace version equals
    the version recorded in ``.lingma-bootstrap.json`` next to the binary.
    After a successful install that marker file is rewritten.

    Raises:
        RuntimeError: if download or extraction fails and no binary exists
            at ``LINGMA_BIN`` to fall back to.
    """
    lingma_bin = Path(os.getenv("LINGMA_BIN", "/app/data/bin/Lingma"))
    source_type = os.getenv("LINGMA_SOURCE_TYPE", "marketplace").strip().lower()
    if source_type not in {"vsix", "marketplace"}:
        print(f"[bootstrap] skip: LINGMA_SOURCE_TYPE={source_type}")
        return

    vsix_url = os.getenv(
        "LINGMA_VSIX_URL",
        "https://tongyi-code.oss-cn-hangzhou.aliyuncs.com/vscode/tongyi-lingma-latest.vsix",
    ).strip()
    mp_publisher = os.getenv("LINGMA_MARKETPLACE_PUBLISHER", "Alibaba-Cloud").strip()
    mp_extension = os.getenv("LINGMA_MARKETPLACE_EXTENSION", "tongyi-lingma").strip()
    always_refresh = _bool_env("LINGMA_BOOTSTRAP_ALWAYS", True)
    force_refresh = _bool_env("LINGMA_FORCE_REFRESH", False)

    marker_path = lingma_bin.parent / ".lingma-bootstrap.json"
    old_marker = _read_bootstrap_marker(marker_path)

    resolved_url = vsix_url
    resolved_version = ""
    source_meta = {"source": source_type}
    if source_type == "marketplace":
        try:
            resolved_url, resolved_version, source_meta = (
                _query_marketplace_latest_vsix(mp_publisher, mp_extension)
            )
            print(
                f"[bootstrap] marketplace latest: {mp_publisher}.{mp_extension} "
                f"version={resolved_version}"
            )
        except Exception as exc:
            # Best effort: any marketplace failure falls back to the direct URL.
            print(
                f"[bootstrap] marketplace query failed, fallback to LINGMA_VSIX_URL: {exc}"
            )
            resolved_url = vsix_url

    version_unchanged = bool(resolved_version) and (
        old_marker.get("version") == resolved_version
    )
    if (
        lingma_bin.exists()
        and not force_refresh
        and (not always_refresh or version_unchanged)
    ):
        os.chmod(lingma_bin, 0o755)
        print(f"[bootstrap] reuse existing Lingma: {lingma_bin}")
        return

    # NOTE(review): fixed path under /tmp; the downloaded file is not removed
    # afterwards.
    tmp_dir = Path("/tmp/lingma-bootstrap")
    tmp_dir.mkdir(parents=True, exist_ok=True)
    vsix_path = tmp_dir / "tongyi-lingma-latest.vsix"

    print(f"[bootstrap] downloading VSIX: {resolved_url}")
    try:
        with urllib.request.urlopen(resolved_url, timeout=30) as r, vsix_path.open(
            "wb"
        ) as f:
            total = 0
            # Stream in 1 MiB chunks rather than holding the VSIX in memory.
            while chunk := r.read(1024 * 1024):
                f.write(chunk)
                total += len(chunk)
        print(f"[bootstrap] VSIX downloaded bytes={total}")
    except Exception as exc:
        if lingma_bin.exists():
            print(f"[bootstrap] download failed, fallback to existing Lingma: {exc}")
            os.chmod(lingma_bin, 0o755)
            return
        raise RuntimeError(f"Failed to download VSIX: {exc}") from exc

    try:
        with zipfile.ZipFile(vsix_path, "r") as vsix_zip:
            nested_zip_name = _pick_nested_zip(vsix_zip)
            nested_zip_bytes = vsix_zip.read(nested_zip_name)

        with zipfile.ZipFile(io.BytesIO(nested_zip_bytes), "r") as inner_zip:
            lingma_member = _pick_lingma_binary_path(inner_zip)
            lingma_bytes = inner_zip.read(lingma_member)
            release_root = _infer_release_root(lingma_member)
            lingma_bin.parent.mkdir(parents=True, exist_ok=True)
            # "2.5.20" is the directory name used when the archive has no
            # release-root directory of its own.
            release_dir = lingma_bin.parent / (release_root or "2.5.20")
            _extract_release_tree(inner_zip, release_root, release_dir)

        _install_binary(lingma_bin, lingma_bytes)

        extension_main = release_dir / "extension" / "main.js"
        if extension_main.exists():
            print(f"[bootstrap] extension ready: {extension_main}")
        else:
            print(f"[bootstrap] extension missing under: {release_dir}")

        marker = {
            "source": source_type,
            "url": resolved_url,
            "version": resolved_version,
            "downloaded_at": int(time.time()),
            "nested_zip": nested_zip_name,
            "member": lingma_member,
            "release_root": release_root,
            "size": len(lingma_bytes),
        }
        marker.update(source_meta)
        marker_path.write_text(
            json.dumps(marker, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        print(f"[bootstrap] Lingma ready: {lingma_bin} ({len(lingma_bytes)} bytes)")
    except Exception as exc:
        if lingma_bin.exists():
            print(f"[bootstrap] extraction failed, fallback to existing Lingma: {exc}")
            os.chmod(lingma_bin, 0o755)
            return
        raise RuntimeError(f"Failed to extract Lingma from VSIX: {exc}") from exc
|
|
|
|
|
|
def main() -> int:
    """Run the bootstrap once and return the process exit status ``0``.

    Exceptions raised by ``bootstrap_from_vsix`` are not caught here and
    propagate to the caller.
    """
    bootstrap_from_vsix()
    return 0
|
|
|
|
|
|
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|