Files
ops-warden/src/warden/cli.py
tegwick f8ac55367c feat(WARDEN-WP-0020): T3 — guarded executor (worker now acts, not just plans)
HubClient gains writes (mark_read, send_reply, add_progress). execute_plan/execute_plans
run the safe, allowlisted actions autonomously: route_answer (reply with the computed
answer + auto mark-read), reply (LLM-drafted body), progress_note, mark_read. Escalated
plans and non-auto-executable kinds are left for a human; every action is metadata-only
(no secret value read/sent/logged).

Deliberate guardrail: propose_catalog_diff and any code/routing change is NOT auto-executed
even under full-auto — a bad catalog commit could misroute credentials, so it goes to human
review (recoverability over convenience). AUTO_EXECUTABLE is the messaging/hub tier only.

`warden worker run --execute` runs the executor (dry-run still default). 7 executor tests
(reply+mark, with/without body, escalated skip, catalog-diff-left-for-human, progress,
failure-without-crash); 243 pass, lint clean. First live --execute shakedown is the
operator's (staged rollout); T4 schedules it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 23:19:13 +02:00

1205 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""OpsWarden CLI."""
from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Annotated, List, Optional
import typer
from rich.console import Console
from rich.table import Table
from warden.ca import CAError, LocalCA, parse_cert_metadata
from warden.config import ConfigError, WardenConfig, load_config
from warden.policy import check_sign_policy
from warden.inventory import ActorEntry, InventoryError, PrincipalsInventory, load_inventory, save_inventory
from warden.models import ActorType, CertSpec, DEFAULT_TTL_HOURS, validate_actor_name
from warden.scorecard import run_scorecard
# Root CLI application. Each sub-app below is mounted as a command group; the
# mount order is kept as-is so the command listing does not change.
app = typer.Typer(
    help="OpsWarden — SSH CA and certificate lifecycle manager",
    no_args_is_help=True,
)

# `warden inventory ...`
inventory_app = typer.Typer(help="Manage principals inventory", no_args_is_help=True)
app.add_typer(inventory_app, name="inventory")

# `warden route ...`
route_app = typer.Typer(
    help="Look up which subsystem owns a credential need (read-only pointer layer)",
    no_args_is_help=True,
)
app.add_typer(route_app, name="route")

# `warden policy ...`
policy_app = typer.Typer(
    help="Look up Workload Security Posture descriptors (read-only; env posture + maturity)",
    no_args_is_help=True,
)
app.add_typer(policy_app, name="policy")

# `warden worker ...` — the help text previously read "dry-run only until
# executor lands"; the guarded executor now exists, so describe the current
# behaviour (dry-run stays the default, --execute opts in).
worker_app = typer.Typer(
    help="Autonomous coordination worker (WP-0020; dry-run by default, --execute runs the guarded executor)",
    no_args_is_help=True,
)
app.add_typer(worker_app, name="worker")

# stdout console for human-readable output; `err` writes to stderr.
console = Console()
err = Console(stderr=True)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_cfg() -> WardenConfig:
    """Return the loaded warden configuration.

    On ``ConfigError`` the problem is printed to stderr and the process exits
    with status 1 (``typer.Exit``).
    """
    try:
        loaded = load_config()
    except ConfigError as exc:
        err.print(f"[red]Config error:[/red] {exc}")
        raise typer.Exit(1)
    return loaded
def _load_inventory(cfg: WardenConfig) -> PrincipalsInventory:
    """Return the principals inventory stored at ``cfg.inventory_path``.

    On ``InventoryError`` the problem is printed to stderr and the process
    exits with status 1 (``typer.Exit``).
    """
    try:
        loaded = load_inventory(cfg.inventory_path)
    except InventoryError as exc:
        err.print(f"[red]Inventory error:[/red] {exc}")
        raise typer.Exit(1)
    return loaded
def _get_ca(cfg: WardenConfig):
    """Build the CA backend selected by ``cfg.backend``.

    ``"vault"`` yields a ``VaultCA`` (imported lazily, only when selected);
    every other value yields a ``LocalCA``.
    """
    if cfg.backend != "vault":
        return LocalCA(cfg.ca_key, cfg.state_dir)

    from warden.vault import VaultCA

    return VaultCA(cfg.vault, cfg.state_dir)
def _apply_policy_gate(cfg: WardenConfig, spec: CertSpec) -> None:
    """Run the signing policy check and record its decision id on ``spec``.

    ``spec.policy_decision_id`` is assigned only when ``check_sign_policy``
    returns a truthy id; otherwise ``spec`` is left untouched.
    """
    policy_decision = check_sign_policy(cfg.policy, spec)
    if not policy_decision:
        return
    spec.policy_decision_id = policy_decision
# ---------------------------------------------------------------------------
# warden sign
# ---------------------------------------------------------------------------
@app.command()
def sign(
    actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")],
    pubkey: Annotated[Path, typer.Option("--pubkey", help="Path to actor's public key file")],
    ttl: Annotated[Optional[int], typer.Option("--ttl", help="Override TTL in hours")] = None,
) -> None:
    """Sign a public key for the given actor. Writes cert text to stdout.

    This is the cert_command interface: ops-bridge calls this and uses stdout
    as the certificate passed to SSH alongside the private key.
    """
    cfg = _load_cfg()
    actors = _load_inventory(cfg).actors
    entry = actors.get(actor_name)
    if entry is None:
        # Diagnostics go to stderr so stdout only ever carries certificate text.
        err.print(
            f"[red]Actor {actor_name!r} not found in inventory.[/red] "
            f"Add it with: warden inventory add"
        )
        raise typer.Exit(1)

    # A falsy --ttl (omitted, or 0) falls back to the inventory entry's TTL.
    effective_ttl = ttl if ttl else entry.ttl_hours
    spec = CertSpec(
        actor_name=actor_name,
        actor_type=entry.actor_type,
        pubkey_path=pubkey,
        ttl_hours=effective_ttl,
        principals=entry.principals,
        identity=actor_name,
    )

    backend = _get_ca(cfg)
    try:
        _apply_policy_gate(cfg, spec)
        record = backend.sign(spec)
    except CAError as exc:
        err.print(f"[red]Signing failed:[/red] {exc}")
        raise typer.Exit(1)

    # cert_command interface: stdout receives the certificate text and nothing else.
    cert_text = record.cert_path.read_text()
    print(cert_text.strip())
# ---------------------------------------------------------------------------
# warden issue
# ---------------------------------------------------------------------------
@app.command()
def issue(
    actor_name: Annotated[str, typer.Argument(help="Actor name")],
    ttl: Annotated[Optional[int], typer.Option("--ttl", help="Override TTL in hours")] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Generate a new keypair and sign it for the given actor.

    Only supported with the local backend. Outputs keypair + cert paths and metadata.
    """
    cfg = _load_cfg()
    if cfg.backend != "local":
        err.print("[red]warden issue is only supported with the local backend.[/red]")
        raise typer.Exit(1)

    entry = _load_inventory(cfg).actors.get(actor_name)
    if entry is None:
        err.print(f"[red]Actor {actor_name!r} not found in inventory.[/red]")
        raise typer.Exit(1)

    local_ca = LocalCA(cfg.ca_key, cfg.state_dir)

    # Step 1: fresh keypair for the actor.
    try:
        privkey_path, pubkey_path = local_ca.generate_keypair(actor_name)
    except CAError as exc:
        err.print(f"[red]Key generation failed:[/red] {exc}")
        raise typer.Exit(1)

    # Step 2: sign the new public key; a falsy --ttl falls back to the inventory TTL.
    effective_ttl = ttl if ttl else entry.ttl_hours
    spec = CertSpec(
        actor_name=actor_name,
        actor_type=entry.actor_type,
        pubkey_path=pubkey_path,
        ttl_hours=effective_ttl,
        principals=entry.principals,
        identity=actor_name,
    )
    try:
        _apply_policy_gate(cfg, spec)
        record = local_ca.sign(spec)
    except CAError as exc:
        err.print(f"[red]Signing failed:[/red] {exc}")
        raise typer.Exit(1)

    # Step 3: report paths and metadata (never key material itself).
    result = {
        "actor": actor_name,
        "privkey": str(privkey_path),
        "cert": str(record.cert_path),
        "identity": record.identity,
        "principals": record.principals,
        "valid_before": record.valid_before.isoformat(),
        "signed_at": record.signed_at.isoformat(),
    }
    if output_json:
        print(json.dumps(result, indent=2))
        return

    console.print(f"[green]Issued credentials for {actor_name}[/green]")
    for key, value in result.items():
        console.print(f" {key}: {value}")
# ---------------------------------------------------------------------------
# warden status
# ---------------------------------------------------------------------------
@app.command()
def status(
    actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
    state_dir_override: Annotated[Optional[Path], typer.Option("--state-dir", help="State dir path (bypasses config)")] = None,
) -> None:
    """Show certificate status. Exits 1 if any cert is expired."""
    # Certificates are the `<actor>-cert.pub` files in the state dir. With
    # --state-dir the config file is not loaded at all.
    now = datetime.now(timezone.utc)
    if state_dir_override is not None:
        state_dir = state_dir_override
    else:
        state_dir = _load_cfg().state_dir

    if actor_name:
        cert_path = state_dir / f"{actor_name}-cert.pub"
        paths = [cert_path] if cert_path.exists() else []
    else:
        paths = sorted(state_dir.glob("*-cert.pub")) if state_dir.exists() else []

    if not paths:
        if output_json:
            # Keep stdout parseable for --json consumers (same convention as
            # `warden log --json` when there is nothing to show).
            print("[]")
            return
        msg = (
            f"No certificate found for {actor_name!r} (static key / no cert)"
            if actor_name
            else "No certificates in state dir."
        )
        console.print(msg)
        return

    rows = []
    for cert_path in paths:
        # Strip only the trailing "-cert". str.replace would also eat a "-cert"
        # inside the actor name itself (e.g. "agt-cert-manager").
        name = cert_path.stem.removesuffix("-cert")
        try:
            meta = parse_cert_metadata(cert_path)
            valid_before = meta["valid_before"]
            secs = (valid_before - now).total_seconds()
            expired = secs <= 0
            if expired:
                remaining_str = "EXPIRED"
            else:
                h, rem = divmod(int(secs), 3600)
                remaining_str = f"{h}h {rem // 60}m"
            rows.append({
                "actor": name,
                "identity": meta["identity"],
                "principals": ", ".join(meta["principals"]),
                "valid_before": valid_before.isoformat(),
                "remaining": remaining_str,
                "expired": expired,
            })
        except Exception as e:
            # Deliberately broad: one unreadable certificate becomes an error
            # row instead of aborting the whole report. Error rows never count
            # as expired.
            rows.append({"actor": name, "error": str(e), "expired": False})

    if output_json:
        print(json.dumps(rows, indent=2))
    else:
        table = Table(title="Certificate Status")
        table.add_column("Actor")
        table.add_column("Identity")
        table.add_column("Principals")
        table.add_column("Valid Before (UTC)")
        table.add_column("Remaining")
        for row in rows:
            if "error" in row:
                table.add_row(row["actor"], "[red]parse error[/red]", "", "", row["error"])
            else:
                rem_styled = (
                    f"[red]{row['remaining']}[/red]" if row["expired"] else row["remaining"]
                )
                table.add_row(
                    row["actor"],
                    row["identity"],
                    row["principals"],
                    row["valid_before"],
                    rem_styled,
                )
        console.print(table)

    # Non-zero exit lets scripts detect expiry without parsing the output.
    if any(r.get("expired") for r in rows):
        raise typer.Exit(1)
# ---------------------------------------------------------------------------
# warden scorecard
# ---------------------------------------------------------------------------
@app.command()
def scorecard(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Run compliance scorecard checks (AccessManagementDirective §5, cert-side)."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)
    results = run_scorecard(cfg.state_dir, inventory)

    passed = len([check for check in results if check.passed])
    total = len(results)

    if output_json:
        payload = [
            {"check": check.name, "passed": check.passed, "detail": check.detail}
            for check in results
        ]
        print(json.dumps(payload, indent=2))
    else:
        table = Table(title=f"OpsWarden Scorecard ({passed}/{total})")
        for heading in ("Check", "Status", "Detail"):
            table.add_column(heading)
        for check in results:
            verdict = "[green]PASS[/green]" if check.passed else "[red]FAIL[/red]"
            table.add_row(check.name, verdict, check.detail)
        console.print(table)

        if passed == total:
            overall = "[green]Operational[/green]"
        else:
            overall = "[yellow]Needs attention[/yellow]"
        console.print(f"\nScore: {passed}/{total} " + overall)

    # Any failing check makes the command exit non-zero, in both output modes.
    if passed < total:
        raise typer.Exit(1)
# ---------------------------------------------------------------------------
# warden inventory
# ---------------------------------------------------------------------------
@inventory_app.command("list")
def inventory_list(
    output_json: Annotated[bool, typer.Option("--json")] = False,
) -> None:
    """List all actors in the principals inventory."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    # JSON mode is handled before the empty check so an empty inventory still
    # yields valid JSON (`{}`) instead of a prose message on stdout.
    if output_json:
        print(json.dumps({
            name: {
                "type": e.actor_type.value,
                "principals": e.principals,
                "ttl_hours": e.ttl_hours,
                "description": e.description,
            }
            for name, e in inventory.actors.items()
        }, indent=2))
        return

    if not inventory.actors:
        console.print("No actors in inventory.")
        return

    table = Table(title=f"Principals Inventory ({cfg.inventory_path})")
    table.add_column("Actor")
    table.add_column("Type")
    table.add_column("Principals")
    table.add_column("TTL (h)")
    table.add_column("Description")
    for name, e in inventory.actors.items():
        table.add_row(
            name,
            e.actor_type.value,
            ", ".join(e.principals),
            str(e.ttl_hours),
            e.description,
        )
    console.print(table)
@inventory_app.command("add")
def inventory_add(
    actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")],
    actor_type: Annotated[ActorType, typer.Option("--type", "-t", help="adm | agt | atm")],
    principals: Annotated[
        Optional[List[str]],
        typer.Option("--principal", "-p", help="Principal (repeat for multiple)"),
    ] = None,
    ttl: Annotated[Optional[int], typer.Option("--ttl", help="TTL in hours")] = None,
    description: Annotated[str, typer.Option("--description", "-d")] = "",
) -> None:
    """Add an actor to the principals inventory."""
    cfg = _load_cfg()

    try:
        validate_actor_name(actor_name, actor_type)
    except ValueError as exc:
        err.print(f"[red]{exc}[/red]")
        raise typer.Exit(1)

    # With no --principal given, the actor's own name is its single principal.
    resolved_principals: List[str] = principals if principals else [actor_name]

    inventory = _load_inventory(cfg)
    # A falsy --ttl (omitted, or 0) selects the per-type default.
    resolved_ttl = ttl if ttl else DEFAULT_TTL_HOURS[actor_type]
    # Plain assignment: an existing entry with the same name is replaced.
    inventory.actors[actor_name] = ActorEntry(
        name=actor_name,
        actor_type=actor_type,
        principals=resolved_principals,
        ttl_hours=resolved_ttl,
        description=description,
    )

    try:
        save_inventory(inventory, cfg.inventory_path)
    except Exception as exc:
        err.print(f"[red]Failed to save inventory:[/red] {exc}")
        raise typer.Exit(1)

    console.print(
        f"[green]Added[/green] {actor_name} "
        f"(type={actor_type.value}, principals={resolved_principals}, ttl={resolved_ttl}h)"
    )
@inventory_app.command("remove")
def inventory_remove(
    actor_name: Annotated[str, typer.Argument(help="Actor name to remove")],
) -> None:
    """Remove an actor from the principals inventory."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    known = actor_name in inventory.actors
    if not known:
        err.print(f"[red]Actor {actor_name!r} not in inventory.[/red]")
        raise typer.Exit(1)

    del inventory.actors[actor_name]

    # Persist the change; any failure while saving is reported and exits 1.
    try:
        save_inventory(inventory, cfg.inventory_path)
    except Exception as exc:
        err.print(f"[red]Failed to save inventory:[/red] {exc}")
        raise typer.Exit(1)

    console.print(f"[green]Removed[/green] {actor_name}")
# ---------------------------------------------------------------------------
# warden cleanup
# ---------------------------------------------------------------------------
@app.command()
def cleanup(
    actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None,
    dry_run: Annotated[bool, typer.Option("--dry-run", help="Preview without deleting")] = False,
) -> None:
    """Remove stale (expired > 5 min) certificates from state dir."""
    cfg = _load_cfg()
    # Only certs whose valid_before is more than five minutes in the past are stale.
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)

    if actor_name:
        cert_path = cfg.state_dir / f"{actor_name}-cert.pub"
        paths = [cert_path] if cert_path.exists() else []
    else:
        paths = sorted(cfg.state_dir.glob("*-cert.pub")) if cfg.state_dir.exists() else []

    removed = []
    for cert_path in paths:
        try:
            meta = parse_cert_metadata(cert_path)
        except Exception as e:
            # Best-effort: a certificate that cannot be parsed is never deleted,
            # but the skip is reported on stderr instead of being silent so the
            # operator knows the file was left behind.
            err.print(f"[yellow]skipped (unparseable):[/yellow] {cert_path.name}: {e}")
            continue
        if meta["valid_before"] >= cutoff:
            continue
        if dry_run:
            console.print(f"would remove: {cert_path.name}")
        else:
            cert_path.unlink()
            console.print(f"removed: {cert_path.name}")
        # Dry-run hits are recorded too, so the "nothing found" line below is
        # only printed when no certificate matched at all.
        removed.append(cert_path.name)

    if not removed:
        console.print("No stale certificates found.")
# ---------------------------------------------------------------------------
# warden log
# ---------------------------------------------------------------------------
@app.command()
def log(
    actor_name: Annotated[Optional[str], typer.Argument(help="Filter by actor name")] = None,
    last: Annotated[int, typer.Option("--last", help="Show last N entries")] = 20,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show outgoing certificate signing history."""
    # History is read from `signatures.log` in the state dir, one JSON object
    # per line.
    cfg = _load_cfg()
    log_path = cfg.state_dir / "signatures.log"
    if not log_path.exists():
        if output_json:
            print("[]")
        else:
            console.print("No signatures log found.")
        return

    entries = []
    for line in log_path.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            # Tolerate corrupt lines rather than failing the whole listing.
            continue
        if not isinstance(entry, dict):
            # Valid JSON that is not an object (e.g. a bare number) is not a
            # log record; skipping it avoids an AttributeError on `.get`.
            continue
        if actor_name and entry.get("actor") != actor_name:
            continue
        entries.append(entry)

    # `entries[-0:]` is the WHOLE list, so a non-positive --last must be
    # special-cased to mean "no entries".
    entries = entries[-last:] if last > 0 else []

    if output_json:
        print(json.dumps(entries, indent=2))
        return
    if not entries:
        console.print("No matching log entries.")
        return

    table = Table(title="Signatures Log")
    table.add_column("Timestamp")
    table.add_column("Actor")
    table.add_column("Type")
    table.add_column("Identity")
    table.add_column("TTL (h)")
    table.add_column("Valid Before (UTC)")
    table.add_column("Backend")
    for e in entries:
        # str() guards against non-string field values, which would otherwise
        # break the slice or be rejected by the table renderer. Timestamps are
        # cut to 19 characters.
        table.add_row(
            str(e.get("timestamp", ""))[:19],
            str(e.get("actor", "")),
            str(e.get("actor_type", "")),
            str(e.get("identity", "")),
            str(e.get("ttl_hours", "")),
            str(e.get("valid_before", ""))[:19],
            str(e.get("backend", "")),
        )
    console.print(table)
# ---------------------------------------------------------------------------
# warden route — read-only routing lookup over the pointer catalog
# ---------------------------------------------------------------------------
def _load_catalog():
    """Return the routing catalog.

    The routing module is imported lazily. On ``CatalogError`` the problem is
    printed to stderr and the process exits with status 1.
    """
    from warden.routing import CatalogError, load_catalog

    try:
        catalog = load_catalog()
    except CatalogError as exc:
        err.print(f"[red]Routing catalog error:[/red] {exc}")
        raise typer.Exit(1)
    return catalog
def _entry_summary(entry) -> dict:
"""Pointer-only summary. Never includes secret material."""
return {
"id": entry.id,
"title": entry.title,
"owner_repo": entry.owner_repo,
"subsystem": entry.subsystem,
"warden_executes": entry.warden_executes,
# warden_role tells an agent at a glance whether ops-warden runs this lane
# itself (issue), proxies the fetch as the caller (assist), or only points (route).
"warden_role": (
"issue" if entry.warden_executes
else "assist" if entry.exec_capable
else "route"
),
"exec_capable": entry.exec_capable,
# resolvable: can `warden access --fetch` run this now with no <…> to fill?
# Lets an automated caller gate on readiness before attempting a fetch.
"resolvable": entry.resolvable,
# Owner-native exec front door (WP-0019): when present, this subsystem's exec is
# the PRIMARY path; ops-warden's proxy is the transparent fallback.
**(
{
"exec_owner": entry.exec_owner,
"exec_command": entry.exec_command,
"pointer_command": entry.pointer_command,
}
if entry.has_native_exec
else {}
),
"wiki_ref": entry.wiki_ref,
"canon_ref": entry.canon_ref,
"reviewed": entry.reviewed,
"status": entry.status,
}
def _print_entry_table(
    entries, title: str, *, show_reviewed: bool = False, stale_threshold_days: int = 90
) -> None:
    """Render catalog entries as a table on the stdout console.

    With ``show_reviewed`` two extra columns (review date and age in days) are
    added, and the review date is highlighted when the age exceeds
    ``stale_threshold_days``.
    """
    headings = ["ID", "Need", "Owner", "warden"]
    if show_reviewed:
        headings += ["Reviewed", "Days"]
    headings.append("Status")

    table = Table(title=title)
    for heading in headings:
        table.add_column(heading)

    from warden.routing.catalog import days_since_review

    for e in entries:
        if e.warden_executes:
            role_cell = "[green]issue[/green]"
        elif e.exec_capable:
            role_cell = "[cyan]assist[/cyan]"  # warden access --fetch/--exec proxies it
        else:
            role_cell = "route"

        # Anything other than "active" is highlighted.
        status_cell = f"[yellow]{e.status}[/yellow]" if e.status != "active" else e.status

        cells = [e.id, e.title, e.owner_repo, role_cell]
        if show_reviewed:
            age_days = days_since_review(e.reviewed)
            if age_days > stale_threshold_days:
                reviewed_cell = f"[yellow]{e.reviewed}[/yellow]"
            else:
                reviewed_cell = e.reviewed
            cells += [reviewed_cell, str(age_days)]
        cells.append(status_cell)
        table.add_row(*cells)

    console.print(table)
@route_app.command("list")
def route_list(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
    all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False,
    tag: Annotated[Optional[str], typer.Option("--tag", help="Filter by need keyword")] = None,
    stale_only: Annotated[
        bool, typer.Option("--stale", help="Show entries past review cadence (see --stale-days)")
    ] = False,
    stale_days: Annotated[
        int,
        typer.Option(
            "--stale-days",
            help="Days since reviewed before an entry is stale (default 90)",
            min=1,
        ),
    ] = 90,
) -> None:
    """List routing scenarios. Active-only unless --all."""
    from warden.routing.catalog import days_since_review

    catalog = _load_catalog()
    entries = (
        catalog.stale(include_draft=all_entries, threshold_days=stale_days)
        if stale_only
        else catalog.listed(include_draft=all_entries)
    )

    if tag:
        # Case-insensitive exact match against each entry's need keywords.
        wanted = tag.lower()
        entries = [e for e in entries if wanted in {kw.lower() for kw in e.need_keywords}]

    if output_json:
        payload = []
        for scenario in entries:
            summary = _entry_summary(scenario)
            if stale_only:
                # Staleness details are only attached in --stale mode.
                summary["days_since_review"] = days_since_review(scenario.reviewed)
                summary["stale_threshold_days"] = stale_days
            payload.append(summary)
        print(json.dumps(payload, indent=2))
        return

    if not entries:
        if stale_only:
            console.print(f"No stale routing entries (threshold: {stale_days} days since reviewed).")
        else:
            console.print("No matching routing entries.")
        return

    if stale_only:
        title = f"Stale routing scenarios (>{stale_days}d since reviewed)"
    else:
        title = "Routing scenarios"
    _print_entry_table(
        entries, title, show_reviewed=stale_only, stale_threshold_days=stale_days
    )
@route_app.command("show")
def route_show(
    entry_id: Annotated[str, typer.Argument(help="Catalog entry id (see `warden route list`)")],
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show owner, pointers, and (SSH only) the authored steps for one scenario."""
    entry = _load_catalog().get(entry_id)
    if entry is None:
        err.print(
            f"[red]Unknown routing id {entry_id!r}.[/red] "
            f"Try: warden route find {entry_id!r}"
        )
        raise typer.Exit(1)

    if output_json:
        summary = _entry_summary(entry)
        summary["need_keywords"] = entry.need_keywords
        if entry.warden_executes:
            # Lanes ops-warden runs itself carry their steps and command.
            summary["steps"] = entry.steps
            summary["cert_command"] = entry.cert_command
        else:
            # Every other lane gets a single human-readable next_action hint.
            if entry.has_native_exec:
                hint = (
                    f"primary: run via {entry.exec_owner} — `{entry.exec_command}`; ops-warden "
                    f"routes to the owner (fallback: `warden access <need> --exec`). See `{entry.wiki_ref}`."
                )
            elif entry.exec_capable:
                hint = (
                    "ops-warden can proxy this as the caller: `warden access <need> --fetch`"
                    f" (or `--exec -- <cmd>`); runs {entry.owner_repo}'s tool with your "
                    f"identity. See `{entry.wiki_ref}`."
                )
            else:
                hint = f"next action on `{entry.owner_repo}` — see `{entry.wiki_ref}`"
            summary["next_action"] = hint
        print(json.dumps(summary, indent=2))
        return

    console.print(f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])")
    console.print(f" owner : {entry.owner_repo} ({entry.subsystem})")
    console.print(f" wiki : {entry.wiki_ref}")
    console.print(f" canon : {entry.canon_ref}")
    console.print(f" reviewed : {entry.reviewed} status: {entry.status}")

    if not entry.warden_executes:
        console.print(
            f"\n[yellow]ops-warden does not issue this.[/yellow] "
            f"Next action on [bold]{entry.owner_repo}[/bold] — see {entry.wiki_ref}."
        )
        return

    console.print("\n[green]ops-warden issues this directly.[/green]")
    console.print(f" cert_command: [bold]{entry.cert_command}[/bold]")
    if entry.steps:
        console.print(" steps:")
        for number, step in enumerate(entry.steps, 1):
            console.print(f" {number}. {step}")
    console.print(
        " precondition: actor in inventory? backend configured? run `warden status`."
    )
@route_app.command("find")
def route_find(
    query: Annotated[str, typer.Argument(help="Free-text need, e.g. 'issue core api key'")],
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
    all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False,
    limit: Annotated[int, typer.Option("--limit", help="Max matches")] = 5,
) -> None:
    """Rank routing scenarios by keyword overlap with the query."""
    catalog = _load_catalog()
    matches = catalog.find(query, include_draft=all_entries, limit=limit)

    if output_json:
        # JSON mode always prints a list, even when nothing matched.
        summaries = [_entry_summary(match) for match in matches]
        print(json.dumps(summaries, indent=2))
        return

    if matches:
        _print_entry_table(matches, f"Matches for {query!r}")
        return

    console.print(
        f"No routing match for {query!r}. "
        "Try `warden route list --all` to browse all scenarios."
    )
# ---------------------------------------------------------------------------
# warden access — operator front door (advisory by default; --fetch/--exec proxy per WP-0014 T3)
# ---------------------------------------------------------------------------
def _access_json(entry, expanded, gate: str, domain: Optional[str]) -> dict:
    """Assemble the secret-free JSON payload for ``warden access --json`` (WP-0014 T2).

    Starts from the entry summary, then adds the requested domain, the policy
    gate status, the expanded handoff fields, and a ``next_action`` hint that
    depends on how the lane is served.
    """
    payload = _entry_summary(entry)
    payload["domain"] = domain
    payload["policy_gate"] = gate
    payload["handoff"] = dict(
        auth_method=expanded.auth_method,
        path_template=expanded.path_template,
        fetch_command=expanded.fetch_command,
        policy_ref=expanded.policy_ref,
        exec_capable=expanded.exec_capable,
    )

    if entry.warden_executes:
        payload["next_action"] = "ops-warden issues this directly — see cert_command"
        payload["cert_command"] = entry.cert_command
        return payload

    if entry.has_native_exec:
        hint = (
            f"primary: run via {entry.exec_owner} — `{entry.exec_command}`; "
            "ops-warden routes to the owner (fallback: `warden access <need> --exec`). "
            "ops-warden holds no token."
        )
    elif expanded.exec_capable:
        # Login lanes only support --fetch, so the --exec alternative is omitted.
        is_login = entry.lane == "login"
        verb = "login" if is_login else "fetch"
        exec_alternative = "" if is_login else " (or `--exec -- <cmd>`)"
        hint = (
            f"ops-warden can proxy this {verb} as the caller: "
            "`warden access <need> --fetch`"
            + exec_alternative
            + f". Runs {entry.owner_repo}'s tool with your identity; ops-warden holds no value."
        )
    else:
        hint = (
            f"obtain from {entry.owner_repo} ({entry.subsystem}); "
            "ops-warden holds no value"
        )
    payload["next_action"] = hint
    return payload
def _access_proxy(
    entry,
    *,
    domain: Optional[str],
    field: Optional[str],
    path: Optional[str],
    do_exec: bool,
    child_argv: list,
    no_policy: bool,
) -> None:
    """Proxy a non-SSH credential fetch as the caller (WP-0014 T3).

    Enforces the three guardrails: caller identity (no warden token), policy gate
    before fetch, and transit-only (no value persisted or logged). All warden chatter
    goes to stderr so --fetch stdout carries only the secret.

    This function never returns normally; it always ends in ``typer.Exit``:
      * 2 — entry not exec_capable, config missing, ``--exec`` on a login lane,
        fetch command cannot be resolved, or ``--exec`` without a command
      * 3 — no caller credential present
      * 4 — policy gate denied, or gate not enforced and ``--no-policy`` absent
      * 5 — the proxied command failed with ``ProxyError``
      * otherwise the return code reported by ``proxy_fetch`` / ``proxy_exec``
    """
    from warden.proxy import (
        ProxyError,
        caller_auth_present,
        proxy_exec,
        proxy_fetch,
        resolve_fetch_command,
        write_audit,
    )
    from warden.policy import check_fetch_policy

    if not entry.exec_capable:
        err.print(
            f"[red]{entry.id!r} is not exec_capable.[/red] "
            "Use `warden access` (advisory) and obtain it from the owner directly."
        )
        raise typer.Exit(2)

    # Proxy is privileged — require a real config for policy posture + audit sink.
    try:
        cfg = load_config()
    except ConfigError as e:
        err.print(
            f"[red]Proxy requires warden.yaml[/red] (policy gate + audit sink): {e}\n"
            "Advisory mode works without it: drop --fetch/--exec."
        )
        raise typer.Exit(2)

    is_login = entry.lane == "login"
    decision_id = None
    if is_login:
        # Login lane: interactive auth bootstrap. No caller-auth precheck (you have no
        # token yet — that's the point) and no secret-read gate (it needs an identity
        # this flow establishes). --exec is meaningless here.
        if do_exec:
            err.print(
                "[red]--exec is not valid for a login lane[/red] "
                f"({entry.id!r} is interactive auth). Use --fetch."
            )
            raise typer.Exit(2)
        err.print(
            "[dim]login lane — interactive auth bootstrap; no secret-read gate, "
            "token stays in the caller's own store.[/dim]"
        )
    else:
        # G1 — caller identity. ops-warden adds no token of its own.
        if not caller_auth_present():
            err.print(
                "[red]No caller credential found[/red] (VAULT_TOKEN/BAO_TOKEN or ~/.vault-token). "
                f"Authenticate first: {entry.auth_method or 'see the owner auth path'}."
            )
            raise typer.Exit(3)
        # G3 — policy gate before fetch.
        if cfg.policy.enabled:
            try:
                decision_id = check_fetch_policy(
                    cfg.policy, need_id=entry.id, owner_repo=entry.owner_repo, domain=domain
                )
            except CAError as e:
                err.print(f"[red]Policy gate denied the fetch:[/red] {e}")
                raise typer.Exit(4)
            err.print(f"[green]flex-auth allow[/green] (decision {decision_id}).")
        elif not no_policy:
            err.print(
                "[yellow]flex-auth gate is not enforced[/yellow] (policy.enabled=false). "
                "Re-run with [bold]--no-policy[/bold] to proxy ungated, or enable the gate."
            )
            raise typer.Exit(4)
        else:
            err.print("[yellow]Proxying ungated[/yellow] (--no-policy; gate not enforced).")

    try:
        argv = resolve_fetch_command(entry, domain=domain, field=field, path=path)
    except ProxyError as e:
        err.print(f"[red]{e}[/red]")
        raise typer.Exit(2)

    action = "login" if is_login else ("exec" if do_exec else "fetch")

    # Validate the --exec command BEFORE entering the audited section. Inside the
    # try/finally below this usage error would still trigger the `finally` and
    # write an audit record for an action that never ran.
    if do_exec and not child_argv:
        err.print("[red]--exec needs a command after `--`[/red], e.g. `-- npm publish`.")
        raise typer.Exit(2)

    # The id and the owner are separated by an arrow; they were previously
    # printed run together.
    err.print(
        f"[dim]proxy {action}: {entry.id} → {entry.owner_repo} "
        f"(caller identity; value not persisted)[/dim]"
    )
    try:
        if do_exec:
            # NOTE(review): with no --field the env-var name passed on is "" —
            # confirm proxy_exec handles that as intended.
            rc = proxy_exec(argv, env_var=field or "", child_argv=child_argv)
        else:
            rc = proxy_fetch(argv)
    except ProxyError as e:
        err.print(f"[red]{e}[/red]")
        raise typer.Exit(5)
    finally:
        # Metadata-only audit of the attempt, written whether it succeeded or
        # failed. An audit write failure is reported but never masks the result.
        try:
            write_audit(
                cfg.state_dir,
                need_id=entry.id,
                owner_repo=entry.owner_repo,
                domain=domain,
                action=action,
                decision_id=decision_id,
            )
        except OSError as e:
            err.print(f"[yellow]audit write failed:[/yellow] {e}")
    # Propagate the proxied command's return code as our own exit status.
    raise typer.Exit(rc)
@app.command(
    "access",
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def access(
    ctx: typer.Context,
    need: Annotated[str, typer.Argument(help="Free-text need, e.g. 'npm token', 'db password'")],
    domain: Annotated[
        Optional[str],
        typer.Option("--domain", help="Substitute <domain> in path/auth templates, e.g. coulomb_social"),
    ] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON (stable, secret-free)")] = False,
    all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False,
    do_fetch: Annotated[
        bool, typer.Option("--fetch", help="Proxy the fetch as the caller; value streams to stdout")
    ] = False,
    do_exec: Annotated[
        bool,
        typer.Option("--exec", help="Run the trailing command (after --) with the secret in its env"),
    ] = False,
    field: Annotated[
        Optional[str], typer.Option("--field", help="Secret field / env-var name, e.g. NPM_AUTH_TOKEN")
    ] = None,
    path: Annotated[
        Optional[str], typer.Option("--path", help="Override the owner-side path template")
    ] = None,
    no_policy: Annotated[
        bool,
        typer.Option("--no-policy", help="Acknowledge proxying when the flex-auth gate is not enforced"),
    ] = False,
) -> None:
    """Operator front door: how to obtain any credential, gated and audited.

    Advisory by default — renders the owner, auth method, path template, command
    skeleton, and policy gate status for the best-matching need. ops-warden issues
    the SSH lane directly and **routes every other need to its owner** — it never
    holds or vends the secret value.

    With --fetch / --exec it proxies the fetch *as the caller* for exec_capable lanes:
    the flex-auth gate runs first, ops-warden adds no credential of its own, the value
    is never persisted or logged, and only metadata is audited.
    """
    from warden.access import expand_handoff, policy_gate_status

    # Only the single best match for the free-text need is considered.
    best = _load_catalog().find(need, include_draft=all_entries, limit=1)
    if not best:
        err.print(
            f"[red]No access match for {need!r}.[/red] "
            "Try `warden route list --all` to browse, or rephrase the need."
        )
        raise typer.Exit(1)
    entry = best[0]

    # Proxy mode: everything after `--` (ctx.args) is the child command for --exec.
    if do_fetch or do_exec:
        _access_proxy(
            entry,
            domain=domain,
            field=field,
            path=path,
            do_exec=do_exec,
            child_argv=list(ctx.args),
            no_policy=no_policy,
        )
        return

    # Advisory mode from here on.
    expanded = expand_handoff(entry, domain)
    gate = policy_gate_status()
    if output_json:
        print(json.dumps(_access_json(entry, expanded, gate, domain), indent=2))
        return

    console.print(f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])")
    console.print(f" owner : {entry.owner_repo} ({entry.subsystem})")

    if entry.warden_executes:
        console.print("\n[green]ops-warden issues this directly.[/green]")
        console.print(f" run : [bold]{entry.cert_command}[/bold]")
        if entry.steps:
            for number, step in enumerate(entry.steps, 1):
                console.print(f" {number}. {step}")
        return

    # Handoff details: each line is printed only when its value is set.
    for label, value in (
        ("auth", expanded.auth_method),
        ("path", expanded.path_template),
        ("fetch", expanded.fetch_command),
    ):
        if value:
            console.print(f" {label} : {value}")
    if expanded.policy_ref:
        console.print(f" policy : {expanded.policy_ref} [dim]({gate})[/dim]")
    console.print(f" wiki : {entry.wiki_ref}")
    console.print(f" canon : {entry.canon_ref}")

    # Command line the operator can re-run to switch from advisory to proxy mode.
    proxy = f"warden access {need!r}" + (f" --domain {domain}" if domain else "")

    if entry.has_native_exec:
        console.print(
            f" exec : [bold]{entry.exec_command}[/bold] "
            f"[cyan](via {entry.exec_owner} — primary)[/cyan]"
        )
        if entry.pointer_command:
            console.print(f" pointer : [dim]{entry.pointer_command}[/dim]")

    if expanded.exec_capable:
        label = "proxy" if not entry.has_native_exec else "fallback"
        if entry.lane != "login":
            hint = "transparent conduit — fetches as you"
        else:
            hint = "runs the interactive login as you"
        console.print(f" {label:<8} : [dim]{proxy} --fetch[/dim] [yellow]({hint})[/yellow]")

    if expanded.path_template and "<" in expanded.path_template:
        console.print(
            " note : remaining <…> placeholders are owner-confirmed names "
            f"(coordinate with {entry.owner_repo})."
        )

    # Closing summary: which path is primary for this lane.
    if entry.has_native_exec:
        console.print(
            f"\n[green]Primary:[/green] run it via [bold]{entry.exec_owner}[/bold] — "
            f"[bold]{entry.exec_command}[/bold]. ops-warden routes to the owner and holds no token.\n"
            f"[dim]Fallback:[/dim] [bold]{proxy} --exec -- <cmd>[/bold] — ops-warden's transparent "
            "conduit (runs the fetch as you, holds nothing)."
        )
    elif expanded.exec_capable:
        is_login = entry.lane == "login"
        verb = "run this login for you" if is_login else "fetch this for you"
        exec_alternative = "" if is_login else f" (or [bold]{proxy} --exec -- <cmd>[/bold])"
        console.print(
            f"\n[green]ops-warden can {verb}[/green] as the caller — "
            f"[bold]{proxy} --fetch[/bold]"
            + exec_alternative
            + f". It runs {entry.owner_repo}'s tool with [bold]your[/bold] identity; the "
            "value streams to you and ops-warden never holds, caches, or logs it."
        )
    else:
        console.print(
            f"\n[yellow]ops-warden does not hold this secret.[/yellow] "
            f"Obtain it from [bold]{entry.owner_repo}[/bold] as shown — "
            "warden advises, the owner vends."
        )
# ---------------------------------------------------------------------------
# warden policy — read-only Workload Security Posture lookup (WP-0015 T2)
# ---------------------------------------------------------------------------
def _load_posture():
    """Return the posture catalog from ``warden.posture.load_posture()``.

    A ``PostureError`` is reported through ``err`` and converted into
    ``typer.Exit(1)`` so the CLI stops with a message instead of a traceback.
    """
    # Function-scope import, as in the rest of this helper's callers' flow:
    # the posture module is only loaded when a policy command actually runs.
    from warden.posture import PostureError, load_posture

    try:
        catalog = load_posture()
    except PostureError as exc:
        err.print(f"[red]Posture descriptor error:[/red] {exc}")
        raise typer.Exit(1)
    else:
        return catalog
@policy_app.command("list")
def policy_list(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """List both posture axes: environment postures and workload maturity levels."""
    cat = _load_posture()

    if output_json:
        # Emitted with plain print() rather than the rich console —
        # presumably so the JSON reaches stdout unstyled and unwrapped.
        print(json.dumps({
            "env_postures": [vars(e) for e in cat.env_postures],
            "maturity_levels": [vars(m) for m in cat.maturity_levels],
            "dataclass_floor": cat.dataclass_floor,
            "requires_env_posture": cat.requires_env_posture,
        }, indent=2))
        return

    # Axis A: one row per environment posture, ordered by rank.
    env_table = Table(title="Axis A — environment posture")
    for col in ("ID", "rank", "backend", "real values", "user data", "audit"):
        env_table.add_column(col)
    for e in sorted(cat.env_postures, key=lambda x: x.rank):
        env_table.add_row(e.id, str(e.rank), e.backend, e.real_values, e.real_user_data, e.audit)
    console.print(env_table)

    # Axis B: one row per workload maturity level, ordered by rank.
    mat_table = Table(title="Axis B — workload maturity")
    for col in ("ID", "rank", "phase", "max dataclass", "promotion gate"):
        mat_table.add_column(col)
    for m in sorted(cat.maturity_levels, key=lambda x: x.rank):
        # str.join already yields "" for an empty list, so an `or ""` fallback
        # does nothing; use a visible ASCII placeholder so a level without a
        # promotion gate does not render as a blank cell.
        gates = ", ".join(m.promotion_gate) or "-"
        mat_table.add_row(m.id, str(m.rank), m.phase, m.max_dataclass, gates)
    console.print(mat_table)

    console.print(
        f"\n[dim]lattice: deliver iff env=={cat.requires_env_posture} and "
        "workload.maturity >= secret.required_maturity (and the dataclass floor).[/dim]"
    )
@policy_app.command("show")
def policy_show(
    descriptor_id: Annotated[
        str,
        typer.Argument(help="An env posture (dev/test/prod) or maturity level (M0-M3)"),
    ],
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show one environment posture or maturity level."""
    cat = _load_posture()

    # The same id is looked up on both axes; at least one must resolve.
    env = cat.env(descriptor_id)
    mat = cat.maturity(descriptor_id)
    if env is None and mat is None:
        err.print(
            f"[red]Unknown descriptor {descriptor_id!r}.[/red] "
            "Try `warden policy list`."
        )
        raise typer.Exit(1)

    # Decide the axis with the same `is None` test used for validation, rather
    # than object truthiness. The environment posture wins if both resolve.
    is_env = env is not None
    obj = env if is_env else mat

    if output_json:
        print(json.dumps({"axis": "env_posture" if is_env else "maturity_level", **vars(obj)}, indent=2))
        return

    axis = "environment posture" if is_env else "workload maturity level"
    console.print(f"[bold]{obj.id}[/bold] ([cyan]{axis}[/cyan])")
    for k, v in vars(obj).items():
        if k == "id":
            # Already shown in the heading line.
            continue
        console.print(f" {k:14}: {', '.join(v) if isinstance(v, list) else v}")

    # The dataclass-floor line describes a maturity level, so print it only
    # when a maturity level is the object being displayed.
    if not is_env:
        # Keys of dataclass_floor whose value is this level's id.
        floor = [dc for dc, lvl in cat.dataclass_floor.items() if lvl == obj.id]
        if floor:
            console.print(f" {'dataclass floor':14}: {', '.join(floor)} require this level")
# ---------------------------------------------------------------------------
# warden worker — autonomous coordination worker (WP-0020: T1 dry-run planner, T3 guarded executor)
# ---------------------------------------------------------------------------
@worker_app.command("run")
def worker_run(
    once: Annotated[bool, typer.Option("--once", help="Process the inbox once and exit")] = True,
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run/--execute",
            help="Plan only (default); --execute runs the guarded executor",
        ),
    ] = True,
    brain: Annotated[
        str,
        typer.Option("--brain", help="Planner: 'rule' (deterministic, default) or 'llm' (llm-connect)"),
    ] = "rule",
) -> None:
    """Read ops-warden's unread coordination requests and plan (or execute) guardrailed actions.

    Plans with the deterministic RuleBrain (default) or the llm-connect brain (--brain llm).
    Either way the allowlist + no-secret guardrails are enforced on every action. Dry-run is
    the default and only renders the plans; --execute hands the plans to the guarded executor,
    which leaves escalations for a human.
    """
    # NOTE(review): `once` is accepted for CLI compatibility but is not read
    # anywhere in this function; a single pass over the inbox is the only mode.
    from warden.worker import (
        HubClient, LlmConnectBrain, RuleBrain, build_plans, execute_plans, render_plans,
    )

    if brain not in ("rule", "llm"):
        err.print(f"[red]Unknown --brain {brain!r}[/red] (expected 'rule' or 'llm').")
        raise typer.Exit(2)

    hub = HubClient()
    try:
        messages = hub.unread()
    except Exception as e:  # noqa: BLE001 — surface any transport error as a clean message
        err.print(f"[red]Could not read the State Hub inbox:[/red] {e}")
        raise typer.Exit(1)

    chosen = LlmConnectBrain() if brain == "llm" else RuleBrain()
    plans = build_plans(messages, chosen)

    if dry_run:
        # Plans that are not escalated are the ones reported as auto-actionable.
        auto = sum(1 for p in plans if not p.escalated)
        console.print(render_plans(plans))
        console.print(
            f"\n[dim]{len(plans)} request(s): {auto} auto-actionable, "
            f"{len(plans) - auto} need a human. (dry-run — nothing executed)[/dim]"
        )
        return

    # --execute: run the guarded executor. Topic for audit progress events.
    topic_id = "cee7bedf-2b48-46ef-8601-006474f2ad7a"
    console.print("[yellow]Executing (full-auto, in-scope only; escalations left for a human)…[/yellow]")
    console.print(execute_plans(plans, hub, topic_id=topic_id))