generated from coulomb/repo-seed
Add policy.py client that calls flex-auth /v1/check before sign/issue when policy.enabled is true. Record policy_decision_id in signatures.log. Default off preserves existing inventory-only behavior. Document production OpenBao health probe and update config/wiki references.
72 lines
2.0 KiB
Python
72 lines
2.0 KiB
Python
"""Domain models for OpsWarden."""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
|
|
class ActorType(str, Enum):
|
|
ADM = "adm" # human operator
|
|
AGT = "agt" # LLM-powered autonomous agent
|
|
ATM = "atm" # deterministic script / pipeline
|
|
|
|
|
|
# Default certificate TTLs per ActorType (AccessManagementDirective §2)
|
|
DEFAULT_TTL_HOURS: dict[ActorType, int] = {
|
|
ActorType.ADM: 48,
|
|
ActorType.AGT: 24,
|
|
ActorType.ATM: 8,
|
|
}
|
|
|
|
# Maximum permitted TTLs — same values, explicit policy name for enforcement
|
|
MAX_TTL_HOURS: dict[ActorType, int] = dict(DEFAULT_TTL_HOURS)
|
|
|
|
# Required name prefixes per ActorType (directive §2 naming convention)
|
|
ACTOR_PREFIX: dict[ActorType, str] = {
|
|
ActorType.ADM: "adm-",
|
|
ActorType.AGT: "agt-",
|
|
ActorType.ATM: "atm-",
|
|
}
|
|
|
|
|
|
def validate_actor_name(name: str, actor_type: ActorType) -> None:
|
|
"""Raise ValueError if name does not carry the required prefix for actor_type."""
|
|
prefix = ACTOR_PREFIX[actor_type]
|
|
if not name.startswith(prefix):
|
|
raise ValueError(
|
|
f"Actor name {name!r} must start with {prefix!r} for type {actor_type.value!r}. "
|
|
f"(AccessManagementDirective §2 naming convention)"
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class CertSpec:
|
|
"""Signing request passed to a CABackend."""
|
|
|
|
actor_name: str
|
|
actor_type: ActorType
|
|
pubkey_path: Path
|
|
ttl_hours: int
|
|
principals: List[str]
|
|
identity: str = "" # defaults to actor_name if empty
|
|
policy_decision_id: Optional[str] = None
|
|
|
|
def __post_init__(self) -> None:
|
|
if not self.identity:
|
|
self.identity = self.actor_name
|
|
|
|
|
|
@dataclass
|
|
class CertRecord:
|
|
"""Result returned by a CABackend after signing."""
|
|
|
|
identity: str
|
|
valid_before: datetime
|
|
cert_path: Path
|
|
signed_at: datetime
|
|
principals: List[str] = field(default_factory=list)
|
|
actor_name: str = ""
|