378 lines
14 KiB
Python
Executable File
378 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Railiance Stage 3 promote and rollback tooling."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import tomllib
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
SUPPORTED_SCHEMA = "railiance.app.v1"
|
|
|
|
|
|
def utc_now() -> str:
|
|
return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def load_contract(app_dir: Path) -> tuple[Path, dict[str, Any]]:
|
|
contract_path = app_dir / "railiance" / "app.toml"
|
|
if not contract_path.exists():
|
|
raise SystemExit(f"Missing Railiance contract: {contract_path}")
|
|
with contract_path.open("rb") as handle:
|
|
data = tomllib.load(handle)
|
|
if data.get("schema_version") != SUPPORTED_SCHEMA:
|
|
raise SystemExit(
|
|
f"Unsupported schema_version {data.get('schema_version')!r}; expected {SUPPORTED_SCHEMA}"
|
|
)
|
|
return contract_path, data
|
|
|
|
|
|
def app_identity(data: dict[str, Any]) -> dict[str, Any]:
|
|
app = data.get("app", {})
|
|
source = data.get("source", {})
|
|
return {
|
|
"app": {
|
|
"id": app.get("id"),
|
|
"name": app.get("name"),
|
|
"repo": app.get("repo"),
|
|
"owner": app.get("owner"),
|
|
"criticality": app.get("criticality"),
|
|
},
|
|
"source": {
|
|
"revision": source.get("revision"),
|
|
"artifact": source.get("artifact"),
|
|
"digest_policy": source.get("digest_policy"),
|
|
},
|
|
}
|
|
|
|
|
|
def checks_by_id(data: dict[str, Any]) -> dict[str, dict[str, Any]]:
|
|
return {check.get("id"): check for check in data.get("checks", [])}
|
|
|
|
|
|
def stage_checks(data: dict[str, Any], stage_name: str) -> list[dict[str, Any]]:
|
|
stage = data.get("stages", {}).get(stage_name, {})
|
|
lookup = checks_by_id(data)
|
|
return [lookup[item] for item in stage.get("checks", []) if item in lookup]
|
|
|
|
|
|
def stage2_helm_check(data: dict[str, Any]) -> dict[str, Any] | None:
|
|
for check in stage_checks(data, "stage2"):
|
|
if check.get("type") == "helm":
|
|
return check
|
|
return None
|
|
|
|
|
|
def precheck(name: str, status: str, required: bool, detail: str | None = None) -> dict[str, Any]:
|
|
item: dict[str, Any] = {"name": name, "status": status, "required": required}
|
|
if detail:
|
|
item["detail"] = detail
|
|
return item
|
|
|
|
|
|
def required_failures(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
return [item for item in items if item.get("required", True) and item.get("status") != "passed"]
|
|
|
|
|
|
def run_command(args: list[str], cwd: Path, timeout: int, command_ref: str) -> dict[str, Any]:
|
|
started = time.monotonic()
|
|
try:
|
|
completed = subprocess.run(
|
|
args,
|
|
cwd=cwd,
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=timeout,
|
|
check=False,
|
|
)
|
|
return {
|
|
"command_ref": command_ref,
|
|
"status": "passed" if completed.returncode == 0 else "failed",
|
|
"exit_code": completed.returncode,
|
|
"duration_seconds": round(time.monotonic() - started, 3),
|
|
"stdout_bytes": len(completed.stdout.encode()),
|
|
"stderr_bytes": len(completed.stderr.encode()),
|
|
}
|
|
except subprocess.TimeoutExpired as exc:
|
|
stdout = exc.stdout if isinstance(exc.stdout, str) else ""
|
|
stderr = exc.stderr if isinstance(exc.stderr, str) else ""
|
|
return {
|
|
"command_ref": command_ref,
|
|
"status": "failed",
|
|
"exit_code": None,
|
|
"duration_seconds": round(time.monotonic() - started, 3),
|
|
"error": f"timeout after {timeout}s",
|
|
"stdout_bytes": len(stdout.encode()),
|
|
"stderr_bytes": len(stderr.encode()),
|
|
}
|
|
|
|
|
|
def stage3_context(app_dir: Path, contract_path: Path, data: dict[str, Any]) -> dict[str, Any]:
|
|
stage = data.get("stages", {}).get("stage3", {})
|
|
if not stage.get("enabled", False):
|
|
raise SystemExit("Stage 3 is disabled in railiance/app.toml")
|
|
app = data.get("app", {})
|
|
helm = stage2_helm_check(data) or {}
|
|
chart = app_dir / str(helm.get("chart", f"charts/{app.get('id', 'app')}"))
|
|
values = app_dir / "values" / "stage3-production.yaml"
|
|
release = str(stage.get("release", app.get("id", "app")))
|
|
namespace = str(stage.get("namespace", app.get("id", "default")))
|
|
context = {
|
|
"contract": str(contract_path),
|
|
"app_dir": str(app_dir),
|
|
"stage": "stage3",
|
|
"namespace": namespace,
|
|
"release": release,
|
|
"chart": str(chart),
|
|
"values": str(values),
|
|
"promotion_mode": stage.get("promotion_mode"),
|
|
"previous_stable": stage.get("previous_stable"),
|
|
"requires_approval": bool(stage.get("requires_approval", False)),
|
|
"evidence_expected": list(stage.get("evidence", [])),
|
|
"checks_expected": list(stage.get("checks", [])),
|
|
}
|
|
context.update(app_identity(data))
|
|
return context
|
|
|
|
|
|
def rollback_context(app_dir: Path, contract_path: Path, data: dict[str, Any]) -> dict[str, Any]:
|
|
context = stage3_context(app_dir, contract_path, data)
|
|
rollback = data.get("rollback", {})
|
|
context["rollback"] = {
|
|
"strategy": rollback.get("strategy"),
|
|
"command_ref": "rollback.command",
|
|
"verification": rollback.get("verification"),
|
|
}
|
|
return context
|
|
|
|
|
|
def promote_prechecks(app_dir: Path, context: dict[str, Any], mode: str, approval_id: str | None) -> list[dict[str, Any]]:
|
|
checks = [precheck("app.toml", "passed", True)]
|
|
chart = Path(context["chart"])
|
|
values = Path(context["values"])
|
|
checks.append(precheck("stage3-chart", "passed" if chart.exists() else "failed", True, str(chart)))
|
|
checks.append(precheck("stage3-values", "passed" if values.exists() else "failed", True, str(values)))
|
|
checks.append(
|
|
precheck(
|
|
"previous-stable",
|
|
"passed" if context.get("previous_stable") else "failed",
|
|
True,
|
|
"Stage 3 must record the rollback target before promotion",
|
|
)
|
|
)
|
|
if mode == "apply":
|
|
checks.append(precheck("helm", "passed" if shutil.which("helm") else "failed", True, "helm executable"))
|
|
else:
|
|
checks.append(precheck("helm", "not_required", False, "plan mode does not execute helm"))
|
|
if mode == "apply" and context.get("requires_approval"):
|
|
checks.append(
|
|
precheck(
|
|
"approval-id",
|
|
"passed" if approval_id else "failed",
|
|
True,
|
|
"Stage 3 requires approval before stable promotion",
|
|
)
|
|
)
|
|
elif context.get("requires_approval"):
|
|
checks.append(precheck("approval-id", "required_before_apply", False))
|
|
return checks
|
|
|
|
|
|
def rollback_prechecks(context: dict[str, Any], mode: str, approval_id: str | None, revision: str | None) -> list[dict[str, Any]]:
|
|
checks = [precheck("app.toml", "passed", True)]
|
|
strategy = context.get("rollback", {}).get("strategy")
|
|
checks.append(precheck("rollback-strategy", "passed" if strategy else "failed", True, str(strategy or "")))
|
|
if mode == "apply":
|
|
checks.append(precheck("helm", "passed" if shutil.which("helm") else "failed", True, "helm executable"))
|
|
checks.append(
|
|
precheck(
|
|
"approval-id",
|
|
"passed" if approval_id else "failed",
|
|
True,
|
|
"Rollback apply requires approval or incident evidence",
|
|
)
|
|
)
|
|
if strategy == "helm-revision":
|
|
checks.append(precheck("helm-revision", "passed" if revision else "failed", True))
|
|
else:
|
|
checks.append(precheck("helm", "not_required", False, "plan mode does not execute helm"))
|
|
checks.append(precheck("approval-id", "required_before_apply", False))
|
|
if strategy == "helm-revision":
|
|
checks.append(precheck("helm-revision", "required_before_apply", False))
|
|
return checks
|
|
|
|
|
|
def promote_args(context: dict[str, Any], timeout: int) -> list[str]:
|
|
return [
|
|
"helm",
|
|
"upgrade",
|
|
"--install",
|
|
context["release"],
|
|
context["chart"],
|
|
"--namespace",
|
|
context["namespace"],
|
|
"--create-namespace",
|
|
"-f",
|
|
context["values"],
|
|
"--atomic",
|
|
"--wait",
|
|
"--timeout",
|
|
f"{timeout}m",
|
|
]
|
|
|
|
|
|
def rollback_args(context: dict[str, Any], revision: str, timeout: int) -> list[str]:
|
|
return [
|
|
"helm",
|
|
"rollback",
|
|
context["release"],
|
|
revision,
|
|
"--namespace",
|
|
context["namespace"],
|
|
"--wait",
|
|
"--timeout",
|
|
f"{timeout}m",
|
|
]
|
|
|
|
|
|
def promote(argv: list[str]) -> int:
|
|
parser = argparse.ArgumentParser(description="Plan or apply a Stage 3 stable promotion.")
|
|
parser.add_argument("app_dir", nargs="?", default=".")
|
|
parser.add_argument("--mode", choices=["plan", "apply"], default="plan")
|
|
parser.add_argument("--plan", action="store_const", const="plan", dest="mode")
|
|
parser.add_argument("--apply", action="store_const", const="apply", dest="mode")
|
|
parser.add_argument("--approval-id")
|
|
parser.add_argument("--timeout-minutes", type=int, default=10)
|
|
parser.add_argument("--json-out")
|
|
parser.add_argument("--pretty", action="store_true")
|
|
args = parser.parse_args(argv)
|
|
|
|
app_dir = Path(args.app_dir).resolve()
|
|
contract_path, data = load_contract(app_dir)
|
|
context = stage3_context(app_dir, contract_path, data)
|
|
checks = promote_prechecks(app_dir, context, args.mode, args.approval_id)
|
|
failures = required_failures(checks)
|
|
actions: list[dict[str, Any]] = []
|
|
status = "planned" if not failures else "blocked"
|
|
if args.mode == "apply" and not failures:
|
|
action = run_command(promote_args(context, args.timeout_minutes), app_dir, args.timeout_minutes * 60, "stage3.helm-promote")
|
|
actions.append(action)
|
|
status = "applied" if action.get("status") == "passed" else "failed"
|
|
result: dict[str, Any] = {
|
|
"schema_version": "railiance.stage3-promote-result.v1",
|
|
"status": status,
|
|
"mode": args.mode,
|
|
"generated_at": utc_now(),
|
|
**context,
|
|
"approval_id": args.approval_id,
|
|
"prechecks": checks,
|
|
"actions": actions,
|
|
"planned_actions": [
|
|
{
|
|
"action_ref": "stage3.helm-promote",
|
|
"tool": "helm",
|
|
"release": context["release"],
|
|
"namespace": context["namespace"],
|
|
"chart": context["chart"],
|
|
"values": context["values"],
|
|
}
|
|
],
|
|
"summary": {
|
|
"required_prechecks_failed": len(failures),
|
|
"actions_total": len(actions),
|
|
"actions_failed": len([item for item in actions if item.get("status") != "passed"]),
|
|
},
|
|
}
|
|
return emit(result, args.json_out, args.pretty, {"planned", "applied"})
|
|
|
|
|
|
def rollback(argv: list[str]) -> int:
|
|
parser = argparse.ArgumentParser(description="Plan or apply a rollback to the previous stable release.")
|
|
parser.add_argument("app_dir", nargs="?", default=".")
|
|
parser.add_argument("--mode", choices=["plan", "apply"], default="plan")
|
|
parser.add_argument("--plan", action="store_const", const="plan", dest="mode")
|
|
parser.add_argument("--apply", action="store_const", const="apply", dest="mode")
|
|
parser.add_argument("--approval-id")
|
|
parser.add_argument("--revision", help="Helm revision to roll back to for helm-revision strategy.")
|
|
parser.add_argument("--timeout-minutes", type=int, default=10)
|
|
parser.add_argument("--json-out")
|
|
parser.add_argument("--pretty", action="store_true")
|
|
args = parser.parse_args(argv)
|
|
|
|
app_dir = Path(args.app_dir).resolve()
|
|
contract_path, data = load_contract(app_dir)
|
|
context = rollback_context(app_dir, contract_path, data)
|
|
checks = rollback_prechecks(context, args.mode, args.approval_id, args.revision)
|
|
failures = required_failures(checks)
|
|
actions: list[dict[str, Any]] = []
|
|
status = "planned" if not failures else "blocked"
|
|
if args.mode == "apply" and not failures:
|
|
action = run_command(
|
|
rollback_args(context, str(args.revision), args.timeout_minutes),
|
|
app_dir,
|
|
args.timeout_minutes * 60,
|
|
"stage3.helm-rollback",
|
|
)
|
|
actions.append(action)
|
|
status = "applied" if action.get("status") == "passed" else "failed"
|
|
result: dict[str, Any] = {
|
|
"schema_version": "railiance.stage3-rollback-result.v1",
|
|
"status": status,
|
|
"mode": args.mode,
|
|
"generated_at": utc_now(),
|
|
**context,
|
|
"approval_id": args.approval_id,
|
|
"revision": args.revision,
|
|
"prechecks": checks,
|
|
"actions": actions,
|
|
"planned_actions": [
|
|
{
|
|
"action_ref": "stage3.helm-rollback",
|
|
"tool": "helm",
|
|
"release": context["release"],
|
|
"namespace": context["namespace"],
|
|
"revision": args.revision,
|
|
}
|
|
],
|
|
"summary": {
|
|
"required_prechecks_failed": len(failures),
|
|
"actions_total": len(actions),
|
|
"actions_failed": len([item for item in actions if item.get("status") != "passed"]),
|
|
},
|
|
}
|
|
return emit(result, args.json_out, args.pretty, {"planned", "applied"})
|
|
|
|
|
|
def emit(result: dict[str, Any], json_out: str | None, pretty: bool, success_statuses: set[str]) -> int:
|
|
rendered = json.dumps(result, indent=2 if pretty else None, sort_keys=True)
|
|
print(rendered)
|
|
if json_out:
|
|
output = Path(json_out)
|
|
output.parent.mkdir(parents=True, exist_ok=True)
|
|
output.write_text(rendered + "\n", encoding="utf-8")
|
|
return 0 if result["status"] in success_statuses else 1
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
if not argv:
|
|
print("Usage: railiance-stage3 <promote|rollback> [args]", file=sys.stderr)
|
|
return 2
|
|
command = argv[0]
|
|
if command == "promote":
|
|
return promote(argv[1:])
|
|
if command == "rollback":
|
|
return rollback(argv[1:])
|
|
print(f"Unknown Stage 3 command: {command}", file=sys.stderr)
|
|
return 2
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv[1:]))
|