200 lines
7.0 KiB
Python
200 lines
7.0 KiB
Python
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.request
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
|
|
def _bool_env(name: str, default: bool) -> bool:
|
|
raw = os.getenv(name)
|
|
if raw is None:
|
|
return default
|
|
return raw.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def _pick_nested_zip(vsix_zip: zipfile.ZipFile) -> str:
|
|
candidates = [
|
|
n
|
|
for n in vsix_zip.namelist()
|
|
if n.startswith("extension/dist/bin/") and n.endswith(".zip") and "lingma-" in n
|
|
]
|
|
if not candidates:
|
|
raise RuntimeError("No lingma-*.zip found in VSIX")
|
|
candidates.sort()
|
|
return candidates[-1]
|
|
|
|
|
|
def _pick_lingma_binary_path(inner_zip: zipfile.ZipFile) -> str:
|
|
# Prefer linux amd64 binary path.
|
|
names = inner_zip.namelist()
|
|
preferred = [n for n in names if n.endswith("x86_64_linux/Lingma")]
|
|
if preferred:
|
|
return preferred[0]
|
|
fallback = [n for n in names if n.endswith("/Lingma") or n == "Lingma"]
|
|
if fallback:
|
|
return fallback[0]
|
|
raise RuntimeError("Lingma binary not found inside nested zip")
|
|
|
|
|
|
def _query_marketplace_latest_vsix(publisher: str, extension: str) -> tuple[str, str, dict]:
|
|
api = "https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery"
|
|
payload = {
|
|
"filters": [
|
|
{
|
|
"criteria": [
|
|
{"filterType": 7, "value": f"{publisher}.{extension}"},
|
|
{"filterType": 8, "value": "Microsoft.VisualStudio.Code"},
|
|
],
|
|
"pageNumber": 1,
|
|
"pageSize": 1,
|
|
"sortBy": 0,
|
|
"sortOrder": 0,
|
|
}
|
|
],
|
|
"assetTypes": [],
|
|
"flags": 950,
|
|
}
|
|
req = urllib.request.Request(api, data=json.dumps(payload).encode("utf-8"), method="POST")
|
|
req.add_header("accept", "application/json;api-version=3.0-preview.1")
|
|
req.add_header("content-type", "application/json")
|
|
req.add_header("x-market-client-id", "VSCode 1.115.0")
|
|
with urllib.request.urlopen(req, timeout=30) as r:
|
|
body = json.loads(r.read().decode("utf-8", errors="ignore"))
|
|
|
|
exts = body.get("results", [{}])[0].get("extensions", [])
|
|
if not exts:
|
|
raise RuntimeError("No extension found from marketplace")
|
|
ver_obj = exts[0].get("versions", [{}])[0]
|
|
version = ver_obj.get("version", "")
|
|
files = ver_obj.get("files", [])
|
|
vsix_url = ""
|
|
for f in files:
|
|
if f.get("assetType") == "Microsoft.VisualStudio.Services.VSIXPackage":
|
|
vsix_url = f.get("source", "")
|
|
break
|
|
if not vsix_url:
|
|
if not version:
|
|
raise RuntimeError("No version/vsix url found from marketplace")
|
|
vsix_url = (
|
|
"https://marketplace.visualstudio.com/_apis/public/gallery/"
|
|
f"publishers/{publisher}/vsextensions/{extension}/{version}/vspackage"
|
|
)
|
|
return vsix_url, version, {"publisher": publisher, "extension": extension, "version": version}
|
|
|
|
|
|
def bootstrap_from_vsix() -> None:
|
|
lingma_bin = Path(os.getenv("LINGMA_BIN", "/app/bin/Lingma"))
|
|
source_type = os.getenv("LINGMA_SOURCE_TYPE", "marketplace").strip().lower()
|
|
vsix_url = os.getenv(
|
|
"LINGMA_VSIX_URL",
|
|
"https://tongyi-code.oss-cn-hangzhou.aliyuncs.com/vscode/tongyi-lingma-latest.vsix",
|
|
).strip()
|
|
mp_publisher = os.getenv("LINGMA_MARKETPLACE_PUBLISHER", "Alibaba-Cloud").strip()
|
|
mp_extension = os.getenv("LINGMA_MARKETPLACE_EXTENSION", "tongyi-lingma").strip()
|
|
always_refresh = _bool_env("LINGMA_BOOTSTRAP_ALWAYS", True)
|
|
force_refresh = _bool_env("LINGMA_FORCE_REFRESH", False)
|
|
|
|
if source_type not in {"vsix", "marketplace"}:
|
|
print(f"[bootstrap] skip: LINGMA_SOURCE_TYPE={source_type}")
|
|
return
|
|
|
|
marker_path = lingma_bin.parent / ".lingma-bootstrap.json"
|
|
old_marker = {}
|
|
if marker_path.exists():
|
|
try:
|
|
old_marker = json.loads(marker_path.read_text(encoding="utf-8", errors="ignore"))
|
|
except Exception:
|
|
old_marker = {}
|
|
|
|
resolved_url = vsix_url
|
|
resolved_version = ""
|
|
source_meta = {"source": source_type}
|
|
if source_type == "marketplace":
|
|
try:
|
|
resolved_url, resolved_version, source_meta = _query_marketplace_latest_vsix(
|
|
mp_publisher, mp_extension
|
|
)
|
|
print(
|
|
f"[bootstrap] marketplace latest: {mp_publisher}.{mp_extension} "
|
|
f"version={resolved_version}"
|
|
)
|
|
except Exception as exc:
|
|
print(f"[bootstrap] marketplace query failed, fallback to LINGMA_VSIX_URL: {exc}")
|
|
resolved_url = vsix_url
|
|
|
|
if (
|
|
lingma_bin.exists()
|
|
and not force_refresh
|
|
and (
|
|
(not always_refresh)
|
|
or (resolved_version and old_marker.get("version") == resolved_version)
|
|
)
|
|
):
|
|
os.chmod(lingma_bin, 0o755)
|
|
print(f"[bootstrap] reuse existing Lingma: {lingma_bin}")
|
|
return
|
|
|
|
tmp_dir = Path("/tmp/lingma-bootstrap")
|
|
tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
vsix_path = tmp_dir / "tongyi-lingma-latest.vsix"
|
|
|
|
print(f"[bootstrap] downloading VSIX: {resolved_url}")
|
|
try:
|
|
with urllib.request.urlopen(resolved_url, timeout=120) as r:
|
|
data = r.read()
|
|
vsix_path.write_bytes(data)
|
|
except Exception as exc:
|
|
if lingma_bin.exists():
|
|
print(f"[bootstrap] download failed, fallback to existing Lingma: {exc}")
|
|
os.chmod(lingma_bin, 0o755)
|
|
return
|
|
raise RuntimeError(f"Failed to download VSIX: {exc}")
|
|
|
|
try:
|
|
with zipfile.ZipFile(vsix_path, "r") as vsix_zip:
|
|
nested_zip_name = _pick_nested_zip(vsix_zip)
|
|
nested_zip_bytes = vsix_zip.read(nested_zip_name)
|
|
|
|
with zipfile.ZipFile(io.BytesIO(nested_zip_bytes), "r") as inner_zip:
|
|
lingma_member = _pick_lingma_binary_path(inner_zip)
|
|
lingma_bytes = inner_zip.read(lingma_member)
|
|
|
|
lingma_bin.parent.mkdir(parents=True, exist_ok=True)
|
|
lingma_bin.write_bytes(lingma_bytes)
|
|
os.chmod(lingma_bin, 0o755)
|
|
|
|
marker = {
|
|
"source": source_type,
|
|
"url": resolved_url,
|
|
"version": resolved_version,
|
|
"downloaded_at": int(time.time()),
|
|
"nested_zip": nested_zip_name,
|
|
"member": lingma_member,
|
|
"size": len(lingma_bytes),
|
|
}
|
|
marker.update(source_meta)
|
|
(lingma_bin.parent / ".lingma-bootstrap.json").write_text(
|
|
json.dumps(marker, ensure_ascii=False, indent=2),
|
|
encoding="utf-8",
|
|
)
|
|
print(f"[bootstrap] Lingma ready: {lingma_bin} ({len(lingma_bytes)} bytes)")
|
|
except Exception as exc:
|
|
if lingma_bin.exists():
|
|
print(f"[bootstrap] extraction failed, fallback to existing Lingma: {exc}")
|
|
os.chmod(lingma_bin, 0o755)
|
|
return
|
|
raise RuntimeError(f"Failed to extract Lingma from VSIX: {exc}")
|
|
|
|
|
|
def main() -> int:
|
|
bootstrap_from_vsix()
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|