#!/usr/bin/env python3 """Compare warden inventory host principals with railiance-infra ssh_principals.yaml. Usage: python scripts/check_principals_drift.py \\ --inventory ~/.config/warden/inventory.yaml \\ --infra ~/railiance-infra/ansible/inventory/ssh_principals.yaml Exit 0 when no drift; exit 1 when principals differ. No secrets printed. """ from __future__ import annotations import argparse import sys from pathlib import Path from typing import Any import yaml def _inventory_host_principals(inventory: dict[str, Any]) -> set[str]: principals: set[str] = set() hosts = inventory.get("hosts") or {} for host_entry in hosts.values(): allowed = host_entry.get("allowed_principals") or {} for principal_list in allowed.values(): principals.update(principal_list) return principals def _infra_principals(infra: dict[str, Any]) -> set[str]: principals: set[str] = set() for host_data in (infra.get("ssh_principals") or {}).values(): for user_principals in (host_data.get("users") or {}).values(): principals.update(user_principals) return principals def _actor_principals(inventory: dict[str, Any]) -> set[str]: principals: set[str] = set() for entry in (inventory.get("actors") or {}).values(): principals.update(entry.get("principals") or []) return principals def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--inventory", type=Path, default=Path.home() / ".config/warden/inventory.yaml", ) parser.add_argument( "--infra", type=Path, default=Path.home() / "railiance-infra/ansible/inventory/ssh_principals.yaml", ) args = parser.parse_args() if not args.inventory.exists(): print(f"inventory not found: {args.inventory}", file=sys.stderr) return 2 if not args.infra.exists(): print(f"infra principals not found: {args.infra}", file=sys.stderr) return 2 inventory = yaml.safe_load(args.inventory.read_text()) or {} infra = yaml.safe_load(args.infra.read_text()) or {} host_principals = _inventory_host_principals(inventory) infra_principals = _infra_principals(infra) actor_principals = _actor_principals(inventory) only_inventory = sorted(host_principals - infra_principals) only_infra = sorted(infra_principals - host_principals) actors_not_on_hosts = sorted(actor_principals - host_principals) drift = bool(only_inventory or only_infra or actors_not_on_hosts) print(f"inventory hosts principals ({len(host_principals)}): {', '.join(sorted(host_principals)) or '(none)'}") print(f"infra deployed principals ({len(infra_principals)}): {', '.join(sorted(infra_principals)) or '(none)'}") print(f"inventory actor principals ({len(actor_principals)}): {', '.join(sorted(actor_principals)) or '(none)'}") if only_inventory: print("\nDRIFT: in inventory hosts but not infra:", ", ".join(only_inventory)) if only_infra: print("DRIFT: in infra but not inventory hosts:", ", ".join(only_infra)) if actors_not_on_hosts: print("WARN: actor principals not listed under any inventory host:", ", ".join(actors_not_on_hosts)) if not drift and not actors_not_on_hosts: print("\nOK — no host/infra principal drift") return 0 if drift: print("\nRegenerate flex-auth registry after inventory changes:") print(" python scripts/build_flex_auth_registry.py -o registry/flex-auth/production_registry_snapshot.json") return 1 print("\nOK — host/infra aligned (actor/host warning only)") return 0 if __name__ == "__main__": raise SystemExit(main())