#!/usr/bin/env python3 """Railiance Stage 1 local validation command.""" from __future__ import annotations import argparse import json import shutil import subprocess import sys import time import tomllib import urllib.error import urllib.request import urllib.parse from datetime import UTC, datetime from pathlib import Path from typing import Any SUPPORTED_SCHEMA = "railiance.app.v1" def utc_now() -> str: return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") def load_contract(app_dir: Path) -> tuple[Path, dict[str, Any]]: path = app_dir / "railiance" / "app.toml" if not path.exists(): raise SystemExit(f"Missing Railiance contract: {path}") with path.open("rb") as handle: data = tomllib.load(handle) if data.get("schema_version") != SUPPORTED_SCHEMA: raise SystemExit( f"Unsupported schema_version {data.get('schema_version')!r}; expected {SUPPORTED_SCHEMA}" ) return path, data def command_result( command: str, cwd: Path, timeout_seconds: int | None, command_ref: str ) -> dict[str, Any]: started = time.monotonic() timeout = timeout_seconds or 900 try: completed = subprocess.run( command, cwd=cwd, shell=True, text=True, capture_output=True, timeout=timeout, check=False, ) status = "passed" if completed.returncode == 0 else "failed" return { "command_ref": command_ref, "status": status, "exit_code": completed.returncode, "duration_seconds": round(time.monotonic() - started, 3), "stdout_bytes": len(completed.stdout.encode()), "stderr_bytes": len(completed.stderr.encode()), } except subprocess.TimeoutExpired as exc: return { "command_ref": command_ref, "status": "failed", "exit_code": None, "duration_seconds": round(time.monotonic() - started, 3), "error": f"timeout after {timeout}s", "stdout_bytes": len((exc.stdout or "").encode()) if isinstance(exc.stdout, str) else 0, "stderr_bytes": len((exc.stderr or "").encode()) if isinstance(exc.stderr, str) else 0, } def check_required(check: dict[str, Any]) -> bool: return bool(check.get("required", True)) def skipped(check: dict[str, Any], reason: str) -> dict[str, Any]: required = check_required(check) return { "id": check.get("id"), "type": check.get("type"), "required": required, "status": "failed" if required else "skipped", "reason": reason, } def scrub_url(url: str) -> str: try: parts = urllib.parse.urlsplit(url) except ValueError: return "" netloc = parts.netloc.rsplit("@", 1)[-1] return urllib.parse.urlunsplit((parts.scheme, netloc, parts.path, "", "")) def run_http_check(check: dict[str, Any]) -> dict[str, Any]: started = time.monotonic() url = str(check.get("url", "")) timeout = int(check.get("timeout_seconds", 10)) expected_status = int(check.get("expected_status", 200)) required = check_required(check) try: with urllib.request.urlopen(url, timeout=timeout) as response: status_code = response.getcode() except (urllib.error.URLError, TimeoutError, ValueError) as exc: return { "id": check.get("id"), "type": "http", "required": required, "status": "failed" if required else "skipped", "url": scrub_url(url), "duration_seconds": round(time.monotonic() - started, 3), "reason": str(exc), } status = "passed" if status_code == expected_status else "failed" return { "id": check.get("id"), "type": "http", "required": required, "status": status if required or status == "passed" else "skipped", "url": scrub_url(url), "expected_status": expected_status, "actual_status": status_code, "duration_seconds": round(time.monotonic() - started, 3), } def run_helm_check(check: dict[str, Any], app_dir: Path, release: str) -> dict[str, Any]: if shutil.which("helm") is None: return skipped(check, "helm is not installed") chart = str(check.get("chart", "")) values = str(check.get("values", "")) mode = str(check.get("mode", "template")) if mode not in {"template", "server-dry-run"}: return skipped(check, f"unsupported helm mode for Stage 1: {mode}") command = f"helm template {release} {chart}" if values: command += f" -f {values}" result = command_result( command, app_dir, int(check.get("timeout_seconds", 120)), f"checks.{check.get('id')}.helm" ) return { "id": check.get("id"), "type": "helm", "required": check_required(check), "status": result["status"], "mode": mode, "command_ref": result.get("command_ref"), "exit_code": result.get("exit_code"), "duration_seconds": result.get("duration_seconds"), "stdout_bytes": result.get("stdout_bytes"), "stderr_bytes": result.get("stderr_bytes"), } def run_check(check: dict[str, Any], app_dir: Path, release: str) -> dict[str, Any]: check_type = check.get("type") if check.get("stage") != "stage1": return skipped(check, "not a Stage 1 check") if check_type == "command": command = str(check.get("run", "")) if not command: return skipped(check, "command check has no run field") result = command_result( command, app_dir, int(check.get("timeout_seconds", 900)), f"checks.{check.get('id')}.command" ) return { "id": check.get("id"), "type": "command", "required": check_required(check), **result, } if check_type == "http": return run_http_check(check) if check_type == "helm": return run_helm_check(check, app_dir, release) if check_type == "manual": return skipped(check, "manual check cannot be satisfied by railiance run") return skipped(check, f"unsupported local check type: {check_type}") def required_failures(items: list[dict[str, Any]]) -> list[dict[str, Any]]: return [item for item in items if item.get("required", True) and item.get("status") != "passed"] def build_result(app_dir: Path, contract_path: Path, data: dict[str, Any]) -> dict[str, Any]: stage = data.get("stages", {}).get("stage1", {}) if not stage.get("enabled", False): raise SystemExit("Stage 1 is disabled in railiance/app.toml") app = data.get("app", {}) source = data.get("source", {}) started_at = utc_now() started_monotonic = time.monotonic() stage_commands = list(stage.get("commands", [])) command_results = [ command_result(command, app_dir, None, f"stages.stage1.commands[{index}]") for index, command in enumerate(stage_commands) ] check_ids = list(stage.get("checks", [])) all_checks = {check.get("id"): check for check in data.get("checks", [])} check_results = [] for check_id in check_ids: check = all_checks.get(check_id) if check is None: check_results.append( { "id": check_id, "type": None, "required": True, "status": "failed", "reason": "check id is referenced by Stage 1 but not defined", } ) continue check_results.append(run_check(check, app_dir, str(stage.get("release", app.get("id", "app"))))) command_failures = [item for item in command_results if item.get("status") != "passed"] check_failures = required_failures(check_results) status = "passed" if not command_failures and not check_failures else "failed" return { "schema_version": "railiance.run-result.v1", "status": status, "stage": "stage1", "started_at": started_at, "finished_at": utc_now(), "duration_seconds": round(time.monotonic() - started_monotonic, 3), "app": { "id": app.get("id"), "name": app.get("name"), "repo": app.get("repo"), "owner": app.get("owner"), "criticality": app.get("criticality"), }, "source": { "revision": source.get("revision"), "artifact": source.get("artifact"), "digest_policy": source.get("digest_policy"), }, "contract": str(contract_path), "app_dir": str(app_dir), "release": stage.get("release"), "namespace": stage.get("namespace"), "requires_approval": bool(stage.get("requires_approval", False)), "evidence_expected": list(stage.get("evidence", [])), "commands": command_results, "checks": check_results, "summary": { "commands_total": len(command_results), "commands_failed": len(command_failures), "checks_total": len(check_results), "required_checks_failed": len(check_failures), }, } def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Run Railiance Stage 1 local validation from railiance/app.toml." ) parser.add_argument( "app_dir", nargs="?", default=".", help="Application or overlay repository directory (default: current directory).", ) parser.add_argument( "--json-out", help="Optional path to write the machine-readable run result.", ) parser.add_argument( "--pretty", action="store_true", help="Pretty-print JSON output to stdout.", ) return parser.parse_args(argv) def main(argv: list[str]) -> int: args = parse_args(argv) app_dir = Path(args.app_dir).resolve() contract_path, data = load_contract(app_dir) result = build_result(app_dir, contract_path, data) rendered = json.dumps(result, indent=2 if args.pretty else None, sort_keys=True) print(rendered) if args.json_out: output_path = Path(args.json_out) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(rendered + "\n", encoding="utf-8") return 0 if result["status"] == "passed" else 1 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))