304 lines
14 KiB
Python
304 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from typing import Any, Iterable, Optional, Sequence
|
|
|
|
# --- Domain: SDT Types ---
|
|
# SDT Normalized Result Object
|
|
# result["exit_code"]: int
|
|
# result["status"]: "ok" | "failed" | "skipped"
|
|
# result["elapsed_seconds"]: float
|
|
# result["failure_reason"]: Optional[str]
|
|
# result["skip_reason"]: Optional[str]
|
|
SdtResult = dict[str, Any]
|
|
|
|
PROXY_VARS = [
|
|
"HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy",
|
|
"GIT_HTTP_PROXY", "GIT_HTTPS_PROXY", "PIP_NO_INDEX"
|
|
]
|
|
|
|
# --- Domain: FS Utilities ---
|
|
|
|
def sha256_files(paths: Iterable[pathlib.Path]) -> str:
|
|
h = hashlib.sha256()
|
|
for p in sorted(paths):
|
|
if p.exists(): h.update(p.read_bytes())
|
|
return h.hexdigest()
|
|
|
|
def first_existing(paths: Iterable[pathlib.Path]) -> Optional[pathlib.Path]:
|
|
for p in paths:
|
|
if p.exists(): return p
|
|
return None
|
|
|
|
def newest_file(search_root: pathlib.Path, pattern: str) -> Optional[pathlib.Path]:
|
|
hits = sorted(search_root.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True)
|
|
return hits[0] if hits else None
|
|
|
|
def ensure_dirs(paths: list[pathlib.Path]) -> None:
|
|
for p in paths: p.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _hint_matches(root: pathlib.Path, hint: str) -> bool:
|
|
h = hint.strip()
|
|
if not h:
|
|
return False
|
|
try:
|
|
has_glob = any(ch in h for ch in ("*", "?", "["))
|
|
if has_glob:
|
|
if any(root.glob(h)):
|
|
return True
|
|
return any(root.rglob(h))
|
|
|
|
marker = root / h
|
|
if marker.exists():
|
|
return True
|
|
|
|
# If hint is a plain filename marker, allow bounded search in root tree.
|
|
if not any(sep in h for sep in ("/", "\\")):
|
|
return any(p.name == h for p in root.rglob(h))
|
|
|
|
return False
|
|
except:
|
|
return False
|
|
|
|
# --- Domain: Project Discovery ---
|
|
|
|
def resolve_repo_root(start: str | None = None) -> pathlib.Path:
|
|
base = pathlib.Path(start or os.getcwd()).resolve()
|
|
for cur in [base, *base.parents]:
|
|
sdt_configs = list(cur.glob("sdtconfig-*.json"))
|
|
cfg = sdt_configs[0] if sdt_configs else (cur / "devtool.json")
|
|
if cfg.exists():
|
|
hints = load_project_root_hints(cur, cfg)
|
|
if not hints or any(_hint_matches(cur, hint) for hint in hints): return cur
|
|
try:
|
|
proc = subprocess.run(["git", "-C", str(base), "rev-parse", "--show-toplevel"], capture_output=True, text=True, check=False)
|
|
if proc.returncode == 0 and proc.stdout.strip(): return pathlib.Path(proc.stdout.strip()).resolve()
|
|
except: pass
|
|
return base
|
|
|
|
def load_project_root_hints(repo_root: pathlib.Path, cfg: pathlib.Path | None = None) -> list[str]:
|
|
if cfg is None:
|
|
sdt_configs = list(repo_root.glob("sdtconfig-*.json"))
|
|
cfg = sdt_configs[0] if sdt_configs else (repo_root / "devtool.json")
|
|
if not cfg.exists(): return []
|
|
try:
|
|
data = json.loads(cfg.read_text(encoding="utf-8"))
|
|
hints = data.get("project", {}).get("rootHints", [])
|
|
return [str(x) for x in hints if isinstance(x, str) and x.strip()]
|
|
except: return []
|
|
|
|
def _resolve_project_config_path(repo_root: pathlib.Path) -> Optional[pathlib.Path]:
|
|
sdt_configs = sorted(repo_root.glob("sdtconfig-*.json"), key=lambda p: p.name.lower())
|
|
if sdt_configs:
|
|
return sdt_configs[0]
|
|
legacy = repo_root / "devtool.json"
|
|
if legacy.exists():
|
|
return legacy
|
|
return None
|
|
|
|
def load_node_working_dir(repo_root: pathlib.Path) -> Optional[str]:
|
|
cfg = _resolve_project_config_path(repo_root)
|
|
if cfg is None:
|
|
return None
|
|
try:
|
|
data = json.loads(cfg.read_text(encoding="utf-8"))
|
|
node = data.get("toolchains", {}).get("node", {})
|
|
value = node.get("workingDir")
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
def find_csproj(repo_root: pathlib.Path, hints: Sequence[str] | None = None) -> Optional[pathlib.Path]:
|
|
if hints:
|
|
for h in hints:
|
|
p = (repo_root / h).resolve()
|
|
if p.exists() and p.suffix == ".csproj": return p
|
|
hits = [h for h in repo_root.rglob("*.csproj") if not any(x in h.parts for x in [".git", "node_modules", "bin", "obj"])]
|
|
return hits[0] if len(hits) == 1 else None
|
|
|
|
def find_csproj_by_keyword(repo_root: pathlib.Path, keywords: Sequence[str]) -> Optional[pathlib.Path]:
|
|
hits = [h for h in repo_root.rglob("*.csproj") if not any(x in h.parts for x in [".git", "node_modules", "bin", "obj"])]
|
|
for kw in keywords:
|
|
matches = [h for h in hits if kw.lower() in h.name.lower()]
|
|
if len(matches) == 1: return matches[0]
|
|
return None
|
|
|
|
def find_node_app_root(repo_root: pathlib.Path, preferred: str | None = None) -> Optional[pathlib.Path]:
|
|
def _read_pj(p: pathlib.Path):
|
|
try: return json.loads(p.read_text(encoding="utf-8"))
|
|
except: return None
|
|
def _has_scr(p: pathlib.Path, names: Sequence[str]):
|
|
src = _read_pj(p)
|
|
return any(src.get("scripts", {}).get(n) for n in names) if src else False
|
|
def _is_tauri(d: pathlib.Path): return (d / "src-tauri" / "tauri.conf.json").exists() or (d / "tauri.conf.json").exists()
|
|
def _iter_pj():
|
|
excluded = {".git", "node_modules", ".sdt", "dist", "build", ".venv", "venv", "bin", "obj"}
|
|
for p in repo_root.rglob("package.json"):
|
|
if any(x in p.parts for x in excluded):
|
|
continue
|
|
yield p
|
|
|
|
if not preferred:
|
|
preferred = load_node_working_dir(repo_root)
|
|
if preferred:
|
|
p = (repo_root / preferred).resolve()
|
|
p = p.parent if p.is_file() else p
|
|
if (p / "package.json").exists():
|
|
return p
|
|
|
|
tauri = [p.parent for p in _iter_pj() if _is_tauri(p.parent)]
|
|
if len(tauri) == 1: return tauri[0]
|
|
if len(tauri) > 1:
|
|
tauri = sorted(set(tauri), key=lambda d: (len(d.parts), str(d).lower()))
|
|
return tauri[0]
|
|
scripts = [p.parent for p in _iter_pj() if _has_scr(p, ["tauri", "build"])]
|
|
if len(scripts) == 1: return scripts[0]
|
|
if len(scripts) > 1:
|
|
scripts = sorted(set(scripts), key=lambda d: (len(d.parts), str(d).lower()))
|
|
return scripts[0]
|
|
|
|
all_pj = [p.parent for p in _iter_pj()]
|
|
return all_pj[0] if len(all_pj) == 1 else None
|
|
|
|
# --- Domain: Environment Setup ---
|
|
|
|
def clean_proxy_env(env: dict[str, str]) -> None:
|
|
for k in PROXY_VARS: env.pop(k, None)
|
|
|
|
def dotnet_env(repo_root: pathlib.Path) -> dict[str, str]:
|
|
env = dict(os.environ)
|
|
clean_proxy_env(env)
|
|
h, p, c = repo_root/".dotnet_home", repo_root/".nuget"/"packages", repo_root/".nuget"/"http-cache"
|
|
ensure_dirs([h, p, c])
|
|
env.update({"DOTNET_CLI_HOME":str(h), "NUGET_PACKAGES":str(p), "NUGET_HTTP_CACHE_PATH":str(c),
|
|
"DOTNET_SKIP_FIRST_TIME_EXPERIENCE":"1", "DOTNET_ADD_GLOBAL_TOOLS_TO_PATH":"0",
|
|
"DOTNET_GENERATE_ASPNET_CERTIFICATE":"0",
|
|
"DOTNET_CLI_TELEMETRY_OPTOUT":"1", "NUGET_CERT_REVOCATION_MODE":"offline"})
|
|
return env
|
|
|
|
def pip_env(repo_root: pathlib.Path) -> dict[str, str]:
|
|
env = dict(os.environ)
|
|
clean_proxy_env(env)
|
|
c, t = repo_root/".pip"/"cache", repo_root/".tmp"/"pip-temp"
|
|
ensure_dirs([c, t])
|
|
env.update({"PIP_CACHE_DIR":str(c), "PIP_DISABLE_PIP_VERSION_CHECK":"1", "PIP_DEFAULT_TIMEOUT":"30", "PIP_RETRIES":"2", "TEMP":str(t), "TMP":str(t)})
|
|
return env
|
|
|
|
# --- Domain: Process Execution ---
|
|
|
|
def resolve_command(command: str) -> str:
|
|
if not command or os.name != "nt" or any(s in command for s in ("\\", "/")): return command
|
|
if pathlib.Path(command).suffix: return shutil.which(command) or command
|
|
low = command.lower()
|
|
cands = [f"{command}.cmd", f"{command}.exe", f"{command}.bat", command] if low in ("npm", "npx", "pnpm", "yarn", "tauri") else [command]
|
|
for c in cands:
|
|
found = _which_windows(c)
|
|
if found:
|
|
if pathlib.Path(found).name.lower() in ("npm", "npx", "pnpm", "yarn", "tauri"):
|
|
shim = pathlib.Path(found).with_name(pathlib.Path(found).name.lower() + ".cmd")
|
|
if shim.exists(): return str(shim)
|
|
return found
|
|
if low in ("npm", "npx", "pnpm", "yarn"):
|
|
node = _which_windows("node.exe") or _which_windows("node")
|
|
if node and (pathlib.Path(node).parent / f"{low}.cmd").exists(): return str(pathlib.Path(node).parent / f"{low}.cmd")
|
|
return cands[-1]
|
|
|
|
def _which_windows(command: str) -> Optional[str]:
|
|
found = shutil.which(command)
|
|
if found: return str(pathlib.Path(found).resolve())
|
|
for p in ["C:\\Program Files\\dotnet", "C:\\Program Files\\nodejs", "C:\\Program Files\\Git\\bin", "C:\\Program Files\\Git\\usr\\bin"]:
|
|
if (pathlib.Path(p) / command).exists(): return str((pathlib.Path(p) / command).resolve())
|
|
return None
|
|
|
|
def run(command: str, args: list[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> int:
|
|
try:
|
|
proc = subprocess.run([resolve_command(command), *args], cwd=str(cwd), env=env, check=False)
|
|
return proc.returncode
|
|
except: return 127
|
|
|
|
def run_capture(command: str, args: Sequence[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> tuple[int, str, str]:
|
|
try:
|
|
proc = subprocess.run([resolve_command(command), *args], cwd=str(cwd), env=env, capture_output=True, text=True, check=False, encoding="utf-8", errors="replace")
|
|
return proc.returncode, proc.stdout, proc.stderr
|
|
except: return 127, "", "Command not found"
|
|
|
|
def _safe_stream_write(stream: Any, text: str) -> None:
|
|
if not text:
|
|
return
|
|
try:
|
|
stream.write(text)
|
|
return
|
|
except UnicodeEncodeError:
|
|
pass
|
|
|
|
encoding = getattr(stream, "encoding", None) or "utf-8"
|
|
data = text.encode(encoding, errors="replace")
|
|
buffer = getattr(stream, "buffer", None)
|
|
if buffer is not None:
|
|
buffer.write(data)
|
|
buffer.flush()
|
|
return
|
|
|
|
# Last resort when no binary buffer is exposed.
|
|
stream.write(data.decode(encoding, errors="replace"))
|
|
|
|
def run_step_with_summary(label: str, command: str, args: Sequence[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> SdtResult:
|
|
print(f"\n> {label}\n$ {command} {' '.join(args)}")
|
|
start = time.time()
|
|
code, out, err = run_capture(command, args, cwd, env)
|
|
elapsed = round(time.time() - start, 3)
|
|
if out: _safe_stream_write(sys.stdout, out)
|
|
if err: _safe_stream_write(sys.stderr, err)
|
|
return {"exit_code": code, "status": "ok" if code == 0 else "failed", "elapsed_seconds": elapsed, "stdout": out, "stderr": err, "failure_reason": None, "skip_reason": None}
|
|
|
|
# --- Domain: High-Level Build Logic ---
|
|
|
|
def ensure_dotnet_publish(csproj: pathlib.Path, output_dir: pathlib.Path, configuration: str = "Release", runtime: str = "win-x64", single_file: bool = False, self_contained: bool = True) -> SdtResult:
|
|
repo_root = resolve_repo_root(str(csproj.parent))
|
|
env = dotnet_env(repo_root)
|
|
args = ["publish", str(csproj), "-c", configuration, "-r", runtime, "--self-contained", "true" if self_contained else "false",
|
|
f"-p:PublishSingleFile={'true' if single_file else 'false'}", "-p:RestoreIgnoreFailedSources=true", "-p:NuGetAudit=false", "-o", str(output_dir)]
|
|
if single_file: args.append("-p:IncludeNativeLibrariesForSelfExtract=true")
|
|
res = run_step_with_summary(f"Publishing {csproj.name}", "dotnet", args, repo_root, env)
|
|
if res["exit_code"] != 0 and single_file:
|
|
comb = (res["stdout"] + res["stderr"]).lower()
|
|
if "generatebundle" in comb and "same bundlerelativepath" in comb:
|
|
print("Duplicate bundle entries detected; retrying without single-file optimization...")
|
|
retry = [a if "PublishSingleFile=true" not in a else "-p:PublishSingleFile=false" for a in args]
|
|
res = run_step_with_summary(f"Publishing {csproj.name} (retry)", "dotnet", retry, repo_root, env)
|
|
if res["exit_code"] != 0:
|
|
comb = (res["stdout"] + res["stderr"]).lower()
|
|
if "netsdk1152" in comb and "multiple publish output files with the same relative path" in comb:
|
|
print("Duplicate publish output files detected (NETSDK1152); retrying with ErrorOnDuplicatePublishOutputFiles=false...")
|
|
retry = list(args)
|
|
if not any("ErrorOnDuplicatePublishOutputFiles" in a for a in retry):
|
|
retry.append("-p:ErrorOnDuplicatePublishOutputFiles=false")
|
|
res = run_step_with_summary(f"Publishing {csproj.name} (dedupe retry)", "dotnet", retry, repo_root, env)
|
|
return res
|
|
|
|
def ensure_npm_build(app_root: pathlib.Path, target: str = "web", configuration: str = "Release", tauri_bundles: str = "none") -> SdtResult:
|
|
pj, lock = app_root / "package.json", first_existing([app_root / "package-lock.json", app_root / "npm-shrinkwrap.json"])
|
|
nm = app_root / "node_modules"
|
|
h_file, exp_h = nm / ".sdt-deps.sha256", sha256_files([pj, lock] if lock else [pj])
|
|
should_inst = not nm.exists() or not h_file.exists() or h_file.read_text(encoding="utf-8").strip() != exp_h
|
|
if should_inst:
|
|
args = ["ci", "--no-audit", "--fund=false"] if lock else ["install", "--no-audit", "--fund=false"]
|
|
res = run_step_with_summary("Installing NPM dependencies", "npm", args, app_root)
|
|
if res["exit_code"] != 0 and lock and args[0] == "ci":
|
|
res = run_step_with_summary("Installing NPM dependencies (retry)", "npm", ["install", "--no-audit", "--fund=false"], app_root)
|
|
if res["exit_code"] != 0: return res
|
|
ensure_dirs([nm]); h_file.write_text(exp_h, encoding="utf-8")
|
|
if target == "web": return run_step_with_summary(f"Building Web ({configuration})", "npm", ["run", "build"], app_root)
|
|
t_cmd = ["run", "tauri", "build"]
|
|
tail = (["--no-bundle"] if tauri_bundles == "none" else ["--bundles", tauri_bundles]) + (["--debug"] if configuration == "Debug" else [])
|
|
if tail: t_cmd.extend(["--", *tail])
|
|
return run_step_with_summary(f"Building Tauri ({configuration})", "npm", t_cmd, app_root)
|