SDT/scripts/verify-workflow-routes.py
2026-03-04 16:40:57 -06:00

301 lines
10 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import pathlib
import shutil
import subprocess
import sys
from typing import Any, Optional, Sequence
from script_common import resolve_command, resolve_repo_root
def load_config(project_root: pathlib.Path) -> dict[str, Any]:
sdt_configs = list(project_root.glob("sdtconfig-*.json"))
config_path = sdt_configs[0] if sdt_configs else (project_root / "devtool.json")
if not config_path.exists():
raise FileNotFoundError(f"Project config not found at: {config_path}")
return json.loads(config_path.read_text(encoding="utf-8"))
def iter_workflows(config: dict[str, Any], selected: Optional[set[str]]) -> list[dict[str, Any]]:
workflows = config.get("workflows", [])
if not isinstance(workflows, list):
return []
normalized: list[dict[str, Any]] = [w for w in workflows if isinstance(w, dict) and isinstance(w.get("id"), str)]
if selected:
normalized = [w for w in normalized if w["id"] in selected]
return normalized
def is_command_available(command: str) -> bool:
resolved = resolve_command(command)
if pathlib.Path(resolved).is_file():
return True
return shutil.which(resolved) is not None
def resolve_script_arg(project_root: pathlib.Path, working_dir: pathlib.Path, arg: str) -> pathlib.Path:
p = pathlib.Path(arg)
if p.is_absolute():
return p
a = working_dir / p
if a.exists():
return a
b = project_root / p
return b
def static_check_workflow(project_root: pathlib.Path, workflow: dict[str, Any]) -> dict[str, Any]:
result: dict[str, Any] = {
"workflowId": workflow.get("id"),
"ok": True,
"issues": [],
"steps": [],
}
steps = workflow.get("steps", [])
if not isinstance(steps, list):
return result
for step in steps:
if not isinstance(step, dict):
continue
step_id = step.get("id", "<unknown>")
step_result: dict[str, Any] = {"stepId": step_id, "ok": True, "issues": []}
working_dir_rel = str(step.get("workingDir") or ".")
working_dir = (project_root / working_dir_rel).resolve()
if not working_dir.exists():
step_result["ok"] = False
step_result["issues"].append(f"workingDir_not_found:{working_dir}")
command = step.get("command")
args = step.get("args") or []
action = step.get("action")
if isinstance(command, str) and command.strip():
if not is_command_available(command):
step_result["ok"] = False
step_result["issues"].append(f"command_not_found:{command}")
if command.lower() in ("python", "py", "python3", "python.exe", "py.exe"):
if isinstance(args, list) and args and isinstance(args[0], str) and args[0].endswith(".py"):
script_path = resolve_script_arg(project_root, working_dir, args[0])
if not script_path.exists():
step_result["ok"] = False
step_result["issues"].append(f"python_script_not_found:{script_path}")
if isinstance(action, str) and action.strip():
if not working_dir.exists():
step_result["ok"] = False
step_result["issues"].append("action_working_dir_not_found")
if not step_result["ok"]:
result["ok"] = False
result["issues"].extend(step_result["issues"])
result["steps"].append(step_result)
return result
def sdt_attempts(repo_root: pathlib.Path) -> list[list[str]]:
attempts: list[list[str]] = []
attempts.append(["sdt"])
if sys.platform.startswith("win"):
attempts.append(["sdt.exe"])
local_exe = repo_root / ("sdt.exe" if sys.platform.startswith("win") else "sdt")
if local_exe.exists():
attempts.append([str(local_exe)])
devtool_csproj = repo_root / "DevTool.csproj"
if devtool_csproj.exists():
attempts.append(["dotnet", "run", "--project", str(devtool_csproj), "--"])
seen: set[tuple[str, ...]] = set()
unique: list[list[str]] = []
for a in attempts:
key = tuple(a)
if key in seen:
continue
seen.add(key)
unique.append(a)
return unique
def try_run_sdt(
repo_root: pathlib.Path,
command_args: Sequence[str],
timeout_seconds: int,
) -> tuple[Optional[subprocess.CompletedProcess[str]], Optional[str]]:
errors: list[str] = []
for base in sdt_attempts(repo_root):
cmd = [*base, *command_args]
try:
proc = subprocess.run(
cmd,
cwd=str(repo_root),
text=True,
capture_output=True,
timeout=timeout_seconds,
check=False,
)
return proc, " ".join(cmd)
except FileNotFoundError:
errors.append(f"not_found:{' '.join(cmd)}")
except subprocess.TimeoutExpired:
errors.append(f"timeout:{' '.join(cmd)}")
return None, "; ".join(errors) if errors else "no_sdt_attempts"
def parse_headless_summary(stdout: str) -> Optional[dict[str, Any]]:
lines = [line.strip() for line in stdout.splitlines() if line.strip()]
for line in reversed(lines):
if not line.startswith("{"):
continue
try:
payload = json.loads(line)
except Exception:
continue
if isinstance(payload, dict) and "runId" in payload and "success" in payload:
return payload
return None
def execute_check_workflow(
repo_root: pathlib.Path,
project_root: pathlib.Path,
workflow_id: str,
env_profile: Optional[str],
timeout_seconds: int,
) -> dict[str, Any]:
run_args = [
"run",
workflow_id,
"--json",
"--project-root",
str(project_root),
"--non-interactive",
]
if env_profile:
run_args.extend(["--env-profile", env_profile])
proc, attempted = try_run_sdt(repo_root, run_args, timeout_seconds)
if proc is None:
return {
"workflowId": workflow_id,
"ok": False,
"attempted": attempted,
"exitCode": None,
"stopReason": "sdt_not_runnable",
"message": attempted,
}
summary = parse_headless_summary(proc.stdout)
if summary is None:
return {
"workflowId": workflow_id,
"ok": False,
"attempted": attempted,
"exitCode": proc.returncode,
"stopReason": "missing_summary",
"message": (proc.stderr or proc.stdout).strip(),
}
return {
"workflowId": workflow_id,
"ok": bool(summary.get("success", False)),
"attempted": attempted,
"exitCode": summary.get("exitCode"),
"stopReason": summary.get("stopReason"),
"message": summary.get("message"),
}
def main() -> int:
parser = argparse.ArgumentParser(
description="Verify SDT workflow routes (static path checks + optional headless execution)."
)
_ = parser.add_argument("--repo-root", default=None, help="Repo root containing DevTool.csproj")
_ = parser.add_argument("--project-root", default=".", help="Project root containing devtool.json")
_ = parser.add_argument("--workflow", action="append", default=[], help="Workflow id to verify (repeatable)")
_ = parser.add_argument("--execute", action="store_true", help="Also run each workflow via `sdt run ... --json`")
_ = parser.add_argument("--env-profile", default=None)
_ = parser.add_argument("--timeout-seconds", type=int, default=600)
_ = parser.add_argument("--output-json", default=None, help="Write full report JSON to file")
args = parser.parse_args()
repo_root = resolve_repo_root(args.repo_root)
project_root = (repo_root / args.project_root).resolve() if not pathlib.Path(args.project_root).is_absolute() else pathlib.Path(args.project_root).resolve()
selected = set(args.workflow) if args.workflow else None
config = load_config(project_root)
workflows = iter_workflows(config, selected)
if not workflows:
print("No workflows selected/found.")
return 2
static_results = [static_check_workflow(project_root, w) for w in workflows]
execute_results: list[dict[str, Any]] = []
if args.execute:
for w in workflows:
wid = str(w["id"])
execute_results.append(
execute_check_workflow(
repo_root=repo_root,
project_root=project_root,
workflow_id=wid,
env_profile=args.env_profile,
timeout_seconds=args.timeout_seconds,
)
)
static_failures = [r for r in static_results if not r["ok"]]
exec_failures = [r for r in execute_results if not r["ok"]]
report: dict[str, Any] = {
"repoRoot": str(repo_root),
"projectRoot": str(project_root),
"totalWorkflows": len(workflows),
"static": {
"checked": len(static_results),
"failed": len(static_failures),
"results": static_results,
},
"execute": {
"enabled": args.execute,
"checked": len(execute_results),
"failed": len(exec_failures),
"results": execute_results,
},
}
if args.output_json:
out_path = pathlib.Path(args.output_json)
if not out_path.is_absolute():
out_path = repo_root / out_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
print(f"Report written: {out_path}")
print(f"Static checks: {len(static_results)} workflow(s), failures={len(static_failures)}")
if args.execute:
print(f"Execution checks: {len(execute_results)} workflow(s), failures={len(exec_failures)}")
if static_failures:
print("\nStatic failures:")
for sf in static_failures:
print(f"- {sf['workflowId']}: {', '.join(sf['issues'])}")
if exec_failures:
print("\nExecution failures:")
for ef in exec_failures:
print(f"- {ef['workflowId']}: stopReason={ef.get('stopReason')} message={ef.get('message')}")
return 1 if static_failures or exec_failures else 0
if __name__ == "__main__":
raise SystemExit(main())