#!/usr/bin/env python3 import hashlib import json import os import pathlib import shutil import subprocess import sys import time from typing import Any, Iterable, Optional, Sequence # --- Domain: SDT Types --- # SDT Normalized Result Object # result["exit_code"]: int # result["status"]: "ok" | "failed" | "skipped" # result["elapsed_seconds"]: float # result["failure_reason"]: Optional[str] # result["skip_reason"]: Optional[str] SdtResult = dict[str, Any] PROXY_VARS = [ "HTTP_PROXY", "HTTPS_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "all_proxy", "GIT_HTTP_PROXY", "GIT_HTTPS_PROXY", "PIP_NO_INDEX" ] # --- Domain: FS Utilities --- def sha256_files(paths: Iterable[pathlib.Path]) -> str: h = hashlib.sha256() for p in sorted(paths): if p.exists(): h.update(p.read_bytes()) return h.hexdigest() def first_existing(paths: Iterable[pathlib.Path]) -> Optional[pathlib.Path]: for p in paths: if p.exists(): return p return None def newest_file(search_root: pathlib.Path, pattern: str) -> Optional[pathlib.Path]: hits = sorted(search_root.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True) return hits[0] if hits else None def ensure_dirs(paths: list[pathlib.Path]) -> None: for p in paths: p.mkdir(parents=True, exist_ok=True) def _hint_matches(root: pathlib.Path, hint: str) -> bool: h = hint.strip() if not h: return False try: has_glob = any(ch in h for ch in ("*", "?", "[")) if has_glob: if any(root.glob(h)): return True return any(root.rglob(h)) marker = root / h if marker.exists(): return True # If hint is a plain filename marker, allow bounded search in root tree. if not any(sep in h for sep in ("/", "\\")): return any(p.name == h for p in root.rglob(h)) return False except: return False # --- Domain: Project Discovery --- def resolve_repo_root(start: str | None = None) -> pathlib.Path: base = pathlib.Path(start or os.getcwd()).resolve() for cur in [base, *base.parents]: sdt_configs = list(cur.glob("sdtconfig-*.json")) cfg = sdt_configs[0] if sdt_configs else (cur / "devtool.json") if cfg.exists(): hints = load_project_root_hints(cur, cfg) if not hints or any(_hint_matches(cur, hint) for hint in hints): return cur try: proc = subprocess.run(["git", "-C", str(base), "rev-parse", "--show-toplevel"], capture_output=True, text=True, check=False) if proc.returncode == 0 and proc.stdout.strip(): return pathlib.Path(proc.stdout.strip()).resolve() except: pass return base def load_project_root_hints(repo_root: pathlib.Path, cfg: pathlib.Path | None = None) -> list[str]: if cfg is None: sdt_configs = list(repo_root.glob("sdtconfig-*.json")) cfg = sdt_configs[0] if sdt_configs else (repo_root / "devtool.json") if not cfg.exists(): return [] try: data = json.loads(cfg.read_text(encoding="utf-8")) hints = data.get("project", {}).get("rootHints", []) return [str(x) for x in hints if isinstance(x, str) and x.strip()] except: return [] def _resolve_project_config_path(repo_root: pathlib.Path) -> Optional[pathlib.Path]: sdt_configs = sorted(repo_root.glob("sdtconfig-*.json"), key=lambda p: p.name.lower()) if sdt_configs: return sdt_configs[0] legacy = repo_root / "devtool.json" if legacy.exists(): return legacy return None def load_node_working_dir(repo_root: pathlib.Path) -> Optional[str]: cfg = _resolve_project_config_path(repo_root) if cfg is None: return None try: data = json.loads(cfg.read_text(encoding="utf-8")) node = data.get("toolchains", {}).get("node", {}) value = node.get("workingDir") if isinstance(value, str) and value.strip(): return value.strip() except: pass return None def find_csproj(repo_root: pathlib.Path, hints: Sequence[str] | None = None) -> Optional[pathlib.Path]: if hints: for h in hints: p = (repo_root / h).resolve() if p.exists() and p.suffix == ".csproj": return p hits = [h for h in repo_root.rglob("*.csproj") if not any(x in h.parts for x in [".git", "node_modules", "bin", "obj"])] return hits[0] if len(hits) == 1 else None def find_csproj_by_keyword(repo_root: pathlib.Path, keywords: Sequence[str]) -> Optional[pathlib.Path]: hits = [h for h in repo_root.rglob("*.csproj") if not any(x in h.parts for x in [".git", "node_modules", "bin", "obj"])] for kw in keywords: matches = [h for h in hits if kw.lower() in h.name.lower()] if len(matches) == 1: return matches[0] return None def find_node_app_root(repo_root: pathlib.Path, preferred: str | None = None) -> Optional[pathlib.Path]: def _read_pj(p: pathlib.Path): try: return json.loads(p.read_text(encoding="utf-8")) except: return None def _has_scr(p: pathlib.Path, names: Sequence[str]): src = _read_pj(p) return any(src.get("scripts", {}).get(n) for n in names) if src else False def _is_tauri(d: pathlib.Path): return (d / "src-tauri" / "tauri.conf.json").exists() or (d / "tauri.conf.json").exists() def _iter_pj(): excluded = {".git", "node_modules", ".sdt", "dist", "build", ".venv", "venv", "bin", "obj"} for p in repo_root.rglob("package.json"): if any(x in p.parts for x in excluded): continue yield p if not preferred: preferred = load_node_working_dir(repo_root) if preferred: p = (repo_root / preferred).resolve() p = p.parent if p.is_file() else p if (p / "package.json").exists(): return p tauri = [p.parent for p in _iter_pj() if _is_tauri(p.parent)] if len(tauri) == 1: return tauri[0] if len(tauri) > 1: tauri = sorted(set(tauri), key=lambda d: (len(d.parts), str(d).lower())) return tauri[0] scripts = [p.parent for p in _iter_pj() if _has_scr(p, ["tauri", "build"])] if len(scripts) == 1: return scripts[0] if len(scripts) > 1: scripts = sorted(set(scripts), key=lambda d: (len(d.parts), str(d).lower())) return scripts[0] all_pj = [p.parent for p in _iter_pj()] return all_pj[0] if len(all_pj) == 1 else None # --- Domain: Environment Setup --- def clean_proxy_env(env: dict[str, str]) -> None: for k in PROXY_VARS: env.pop(k, None) def dotnet_env(repo_root: pathlib.Path) -> dict[str, str]: env = dict(os.environ) clean_proxy_env(env) h, p, c = repo_root/".dotnet_home", repo_root/".nuget"/"packages", repo_root/".nuget"/"http-cache" ensure_dirs([h, p, c]) env.update({"DOTNET_CLI_HOME":str(h), "NUGET_PACKAGES":str(p), "NUGET_HTTP_CACHE_PATH":str(c), "DOTNET_SKIP_FIRST_TIME_EXPERIENCE":"1", "DOTNET_ADD_GLOBAL_TOOLS_TO_PATH":"0", "DOTNET_GENERATE_ASPNET_CERTIFICATE":"0", "DOTNET_CLI_TELEMETRY_OPTOUT":"1", "NUGET_CERT_REVOCATION_MODE":"offline"}) return env def pip_env(repo_root: pathlib.Path) -> dict[str, str]: env = dict(os.environ) clean_proxy_env(env) c, t = repo_root/".pip"/"cache", repo_root/".tmp"/"pip-temp" ensure_dirs([c, t]) env.update({"PIP_CACHE_DIR":str(c), "PIP_DISABLE_PIP_VERSION_CHECK":"1", "PIP_DEFAULT_TIMEOUT":"30", "PIP_RETRIES":"2", "TEMP":str(t), "TMP":str(t)}) return env # --- Domain: Process Execution --- def resolve_command(command: str) -> str: if not command or os.name != "nt" or any(s in command for s in ("\\", "/")): return command if pathlib.Path(command).suffix: return shutil.which(command) or command low = command.lower() cands = [f"{command}.cmd", f"{command}.exe", f"{command}.bat", command] if low in ("npm", "npx", "pnpm", "yarn", "tauri") else [command] for c in cands: found = _which_windows(c) if found: if pathlib.Path(found).name.lower() in ("npm", "npx", "pnpm", "yarn", "tauri"): shim = pathlib.Path(found).with_name(pathlib.Path(found).name.lower() + ".cmd") if shim.exists(): return str(shim) return found if low in ("npm", "npx", "pnpm", "yarn"): node = _which_windows("node.exe") or _which_windows("node") if node and (pathlib.Path(node).parent / f"{low}.cmd").exists(): return str(pathlib.Path(node).parent / f"{low}.cmd") return cands[-1] def _which_windows(command: str) -> Optional[str]: found = shutil.which(command) if found: return str(pathlib.Path(found).resolve()) for p in ["C:\\Program Files\\dotnet", "C:\\Program Files\\nodejs", "C:\\Program Files\\Git\\bin", "C:\\Program Files\\Git\\usr\\bin"]: if (pathlib.Path(p) / command).exists(): return str((pathlib.Path(p) / command).resolve()) return None def run(command: str, args: list[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> int: try: proc = subprocess.run([resolve_command(command), *args], cwd=str(cwd), env=env, check=False) return proc.returncode except: return 127 def run_capture(command: str, args: Sequence[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> tuple[int, str, str]: try: proc = subprocess.run([resolve_command(command), *args], cwd=str(cwd), env=env, capture_output=True, text=True, check=False, encoding="utf-8", errors="replace") return proc.returncode, proc.stdout, proc.stderr except: return 127, "", "Command not found" def _safe_stream_write(stream: Any, text: str) -> None: if not text: return try: stream.write(text) return except UnicodeEncodeError: pass encoding = getattr(stream, "encoding", None) or "utf-8" data = text.encode(encoding, errors="replace") buffer = getattr(stream, "buffer", None) if buffer is not None: buffer.write(data) buffer.flush() return # Last resort when no binary buffer is exposed. stream.write(data.decode(encoding, errors="replace")) def run_step_with_summary(label: str, command: str, args: Sequence[str], cwd: pathlib.Path, env: dict[str, str] | None = None) -> SdtResult: print(f"\n> {label}\n$ {command} {' '.join(args)}") start = time.time() code, out, err = run_capture(command, args, cwd, env) elapsed = round(time.time() - start, 3) if out: _safe_stream_write(sys.stdout, out) if err: _safe_stream_write(sys.stderr, err) return {"exit_code": code, "status": "ok" if code == 0 else "failed", "elapsed_seconds": elapsed, "stdout": out, "stderr": err, "failure_reason": None, "skip_reason": None} # --- Domain: High-Level Build Logic --- def ensure_dotnet_publish(csproj: pathlib.Path, output_dir: pathlib.Path, configuration: str = "Release", runtime: str = "win-x64", single_file: bool = False, self_contained: bool = True) -> SdtResult: repo_root = resolve_repo_root(str(csproj.parent)) env = dotnet_env(repo_root) args = ["publish", str(csproj), "-c", configuration, "-r", runtime, "--self-contained", "true" if self_contained else "false", f"-p:PublishSingleFile={'true' if single_file else 'false'}", "-p:RestoreIgnoreFailedSources=true", "-p:NuGetAudit=false", "-o", str(output_dir)] if single_file: args.append("-p:IncludeNativeLibrariesForSelfExtract=true") res = run_step_with_summary(f"Publishing {csproj.name}", "dotnet", args, repo_root, env) if res["exit_code"] != 0 and single_file: comb = (res["stdout"] + res["stderr"]).lower() if "generatebundle" in comb and "same bundlerelativepath" in comb: print("Duplicate bundle entries detected; retrying without single-file optimization...") retry = [a if "PublishSingleFile=true" not in a else "-p:PublishSingleFile=false" for a in args] res = run_step_with_summary(f"Publishing {csproj.name} (retry)", "dotnet", retry, repo_root, env) if res["exit_code"] != 0: comb = (res["stdout"] + res["stderr"]).lower() if "netsdk1152" in comb and "multiple publish output files with the same relative path" in comb: print("Duplicate publish output files detected (NETSDK1152); retrying with ErrorOnDuplicatePublishOutputFiles=false...") retry = list(args) if not any("ErrorOnDuplicatePublishOutputFiles" in a for a in retry): retry.append("-p:ErrorOnDuplicatePublishOutputFiles=false") res = run_step_with_summary(f"Publishing {csproj.name} (dedupe retry)", "dotnet", retry, repo_root, env) return res def ensure_npm_build(app_root: pathlib.Path, target: str = "web", configuration: str = "Release", tauri_bundles: str = "none") -> SdtResult: pj, lock = app_root / "package.json", first_existing([app_root / "package-lock.json", app_root / "npm-shrinkwrap.json"]) nm = app_root / "node_modules" h_file, exp_h = nm / ".sdt-deps.sha256", sha256_files([pj, lock] if lock else [pj]) should_inst = not nm.exists() or not h_file.exists() or h_file.read_text(encoding="utf-8").strip() != exp_h if should_inst: args = ["ci", "--no-audit", "--fund=false"] if lock else ["install", "--no-audit", "--fund=false"] res = run_step_with_summary("Installing NPM dependencies", "npm", args, app_root) if res["exit_code"] != 0 and lock and args[0] == "ci": res = run_step_with_summary("Installing NPM dependencies (retry)", "npm", ["install", "--no-audit", "--fund=false"], app_root) if res["exit_code"] != 0: return res ensure_dirs([nm]); h_file.write_text(exp_h, encoding="utf-8") if target == "web": return run_step_with_summary(f"Building Web ({configuration})", "npm", ["run", "build"], app_root) t_cmd = ["run", "tauri", "build"] tail = (["--no-bundle"] if tauri_bundles == "none" else ["--bundles", tauri_bundles]) + (["--debug"] if configuration == "Debug" else []) if tail: t_cmd.extend(["--", *tail]) return run_step_with_summary(f"Building Tauri ({configuration})", "npm", t_cmd, app_root)