diff --git a/registry/routing/catalog.yaml b/registry/routing/catalog.yaml index 65ea9fb..70a4085 100644 --- a/registry/routing/catalog.yaml +++ b/registry/routing/catalog.yaml @@ -50,14 +50,22 @@ entries: - id: openbao-api-key title: API key, DB credential, or dynamic lease - need_keywords: [api, key, secret, database, db, password, token, lease, openbao, vault, kv, dynamic, credential] + need_keywords: [api, key, secret, database, db, password, token, lease, openbao, vault, kv, dynamic, credential, npm, npm_auth_token, registry] owner_repo: railiance-platform subsystem: OpenBao warden_executes: false wiki_ref: wiki/CredentialRouting.md#routing-table canon_ref: net-kingdom/docs/platform-identity-security-architecture.md - reviewed: "2026-06-18" + reviewed: "2026-06-27" status: active + # Structured handoff (WP-0014) — reference example. Templates only, no values. + # ops-warden does not own this secret; it advises and (exec_capable) proxies the + # fetch *as the caller* via `warden access`, never holding or persisting the value. + auth_method: "key-cape OIDC → bao login -method=oidc role=" + path_template: "platform/workloads///" + fetch_command: "bao kv get -field= " + policy_ref: "flex-auth check secret.read:" + exec_capable: true - id: flex-auth-policy-check title: Authorization decision — may this actor perform this action diff --git a/src/warden/routing/catalog.py b/src/warden/routing/catalog.py index 21a4b96..e90ef83 100644 --- a/src/warden/routing/catalog.py +++ b/src/warden/routing/catalog.py @@ -14,6 +14,7 @@ never restates another subsystem's procedure. from __future__ import annotations import os +import re from dataclasses import dataclass from datetime import date from pathlib import Path @@ -23,6 +24,24 @@ import yaml from warden.routing.models import RouteEntry +# Structured handoff string fields (WP-0014) — templates and pointers only. +# Every one is scanned for accidental secret material; see _assert_no_secret_material. +_HANDOFF_STR_FIELDS = ("auth_method", "path_template", "fetch_command", "policy_ref") + +# Known secret-bearing token prefixes — a literal here means a value leaked into +# the catalog (which is git-tracked and agent-visible). Templates use `<...>`. +_SECRET_PREFIXES = ( + "ghp_", "gho_", "ghs_", "github_pat_", # GitHub + "sk-", "sk_live_", "sk_test_", # OpenAI / Stripe + "xoxb-", "xoxp-", # Slack + "AKIA", "ASIA", # AWS access key ids + "hvs.", "hvb.", "s.", # Vault/OpenBao service tokens + "AIza", # Google + "eyJ", # JWT +) +# A long unbroken high-entropy run that is not a placeholder — likely a raw value. +_HIGH_ENTROPY_RUN = re.compile(r"[A-Za-z0-9_\-]{32,}") + _REQUIRED_FIELDS = ( "id", "title", @@ -125,6 +144,35 @@ class Catalog: ] +def _assert_no_secret_material(entry_id: str, field_name: str, value: str) -> None: + """Reject a handoff field that appears to embed a literal secret value. + + The structured handoff fields are command/path *templates*: concrete values + must be placeholders (`<...>`) or field names, never a real credential. The + catalog is git-tracked and agent-visible, so a leaked value here is the exact + custody failure WP-0014 forbids. We screen for known token prefixes and for a + long high-entropy run that is not a placeholder. + """ + lowered = value.lower() + for prefix in _SECRET_PREFIXES: + if prefix.lower() in lowered: + raise CatalogError( + f"entry {entry_id!r} field {field_name!r} appears to contain a literal " + f"secret (matched {prefix!r}). Handoff fields are templates — use " + "placeholders like /, never a real value." + ) + for run in _HIGH_ENTROPY_RUN.findall(value): + # Allow long placeholder/path/identifier tokens; flag anything else. + if "<" in run or ">" in run: + continue + if run.replace("_", "").replace("-", "").isalpha(): + continue # all-letters run (e.g. a long word) — not a credential + raise CatalogError( + f"entry {entry_id!r} field {field_name!r} contains a high-entropy token " + f"({run[:8]}…) that is not a placeholder — suspected leaked secret value." + ) + + def _parse_entry(raw: dict, index: int) -> RouteEntry: if not isinstance(raw, dict): raise CatalogError(f"entry #{index} is not a mapping") @@ -159,8 +207,28 @@ def _parse_entry(raw: dict, index: int) -> RouteEntry: if not isinstance(raw["need_keywords"], list): raise CatalogError(f"entry {raw['id']!r} need_keywords must be a list") + # Structured handoff fields (WP-0014) — optional, screened for secret material. + entry_id = str(raw["id"]) + handoff: dict[str, Optional[str]] = {} + for fname in _HANDOFF_STR_FIELDS: + val = raw.get(fname) + if val is None or val == "": + handoff[fname] = None + continue + sval = str(val) + _assert_no_secret_material(entry_id, fname, sval) + handoff[fname] = sval + + exec_capable = bool(raw.get("exec_capable", False)) + # A lane cannot be proxy-executable without a fetch_command to run. + if exec_capable and not handoff["fetch_command"]: + raise CatalogError( + f"entry {entry_id!r} sets exec_capable: true but has no fetch_command — " + "a proxyable lane must declare the command warden runs as the caller." + ) + return RouteEntry( - id=str(raw["id"]), + id=entry_id, title=str(raw["title"]), need_keywords=[str(k) for k in raw["need_keywords"]], owner_repo=str(raw["owner_repo"]), @@ -172,6 +240,11 @@ def _parse_entry(raw: dict, index: int) -> RouteEntry: status=status, steps=[str(s) for s in steps], cert_command=str(cert_command) if cert_command else None, + auth_method=handoff["auth_method"], + path_template=handoff["path_template"], + fetch_command=handoff["fetch_command"], + exec_capable=exec_capable, + policy_ref=handoff["policy_ref"], ) diff --git a/src/warden/routing/models.py b/src/warden/routing/models.py index 868b2a1..1ce02cd 100644 --- a/src/warden/routing/models.py +++ b/src/warden/routing/models.py @@ -26,11 +26,26 @@ class RouteEntry: # SSH lane only — None/empty for routed (non-executed) needs. steps: List[str] = field(default_factory=list) cert_command: Optional[str] = None + # Structured handoff (WP-0014) — optional, allowed on any lane. These are + # *templates and pointers* the `warden access` assist layer renders (and, for + # exec_capable lanes, proxies). They are NOT authored procedure prose and they + # never carry a secret value — only placeholders (`<...>`) and field names. + # Validation in catalog.py enforces the no-secret-material rule on every one. + auth_method: Optional[str] = None # how the caller authenticates to the owner + path_template: Optional[str] = None # owner-side path with `<...>` placeholders + fetch_command: Optional[str] = None # command skeleton run *as the caller* + exec_capable: bool = False # may `warden access --fetch/--exec` proxy it + policy_ref: Optional[str] = None # flex-auth check the fetch path runs first @property def is_active(self) -> bool: return self.status == "active" + @property + def has_handoff(self) -> bool: + """True when structured assist fields are present (advisory richness).""" + return any((self.auth_method, self.path_template, self.fetch_command)) + def match_score(self, tokens: List[str]) -> int: """Keyword-overlap score against need_keywords, title, and id. diff --git a/tests/test_routing.py b/tests/test_routing.py index 9a58a24..05f099d 100644 --- a/tests/test_routing.py +++ b/tests/test_routing.py @@ -110,6 +110,60 @@ def test_missing_catalog_file(): load_catalog(Path("/nonexistent/catalog.yaml")) +# --------------------------------------------------------------------------- +# Structured handoff fields (WP-0014, T1) +# --------------------------------------------------------------------------- + +def test_handoff_fields_parse_on_routed_entry(tmp_path): + entry = dict(ROUTED_ENTRY) + entry["auth_method"] = "key-cape OIDC → bao login -method=oidc role=" + entry["path_template"] = "platform/workloads///" + entry["fetch_command"] = "bao kv get -field= " + entry["policy_ref"] = "flex-auth check secret.read:" + entry["exec_capable"] = True + catalog = load_catalog(_write_catalog(tmp_path, [entry])) + e = catalog.get("openbao-api-key") + assert e.has_handoff is True + assert e.exec_capable is True + assert e.path_template.startswith("platform/workloads/") + + +def test_real_catalog_openbao_entry_has_handoff(): + e = load_catalog(_repo_catalog()).get("openbao-api-key") + assert e is not None and e.has_handoff and e.exec_capable + assert "<" in e.path_template and "<" in e.fetch_command # templates, not values + + +def test_exec_capable_without_fetch_command_rejected(tmp_path): + bad = dict(ROUTED_ENTRY) + bad["exec_capable"] = True # no fetch_command + with pytest.raises(CatalogError, match="fetch_command"): + load_catalog(_write_catalog(tmp_path, [bad])) + + +@pytest.mark.parametrize( + "leaked", + [ + "bao write x token=ghp_abcdef0123456789abcdef0123", # github token prefix + "x=AKIAIOSFODNN7EXAMPLE", # aws key id + "header=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9", # jwt prefix + "val=ZmFrZXNlY3JldDEyMzQ1Njc4OWFiY2RlZmdoaWprbA", # high-entropy run + ], +) +def test_handoff_secret_material_rejected(tmp_path, leaked): + bad = dict(ROUTED_ENTRY) + bad["fetch_command"] = leaked + with pytest.raises(CatalogError, match="secret|high-entropy"): + load_catalog(_write_catalog(tmp_path, [bad])) + + +def test_handoff_template_with_placeholders_accepted(tmp_path): + ok = dict(ROUTED_ENTRY) + ok["fetch_command"] = "bao kv get -field= platform/workloads//" + catalog = load_catalog(_write_catalog(tmp_path, [ok])) + assert catalog.get("openbao-api-key").fetch_command.startswith("bao kv get") + + # --------------------------------------------------------------------------- # find ranking # --------------------------------------------------------------------------- diff --git a/workplans/WARDEN-WP-0014-operator-access-assist.md b/workplans/WARDEN-WP-0014-operator-access-assist.md index 57232a8..b84cb4c 100644 --- a/workplans/WARDEN-WP-0014-operator-access-assist.md +++ b/workplans/WARDEN-WP-0014-operator-access-assist.md @@ -87,19 +87,23 @@ an interactive tool and lower risk to defer. ```task id: WARDEN-WP-0014-T01 -status: progress +status: done priority: high state_hub_task_id: "abb0e722-6524-4224-8638-6ee1573ed3e0" ``` -- [ ] Extend `registry/routing/catalog.yaml` entry schema with optional structured +- [x] Extend `registry/routing/catalog.yaml` entry schema with optional structured handoff fields for non-SSH lanes: `auth_method`, `path_template`, - `fetch_command`, `exec_capable` (bool), `policy_ref`. -- [ ] Fields are **generated/structured pointers**, not prose restatements — each links - to the owner's canon (`canon_ref`) for the authoritative procedure (no drift). -- [ ] Populate for `openbao-api-key` (and the coulomb_social npm shape from this thread) - as the reference example; leave `draft` entries `draft`. -- [ ] Validation: schema check rejects a `fetch_command` that embeds a literal value. + `fetch_command`, `exec_capable` (bool), `policy_ref`. (`RouteEntry` + + `_parse_entry`; `has_handoff` helper.) +- [x] Fields are **structured pointers/templates**, not prose restatements — each + sits alongside the owner's `canon_ref` for the authoritative procedure (no drift). +- [x] Populate for `openbao-api-key` (covers the coulomb_social npm shape: keyword + `npm_auth_token` added) as the reference example; `draft` entries untouched. +- [x] Validation: `_assert_no_secret_material` rejects known token prefixes and + high-entropy runs in any handoff field; `exec_capable` requires `fetch_command`. + Tests in `tests/test_routing.py` (handoff parse, real-catalog, secret-leak + matrix, placeholder-accepted). ### T2 — `warden access` advisory surface