feat: close WP-0009/WP-0013 production integration stewardship strand

Ship flex-auth policy gate registry and smoke evidence, archive WP-0009
through WP-0013, and add integration docs: ops-bridge cert_command
migration playbook, operator OpenBao token hygiene, principals drift
check script, and 2026-06-24 INTENT/SCOPE gap analysis.
This commit is contained in:
2026-06-24 12:44:32 +02:00
parent 1778b169da
commit 90007c2cda
24 changed files with 2192 additions and 121 deletions

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""Build a flex-auth registry snapshot from ops-warden inventory.yaml.
Usage:
python scripts/build_flex_auth_registry.py inventory.yaml -o registry/flex-auth/production_registry_snapshot.json
flex-auth load-registry --file registry/flex-auth/production_registry_snapshot.json
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import yaml
# Actor-type prefixes that appear as ``type`` in inventory actor entries.
_ACTOR_TYPES: tuple[str, ...] = ("adm", "agt", "atm")

# flex-auth group that actors of each type are added to.
GROUP_BY_TYPE: dict[str, str] = {
    "adm": "group:ops-warden-admins",
    "agt": "group:ops-warden-agents",
    "atm": "group:ops-warden-automations",
}

# flex-auth subject type emitted for each actor type.
SUBJECT_TYPE_BY_ACTOR: dict[str, str] = {
    "adm": "Agent",
    "agt": "Agent",
    "atm": "Automation",
}

# Descriptor id used in the signer relationship of each actor type; the ids
# follow one fixed pattern, so they are derived from the actor-type list.
DESCRIPTOR_BY_TYPE: dict[str, str] = {
    kind: f"descriptor:ops-warden-{kind}-signer" for kind in _ACTOR_TYPES
}
def _caring_descriptor(actor_type: str, resource_id: str) -> dict[str, Any]:
    """Return the descriptor dict embedded under ``caring`` in a relationship.

    The descriptor id is looked up in ``DESCRIPTOR_BY_TYPE`` (a ``KeyError``
    propagates for an unmapped *actor_type*) and the scope is pinned to
    *resource_id*. Every other field is a fixed literal.
    """
    descriptor_id = DESCRIPTOR_BY_TYPE[actor_type]
    # Scope names the resource twice: once as the scope id, once as "resource".
    resource_scope: dict[str, Any] = {
        "level": "Resource",
        "id": resource_id,
        "tenant": "tenant:platform",
        "resource": resource_id,
    }
    # Key order is kept stable because the result is serialised to JSON.
    return {
        "id": descriptor_id,
        "profile": "caring-0.4.0-rc2",
        "subject_type": "Group",
        "organization_relation": "ServiceProvider",
        "canonical_role": "Operator",
        "scope": resource_scope,
        "planes": ["Identity", "Secret", "Audit"],
        "capabilities": ["Use", "Operate", "Audit"],
        "exposure_modes": ["Metadata"],
        "conditions": ["TimeLimited", "Logged"],
        "restrictions": ["PrivilegeEscalationBlocked", "SecretAccessBlocked"],
        "access_path": "mediated",
    }
def _principal_list(value: Any) -> list[str]:
    """Normalise an inventory ``principals`` value to a list.

    A bare YAML scalar is kept as one principal; passing it to ``list()``
    would split it into single characters. Empty or missing values give ``[]``.
    """
    if not value:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def _system_record() -> dict[str, Any]:
    """Return the static ``systems`` entry describing ops-warden."""
    return {
        "id": "ops-warden",
        "name": "Ops Warden",
        "resource_types": [
            {
                "name": "ssh-certificate",
                "scope_level": "Resource",
                "planes": ["Identity", "Secret", "Audit"],
                "metadata": {
                    "description": "Short-lived SSH certificate signing request."
                },
            }
        ],
        "actions": [
            {
                "name": "sign",
                "capabilities": ["Use", "Operate", "Audit"],
                "planes": ["Identity", "Secret", "Audit"],
                "exposure_modes": ["Metadata"],
                "metadata": {
                    "required_context": [
                        "principals",
                        "actor_type",
                        "pubkey_fingerprint",
                        "ttl_hours",
                    ]
                },
            }
        ],
        "caring_profiles": ["caring-0.4.0-rc2"],
        "metadata": {
            "flex_auth_contract": "protected-system-v0",
            "ops_warden_policy_gate": "v2",
            "policy_enabled_config": "policy.enabled",
            "tenant": "tenant:platform",
        },
    }


def _resource_manifest(resources: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the single ``resource_manifests`` entry wrapping *resources*."""
    return {
        "id": "ops-warden-ssh-certificates",
        "system": "ops-warden",
        "resources": resources,
        "actions": ["sign"],
        "caring_profile": "caring-0.4.0-rc2",
        "metadata": {
            "flex_auth_contract": "resource-registration-v0",
            "tenant": "tenant:platform",
        },
    }


def build_registry(inventory: dict[str, Any]) -> dict[str, Any]:
    """Build the registry snapshot dict from a parsed inventory mapping.

    For every entry under ``inventory["actors"]`` (processed in sorted name
    order) this emits one ``ssh-certificate`` resource, one subject, one
    group membership and one ``signer`` relationship from the actor type's
    group to the actor's resource. Groups without members are omitted.

    Args:
        inventory: Parsed inventory; a missing or empty ``actors`` key
            yields a registry with no resources, subjects or relationships.

    Returns:
        A JSON-serialisable dict with the keys ``systems``,
        ``resource_manifests``, ``tenants``, ``subjects``, ``groups`` and
        ``relationships``.

    Raises:
        KeyError: If an actor entry is not a mapping, or its ``type`` is
            missing or not a key of ``GROUP_BY_TYPE``. The message names the
            offending actor.
    """
    actors: dict[str, Any] = inventory.get("actors") or {}
    resources: list[dict[str, Any]] = []
    subjects: list[dict[str, Any]] = []
    groups: dict[str, list[str]] = {gid: [] for gid in GROUP_BY_TYPE.values()}
    relationships: list[dict[str, Any]] = []

    for name, entry in sorted(actors.items()):
        raw_type = entry.get("type") if isinstance(entry, dict) else None
        actor_type = str(raw_type)
        if raw_type is None or actor_type not in GROUP_BY_TYPE:
            # Same exception type as the bare dict lookup, but the message
            # now says which inventory entry has to be fixed.
            known = ", ".join(sorted(GROUP_BY_TYPE))
            raise KeyError(
                f"actor {name!r}: unknown or missing type {raw_type!r} "
                f"(expected one of: {known})"
            )

        principals = _principal_list(entry.get("principals"))
        # A missing or falsy ttl_hours falls back to 24.
        ttl_hours = int(entry.get("ttl_hours") or 24)
        resource_id = f"ssh-cert:actor/{name}"
        group_id = GROUP_BY_TYPE[actor_type]

        resources.append(
            {
                "id": resource_id,
                "type": "ssh-certificate",
                "labels": ["ssh-signing", actor_type],
                "trust_zone": "platform",
                "owner": "team:platform-security",
                "attributes": {
                    "actor_id": name,
                    "actor_type": actor_type,
                    "allowed_subjects": [name, f"iam:{name}"],
                    "allowed_principals": principals,
                    "max_ttl_hours": ttl_hours,
                },
            }
        )
        subjects.append(
            {
                "id": name,
                "type": SUBJECT_TYPE_BY_ACTOR[actor_type],
                "display_name": entry.get("description") or name,
                "organization_relation": "ServiceProvider",
                "roles": ["Operator"],
                "groups": [group_id],
                "tenant": "tenant:platform",
                "metadata": {"actor_type": actor_type},
            }
        )
        groups[group_id].append(name)
        relationships.append(
            {
                # NOTE(review): both id segments are the actor name although
                # the subject is the group — confirm this id scheme is intended.
                "id": f"rel:{name}-sign-{name}",
                "system": "ops-warden",
                "subject": group_id,
                "relation": "signer",
                "object": resource_id,
                "tenant": "tenant:platform",
                "conditions": ["TimeLimited", "Logged"],
                "caring": _caring_descriptor(actor_type, resource_id),
            }
        )

    group_records = [
        {
            "id": gid,
            "display_name": gid.replace("group:", "").replace("-", " ").title(),
            "members": members,
            "tenant": "tenant:platform",
        }
        for gid, members in groups.items()
        if members
    ]

    return {
        "systems": [_system_record()],
        "resource_manifests": [_resource_manifest(resources)],
        "tenants": [{"id": "tenant:platform", "name": "Platform Tenant"}],
        "subjects": subjects,
        "groups": group_records,
        "relationships": relationships,
    }
def main() -> None:
    """Read the inventory YAML given on the command line and write the registry JSON.

    Creates the output file's parent directories when needed and prints a
    one-line summary. Exits through ``argparse`` (status 2) when the
    inventory's top-level YAML value is not a mapping.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("inventory", type=Path, help="ops-warden inventory.yaml")
    parser.add_argument("-o", "--output", type=Path, required=True)
    args = parser.parse_args()

    # Explicit UTF-8 so parsing does not depend on the platform locale.
    inventory = yaml.safe_load(args.inventory.read_text(encoding="utf-8")) or {}
    if not isinstance(inventory, dict):
        # build_registry() calls .get() on the document; fail with a clear
        # message instead of an AttributeError traceback.
        parser.error(f"{args.inventory}: top-level YAML value must be a mapping")
    registry = build_registry(inventory)

    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps(registry, indent=2) + "\n", encoding="utf-8")
    print(f"Wrote {args.output} ({len(registry['subjects'])} actors)")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""Compare warden inventory host principals with railiance-infra ssh_principals.yaml.
Usage:
python scripts/check_principals_drift.py \\
--inventory ~/.config/warden/inventory.yaml \\
--infra ~/railiance-infra/ansible/inventory/ssh_principals.yaml
Exit 0 when no drift; exit 1 when principals differ; exit 2 when an input file is missing. No secrets printed.
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from typing import Any
import yaml
def _inventory_host_principals(inventory: dict[str, Any]) -> set[str]:
principals: set[str] = set()
hosts = inventory.get("hosts") or {}
for host_entry in hosts.values():
allowed = host_entry.get("allowed_principals") or {}
for principal_list in allowed.values():
principals.update(principal_list)
return principals
def _infra_principals(infra: dict[str, Any]) -> set[str]:
principals: set[str] = set()
for host_data in (infra.get("ssh_principals") or {}).values():
for user_principals in (host_data.get("users") or {}).values():
principals.update(user_principals)
return principals
def _actor_principals(inventory: dict[str, Any]) -> set[str]:
principals: set[str] = set()
for entry in (inventory.get("actors") or {}).values():
principals.update(entry.get("principals") or [])
return principals
def main() -> int:
    """Compare inventory host principals with the infra principals file.

    Prints the three principal sets and any differences.

    Returns:
        0 when inventory hosts and infra list the same principals (actor
        principals missing from every host only produce a warning),
        1 when inventory hosts and infra differ,
        2 when either input file does not exist.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--inventory",
        type=Path,
        default=Path.home() / ".config/warden/inventory.yaml",
    )
    parser.add_argument(
        "--infra",
        type=Path,
        default=Path.home() / "railiance-infra/ansible/inventory/ssh_principals.yaml",
    )
    args = parser.parse_args()

    if not args.inventory.exists():
        print(f"inventory not found: {args.inventory}", file=sys.stderr)
        return 2
    if not args.infra.exists():
        print(f"infra principals not found: {args.infra}", file=sys.stderr)
        return 2

    # Explicit UTF-8 so parsing does not depend on the platform locale.
    inventory = yaml.safe_load(args.inventory.read_text(encoding="utf-8")) or {}
    infra = yaml.safe_load(args.infra.read_text(encoding="utf-8")) or {}

    host_principals = _inventory_host_principals(inventory)
    infra_principals = _infra_principals(infra)
    actor_principals = _actor_principals(inventory)

    only_inventory = sorted(host_principals - infra_principals)
    only_infra = sorted(infra_principals - host_principals)
    actors_not_on_hosts = sorted(actor_principals - host_principals)
    # Only host/infra mismatches are drift. Folding the actor/host warning
    # into this flag made the "warning only" exit path below unreachable.
    drift = bool(only_inventory or only_infra)

    print(f"inventory hosts principals ({len(host_principals)}): {', '.join(sorted(host_principals)) or '(none)'}")
    print(f"infra deployed principals ({len(infra_principals)}): {', '.join(sorted(infra_principals)) or '(none)'}")
    print(f"inventory actor principals ({len(actor_principals)}): {', '.join(sorted(actor_principals)) or '(none)'}")

    if only_inventory:
        print("\nDRIFT: in inventory hosts but not infra:", ", ".join(only_inventory))
    if only_infra:
        print("DRIFT: in infra but not inventory hosts:", ", ".join(only_infra))
    if actors_not_on_hosts:
        print("WARN: actor principals not listed under any inventory host:", ", ".join(actors_not_on_hosts))

    if drift:
        print("\nRegenerate flex-auth registry after inventory changes:")
        print(" python scripts/build_flex_auth_registry.py <inventory> -o registry/flex-auth/production_registry_snapshot.json")
        return 1
    if actors_not_on_hosts:
        print("\nOK — host/infra aligned (actor/host warning only)")
        return 0
    print("\nOK — no host/infra principal drift")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,105 @@
#!/usr/bin/env bash
# Production policy-gate smoke for WARDEN-WP-0009 T02.
#
# Validates flex-auth registry (from inventory), allow/deny paths through
# warden sign, and optionally OpenBao-backed signing when VAULT_TOKEN works.
#
# Usage:
# ./scripts/policy_gate_production_smoke.sh
# INVENTORY=~/.config/warden/inventory.yaml ./scripts/policy_gate_production_smoke.sh
# SMOKE_VAULT=1 ./scripts/policy_gate_production_smoke.sh # also test backend: vault
# Abort on any failing command, unset variable, or failing pipeline stage.
set -euo pipefail

# Repository root: parent of the directory containing this script.
SCRIPT_DIR="$(dirname "$0")"
ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"

# Registry snapshot inside the repository; rewritten on every run.
REGISTRY="$ROOT/registry/flex-auth/production_registry_snapshot.json"

# Caller-overridable settings: a non-empty environment value wins over the default.
: "${INVENTORY:=$HOME/.config/warden/inventory.yaml}"
: "${FLEX_AUTH_BIN:=/tmp/flex-auth}"
: "${PUBKEY:=$HOME/.ssh/agt-state-hub-bridge_ed25519.pub}"
: "${ACTOR:=agt-state-hub-bridge}"
POLICY="${FLEX_AUTH_POLICY:-$HOME/flex-auth/examples/ops-warden/policy_package.md}"
ADDR="${FLEX_AUTH_ADDR:-127.0.0.1:18090}"

# Scratch directory for the throwaway CA key, warden configs, state and logs.
SMOKE_DIR="$(mktemp -d /tmp/warden-prod-policy-smoke-XXXXXX)"
# EXIT handler: stop the background flex-auth server (if it was started and
# is still running) and remove the scratch directory.
#
# The scratch directory holds a freshly generated CA private key, so it is
# deleted by default. Set KEEP_SMOKE_DIR=1 to keep it for inspection.
cleanup() {
  if [[ -n "${FA_PID:-}" ]] && kill -0 "$FA_PID" 2>/dev/null; then
    kill "$FA_PID" 2>/dev/null || true
    # Reap the child so its exit does not outlive the script.
    wait "$FA_PID" 2>/dev/null || true
  fi
  if [[ -n "${SMOKE_DIR:-}" && -d "${SMOKE_DIR:-}" ]]; then
    if [[ "${KEEP_SMOKE_DIR:-0}" == "1" ]]; then
      echo "Keeping smoke artifacts in $SMOKE_DIR" >&2
    else
      rm -rf -- "$SMOKE_DIR"
    fi
  fi
}
trap cleanup EXIT
echo "==> Building registry from $INVENTORY"
uv run --directory "$ROOT" python scripts/build_flex_auth_registry.py \
  "$INVENTORY" -o "$REGISTRY"
# Run load-registry on the snapshot first; under `set -e` a failure here
# aborts before the server is started.
"$FLEX_AUTH_BIN" load-registry --file "$REGISTRY" >/dev/null

echo "==> Starting flex-auth on $ADDR"
"$FLEX_AUTH_BIN" serve \
  --addr "$ADDR" \
  --registry "$REGISTRY" \
  --policy "$POLICY" \
  --log "$SMOKE_DIR/flex-auth-decisions.jsonl" &
FA_PID=$!

# Wait for the server instead of sleeping a fixed interval: poll the TCP
# port for up to ~5 s and stop early if the process has already exited.
FA_HOST="${ADDR%:*}"
FA_PORT="${ADDR##*:}"
FA_READY=0
for ((attempt = 0; attempt < 50; attempt++)); do
  if ! kill -0 "$FA_PID" 2>/dev/null; then
    echo "flex-auth exited before listening on $ADDR" >&2
    exit 1
  fi
  # The subshell keeps the probe descriptor from leaking into this shell.
  if (exec 3<>"/dev/tcp/${FA_HOST:-127.0.0.1}/$FA_PORT") 2>/dev/null; then
    FA_READY=1
    break
  fi
  sleep 0.1
done
if [[ "$FA_READY" -ne 1 ]]; then
  # The probe can fail for reasons unrelated to the server (e.g. an address
  # form /dev/tcp cannot dial), so warn and let the sign checks decide.
  echo "warning: could not confirm flex-auth is listening on $ADDR; continuing" >&2
fi
# Generate a throwaway, passphrase-less ed25519 CA key inside the scratch directory.
ssh-keygen -t ed25519 -f "$SMOKE_DIR/ca_key" -N "" -q
# Write a warden config that uses the local backend with that key and enables
# the policy gate against the flex-auth instance at $ADDR. The heredoc
# delimiter is unquoted, so $SMOKE_DIR, $INVENTORY and $ADDR are expanded.
cat >"$SMOKE_DIR/warden.yaml" <<EOF
backend: local
ca_key: $SMOKE_DIR/ca_key
state_dir: $SMOKE_DIR/state
inventory_path: $INVENTORY
policy:
enabled: true
flex_auth_url: http://$ADDR
fail_closed: true
tenant: tenant:platform
system: ops-warden
EOF
# Export the config path for the `warden` invocations that follow.
export WARDEN_CONFIG="$SMOKE_DIR/warden.yaml"
echo "==> Allow path: warden sign $ACTOR"
uv run --directory "$ROOT" warden sign "$ACTOR" --pubkey "$PUBKEY" >/dev/null
# The newest signatures.log entry must be JSON carrying a policy_decision_id.
ALLOW_LINE="$(tail -n 1 "$SMOKE_DIR/state/signatures.log")"
python3 -c "import json,sys; e=json.loads(sys.argv[1]); assert e.get('policy_decision_id'), e; print('policy_decision_id:', e['policy_decision_id'])" "$ALLOW_LINE"

echo "==> Deny path: ttl above max"
# Capture the exit status without toggling `set -e`: the `||` branch keeps
# the expected failure from aborting the script.
DENY_RC=0
DENY_OUT="$(uv run --directory "$ROOT" warden sign "$ACTOR" --pubkey "$PUBKEY" --ttl 999 2>&1)" || DENY_RC=$?
if [[ "$DENY_RC" -ne 1 ]]; then
  echo "expected deny exit 1, got $DENY_RC" >&2
  exit 1
fi
# Match in-shell rather than `echo | grep -q`: under pipefail the pipeline
# can fail with SIGPIPE when grep exits early, and a genuine mismatch would
# abort without any message.
if [[ "$DENY_OUT" != *"ttl_out_of_bounds"* ]]; then
  echo "deny output does not mention ttl_out_of_bounds" >&2
  exit 1
fi
# Optional third check, run only when SMOKE_VAULT=1: sign through the vault
# backend with the same policy gate settings.
if [[ "${SMOKE_VAULT:-0}" == "1" ]]; then
echo "==> Vault-backed allow (requires scoped VAULT_TOKEN)"
# Second warden config: vault backend, token taken from the VAULT_TOKEN
# environment variable, separate state directory. The heredoc delimiter is
# unquoted, so $INVENTORY, $SMOKE_DIR and $ADDR are expanded.
cat >"$SMOKE_DIR/warden-vault.yaml" <<EOF
backend: vault
vault:
addr: https://bao.coulomb.social
mount: ssh
role_map:
adm: adm-role
agt: agt-role
atm: atm-role
token_env: VAULT_TOKEN
inventory_path: $INVENTORY
state_dir: $SMOKE_DIR/state-vault
policy:
enabled: true
flex_auth_url: http://$ADDR
fail_closed: true
tenant: tenant:platform
system: ops-warden
EOF
# Switch the following warden call to the vault config.
export WARDEN_CONFIG="$SMOKE_DIR/warden-vault.yaml"
uv run --directory "$ROOT" warden sign "$ACTOR" --pubkey "$PUBKEY" >/dev/null
# The newest log entry must record backend "vault" and a policy_decision_id.
VAULT_LINE="$(tail -1 "$SMOKE_DIR/state-vault/signatures.log")"
python3 -c "import json,sys; e=json.loads(sys.argv[1]); assert e.get('backend')=='vault' and e.get('policy_decision_id'); print('vault policy_decision_id:', e['policy_decision_id'])" "$VAULT_LINE"
fi
# Reached only when every check above succeeded (set -e aborts otherwise).
echo "OK — production registry policy gate smoke passed"