journal/Journal.DevTool/scripts/script_common.py
2026-03-04 16:42:04 -06:00

304 lines
14 KiB
Python

#!/usr/bin/env python3
import hashlib
import json
import os
import pathlib
import shutil
import subprocess
import sys
import time
from typing import Any, Iterable, Optional, Sequence
# --- Domain: SDT Types ---
# SDT Normalized Result Object
# result["exit_code"]: int
# result["status"]: "ok" | "failed" | "skipped"
# result["elapsed_seconds"]: float
# result["failure_reason"]: Optional[str]
# result["skip_reason"]: Optional[str]
SdtResult = dict[str, Any]
PROXY_VARS = [
"HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy",
"GIT_HTTP_PROXY", "GIT_HTTPS_PROXY", "PIP_NO_INDEX"
]
# --- Domain: FS Utilities ---
def sha256_files(paths: Iterable[pathlib.Path]) -> str:
h = hashlib.sha256()
for p in sorted(paths):
if p.exists(): h.update(p.read_bytes())
return h.hexdigest()
def first_existing(paths: Iterable[pathlib.Path]) -> Optional[pathlib.Path]:
for p in paths:
if p.exists(): return p
return None
def newest_file(search_root: pathlib.Path, pattern: str) -> Optional[pathlib.Path]:
hits = sorted(search_root.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True)
return hits[0] if hits else None
def ensure_dirs(paths: list[pathlib.Path]) -> None:
for p in paths: p.mkdir(parents=True, exist_ok=True)
def _hint_matches(root: pathlib.Path, hint: str) -> bool:
h = hint.strip()
if not h:
return False
try:
has_glob = any(ch in h for ch in ("*", "?", "["))
if has_glob:
if any(root.glob(h)):
return True
return any(root.rglob(h))
marker = root / h
if marker.exists():
return True
# If hint is a plain filename marker, allow bounded search in root tree.
if not any(sep in h for sep in ("/", "\\")):
return any(p.name == h for p in root.rglob(h))
return False
except:
return False
# --- Domain: Project Discovery ---
def resolve_repo_root(start: str | None = None) -> pathlib.Path:
base = pathlib.Path(start or os.getcwd()).resolve()
for cur in [base, *base.parents]:
sdt_configs = list(cur.glob("sdtconfig-*.json"))
cfg = sdt_configs[0] if sdt_configs else (cur / "devtool.json")
if cfg.exists():
hints = load_project_root_hints(cur, cfg)
if not hints or any(_hint_matches(cur, hint) for hint in hints): return cur
try:
proc = subprocess.run(["git", "-C", str(base), "rev-parse", "--show-toplevel"], capture_output=True, text=True, check=False)
if proc.returncode == 0 and proc.stdout.strip(): return pathlib.Path(proc.stdout.strip()).resolve()
except: pass
return base
def load_project_root_hints(repo_root: pathlib.Path, cfg: pathlib.Path | None = None) -> list[str]:
if cfg is None:
sdt_configs = list(repo_root.glob("sdtconfig-*.json"))
cfg = sdt_configs[0] if sdt_configs else (repo_root / "devtool.json")
if not cfg.exists(): return []
try:
data = json.loads(cfg.read_text(encoding="utf-8"))
hints = data.get("project", {}).get("rootHints", [])
return [str(x) for x in hints if isinstance(x, str) and x.strip()]
except: return []
def _resolve_project_config_path(repo_root: pathlib.Path) -> Optional[pathlib.Path]:
sdt_configs = sorted(repo_root.glob("sdtconfig-*.json"), key=lambda p: p.name.lower())
if sdt_configs:
return sdt_configs[0]
legacy = repo_root / "devtool.json"
if legacy.exists():
return legacy
return None
def load_node_working_dir(repo_root: pathlib.Path) -> Optional[str]:
cfg = _resolve_project_config_path(repo_root)
if cfg is None:
return None
try:
data = json.loads(cfg.read_text(encoding="utf-8"))
node = data.get("toolchains", {}).get("node", {})
value = node.get("workingDir")
if isinstance(value, str) and value.strip():
return value.strip()
except:
pass
return None
def find_csproj(repo_root: pathlib.Path, hints: Sequence[str] | None = None) -> Optional[pathlib.Path]:
if hints:
for h in hints:
p = (repo_root / h).resolve()
if p.exists() and p.suffix == ".csproj": return p
hits = [h for h in repo_root.rglob("*.csproj") if not any(x in h.parts for x in [".git", "node_modules", "bin", "obj"])]
return hits[0] if len(hits) == 1 else None
def find_csproj_by_keyword(repo_root: pathlib.Path, keywords: Sequence[str]) -> Optional[pathlib.Path]:
hits = [h for h in repo_root.rglob("*.csproj") if not any(x in h.parts for x in [".git", "node_modules", "bin", "obj"])]
for kw in keywords:
matches = [h for h in hits if kw.lower() in h.name.lower()]
if len(matches) == 1: return matches[0]
return None
def find_node_app_root(repo_root: pathlib.Path, preferred: str | None = None) -> Optional[pathlib.Path]:
def _read_pj(p: pathlib.Path):
try: return json.loads(p.read_text(encoding="utf-8"))
except: return None
def _has_scr(p: pathlib.Path, names: Sequence[str]):
src = _read_pj(p)
return any(src.get("scripts", {}).get(n) for n in names) if src else False
def _is_tauri(d: pathlib.Path): return (d / "src-tauri" / "tauri.conf.json").exists() or (d / "tauri.conf.json").exists()
def _iter_pj():
excluded = {".git", "node_modules", ".sdt", "dist", "build", ".venv", "venv", "bin", "obj"}
for p in repo_root.rglob("package.json"):
if any(x in p.parts for x in excluded):
continue
yield p
if not preferred:
preferred = load_node_working_dir(repo_root)
if preferred:
p = (repo_root / preferred).resolve()
p = p.parent if p.is_file() else p
if (p / "package.json").exists():
return p
tauri = [p.parent for p in _iter_pj() if _is_tauri(p.parent)]
if len(tauri) == 1: return tauri[0]
if len(tauri) > 1:
tauri = sorted(set(tauri), key=lambda d: (len(d.parts), str(d).lower()))
return tauri[0]
scripts = [p.parent for p in _iter_pj() if _has_scr(p, ["tauri", "build"])]
if len(scripts) == 1: return scripts[0]
if len(scripts) > 1:
scripts = sorted(set(scripts), key=lambda d: (len(d.parts), str(d).lower()))
return scripts[0]
all_pj = [p.parent for p in _iter_pj()]
return all_pj[0] if len(all_pj) == 1 else None
# --- Domain: Environment Setup ---
def clean_proxy_env(env: dict[str, str]) -> None:
for k in PROXY_VARS: env.pop(k, None)
def dotnet_env(repo_root: pathlib.Path) -> dict[str, str]:
env = dict(os.environ)
clean_proxy_env(env)
h, p, c = repo_root/".dotnet_home", repo_root/".nuget"/"packages", repo_root/".nuget"/"http-cache"
ensure_dirs([h, p, c])
env.update({"DOTNET_CLI_HOME":str(h), "NUGET_PACKAGES":str(p), "NUGET_HTTP_CACHE_PATH":str(c),
"DOTNET_SKIP_FIRST_TIME_EXPERIENCE":"1", "DOTNET_ADD_GLOBAL_TOOLS_TO_PATH":"0",
"DOTNET_GENERATE_ASPNET_CERTIFICATE":"0",
"DOTNET_CLI_TELEMETRY_OPTOUT":"1", "NUGET_CERT_REVOCATION_MODE":"offline"})
return env
def pip_env(repo_root: pathlib.Path) -> dict[str, str]:
env = dict(os.environ)
clean_proxy_env(env)
c, t = repo_root/".pip"/"cache", repo_root/".tmp"/"pip-temp"
ensure_dirs([c, t])
env.update({"PIP_CACHE_DIR":str(c), "PIP_DISABLE_PIP_VERSION_CHECK":"1", "PIP_DEFAULT_TIMEOUT":"30", "PIP_RETRIES":"2", "TEMP":str(t), "TMP":str(t)})
return env
# --- Domain: Process Execution ---
def resolve_command(command: str) -> str:
if not command or os.name != "nt" or any(s in command for s in ("\\", "/")): return command
if pathlib.Path(command).suffix: return shutil.which(command) or command
low = command.lower()
cands = [f"{command}.cmd", f"{command}.exe", f"{command}.bat", command] if low in ("npm", "npx", "pnpm", "yarn", "tauri") else [command]
for c in cands:
found = _which_windows(c)
if found:
if pathlib.Path(found).name.lower() in ("npm", "npx", "pnpm", "yarn", "tauri"):
shim = pathlib.Path(found).with_name(pathlib.Path(found).name.lower() + ".cmd")
if shim.exists(): return str(shim)
return found
if low in ("npm", "npx", "pnpm", "yarn"):
node = _which_windows("node.exe") or _which_windows("node")
if node and (pathlib.Path(node).parent / f"{low}.cmd").exists(): return str(pathlib.Path(node).parent / f"{low}.cmd")
return cands[-1]
def _which_windows(command: str) -> Optional[str]:
found = shutil.which(command)
if found: return str(pathlib.Path(found).resolve())
for p in ["C:\\Program Files\\dotnet", "C:\\Program Files\\nodejs", "C:\\Program Files\\Git\\bin", "C:\\Program Files\\Git\\usr\\bin"]:
if (pathlib.Path(p) / command).exists(): return str((pathlib.Path(p) / command).resolve())
return None
def run(command: str, args: list[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> int:
try:
proc = subprocess.run([resolve_command(command), *args], cwd=str(cwd), env=env, check=False)
return proc.returncode
except: return 127
def run_capture(command: str, args: Sequence[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> tuple[int, str, str]:
try:
proc = subprocess.run([resolve_command(command), *args], cwd=str(cwd), env=env, capture_output=True, text=True, check=False, encoding="utf-8", errors="replace")
return proc.returncode, proc.stdout, proc.stderr
except: return 127, "", "Command not found"
def _safe_stream_write(stream: Any, text: str) -> None:
if not text:
return
try:
stream.write(text)
return
except UnicodeEncodeError:
pass
encoding = getattr(stream, "encoding", None) or "utf-8"
data = text.encode(encoding, errors="replace")
buffer = getattr(stream, "buffer", None)
if buffer is not None:
buffer.write(data)
buffer.flush()
return
# Last resort when no binary buffer is exposed.
stream.write(data.decode(encoding, errors="replace"))
def run_step_with_summary(label: str, command: str, args: Sequence[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> SdtResult:
print(f"\n> {label}\n$ {command} {' '.join(args)}")
start = time.time()
code, out, err = run_capture(command, args, cwd, env)
elapsed = round(time.time() - start, 3)
if out: _safe_stream_write(sys.stdout, out)
if err: _safe_stream_write(sys.stderr, err)
return {"exit_code": code, "status": "ok" if code == 0 else "failed", "elapsed_seconds": elapsed, "stdout": out, "stderr": err, "failure_reason": None, "skip_reason": None}
# --- Domain: High-Level Build Logic ---
def ensure_dotnet_publish(csproj: pathlib.Path, output_dir: pathlib.Path, configuration: str = "Release", runtime: str = "win-x64", single_file: bool = False, self_contained: bool = True) -> SdtResult:
repo_root = resolve_repo_root(str(csproj.parent))
env = dotnet_env(repo_root)
args = ["publish", str(csproj), "-c", configuration, "-r", runtime, "--self-contained", "true" if self_contained else "false",
f"-p:PublishSingleFile={'true' if single_file else 'false'}", "-p:RestoreIgnoreFailedSources=true", "-p:NuGetAudit=false", "-o", str(output_dir)]
if single_file: args.append("-p:IncludeNativeLibrariesForSelfExtract=true")
res = run_step_with_summary(f"Publishing {csproj.name}", "dotnet", args, repo_root, env)
if res["exit_code"] != 0 and single_file:
comb = (res["stdout"] + res["stderr"]).lower()
if "generatebundle" in comb and "same bundlerelativepath" in comb:
print("Duplicate bundle entries detected; retrying without single-file optimization...")
retry = [a if "PublishSingleFile=true" not in a else "-p:PublishSingleFile=false" for a in args]
res = run_step_with_summary(f"Publishing {csproj.name} (retry)", "dotnet", retry, repo_root, env)
if res["exit_code"] != 0:
comb = (res["stdout"] + res["stderr"]).lower()
if "netsdk1152" in comb and "multiple publish output files with the same relative path" in comb:
print("Duplicate publish output files detected (NETSDK1152); retrying with ErrorOnDuplicatePublishOutputFiles=false...")
retry = list(args)
if not any("ErrorOnDuplicatePublishOutputFiles" in a for a in retry):
retry.append("-p:ErrorOnDuplicatePublishOutputFiles=false")
res = run_step_with_summary(f"Publishing {csproj.name} (dedupe retry)", "dotnet", retry, repo_root, env)
return res
def ensure_npm_build(app_root: pathlib.Path, target: str = "web", configuration: str = "Release", tauri_bundles: str = "none") -> SdtResult:
pj, lock = app_root / "package.json", first_existing([app_root / "package-lock.json", app_root / "npm-shrinkwrap.json"])
nm = app_root / "node_modules"
h_file, exp_h = nm / ".sdt-deps.sha256", sha256_files([pj, lock] if lock else [pj])
should_inst = not nm.exists() or not h_file.exists() or h_file.read_text(encoding="utf-8").strip() != exp_h
if should_inst:
args = ["ci", "--no-audit", "--fund=false"] if lock else ["install", "--no-audit", "--fund=false"]
res = run_step_with_summary("Installing NPM dependencies", "npm", args, app_root)
if res["exit_code"] != 0 and lock and args[0] == "ci":
res = run_step_with_summary("Installing NPM dependencies (retry)", "npm", ["install", "--no-audit", "--fund=false"], app_root)
if res["exit_code"] != 0: return res
ensure_dirs([nm]); h_file.write_text(exp_h, encoding="utf-8")
if target == "web": return run_step_with_summary(f"Building Web ({configuration})", "npm", ["run", "build"], app_root)
t_cmd = ["run", "tauri", "build"]
tail = (["--no-bundle"] if tauri_bundles == "none" else ["--bundles", tauri_bundles]) + (["--debug"] if configuration == "Debug" else [])
if tail: t_cmd.extend(["--", *tail])
return run_step_with_summary(f"Building Tauri ({configuration})", "npm", t_cmd, app_root)