Files
ops-warden/src/warden/routing/catalog.py
tegwick 1f7970ad9b feat(WARDEN-WP-0014): T1 — structured handoff fields in routing catalog
Adds optional assist-layer fields (auth_method, path_template,
fetch_command, exec_capable, policy_ref) to RouteEntry, parsed and
secret-screened in catalog.py. Handoff fields are templates/pointers
only — _assert_no_secret_material rejects known token prefixes and
high-entropy runs, and exec_capable requires a fetch_command. The
openbao-api-key entry is populated as the reference example (covers the
coulomb_social npm shape).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 16:00:56 +02:00

280 lines
10 KiB
Python

"""Load and validate the routing pointer catalog.
The catalog lives at ``registry/routing/catalog.yaml`` in the repo root. Resolution
order:
1. ``WARDEN_ROUTING_CATALOG`` env var, if set (used by tests / overrides).
2. Walk upward from this module looking for ``registry/routing/catalog.yaml``.
Validation enforces the **no-double-source rule**: only ``warden_executes: true``
entries may carry an authored ``steps`` block or a ``cert_command``. Any non-SSH
entry that does so is a validation error — ops-warden points at the owner's doc, it
never restates another subsystem's procedure.
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import List, Optional
import yaml
from warden.routing.models import RouteEntry
# Structured handoff string fields (WP-0014) — templates and pointers only.
# Every one is scanned for accidental secret material; see _assert_no_secret_material.
# _parse_entry reads these optional keys from each entry in the order listed here;
# a missing or empty value is stored as None.
_HANDOFF_STR_FIELDS = ("auth_method", "path_template", "fetch_command", "policy_ref")
# Known secret-bearing token prefixes — a literal here means a value leaked into
# the catalog (which is git-tracked and agent-visible). Templates use `<...>`.
# Consumed by _assert_no_secret_material, which raises CatalogError on a hit.
_SECRET_PREFIXES = (
"ghp_", "gho_", "ghs_", "github_pat_", # GitHub
"sk-", "sk_live_", "sk_test_", # OpenAI / Stripe
"xoxb-", "xoxp-", # Slack
"AKIA", "ASIA", # AWS access key ids
"hvs.", "hvb.", "s.", # Vault/OpenBao service tokens
"AIza", # Google
"eyJ", # JWT
)
# Candidate raw-value pattern: 32 or more consecutive ASCII letters, digits,
# underscores or hyphens. The pattern only checks length and character class; it
# does not measure entropy, so the caller decides which matches are acceptable.
_HIGH_ENTROPY_RUN = re.compile(r"[A-Za-z0-9_\-]{32,}")
# Keys every catalog entry must define. _parse_entry collects all absent keys and
# reports them together in a single CatalogError.
_REQUIRED_FIELDS = (
"id",
"title",
"need_keywords",
"owner_repo",
"subsystem",
"warden_executes",
"wiki_ref",
"canon_ref",
"reviewed",
"status",
)
# The only values accepted for an entry's `status` field.
_VALID_STATUS = ("active", "draft")
# Default review cadence — see wiki/AccessRouting.md#drift-review-cadence
# Used as the default `threshold_days` by is_review_stale and Catalog.stale.
DEFAULT_STALE_DAYS = 90
def days_since_review(reviewed: str, *, today: Optional[date] = None) -> int:
    """Return the number of calendar days from ``reviewed`` to the reference day.

    ``reviewed`` is an ISO date string (``YYYY-MM-DD``). The reference day is
    ``today`` when one is supplied, otherwise the current local date. The result
    is negative when ``reviewed`` lies after the reference day.

    Raises:
        ValueError: ``reviewed`` is not a date string ``date.fromisoformat`` accepts.
    """
    last_review = date.fromisoformat(reviewed)
    anchor = today if today else date.today()
    elapsed = anchor - last_review
    return elapsed.days
def is_review_stale(
    reviewed: str,
    *,
    threshold_days: int = DEFAULT_STALE_DAYS,
    today: Optional[date] = None,
) -> bool:
    """Report whether a review date has aged past the review cadence.

    A review is stale when strictly more than ``threshold_days`` calendar days
    separate ``reviewed`` (``YYYY-MM-DD``) from the reference day; an age exactly
    equal to the threshold still counts as fresh. ``today`` overrides the
    reference day and defaults to the current date.
    """
    age_in_days = days_since_review(reviewed, today=today)
    return age_in_days > threshold_days
class CatalogError(Exception):
    """Signals that the routing catalog could not be found or failed validation."""
def find_catalog_path(start: Optional[Path] = None) -> Path:
    """Resolve the filesystem location of ``registry/routing/catalog.yaml``.

    A non-empty ``WARDEN_ROUTING_CATALOG`` environment variable wins outright: it
    is returned with ``~`` expanded and without checking that the file exists.
    Otherwise the search begins at the resolved ``start`` path (this module's own
    file when ``start`` is not given) and climbs through every ancestor, returning
    the first location under which the catalog exists.

    Raises:
        CatalogError: no candidate between the starting point and the filesystem
            root exists.
    """
    env_path = os.environ.get("WARDEN_ROUTING_CATALOG")
    if env_path:
        return Path(os.path.expanduser(env_path))

    rel = Path("registry", "routing", "catalog.yaml")
    origin = (start or Path(__file__)).resolve()
    # The origin itself is probed first, then each parent on the way to the root.
    hit = next(
        (base / rel for base in (origin, *origin.parents) if (base / rel).exists()),
        None,
    )
    if hit is None:
        raise CatalogError(
            f"Routing catalog not found ({rel}). Set WARDEN_ROUTING_CATALOG to override."
        )
    return hit
@dataclass
class Catalog:
    """A set of route entries together with the file they belong to.

    Attributes:
        path: Location of the catalog file.
        entries: The route entries held by this catalog.
    """

    path: Path
    entries: List[RouteEntry]

    # --- lookup helpers ---------------------------------------------------
    def get(self, entry_id: str) -> Optional[RouteEntry]:
        """Return the first entry whose ``id`` equals ``entry_id``, else ``None``."""
        return next((entry for entry in self.entries if entry.id == entry_id), None)

    def listed(self, include_draft: bool = False) -> List[RouteEntry]:
        """Return a fresh list of entries; only ``is_active`` ones unless ``include_draft``."""
        return [entry for entry in self.entries if include_draft or entry.is_active]

    def find(self, query: str, include_draft: bool = False, limit: int = 5) -> List[RouteEntry]:
        """Return up to ``limit`` entries ranked by their ``match_score`` for the query.

        The query is lower-cased, hyphens are treated as spaces, and the result is
        split on whitespace into tokens. Entries that do not score above zero are
        dropped; the rest are ordered by descending score, ties broken by
        ascending ``id``.
        """
        words = query.lower().replace("-", " ").split()
        ranked = []
        for entry in self.listed(include_draft=include_draft):
            score = entry.match_score(words)
            if score > 0:
                ranked.append((score, entry))
        ranked.sort(key=lambda item: (-item[0], item[1].id))
        return [entry for _, entry in ranked[:limit]]

    def stale(
        self,
        include_draft: bool = False,
        threshold_days: int = DEFAULT_STALE_DAYS,
        *,
        today: Optional[date] = None,
    ) -> List[RouteEntry]:
        """Return the listed entries whose ``reviewed`` date exceeds ``threshold_days``."""
        overdue = []
        for entry in self.listed(include_draft=include_draft):
            if is_review_stale(entry.reviewed, threshold_days=threshold_days, today=today):
                overdue.append(entry)
        return overdue
def _assert_no_secret_material(entry_id: str, field_name: str, value: str) -> None:
    """Reject a handoff field that appears to embed a literal secret value.

    The structured handoff fields are command/path *templates*: concrete values
    must be placeholders (`<...>`) or field names, never a real credential. The
    catalog is git-tracked and agent-visible, so a leaked value here is the exact
    custody failure WP-0014 forbids.

    Two screens are applied, in this order:

    1. Known token prefixes (``_SECRET_PREFIXES``), compared case-insensitively
       and only where a token can begin: at the start of the value or directly
       after a character that is not an ASCII letter or digit. Anchoring keeps
       short prefixes from firing in the middle of ordinary words (the ``s.``
       inside ``policies.md``, the ``sk-`` inside ``disk-usage``).
    2. Runs matched by ``_HIGH_ENTROPY_RUN`` that sit outside every ``<...>``
       placeholder and are not made of letters, underscores and hyphens only.

    Args:
        entry_id: Id of the catalog entry being checked; used in the error text.
        field_name: Name of the handoff field being checked; used in the error text.
        value: The field's string value.

    Raises:
        CatalogError: ``value`` trips either screen.
    """
    for prefix in _SECRET_PREFIXES:
        # The lookbehind pins the prefix to the start of a token, so it is a real
        # prefix test rather than a substring test.
        if re.search(r"(?<![A-Za-z0-9])" + re.escape(prefix), value, re.IGNORECASE):
            raise CatalogError(
                f"entry {entry_id!r} field {field_name!r} appears to contain a literal "
                f"secret (matched {prefix!r}). Handoff fields are templates — use "
                "placeholders like <FIELD>/<PATH>, never a real value."
            )
    # Blank out <...> placeholders before the entropy scan. The run pattern cannot
    # match angle brackets itself, so placeholder text has to be removed up front
    # for a long placeholder name to be tolerated.
    outside_placeholders = re.sub(r"<[^<>]*>", " ", value)
    for run in _HIGH_ENTROPY_RUN.findall(outside_placeholders):
        if run.replace("_", "").replace("-", "").isalpha():
            continue  # all-letters run (e.g. a long word) — not a credential
        raise CatalogError(
            f"entry {entry_id!r} field {field_name!r} contains a high-entropy token "
            f"({run[:8]}…) that is not a placeholder — suspected leaked secret value."
        )
def _parse_entry(raw: dict, index: int) -> RouteEntry:
    """Validate one raw catalog mapping and convert it into a ``RouteEntry``.

    Checks run in a fixed order and the first failure is raised:

    1. ``raw`` must be a mapping.
    2. Every key in ``_REQUIRED_FIELDS`` must be present.
    3. ``status`` must be one of ``_VALID_STATUS``.
    4. An entry whose ``warden_executes`` is falsy may carry neither a non-empty
       ``steps`` block nor a ``cert_command`` (no-double-source rule).
    5. ``need_keywords`` must be a list.
    6. Each optional handoff field is screened by ``_assert_no_secret_material``;
       missing or empty values become ``None``.
    7. ``exec_capable`` requires a ``fetch_command``.

    Args:
        raw: One item of the catalog's ``entries`` list.
        index: Position of the item in that list, used to label entries without an id.

    Raises:
        CatalogError: any of the checks above fails.
    """
    if not isinstance(raw, dict):
        raise CatalogError(f"entry #{index} is not a mapping")

    absent = [name for name in _REQUIRED_FIELDS if name not in raw]
    if absent:
        label = raw.get("id", f"#{index}")
        raise CatalogError(f"entry {label!r} missing required field(s): {', '.join(absent)}")

    executes = bool(raw["warden_executes"])
    authored_steps = raw.get("steps") or []
    cert_cmd = raw.get("cert_command")
    status = str(raw["status"])

    if status not in _VALID_STATUS:
        raise CatalogError(
            f"entry {raw['id']!r} has invalid status {status!r} (expected one of {_VALID_STATUS})"
        )

    # No-double-source rule: authored procedure only on the SSH lane.
    if not executes:
        if authored_steps:
            raise CatalogError(
                f"entry {raw['id']!r} is not warden_executes but carries a `steps` block "
                "— routed needs point at the owner's doc; they must not restate procedure "
                "(no-double-source rule)."
            )
        if cert_cmd:
            raise CatalogError(
                f"entry {raw['id']!r} is not warden_executes but carries a `cert_command`."
            )

    if not isinstance(raw["need_keywords"], list):
        raise CatalogError(f"entry {raw['id']!r} need_keywords must be a list")

    # Structured handoff fields (WP-0014) — optional, screened for secret material.
    entry_id = str(raw["id"])
    handoff: dict[str, Optional[str]] = {}
    for field_name in _HANDOFF_STR_FIELDS:
        candidate = raw.get(field_name)
        if candidate is None or candidate == "":
            handoff[field_name] = None
        else:
            text = str(candidate)
            _assert_no_secret_material(entry_id, field_name, text)
            handoff[field_name] = text

    # A lane cannot be proxy-executable without a fetch_command to run.
    exec_capable = bool(raw.get("exec_capable", False))
    if exec_capable and not handoff["fetch_command"]:
        raise CatalogError(
            f"entry {entry_id!r} sets exec_capable: true but has no fetch_command — "
            "a proxyable lane must declare the command warden runs as the caller."
        )

    return RouteEntry(
        id=entry_id,
        title=str(raw["title"]),
        need_keywords=[str(keyword) for keyword in raw["need_keywords"]],
        owner_repo=str(raw["owner_repo"]),
        subsystem=str(raw["subsystem"]),
        warden_executes=executes,
        wiki_ref=str(raw["wiki_ref"]),
        canon_ref=str(raw["canon_ref"]),
        reviewed=str(raw["reviewed"]),
        status=status,
        steps=[str(step) for step in authored_steps],
        cert_command=str(cert_cmd) if cert_cmd else None,
        auth_method=handoff["auth_method"],
        path_template=handoff["path_template"],
        fetch_command=handoff["fetch_command"],
        exec_capable=exec_capable,
        policy_ref=handoff["policy_ref"],
    )
def load_catalog(path: Optional[Path] = None) -> Catalog:
    """Load, parse, and validate the routing catalog.

    Args:
        path: Catalog file to read. When omitted, the location is resolved by
            ``find_catalog_path()``.

    Returns:
        A ``Catalog`` holding the file's path and its parsed entries in file order.

    Raises:
        CatalogError: the file does not exist, is not valid UTF-8, is not valid
            YAML, is not a mapping with a non-empty ``entries`` list, contains an
            entry that fails ``_parse_entry`` validation, or repeats an entry id.
    """
    catalog_path = path or find_catalog_path()
    if not catalog_path.exists():
        raise CatalogError(f"Routing catalog not found: {catalog_path}")
    try:
        # Decode explicitly as UTF-8. Without `encoding`, open() falls back to the
        # locale's preferred encoding, so the same catalog could load differently
        # (or fail to load) depending on the host it is read on.
        with catalog_path.open(encoding="utf-8") as f:
            raw = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise CatalogError(f"Invalid YAML in {catalog_path}: {e}") from e
    except UnicodeDecodeError as e:
        raise CatalogError(f"Catalog {catalog_path} is not valid UTF-8: {e}") from e
    if not isinstance(raw, dict):
        raise CatalogError("Catalog must be a YAML mapping")
    raw_entries = raw.get("entries")
    if not isinstance(raw_entries, list) or not raw_entries:
        raise CatalogError("Catalog has no `entries` list")
    entries: List[RouteEntry] = []
    seen: set[str] = set()  # ids already accepted, for duplicate detection
    for i, raw_entry in enumerate(raw_entries):
        entry = _parse_entry(raw_entry, i)
        if entry.id in seen:
            raise CatalogError(f"duplicate entry id: {entry.id!r}")
        seen.add(entry.id)
        entries.append(entry)
    return Catalog(path=catalog_path, entries=entries)