generated from coulomb/repo-seed
llm-connect is operational (operator set OPENROUTER_API_KEY). Contract discovered from
the running service: POST /execute {"prompt":...} -> {"content":...}.
LlmConnectBrain embeds the fixed charter + the inbox message as untrusted data, calls
/execute, and parses a JSON action plan (_extract_json tolerates fences/prose), escalating
defensively on malformed/empty/transport errors. The build_plans guardrail still enforces
the allowlist + no-secret invariant on whatever the model returns — the LLM cannot widen
ops-warden's authority. `warden worker run --brain rule|llm` selects the planner.
Live-verified on the real inbox: the LLM brain planned a sensible reply+mark_read for a
secrets-engine coordination message and correctly escalated a secret-custody request as
out-of-lane — better classification than the deterministic RuleBrain.
6 new tests, 236 pass, lint clean. T3 (guarded executor) and T4 (scheduling) remain.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1201 lines
43 KiB
Python
1201 lines
43 KiB
Python
"""OpsWarden CLI."""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from typing import Annotated, List, Optional
|
||
|
||
import typer
|
||
from rich.console import Console
|
||
from rich.table import Table
|
||
|
||
from warden.ca import CAError, LocalCA, parse_cert_metadata
|
||
from warden.config import ConfigError, WardenConfig, load_config
|
||
from warden.policy import check_sign_policy
|
||
from warden.inventory import ActorEntry, InventoryError, PrincipalsInventory, load_inventory, save_inventory
|
||
from warden.models import ActorType, CertSpec, DEFAULT_TTL_HOURS, validate_actor_name
|
||
from warden.scorecard import run_scorecard
|
||
|
||
# Root CLI application. Each sub-app below is mounted as a command group
# immediately after it is created, so the group order in --help is unchanged.
app = typer.Typer(
    help="OpsWarden — SSH CA and certificate lifecycle manager",
    no_args_is_help=True,
)

# `warden inventory ...`
inventory_app = typer.Typer(help="Manage principals inventory", no_args_is_help=True)
app.add_typer(inventory_app, name="inventory")

# `warden route ...`
route_app = typer.Typer(
    help="Look up which subsystem owns a credential need (read-only pointer layer)",
    no_args_is_help=True,
)
app.add_typer(route_app, name="route")

# `warden policy ...`
policy_app = typer.Typer(
    help="Look up Workload Security Posture descriptors (read-only; env posture + maturity)",
    no_args_is_help=True,
)
app.add_typer(policy_app, name="policy")

# `warden worker ...`
worker_app = typer.Typer(
    help="Autonomous coordination worker (WP-0020; dry-run only until executor lands)",
    no_args_is_help=True,
)
app.add_typer(worker_app, name="worker")

# Two consoles: `console` writes to stdout, `err` writes to stderr.
console = Console()
err = Console(stderr=True)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _load_cfg() -> WardenConfig:
    """Load the warden configuration, or report the problem and exit.

    Returns:
        The configuration returned by ``load_config()``.

    Raises:
        typer.Exit: with code 1 after printing the ``ConfigError`` to stderr.
    """
    try:
        cfg = load_config()
    except ConfigError as exc:
        err.print(f"[red]Config error:[/red] {exc}")
        raise typer.Exit(1)
    return cfg
|
||
|
||
|
||
def _load_inventory(cfg: WardenConfig) -> PrincipalsInventory:
    """Load the principals inventory at ``cfg.inventory_path``, or exit.

    Returns:
        The inventory returned by ``load_inventory``.

    Raises:
        typer.Exit: with code 1 after printing the ``InventoryError`` to stderr.
    """
    try:
        inventory = load_inventory(cfg.inventory_path)
    except InventoryError as exc:
        err.print(f"[red]Inventory error:[/red] {exc}")
        raise typer.Exit(1)
    return inventory
|
||
|
||
|
||
def _get_ca(cfg: WardenConfig):
    """Return the CA object selected by ``cfg.backend``.

    ``"vault"`` selects ``VaultCA``; every other backend value selects
    ``LocalCA``. ``warden.vault`` is imported only on the vault path.
    """
    if cfg.backend != "vault":
        return LocalCA(cfg.ca_key, cfg.state_dir)

    from warden.vault import VaultCA

    return VaultCA(cfg.vault, cfg.state_dir)
|
||
|
||
|
||
def _apply_policy_gate(cfg: WardenConfig, spec: CertSpec) -> None:
    """Run the sign-policy check and record its decision id on ``spec``.

    Calls ``check_sign_policy(cfg.policy, spec)``; when it returns a truthy
    decision id, that id is stored in ``spec.policy_decision_id``. A falsy
    result leaves ``spec`` untouched.
    """
    if decision_id := check_sign_policy(cfg.policy, spec):
        spec.policy_decision_id = decision_id
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden sign
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.command()
def sign(
    actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")],
    pubkey: Annotated[Path, typer.Option("--pubkey", help="Path to actor's public key file")],
    ttl: Annotated[Optional[int], typer.Option("--ttl", help="Override TTL in hours")] = None,
) -> None:
    """Sign a public key for the given actor. Writes cert text to stdout.

    This is the cert_command interface: ops-bridge calls this and uses stdout
    as the certificate passed to SSH alongside the private key.
    """
    cfg = _load_cfg()
    actors = _load_inventory(cfg).actors

    # Only actors registered in the inventory can be signed for.
    entry = actors.get(actor_name)
    if entry is None:
        err.print(
            f"[red]Actor {actor_name!r} not found in inventory.[/red] "
            "Add it with: warden inventory add"
        )
        raise typer.Exit(1)

    # A falsy --ttl (omitted or 0) falls back to the inventory TTL.
    effective_ttl = ttl or entry.ttl_hours
    spec = CertSpec(
        actor_name=actor_name,
        actor_type=entry.actor_type,
        pubkey_path=pubkey,
        ttl_hours=effective_ttl,
        principals=entry.principals,
        identity=actor_name,
    )

    ca = _get_ca(cfg)
    try:
        # The policy gate runs before signing; both failures surface as CAError here.
        _apply_policy_gate(cfg, spec)
        record = ca.sign(spec)
    except CAError as exc:
        err.print(f"[red]Signing failed:[/red] {exc}")
        raise typer.Exit(1)

    # cert_command interface: stdout carries the certificate text and nothing else.
    cert_text = record.cert_path.read_text()
    print(cert_text.strip())
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden issue
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.command()
def issue(
    actor_name: Annotated[str, typer.Argument(help="Actor name")],
    ttl: Annotated[Optional[int], typer.Option("--ttl", help="Override TTL in hours")] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Generate a new keypair and sign it for the given actor.

    Only supported with the local backend. Outputs keypair + cert paths and metadata.
    """
    cfg = _load_cfg()

    # The backend check comes before the inventory is even read.
    if cfg.backend != "local":
        err.print("[red]warden issue is only supported with the local backend.[/red]")
        raise typer.Exit(1)

    entry = _load_inventory(cfg).actors.get(actor_name)
    if entry is None:
        err.print(f"[red]Actor {actor_name!r} not found in inventory.[/red]")
        raise typer.Exit(1)

    ca = LocalCA(cfg.ca_key, cfg.state_dir)

    # Step 1: generate the keypair.
    try:
        privkey_path, pubkey_path = ca.generate_keypair(actor_name)
    except CAError as exc:
        err.print(f"[red]Key generation failed:[/red] {exc}")
        raise typer.Exit(1)

    # Step 2: policy gate, then sign the freshly generated public key.
    spec = CertSpec(
        actor_name=actor_name,
        actor_type=entry.actor_type,
        pubkey_path=pubkey_path,
        ttl_hours=ttl or entry.ttl_hours,  # falsy --ttl falls back to the inventory TTL
        principals=entry.principals,
        identity=actor_name,
    )
    try:
        _apply_policy_gate(cfg, spec)
        record = ca.sign(spec)
    except CAError as exc:
        err.print(f"[red]Signing failed:[/red] {exc}")
        raise typer.Exit(1)

    # Paths and metadata only; no key material is included.
    result = {
        "actor": actor_name,
        "privkey": str(privkey_path),
        "cert": str(record.cert_path),
        "identity": record.identity,
        "principals": record.principals,
        "valid_before": record.valid_before.isoformat(),
        "signed_at": record.signed_at.isoformat(),
    }

    if output_json:
        print(json.dumps(result, indent=2))
        return

    console.print(f"[green]Issued credentials for {actor_name}[/green]")
    for key, value in result.items():
        console.print(f" {key}: {value}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden status
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.command()
def status(
    actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
    state_dir_override: Annotated[Optional[Path], typer.Option("--state-dir", help="State dir path (bypasses config)")] = None,
) -> None:
    """Show certificate status. Exits 1 if any cert is expired."""
    now = datetime.now(timezone.utc)

    # --state-dir bypasses config loading entirely.
    if state_dir_override is not None:
        state_dir = state_dir_override
    else:
        state_dir = _load_cfg().state_dir

    if actor_name:
        cert_path = state_dir / f"{actor_name}-cert.pub"
        paths = [cert_path] if cert_path.exists() else []
    else:
        paths = sorted(state_dir.glob("*-cert.pub")) if state_dir.exists() else []

    if not paths:
        if output_json:
            # Keep --json output parseable even when there is nothing to report.
            print("[]")
            return
        msg = (
            f"No certificate found for {actor_name!r} (static key / no cert)"
            if actor_name
            else "No certificates in state dir."
        )
        console.print(msg)
        return

    rows = []
    for cert_path in paths:
        # "<actor>-cert.pub" -> "<actor>". Only the trailing "-cert" is removed,
        # so actor names that themselves contain "-cert" are reported intact.
        name = cert_path.stem.removesuffix("-cert")
        try:
            meta = parse_cert_metadata(cert_path)
            valid_before = meta["valid_before"]
            secs = (valid_before - now).total_seconds()
            expired = secs <= 0
            if expired:
                remaining_str = "EXPIRED"
            else:
                hours, rem = divmod(int(secs), 3600)
                remaining_str = f"{hours}h {rem // 60}m"
            rows.append({
                "actor": name,
                "identity": meta["identity"],
                "principals": ", ".join(meta["principals"]),
                "valid_before": valid_before.isoformat(),
                "remaining": remaining_str,
                "expired": expired,
            })
        except Exception as e:
            # Best-effort listing: an unreadable cert becomes an error row and
            # does not count as expired for the exit code.
            rows.append({"actor": name, "error": str(e), "expired": False})

    if output_json:
        print(json.dumps(rows, indent=2))
    else:
        table = Table(title="Certificate Status")
        for heading in ("Actor", "Identity", "Principals", "Valid Before (UTC)", "Remaining"):
            table.add_column(heading)
        for row in rows:
            if "error" in row:
                table.add_row(row["actor"], "[red]parse error[/red]", "", "", row["error"])
                continue
            rem_styled = (
                f"[red]{row['remaining']}[/red]" if row["expired"] else row["remaining"]
            )
            table.add_row(
                row["actor"],
                row["identity"],
                row["principals"],
                row["valid_before"],
                rem_styled,
            )
        console.print(table)

    # Non-zero exit lets callers detect an expired certificate.
    if any(r.get("expired") for r in rows):
        raise typer.Exit(1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden scorecard
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.command()
def scorecard(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Run compliance scorecard checks (AccessManagementDirective §5, cert-side)."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    results = run_scorecard(cfg.state_dir, inventory)
    total = len(results)
    passed = len([r for r in results if r.passed])

    if output_json:
        payload = [
            {"check": r.name, "passed": r.passed, "detail": r.detail} for r in results
        ]
        print(json.dumps(payload, indent=2))
    else:
        table = Table(title=f"OpsWarden Scorecard ({passed}/{total})")
        for heading in ("Check", "Status", "Detail"):
            table.add_column(heading)
        for r in results:
            if r.passed:
                status_str = "[green]PASS[/green]"
            else:
                status_str = "[red]FAIL[/red]"
            table.add_row(r.name, status_str, r.detail)
        console.print(table)

        if passed == total:
            verdict = "[green]Operational[/green]"
        else:
            verdict = "[yellow]Needs attention[/yellow]"
        console.print(f"\nScore: {passed}/{total} " + verdict)

    # Any failing check makes the command exit non-zero, in both output modes.
    if passed < total:
        raise typer.Exit(1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden inventory
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@inventory_app.command("list")
def inventory_list(
    output_json: Annotated[bool, typer.Option("--json")] = False,
) -> None:
    """List all actors in the principals inventory."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    # JSON is handled first so that --json always emits parseable JSON; an
    # empty inventory yields "{}" rather than a human-readable sentence.
    if output_json:
        print(json.dumps({
            name: {
                "type": e.actor_type.value,
                "principals": e.principals,
                "ttl_hours": e.ttl_hours,
                "description": e.description,
            }
            for name, e in inventory.actors.items()
        }, indent=2))
        return

    if not inventory.actors:
        console.print("No actors in inventory.")
        return

    table = Table(title=f"Principals Inventory ({cfg.inventory_path})")
    for heading in ("Actor", "Type", "Principals", "TTL (h)", "Description"):
        table.add_column(heading)
    for name, e in inventory.actors.items():
        table.add_row(
            name,
            e.actor_type.value,
            ", ".join(e.principals),
            str(e.ttl_hours),
            e.description,
        )
    console.print(table)
|
||
|
||
|
||
@inventory_app.command("add")
def inventory_add(
    actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")],
    actor_type: Annotated[ActorType, typer.Option("--type", "-t", help="adm | agt | atm")],
    principals: Annotated[
        Optional[List[str]],
        typer.Option("--principal", "-p", help="Principal (repeat for multiple)"),
    ] = None,
    ttl: Annotated[Optional[int], typer.Option("--ttl", help="TTL in hours")] = None,
    description: Annotated[str, typer.Option("--description", "-d")] = "",
) -> None:
    """Add an actor to the principals inventory."""
    cfg = _load_cfg()

    # Reject a name that does not validate against the chosen actor type.
    try:
        validate_actor_name(actor_name, actor_type)
    except ValueError as exc:
        err.print(f"[red]{exc}[/red]")
        raise typer.Exit(1)

    # Defaults: the actor name is its own sole principal; a falsy --ttl
    # (omitted or 0) falls back to the per-type default.
    resolved_principals: List[str] = principals or [actor_name]
    resolved_ttl = ttl or DEFAULT_TTL_HOURS[actor_type]

    inventory = _load_inventory(cfg)
    # Plain assignment: an existing entry with the same name is replaced.
    inventory.actors[actor_name] = ActorEntry(
        name=actor_name,
        actor_type=actor_type,
        principals=resolved_principals,
        ttl_hours=resolved_ttl,
        description=description,
    )

    try:
        save_inventory(inventory, cfg.inventory_path)
    except Exception as exc:
        err.print(f"[red]Failed to save inventory:[/red] {exc}")
        raise typer.Exit(1)

    console.print(
        f"[green]Added[/green] {actor_name} "
        f"(type={actor_type.value}, principals={resolved_principals}, ttl={resolved_ttl}h)"
    )
|
||
|
||
|
||
@inventory_app.command("remove")
def inventory_remove(
    actor_name: Annotated[str, typer.Argument(help="Actor name to remove")],
) -> None:
    """Remove an actor from the principals inventory."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)
    actors = inventory.actors

    # An unknown actor is an error (exit 1), not a silent no-op.
    if actor_name not in actors:
        err.print(f"[red]Actor {actor_name!r} not in inventory.[/red]")
        raise typer.Exit(1)

    del actors[actor_name]

    try:
        save_inventory(inventory, cfg.inventory_path)
    except Exception as exc:
        err.print(f"[red]Failed to save inventory:[/red] {exc}")
        raise typer.Exit(1)

    console.print(f"[green]Removed[/green] {actor_name}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden cleanup
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.command()
def cleanup(
    actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None,
    dry_run: Annotated[bool, typer.Option("--dry-run", help="Preview without deleting")] = False,
) -> None:
    """Remove stale (expired > 5 min) certificates from state dir."""
    cfg = _load_cfg()
    # A certificate is stale once its valid_before is more than five minutes past.
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=5)

    if actor_name:
        cert_path = cfg.state_dir / f"{actor_name}-cert.pub"
        paths = [cert_path] if cert_path.exists() else []
    else:
        paths = sorted(cfg.state_dir.glob("*-cert.pub")) if cfg.state_dir.exists() else []

    removed = []
    for cert_path in paths:
        try:
            meta = parse_cert_metadata(cert_path)
        except Exception as exc:
            # Still best-effort (the file is left in place), but say so on
            # stderr instead of skipping an unreadable certificate silently.
            err.print(
                f"[yellow]skipped (unparsable certificate):[/yellow] {cert_path.name}: {exc}"
            )
            continue
        if meta["valid_before"] >= cutoff:
            continue

        if dry_run:
            console.print(f"would remove: {cert_path.name}")
        else:
            cert_path.unlink()
            console.print(f"removed: {cert_path.name}")
        # Dry-run hits are tracked too, so the "nothing found" note below is
        # only printed when no stale certificate was seen at all.
        removed.append(cert_path.name)

    if not removed:
        console.print("No stale certificates found.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden log
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.command()
def log(
    actor_name: Annotated[Optional[str], typer.Argument(help="Filter by actor name")] = None,
    last: Annotated[int, typer.Option("--last", help="Show last N entries")] = 20,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show outgoing certificate signing history."""
    cfg = _load_cfg()
    log_path = cfg.state_dir / "signatures.log"

    if not log_path.exists():
        if output_json:
            print("[]")
        else:
            console.print("No signatures log found.")
        return

    # The log is read as one JSON document per line.
    entries = []
    for line in log_path.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            # Tolerate corrupt or partially written lines.
            continue
        if not isinstance(entry, dict):
            # Valid JSON but not an object (e.g. a bare number): not a log record.
            continue
        if actor_name and entry.get("actor") != actor_name:
            continue
        entries.append(entry)

    # `entries[-0:]` would return the whole list, and a negative value would
    # drop leading entries instead; a non-positive --last shows nothing.
    entries = entries[-last:] if last > 0 else []

    if output_json:
        print(json.dumps(entries, indent=2))
        return

    if not entries:
        console.print("No matching log entries.")
        return

    table = Table(title="Signatures Log")
    for heading in (
        "Timestamp",
        "Actor",
        "Type",
        "Identity",
        "TTL (h)",
        "Valid Before (UTC)",
        "Backend",
    ):
        table.add_column(heading)
    for e in entries:
        table.add_row(
            e.get("timestamp", "")[:19],  # first 19 characters of the stored value
            e.get("actor", ""),
            e.get("actor_type", ""),
            e.get("identity", ""),
            str(e.get("ttl_hours", "")),
            e.get("valid_before", "")[:19],
            e.get("backend", ""),
        )
    console.print(table)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden route — read-only routing lookup over the pointer catalog
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _load_catalog():
    """Load the routing catalog, or report the problem and exit.

    ``warden.routing`` is imported inside the function.

    Raises:
        typer.Exit: with code 1 after printing the ``CatalogError`` to stderr.
    """
    from warden.routing import CatalogError, load_catalog

    try:
        catalog = load_catalog()
    except CatalogError as exc:
        err.print(f"[red]Routing catalog error:[/red] {exc}")
        raise typer.Exit(1)
    return catalog
|
||
|
||
|
||
def _entry_summary(entry) -> dict:
    """Pointer-only summary. Never includes secret material."""
    # warden_role tells an agent at a glance whether ops-warden runs this lane
    # itself (issue), proxies the fetch as the caller (assist), or only points (route).
    if entry.warden_executes:
        role = "issue"
    elif entry.exec_capable:
        role = "assist"
    else:
        role = "route"

    summary = {
        "id": entry.id,
        "title": entry.title,
        "owner_repo": entry.owner_repo,
        "subsystem": entry.subsystem,
        "warden_executes": entry.warden_executes,
        "warden_role": role,
        "exec_capable": entry.exec_capable,
        # resolvable: can `warden access --fetch` run this now with no <…> to fill?
        # Lets an automated caller gate on readiness before attempting a fetch.
        "resolvable": entry.resolvable,
    }

    # Owner-native exec front door (WP-0019): when present, this subsystem's exec is
    # the PRIMARY path; ops-warden's proxy is the transparent fallback.
    if entry.has_native_exec:
        summary["exec_owner"] = entry.exec_owner
        summary["exec_command"] = entry.exec_command
        summary["pointer_command"] = entry.pointer_command

    # Reference and review metadata always close the summary (key order is kept).
    summary["wiki_ref"] = entry.wiki_ref
    summary["canon_ref"] = entry.canon_ref
    summary["reviewed"] = entry.reviewed
    summary["status"] = entry.status
    return summary
|
||
|
||
|
||
def _print_entry_table(
    entries, title: str, *, show_reviewed: bool = False, stale_threshold_days: int = 90
) -> None:
    """Render routing entries as a table on stdout.

    Args:
        entries: Catalog entries to list, one row each.
        title: Table title.
        show_reviewed: When true, add "Reviewed" and "Days" columns.
        stale_threshold_days: A review date older than this many days is
            highlighted in yellow (only relevant with ``show_reviewed``).
    """
    from warden.routing.catalog import days_since_review

    headings = ["ID", "Need", "Owner", "warden"]
    if show_reviewed:
        headings += ["Reviewed", "Days"]
    headings.append("Status")

    table = Table(title=title)
    for heading in headings:
        table.add_column(heading)

    for e in entries:
        # Same three roles as the JSON summary: issue / assist / route.
        if e.warden_executes:
            executes = "[green]issue[/green]"
        elif e.exec_capable:
            executes = "[cyan]assist[/cyan]"  # warden access --fetch/--exec proxies it
        else:
            executes = "route"

        # Any status other than "active" is highlighted.
        status_styled = e.status if e.status == "active" else f"[yellow]{e.status}[/yellow]"

        cells = [e.id, e.title, e.owner_repo, executes]
        if show_reviewed:
            days = days_since_review(e.reviewed)
            if days > stale_threshold_days:
                reviewed_styled = f"[yellow]{e.reviewed}[/yellow]"
            else:
                reviewed_styled = e.reviewed
            cells += [reviewed_styled, str(days)]
        cells.append(status_styled)
        table.add_row(*cells)

    console.print(table)
|
||
|
||
|
||
@route_app.command("list")
def route_list(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
    all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False,
    tag: Annotated[Optional[str], typer.Option("--tag", help="Filter by need keyword")] = None,
    stale_only: Annotated[
        bool, typer.Option("--stale", help="Show entries past review cadence (see --stale-days)")
    ] = False,
    stale_days: Annotated[
        int,
        typer.Option(
            "--stale-days",
            help="Days since reviewed before an entry is stale (default 90)",
            min=1,
        ),
    ] = 90,
) -> None:
    """List routing scenarios. Active-only unless --all."""
    from warden.routing.catalog import days_since_review

    catalog = _load_catalog()
    entries = (
        catalog.stale(include_draft=all_entries, threshold_days=stale_days)
        if stale_only
        else catalog.listed(include_draft=all_entries)
    )

    # --tag keeps entries having a need keyword equal to the tag, case-insensitively.
    if tag:
        wanted = tag.lower()
        entries = [
            e for e in entries if any(k.lower() == wanted for k in e.need_keywords)
        ]

    if output_json:
        payload = []
        for e in entries:
            row = _entry_summary(e)
            if stale_only:
                # Staleness details are only attached in --stale mode.
                row["days_since_review"] = days_since_review(e.reviewed)
                row["stale_threshold_days"] = stale_days
            payload.append(row)
        print(json.dumps(payload, indent=2))
        return

    if not entries:
        if stale_only:
            console.print(f"No stale routing entries (threshold: {stale_days} days since reviewed).")
        else:
            console.print("No matching routing entries.")
        return

    if stale_only:
        title = f"Stale routing scenarios (>{stale_days}d since reviewed)"
    else:
        title = "Routing scenarios"
    _print_entry_table(
        entries, title, show_reviewed=stale_only, stale_threshold_days=stale_days
    )
|
||
|
||
|
||
@route_app.command("show")
def route_show(
    entry_id: Annotated[str, typer.Argument(help="Catalog entry id (see `warden route list`)")],
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show owner, pointers, and (SSH only) the authored steps for one scenario."""
    entry = _load_catalog().get(entry_id)
    if entry is None:
        err.print(
            f"[red]Unknown routing id {entry_id!r}.[/red] "
            f"Try: warden route find {entry_id!r}"
        )
        raise typer.Exit(1)

    if output_json:
        summary = _entry_summary(entry)
        summary["need_keywords"] = entry.need_keywords
        if entry.warden_executes:
            # Lanes ops-warden issues itself carry their steps and command.
            summary["steps"] = entry.steps
            summary["cert_command"] = entry.cert_command
        else:
            # Every other lane gets a single next_action hint; which one
            # depends on how the need can be reached.
            if entry.has_native_exec:
                hint = (
                    f"primary: run via {entry.exec_owner} — `{entry.exec_command}`; ops-warden "
                    f"routes to the owner (fallback: `warden access <need> --exec`). See `{entry.wiki_ref}`."
                )
            elif entry.exec_capable:
                hint = (
                    "ops-warden can proxy this as the caller: `warden access <need> --fetch`"
                    f" (or `--exec -- <cmd>`); runs {entry.owner_repo}'s tool with your "
                    f"identity. See `{entry.wiki_ref}`."
                )
            else:
                hint = f"next action on `{entry.owner_repo}` — see `{entry.wiki_ref}`"
            summary["next_action"] = hint
        print(json.dumps(summary, indent=2))
        return

    # Human-readable view: header lines first, then the lane-specific section.
    for text in (
        f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])",
        f" owner : {entry.owner_repo} ({entry.subsystem})",
        f" wiki : {entry.wiki_ref}",
        f" canon : {entry.canon_ref}",
        f" reviewed : {entry.reviewed} status: {entry.status}",
    ):
        console.print(text)

    if not entry.warden_executes:
        console.print(
            "\n[yellow]ops-warden does not issue this.[/yellow] "
            f"Next action on [bold]{entry.owner_repo}[/bold] — see {entry.wiki_ref}."
        )
        return

    console.print("\n[green]ops-warden issues this directly.[/green]")
    console.print(f" cert_command: [bold]{entry.cert_command}[/bold]")
    if entry.steps:
        console.print(" steps:")
        for number, step in enumerate(entry.steps, 1):
            console.print(f" {number}. {step}")
    console.print(
        " precondition: actor in inventory? backend configured? run `warden status`."
    )
|
||
|
||
|
||
@route_app.command("find")
def route_find(
    query: Annotated[str, typer.Argument(help="Free-text need, e.g. 'issue core api key'")],
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
    all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False,
    limit: Annotated[int, typer.Option("--limit", help="Max matches")] = 5,
) -> None:
    """Rank routing scenarios by keyword overlap with the query."""
    matches = _load_catalog().find(query, include_draft=all_entries, limit=limit)

    # JSON mode always prints a list, which is empty when nothing matched.
    if output_json:
        summaries = [_entry_summary(e) for e in matches]
        print(json.dumps(summaries, indent=2))
        return

    if matches:
        _print_entry_table(matches, f"Matches for {query!r}")
        return

    console.print(
        f"No routing match for {query!r}. "
        "Try `warden route list --all` to browse all scenarios."
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden access — operator front door (advisory; proxy lands in T3)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _access_json(entry, expanded, gate: str, domain: Optional[str]) -> dict:
    """Stable, secret-free JSON shape for agentic operators. WP-0014 T2."""
    payload = _entry_summary(entry)
    payload.update(
        domain=domain,
        policy_gate=gate,
        handoff={
            "auth_method": expanded.auth_method,
            "path_template": expanded.path_template,
            "fetch_command": expanded.fetch_command,
            "policy_ref": expanded.policy_ref,
            "exec_capable": expanded.exec_capable,
        },
    )

    # Lanes ops-warden issues itself: point at cert_command and stop.
    if entry.warden_executes:
        payload["next_action"] = "ops-warden issues this directly — see cert_command"
        payload["cert_command"] = entry.cert_command
        return payload

    if entry.has_native_exec:
        # The owner's own exec is the primary path; the proxy is the fallback.
        next_action = (
            f"primary: run via {entry.exec_owner} — `{entry.exec_command}`; "
            "ops-warden routes to the owner (fallback: `warden access <need> --exec`). "
            "ops-warden holds no token."
        )
    elif expanded.exec_capable:
        # The login lane proxies a login and offers no --exec alternative.
        is_login = entry.lane == "login"
        verb = "login" if is_login else "fetch"
        exec_hint = "" if is_login else " (or `--exec -- <cmd>`)"
        next_action = (
            f"ops-warden can proxy this {verb} as the caller: "
            "`warden access <need> --fetch`"
            f"{exec_hint}"
            f". Runs {entry.owner_repo}'s tool with your identity; ops-warden holds no value."
        )
    else:
        next_action = (
            f"obtain from {entry.owner_repo} ({entry.subsystem}); "
            "ops-warden holds no value"
        )
    payload["next_action"] = next_action
    return payload
|
||
|
||
|
||
def _access_proxy(
    entry,
    *,
    domain: Optional[str],
    field: Optional[str],
    path: Optional[str],
    do_exec: bool,
    child_argv: list,
    no_policy: bool,
) -> None:
    """Proxy a non-SSH credential fetch as the caller (WP-0014 T3).

    Enforces the three guardrails: caller identity (no warden token), policy gate
    before fetch, and transit-only (no value persisted or logged). All warden chatter
    goes to stderr so --fetch stdout carries only the secret.

    Args:
        entry: Catalog entry for the need being proxied.
        domain: Passed to the policy check, command resolution and audit record.
        field: Passed to command resolution; also the ``env_var`` for ``--exec``.
        path: Passed to command resolution.
        do_exec: Run ``child_argv`` via ``proxy_exec`` instead of ``proxy_fetch``.
        child_argv: Command to run for ``--exec``; must be non-empty when
            ``do_exec`` is set.
        no_policy: Acknowledge proxying while the policy gate is not enforced.

    Raises:
        typer.Exit: always. Codes: 2 for usage/config problems (entry not
            exec_capable, config not loadable, ``--exec`` on a login lane,
            command resolution failed, ``--exec`` without a command); 3 when
            no caller credential is found; 4 when the policy gate denies or is
            not enforced without ``--no-policy``; 5 when the proxied call
            raises ``ProxyError``; otherwise the return code of
            ``proxy_fetch`` / ``proxy_exec``.
    """
    from warden.proxy import (
        ProxyError,
        caller_auth_present,
        proxy_exec,
        proxy_fetch,
        resolve_fetch_command,
        write_audit,
    )
    from warden.policy import check_fetch_policy

    if not entry.exec_capable:
        err.print(
            f"[red]{entry.id!r} is not exec_capable.[/red] "
            "Use `warden access` (advisory) and obtain it from the owner directly."
        )
        raise typer.Exit(2)

    # Proxy is privileged — require a real config for policy posture + audit sink.
    try:
        cfg = load_config()
    except ConfigError as e:
        err.print(
            f"[red]Proxy requires warden.yaml[/red] (policy gate + audit sink): {e}\n"
            "Advisory mode works without it: drop --fetch/--exec."
        )
        raise typer.Exit(2)

    is_login = entry.lane == "login"
    decision_id = None

    if is_login:
        # Login lane: interactive auth bootstrap. No caller-auth precheck (you have no
        # token yet — that's the point) and no secret-read gate (it needs an identity
        # this flow establishes). --exec is meaningless here.
        if do_exec:
            err.print(
                "[red]--exec is not valid for a login lane[/red] "
                f"({entry.id!r} is interactive auth). Use --fetch."
            )
            raise typer.Exit(2)
        err.print(
            "[dim]login lane — interactive auth bootstrap; no secret-read gate, "
            "token stays in the caller's own store.[/dim]"
        )
    else:
        # G1 — caller identity. ops-warden adds no token of its own.
        if not caller_auth_present():
            err.print(
                "[red]No caller credential found[/red] (VAULT_TOKEN/BAO_TOKEN or ~/.vault-token). "
                f"Authenticate first: {entry.auth_method or 'see the owner auth path'}."
            )
            raise typer.Exit(3)

        # G3 — policy gate before fetch.
        if cfg.policy.enabled:
            try:
                decision_id = check_fetch_policy(
                    cfg.policy, need_id=entry.id, owner_repo=entry.owner_repo, domain=domain
                )
            except CAError as e:
                err.print(f"[red]Policy gate denied the fetch:[/red] {e}")
                raise typer.Exit(4)
            err.print(f"[green]flex-auth allow[/green] (decision {decision_id}).")
        elif not no_policy:
            err.print(
                "[yellow]flex-auth gate is not enforced[/yellow] (policy.enabled=false). "
                "Re-run with [bold]--no-policy[/bold] to proxy ungated, or enable the gate."
            )
            raise typer.Exit(4)
        else:
            err.print("[yellow]Proxying ungated[/yellow] (--no-policy; gate not enforced).")

    try:
        argv = resolve_fetch_command(entry, domain=domain, field=field, path=path)
    except ProxyError as e:
        err.print(f"[red]{e}[/red]")
        raise typer.Exit(2)

    # Reject an --exec with no trailing command BEFORE the audited section: nothing
    # is run in that case, so no "proxy exec" notice and no audit record must be
    # produced for it.
    if do_exec and not child_argv:
        err.print("[red]--exec needs a command after `--`[/red], e.g. `-- npm publish`.")
        raise typer.Exit(2)

    action = "login" if is_login else ("exec" if do_exec else "fetch")
    err.print(
        f"[dim]proxy {action}: {entry.id} → {entry.owner_repo} "
        f"(caller identity; value not persisted)[/dim]"
    )
    try:
        if do_exec:
            rc = proxy_exec(argv, env_var=field or "", child_argv=child_argv)
        else:
            rc = proxy_fetch(argv)
    except ProxyError as e:
        err.print(f"[red]{e}[/red]")
        raise typer.Exit(5)
    finally:
        # Every attempted proxy call is audited, including ones that failed.
        # An audit write failure is reported but does not mask the proxy result.
        try:
            write_audit(
                cfg.state_dir,
                need_id=entry.id,
                owner_repo=entry.owner_repo,
                domain=domain,
                action=action,
                decision_id=decision_id,
            )
        except OSError as e:
            err.print(f"[yellow]audit write failed:[/yellow] {e}")

    # Hand the proxied command's return code back as the process exit code.
    raise typer.Exit(rc)
|
||
|
||
|
||
@app.command(
    "access",
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def access(
    ctx: typer.Context,
    need: Annotated[str, typer.Argument(help="Free-text need, e.g. 'npm token', 'db password'")],
    domain: Annotated[
        Optional[str],
        typer.Option("--domain", help="Substitute <domain> in path/auth templates, e.g. coulomb_social"),
    ] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON (stable, secret-free)")] = False,
    all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False,
    do_fetch: Annotated[
        bool, typer.Option("--fetch", help="Proxy the fetch as the caller; value streams to stdout")
    ] = False,
    do_exec: Annotated[
        bool,
        typer.Option("--exec", help="Run the trailing command (after --) with the secret in its env"),
    ] = False,
    field: Annotated[
        Optional[str], typer.Option("--field", help="Secret field / env-var name, e.g. NPM_AUTH_TOKEN")
    ] = None,
    path: Annotated[
        Optional[str], typer.Option("--path", help="Override the owner-side path template")
    ] = None,
    no_policy: Annotated[
        bool,
        typer.Option("--no-policy", help="Acknowledge proxying when the flex-auth gate is not enforced"),
    ] = False,
) -> None:
    """Operator front door: how to obtain any credential, gated and audited.

    Advisory by default — renders the owner, auth method, path template, command
    skeleton, and policy gate status for the best-matching need. ops-warden issues
    the SSH lane directly and **routes every other need to its owner** — it never
    holds or vends the secret value.

    With --fetch / --exec it proxies the fetch *as the caller* for exec_capable lanes:
    the flex-auth gate runs first, ops-warden adds no credential of its own, the value
    is never persisted or logged, and only metadata is audited.
    """
    # The docstring above is what Typer shows as the command's --help text, so its
    # wording is kept as-is; implementation notes live in comments instead.
    #
    # Flow: look up at most one catalog match for `need`; exit 1 when there is none;
    # hand off to _access_proxy when --fetch/--exec is given; otherwise render the
    # advisory view (JSON with --json, Rich text otherwise).
    from rich.markup import escape

    from warden.access import expand_handoff, policy_gate_status

    catalog = _load_catalog()
    matches = catalog.find(need, include_draft=all_entries, limit=1)
    if not matches:
        # `need` is free text typed by the caller. Escape it so a value that
        # happens to look like a Rich tag (e.g. "[/x]") is printed literally
        # instead of raising MarkupError or restyling the message.
        err.print(
            f"[red]No access match for {escape(repr(need))}.[/red] "
            "Try `warden route list --all` to browse, or rephrase the need."
        )
        raise typer.Exit(1)

    entry = matches[0]

    if do_fetch or do_exec:
        # Everything after the recognised options (ctx.args) is forwarded as the
        # child command; the proxy helper owns gating, auditing and the exit code.
        _access_proxy(
            entry,
            domain=domain,
            field=field,
            path=path,
            do_exec=do_exec,
            child_argv=list(ctx.args),
            no_policy=no_policy,
        )
        return

    expanded = expand_handoff(entry, domain)
    gate = policy_gate_status()

    if output_json:
        # Plain print (not console.print) so the JSON is emitted without Rich
        # markup processing.
        print(json.dumps(_access_json(entry, expanded, gate, domain), indent=2))
        return

    console.print(f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])")
    console.print(f" owner : {entry.owner_repo} ({entry.subsystem})")

    if entry.warden_executes:
        # Lane that ops-warden issues itself: show the command and steps, then stop.
        console.print("\n[green]ops-warden issues this directly.[/green]")
        console.print(f" run : [bold]{entry.cert_command}[/bold]")
        if entry.steps:
            for i, step in enumerate(entry.steps, 1):
                console.print(f" {i}. {step}")
        return

    # NOTE(review): the expanded.* values below may embed the caller's --domain
    # (expand_handoff receives it). They are printed as authored, on the assumption
    # that catalog templates carry no Rich tags — confirm, or escape them too.
    if expanded.auth_method:
        console.print(f" auth : {expanded.auth_method}")
    if expanded.path_template:
        console.print(f" path : {expanded.path_template}")
    if expanded.fetch_command:
        console.print(f" fetch : {expanded.fetch_command}")
    if expanded.policy_ref:
        console.print(f" policy : {expanded.policy_ref} [dim]({gate})[/dim]")
    console.print(f" wiki : {entry.wiki_ref}")
    console.print(f" canon : {entry.canon_ref}")

    # Copy-pasteable re-invocation hint. It is built from caller-supplied text and
    # then embedded inside markup tags, so escape it once here.
    proxy = f"warden access {need!r}"
    if domain:
        proxy += f" --domain {domain}"
    proxy = escape(proxy)

    if entry.has_native_exec:
        console.print(
            f" exec : [bold]{entry.exec_command}[/bold] "
            f"[cyan](via {entry.exec_owner} — primary)[/cyan]"
        )
    if entry.pointer_command:
        console.print(f" pointer : [dim]{entry.pointer_command}[/dim]")
    if expanded.exec_capable:
        # When the owner has its own exec path, the warden proxy is only the fallback.
        label = "fallback" if entry.has_native_exec else "proxy"
        hint = (
            "transparent conduit — fetches as you"
            if entry.lane != "login"
            else "runs the interactive login as you"
        )
        console.print(f" {label:<8} : [dim]{proxy} --fetch[/dim] [yellow]({hint})[/yellow]")
    if expanded.path_template and "<" in expanded.path_template:
        # A "<" left after expansion means some placeholder was not substituted.
        console.print(
            " note : remaining <…> placeholders are owner-confirmed names "
            f"(coordinate with {entry.owner_repo})."
        )

    # Closing summary: exactly one of the three variants is printed.
    if entry.has_native_exec:
        console.print(
            f"\n[green]Primary:[/green] run it via [bold]{entry.exec_owner}[/bold] — "
            f"[bold]{entry.exec_command}[/bold]. ops-warden routes to the owner and holds no token.\n"
            f"[dim]Fallback:[/dim] [bold]{proxy} --exec -- <cmd>[/bold] — ops-warden's transparent "
            "conduit (runs the fetch as you, holds nothing)."
        )
    elif expanded.exec_capable:
        verb = "fetch this for you" if entry.lane != "login" else "run this login for you"
        console.print(
            f"\n[green]ops-warden can {verb}[/green] as the caller — "
            f"[bold]{proxy} --fetch[/bold]"
            + ("" if entry.lane == "login" else f" (or [bold]{proxy} --exec -- <cmd>[/bold])")
            + f". It runs {entry.owner_repo}'s tool with [bold]your[/bold] identity; the "
            "value streams to you and ops-warden never holds, caches, or logs it."
        )
    else:
        console.print(
            f"\n[yellow]ops-warden does not hold this secret.[/yellow] "
            f"Obtain it from [bold]{entry.owner_repo}[/bold] as shown — "
            "warden advises, the owner vends."
        )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden policy — read-only Workload Security Posture lookup (WP-0015 T2)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _load_posture():
    """Return the posture catalogue from ``warden.posture.load_posture()``.

    A ``PostureError`` is reported on the error console and turned into
    ``typer.Exit(1)``, so callers never see the loader's exception.
    """
    # Imported lazily, inside the function, as the original did.
    from warden.posture import PostureError, load_posture

    try:
        posture = load_posture()
    except PostureError as exc:
        err.print(f"[red]Posture descriptor error:[/red] {exc}")
        raise typer.Exit(1)
    else:
        return posture
|
||
|
||
|
||
@policy_app.command("list")
def policy_list(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """List both posture axes: environment postures and workload maturity levels."""
    # The docstring above is what Typer shows as --help; notes go in comments.
    posture = _load_posture()

    if output_json:
        # Machine-readable form: every descriptor is dumped via vars(), alongside
        # the two catalogue-level attributes.
        payload = {
            "env_postures": [vars(item) for item in posture.env_postures],
            "maturity_levels": [vars(item) for item in posture.maturity_levels],
            "dataclass_floor": posture.dataclass_floor,
            "requires_env_posture": posture.requires_env_posture,
        }
        print(json.dumps(payload, indent=2))
        return

    # Axis A: one row per environment posture, ordered by ascending rank.
    axis_a = Table(title="Axis A — environment posture")
    for heading in ("ID", "rank", "backend", "real values", "user data", "audit"):
        axis_a.add_column(heading)
    for env in sorted(posture.env_postures, key=lambda item: item.rank):
        axis_a.add_row(
            env.id,
            str(env.rank),
            env.backend,
            env.real_values,
            env.real_user_data,
            env.audit,
        )
    console.print(axis_a)

    # Axis B: one row per maturity level, ordered by ascending rank.
    axis_b = Table(title="Axis B — workload maturity")
    for heading in ("ID", "rank", "phase", "max dataclass", "promotion gate"):
        axis_b.add_column(heading)
    for level in sorted(posture.maturity_levels, key=lambda item: item.rank):
        # An empty promotion gate renders as an em dash rather than a blank cell.
        gate_text = ", ".join(level.promotion_gate) or "—"
        axis_b.add_row(level.id, str(level.rank), level.phase, level.max_dataclass, gate_text)
    console.print(axis_b)

    # Footer: the delivery rule, mentioning the env posture the catalogue requires.
    footer = (
        f"\n[dim]lattice: deliver iff env=={posture.requires_env_posture} and "
        "workload.maturity >= secret.required_maturity (and the dataclass floor).[/dim]"
    )
    console.print(footer)
|
||
|
||
|
||
@policy_app.command("show")
def policy_show(
    descriptor_id: Annotated[str, typer.Argument(help="An env posture (dev/test/prod) or maturity level (M0–M3)")],
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show one environment posture or maturity level."""
    # The docstring above is what Typer shows as --help; notes go in comments.
    #
    # The id is looked up on both axes. If neither knows it the command exits 1;
    # if both do, the environment posture is the one displayed.
    from rich.markup import escape

    cat = _load_posture()
    env = cat.env(descriptor_id)
    mat = cat.maturity(descriptor_id)
    if env is None and mat is None:
        # descriptor_id is caller-typed text: escape it so an id that looks like a
        # Rich tag (e.g. "[/x]") is shown literally instead of raising MarkupError.
        err.print(
            f"[red]Unknown descriptor {escape(repr(descriptor_id))}.[/red] "
            "Try `warden policy list`."
        )
        raise typer.Exit(1)

    # Use the same `is None` test as the guard above rather than truthiness, so the
    # axis choice cannot disagree with the not-found check.
    is_env = env is not None
    obj = env if is_env else mat
    if output_json:
        print(json.dumps({"axis": "env_posture" if is_env else "maturity_level", **vars(obj)}, indent=2))
        return

    axis = "environment posture" if is_env else "workload maturity level"
    console.print(f"[bold]{obj.id}[/bold] ([cyan]{axis}[/cyan])")
    for k, v in vars(obj).items():
        if k == "id":
            # Already shown in the heading line.
            continue
        console.print(f" {k:14}: {', '.join(v) if isinstance(v, list) else v}")
    if mat is not None:
        # Data classes whose floor entry in the catalogue names this maturity level.
        floor = [dc for dc, lvl in cat.dataclass_floor.items() if lvl == mat.id]
        if floor:
            console.print(f" {'dataclass floor':14}: {', '.join(floor)} require this level")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# warden worker — autonomous coordination worker (WP-0020 T1: dry-run scaffold)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@worker_app.command("run")
def worker_run(
    once: Annotated[bool, typer.Option("--once", help="Process the inbox once and exit")] = True,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run/--execute", help="Plan only (default); --execute lands in WP-0020 T3"),
    ] = True,
    brain: Annotated[
        str,
        typer.Option("--brain", help="Planner: 'rule' (deterministic, default) or 'llm' (llm-connect)"),
    ] = "rule",
) -> None:
    """Read ops-warden's unread coordination requests and render a guardrailed plan.

    Plans with the deterministic RuleBrain (default) or the llm-connect brain (--brain llm).
    Either way the allowlist + no-secret guardrails are enforced on every action. --execute
    is rejected until the guarded executor (T3) ships; dry-run is the default.
    """
    # The docstring above is what Typer shows as --help; notes go in comments.
    #
    # Exit codes used here: 2 for a rejected option (--execute, unknown --brain),
    # 1 when the inbox cannot be read.
    #
    # NOTE(review): `once` is accepted on the command line but is not consulted
    # anywhere in this body — the inbox is always read a single time.
    from rich.markup import escape

    from warden.worker import HubClient, LlmConnectBrain, RuleBrain, build_plans, render_plans

    if not dry_run:
        err.print(
            "[red]--execute is not available yet[/red] (WP-0020 T3). "
            "The worker runs dry-run only until the guarded executor lands."
        )
        raise typer.Exit(2)

    if brain not in ("rule", "llm"):
        # The rejected value is caller-typed: escape it so something like "[/]"
        # is echoed literally instead of raising MarkupError inside Rich.
        err.print(f"[red]Unknown --brain {escape(repr(brain))}[/red] (expected 'rule' or 'llm').")
        raise typer.Exit(2)

    try:
        messages = HubClient().unread()
    except Exception as e:  # noqa: BLE001 — surface any transport error as a clean message
        # Exception text is arbitrary (URLs, response snippets); escape it for the
        # same reason as above.
        err.print(f"[red]Could not read the State Hub inbox:[/red] {escape(str(e))}")
        raise typer.Exit(1)

    chosen = LlmConnectBrain() if brain == "llm" else RuleBrain()
    plans = build_plans(messages, chosen)
    console.print(render_plans(plans))
    # Plans not flagged as escalated are counted as auto-actionable; the rest
    # are reported as needing a human.
    auto = sum(1 for p in plans if not p.escalated)
    console.print(
        f"\n[dim]{len(plans)} request(s): {auto} auto-actionable, "
        f"{len(plans) - auto} need a human. (dry-run — nothing executed)[/dim]"
    )
|