journal/Journal.DevTool/scripts/verify-workflow-routes.py
stan44 ad199c338c SDT is standalone now, journal can use the bridge.
sdt can compile journal and other projects.
it using a json config system.
this program's Repo exists on the Gitea under stan.
Readme included as well.
2026-03-02 19:54:35 -06:00

299 lines
10 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import pathlib
import shutil
import subprocess
import sys
from typing import Any, Dict, List, Optional, Sequence, Tuple
from script_common import resolve_command, resolve_repo_root
def load_config(project_root: pathlib.Path) -> dict:
config_path = project_root / "devtool.json"
if not config_path.exists():
raise FileNotFoundError(f"devtool.json not found at: {config_path}")
return json.loads(config_path.read_text(encoding="utf-8"))
def iter_workflows(config: dict, selected: Optional[set[str]]) -> List[dict]:
workflows = config.get("workflows", [])
if not isinstance(workflows, list):
return []
normalized: List[dict] = [w for w in workflows if isinstance(w, dict) and isinstance(w.get("id"), str)]
if selected:
normalized = [w for w in normalized if w["id"] in selected]
return normalized
def is_command_available(command: str) -> bool:
resolved = resolve_command(command)
if pathlib.Path(resolved).is_file():
return True
return shutil.which(resolved) is not None
def resolve_script_arg(project_root: pathlib.Path, working_dir: pathlib.Path, arg: str) -> pathlib.Path:
p = pathlib.Path(arg)
if p.is_absolute():
return p
a = working_dir / p
if a.exists():
return a
b = project_root / p
return b
def static_check_workflow(project_root: pathlib.Path, workflow: dict) -> dict:
result = {
"workflowId": workflow.get("id"),
"ok": True,
"issues": [],
"steps": [],
}
for step in workflow.get("steps", []):
if not isinstance(step, dict):
continue
step_id = step.get("id", "<unknown>")
step_result = {"stepId": step_id, "ok": True, "issues": []}
working_dir_rel = step.get("workingDir") or "."
working_dir = (project_root / working_dir_rel).resolve()
if not working_dir.exists():
step_result["ok"] = False
step_result["issues"].append(f"workingDir_not_found:{working_dir}")
command = step.get("command")
args = step.get("args") or []
action = step.get("action")
if isinstance(command, str) and command.strip():
if not is_command_available(command):
step_result["ok"] = False
step_result["issues"].append(f"command_not_found:{command}")
if command.lower() in ("python", "py", "python3", "python.exe", "py.exe"):
if args and isinstance(args[0], str) and args[0].endswith(".py"):
script_path = resolve_script_arg(project_root, working_dir, args[0])
if not script_path.exists():
step_result["ok"] = False
step_result["issues"].append(f"python_script_not_found:{script_path}")
if isinstance(action, str) and action.strip():
# Action-based steps still require workingDir existence for reliable execution.
if not working_dir.exists():
step_result["ok"] = False
step_result["issues"].append("action_working_dir_not_found")
if not step_result["ok"]:
result["ok"] = False
result["issues"].extend(step_result["issues"])
result["steps"].append(step_result)
return result
def sdt_attempts(repo_root: pathlib.Path) -> List[List[str]]:
attempts: List[List[str]] = []
attempts.append(["sdt"])
if sys.platform.startswith("win"):
attempts.append(["sdt.exe"])
local_exe = repo_root / ("sdt.exe" if sys.platform.startswith("win") else "sdt")
if local_exe.exists():
attempts.append([str(local_exe)])
devtool_csproj = repo_root / "DevTool.csproj"
if devtool_csproj.exists():
attempts.append(["dotnet", "run", "--project", str(devtool_csproj), "--"])
# Preserve order but dedupe exact attempts.
seen = set()
unique: List[List[str]] = []
for a in attempts:
key = tuple(a)
if key in seen:
continue
seen.add(key)
unique.append(a)
return unique
def try_run_sdt(
repo_root: pathlib.Path,
command_args: Sequence[str],
timeout_seconds: int,
) -> Tuple[Optional[subprocess.CompletedProcess], Optional[str]]:
errors: List[str] = []
for base in sdt_attempts(repo_root):
cmd = [*base, *command_args]
try:
proc = subprocess.run(
cmd,
cwd=str(repo_root),
text=True,
capture_output=True,
timeout=timeout_seconds,
check=False,
)
return proc, " ".join(cmd)
except FileNotFoundError:
errors.append(f"not_found:{' '.join(cmd)}")
except subprocess.TimeoutExpired:
errors.append(f"timeout:{' '.join(cmd)}")
return None, "; ".join(errors) if errors else "no_sdt_attempts"
def parse_headless_summary(stdout: str) -> Optional[Dict[str, Any]]:
lines = [line.strip() for line in stdout.splitlines() if line.strip()]
for line in reversed(lines):
if not line.startswith("{"):
continue
try:
payload = json.loads(line)
except Exception:
continue
if isinstance(payload, dict) and "runId" in payload and "success" in payload:
return payload
return None
def execute_check_workflow(
repo_root: pathlib.Path,
project_root: pathlib.Path,
workflow_id: str,
env_profile: Optional[str],
timeout_seconds: int,
) -> dict:
args = [
"run",
workflow_id,
"--json",
"--project-root",
str(project_root),
"--non-interactive",
]
if env_profile:
args.extend(["--env-profile", env_profile])
proc, attempted = try_run_sdt(repo_root, args, timeout_seconds)
if proc is None:
return {
"workflowId": workflow_id,
"ok": False,
"attempted": attempted,
"exitCode": None,
"stopReason": "sdt_not_runnable",
"message": attempted,
}
summary = parse_headless_summary(proc.stdout)
if summary is None:
return {
"workflowId": workflow_id,
"ok": False,
"attempted": attempted,
"exitCode": proc.returncode,
"stopReason": "missing_summary",
"message": (proc.stderr or proc.stdout).strip(),
}
return {
"workflowId": workflow_id,
"ok": bool(summary.get("success", False)),
"attempted": attempted,
"exitCode": summary.get("exitCode"),
"stopReason": summary.get("stopReason"),
"message": summary.get("message"),
}
def main() -> int:
parser = argparse.ArgumentParser(
description="Verify SDT workflow routes (static path checks + optional headless execution)."
)
parser.add_argument("--repo-root", default=None, help="Repo root containing DevTool.csproj")
parser.add_argument("--project-root", default=".", help="Project root containing devtool.json")
parser.add_argument("--workflow", action="append", default=[], help="Workflow id to verify (repeatable)")
parser.add_argument("--execute", action="store_true", help="Also run each workflow via `sdt run ... --json`")
parser.add_argument("--env-profile", default=None)
parser.add_argument("--timeout-seconds", type=int, default=600)
parser.add_argument("--output-json", default=None, help="Write full report JSON to file")
args = parser.parse_args()
repo_root = resolve_repo_root(args.repo_root)
project_root = (repo_root / args.project_root).resolve() if not pathlib.Path(args.project_root).is_absolute() else pathlib.Path(args.project_root).resolve()
selected = set(args.workflow) if args.workflow else None
config = load_config(project_root)
workflows = iter_workflows(config, selected)
if not workflows:
print("No workflows selected/found.")
return 2
static_results = [static_check_workflow(project_root, w) for w in workflows]
execute_results: List[dict] = []
if args.execute:
for w in workflows:
wid = w["id"]
execute_results.append(
execute_check_workflow(
repo_root=repo_root,
project_root=project_root,
workflow_id=wid,
env_profile=args.env_profile,
timeout_seconds=args.timeout_seconds,
)
)
static_failures = [r for r in static_results if not r["ok"]]
exec_failures = [r for r in execute_results if not r["ok"]]
report = {
"repoRoot": str(repo_root),
"projectRoot": str(project_root),
"totalWorkflows": len(workflows),
"static": {
"checked": len(static_results),
"failed": len(static_failures),
"results": static_results,
},
"execute": {
"enabled": args.execute,
"checked": len(execute_results),
"failed": len(exec_failures),
"results": execute_results,
},
}
if args.output_json:
out_path = pathlib.Path(args.output_json)
if not out_path.is_absolute():
out_path = repo_root / out_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
print(f"Report written: {out_path}")
print(f"Static checks: {len(static_results)} workflow(s), failures={len(static_failures)}")
if args.execute:
print(f"Execution checks: {len(execute_results)} workflow(s), failures={len(exec_failures)}")
if static_failures:
print("\nStatic failures:")
for f in static_failures:
print(f"- {f['workflowId']}: {', '.join(f['issues'])}")
if exec_failures:
print("\nExecution failures:")
for f in exec_failures:
print(f"- {f['workflowId']}: stopReason={f.get('stopReason')} message={f.get('message')}")
return 1 if static_failures or exec_failures else 0
if __name__ == "__main__":
raise SystemExit(main())