Files
ops-warden/src/warden/inventory.py
2026-03-28 00:45:43 +00:00

109 lines
3.4 KiB
Python

"""Principals inventory — actor registry with type, principals, and TTL policy."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List
import yaml
from warden.models import ActorType, DEFAULT_TTL_HOURS, validate_actor_name
class InventoryError(Exception):
    """Signal that inventory data is malformed or failed validation."""
@dataclass
class ActorEntry:
    """A single actor record held in the inventory."""

    # Identifier of the actor; also the key it is stored under.
    name: str
    # Category of the actor, as an ActorType member.
    actor_type: ActorType
    # Principal names associated with this actor.
    principals: List[str]
    # TTL policy for this actor, expressed in hours.
    ttl_hours: int
    # Optional free-text note; empty string when none was given.
    description: str = ""
@dataclass
class HostEntry:
    """A single host record held in the inventory."""

    # Identifier of the host; also the key it is stored under.
    name: str
    # Keyed by actor type value (actor_type.value); each value is the list
    # of principals allowed for that actor type on this host.
    allowed_principals: Dict[str, List[str]]
@dataclass
class PrincipalsInventory:
    """Container for the whole inventory: actors and hosts, each keyed by name."""

    # default_factory gives every instance its own dict rather than a shared one.
    actors: Dict[str, ActorEntry] = field(default_factory=dict)
    hosts: Dict[str, HostEntry] = field(default_factory=dict)
def _section(raw: dict, key: str) -> dict:
    """Return the mapping stored under *key*; a missing or empty value yields {}."""
    value = raw.get(key) or {}
    if not isinstance(value, dict):
        raise InventoryError(f"Inventory section {key!r} must be a mapping")
    return value


def _parse_actor(name: str, data: object) -> ActorEntry:
    """Validate one raw ``actors`` entry and convert it to an ActorEntry."""
    if not isinstance(data, dict):
        raise InventoryError(f"Actor {name!r} must be a mapping")

    type_raw = str(data.get("type", ""))
    try:
        actor_type = ActorType(type_raw)
    except ValueError as e:
        raise InventoryError(
            f"Actor {name!r} has invalid type: {type_raw!r}. "
            f"Must be one of: adm, agt, atm"
        ) from e

    try:
        validate_actor_name(name, actor_type)
    except ValueError as e:
        raise InventoryError(str(e)) from e

    # A non-numeric or null ttl_hours must surface as InventoryError, not as a
    # bare ValueError/TypeError from int().
    ttl_raw = data.get("ttl_hours", DEFAULT_TTL_HOURS[actor_type])
    try:
        ttl = int(ttl_raw)
    except (TypeError, ValueError) as e:
        raise InventoryError(
            f"Actor {name!r} has invalid ttl_hours: {ttl_raw!r}"
        ) from e

    # Missing or empty principals default to the actor's own name.
    principals_raw = data.get("principals") or [name]
    # list("root") would yield ['r', 'o', 'o', 't']; reject anything that is
    # not already a sequence instead of silently inventing principals.
    if not isinstance(principals_raw, (list, tuple)):
        raise InventoryError(
            f"Actor {name!r} principals must be a list, got: {principals_raw!r}"
        )

    return ActorEntry(
        name=name,
        actor_type=actor_type,
        principals=list(principals_raw),
        ttl_hours=ttl,
        description=str(data.get("description", "")),
    )


def _parse_host(hostname: str, data: object) -> HostEntry:
    """Validate one raw ``hosts`` entry and convert it to a HostEntry."""
    if not isinstance(data, dict):
        raise InventoryError(f"Host {hostname!r} must be a mapping")
    allowed = data.get("allowed_principals") or {}
    if not isinstance(allowed, dict):
        raise InventoryError(
            f"Host {hostname!r} allowed_principals must be a mapping"
        )
    # Copy so the entry does not alias the parsed YAML document.
    return HostEntry(name=hostname, allowed_principals=dict(allowed))


def load_inventory(path: Path) -> PrincipalsInventory:
    """Load inventory.yaml. Returns empty inventory if path does not exist.

    Args:
        path: Location of the YAML inventory file.

    Returns:
        A PrincipalsInventory built from the file's ``actors`` and ``hosts``
        sections. Either section may be absent or empty.

    Raises:
        InventoryError: If the file is not valid YAML (or not valid UTF-8),
            if the document or one of its sections/entries is not a mapping,
            if an actor has an unknown type, a name rejected by
            ``validate_actor_name``, a non-integer ``ttl_hours`` or a
            non-list ``principals``, or if a host's ``allowed_principals``
            is not a mapping.
    """
    if not path.exists():
        return PrincipalsInventory()

    try:
        # Explicit encoding: do not depend on the platform's locale default.
        with path.open(encoding="utf-8") as f:
            raw = yaml.safe_load(f) or {}
    except (yaml.YAMLError, UnicodeDecodeError) as e:
        raise InventoryError(f"Invalid YAML in {path}: {e}") from e

    # A top-level list or scalar is valid YAML but not a valid inventory.
    if not isinstance(raw, dict):
        raise InventoryError(f"Inventory {path} must be a mapping at the top level")

    actors: Dict[str, ActorEntry] = {
        name: _parse_actor(name, data)
        for name, data in _section(raw, "actors").items()
    }
    hosts: Dict[str, HostEntry] = {
        hostname: _parse_host(hostname, data)
        for hostname, data in _section(raw, "hosts").items()
    }
    return PrincipalsInventory(actors=actors, hosts=hosts)
def save_inventory(inventory: PrincipalsInventory, path: Path) -> None:
    """Write inventory to path, creating parent directories as needed.

    The ``actors`` section is always written. Each actor's ``description``
    is emitted only when non-empty, and the ``hosts`` section only when the
    inventory has at least one host.

    Args:
        inventory: The inventory to serialize.
        path: Destination file; overwritten if it already exists.

    Raises:
        yaml.YAMLError: If the inventory holds a value that cannot be
            represented as plain YAML. The destination file is left
            untouched in that case.
    """
    actors_out: dict = {}
    for name, entry in inventory.actors.items():
        record = {
            "type": entry.actor_type.value,
            "principals": entry.principals,
            "ttl_hours": entry.ttl_hours,
        }
        if entry.description:
            record["description"] = entry.description
        actors_out[name] = record

    raw: dict = {"actors": actors_out}
    if inventory.hosts:
        raw["hosts"] = {
            name: {"allowed_principals": host.allowed_principals}
            for name, host in inventory.hosts.items()
        }

    # safe_dump emits only standard YAML tags, so the output can always be
    # read back with yaml.safe_load; yaml.dump may emit python-specific tags
    # that safe_load rejects.
    # Serialize fully before opening the file: a representer error must not
    # truncate an existing inventory.
    text = yaml.safe_dump(raw, default_flow_style=False, sort_keys=False)

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding="utf-8")