#!/usr/bin/env python3 import argparse import json import pathlib import shutil import subprocess import sys from typing import Any, Dict, List, Optional, Sequence, Tuple from script_common import resolve_command, resolve_repo_root def load_config(project_root: pathlib.Path) -> dict: config_path = project_root / "devtool.json" if not config_path.exists(): raise FileNotFoundError(f"devtool.json not found at: {config_path}") return json.loads(config_path.read_text(encoding="utf-8")) def iter_workflows(config: dict, selected: Optional[set[str]]) -> List[dict]: workflows = config.get("workflows", []) if not isinstance(workflows, list): return [] normalized: List[dict] = [w for w in workflows if isinstance(w, dict) and isinstance(w.get("id"), str)] if selected: normalized = [w for w in normalized if w["id"] in selected] return normalized def is_command_available(command: str) -> bool: resolved = resolve_command(command) if pathlib.Path(resolved).is_file(): return True return shutil.which(resolved) is not None def resolve_script_arg(project_root: pathlib.Path, working_dir: pathlib.Path, arg: str) -> pathlib.Path: p = pathlib.Path(arg) if p.is_absolute(): return p a = working_dir / p if a.exists(): return a b = project_root / p return b def static_check_workflow(project_root: pathlib.Path, workflow: dict) -> dict: result = { "workflowId": workflow.get("id"), "ok": True, "issues": [], "steps": [], } for step in workflow.get("steps", []): if not isinstance(step, dict): continue step_id = step.get("id", "") step_result = {"stepId": step_id, "ok": True, "issues": []} working_dir_rel = step.get("workingDir") or "." working_dir = (project_root / working_dir_rel).resolve() if not working_dir.exists(): step_result["ok"] = False step_result["issues"].append(f"workingDir_not_found:{working_dir}") command = step.get("command") args = step.get("args") or [] action = step.get("action") if isinstance(command, str) and command.strip(): if not is_command_available(command): step_result["ok"] = False step_result["issues"].append(f"command_not_found:{command}") if command.lower() in ("python", "py", "python3", "python.exe", "py.exe"): if args and isinstance(args[0], str) and args[0].endswith(".py"): script_path = resolve_script_arg(project_root, working_dir, args[0]) if not script_path.exists(): step_result["ok"] = False step_result["issues"].append(f"python_script_not_found:{script_path}") if isinstance(action, str) and action.strip(): # Action-based steps still require workingDir existence for reliable execution. if not working_dir.exists(): step_result["ok"] = False step_result["issues"].append("action_working_dir_not_found") if not step_result["ok"]: result["ok"] = False result["issues"].extend(step_result["issues"]) result["steps"].append(step_result) return result def sdt_attempts(repo_root: pathlib.Path) -> List[List[str]]: attempts: List[List[str]] = [] attempts.append(["sdt"]) if sys.platform.startswith("win"): attempts.append(["sdt.exe"]) local_exe = repo_root / ("sdt.exe" if sys.platform.startswith("win") else "sdt") if local_exe.exists(): attempts.append([str(local_exe)]) devtool_csproj = repo_root / "DevTool.csproj" if devtool_csproj.exists(): attempts.append(["dotnet", "run", "--project", str(devtool_csproj), "--"]) # Preserve order but dedupe exact attempts. seen = set() unique: List[List[str]] = [] for a in attempts: key = tuple(a) if key in seen: continue seen.add(key) unique.append(a) return unique def try_run_sdt( repo_root: pathlib.Path, command_args: Sequence[str], timeout_seconds: int, ) -> Tuple[Optional[subprocess.CompletedProcess], Optional[str]]: errors: List[str] = [] for base in sdt_attempts(repo_root): cmd = [*base, *command_args] try: proc = subprocess.run( cmd, cwd=str(repo_root), text=True, capture_output=True, timeout=timeout_seconds, check=False, ) return proc, " ".join(cmd) except FileNotFoundError: errors.append(f"not_found:{' '.join(cmd)}") except subprocess.TimeoutExpired: errors.append(f"timeout:{' '.join(cmd)}") return None, "; ".join(errors) if errors else "no_sdt_attempts" def parse_headless_summary(stdout: str) -> Optional[Dict[str, Any]]: lines = [line.strip() for line in stdout.splitlines() if line.strip()] for line in reversed(lines): if not line.startswith("{"): continue try: payload = json.loads(line) except Exception: continue if isinstance(payload, dict) and "runId" in payload and "success" in payload: return payload return None def execute_check_workflow( repo_root: pathlib.Path, project_root: pathlib.Path, workflow_id: str, env_profile: Optional[str], timeout_seconds: int, ) -> dict: args = [ "run", workflow_id, "--json", "--project-root", str(project_root), "--non-interactive", ] if env_profile: args.extend(["--env-profile", env_profile]) proc, attempted = try_run_sdt(repo_root, args, timeout_seconds) if proc is None: return { "workflowId": workflow_id, "ok": False, "attempted": attempted, "exitCode": None, "stopReason": "sdt_not_runnable", "message": attempted, } summary = parse_headless_summary(proc.stdout) if summary is None: return { "workflowId": workflow_id, "ok": False, "attempted": attempted, "exitCode": proc.returncode, "stopReason": "missing_summary", "message": (proc.stderr or proc.stdout).strip(), } return { "workflowId": workflow_id, "ok": bool(summary.get("success", False)), "attempted": attempted, "exitCode": summary.get("exitCode"), "stopReason": summary.get("stopReason"), "message": summary.get("message"), } def main() -> int: parser = argparse.ArgumentParser( description="Verify SDT workflow routes (static path checks + optional headless execution)." ) parser.add_argument("--repo-root", default=None, help="Repo root containing DevTool.csproj") parser.add_argument("--project-root", default=".", help="Project root containing devtool.json") parser.add_argument("--workflow", action="append", default=[], help="Workflow id to verify (repeatable)") parser.add_argument("--execute", action="store_true", help="Also run each workflow via `sdt run ... --json`") parser.add_argument("--env-profile", default=None) parser.add_argument("--timeout-seconds", type=int, default=600) parser.add_argument("--output-json", default=None, help="Write full report JSON to file") args = parser.parse_args() repo_root = resolve_repo_root(args.repo_root) project_root = (repo_root / args.project_root).resolve() if not pathlib.Path(args.project_root).is_absolute() else pathlib.Path(args.project_root).resolve() selected = set(args.workflow) if args.workflow else None config = load_config(project_root) workflows = iter_workflows(config, selected) if not workflows: print("No workflows selected/found.") return 2 static_results = [static_check_workflow(project_root, w) for w in workflows] execute_results: List[dict] = [] if args.execute: for w in workflows: wid = w["id"] execute_results.append( execute_check_workflow( repo_root=repo_root, project_root=project_root, workflow_id=wid, env_profile=args.env_profile, timeout_seconds=args.timeout_seconds, ) ) static_failures = [r for r in static_results if not r["ok"]] exec_failures = [r for r in execute_results if not r["ok"]] report = { "repoRoot": str(repo_root), "projectRoot": str(project_root), "totalWorkflows": len(workflows), "static": { "checked": len(static_results), "failed": len(static_failures), "results": static_results, }, "execute": { "enabled": args.execute, "checked": len(execute_results), "failed": len(exec_failures), "results": execute_results, }, } if args.output_json: out_path = pathlib.Path(args.output_json) if not out_path.is_absolute(): out_path = repo_root / out_path out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(report, indent=2), encoding="utf-8") print(f"Report written: {out_path}") print(f"Static checks: {len(static_results)} workflow(s), failures={len(static_failures)}") if args.execute: print(f"Execution checks: {len(execute_results)} workflow(s), failures={len(exec_failures)}") if static_failures: print("\nStatic failures:") for f in static_failures: print(f"- {f['workflowId']}: {', '.join(f['issues'])}") if exec_failures: print("\nExecution failures:") for f in exec_failures: print(f"- {f['workflowId']}: stopReason={f.get('stopReason')} message={f.get('message')}") return 1 if static_failures or exec_failures else 0 if __name__ == "__main__": raise SystemExit(main())