#!/usr/bin/env python3
"""Build a flex-auth registry snapshot from ops-warden inventory.yaml.

Usage:
    python scripts/build_flex_auth_registry.py inventory.yaml -o registry/flex-auth/production_registry_snapshot.json
    flex-auth load-registry --file registry/flex-auth/production_registry_snapshot.json
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

try:
    import yaml
except ImportError:
    # PyYAML is only needed by the CLI entry point; build_registry() works on
    # a plain dict, so keep the module importable and let main() report this.
    yaml = None  # type: ignore[assignment]

# The three tables below share the same keys: the actor "type" codes accepted
# in the inventory.
GROUP_BY_TYPE = {
    "adm": "group:ops-warden-admins",
    "agt": "group:ops-warden-agents",
    "atm": "group:ops-warden-automations",
}

SUBJECT_TYPE_BY_ACTOR = {
    "adm": "Agent",
    "agt": "Agent",
    "atm": "Automation",
}

DESCRIPTOR_BY_TYPE = {
    "adm": "descriptor:ops-warden-adm-signer",
    "agt": "descriptor:ops-warden-agt-signer",
    "atm": "descriptor:ops-warden-atm-signer",
}


def _caring_descriptor(actor_type: str, resource_id: str) -> dict[str, Any]:
    """Return the descriptor dict stored under a relationship's ``caring`` key.

    Args:
        actor_type: Actor type code; must be a key of ``DESCRIPTOR_BY_TYPE``.
        resource_id: Resource id used for both the scope ``id`` and
            ``resource`` fields.

    Raises:
        KeyError: If ``actor_type`` is not a key of ``DESCRIPTOR_BY_TYPE``.
    """
    return {
        "id": DESCRIPTOR_BY_TYPE[actor_type],
        "profile": "caring-0.4.0-rc2",
        "subject_type": "Group",
        "organization_relation": "ServiceProvider",
        "canonical_role": "Operator",
        "scope": {
            "level": "Resource",
            "id": resource_id,
            "tenant": "tenant:platform",
            "resource": resource_id,
        },
        "planes": ["Identity", "Secret", "Audit"],
        "capabilities": ["Use", "Operate", "Audit"],
        "exposure_modes": ["Metadata"],
        "conditions": ["TimeLimited", "Logged"],
        "restrictions": ["PrivilegeEscalationBlocked", "SecretAccessBlocked"],
        "access_path": "mediated",
    }


def _actor_type(name: str, entry: dict[str, Any]) -> str:
    """Return the validated type code of one inventory actor entry.

    Raises:
        KeyError: If the entry has no ``type`` field, or its type is not a key
            of ``GROUP_BY_TYPE``. The message names the offending actor.
    """
    try:
        actor_type = str(entry["type"])
    except KeyError:
        raise KeyError(f"actor {name!r} has no 'type' field") from None
    if actor_type not in GROUP_BY_TYPE:
        known = ", ".join(sorted(GROUP_BY_TYPE))
        raise KeyError(
            f"actor {name!r} has unknown type {actor_type!r} "
            f"(expected one of: {known})"
        )
    return actor_type


def build_registry(inventory: dict[str, Any]) -> dict[str, Any]:
    """Translate a parsed inventory mapping into a registry snapshot dict.

    For every entry under ``inventory["actors"]`` (processed in sorted name
    order) one ssh-certificate resource, one subject and one ``signer``
    relationship are emitted, and the actor is added to the group matching its
    type. Groups without members are omitted from the result.

    Args:
        inventory: Parsed inventory. A missing or empty ``actors`` key yields
            a snapshot with no subjects, groups, resources or relationships.

    Returns:
        A JSON-serialisable dict with the keys ``systems``,
        ``resource_manifests``, ``tenants``, ``subjects``, ``groups`` and
        ``relationships``.

    Raises:
        KeyError: If an actor entry lacks ``type`` or has an unknown type.
    """
    actors: dict[str, Any] = inventory.get("actors") or {}
    resources: list[dict[str, Any]] = []
    subjects: list[dict[str, Any]] = []
    groups: dict[str, list[str]] = {gid: [] for gid in GROUP_BY_TYPE.values()}
    relationships: list[dict[str, Any]] = []

    # Sorted by actor name so the generated snapshot is deterministic.
    for name, entry in sorted(actors.items()):
        actor_type = _actor_type(name, entry)
        principals = list(entry.get("principals") or [])
        # A missing, null or zero ttl_hours falls back to 24.
        ttl_hours = int(entry.get("ttl_hours") or 24)
        resource_id = f"ssh-cert:actor/{name}"
        group_id = GROUP_BY_TYPE[actor_type]

        resources.append(
            {
                "id": resource_id,
                "type": "ssh-certificate",
                "labels": ["ssh-signing", actor_type],
                "trust_zone": "platform",
                "owner": "team:platform-security",
                "attributes": {
                    "actor_id": name,
                    "actor_type": actor_type,
                    "allowed_subjects": [name, f"iam:{name}"],
                    "allowed_principals": principals,
                    "max_ttl_hours": ttl_hours,
                },
            }
        )
        subjects.append(
            {
                "id": name,
                "type": SUBJECT_TYPE_BY_ACTOR[actor_type],
                "display_name": entry.get("description") or name,
                "organization_relation": "ServiceProvider",
                "roles": ["Operator"],
                "groups": [group_id],
                "tenant": "tenant:platform",
                "metadata": {"actor_type": actor_type},
            }
        )
        groups[group_id].append(name)
        relationships.append(
            {
                # NOTE(review): the actor name appears twice in this id while
                # the subject is the group; kept as-is because consumers may
                # depend on the exact id -- confirm whether this is intended.
                "id": f"rel:{name}-sign-{name}",
                "system": "ops-warden",
                "subject": group_id,
                "relation": "signer",
                "object": resource_id,
                "tenant": "tenant:platform",
                "conditions": ["TimeLimited", "Logged"],
                "caring": _caring_descriptor(actor_type, resource_id),
            }
        )

    # Only groups that ended up with at least one member are emitted.
    group_records = [
        {
            "id": gid,
            "display_name": gid.replace("group:", "").replace("-", " ").title(),
            "members": members,
            "tenant": "tenant:platform",
        }
        for gid, members in groups.items()
        if members
    ]

    return {
        "systems": [
            {
                "id": "ops-warden",
                "name": "Ops Warden",
                "resource_types": [
                    {
                        "name": "ssh-certificate",
                        "scope_level": "Resource",
                        "planes": ["Identity", "Secret", "Audit"],
                        "metadata": {
                            "description": "Short-lived SSH certificate signing request."
                        },
                    }
                ],
                "actions": [
                    {
                        "name": "sign",
                        "capabilities": ["Use", "Operate", "Audit"],
                        "planes": ["Identity", "Secret", "Audit"],
                        "exposure_modes": ["Metadata"],
                        "metadata": {
                            "required_context": [
                                "principals",
                                "actor_type",
                                "pubkey_fingerprint",
                                "ttl_hours",
                            ]
                        },
                    }
                ],
                "caring_profiles": ["caring-0.4.0-rc2"],
                "metadata": {
                    "flex_auth_contract": "protected-system-v0",
                    "ops_warden_policy_gate": "v2",
                    "policy_enabled_config": "policy.enabled",
                    "tenant": "tenant:platform",
                },
            }
        ],
        "resource_manifests": [
            {
                "id": "ops-warden-ssh-certificates",
                "system": "ops-warden",
                "resources": resources,
                "actions": ["sign"],
                "caring_profile": "caring-0.4.0-rc2",
                "metadata": {
                    "flex_auth_contract": "resource-registration-v0",
                    "tenant": "tenant:platform",
                },
            }
        ],
        "tenants": [{"id": "tenant:platform", "name": "Platform Tenant"}],
        "subjects": subjects,
        "groups": group_records,
        "relationships": relationships,
    }


def main() -> None:
    """Read the inventory YAML named on the command line and write the snapshot.

    Parses ``inventory`` and ``-o/--output`` from ``sys.argv``, builds the
    registry, creates the output's parent directory if needed, writes the
    snapshot as indented JSON and prints a one-line summary.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the usage example in the module docstring from being re-wrapped.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("inventory", type=Path, help="ops-warden inventory.yaml")
    parser.add_argument("-o", "--output", type=Path, required=True)
    args = parser.parse_args()

    if yaml is None:
        parser.error("PyYAML is required to read the inventory (pip install pyyaml)")

    # Explicit UTF-8 so the result does not depend on the host locale.
    inventory = yaml.safe_load(args.inventory.read_text(encoding="utf-8")) or {}
    if not isinstance(inventory, dict):
        parser.error(
            f"{args.inventory}: top-level YAML value must be a mapping, "
            f"got {type(inventory).__name__}"
        )

    registry = build_registry(inventory)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(registry, indent=2) + "\n", encoding="utf-8")
    print(f"Wrote {args.output} ({len(registry['subjects'])} actors)")


if __name__ == "__main__":
    main()