363 lines
11 KiB
Python
363 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from typing import Dict, Iterable, List, Sequence
|
|
|
|
|
|
PROXY_VARS = [
|
|
"HTTP_PROXY",
|
|
"HTTPS_PROXY",
|
|
"ALL_PROXY",
|
|
"http_proxy",
|
|
"https_proxy",
|
|
"all_proxy",
|
|
"GIT_HTTP_PROXY",
|
|
"GIT_HTTPS_PROXY",
|
|
"PIP_NO_INDEX",
|
|
]
|
|
|
|
|
|
def resolve_repo_root(start: str | None = None) -> pathlib.Path:
|
|
base = pathlib.Path(start or os.getcwd()).resolve()
|
|
|
|
# Preferred marker for SDT-managed projects.
|
|
for cur in [base, *base.parents]:
|
|
cfg = cur / "devtool.json"
|
|
if cfg.exists():
|
|
hints = load_project_root_hints(cur)
|
|
if not hints:
|
|
return cur
|
|
if any(_hint_matches(cur, hint) for hint in hints):
|
|
return cur
|
|
|
|
# Fall back to git root when available.
|
|
try:
|
|
proc = subprocess.run(
|
|
["git", "-C", str(base), "rev-parse", "--show-toplevel"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
if proc.returncode == 0:
|
|
git_root = proc.stdout.strip()
|
|
if git_root:
|
|
return pathlib.Path(git_root).resolve()
|
|
except Exception:
|
|
pass
|
|
|
|
return base
|
|
|
|
|
|
def load_project_root_hints(repo_root: pathlib.Path) -> list[str]:
|
|
cfg = repo_root / "devtool.json"
|
|
if not cfg.exists():
|
|
return []
|
|
try:
|
|
data = json.loads(cfg.read_text(encoding="utf-8"))
|
|
hints = data.get("project", {}).get("rootHints", [])
|
|
return [str(x) for x in hints if isinstance(x, str) and x.strip()]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def ensure_dirs(paths: List[pathlib.Path]) -> None:
|
|
for p in paths:
|
|
p.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def clean_proxy_env(env: Dict[str, str]) -> None:
|
|
for k in PROXY_VARS:
|
|
env.pop(k, None)
|
|
|
|
|
|
def dotnet_env(repo_root: pathlib.Path) -> Dict[str, str]:
|
|
env = dict(os.environ)
|
|
clean_proxy_env(env)
|
|
dotnet_cli_home = repo_root / ".dotnet_home"
|
|
nuget_packages = repo_root / ".nuget" / "packages"
|
|
nuget_http_cache = repo_root / ".nuget" / "http-cache"
|
|
ensure_dirs([dotnet_cli_home, nuget_packages, nuget_http_cache])
|
|
env["DOTNET_CLI_HOME"] = str(dotnet_cli_home)
|
|
env["NUGET_PACKAGES"] = str(nuget_packages)
|
|
env["NUGET_HTTP_CACHE_PATH"] = str(nuget_http_cache)
|
|
env["DOTNET_SKIP_FIRST_TIME_EXPERIENCE"] = "1"
|
|
env["DOTNET_ADD_GLOBAL_TOOLS_TO_PATH"] = "0"
|
|
env["DOTNET_GENERATE_ASPNET_CERTIFICATE"] = "0"
|
|
env["DOTNET_CLI_TELEMETRY_OPTOUT"] = "1"
|
|
env["NUGET_CERT_REVOCATION_MODE"] = "offline"
|
|
return env
|
|
|
|
|
|
def pip_env(repo_root: pathlib.Path) -> Dict[str, str]:
|
|
env = dict(os.environ)
|
|
clean_proxy_env(env)
|
|
pip_cache = repo_root / ".pip" / "cache"
|
|
pip_tmp = repo_root / ".tmp" / "pip-temp"
|
|
ensure_dirs([pip_cache, pip_tmp])
|
|
env["PIP_CACHE_DIR"] = str(pip_cache)
|
|
env["PIP_DISABLE_PIP_VERSION_CHECK"] = "1"
|
|
env["PIP_DEFAULT_TIMEOUT"] = "30"
|
|
env["PIP_RETRIES"] = "2"
|
|
env["TEMP"] = str(pip_tmp)
|
|
env["TMP"] = str(pip_tmp)
|
|
return env
|
|
|
|
|
|
def run(command: str, args: List[str], cwd: pathlib.Path, env: Dict[str, str] | None = None) -> int:
|
|
resolved = resolve_command(command)
|
|
try:
|
|
proc = subprocess.run([resolved, *args], cwd=str(cwd), env=env, check=False)
|
|
return proc.returncode
|
|
except FileNotFoundError:
|
|
print(f"Command not found: {resolved}", file=sys.stderr)
|
|
return 127
|
|
|
|
|
|
def run_capture(command: str, args: Sequence[str], cwd: pathlib.Path, env: Dict[str, str] | None = None) -> tuple[int, str, str]:
|
|
resolved = resolve_command(command)
|
|
try:
|
|
proc = subprocess.run(
|
|
[resolved, *args],
|
|
cwd=str(cwd),
|
|
env=env,
|
|
capture_output=True,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
return proc.returncode, proc.stdout, proc.stderr
|
|
except FileNotFoundError:
|
|
return 127, "", f"Command not found: {resolved}"
|
|
|
|
|
|
def resolve_command(command: str) -> str:
|
|
if not command:
|
|
return command
|
|
|
|
if os.name != "nt":
|
|
return command
|
|
|
|
if any(sep in command for sep in ("\\", "/")):
|
|
return command
|
|
|
|
if pathlib.Path(command).suffix:
|
|
found = shutil.which(command)
|
|
return found or command
|
|
|
|
candidates = []
|
|
lowered = command.lower()
|
|
if lowered in ("npm", "npx", "pnpm", "yarn", "tauri"):
|
|
candidates.extend([f"{command}.cmd", f"{command}.exe", f"{command}.bat", command])
|
|
else:
|
|
candidates.append(command)
|
|
|
|
for c in candidates:
|
|
found = _which_windows(c)
|
|
if found:
|
|
name = pathlib.Path(found).name.lower()
|
|
if name in ("npm", "npx", "pnpm", "yarn", "tauri"):
|
|
shim = pathlib.Path(found).with_name(name + ".cmd")
|
|
if shim.exists():
|
|
return str(shim)
|
|
return found
|
|
|
|
if lowered in ("npm", "npx", "pnpm", "yarn"):
|
|
node = _which_windows("node.exe") or _which_windows("node")
|
|
if node:
|
|
node_dir = pathlib.Path(node).parent
|
|
shim = node_dir / f"{lowered}.cmd"
|
|
if shim.exists():
|
|
return str(shim)
|
|
|
|
return candidates[-1]
|
|
|
|
|
|
def _hint_matches(root: pathlib.Path, hint: str) -> bool:
|
|
h = hint.strip()
|
|
if not h:
|
|
return False
|
|
|
|
has_glob = any(ch in h for ch in ("*", "?", "["))
|
|
if has_glob:
|
|
# Match both anywhere in root and directly at root-level for common hints like "*.sln".
|
|
if any(root.glob(h)):
|
|
return True
|
|
return any(root.rglob(h))
|
|
|
|
marker = root / h
|
|
if marker.exists():
|
|
return True
|
|
|
|
# If hint is just a filename marker, look bounded in tree.
|
|
if not any(sep in h for sep in ("\\", "/")):
|
|
return any(p.name == h for p in root.rglob(h))
|
|
|
|
return False
|
|
|
|
|
|
def _expand_windows_path_segment(segment: str) -> str:
|
|
expanded = segment
|
|
# Expand %VAR% tokens repeatedly for nested references.
|
|
for _ in range(4):
|
|
next_value = os.path.expandvars(expanded)
|
|
if next_value == expanded:
|
|
break
|
|
expanded = next_value
|
|
return expanded
|
|
|
|
|
|
def _which_windows(command: str) -> str | None:
|
|
found = shutil.which(command)
|
|
if found:
|
|
return found
|
|
|
|
if os.name != "nt":
|
|
return None
|
|
|
|
path_value = os.environ.get("PATH", "")
|
|
pathext = os.environ.get("PATHEXT", ".COM;.EXE;.BAT;.CMD")
|
|
exts = [e.lower() for e in pathext.split(";") if e]
|
|
|
|
has_ext = pathlib.Path(command).suffix != ""
|
|
names = [command] if has_ext else [command, *(command + e.lower() for e in exts)]
|
|
|
|
for raw_segment in path_value.split(os.pathsep):
|
|
segment = _expand_windows_path_segment(raw_segment.strip())
|
|
if not segment:
|
|
continue
|
|
base = pathlib.Path(segment)
|
|
for name in names:
|
|
candidate = base / name
|
|
if candidate.exists():
|
|
return str(candidate)
|
|
|
|
return None
|
|
|
|
|
|
def sha256_files(paths: Iterable[pathlib.Path]) -> str:
|
|
h = hashlib.sha256()
|
|
for p in paths:
|
|
if not p.exists():
|
|
continue
|
|
h.update(p.read_bytes())
|
|
return h.hexdigest()
|
|
|
|
|
|
def first_existing(paths: Iterable[pathlib.Path]) -> pathlib.Path | None:
|
|
for p in paths:
|
|
if p.exists():
|
|
return p
|
|
return None
|
|
|
|
|
|
def find_csproj(repo_root: pathlib.Path, hints: Sequence[str] | None = None) -> pathlib.Path | None:
|
|
if hints:
|
|
for hint in hints:
|
|
candidate = (repo_root / hint).resolve()
|
|
if candidate.exists() and candidate.suffix.lower() == ".csproj":
|
|
return candidate
|
|
|
|
csprojs = sorted(repo_root.rglob("*.csproj"))
|
|
if not csprojs:
|
|
return None
|
|
if len(csprojs) == 1:
|
|
return csprojs[0]
|
|
return None
|
|
|
|
|
|
def find_csproj_by_keyword(repo_root: pathlib.Path, keywords: Sequence[str]) -> pathlib.Path | None:
|
|
kws = [k.lower() for k in keywords]
|
|
matches: list[pathlib.Path] = []
|
|
for p in repo_root.rglob("*.csproj"):
|
|
text = str(p).lower()
|
|
if any(k in text for k in kws):
|
|
matches.append(p)
|
|
if len(matches) == 1:
|
|
return matches[0]
|
|
return None
|
|
|
|
|
|
def find_node_app_root(repo_root: pathlib.Path, preferred: str | None = None) -> pathlib.Path | None:
|
|
def _read_package_json(package_json: pathlib.Path) -> dict | None:
|
|
if not package_json.exists():
|
|
return None
|
|
try:
|
|
data = json.loads(package_json.read_text(encoding="utf-8"))
|
|
return data if isinstance(data, dict) else None
|
|
except Exception:
|
|
return None
|
|
|
|
def _has_scripts(package_json: pathlib.Path, names: Sequence[str]) -> bool:
|
|
data = _read_package_json(package_json)
|
|
if not data:
|
|
return False
|
|
scripts = data.get("scripts")
|
|
if not isinstance(scripts, dict):
|
|
return False
|
|
for name in names:
|
|
value = scripts.get(name)
|
|
if isinstance(value, str) and value.strip():
|
|
return True
|
|
return False
|
|
|
|
def _is_tauri_root(candidate_dir: pathlib.Path) -> bool:
|
|
return (candidate_dir / "src-tauri" / "tauri.conf.json").exists() or (candidate_dir / "tauri.conf.json").exists()
|
|
|
|
def _iter_package_jsons() -> list[pathlib.Path]:
|
|
excluded = {"node_modules", "dist", "build", ".git", ".sdt", ".venv", "venv", "bin", "obj"}
|
|
found: list[pathlib.Path] = []
|
|
for current_root, dirs, files in os.walk(repo_root):
|
|
dirs[:] = [d for d in dirs if d not in excluded]
|
|
if "package.json" in files:
|
|
found.append(pathlib.Path(current_root) / "package.json")
|
|
found.sort(key=lambda p: len(p.parts))
|
|
return found
|
|
|
|
if preferred:
|
|
p = (repo_root / preferred).resolve()
|
|
package_json = p / "package.json"
|
|
if package_json.exists():
|
|
# Keep explicit preferred root only when it appears runnable for node workflows.
|
|
if _is_tauri_root(p) or _has_scripts(package_json, ("build", "dev", "start", "test", "tauri")):
|
|
return p
|
|
|
|
package_files = _iter_package_jsons()
|
|
if not package_files:
|
|
return None
|
|
|
|
# Strong preference: a tauri app root with tauri config and package.json.
|
|
tauri_candidates = [p.parent for p in package_files if _is_tauri_root(p.parent)]
|
|
if len(tauri_candidates) == 1:
|
|
return tauri_candidates[0]
|
|
if len(tauri_candidates) > 1:
|
|
tauri_candidates.sort(key=lambda p: len(p.parts))
|
|
return tauri_candidates[0]
|
|
|
|
runnable_candidates = [
|
|
p.parent for p in package_files if _has_scripts(p, ("build", "dev", "start", "test", "tauri"))
|
|
]
|
|
if len(runnable_candidates) == 1:
|
|
return runnable_candidates[0]
|
|
if len(runnable_candidates) > 1:
|
|
runnable_candidates.sort(key=lambda p: len(p.parts))
|
|
return runnable_candidates[0]
|
|
|
|
# As a last fallback, return unique package root only.
|
|
if len(package_files) == 1:
|
|
return package_files[0].parent
|
|
return None
|
|
|
|
|
|
def newest_file(search_root: pathlib.Path, pattern: str) -> pathlib.Path | None:
|
|
if not search_root.exists():
|
|
return None
|
|
files = [p for p in search_root.rglob(pattern) if p.is_file() and "\\obj\\" not in str(p).replace("/", "\\")]
|
|
if not files:
|
|
return None
|
|
files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
|
|
return files[0]
|