#!/usr/bin/env python3 """Railiance Stage 2 deploy and observe tooling.""" from __future__ import annotations import argparse import json import shutil import subprocess import sys import time import tomllib import urllib.parse import urllib.request import urllib.error from datetime import UTC, datetime from pathlib import Path from typing import Any SUPPORTED_SCHEMA = "railiance.app.v1" def utc_now() -> str: return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") def scrub_url(url: str) -> str: try: parts = urllib.parse.urlsplit(url) except ValueError: return "" netloc = parts.netloc.rsplit("@", 1)[-1] return urllib.parse.urlunsplit((parts.scheme, netloc, parts.path, "", "")) def load_contract(app_dir: Path) -> tuple[Path, dict[str, Any]]: contract_path = app_dir / "railiance" / "app.toml" if not contract_path.exists(): raise SystemExit(f"Missing Railiance contract: {contract_path}") with contract_path.open("rb") as handle: data = tomllib.load(handle) if data.get("schema_version") != SUPPORTED_SCHEMA: raise SystemExit( f"Unsupported schema_version {data.get('schema_version')!r}; expected {SUPPORTED_SCHEMA}" ) return contract_path, data def check_required(check: dict[str, Any]) -> bool: return bool(check.get("required", True)) def checks_by_id(data: dict[str, Any]) -> dict[str, dict[str, Any]]: return {check.get("id"): check for check in data.get("checks", [])} def stage2_checks(data: dict[str, Any]) -> list[dict[str, Any]]: stage = data.get("stages", {}).get("stage2", {}) lookup = checks_by_id(data) return [lookup[item] for item in stage.get("checks", []) if item in lookup] def helm_check(data: dict[str, Any]) -> dict[str, Any] | None: for check in stage2_checks(data): if check.get("type") == "helm": return check return None def kubernetes_check(data: dict[str, Any]) -> dict[str, Any] | None: for check in stage2_checks(data): if check.get("type") == "kubernetes": return check return None def http_checks(data: dict[str, Any]) -> list[dict[str, Any]]: return [check for check in stage2_checks(data) if check.get("type") == "http"] def precheck(name: str, status: str, required: bool, detail: str | None = None) -> dict[str, Any]: item: dict[str, Any] = {"name": name, "status": status, "required": required} if detail: item["detail"] = detail return item def required_failures(items: list[dict[str, Any]]) -> list[dict[str, Any]]: return [item for item in items if item.get("required", True) and item.get("status") != "passed"] def run_command(args: list[str], cwd: Path, timeout: int, command_ref: str) -> dict[str, Any]: started = time.monotonic() try: completed = subprocess.run( args, cwd=cwd, text=True, capture_output=True, timeout=timeout, check=False, ) return { "command_ref": command_ref, "status": "passed" if completed.returncode == 0 else "failed", "exit_code": completed.returncode, "duration_seconds": round(time.monotonic() - started, 3), "stdout_bytes": len(completed.stdout.encode()), "stderr_bytes": len(completed.stderr.encode()), } except subprocess.TimeoutExpired as exc: stdout = exc.stdout if isinstance(exc.stdout, str) else "" stderr = exc.stderr if isinstance(exc.stderr, str) else "" return { "command_ref": command_ref, "status": "failed", "exit_code": None, "duration_seconds": round(time.monotonic() - started, 3), "error": f"timeout after {timeout}s", "stdout_bytes": len(stdout.encode()), "stderr_bytes": len(stderr.encode()), } def app_identity(data: dict[str, Any]) -> dict[str, Any]: app = data.get("app", {}) source = data.get("source", {}) return { "app": { "id": app.get("id"), "name": app.get("name"), "repo": app.get("repo"), "owner": app.get("owner"), "criticality": app.get("criticality"), }, "source": { "revision": source.get("revision"), "artifact": source.get("artifact"), "digest_policy": source.get("digest_policy"), }, } def stage2_context(app_dir: Path, contract_path: Path, data: dict[str, Any]) -> dict[str, Any]: stage = data.get("stages", {}).get("stage2", {}) if not stage.get("enabled", False): raise SystemExit("Stage 2 is disabled in railiance/app.toml") helm = helm_check(data) or {} chart = app_dir / str(helm.get("chart", f"charts/{data.get('app', {}).get('id', 'app')}")) values = app_dir / str(helm.get("values", "values/stage2-canary.yaml")) release = str(stage.get("release", f"{data.get('app', {}).get('id', 'app')}-canary")) namespace = str(stage.get("namespace", data.get("app", {}).get("id", "default"))) context = { "contract": str(contract_path), "app_dir": str(app_dir), "stage": "stage2", "namespace": namespace, "release": release, "canary_mode": stage.get("canary_mode"), "observation_minutes": stage.get("observation_minutes"), "requires_approval": bool(stage.get("requires_approval", False)), "chart": str(chart), "values": str(values), "evidence_expected": list(stage.get("evidence", [])), "checks_expected": list(stage.get("checks", [])), } context.update(app_identity(data)) return context def local_prechecks(app_dir: Path, data: dict[str, Any], mode: str, approval_id: str | None) -> list[dict[str, Any]]: stage = data.get("stages", {}).get("stage2", {}) helm = helm_check(data) checks: list[dict[str, Any]] = [] checks.append(precheck("app.toml", "passed", True)) if helm is None: checks.append(precheck("stage2-helm-check", "failed", True, "no Stage 2 helm check declared")) else: chart = app_dir / str(helm.get("chart", "")) values = app_dir / str(helm.get("values", "")) checks.append(precheck("stage2-chart", "passed" if chart.exists() else "failed", True, str(chart))) checks.append(precheck("stage2-values", "passed" if values.exists() else "failed", True, str(values))) if mode in {"server-dry-run", "apply"}: checks.append( precheck("helm", "passed" if shutil.which("helm") else "failed", True, "helm executable") ) else: checks.append(precheck("helm", "not_required", False, "plan mode does not execute helm")) if mode == "apply" and stage.get("requires_approval", False): checks.append( precheck( "approval-id", "passed" if approval_id else "failed", True, "Stage 2 requires approval before canary exposure", ) ) elif stage.get("requires_approval", False): checks.append(precheck("approval-id", "required_before_apply", False)) else: checks.append(precheck("approval-id", "not_required", False)) return checks def helm_args(context: dict[str, Any], mode: str, timeout: int) -> list[str]: args = [ "helm", "upgrade", "--install", context["release"], context["chart"], "--namespace", context["namespace"], "--create-namespace", "-f", context["values"], ] if mode == "server-dry-run": args.extend(["--dry-run=server", "--debug"]) if mode == "apply": args.extend(["--atomic", "--wait", "--timeout", f"{timeout}m"]) return args def deploy(argv: list[str]) -> int: parser = argparse.ArgumentParser(description="Plan or apply a Stage 2 Railiance canary.") parser.add_argument("app_dir", nargs="?", default=".") parser.add_argument("--stage", default="2", choices=["2", "stage2"]) parser.add_argument("--mode", choices=["plan", "server-dry-run", "apply"], default="plan") parser.add_argument("--plan", action="store_const", const="plan", dest="mode") parser.add_argument("--apply", action="store_const", const="apply", dest="mode") parser.add_argument("--server-dry-run", action="store_const", const="server-dry-run", dest="mode") parser.add_argument("--approval-id", help="Operator approval/progress id required before apply when declared.") parser.add_argument("--stage1-result", help="Optional Stage 1 result JSON for same-candidate evidence.") parser.add_argument("--timeout-minutes", type=int, default=10) parser.add_argument("--json-out") parser.add_argument("--pretty", action="store_true") args = parser.parse_args(argv) app_dir = Path(args.app_dir).resolve() contract_path, data = load_contract(app_dir) context = stage2_context(app_dir, contract_path, data) checks = local_prechecks(app_dir, data, args.mode, args.approval_id) if args.stage1_result: try: stage1 = json.loads(Path(args.stage1_result).read_text(encoding="utf-8")) checks.append( precheck( "stage1-result", "passed" if stage1.get("status") == "passed" else "failed", args.mode == "apply", Path(args.stage1_result).name, ) ) except (OSError, json.JSONDecodeError) as exc: checks.append(precheck("stage1-result", "failed", args.mode == "apply", str(exc))) else: checks.append(precheck("stage1-result", "recommended_before_apply", False)) actions: list[dict[str, Any]] = [] failures = required_failures(checks) status = "planned" if args.mode == "plan" else "blocked" if not failures and args.mode in {"server-dry-run", "apply"}: action = run_command(helm_args(context, args.mode, args.timeout_minutes), app_dir, args.timeout_minutes * 60, "stage2.helm") actions.append(action) status = "passed" if action.get("status") == "passed" and args.mode == "server-dry-run" else "applied" if action.get("status") != "passed": status = "failed" elif failures: status = "blocked" result: dict[str, Any] = { "schema_version": "railiance.stage2-deploy-result.v1", "status": status, "mode": args.mode, "generated_at": utc_now(), **context, "approval_id": args.approval_id, "prechecks": checks, "actions": actions, "planned_actions": [ { "action_ref": "stage2.helm", "tool": "helm", "mode": args.mode, "release": context["release"], "namespace": context["namespace"], "chart": context["chart"], "values": context["values"], } ], "summary": { "required_prechecks_failed": len(failures), "actions_total": len(actions), "actions_failed": len([item for item in actions if item.get("status") != "passed"]), }, } rendered = json.dumps(result, indent=2 if args.pretty else None, sort_keys=True) print(rendered) if args.json_out: output = Path(args.json_out) output.parent.mkdir(parents=True, exist_ok=True) output.write_text(rendered + "\n", encoding="utf-8") return 0 if result["status"] in {"planned", "passed", "applied"} else 1 def observation_targets(data: dict[str, Any], context: dict[str, Any]) -> dict[str, Any]: kube = kubernetes_check(data) or {} return { "rollout": kube.get("resource", f"deploy/{context['release']}"), "pod_selector": f"app.kubernetes.io/instance={context['release']}", "ingress_selector": f"app.kubernetes.io/instance={context['release']}", "health_urls": [scrub_url(str(check.get("url", ""))) for check in http_checks(data)], "metrics": { "tool": "kubectl top pods", "selector": f"app.kubernetes.io/instance={context['release']}", }, } def observe(argv: list[str]) -> int: parser = argparse.ArgumentParser(description="Plan or run Stage 2 Railiance observation checks.") parser.add_argument("app_dir", nargs="?", default=".") parser.add_argument("--stage", default="2", choices=["2", "stage2"]) parser.add_argument("--mode", choices=["plan", "live"], default="plan") parser.add_argument("--plan", action="store_const", const="plan", dest="mode") parser.add_argument("--live", action="store_const", const="live", dest="mode") parser.add_argument("--timeout-seconds", type=int, default=120) parser.add_argument("--json-out") parser.add_argument("--pretty", action="store_true") args = parser.parse_args(argv) app_dir = Path(args.app_dir).resolve() contract_path, data = load_contract(app_dir) context = stage2_context(app_dir, contract_path, data) targets = observation_targets(data, context) checks = [precheck("app.toml", "passed", True)] if args.mode == "live": checks.append( precheck("kubectl", "passed" if shutil.which("kubectl") else "failed", True, "kubectl executable") ) else: checks.append(precheck("kubectl", "not_required", False, "plan mode does not query cluster")) actions: list[dict[str, Any]] = [] failures = required_failures(checks) status = "planned" if args.mode == "live" and not failures: ns = context["namespace"] rollout = str(targets["rollout"]) actions.append( run_command( ["kubectl", "-n", ns, "rollout", "status", rollout, f"--timeout={args.timeout_seconds}s"], app_dir, args.timeout_seconds, "stage2.rollout-status", ) ) actions.append( run_command( ["kubectl", "-n", ns, "get", rollout, "-o", "json"], app_dir, args.timeout_seconds, "stage2.rollout-json", ) ) actions.append( run_command( ["kubectl", "-n", ns, "get", "pods", "-l", str(targets["pod_selector"]), "-o", "json"], app_dir, args.timeout_seconds, "stage2.pods-json", ) ) actions.append( run_command( ["kubectl", "-n", ns, "get", "ingress", "-l", str(targets["ingress_selector"]), "-o", "json"], app_dir, args.timeout_seconds, "stage2.ingress-json", ) ) metrics = run_command( ["kubectl", "-n", ns, "top", "pods", "-l", str(targets["pod_selector"]), "--no-headers"], app_dir, args.timeout_seconds, "stage2.metrics", ) if metrics.get("status") != "passed": metrics["optional"] = True metrics["status"] = "unavailable" actions.append(metrics) status = "passed" if not [item for item in actions if item.get("status") == "failed"] else "failed" elif failures: status = "blocked" result: dict[str, Any] = { "schema_version": "railiance.stage2-observe-result.v1", "status": status, "mode": args.mode, "generated_at": utc_now(), **context, "targets": targets, "prechecks": checks, "actions": actions, "summary": { "required_prechecks_failed": len(failures), "actions_total": len(actions), "actions_failed": len([item for item in actions if item.get("status") == "failed"]), "metrics_unavailable": len([item for item in actions if item.get("status") == "unavailable"]), }, } rendered = json.dumps(result, indent=2 if args.pretty else None, sort_keys=True) print(rendered) if args.json_out: output = Path(args.json_out) output.parent.mkdir(parents=True, exist_ok=True) output.write_text(rendered + "\n", encoding="utf-8") return 0 if result["status"] in {"planned", "passed"} else 1 def main(argv: list[str]) -> int: parser = argparse.ArgumentParser(description="Railiance Stage 2 tooling.") subparsers = parser.add_subparsers(dest="command", required=True) deploy_parser = subparsers.add_parser("deploy", help="Plan or apply a Stage 2 canary.") deploy_parser.add_argument("args", nargs=argparse.REMAINDER) observe_parser = subparsers.add_parser("observe", help="Plan or run Stage 2 observation.") observe_parser.add_argument("args", nargs=argparse.REMAINDER) parsed = parser.parse_args(argv[:1]) if parsed.command == "deploy": return deploy(argv[1:]) if parsed.command == "observe": return observe(argv[1:]) return 2 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))