"""OpsWarden CLI.""" from __future__ import annotations import json from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Annotated, List, Optional import typer from rich.console import Console from rich.table import Table from warden.ca import CAError, LocalCA, parse_cert_metadata from warden.config import ConfigError, WardenConfig, load_config from warden.policy import check_sign_policy from warden.inventory import ActorEntry, InventoryError, PrincipalsInventory, load_inventory, save_inventory from warden.models import ActorType, CertSpec, DEFAULT_TTL_HOURS, validate_actor_name from warden.scorecard import run_scorecard app = typer.Typer( help="OpsWarden — SSH CA and certificate lifecycle manager", no_args_is_help=True, ) inventory_app = typer.Typer(help="Manage principals inventory", no_args_is_help=True) app.add_typer(inventory_app, name="inventory") route_app = typer.Typer( help="Look up which subsystem owns a credential need (read-only pointer layer)", no_args_is_help=True, ) app.add_typer(route_app, name="route") console = Console() err = Console(stderr=True) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _load_cfg() -> WardenConfig: try: return load_config() except ConfigError as e: err.print(f"[red]Config error:[/red] {e}") raise typer.Exit(1) def _load_inventory(cfg: WardenConfig) -> PrincipalsInventory: try: return load_inventory(cfg.inventory_path) except InventoryError as e: err.print(f"[red]Inventory error:[/red] {e}") raise typer.Exit(1) def _get_ca(cfg: WardenConfig): if cfg.backend == "vault": from warden.vault import VaultCA return VaultCA(cfg.vault, cfg.state_dir) return LocalCA(cfg.ca_key, cfg.state_dir) def _apply_policy_gate(cfg: WardenConfig, spec: CertSpec) -> None: """Run flex-auth check when policy.enabled; sets spec.policy_decision_id.""" decision_id = check_sign_policy(cfg.policy, spec) if decision_id: spec.policy_decision_id = decision_id # --------------------------------------------------------------------------- # warden sign # --------------------------------------------------------------------------- @app.command() def sign( actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")], pubkey: Annotated[Path, typer.Option("--pubkey", help="Path to actor's public key file")], ttl: Annotated[Optional[int], typer.Option("--ttl", help="Override TTL in hours")] = None, ) -> None: """Sign a public key for the given actor. Writes cert text to stdout. This is the cert_command interface: ops-bridge calls this and uses stdout as the certificate passed to SSH alongside the private key. """ cfg = _load_cfg() inventory = _load_inventory(cfg) entry = inventory.actors.get(actor_name) if entry is None: err.print( f"[red]Actor {actor_name!r} not found in inventory.[/red] " f"Add it with: warden inventory add" ) raise typer.Exit(1) spec = CertSpec( actor_name=actor_name, actor_type=entry.actor_type, pubkey_path=pubkey, ttl_hours=ttl or entry.ttl_hours, principals=entry.principals, identity=actor_name, ) ca = _get_ca(cfg) try: _apply_policy_gate(cfg, spec) record = ca.sign(spec) except CAError as e: err.print(f"[red]Signing failed:[/red] {e}") raise typer.Exit(1) # cert_command interface: write cert text to stdout only print(record.cert_path.read_text().strip()) # --------------------------------------------------------------------------- # warden issue # --------------------------------------------------------------------------- @app.command() def issue( actor_name: Annotated[str, typer.Argument(help="Actor name")], ttl: Annotated[Optional[int], typer.Option("--ttl", help="Override TTL in hours")] = None, output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, ) -> None: """Generate a new keypair and sign it for the given actor. Only supported with the local backend. Outputs keypair + cert paths and metadata. """ cfg = _load_cfg() if cfg.backend != "local": err.print("[red]warden issue is only supported with the local backend.[/red]") raise typer.Exit(1) inventory = _load_inventory(cfg) entry = inventory.actors.get(actor_name) if entry is None: err.print(f"[red]Actor {actor_name!r} not found in inventory.[/red]") raise typer.Exit(1) ca = LocalCA(cfg.ca_key, cfg.state_dir) try: privkey_path, pubkey_path = ca.generate_keypair(actor_name) except CAError as e: err.print(f"[red]Key generation failed:[/red] {e}") raise typer.Exit(1) spec = CertSpec( actor_name=actor_name, actor_type=entry.actor_type, pubkey_path=pubkey_path, ttl_hours=ttl or entry.ttl_hours, principals=entry.principals, identity=actor_name, ) try: _apply_policy_gate(cfg, spec) record = ca.sign(spec) except CAError as e: err.print(f"[red]Signing failed:[/red] {e}") raise typer.Exit(1) result = { "actor": actor_name, "privkey": str(privkey_path), "cert": str(record.cert_path), "identity": record.identity, "principals": record.principals, "valid_before": record.valid_before.isoformat(), "signed_at": record.signed_at.isoformat(), } if output_json: print(json.dumps(result, indent=2)) else: console.print(f"[green]Issued credentials for {actor_name}[/green]") for k, v in result.items(): console.print(f" {k}: {v}") # --------------------------------------------------------------------------- # warden status # --------------------------------------------------------------------------- @app.command() def status( actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None, output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, state_dir_override: Annotated[Optional[Path], typer.Option("--state-dir", help="State dir path (bypasses config)")] = None, ) -> None: """Show certificate status. Exits 1 if any cert is expired.""" now = datetime.now(timezone.utc) if state_dir_override is not None: state_dir = state_dir_override else: cfg = _load_cfg() state_dir = cfg.state_dir if actor_name: cert_path = state_dir / f"{actor_name}-cert.pub" paths = [cert_path] if cert_path.exists() else [] else: paths = sorted(state_dir.glob("*-cert.pub")) if state_dir.exists() else [] if not paths: msg = ( f"No certificate found for {actor_name!r} (static key / no cert)" if actor_name else "No certificates in state dir." ) console.print(msg) return rows = [] for cert_path in paths: name = cert_path.stem.replace("-cert", "") try: meta = parse_cert_metadata(cert_path) valid_before = meta["valid_before"] remaining = valid_before - now secs = remaining.total_seconds() if secs > 0: h, rem = divmod(int(secs), 3600) m = rem // 60 remaining_str = f"{h}h {m}m" expired = False else: remaining_str = "EXPIRED" expired = True rows.append({ "actor": name, "identity": meta["identity"], "principals": ", ".join(meta["principals"]), "valid_before": valid_before.isoformat(), "remaining": remaining_str, "expired": expired, }) except Exception as e: rows.append({"actor": name, "error": str(e), "expired": False}) if output_json: print(json.dumps(rows, indent=2)) else: table = Table(title="Certificate Status") table.add_column("Actor") table.add_column("Identity") table.add_column("Principals") table.add_column("Valid Before (UTC)") table.add_column("Remaining") for row in rows: if "error" in row: table.add_row(row["actor"], "[red]parse error[/red]", "", "", row["error"]) else: rem_styled = ( f"[red]{row['remaining']}[/red]" if row["expired"] else row["remaining"] ) table.add_row( row["actor"], row["identity"], row["principals"], row["valid_before"], rem_styled, ) console.print(table) if any(r.get("expired") for r in rows): raise typer.Exit(1) # --------------------------------------------------------------------------- # warden scorecard # --------------------------------------------------------------------------- @app.command() def scorecard( output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, ) -> None: """Run compliance scorecard checks (AccessManagementDirective §5, cert-side).""" cfg = _load_cfg() inventory = _load_inventory(cfg) results = run_scorecard(cfg.state_dir, inventory) passed = sum(1 for r in results if r.passed) total = len(results) if output_json: print(json.dumps( [{"check": r.name, "passed": r.passed, "detail": r.detail} for r in results], indent=2, )) else: table = Table(title=f"OpsWarden Scorecard ({passed}/{total})") table.add_column("Check") table.add_column("Status") table.add_column("Detail") for r in results: status_str = "[green]PASS[/green]" if r.passed else "[red]FAIL[/red]" table.add_row(r.name, status_str, r.detail) console.print(table) console.print( f"\nScore: {passed}/{total} " + ("[green]Operational[/green]" if passed == total else "[yellow]Needs attention[/yellow]") ) if passed < total: raise typer.Exit(1) # --------------------------------------------------------------------------- # warden inventory # --------------------------------------------------------------------------- @inventory_app.command("list") def inventory_list( output_json: Annotated[bool, typer.Option("--json")] = False, ) -> None: """List all actors in the principals inventory.""" cfg = _load_cfg() inventory = _load_inventory(cfg) if not inventory.actors: console.print("No actors in inventory.") return if output_json: print(json.dumps({ name: { "type": e.actor_type.value, "principals": e.principals, "ttl_hours": e.ttl_hours, "description": e.description, } for name, e in inventory.actors.items() }, indent=2)) return table = Table(title=f"Principals Inventory ({cfg.inventory_path})") table.add_column("Actor") table.add_column("Type") table.add_column("Principals") table.add_column("TTL (h)") table.add_column("Description") for name, e in inventory.actors.items(): table.add_row( name, e.actor_type.value, ", ".join(e.principals), str(e.ttl_hours), e.description, ) console.print(table) @inventory_app.command("add") def inventory_add( actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")], actor_type: Annotated[ActorType, typer.Option("--type", "-t", help="adm | agt | atm")], principals: Annotated[ Optional[List[str]], typer.Option("--principal", "-p", help="Principal (repeat for multiple)"), ] = None, ttl: Annotated[Optional[int], typer.Option("--ttl", help="TTL in hours")] = None, description: Annotated[str, typer.Option("--description", "-d")] = "", ) -> None: """Add an actor to the principals inventory.""" cfg = _load_cfg() try: validate_actor_name(actor_name, actor_type) except ValueError as e: err.print(f"[red]{e}[/red]") raise typer.Exit(1) resolved_principals: List[str] = principals or [actor_name] inventory = _load_inventory(cfg) inventory.actors[actor_name] = ActorEntry( name=actor_name, actor_type=actor_type, principals=resolved_principals, ttl_hours=ttl or DEFAULT_TTL_HOURS[actor_type], description=description, ) try: save_inventory(inventory, cfg.inventory_path) except Exception as e: err.print(f"[red]Failed to save inventory:[/red] {e}") raise typer.Exit(1) console.print( f"[green]Added[/green] {actor_name} " f"(type={actor_type.value}, principals={resolved_principals}, ttl={ttl or DEFAULT_TTL_HOURS[actor_type]}h)" ) @inventory_app.command("remove") def inventory_remove( actor_name: Annotated[str, typer.Argument(help="Actor name to remove")], ) -> None: """Remove an actor from the principals inventory.""" cfg = _load_cfg() inventory = _load_inventory(cfg) if actor_name not in inventory.actors: err.print(f"[red]Actor {actor_name!r} not in inventory.[/red]") raise typer.Exit(1) del inventory.actors[actor_name] try: save_inventory(inventory, cfg.inventory_path) except Exception as e: err.print(f"[red]Failed to save inventory:[/red] {e}") raise typer.Exit(1) console.print(f"[green]Removed[/green] {actor_name}") # --------------------------------------------------------------------------- # warden cleanup # --------------------------------------------------------------------------- @app.command() def cleanup( actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None, dry_run: Annotated[bool, typer.Option("--dry-run", help="Preview without deleting")] = False, ) -> None: """Remove stale (expired > 5 min) certificates from state dir.""" cfg = _load_cfg() cutoff = datetime.now(timezone.utc) - timedelta(minutes=5) if actor_name: cert_path = cfg.state_dir / f"{actor_name}-cert.pub" paths = [cert_path] if cert_path.exists() else [] else: paths = sorted(cfg.state_dir.glob("*-cert.pub")) if cfg.state_dir.exists() else [] removed = [] for cert_path in paths: try: meta = parse_cert_metadata(cert_path) except Exception: continue if meta["valid_before"] < cutoff: if dry_run: console.print(f"would remove: {cert_path.name}") else: cert_path.unlink() console.print(f"removed: {cert_path.name}") removed.append(cert_path.name) if not removed: console.print("No stale certificates found.") # --------------------------------------------------------------------------- # warden log # --------------------------------------------------------------------------- @app.command() def log( actor_name: Annotated[Optional[str], typer.Argument(help="Filter by actor name")] = None, last: Annotated[int, typer.Option("--last", help="Show last N entries")] = 20, output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, ) -> None: """Show outgoing certificate signing history.""" cfg = _load_cfg() log_path = cfg.state_dir / "signatures.log" if not log_path.exists(): if output_json: print("[]") else: console.print("No signatures log found.") return entries = [] for line in log_path.read_text().splitlines(): line = line.strip() if not line: continue try: entry = json.loads(line) except json.JSONDecodeError: continue if actor_name and entry.get("actor") != actor_name: continue entries.append(entry) entries = entries[-last:] if output_json: print(json.dumps(entries, indent=2)) return if not entries: console.print("No matching log entries.") return table = Table(title="Signatures Log") table.add_column("Timestamp") table.add_column("Actor") table.add_column("Type") table.add_column("Identity") table.add_column("TTL (h)") table.add_column("Valid Before (UTC)") table.add_column("Backend") for e in entries: table.add_row( e.get("timestamp", "")[:19], e.get("actor", ""), e.get("actor_type", ""), e.get("identity", ""), str(e.get("ttl_hours", "")), e.get("valid_before", "")[:19], e.get("backend", ""), ) console.print(table) # --------------------------------------------------------------------------- # warden route — read-only routing lookup over the pointer catalog # --------------------------------------------------------------------------- def _load_catalog(): from warden.routing import CatalogError, load_catalog try: return load_catalog() except CatalogError as e: err.print(f"[red]Routing catalog error:[/red] {e}") raise typer.Exit(1) def _entry_summary(entry) -> dict: """Pointer-only summary. Never includes secret material.""" return { "id": entry.id, "title": entry.title, "owner_repo": entry.owner_repo, "subsystem": entry.subsystem, "warden_executes": entry.warden_executes, "wiki_ref": entry.wiki_ref, "canon_ref": entry.canon_ref, "reviewed": entry.reviewed, "status": entry.status, } def _print_entry_table( entries, title: str, *, show_reviewed: bool = False, stale_threshold_days: int = 90 ) -> None: table = Table(title=title) table.add_column("ID") table.add_column("Need") table.add_column("Owner") table.add_column("warden") if show_reviewed: table.add_column("Reviewed") table.add_column("Days") table.add_column("Status") from warden.routing.catalog import days_since_review for e in entries: executes = "[green]issue[/green]" if e.warden_executes else "route" status_styled = e.status if e.status == "active" else f"[yellow]{e.status}[/yellow]" if show_reviewed: days = days_since_review(e.reviewed) reviewed_styled = ( f"[yellow]{e.reviewed}[/yellow]" if days > stale_threshold_days else e.reviewed ) table.add_row( e.id, e.title, e.owner_repo, executes, reviewed_styled, str(days), status_styled ) else: table.add_row(e.id, e.title, e.owner_repo, executes, status_styled) console.print(table) @route_app.command("list") def route_list( output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False, tag: Annotated[Optional[str], typer.Option("--tag", help="Filter by need keyword")] = None, stale_only: Annotated[ bool, typer.Option("--stale", help="Show entries past review cadence (see --stale-days)") ] = False, stale_days: Annotated[ int, typer.Option( "--stale-days", help="Days since reviewed before an entry is stale (default 90)", min=1, ), ] = 90, ) -> None: """List routing scenarios. Active-only unless --all.""" from warden.routing.catalog import days_since_review catalog = _load_catalog() if stale_only: entries = catalog.stale(include_draft=all_entries, threshold_days=stale_days) else: entries = catalog.listed(include_draft=all_entries) if tag: t = tag.lower() entries = [e for e in entries if t in [k.lower() for k in e.need_keywords]] if output_json: payload = [] for e in entries: row = _entry_summary(e) if stale_only: row["days_since_review"] = days_since_review(e.reviewed) row["stale_threshold_days"] = stale_days payload.append(row) print(json.dumps(payload, indent=2)) return if not entries: if stale_only: console.print(f"No stale routing entries (threshold: {stale_days} days since reviewed).") else: console.print("No matching routing entries.") return title = ( f"Stale routing scenarios (>{stale_days}d since reviewed)" if stale_only else "Routing scenarios" ) _print_entry_table( entries, title, show_reviewed=stale_only, stale_threshold_days=stale_days ) @route_app.command("show") def route_show( entry_id: Annotated[str, typer.Argument(help="Catalog entry id (see `warden route list`)")], output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, ) -> None: """Show owner, pointers, and (SSH only) the authored steps for one scenario.""" catalog = _load_catalog() entry = catalog.get(entry_id) if entry is None: err.print( f"[red]Unknown routing id {entry_id!r}.[/red] " f"Try: warden route find {entry_id!r}" ) raise typer.Exit(1) if output_json: summary = _entry_summary(entry) summary["need_keywords"] = entry.need_keywords if entry.warden_executes: summary["steps"] = entry.steps summary["cert_command"] = entry.cert_command else: summary["next_action"] = ( f"next action on `{entry.owner_repo}` — see `{entry.wiki_ref}`" ) print(json.dumps(summary, indent=2)) return console.print(f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])") console.print(f" owner : {entry.owner_repo} ({entry.subsystem})") console.print(f" wiki : {entry.wiki_ref}") console.print(f" canon : {entry.canon_ref}") console.print(f" reviewed : {entry.reviewed} status: {entry.status}") if entry.warden_executes: console.print("\n[green]ops-warden issues this directly.[/green]") console.print(f" cert_command: [bold]{entry.cert_command}[/bold]") if entry.steps: console.print(" steps:") for i, step in enumerate(entry.steps, 1): console.print(f" {i}. {step}") console.print( " precondition: actor in inventory? backend configured? run `warden status`." ) else: console.print( f"\n[yellow]ops-warden does not issue this.[/yellow] " f"Next action on [bold]{entry.owner_repo}[/bold] — see {entry.wiki_ref}." ) @route_app.command("find") def route_find( query: Annotated[str, typer.Argument(help="Free-text need, e.g. 'issue core api key'")], output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False, limit: Annotated[int, typer.Option("--limit", help="Max matches")] = 5, ) -> None: """Rank routing scenarios by keyword overlap with the query.""" catalog = _load_catalog() matches = catalog.find(query, include_draft=all_entries, limit=limit) if output_json: print(json.dumps([_entry_summary(e) for e in matches], indent=2)) return if not matches: console.print( f"No routing match for {query!r}. " "Try `warden route list --all` to browse all scenarios." ) return _print_entry_table(matches, f"Matches for {query!r}") # --------------------------------------------------------------------------- # warden access — operator front door (advisory; proxy lands in T3) # --------------------------------------------------------------------------- def _access_json(entry, expanded, gate: str, domain: Optional[str]) -> dict: """Stable, secret-free JSON shape for agentic operators. WP-0014 T2.""" payload = _entry_summary(entry) payload["domain"] = domain payload["policy_gate"] = gate payload["handoff"] = { "auth_method": expanded.auth_method, "path_template": expanded.path_template, "fetch_command": expanded.fetch_command, "policy_ref": expanded.policy_ref, "exec_capable": expanded.exec_capable, } if entry.warden_executes: payload["next_action"] = "ops-warden issues this directly — see cert_command" payload["cert_command"] = entry.cert_command else: payload["next_action"] = ( f"obtain from {entry.owner_repo} ({entry.subsystem}); " "ops-warden holds no value" ) return payload @app.command("access") def access( need: Annotated[str, typer.Argument(help="Free-text need, e.g. 'npm token', 'db password'")], domain: Annotated[ Optional[str], typer.Option("--domain", help="Substitute in path/auth templates, e.g. coulomb_social"), ] = None, output_json: Annotated[bool, typer.Option("--json", help="Output JSON (stable, secret-free)")] = False, all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False, ) -> None: """Operator front door: how to obtain any credential, gated and audited. Advisory by default — renders the owner, auth method, path template, command skeleton, and policy gate status for the best-matching need. ops-warden issues the SSH lane directly and **routes every other need to its owner** — it never holds or vends the secret value. (Proxy fetch arrives in WP-0014 T3.) """ from warden.access import expand_handoff, policy_gate_status catalog = _load_catalog() matches = catalog.find(need, include_draft=all_entries, limit=1) if not matches: err.print( f"[red]No access match for {need!r}.[/red] " "Try `warden route list --all` to browse, or rephrase the need." ) raise typer.Exit(1) entry = matches[0] expanded = expand_handoff(entry, domain) gate = policy_gate_status() if output_json: print(json.dumps(_access_json(entry, expanded, gate, domain), indent=2)) return console.print(f"[bold]{entry.title}[/bold] ([cyan]{entry.id}[/cyan])") console.print(f" owner : {entry.owner_repo} ({entry.subsystem})") if entry.warden_executes: console.print("\n[green]ops-warden issues this directly.[/green]") console.print(f" run : [bold]{entry.cert_command}[/bold]") if entry.steps: for i, step in enumerate(entry.steps, 1): console.print(f" {i}. {step}") return if expanded.auth_method: console.print(f" auth : {expanded.auth_method}") if expanded.path_template: console.print(f" path : {expanded.path_template}") if expanded.fetch_command: console.print(f" fetch : {expanded.fetch_command}") if expanded.policy_ref: console.print(f" policy : {expanded.policy_ref} [dim]({gate})[/dim]") console.print(f" wiki : {entry.wiki_ref}") console.print(f" canon : {entry.canon_ref}") if expanded.exec_capable: proxy = f"warden access {need!r}" if domain: proxy += f" --domain {domain}" console.print( f" proxy : [dim]{proxy} --fetch[/dim] " "[yellow](exec_capable; proxy ships in WP-0014 T3)[/yellow]" ) if expanded.path_template and "<" in expanded.path_template: console.print( " note : remaining <…> placeholders are owner-confirmed names " f"(coordinate with {entry.owner_repo})." ) console.print( f"\n[yellow]ops-warden does not hold this secret.[/yellow] " f"Obtain it from [bold]{entry.owner_repo}[/bold] as shown — " "warden advises, the owner vends." )