302 lines
10 KiB
Python
Executable File
302 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Railiance Stage 1 local validation command."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import tomllib
|
|
import urllib.error
|
|
import urllib.request
|
|
import urllib.parse
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Contract schema identifier this tool understands; load_contract() rejects
# any app.toml whose ``schema_version`` differs from this value.
SUPPORTED_SCHEMA = "railiance.app.v1"
|
|
|
|
|
|
def utc_now() -> str:
|
|
return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def load_contract(app_dir: Path) -> tuple[Path, dict[str, Any]]:
|
|
path = app_dir / "railiance" / "app.toml"
|
|
if not path.exists():
|
|
raise SystemExit(f"Missing Railiance contract: {path}")
|
|
with path.open("rb") as handle:
|
|
data = tomllib.load(handle)
|
|
if data.get("schema_version") != SUPPORTED_SCHEMA:
|
|
raise SystemExit(
|
|
f"Unsupported schema_version {data.get('schema_version')!r}; expected {SUPPORTED_SCHEMA}"
|
|
)
|
|
return path, data
|
|
|
|
|
|
def command_result(
|
|
command: str, cwd: Path, timeout_seconds: int | None, command_ref: str
|
|
) -> dict[str, Any]:
|
|
started = time.monotonic()
|
|
timeout = timeout_seconds or 900
|
|
try:
|
|
completed = subprocess.run(
|
|
command,
|
|
cwd=cwd,
|
|
shell=True,
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=timeout,
|
|
check=False,
|
|
)
|
|
status = "passed" if completed.returncode == 0 else "failed"
|
|
return {
|
|
"command_ref": command_ref,
|
|
"status": status,
|
|
"exit_code": completed.returncode,
|
|
"duration_seconds": round(time.monotonic() - started, 3),
|
|
"stdout_bytes": len(completed.stdout.encode()),
|
|
"stderr_bytes": len(completed.stderr.encode()),
|
|
}
|
|
except subprocess.TimeoutExpired as exc:
|
|
return {
|
|
"command_ref": command_ref,
|
|
"status": "failed",
|
|
"exit_code": None,
|
|
"duration_seconds": round(time.monotonic() - started, 3),
|
|
"error": f"timeout after {timeout}s",
|
|
"stdout_bytes": len((exc.stdout or "").encode()) if isinstance(exc.stdout, str) else 0,
|
|
"stderr_bytes": len((exc.stderr or "").encode()) if isinstance(exc.stderr, str) else 0,
|
|
}
|
|
|
|
|
|
def check_required(check: dict[str, Any]) -> bool:
    """Tell whether *check* is required.

    A check without a ``required`` key counts as required; otherwise the
    stored value is interpreted by truthiness.
    """
    flag = check.get("required", True)
    return bool(flag)
|
|
|
|
|
|
def skipped(check: dict[str, Any], reason: str) -> dict[str, Any]:
    """Build the result record for a check that was not executed.

    A required check that could not run is reported as ``"failed"``; only an
    optional one is reported as ``"skipped"``. *reason* is stored verbatim.
    """
    is_required = check_required(check)
    outcome = "skipped"
    if is_required:
        outcome = "failed"
    record: dict[str, Any] = {"id": check.get("id"), "type": check.get("type")}
    record["required"] = is_required
    record["status"] = outcome
    record["reason"] = reason
    return record
|
|
|
|
|
|
def scrub_url(url: str) -> str:
    """Return *url* reduced to scheme, host part and path for safe reporting.

    Anything before the last ``@`` in the network location (userinfo), the
    query string and the fragment are removed. A URL that ``urlsplit``
    rejects is reported as ``"<invalid-url>"``.
    """
    try:
        pieces = urllib.parse.urlsplit(url)
    except ValueError:
        return "<invalid-url>"
    # rpartition leaves the netloc untouched when it contains no "@".
    host_part = pieces.netloc.rpartition("@")[2]
    cleaned = pieces._replace(netloc=host_part, query="", fragment="")
    return urllib.parse.urlunsplit(cleaned)
|
|
|
|
|
|
def run_http_check(check: dict[str, Any]) -> dict[str, Any]:
    """Fetch the check's ``url`` and compare the HTTP status with the expectation.

    Reads ``url``, ``timeout_seconds`` (default 10) and ``expected_status``
    (default 200) from *check*.

    Returns:
        A result dict with ``id``, ``type``, ``required``, ``status``, the
        scrubbed ``url`` and ``duration_seconds``. When a response was
        received it also has ``expected_status`` and ``actual_status``; when
        the request itself failed it has ``reason`` instead. An optional
        check that does not pass is reported as ``"skipped"``, not ``"failed"``.
    """
    started = time.monotonic()
    url = str(check.get("url", ""))
    timeout = int(check.get("timeout_seconds", 10))
    expected_status = int(check.get("expected_status", 200))
    required = check_required(check)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            status_code = response.getcode()
    except urllib.error.HTTPError as exc:
        # urlopen raises for 4xx/5xx responses, but the server did answer:
        # keep the status so a non-2xx expected_status can match and the
        # report shows what was actually returned.
        status_code = exc.code
        exc.close()
    except (OSError, ValueError) as exc:
        # OSError covers URLError, timeouts and connection resets while the
        # response is read; ValueError covers malformed URLs.
        return {
            "id": check.get("id"),
            "type": "http",
            "required": required,
            "status": "failed" if required else "skipped",
            "url": scrub_url(url),
            "duration_seconds": round(time.monotonic() - started, 3),
            "reason": str(exc),
        }
    status = "passed" if status_code == expected_status else "failed"
    return {
        "id": check.get("id"),
        "type": "http",
        "required": required,
        "status": status if required or status == "passed" else "skipped",
        "url": scrub_url(url),
        "expected_status": expected_status,
        "actual_status": status_code,
        "duration_seconds": round(time.monotonic() - started, 3),
    }
|
|
|
|
|
|
def run_helm_check(check: dict[str, Any], app_dir: Path, release: str) -> dict[str, Any]:
    """Render the check's Helm chart with ``helm template`` and report the outcome.

    Reads ``chart``, ``values`` (optional), ``mode`` (default ``"template"``)
    and ``timeout_seconds`` (default 120) from *check*; the command runs with
    *app_dir* as working directory and *release* as the release name.

    Returns:
        A result dict for the check. When helm is not installed, the mode is
        unsupported, or no chart is configured, the record produced by
        ``skipped`` is returned and no command is run.
    """
    if shutil.which("helm") is None:
        return skipped(check, "helm is not installed")
    chart = str(check.get("chart", ""))
    values = str(check.get("values", ""))
    mode = str(check.get("mode", "template"))
    if mode not in {"template", "server-dry-run"}:
        return skipped(check, f"unsupported helm mode for Stage 1: {mode}")
    if not chart:
        # Without a chart the command would be malformed; report the
        # configuration problem instead of an opaque helm usage error.
        return skipped(check, "helm check has no chart field")
    # NOTE(review): both accepted modes run the same `helm template` command;
    # `mode` is only echoed in the result — confirm this is intended.
    command = f"helm template {release} {chart}"
    if values:
        command += f" -f {values}"
    result = command_result(
        command, app_dir, int(check.get("timeout_seconds", 120)), f"checks.{check.get('id')}.helm"
    )
    return {
        "id": check.get("id"),
        "type": "helm",
        "required": check_required(check),
        "status": result["status"],
        "mode": mode,
        "command_ref": result.get("command_ref"),
        "exit_code": result.get("exit_code"),
        "duration_seconds": result.get("duration_seconds"),
        "stdout_bytes": result.get("stdout_bytes"),
        "stderr_bytes": result.get("stderr_bytes"),
    }
|
|
|
|
|
|
def run_check(check: dict[str, Any], app_dir: Path, release: str) -> dict[str, Any]:
|
|
check_type = check.get("type")
|
|
if check.get("stage") != "stage1":
|
|
return skipped(check, "not a Stage 1 check")
|
|
if check_type == "command":
|
|
command = str(check.get("run", ""))
|
|
if not command:
|
|
return skipped(check, "command check has no run field")
|
|
result = command_result(
|
|
command, app_dir, int(check.get("timeout_seconds", 900)), f"checks.{check.get('id')}.command"
|
|
)
|
|
return {
|
|
"id": check.get("id"),
|
|
"type": "command",
|
|
"required": check_required(check),
|
|
**result,
|
|
}
|
|
if check_type == "http":
|
|
return run_http_check(check)
|
|
if check_type == "helm":
|
|
return run_helm_check(check, app_dir, release)
|
|
if check_type == "manual":
|
|
return skipped(check, "manual check cannot be satisfied by railiance run")
|
|
return skipped(check, f"unsupported local check type: {check_type}")
|
|
|
|
|
|
def required_failures(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the entries of *items* that are required and did not pass.

    An entry without a ``required`` key is treated as required; any
    ``status`` other than ``"passed"`` (including a missing one) counts as a
    failure. The original order is preserved.
    """
    failing: list[dict[str, Any]] = []
    for entry in items:
        if not entry.get("required", True):
            continue
        if entry.get("status") == "passed":
            continue
        failing.append(entry)
    return failing
|
|
|
|
|
|
def build_result(app_dir: Path, contract_path: Path, data: dict[str, Any]) -> dict[str, Any]:
    """Run the Stage 1 commands and checks of a contract and assemble the report.

    Args:
        app_dir: Directory in which commands and checks are executed.
        contract_path: Path of the loaded contract, recorded in the report.
        data: Parsed contract table.

    Returns:
        The ``railiance.run-result.v1`` report dict. Its ``status`` is
        ``"passed"`` only when every stage command passed and no required
        check failed.

    Raises:
        SystemExit: if Stage 1 is not enabled in the contract.
    """
    stage = data.get("stages", {}).get("stage1", {})
    if not stage.get("enabled", False):
        raise SystemExit("Stage 1 is disabled in railiance/app.toml")

    app = data.get("app", {})
    source = data.get("source", {})
    started_at = utc_now()
    clock_start = time.monotonic()

    # Stage commands run first, in contract order, with the default timeout.
    command_results = []
    for position, shell_line in enumerate(list(stage.get("commands", []))):
        ref = f"stages.stage1.commands[{position}]"
        command_results.append(command_result(shell_line, app_dir, None, ref))

    wanted_ids = list(stage.get("checks", []))
    defined_checks = {}
    for definition in data.get("checks", []):
        defined_checks[definition.get("id")] = definition

    # Release name handed to the checks: the stage's own, else the app id.
    release_name = str(stage.get("release", app.get("id", "app")))
    check_results = []
    for wanted in wanted_ids:
        definition = defined_checks.get(wanted)
        if definition is not None:
            check_results.append(run_check(definition, app_dir, release_name))
            continue
        # A dangling reference is reported as a failed required check.
        check_results.append(
            {
                "id": wanted,
                "type": None,
                "required": True,
                "status": "failed",
                "reason": "check id is referenced by Stage 1 but not defined",
            }
        )

    command_failures = [entry for entry in command_results if entry.get("status") != "passed"]
    check_failures = required_failures(check_results)
    overall = "failed" if command_failures or check_failures else "passed"

    app_summary = {
        key: app.get(key) for key in ("id", "name", "repo", "owner", "criticality")
    }
    source_summary = {
        key: source.get(key) for key in ("revision", "artifact", "digest_policy")
    }
    totals = {
        "commands_total": len(command_results),
        "commands_failed": len(command_failures),
        "checks_total": len(check_results),
        "required_checks_failed": len(check_failures),
    }
    return {
        "schema_version": "railiance.run-result.v1",
        "status": overall,
        "stage": "stage1",
        "started_at": started_at,
        "finished_at": utc_now(),
        "duration_seconds": round(time.monotonic() - clock_start, 3),
        "app": app_summary,
        "source": source_summary,
        "contract": str(contract_path),
        "app_dir": str(app_dir),
        "release": stage.get("release"),
        "namespace": stage.get("namespace"),
        "requires_approval": bool(stage.get("requires_approval", False)),
        "evidence_expected": list(stage.get("evidence", [])),
        "commands": command_results,
        "checks": check_results,
        "summary": totals,
    }
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments in *argv* (without the program name).

    Recognised arguments: an optional positional ``app_dir`` (default
    ``"."``), ``--json-out PATH`` and the ``--pretty`` flag.
    """
    cli = argparse.ArgumentParser(
        description="Run Railiance Stage 1 local validation from railiance/app.toml."
    )
    app_dir_options = {
        "nargs": "?",
        "default": ".",
        "help": "Application or overlay repository directory (default: current directory).",
    }
    cli.add_argument("app_dir", **app_dir_options)
    cli.add_argument(
        "--json-out", help="Optional path to write the machine-readable run result."
    )
    cli.add_argument(
        "--pretty", action="store_true", help="Pretty-print JSON output to stdout."
    )
    return cli.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
    """Run Stage 1 validation for the directory named in *argv*.

    The JSON report is printed to stdout and, when ``--json-out`` is given,
    also written to that path (parent directories are created as needed).

    Returns:
        ``0`` when the overall status is ``"passed"``, otherwise ``1``.
    """
    options = parse_args(argv)
    root = Path(options.app_dir).resolve()
    contract_path, contract = load_contract(root)
    report = build_result(root, contract_path, contract)

    indent = 2 if options.pretty else None
    rendered = json.dumps(report, indent=indent, sort_keys=True)
    print(rendered)

    if options.json_out:
        target = Path(options.json_out)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(rendered + "\n", encoding="utf-8")

    if report["status"] == "passed":
        return 0
    return 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main(sys.argv[1:]))
|