From 2c513864bcb9df049af175711d15491c842c89b6 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 27 Jun 2026 16:13:51 +0200 Subject: [PATCH] =?UTF-8?q?feat(WARDEN-WP-0014):=20T2=20=E2=80=94=20warden?= =?UTF-8?q?=20access=20advisory=20front=20door?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `warden access [--domain X] [--json]`: resolves a credential need against the routing catalog and renders the structured handoff (owner, auth method, path template, command skeleton, policy gate status, proxy hint). SSH lane points at `warden sign`; routed lanes end "warden advises, the owner vends". New pure warden/access.py module (expand_handoff, policy_gate_status) reused by the T3 proxy lane. JSON output is stable and secret-free. tests/test_access.py added. 157 passed, lint clean. Co-Authored-By: Claude Opus 4.8 --- src/warden/access.py | 76 +++++++++++++ src/warden/cli.py | 105 ++++++++++++++++++ tests/test_access.py | 101 +++++++++++++++++ .../WARDEN-WP-0014-operator-access-assist.md | 18 +-- 4 files changed, 293 insertions(+), 7 deletions(-) create mode 100644 src/warden/access.py create mode 100644 tests/test_access.py diff --git a/src/warden/access.py b/src/warden/access.py new file mode 100644 index 0000000..c7d2777 --- /dev/null +++ b/src/warden/access.py @@ -0,0 +1,76 @@ +"""Operator access assist — render structured handoff for a credential need. + +The `warden access` front door (WP-0014) resolves a need to a `RouteEntry` and +renders its **structured handoff**: how the caller authenticates to the owning +subsystem, the owner-side path template, the command skeleton to run *as the +caller*, and the policy check the fetch path gates on. + +This module is **pure**: it expands templates and reports gate status. It never +fetches, holds, or logs a secret value — that boundary is the whole point of the +assist layer. Proxy execution (`--fetch`/`--exec`) lives in the CLI/T3 lane and +reuses `expand_handoff` to build the command it runs as the caller. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from warden.config import ConfigError, load_config +from warden.routing.models import RouteEntry + + +@dataclass +class ExpandedHandoff: + """Handoff templates with `` substituted when a domain is supplied. + + Remaining placeholders (``, ``, ``) are intentionally + left for the caller/owner to fill — ops-warden does not invent owner-side names. + """ + + auth_method: Optional[str] + path_template: Optional[str] + fetch_command: Optional[str] + policy_ref: Optional[str] + exec_capable: bool + + +def _sub_domain(value: Optional[str], domain: Optional[str]) -> Optional[str]: + if value and domain: + return value.replace("", domain) + return value + + +def expand_handoff(entry: RouteEntry, domain: Optional[str] = None) -> ExpandedHandoff: + """Expand an entry's handoff templates for display or proxy. + + The catalog `fetch_command` may reference the literal token ````; + we inline the entry's ``path_template`` so the rendered command is self-contained, + then substitute ```` across every field when a domain is given. + """ + path = entry.path_template + fetch = entry.fetch_command + if fetch and path and "" in fetch: + fetch = fetch.replace("", path) + + return ExpandedHandoff( + auth_method=_sub_domain(entry.auth_method, domain), + path_template=_sub_domain(path, domain), + fetch_command=_sub_domain(fetch, domain), + policy_ref=_sub_domain(entry.policy_ref, domain), + exec_capable=entry.exec_capable, + ) + + +def policy_gate_status() -> str: + """One-line description of whether the flex-auth gate is enforced for fetches. + + Advisory output only — never raises. The proxy lane (T3) is what actually runs + the gate before fetching; here we just report the configured posture. + """ + try: + cfg = load_config() + except ConfigError: + return "advisory — no warden.yaml (caller identity; gate not enforced)" + if cfg.policy.enabled: + return f"enforced — flex-auth at {cfg.policy.flex_auth_url}" + return "advisory — policy.enabled=false (gate ships with flex-auth deploy)" diff --git a/src/warden/cli.py b/src/warden/cli.py index f635d93..4b708c4 100644 --- a/src/warden/cli.py +++ b/src/warden/cli.py @@ -708,3 +708,108 @@ def route_find( ) return _print_entry_table(matches, f"Matches for {query!r}") + + +# --------------------------------------------------------------------------- +# warden access — operator front door (advisory; proxy lands in T3) +# --------------------------------------------------------------------------- + +def _access_json(entry, expanded, gate: str, domain: Optional[str]) -> dict: + """Stable, secret-free JSON shape for agentic operators. WP-0014 T2.""" + payload = _entry_summary(entry) + payload["domain"] = domain + payload["policy_gate"] = gate + payload["handoff"] = { + "auth_method": expanded.auth_method, + "path_template": expanded.path_template, + "fetch_command": expanded.fetch_command, + "policy_ref": expanded.policy_ref, + "exec_capable": expanded.exec_capable, + } + if entry.warden_executes: + payload["next_action"] = "ops-warden issues this directly — see cert_command" + payload["cert_command"] = entry.cert_command + else: + payload["next_action"] = ( + f"obtain from {entry.owner_repo} ({entry.subsystem}); " + "ops-warden holds no value" + ) + return payload + + +@app.command("access") +def access( + need: Annotated[str, typer.Argument(help="Free-text need, e.g. 'npm token', 'db password'")], + domain: Annotated[ + Optional[str], + typer.Option("--domain", help="Substitute in path/auth templates, e.g. coulomb_social"), + ] = None, + output_json: Annotated[bool, typer.Option("--json", help="Output JSON (stable, secret-free)")] = False, + all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False, +) -> None: + """Operator front door: how to obtain any credential, gated and audited. + + Advisory by default — renders the owner, auth method, path template, command + skeleton, and policy gate status for the best-matching need. ops-warden issues + the SSH lane directly and **routes every other need to its owner** — it never + holds or vends the secret value. (Proxy fetch arrives in WP-0014 T3.) + """ + from warden.access import expand_handoff, policy_gate_status + + catalog = _load_catalog() + matches = catalog.find(need, include_draft=all_entries, limit=1) + if not matches: + err.print( + f"[red]No access match for {need!r}.[/red] " + "Try `warden route list --all` to browse, or rephrase the need." + ) + raise typer.Exit(1) + + entry = matches[0] + expanded = expand_handoff(entry, domain) + gate = policy_gate_status() + + if output_json: + print(json.dumps(_access_json(entry, expanded, gate, domain), indent=2)) + return + + console.print(f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])") + console.print(f" owner : {entry.owner_repo} ({entry.subsystem})") + + if entry.warden_executes: + console.print("\n[green]ops-warden issues this directly.[/green]") + console.print(f" run : [bold]{entry.cert_command}[/bold]") + if entry.steps: + for i, step in enumerate(entry.steps, 1): + console.print(f" {i}. {step}") + return + + if expanded.auth_method: + console.print(f" auth : {expanded.auth_method}") + if expanded.path_template: + console.print(f" path : {expanded.path_template}") + if expanded.fetch_command: + console.print(f" fetch : {expanded.fetch_command}") + if expanded.policy_ref: + console.print(f" policy : {expanded.policy_ref} [dim]({gate})[/dim]") + console.print(f" wiki : {entry.wiki_ref}") + console.print(f" canon : {entry.canon_ref}") + + if expanded.exec_capable: + proxy = f"warden access {need!r}" + if domain: + proxy += f" --domain {domain}" + console.print( + f" proxy : [dim]{proxy} --fetch[/dim] " + "[yellow](exec_capable; proxy ships in WP-0014 T3)[/yellow]" + ) + if expanded.path_template and "<" in expanded.path_template: + console.print( + " note : remaining <…> placeholders are owner-confirmed names " + f"(coordinate with {entry.owner_repo})." + ) + console.print( + f"\n[yellow]ops-warden does not hold this secret.[/yellow] " + f"Obtain it from [bold]{entry.owner_repo}[/bold] as shown — " + "warden advises, the owner vends." + ) diff --git a/tests/test_access.py b/tests/test_access.py new file mode 100644 index 0000000..394fddb --- /dev/null +++ b/tests/test_access.py @@ -0,0 +1,101 @@ +"""Tests for the `warden access` operator front door (WP-0014 T2).""" +from __future__ import annotations + +import json +from pathlib import Path + +from typer.testing import CliRunner + +from warden.access import expand_handoff, policy_gate_status +from warden.cli import app +from warden.routing.models import RouteEntry + +runner = CliRunner() + + +def _repo_catalog() -> Path: + return Path(__file__).resolve().parents[1] / "registry" / "routing" / "catalog.yaml" + + +def _openbao_entry() -> RouteEntry: + return RouteEntry( + id="openbao-api-key", + title="API key, DB credential, or dynamic lease", + need_keywords=["api", "key", "npm", "token"], + owner_repo="railiance-platform", + subsystem="OpenBao", + warden_executes=False, + wiki_ref="wiki/CredentialRouting.md#routing-table", + canon_ref="net-kingdom/docs/x.md", + reviewed="2026-06-27", + status="active", + auth_method="key-cape OIDC → bao login -method=oidc role=", + path_template="platform/workloads///", + fetch_command="bao kv get -field= ", + policy_ref="flex-auth check secret.read:", + exec_capable=True, + ) + + +# --- pure expansion -------------------------------------------------------- + +def test_expand_inlines_path_template_token(): + e = expand_handoff(_openbao_entry()) + assert "" not in e.fetch_command + assert e.fetch_command.startswith("bao kv get -field= platform/workloads/") + + +def test_expand_substitutes_domain(): + e = expand_handoff(_openbao_entry(), domain="coulomb_social") + assert "coulomb_social" in e.path_template + assert "" not in e.path_template + assert "" not in e.auth_method + # owner-side names stay as placeholders — warden does not invent them + assert "" in e.path_template and "" in e.path_template + + +def test_expand_without_domain_keeps_placeholder(): + e = expand_handoff(_openbao_entry()) + assert "" in e.path_template + + +def test_policy_gate_status_no_config(monkeypatch, tmp_path): + monkeypatch.setenv("WARDEN_CONFIG", str(tmp_path / "nope.yaml")) + assert "advisory" in policy_gate_status() + + +# --- CLI ------------------------------------------------------------------- + +def test_access_advisory_output(monkeypatch): + monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog())) + r = runner.invoke(app, ["access", "npm token", "--domain", "coulomb_social"]) + assert r.exit_code == 0 + assert "railiance-platform" in r.stdout + assert "platform/workloads/coulomb_social/" in r.stdout + assert "does not hold this secret" in r.stdout + + +def test_access_json_shape_is_secret_free(monkeypatch): + monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog())) + r = runner.invoke(app, ["access", "npm token", "--domain", "coulomb_social", "--json"]) + assert r.exit_code == 0 + payload = json.loads(r.stdout) + assert payload["id"] == "openbao-api-key" + assert payload["domain"] == "coulomb_social" + assert payload["handoff"]["exec_capable"] is True + # only placeholders/templates — never a concrete credential + assert "" in payload["handoff"]["fetch_command"] + + +def test_access_ssh_lane_points_to_sign(monkeypatch): + monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog())) + r = runner.invoke(app, ["access", "ssh cert for host access"]) + assert r.exit_code == 0 + assert "issues this directly" in r.stdout + assert "warden sign" in r.stdout + + +def test_access_no_match_exits_nonzero(monkeypatch): + monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog())) + r = runner.invoke(app, ["access", "zzzz-no-such-need-xyzzy"]) + assert r.exit_code == 1 diff --git a/workplans/WARDEN-WP-0014-operator-access-assist.md b/workplans/WARDEN-WP-0014-operator-access-assist.md index b84cb4c..4cbe432 100644 --- a/workplans/WARDEN-WP-0014-operator-access-assist.md +++ b/workplans/WARDEN-WP-0014-operator-access-assist.md @@ -109,17 +109,21 @@ state_hub_task_id: "abb0e722-6524-4224-8638-6ee1573ed3e0" ```task id: WARDEN-WP-0014-T02 -status: todo +status: done priority: high state_hub_task_id: "c1497263-7124-459f-b63a-d0c0c7005c86" ``` -- [ ] `warden access [--domain X] [--json]` — resolves via the same matcher as - `warden route find`, but renders the **structured handoff**: owner, auth method, - path template, command skeleton, policy ref + gate status, and any "unconfirmed - path → provision request" note. -- [ ] Advisory is the **default** behavior (no value fetched). -- [ ] `--json` output for agentic operators (stable shape, documented). +- [x] `warden access [--domain X] [--json]` — resolves via the same matcher as + `warden route find` and renders the **structured handoff**: owner, auth method, + path template, command skeleton, policy ref + gate status, proxy hint, and the + `<…>` owner-confirmed-name note. (`warden/access.py` pure module + `access` + command in `cli.py`.) +- [x] Advisory is the **default** behavior (no value fetched); SSH lane points at + `warden sign`; routed lanes end with "warden advises, the owner vends". +- [x] `--json` output for agentic operators — stable, secret-free shape + (`handoff` block + `next_action`); `--domain` substitutes `` only. +- [x] Tests: `tests/test_access.py` (expansion, gate status, advisory/SSH/JSON/no-match). ### T3 — OpenBao proxy lane (`--fetch` / `--exec`)