From 5ae6b988aa54cdc1d3161a44a8211e1b4003ab05 Mon Sep 17 00:00:00 2001 From: Bernd Worsch Date: Sat, 28 Mar 2026 00:45:43 +0000 Subject: [PATCH] Initial Commit --- CLAUDE.md | 8 + SCOPE.md | 129 ++++++ pyproject.toml | 34 ++ src/warden/__init__.py | 3 + src/warden/ca.py | 164 ++++++++ src/warden/cli.py | 397 ++++++++++++++++++ src/warden/config.py | 114 +++++ src/warden/inventory.py | 108 +++++ src/warden/models.py | 67 +++ src/warden/scorecard.py | 98 +++++ src/warden/scripts/__init__.py | 0 src/warden/scripts/ops_ssh_wrapper.py | 82 ++++ src/warden/vault.py | 97 +++++ tests/__init__.py | 0 tests/test_ca.py | 180 ++++++++ tests/test_config.py | 84 ++++ tests/test_inventory.py | 87 ++++ tests/test_models.py | 67 +++ tests/test_scorecard.py | 100 +++++ wiki/AccessManagementDirective.md | 203 +++++++++ wiki/CertCommandInterface.md | 105 +++++ wiki/OpsWardenConfig.md | 147 +++++++ .../WARDEN-WP-0001-initial-implementation.md | 126 ++++++ 23 files changed, 2400 insertions(+) create mode 100644 CLAUDE.md create mode 100644 SCOPE.md create mode 100644 pyproject.toml create mode 100644 src/warden/__init__.py create mode 100644 src/warden/ca.py create mode 100644 src/warden/cli.py create mode 100644 src/warden/config.py create mode 100644 src/warden/inventory.py create mode 100644 src/warden/models.py create mode 100644 src/warden/scorecard.py create mode 100644 src/warden/scripts/__init__.py create mode 100644 src/warden/scripts/ops_ssh_wrapper.py create mode 100644 src/warden/vault.py create mode 100644 tests/__init__.py create mode 100644 tests/test_ca.py create mode 100644 tests/test_config.py create mode 100644 tests/test_inventory.py create mode 100644 tests/test_models.py create mode 100644 tests/test_scorecard.py create mode 100644 wiki/AccessManagementDirective.md create mode 100644 wiki/CertCommandInterface.md create mode 100644 wiki/OpsWardenConfig.md create mode 100644 workplans/WARDEN-WP-0001-initial-implementation.md diff --git a/CLAUDE.md b/CLAUDE.md new 
file mode 100644 index 0000000..8e22989 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,8 @@ +# ops-warden — Claude Code Instructions + +@.claude/rules/repo-identity.md +@.claude/rules/session-protocol.md +@.claude/rules/workplan-convention.md +@.claude/rules/stack-and-commands.md +@.claude/rules/architecture.md +@.claude/rules/repo-boundary.md diff --git a/SCOPE.md b/SCOPE.md new file mode 100644 index 0000000..60928ad --- /dev/null +++ b/SCOPE.md @@ -0,0 +1,129 @@ +# SCOPE + +> This file helps you quickly understand what this repository is about, +> when it is relevant, and when it is not. +> It is intentionally lightweight and may be incomplete. + +--- + +## One-liner + +SSH Certificate Authority and credential issuance for the ops fleet — signs short-lived +certificates for `adm`/`agt`/`atm` actors; provides the `cert_command` interface consumed +by ops-bridge and other tooling. + +--- + +## Core Idea + +Implements `wiki/AccessManagementDirective.md` §§1–5. Owns the CA key, actor identity +inventory, signing logic, and scorecard. Two backends: `local` (ssh-keygen, for labs / +non-Vault use) and `vault` (HashiCorp Vault SSH engine, for production). Both expose the +same CLI surface and the same `cert_command` interface — callers never need to know which +backend is in use. 
+ +--- + +## In Scope + +- Local CA backend (`ssh-keygen -s`) — fully functional without Vault +- Vault SSH engine backend — production-grade signing via Vault API +- Actor identity registry (`inventory.yaml`) — maps actors to principals and TTL policy +- `cert_command` interface: `warden sign <actor> --pubkey <path>` → cert text on stdout +- TTL policy enforcement per `ActorType` (`adm` 48 h, `agt` 24 h, `atm` 8 h) +- Certificate status inspection (`warden status`) +- Stale-cert cleanup and scorecard checks (cert-side; see §5 of directive) +- `warden issue` — generate keypair + sign in one step (for `agt`/`atm` actors) +- `ops-ssh-wrapper` script — wraps SSH commands with automatic cert acquisition + +--- + +## Out of Scope + +- Tunnel lifecycle management → `ops-bridge` +- Host-side principal deployment (`/etc/ssh/auth_principals/`) → `railiance-infra` Ansible +- SSH key generation for human admins (self-service: `ssh-keygen`) +- Vault cluster setup, HA, or PKI secrets engine +- Session recording, SIEM forwarding, audit log aggregation +- SSO / Teleport integration (trigger when §6.2 scale thresholds are hit) +- Host-side scorecard checks (password auth disabled, root login disabled) → `railiance-infra` + +--- + +## Relevant When + +- Issuing or refreshing a cert for any `adm`/`agt`/`atm` actor +- Checking cert validity or running the compliance scorecard +- `ops-bridge` needs a `cert_command` to be defined for a tunnel +- Adding a new actor to the principals inventory +- Bootstrapping the CA for a new environment + +--- + +## Not Relevant When + +- Managing tunnel lifecycle (→ `ops-bridge`) +- Deploying SSH principal config to hosts (→ `railiance-infra`) +- All access is via static keys with no TTL (ops-bridge static key mode handles this) +- Human admins manually managing their own certificates + +--- + +## Current State + +- Status: planned — WARDEN-WP-0001 not yet started +- Implementation: scaffolding only (models, config, CA, inventory, scorecard, CLI stubs) + +--- + +## 
How It Fits + +- Upstream: CA key (file or Vault); actor inventory in Git +- Downstream consumers: `ops-bridge` calls `warden sign` via `cert_command`; any other + tool needing short-lived SSH certs can use the same interface +- Often used with: `ops-bridge` (primary consumer), `railiance-infra` (host-side principal sync) + +--- + +## Terminology + +- `ActorType`: `adm` (human operator), `agt` (LLM agent), `atm` (deterministic automation) +- `cert_command`: shell command that a caller (e.g. ops-bridge) runs to obtain a cert +- `CertSpec`: signing request (actor name, pubkey path, TTL, principals) +- `CertRecord`: result of signing (identity, valid_before, cert_path, signed_at) +- `principals`: SSH roles embedded in the cert, matched against `/etc/ssh/auth_principals/%u` +- `inventory.yaml`: authoritative registry of actor → principals + TTL policy +- `LocalCA`: file-based CA backend using `ssh-keygen -s` +- `VaultCA`: Vault SSH engine backend + +--- + +## Related / Overlapping Repositories + +- `ops-bridge` — primary consumer; calls `warden sign` via `cert_command` in tunnel config +- `railiance-infra` — owns host-side principal deployment and host-side scorecard checks +- `the-custodian/state-hub` — domain/workstream registry + +--- + +## Provided Capabilities + +```capability +type: security +title: SSH certificate issuance +description: Issues short-lived CA-signed SSH certificates for adm/agt/atm actors via a + pluggable cert_command interface; supports local CA (ssh-keygen) and Vault SSH engine backends. 
+keywords: [ssh, certificate, ca, credential, warden, ops-warden, pki, vault] +``` + +--- + +## Getting Oriented + +- Start with: `SCOPE.md` (this file), then `wiki/AccessManagementDirective.md` +- Config reference: `wiki/OpsWardenConfig.md` +- cert_command contract: `wiki/CertCommandInterface.md` +- Config files: `~/.config/warden/warden.yaml`, `~/.config/warden/inventory.yaml` +- State: `~/.local/state/warden/` (certs, generated keypairs) +- Entry point: `warden --help` +- Workplan: `workplans/WARDEN-WP-0001-initial-implementation.md` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9f84eff --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,34 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "ops-warden" +version = "0.1.0" +description = "SSH CA and certificate lifecycle manager for ops actors" +requires-python = ">=3.11" +dependencies = [ + "typer[all]>=0.12", + "pyyaml>=6.0", + "httpx>=0.27", +] + +[project.scripts] +warden = "warden.cli:app" +ops-ssh-wrapper = "warden.scripts.ops_ssh_wrapper:main" + +[tool.hatch.build.targets.wheel] +packages = ["src/warden"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src"] + +[tool.ruff] +line-length = 88 + +[dependency-groups] +dev = [ + "pytest>=8.0", + "ruff>=0.4", +] diff --git a/src/warden/__init__.py b/src/warden/__init__.py new file mode 100644 index 0000000..4525f1e --- /dev/null +++ b/src/warden/__init__.py @@ -0,0 +1,3 @@ +"""OpsWarden — SSH CA and certificate lifecycle manager.""" + +__version__ = "0.1.0" diff --git a/src/warden/ca.py b/src/warden/ca.py new file mode 100644 index 0000000..9da2f63 --- /dev/null +++ b/src/warden/ca.py @@ -0,0 +1,164 @@ +"""CA backends for OpsWarden: LocalCA (ssh-keygen) and abstract base.""" +from __future__ import annotations + +import os +import shutil +import subprocess +import tempfile +from abc import ABC, abstractmethod +from datetime import datetime, timezone +from pathlib 
class CAError(Exception):
    """Raised when a CA operation fails."""


class CABackend(ABC):
    """Interface implemented by every signing backend."""

    @abstractmethod
    def sign(self, spec: "CertSpec") -> "CertRecord":
        """Sign the public key in spec and return a CertRecord."""
        ...


def _run_ssh_keygen(
    args: List[str], failure_prefix: str
) -> subprocess.CompletedProcess:
    """Run ``ssh-keygen`` with *args* and return the completed process.

    Every failure is reported as ``CAError`` prefixed with *failure_prefix*,
    including the case where the binary cannot be started at all; callers of
    this module only catch ``CAError``.
    """
    try:
        result = subprocess.run(
            ["ssh-keygen", *args], capture_output=True, text=True
        )
    except OSError as exc:  # binary missing or not executable
        raise CAError(
            f"{failure_prefix}: could not run ssh-keygen: {exc}"
        ) from exc
    if result.returncode != 0:
        raise CAError(f"{failure_prefix}: {result.stderr.strip()}")
    return result


def _indent_of(line: str) -> int:
    """Return the number of leading whitespace characters in *line*."""
    return len(line) - len(line.lstrip())


def _parse_cert_listing(listing: str) -> dict:
    """Extract identity, valid_before and principals from ``ssh-keygen -L`` text.

    Returns a dict with keys ``identity`` (str, "" when absent),
    ``valid_before`` (timezone-aware UTC datetime, or None when it could not
    be parsed) and ``principals`` (list of str).
    """
    identity: Optional[str] = None
    valid_before: Optional[datetime] = None
    principals: List[str] = []
    # Indent of the "Principals:" header while we are inside that section.
    section_indent: Optional[int] = None

    for line in listing.splitlines():
        stripped = line.strip()

        if section_indent is not None:
            # Principal names are listed one per line, indented deeper than
            # the header. Anything else (e.g. "Critical Options: (none)")
            # closes the section and is handled as an ordinary line below.
            if stripped and _indent_of(line) > section_indent:
                if stripped != "(none)":
                    principals.append(stripped)
                continue
            section_indent = None

        if stripped.startswith("Key ID:"):
            # Key ID: "agt-state-hub-bridge"
            identity = stripped.split(":", 1)[1].strip().strip('"')
        elif stripped.startswith("Valid:"):
            # Valid: from 2026-03-28T10:00:00 to 2026-03-29T10:00:00
            _, sep, until = stripped.partition(" to ")
            if not sep:
                continue  # e.g. "Valid: forever"
            try:
                parsed = datetime.fromisoformat(until.strip())
            except ValueError:
                continue
            # ssh-keygen prints validity in the system's local time zone;
            # astimezone() on a naive datetime assumes local time.
            valid_before = parsed.astimezone(timezone.utc)
        elif stripped == "Principals:":
            section_indent = _indent_of(line)

    return {
        "identity": identity or "",
        "valid_before": valid_before,
        "principals": principals,
    }


def parse_cert_metadata(cert_path: Path) -> dict:
    """Parse ssh-keygen -L output into identity, valid_before, and principals.

    Args:
        cert_path: Path to an OpenSSH certificate file.

    Returns:
        Dict with ``identity`` (str), ``valid_before`` (aware UTC datetime)
        and ``principals`` (list of str).

    Raises:
        CAError: if ssh-keygen cannot be run, exits non-zero, or the listing
            has no parseable expiry (for example a "forever" certificate).
    """
    result = _run_ssh_keygen(
        ["-L", "-f", str(cert_path)], "ssh-keygen -L failed"
    )
    meta = _parse_cert_listing(result.stdout)
    if meta["valid_before"] is None:
        raise CAError(
            f"Could not parse valid_before from cert at {cert_path}. "
            f"Ensure the cert has a valid TTL."
        )
    return meta


class LocalCA(CABackend):
    """File-based CA using ssh-keygen. Requires the CA private key on disk."""

    def __init__(self, ca_key: Path, state_dir: Path) -> None:
        """Store the CA key path and state directory, expanding ``~``."""
        self._ca_key = Path(os.path.expanduser(str(ca_key)))
        self._state_dir = Path(os.path.expanduser(str(state_dir)))

    def sign(self, spec: "CertSpec") -> "CertRecord":
        """Sign the public key in spec. Returns a CertRecord; cert saved to state_dir.

        The certificate is written to ``<state_dir>/<actor_name>-cert.pub``.

        Raises:
            CAError: if the public key or CA key is missing, or if signing or
                reading back the certificate fails.
        """
        pubkey = Path(os.path.expanduser(str(spec.pubkey_path)))
        if not pubkey.exists():
            raise CAError(f"Public key not found: {pubkey}")
        if not self._ca_key.exists():
            raise CAError(f"CA key not found: {self._ca_key}")

        principals_str = ",".join(spec.principals)

        # Sign a copy in a scratch directory so nothing is written next to
        # the caller's key.
        with tempfile.TemporaryDirectory() as tmpdir:
            tmpdir_path = Path(tmpdir)
            pubkey_copy = tmpdir_path / "key.pub"
            shutil.copy2(pubkey, pubkey_copy)
            # ssh-keygen -s writes the cert next to the key as
            # <key-stem>-cert.pub
            cert_path_tmp = tmpdir_path / "key-cert.pub"

            result = _run_ssh_keygen(
                [
                    "-s", str(self._ca_key),
                    "-I", spec.identity,
                    "-n", principals_str,
                    "-V", f"+{spec.ttl_hours}h",
                    str(pubkey_copy),
                ],
                "Signing failed",
            )

            if not cert_path_tmp.exists():
                raise CAError(
                    f"Expected cert not written after signing: {cert_path_tmp}. "
                    f"ssh-keygen stderr: {result.stderr.strip()}"
                )

            meta = parse_cert_metadata(cert_path_tmp)

            self._state_dir.mkdir(parents=True, exist_ok=True)
            dest = self._state_dir / f"{spec.actor_name}-cert.pub"
            shutil.copy2(cert_path_tmp, dest)

            return CertRecord(
                identity=meta["identity"] or spec.identity,
                valid_before=meta["valid_before"],
                cert_path=dest,
                signed_at=datetime.now(timezone.utc),
                principals=meta["principals"],
                actor_name=spec.actor_name,
            )

    def generate_keypair(self, actor_name: str) -> tuple[Path, Path]:
        """Generate an ed25519 keypair for an actor.

        Returns (privkey_path, pubkey_path). Overwrites existing files.

        Raises:
            CAError: if ssh-keygen cannot be run or exits non-zero.
        """
        key_dir = self._state_dir / "keys"
        key_dir.mkdir(parents=True, exist_ok=True)
        privkey = key_dir / f"{actor_name}_ed25519"
        pubkey = key_dir / f"{actor_name}_ed25519.pub"

        # Remove stale files first so ssh-keygen does not stop to ask
        # whether to overwrite.
        for stale in (privkey, pubkey):
            if stale.exists():
                stale.unlink()

        _run_ssh_keygen(
            [
                "-t", "ed25519",
                "-f", str(privkey),
                "-N", "",  # no passphrase
                "-C", actor_name,
            ],
            "Key generation failed",
        )

        return privkey, pubkey
app = typer.Typer(
    help="OpsWarden — SSH CA and certificate lifecycle manager",
    no_args_is_help=True,
)
inventory_app = typer.Typer(help="Manage principals inventory", no_args_is_help=True)
app.add_typer(inventory_app, name="inventory")

console = Console()
err = Console(stderr=True)

# File-name suffix of every certificate stored in the state directory.
_CERT_SUFFIX = "-cert.pub"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _load_cfg() -> WardenConfig:
    """Load the warden config, or print the error to stderr and exit 1."""
    try:
        return load_config()
    except ConfigError as e:
        err.print(f"[red]Config error:[/red] {e}")
        raise typer.Exit(1)


def _load_inventory(cfg: WardenConfig) -> PrincipalsInventory:
    """Load the inventory at cfg.inventory_path, or print the error and exit 1."""
    try:
        return load_inventory(cfg.inventory_path)
    except InventoryError as e:
        err.print(f"[red]Inventory error:[/red] {e}")
        raise typer.Exit(1)


def _get_ca(cfg: WardenConfig):
    """Return the signing backend selected by cfg.backend (VaultCA or LocalCA)."""
    if cfg.backend == "vault":
        # Imported lazily so the local backend does not load the Vault module.
        from warden.vault import VaultCA
        return VaultCA(cfg.vault, cfg.state_dir)
    return LocalCA(cfg.ca_key, cfg.state_dir)


def _build_spec(
    actor_name: str, entry: ActorEntry, pubkey: Path, ttl: Optional[int]
) -> CertSpec:
    """Build the signing request for an actor; *ttl* overrides the inventory TTL."""
    return CertSpec(
        actor_name=actor_name,
        actor_type=entry.actor_type,
        pubkey_path=pubkey,
        # Compare against None so an explicit override is never dropped.
        ttl_hours=entry.ttl_hours if ttl is None else ttl,
        principals=entry.principals,
        identity=actor_name,
    )


def _cert_row(cert_path: Path, now: datetime) -> dict:
    """Describe one stored certificate as a status row.

    On a parse failure the row carries an ``error`` key instead of the
    certificate fields, and is never counted as expired.
    """
    # Strip only the trailing suffix: actor names may themselves contain
    # "-cert" (e.g. "agt-cert-bot").
    name = cert_path.name.removesuffix(_CERT_SUFFIX)
    try:
        meta = parse_cert_metadata(cert_path)
    except (CAError, OSError) as e:
        return {"actor": name, "error": str(e), "expired": False}

    valid_before = meta["valid_before"]
    secs = (valid_before - now).total_seconds()
    expired = secs <= 0
    if expired:
        remaining_str = "EXPIRED"
    else:
        hours, rem = divmod(int(secs), 3600)
        remaining_str = f"{hours}h {rem // 60}m"
    return {
        "actor": name,
        "identity": meta["identity"],
        "principals": ", ".join(meta["principals"]),
        "valid_before": valid_before.isoformat(),
        "remaining": remaining_str,
        "expired": expired,
    }


# ---------------------------------------------------------------------------
# warden sign
# ---------------------------------------------------------------------------

@app.command()
def sign(
    actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")],
    pubkey: Annotated[Path, typer.Option("--pubkey", help="Path to actor's public key file")],
    ttl: Annotated[Optional[int], typer.Option("--ttl", min=1, help="Override TTL in hours")] = None,
) -> None:
    """Sign a public key for the given actor. Writes cert text to stdout.

    This is the cert_command interface: ops-bridge calls this and uses stdout
    as the certificate passed to SSH alongside the private key.
    """
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    entry = inventory.actors.get(actor_name)
    if entry is None:
        err.print(
            f"[red]Actor {actor_name!r} not found in inventory.[/red] "
            f"Add it with: warden inventory add"
        )
        raise typer.Exit(1)

    spec = _build_spec(actor_name, entry, pubkey, ttl)

    ca = _get_ca(cfg)
    try:
        record = ca.sign(spec)
    except CAError as e:
        err.print(f"[red]Signing failed:[/red] {e}")
        raise typer.Exit(1)

    # cert_command interface: write cert text to stdout only
    print(record.cert_path.read_text().strip())


# ---------------------------------------------------------------------------
# warden issue
# ---------------------------------------------------------------------------

@app.command()
def issue(
    actor_name: Annotated[str, typer.Argument(help="Actor name")],
    ttl: Annotated[Optional[int], typer.Option("--ttl", min=1, help="Override TTL in hours")] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Generate a new keypair and sign it for the given actor.

    Only supported with the local backend. Outputs keypair + cert paths and metadata.
    """
    cfg = _load_cfg()

    if cfg.backend != "local":
        err.print("[red]warden issue is only supported with the local backend.[/red]")
        raise typer.Exit(1)

    inventory = _load_inventory(cfg)
    entry = inventory.actors.get(actor_name)
    if entry is None:
        err.print(f"[red]Actor {actor_name!r} not found in inventory.[/red]")
        raise typer.Exit(1)

    ca = LocalCA(cfg.ca_key, cfg.state_dir)
    try:
        privkey_path, pubkey_path = ca.generate_keypair(actor_name)
    except CAError as e:
        err.print(f"[red]Key generation failed:[/red] {e}")
        raise typer.Exit(1)

    spec = _build_spec(actor_name, entry, pubkey_path, ttl)
    try:
        record = ca.sign(spec)
    except CAError as e:
        err.print(f"[red]Signing failed:[/red] {e}")
        raise typer.Exit(1)

    result = {
        "actor": actor_name,
        "privkey": str(privkey_path),
        "cert": str(record.cert_path),
        "identity": record.identity,
        "principals": record.principals,
        "valid_before": record.valid_before.isoformat(),
        "signed_at": record.signed_at.isoformat(),
    }

    if output_json:
        print(json.dumps(result, indent=2))
    else:
        console.print(f"[green]Issued credentials for {actor_name}[/green]")
        for k, v in result.items():
            console.print(f"  {k}: {v}")


# ---------------------------------------------------------------------------
# warden status
# ---------------------------------------------------------------------------

@app.command()
def status(
    actor_name: Annotated[Optional[str], typer.Argument(help="Actor name (omit for all)")] = None,
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Show certificate status. Exits 1 if any cert is expired."""
    cfg = _load_cfg()
    now = datetime.now(timezone.utc)

    if actor_name:
        cert_path = cfg.state_dir / f"{actor_name}{_CERT_SUFFIX}"
        paths = [cert_path] if cert_path.exists() else []
    elif cfg.state_dir.exists():
        paths = sorted(cfg.state_dir.glob(f"*{_CERT_SUFFIX}"))
    else:
        paths = []

    if not paths:
        if output_json:
            # Keep stdout machine-readable when --json was requested.
            print(json.dumps([]))
            return
        msg = (
            f"No certificate found for {actor_name!r} (static key / no cert)"
            if actor_name
            else "No certificates in state dir."
        )
        console.print(msg)
        return

    rows = [_cert_row(cert_path, now) for cert_path in paths]

    if output_json:
        print(json.dumps(rows, indent=2))
    else:
        table = Table(title="Certificate Status")
        table.add_column("Actor")
        table.add_column("Identity")
        table.add_column("Principals")
        table.add_column("Valid Before (UTC)")
        table.add_column("Remaining")
        for row in rows:
            if "error" in row:
                table.add_row(row["actor"], "[red]parse error[/red]", "", "", row["error"])
            else:
                rem_styled = (
                    f"[red]{row['remaining']}[/red]" if row["expired"] else row["remaining"]
                )
                table.add_row(
                    row["actor"],
                    row["identity"],
                    row["principals"],
                    row["valid_before"],
                    rem_styled,
                )
        console.print(table)

    if any(r.get("expired") for r in rows):
        raise typer.Exit(1)


# ---------------------------------------------------------------------------
# warden scorecard
# ---------------------------------------------------------------------------

@app.command()
def scorecard(
    output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
    """Run compliance scorecard checks (AccessManagementDirective §5, cert-side)."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    results = run_scorecard(cfg.state_dir, inventory)
    passed = sum(1 for r in results if r.passed)
    total = len(results)

    if output_json:
        print(json.dumps(
            [{"check": r.name, "passed": r.passed, "detail": r.detail} for r in results],
            indent=2,
        ))
    else:
        table = Table(title=f"OpsWarden Scorecard ({passed}/{total})")
        table.add_column("Check")
        table.add_column("Status")
        table.add_column("Detail")
        for r in results:
            status_str = "[green]PASS[/green]" if r.passed else "[red]FAIL[/red]"
            table.add_row(r.name, status_str, r.detail)
        console.print(table)
        console.print(
            f"\nScore: {passed}/{total} "
            + ("[green]Operational[/green]" if passed == total else "[yellow]Needs attention[/yellow]")
        )

    # Exit non-zero when any check failed.
    if passed < total:
        raise typer.Exit(1)


# ---------------------------------------------------------------------------
# warden inventory
# ---------------------------------------------------------------------------

@inventory_app.command("list")
def inventory_list(
    output_json: Annotated[bool, typer.Option("--json")] = False,
) -> None:
    """List all actors in the principals inventory."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    if output_json:
        # An empty inventory prints "{}" so stdout stays valid JSON.
        print(json.dumps({
            name: {
                "type": e.actor_type.value,
                "principals": e.principals,
                "ttl_hours": e.ttl_hours,
                "description": e.description,
            }
            for name, e in inventory.actors.items()
        }, indent=2))
        return

    if not inventory.actors:
        console.print("No actors in inventory.")
        return

    table = Table(title=f"Principals Inventory ({cfg.inventory_path})")
    table.add_column("Actor")
    table.add_column("Type")
    table.add_column("Principals")
    table.add_column("TTL (h)")
    table.add_column("Description")
    for name, e in inventory.actors.items():
        table.add_row(
            name,
            e.actor_type.value,
            ", ".join(e.principals),
            str(e.ttl_hours),
            e.description,
        )
    console.print(table)


@inventory_app.command("add")
def inventory_add(
    actor_name: Annotated[str, typer.Argument(help="Actor name (e.g. agt-state-hub-bridge)")],
    actor_type: Annotated[ActorType, typer.Option("--type", "-t", help="adm | agt | atm")],
    principals: Annotated[
        Optional[List[str]],
        typer.Option("--principal", "-p", help="Principal (repeat for multiple)"),
    ] = None,
    ttl: Annotated[Optional[int], typer.Option("--ttl", min=1, help="TTL in hours")] = None,
    description: Annotated[str, typer.Option("--description", "-d")] = "",
) -> None:
    """Add an actor to the principals inventory.

    An existing entry with the same name is replaced. Without --principal the
    actor name is used as its only principal; without --ttl the default for
    the actor type applies.
    """
    cfg = _load_cfg()

    try:
        validate_actor_name(actor_name, actor_type)
    except ValueError as e:
        err.print(f"[red]{e}[/red]")
        raise typer.Exit(1)

    resolved_principals: List[str] = principals or [actor_name]
    resolved_ttl = DEFAULT_TTL_HOURS[actor_type] if ttl is None else ttl

    inventory = _load_inventory(cfg)
    inventory.actors[actor_name] = ActorEntry(
        name=actor_name,
        actor_type=actor_type,
        principals=resolved_principals,
        ttl_hours=resolved_ttl,
        description=description,
    )
    try:
        save_inventory(inventory, cfg.inventory_path)
    except Exception as e:
        err.print(f"[red]Failed to save inventory:[/red] {e}")
        raise typer.Exit(1)

    console.print(
        f"[green]Added[/green] {actor_name} "
        f"(type={actor_type.value}, principals={resolved_principals}, ttl={resolved_ttl}h)"
    )


@inventory_app.command("remove")
def inventory_remove(
    actor_name: Annotated[str, typer.Argument(help="Actor name to remove")],
) -> None:
    """Remove an actor from the principals inventory."""
    cfg = _load_cfg()
    inventory = _load_inventory(cfg)

    if actor_name not in inventory.actors:
        err.print(f"[red]Actor {actor_name!r} not in inventory.[/red]")
        raise typer.Exit(1)

    del inventory.actors[actor_name]
    try:
        save_inventory(inventory, cfg.inventory_path)
    except Exception as e:
        err.print(f"[red]Failed to save inventory:[/red] {e}")
        raise typer.Exit(1)

    console.print(f"[green]Removed[/green] {actor_name}")
class ConfigError(Exception):
    """Raised when config is invalid or missing."""


def _default_config_path() -> Path:
    """Return the default location of warden.yaml."""
    return Path.home() / ".config" / "warden" / "warden.yaml"


def _default_inventory_path() -> Path:
    """Return the default location of inventory.yaml."""
    return Path.home() / ".config" / "warden" / "inventory.yaml"


def _default_state_dir() -> Path:
    """Return the default state directory."""
    return Path.home() / ".local" / "state" / "warden"


def _expand(value: object) -> Path:
    """Convert *value* to a Path with a leading ``~`` expanded."""
    return Path(os.path.expanduser(str(value)))


# Vault role per actor type, used when vault.role_map is not configured.
_DEFAULT_ROLE_MAP: Dict[str, str] = {
    "adm": "adm-role",
    "agt": "agt-role",
    "atm": "atm-role",
}


@dataclass
class VaultConfig:
    """Settings for the Vault backend."""

    addr: str
    role_map: Dict[str, str]  # ActorType.value -> vault role name
    token_env: str = "VAULT_TOKEN"  # env var holding the Vault token
    mount: str = "ssh"  # Vault secrets engine mount path


@dataclass
class WardenConfig:
    """Validated contents of warden.yaml."""

    backend: str  # "local" or "vault"
    ca_key: Optional[Path] = None  # required for local backend
    vault: Optional[VaultConfig] = None  # required for vault backend
    inventory_path: Path = field(default_factory=_default_inventory_path)
    state_dir: Path = field(default_factory=_default_state_dir)


def _parse_vault(section: object) -> VaultConfig:
    """Validate the ``vault`` section of the config and build a VaultConfig.

    Raises:
        ConfigError: if the section or its role_map is not a mapping, or
            ``addr`` is missing.
    """
    if section is None:
        section = {}
    if not isinstance(section, dict):
        raise ConfigError("vault must be a YAML mapping")
    if "addr" not in section:
        raise ConfigError("vault backend requires vault.addr")

    role_map = section.get("role_map") or _DEFAULT_ROLE_MAP
    if not isinstance(role_map, dict):
        raise ConfigError("vault.role_map must be a YAML mapping")

    return VaultConfig(
        addr=str(section["addr"]),
        role_map=dict(role_map),  # copy so the shared default is never mutated
        token_env=str(section.get("token_env", "VAULT_TOKEN")),
        mount=str(section.get("mount", "ssh")),
    )


def _parse_config(raw: object) -> WardenConfig:
    """Validate an already-parsed config document and build a WardenConfig.

    Raises:
        ConfigError: if the document is not a mapping, the backend is
            unknown, or a backend-specific requirement is missing.
    """
    if not isinstance(raw, dict):
        raise ConfigError("Config must be a YAML mapping")

    backend = str(raw.get("backend", "local"))
    if backend not in ("local", "vault"):
        raise ConfigError(
            f"backend must be 'local' or 'vault', got: {backend!r}"
        )

    ca_key = _expand(raw["ca_key"]) if raw.get("ca_key") else None

    vault_cfg = None
    if backend == "vault":
        vault_cfg = _parse_vault(raw.get("vault"))
    elif ca_key is None:
        raise ConfigError("local backend requires ca_key")

    # "or" also covers a key that is present but null/empty.
    inventory_path = _expand(raw.get("inventory_path") or _default_inventory_path())
    state_dir = _expand(raw.get("state_dir") or _default_state_dir())

    return WardenConfig(
        backend=backend,
        ca_key=ca_key,
        vault=vault_cfg,
        inventory_path=inventory_path,
        state_dir=state_dir,
    )


def load_config(path: Optional[Path] = None) -> WardenConfig:
    """Load and validate warden.yaml. Respects WARDEN_CONFIG env var.

    Args:
        path: Explicit config file. When omitted, the WARDEN_CONFIG
            environment variable is used if set and non-empty, otherwise the
            default path under ``~/.config/warden``.

    Raises:
        ConfigError: if the file is missing, unreadable, not valid YAML, or
            fails validation.
    """
    if path:
        config_path = Path(path)
    else:
        from_env = os.environ.get("WARDEN_CONFIG")
        config_path = _expand(from_env) if from_env else _default_config_path()

    if not config_path.exists():
        raise ConfigError(f"Config not found: {config_path}")

    try:
        with config_path.open(encoding="utf-8") as f:
            raw = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ConfigError(f"Invalid YAML in {config_path}: {e}") from e
    except (OSError, UnicodeDecodeError) as e:
        raise ConfigError(f"Cannot read config {config_path}: {e}") from e

    return _parse_config(raw)
def _parse_principals(actor: str, value: object) -> List[str]:
    """Normalise an actor's ``principals`` setting to a list of strings."""
    if not value:
        # Missing, null or empty: the actor's own name is the only principal.
        return [actor]
    if isinstance(value, str):
        # list("abc") would yield ['a', 'b', 'c']; a bare string is ONE principal.
        return [value]
    if not isinstance(value, (list, tuple)):
        raise InventoryError(
            f"Actor {actor!r}: principals must be a list of strings"
        )
    return [str(p) for p in value]


def _parse_ttl(actor: str, value: object, default: int) -> int:
    """Return ``ttl_hours`` as a positive int, or *default* when unset/null."""
    if value is None:
        return default
    try:
        ttl = int(value)
    except (TypeError, ValueError) as e:
        raise InventoryError(
            f"Actor {actor!r} has invalid ttl_hours: {value!r}"
        ) from e
    if ttl <= 0:
        raise InventoryError(
            f"Actor {actor!r}: ttl_hours must be a positive number of hours, "
            f"got {ttl}"
        )
    return ttl


def load_inventory(path: Path) -> PrincipalsInventory:
    """Load inventory.yaml. Returns empty inventory if path does not exist.

    Args:
        path: Location of the inventory YAML file.

    Returns:
        A PrincipalsInventory with one ActorEntry per key under ``actors``
        and one HostEntry per key under ``hosts``. An actor without
        ``principals`` gets its own name as sole principal; an actor without
        ``ttl_hours`` gets ``DEFAULT_TTL_HOURS`` for its type.

    Raises:
        InventoryError: The file is not valid YAML, a section or entry is
            not a mapping, an actor has an unknown type, a name that fails
            ``validate_actor_name``, or an invalid ``principals`` /
            ``ttl_hours`` value.
    """
    if not path.exists():
        return PrincipalsInventory()

    try:
        with path.open(encoding="utf-8") as f:
            raw = yaml.safe_load(f) or {}
    except yaml.YAMLError as e:
        raise InventoryError(f"Invalid YAML in {path}: {e}") from e

    if not isinstance(raw, dict):
        raise InventoryError(f"Inventory {path} must be a YAML mapping")

    actors_raw = raw.get("actors") or {}
    if not isinstance(actors_raw, dict):
        raise InventoryError("'actors' must be a mapping of actor name to settings")

    actors: Dict[str, ActorEntry] = {}
    for name, data in actors_raw.items():
        name = str(name)  # YAML keys may be ints etc.; names are always text
        if not isinstance(data, dict):
            raise InventoryError(f"Actor {name!r} must be a mapping")
        type_raw = str(data.get("type", ""))
        try:
            actor_type = ActorType(type_raw)
        except ValueError as e:
            raise InventoryError(
                f"Actor {name!r} has invalid type: {type_raw!r}. "
                f"Must be one of: adm, agt, atm"
            ) from e
        try:
            validate_actor_name(name, actor_type)
        except ValueError as e:
            raise InventoryError(str(e)) from e

        actors[name] = ActorEntry(
            name=name,
            actor_type=actor_type,
            principals=_parse_principals(name, data.get("principals")),
            ttl_hours=_parse_ttl(
                name, data.get("ttl_hours"), DEFAULT_TTL_HOURS[actor_type]
            ),
            # `or ""` so an explicit null does not become the text "None"
            description=str(data.get("description") or ""),
        )

    hosts_raw = raw.get("hosts") or {}
    if not isinstance(hosts_raw, dict):
        # A YAML list here would otherwise fail with AttributeError on .items()
        raise InventoryError("'hosts' must be a mapping of host name to settings")

    hosts: Dict[str, HostEntry] = {}
    for hostname, data in hosts_raw.items():
        hostname = str(hostname)
        if not isinstance(data, dict):
            raise InventoryError(f"Host {hostname!r} must be a mapping")
        allowed = data.get("allowed_principals") or {}
        if not isinstance(allowed, dict):
            raise InventoryError(
                f"Host {hostname!r}: allowed_principals must be a mapping"
            )
        hosts[hostname] = HostEntry(
            name=hostname,
            allowed_principals=dict(allowed),
        )

    return PrincipalsInventory(actors=actors, hosts=hosts)
def validate_actor_name(name: str, actor_type: ActorType) -> None:
    """Raise ValueError if name does not carry the required prefix for actor_type.

    A name consisting of the prefix alone (for example ``"agt-"``) is also
    rejected, because it identifies no particular actor.

    Args:
        name: Candidate actor name, e.g. ``"agt-incident-resolver-v2"``.
        actor_type: The type whose ``ACTOR_PREFIX`` entry the name must carry.

    Raises:
        ValueError: The prefix is missing or nothing follows it.
    """
    prefix = ACTOR_PREFIX[actor_type]
    if not name.startswith(prefix):
        raise ValueError(
            f"Actor name {name!r} must start with {prefix!r} for type {actor_type.value!r}. "
            f"(AccessManagementDirective §2 naming convention)"
        )
    if len(name) == len(prefix):
        # startswith() alone accepts the bare prefix.
        raise ValueError(
            f"Actor name {name!r} must start with {prefix!r} followed by a "
            f"non-empty identifier. "
            f"(AccessManagementDirective §2 naming convention)"
        )
def check_no_expired_certs(state_dir: Path) -> CheckResult:
    """No cert in state_dir should be currently expired.

    Scans ``*-cert.pub`` files in *state_dir* and compares each parsed
    ``valid_before`` with the current UTC time.

    Args:
        state_dir: Directory holding the signed certificates.

    Returns:
        A CheckResult named ``no_expired_certs``. It passes when the
        directory is absent or no parsed certificate has expired; otherwise
        ``detail`` lists the affected names (file name minus ``-cert.pub``).
    """
    if not state_dir.exists():
        return CheckResult("no_expired_certs", passed=True, detail="no state dir")

    suffix = "-cert.pub"
    now = datetime.now(timezone.utc)
    expired = []
    # sorted() keeps the detail string stable; glob order is filesystem-dependent.
    for cert_path in sorted(state_dir.glob(f"*{suffix}")):
        try:
            meta = parse_cert_metadata(cert_path)
        except CAError:
            # Best effort: files whose metadata cannot be parsed are skipped.
            continue
        if meta["valid_before"] < now:
            # Strip only the trailing "-cert.pub". str.replace("-cert", "")
            # would also remove "-cert" inside a name like "agt-cert-rotator".
            expired.append(cert_path.name[: -len(suffix)])

    return CheckResult(
        name="no_expired_certs",
        passed=len(expired) == 0,
        detail=f"expired: {expired}" if expired else "no expired certs",
    )
def run_scorecard(state_dir: Path, inventory: PrincipalsInventory) -> List[CheckResult]:
    """Run every cert-side scorecard check and collect the outcomes.

    The two inventory checks run first, then the two state-directory
    checks, so the returned list always has four entries in that order.
    """
    inventory_checks = (
        check_actor_name_prefixes,
        check_all_actors_have_principals,
    )
    state_checks = (
        check_no_expired_certs,
        check_no_stale_certs,
    )

    outcomes: List[CheckResult] = []
    for inventory_check in inventory_checks:
        outcomes.append(inventory_check(inventory))
    for state_check in state_checks:
        outcomes.append(state_check(state_dir))
    return outcomes
def main() -> None:
    """Sign a fresh cert for WARDEN_ACTOR, offer it to ssh-add, then exec argv[1:].

    Reads ``WARDEN_ACTOR`` and ``SSH_PUBKEY`` from the environment, runs
    ``warden sign <actor> --pubkey <path>`` and writes its stdout to a
    temporary ``*-cert.pub`` file, which is passed to ``ssh-add`` and then
    deleted. The ``ssh-add`` step is best effort: a non-zero exit or a
    missing binary only prints a warning. If a command was given on the
    command line it replaces this process via ``os.execvp``; with no
    command the function simply returns.

    Exits with status 1 when an environment variable is missing, the public
    key file does not exist, or ``warden sign`` fails or is not on PATH.
    Exits with status 127 when the requested command cannot be executed.
    """
    actor = os.environ.get("WARDEN_ACTOR")
    pubkey = os.environ.get("SSH_PUBKEY")

    if not actor:
        print("ops-ssh-wrapper: WARDEN_ACTOR not set", file=sys.stderr)
        sys.exit(1)
    if not pubkey:
        print("ops-ssh-wrapper: SSH_PUBKEY not set", file=sys.stderr)
        sys.exit(1)

    pubkey_path = Path(os.path.expanduser(pubkey))
    if not pubkey_path.exists():
        print(f"ops-ssh-wrapper: SSH_PUBKEY not found: {pubkey_path}", file=sys.stderr)
        sys.exit(1)

    try:
        cert_text = subprocess.check_output(
            ["warden", "sign", actor, "--pubkey", str(pubkey_path)],
            text=True,
        ).strip()
    except subprocess.CalledProcessError as e:
        print(
            f"ops-ssh-wrapper: warden sign failed (exit {e.returncode})", file=sys.stderr
        )
        sys.exit(1)
    except FileNotFoundError:
        print(
            "ops-ssh-wrapper: 'warden' not found in PATH. "
            "Install ops-warden: uv tool install ops-warden",
            file=sys.stderr,
        )
        sys.exit(1)

    cert_path = None
    try:
        with tempfile.NamedTemporaryFile(
            suffix="-cert.pub", mode="w", delete=False, prefix=f"{actor}-"
        ) as f:
            # Record the name before writing so a failed write is still cleaned up.
            cert_path = f.name
            f.write(cert_text + "\n")

        try:
            result = subprocess.run(
                ["ssh-add", cert_path], capture_output=True, text=True
            )
        except OSError as e:
            # ssh-add not installed / not executable: warn like any other
            # ssh-add failure instead of aborting with a traceback.
            print(
                f"ops-ssh-wrapper: ssh-add warning: {e} "
                f"(ssh-agent may not be running — continuing anyway)",
                file=sys.stderr,
            )
        else:
            if result.returncode != 0:
                print(
                    f"ops-ssh-wrapper: ssh-add warning: {result.stderr.strip()} "
                    f"(ssh-agent may not be running — continuing anyway)",
                    file=sys.stderr,
                )
    finally:
        if cert_path is not None:
            os.unlink(cert_path)

    if len(sys.argv) > 1:
        try:
            os.execvp(sys.argv[1], sys.argv[1:])
        except OSError as e:
            # execvp only returns by raising; 127 is the shell's "cannot run" status.
            print(
                f"ops-ssh-wrapper: cannot exec {sys.argv[1]!r}: {e}", file=sys.stderr
            )
            sys.exit(127)
class VaultCA(CABackend):
    """CA backend that signs via HashiCorp Vault SSH secrets engine."""

    def __init__(self, vault_cfg: VaultConfig, state_dir: Path) -> None:
        """Store the Vault settings and the ``~``-expanded certificate directory."""
        self._cfg = vault_cfg
        self._state_dir = Path(os.path.expanduser(str(state_dir)))

    def _token(self) -> str:
        """Return the Vault token from the configured environment variable.

        Raises:
            CAError: The variable named by ``token_env`` is unset or empty.
        """
        token = os.environ.get(self._cfg.token_env, "")
        if not token:
            raise CAError(
                f"Vault token not found. Set the {self._cfg.token_env!r} "
                f"environment variable, or run: vault login"
            )
        return token

    def sign(self, spec: CertSpec) -> CertRecord:
        """Sign the public key via Vault SSH engine. Returns a CertRecord.

        Posts the public key to ``<addr>/v1/<mount>/sign/<role>``, where the
        role is looked up in ``role_map`` by the actor type. The signed
        certificate is stored as ``<state_dir>/<actor_name>-cert.pub`` and
        its metadata is read back with ``parse_cert_metadata``.

        Args:
            spec: Signing request (public key path, principals, TTL, identity).

        Returns:
            A CertRecord describing the stored certificate.

        Raises:
            CAError: The public key file is missing, no role is mapped for
                the actor type, the token is unset, Vault is unreachable or
                answers with an HTTP error, or the response carries no
                ``data.signed_key``.
        """
        pubkey_path = Path(os.path.expanduser(str(spec.pubkey_path)))
        if not pubkey_path.exists():
            raise CAError(f"Public key not found: {pubkey_path}")

        pubkey_text = pubkey_path.read_text().strip()
        role = self._cfg.role_map.get(spec.actor_type.value)
        if not role:
            raise CAError(
                f"No Vault role mapped for actor type {spec.actor_type.value!r}. "
                f"Add it to vault.role_map in warden.yaml."
            )

        token = self._token()
        # Tolerate "https://vault/" and "/ssh/" style settings without
        # producing "//" inside the request path.
        base = self._cfg.addr.rstrip("/")
        mount = self._cfg.mount.strip("/")
        url = f"{base}/v1/{mount}/sign/{role}"
        try:
            response = httpx.post(
                url,
                json={
                    "public_key": pubkey_text,
                    "valid_principals": ",".join(spec.principals),
                    "ttl": f"{spec.ttl_hours}h",
                    "cert_type": "user",
                    "key_id": spec.identity,
                },
                headers={"X-Vault-Token": token},
                timeout=10.0,
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            raise CAError(
                f"Vault signing failed (HTTP {e.response.status_code}): "
                f"{e.response.text}"
            ) from e
        except httpx.RequestError as e:
            raise CAError(
                f"Vault unreachable at {self._cfg.addr}. "
                f"Is Vault running? Consider --backend local as a fallback.\n{e}"
            ) from e

        try:
            cert_text = response.json()["data"]["signed_key"].strip()
        except (ValueError, KeyError, TypeError, AttributeError) as e:
            # Non-JSON body, missing keys, or a non-string signed_key.
            raise CAError(
                "Vault returned an unexpected response (no data.signed_key)"
            ) from e
        if not cert_text:
            raise CAError("Vault returned an empty signed_key")

        self._state_dir.mkdir(parents=True, exist_ok=True)
        dest = self._state_dir / f"{spec.actor_name}-cert.pub"
        dest.write_text(cert_text + "\n")

        # The stored file already holds exactly the text to inspect, so it is
        # parsed directly rather than via a second temporary copy.
        meta = parse_cert_metadata(dest)

        return CertRecord(
            identity=meta["identity"] or spec.identity,
            valid_before=meta["valid_before"],
            cert_path=dest,
            signed_at=datetime.now(timezone.utc),
            principals=meta["principals"],
            actor_name=spec.actor_name,
        )
def test_parse_cert_metadata(tmp_path):
    """Identity, principals and expiry are read from the mocked ssh-keygen output."""
    keygen_ok = MagicMock(returncode=0, stdout=SAMPLE_SSHKEYGEN_L, stderr="")
    cert_file = tmp_path / "key-cert.pub"
    cert_file.write_text(CERT_CONTENT)

    with patch("warden.ca.subprocess.run", return_value=keygen_ok):
        parsed = parse_cert_metadata(cert_file)

    expected_expiry = datetime(2026, 3, 29, 10, 0, 0, tzinfo=timezone.utc)
    assert parsed["identity"] == "agt-state-hub-bridge"
    assert parsed["principals"] == ["agt-task-bridge"]
    assert parsed["valid_before"] == expected_expiry
def test_local_ca_sign_ssh_keygen_failure(tmp_path):
    """LocalCA.sign raises CAError when the patched subprocess.run reports exit 1."""
    signing_key = tmp_path / "ca_key"
    signing_key.write_text("fake-ca")
    actor_pubkey = tmp_path / "key.pub"
    actor_pubkey.write_text("ssh-ed25519 AAAA")

    request = CertSpec(
        actor_name="agt-test",
        actor_type=ActorType.AGT,
        pubkey_path=actor_pubkey,
        ttl_hours=24,
        principals=["agt-test"],
    )

    def failing_run(cmd, **kwargs):
        # Every call looks like a subprocess that exited 1 with a key error.
        return MagicMock(
            returncode=1, stderr="load key: invalid format", stdout=""
        )

    backend = LocalCA(signing_key, tmp_path / "state")
    with patch("warden.ca.subprocess.run", side_effect=failing_run), pytest.raises(
        CAError, match="Signing failed"
    ):
        backend.sign(request)
def test_custom_state_dir(tmp_path):
    """A ``state_dir`` entry in warden.yaml is carried into the loaded config."""
    wanted_state = tmp_path / "my-state"
    settings = dict(
        backend="local",
        ca_key=str(tmp_path / "ca"),
        state_dir=str(wanted_state),
    )
    config_file = tmp_path / "warden.yaml"
    write_yaml(config_file, settings)

    loaded = load_config(config_file)

    assert loaded.state_dir == wanted_state
def test_roundtrip_multiple_actors(tmp_path):
    """Several actors survive a save/load cycle with their names and types."""
    entries = [
        ActorEntry("adm-bernd", ActorType.ADM, ["adm-full"], 48),
        ActorEntry("atm-backup", ActorType.ATM, ["atm-backup-daily"], 8),
    ]
    original = PrincipalsInventory()
    for entry in entries:
        original.actors[entry.name] = entry

    inventory_file = tmp_path / "inventory.yaml"
    save_inventory(original, inventory_file)
    reloaded = load_inventory(inventory_file)

    assert set(reloaded.actors) == {"adm-bernd", "atm-backup"}
    assert reloaded.actors["adm-bernd"].actor_type == ActorType.ADM
def test_certspec_default_identity():
    """Without an explicit identity, CertSpec falls back to the actor name."""
    fields = {
        "actor_name": "agt-test",
        "actor_type": ActorType.AGT,
        "pubkey_path": Path("/tmp/key.pub"),
        "ttl_hours": 24,
        "principals": ["agt-task-bridge"],
    }

    request = CertSpec(**fields)

    assert request.identity == "agt-test"
def make_inventory(*actors):
    """Build a PrincipalsInventory from ``(name, actor_type, principals)`` tuples.

    Every entry gets ``ttl_hours=24``. Entries are inserted directly, so
    names are not checked against the prefix convention.
    """
    entries = {
        actor_name: ActorEntry(
            name=actor_name,
            actor_type=kind,
            principals=allowed,
            ttl_hours=24,
        )
        for actor_name, kind, allowed in actors
    }
    return PrincipalsInventory(actors=entries)
def test_run_scorecard_clean(tmp_path):
    """A valid inventory plus an empty state dir yields four passing checks."""
    bridge_actor = ("agt-bridge", ActorType.AGT, ["agt-task-bridge"])
    inventory = make_inventory(bridge_actor)

    outcomes = run_scorecard(tmp_path, inventory)

    failed = [check for check in outcomes if not check.passed]
    assert not failed
    assert len(outcomes) == 4
+**Changes in v1.1:** Added prerequisites, emergency break-glass procedure, concrete issuance examples, strengthened CA security, enhanced scorecard, human UX guidance, agent risk clarification, KRL support, and tighter TTL recommendations. + +## 0. Prerequisites + +Before bootstrapping, the following must be in place: +- Ansible (or equivalent config-management tool) with a central inventory. +- HashiCorp Vault (or equivalent secrets manager) with the SSH secrets engine enabled. +- GitOps repository containing the authoritative principals inventory. +- Basic monitoring/alerting for Vault and SSH logs (e.g., Prometheus + Loki or equivalent). +- At least two ops personnel trained on Vault SSH signing and Ansible playbooks. + +If any of these are missing, complete them first or the “automatic” parts of this directive will not function reliably. + +## 1. Concept Overview + +This directive replaces the legacy practice of scattering static SSH public keys in `~/.ssh/authorized_keys` files. Instead, we adopt **SSH Certificate Authority (CA) based authentication** as the single source of truth. + +**Why this model?** +- A central CA signs short-lived certificates for every login. +- No more manual key copying, key sprawl, or painful revocation. +- Built-in expiration, role-based principals, and auditability. +- Works identically for humans, LLM-powered autonomous agents, and deterministic scripts. +- Scales from 5 hosts to 500+ with almost zero per-host maintenance. + +**Core Principles** +- **Least privilege** – Every certificate carries explicit *principals* (roles) and optional `force-command` / `source-address` restrictions. +- **Short-lived credentials** – Certificates expire automatically (24–48 h for admins, 4–24 h for agents, 1–8 h for automations). +- **One CA, many issuers** – A single offline User CA whose public key is trusted by every host. +- **Automation-first** – All key issuance, rotation, and host configuration is driven by code (Ansible + Vault). 
+- **Separation of concerns** – + - **Admins (adm)**: Human operators (full interactive shell when needed). + - **Agents (agt)**: LLM-powered autonomous entities that can self-register wake-up triggers and execute tasks. + - **Automations (atm)**: Deterministic scripts / cron jobs / pipelines with narrow, purpose-specific rights. + +## 2. Actor Definitions & Access Model + +| Actor Type | Identifier Prefix | Description | Typical Certificate Lifetime | Principals / Restrictions | +|------------|-------------------|-------------|------------------------------|---------------------------| +| **Admin (adm)** | `adm-` | Human operator (on-call engineers) | 24–48 hours (renewable) | `adm-full`, `adm-readonly` + optional `force-command` | +| **Agent (agt)** | `agt-` | LLM-powered autonomous agent (can schedule own wake-ups) | 4–24 hours (auto-refresh) | `agt-task-<name>`, limited to specific scripts/directories | +| **Automation (atm)** | `atm-` | Deterministic script / pipeline | 1–8 hours (per invocation) | `atm-<name>`, `force-command=/usr/local/bin/atm-wrapper.sh` | + +**Certificate Naming Convention** +- Identity string (`-I`): `adm-bernd`, `agt-incident-resolver-v2`, `atm-backup-daily` +- Principals (`-n`): comma-separated list of allowed roles (stored in `/etc/ssh/auth_principals/%u` on hosts) + +**LLM-Agent Risk Clarification** +Agent signing policy MUST enforce least-privilege principals + `force-command` wrappers; never grant blanket shell access to autonomous agents. + +## 3. Bootstrapping the System (One-Time Setup) + +### 3.1. Create the CA (do this once, offline) +```bash +ssh-keygen -t ed25519 -f /secure/vault/ca_user -C "Ops SSH User CA (2026)" -N "" +``` +- Store the private key in an HSM-backed Vault (or air-gapped offline storage) with **4-eyes approval** required for any signing operation. +- Rotate the CA key itself every 2–3 years using the same bootstrap playbook. +- Public key: `ca_user.pub` + +### 3.2.
Deploy Trust on Every Host (Ansible playbook `bootstrap-ssh-ca.yml`) +- Copy `ca_user.pub` → `/etc/ssh/ca/ca_user.pub` (mode 644, root-owned). +- Update `/etc/ssh/sshd_config`: + ```bash + TrustedUserCAKeys /etc/ssh/ca/ca_user.pub + AuthorizedPrincipalsFile /etc/ssh/auth_principals/%u + PubkeyAuthentication yes + PasswordAuthentication no + PermitRootLogin no + ``` +- Create principals directory and files from the central Git inventory. +- `systemctl restart sshd` + +### 3.3. Initial Admin Access +First admin generates personal keypair → submits `.pub` → CA signs a bootstrap certificate valid for 48 hours with principal `adm-bootstrap`. This is the ONLY manual step. + +## 4. Automatic Management of Access Rights + +### 4.1. Daily / On-Demand Workflow +1. **Key/Certificate Issuance Pipeline** (GitOps + Vault) + - **Humans (adm)**: Use the recommended CLI wrapper `ops-ssh-sign` (or Teleport `tsh` if adopted early) so signing feels invisible. + - **Agents (agt)**: At startup, call Vault SSH engine API (auto-refreshed by a wrapper daemon). + - **Automations (atm)**: Just-in-time cert request via Vault inside a thin wrapper script. + +2. **Ansible-Driven Host Updates** (run hourly via CI/CD) + - `auth_principals/` files are rendered from a central inventory (JSON/YAML in Git). + - Example inventory snippet: + ```yaml + hosts: + - name: prod-db-01 + allowed_principals: + adm: [adm-full] + agt: [agt-incident-resolver-v2] + atm: [atm-backup-daily, atm-logrotate] + ``` + +3. **Revocation & Rotation** + - Short expiry = automatic revocation. + - For emergency revocation of a still-valid cert, maintain a Key Revocation List (KRL) and push it via Ansible (`RevokedKeys` directive in `sshd_config`). + - Agents/automations never store long-lived private keys on disk. + +4. 
**Concrete Agent & Automation Wrapper Example** (Python snippet – place in `/usr/local/bin/ops-ssh-wrapper`) + ```python + #!/usr/bin/env python3 + import subprocess, os, sys, tempfile + # Request short-lived cert from Vault + cert = subprocess.check_output(["vault", "write", "-field=signed_key", "ssh/sign/agt-role", f"public_key={os.environ['SSH_PUBKEY']}"]).decode().strip() + with tempfile.NamedTemporaryFile(suffix="-cert.pub", delete=False) as f: + f.write(cert.encode()) + cert_path = f.name + # Load into ssh-agent and exec the real command + subprocess.run(["ssh-add", cert_path]) + os.execvp(sys.argv[1], sys.argv[1:]) + ``` + Agents call this wrapper; it auto-refreshes the cert on every wake-up. + +### 4.2. Human UX Guidance +Admins are encouraged to use the `ops-ssh-sign` wrapper script (provided in the ops repo) or Teleport `tsh ssh` for a seamless experience. Manual `ssh-keygen -s` is only for edge cases. + +### 4.3. Emergency Break-Glass Procedure +In case of total lockout (CA offline, misconfigured Ansible push, etc.): +1. Use the pre-documented static emergency key pair on a separate bastion host (rotated quarterly, stored in Vault with 4-eyes access). +2. Or fall back to cloud-provider console access (AWS SSM Session Manager, GCP IAP, Azure Bastion). +3. Document the exact recovery playbook in the same Git repo under `emergency/break-glass.md`. +4. After recovery, immediately rotate the CA and run a full scorecard. + +## 5. AccessManagement Scorecard (Checklist) + +Run via Ansible `ssh-access-audit.yml`. Each item is pass/fail.
+ +| Category | Check | Target | Tool | +|----------|-------|--------|------| +| **CA Trust** | `TrustedUserCAKeys` points to correct file | All hosts | `ssh-audit` | +| **No Static Keys** | `authorized_keys` files are empty or contain only emergency bootstrap keys | All hosts | `find /home -name authorized_keys -size +0` | +| **Principals Config** | `/etc/ssh/auth_principals/%u` exists and is up-to-date | All hosts | Ansible inventory diff | +| **Expiry Policy** | All issued certs have `Valid: < 48h` (adm) or `< 24h` (agt/atm) | Last 100 certs | `ssh-keygen -L -f *.pub` | +| **Password Auth** | Disabled globally | All hosts | `sshd -T \| grep password` | +| **Root Login** | Disabled | All hosts | `sshd -T \| grep permitroot` | +| **Agent/Automation Wrapper** | Every agt/atm binary calls Vault for cert | All pipelines | Code review + runtime trace | +| **Audit Logging** | Every SSH connection logs certificate identity (`-I`) to central SIEM | All hosts | `journalctl -u sshd` + SIEM query | +| **CA Security** | CA key access is 4-eyes / HSM-backed | Vault policy | Vault audit log | +| **Bootstrap Complete** | No `adm-bootstrap` principal in use | All hosts | Scorecard run | +| **Score** | ≥ 10/10 = **Operational** | - | - | + +**Scorecard Execution Command** (run from ops laptop): +```bash +ansible all -m command -a "ssh-access-scorecard.sh" --become +``` + +## 6. Scope & Operational Boundaries + +### 6.1. When Bootstrapping Is Officially Closed +The system is **fully operational** when **ALL** of the following are true: +- Scorecard passes 10/10 on every host. +- Central Git repo contains the authoritative principals inventory. +- First three admins have successfully used signed certificates for 7 consecutive days. +- At least one agent (agt) and one automation (atm) have executed a task using a CA-signed certificate. +- CI/CD pipeline for host config updates is green and runs hourly. +- Emergency break-glass procedure has been tested once. 
+ +**Declaration:** Ops Lead signs off with date in the Git commit message. + +### 6.2. Scope Boundary – When to Switch to Sophisticated Tooling +Stay with **native OpenSSH CA + Ansible + Vault** while: +- ≤ 200 hosts +- ≤ 50 distinct agent/automation identities +- No regulatory requirement for SSO or full session recording + +**Switch triggers** (any one): +- > 200 hosts OR rapid daily growth +- Need for human SSO (Okta/Google) integration +- Requirement for audited web-based SSH sessions or just-in-time access approval +- Agents need built-in Machine-ID / workload identity (e.g., Teleport tbot) +- Audit/compliance demands central policy engine or session recording + +**Recommended next-level tools** (in order): +1. **Teleport** – Best for mixed human + agent workloads (SSO + Machine ID). +2. **HashiCorp Vault SSH + Boundary** – When you already use Vault heavily. +3. **step-ca + smallstep** – If you prefer a pure open-source CA with OIDC. + +**Migration path:** The CA public key and principals model are fully compatible; you can import the existing CA into Teleport/Vault without re-issuing keys to users. + +## 7. Enforcement & Review +- **Quarterly review** of this directive and scorecard results. +- **Violations** (e.g., adding static keys) trigger immediate access revocation and incident ticket. +- **Questions / improvements** → create PR against this file in the ops repo. + +**End of Document** +Approved for immediate use across all production and staging environments. + +xxx diff --git a/wiki/CertCommandInterface.md b/wiki/CertCommandInterface.md new file mode 100644 index 0000000..a6506c5 --- /dev/null +++ b/wiki/CertCommandInterface.md @@ -0,0 +1,105 @@ +# cert_command Interface + +**Version:** 1.0 +**Date:** 2026-03-28 +**Purpose:** Define the contract between OpsWarden (issuer) and callers such as ops-bridge +(consumer) for just-in-time SSH certificate acquisition. 
+ +--- + +## Overview + +`cert_command` is a shell string that a caller executes to obtain a short-lived, CA-signed +SSH certificate for a named actor. The caller passes the cert to the SSH process alongside +the actor's private key. + +This interface is intentionally tool-agnostic: the caller (`ops-bridge`, a script, a CI +pipeline) does not need to know whether the CA is a local file or HashiCorp Vault. Any +command that writes a cert to stdout and exits 0 satisfies the contract. + +--- + +## Contract + +### Invocation + +``` +warden sign <actor> --pubkey <pubkey-path> +``` + +Or any equivalent shell command: + +``` +vault write -field=signed_key ssh/sign/agt-role public_key=@/tmp/key.pub +ssh-keygen -s /path/to/ca -I agt-test -n agt-task -V +24h /tmp/key.pub && cat /tmp/key-cert.pub +``` + +### Success (exit 0) + +- Stdout: certificate text only — a single line starting with the key type, e.g.: + ``` + ssh-ed25519-cert-v01@openssh.com AAAA... + ``` +- Stderr: ignored by the caller (warden may print warnings there) +- Side effect: cert is also written to `~/.local/state/warden/<actor>-cert.pub` by warden + (for use by `warden status` and `warden scorecard`) + +### Failure (exit non-zero) + +- Exit code: any non-zero value +- Stdout: ignored +- Stderr: passed through to caller logs / audit detail field +- Caller behaviour: treat as a transient error; apply reconnect backoff and retry + +--- + +## Caller Responsibilities (ops-bridge) + +1. Run `cert_command` via `subprocess.run(shell=True)` before each SSH subprocess launch +2. Write stdout to a tempfile in the state dir: `~/.local/state/bridge/<name>-cert.pub` +3. Add `-i <cert-path>` after `-i <key-path>` in the `ssh` command +4. Parse `ssh-keygen -L -f <cert-path>` to extract `Key ID` → log as `cert_identity` in audit +5. Parse `Valid before:` → schedule pre-emptive cert refresh ~5 min before expiry +6.
On `cert_command` failure: log `BRIDGE_DISCONNECTED` with stderr; apply backoff + +## What the Caller Must NOT Do + +- Cache or reuse a cert across reconnects (always re-run `cert_command` per reconnect) +- Write the cert to disk with world-readable permissions (always use mode 600) +- Ignore a non-zero exit from `cert_command` (must treat as failure, trigger backoff) + +--- + +## Example: ops-bridge tunnels.yaml + +```yaml +tunnels: + state-hub-coulombcore: + host: coulombcore + remote_port: 8001 + local_port: 8000 + ssh_user: agt-state-hub-bridge + ssh_key: ~/.ssh/agt-state-hub-bridge_ed25519 + actor: agt-state-hub-bridge + # cert_command is optional. When absent, ssh_key is used directly (static key mode). + cert_command: "warden sign agt-state-hub-bridge --pubkey ~/.ssh/agt-state-hub-bridge_ed25519.pub" +``` + +--- + +## TTL Guidelines (AccessManagementDirective §2) + +| Actor type | Max TTL | Pre-emptive refresh | +|---|---|---| +| `adm` | 48 h | 5 min before expiry | +| `agt` | 24 h | 5 min before expiry | +| `atm` | 8 h | 5 min before expiry | + +ops-bridge enforces the refresh schedule. OpsWarden enforces the max TTL at signing time. + +--- + +## Backward Compatibility + +Callers that do not set `cert_command` continue to use the static key (`ssh_key`) with no +TTL, cert logic, or refresh. The two modes are fully independent. diff --git a/wiki/OpsWardenConfig.md b/wiki/OpsWardenConfig.md new file mode 100644 index 0000000..4cbea70 --- /dev/null +++ b/wiki/OpsWardenConfig.md @@ -0,0 +1,147 @@ +# OpsWarden Configuration Reference + +Config file: `~/.config/warden/warden.yaml` (override with `WARDEN_CONFIG` env var) + +--- + +## Local Backend (lab / non-Vault) + +```yaml +# Backend selection. "local" uses ssh-keygen -s with a CA key on disk. +backend: local + +# Path to the CA private key. Keep this file mode 600 and never commit it. +ca_key: ~/.ssh/ops-ca-user + +# Path to the principals inventory (default shown).
+inventory_path: ~/.config/warden/inventory.yaml + +# Where to store signed certs and generated keypairs (default shown). +state_dir: ~/.local/state/warden +``` + +### Bootstrapping the local CA key + +```bash +# Generate CA keypair once (offline, secure location) +ssh-keygen -t ed25519 -f ~/.ssh/ops-ca-user -C "Ops SSH User CA (2026)" -N "" +chmod 600 ~/.ssh/ops-ca-user +chmod 644 ~/.ssh/ops-ca-user.pub + +# Distribute ops-ca-user.pub to every host: +# TrustedUserCAKeys /etc/ssh/ca/ca_user.pub (in sshd_config) +# See railiance-infra bootstrap-ssh-ca.yml playbook. +``` + +--- + +## Vault Backend (production) + +```yaml +backend: vault + +vault: + # Vault server address. + addr: https://vault.example.com + + # Vault SSH secrets engine mount path (default: ssh). + mount: ssh + + # Map from ActorType to Vault signing role name. + role_map: + adm: adm-role + agt: agt-role + atm: atm-role + + # Environment variable holding the Vault token (default: VAULT_TOKEN). + token_env: VAULT_TOKEN + +inventory_path: ~/.config/warden/inventory.yaml +state_dir: ~/.local/state/warden +``` + +### Vault setup snippet + +```bash +vault secrets enable ssh +vault write ssh/roles/agt-role \ + key_type=ca \ + allowed_users="*" \ + allow_user_certificates=true \ + default_user="agt" \ + ttl=24h max_ttl=24h + +export VAULT_TOKEN=$(vault token create -field=token) +``` + +--- + +## Principals Inventory (`inventory.yaml`) + +```yaml +actors: + # Actor name must carry the prefix matching its type: + # adm-* for adm, agt-* for agt, atm-* for atm + agt-state-hub-bridge: + type: agt + # Principals embedded in the cert; matched against /etc/ssh/auth_principals/%u + principals: + - agt-task-bridge + # Certificate TTL in hours. 
Defaults: adm=48, agt=24, atm=8 + ttl_hours: 24 + description: "ops-bridge tunnel agent for state-hub" + + adm-bernd: + type: adm + principals: + - adm-full + ttl_hours: 48 + + atm-backup-daily: + type: atm + principals: + - atm-backup-daily + ttl_hours: 8 + description: "nightly backup automation" + +hosts: + # Optional: documents which principals are allowed on each host. + # Not enforced by warden; used for reference and future tooling. + coulombcore: + allowed_principals: + agt: + - agt-task-bridge + atm: + - atm-backup-daily +``` + +--- + +## Environment Variables + +| Variable | Default | Description | +|---|---|---| +| `WARDEN_CONFIG` | `~/.config/warden/warden.yaml` | Config file path | +| `VAULT_TOKEN` | — | Vault token (vault backend only; env var name is configurable) | + +--- + +## cert_command integration with ops-bridge + +Add `cert_command` to a tunnel in `~/.config/bridge/tunnels.yaml`: + +```yaml +tunnels: + state-hub-coulombcore: + host: coulombcore + remote_port: 8001 + local_port: 8000 + ssh_user: agt-state-hub-bridge + ssh_key: ~/.ssh/agt-state-hub-bridge_ed25519 + actor: agt-state-hub-bridge + cert_command: "warden sign agt-state-hub-bridge --pubkey ~/.ssh/agt-state-hub-bridge_ed25519.pub" +``` + +`ops-bridge` runs `cert_command` before each SSH launch, captures stdout as the cert, +and passes it alongside the private key via `ssh -i <key> -i <cert>`. +See `wiki/CertCommandInterface.md` for the full contract.
diff --git a/workplans/WARDEN-WP-0001-initial-implementation.md b/workplans/WARDEN-WP-0001-initial-implementation.md new file mode 100644 index 0000000..d8d0ef0 --- /dev/null +++ b/workplans/WARDEN-WP-0001-initial-implementation.md @@ -0,0 +1,126 @@ +--- +id: WARDEN-WP-0001 +type: workplan +title: "OpsWarden Initial Implementation" +domain: custodian +repo: ops-warden +status: draft +owner: Bernd +topic_slug: custodian +created: "2026-03-28" +updated: "2026-03-28" +--- + +# WARDEN-WP-0001 — OpsWarden Initial Implementation + +**Scope:** Deliver a working `warden` CLI that implements the SSH CA and certificate +lifecycle defined in `wiki/AccessManagementDirective.md`. Scaffolding (models, config, +CA backends, inventory, scorecard, CLI) is already present in the repo; this workplan +tracks the remaining implementation, testing, and integration work. + +**Out of scope:** Vault HA/cluster setup, Ansible playbooks for host principal deployment +(those live in `railiance-infra`), session recording, and SSO integration (trigger §6.2 of +the directive when scale requires it). + +--- + +## Goal + +After this workplan: + +1. `warden sign agt-test --pubkey /tmp/test.pub` outputs a valid cert (local backend). +2. `warden status agt-test` shows correct identity, principals, and time-to-expiry. +3. `warden scorecard` returns 4/4 on a clean test inventory. +4. `warden sign` called from ops-bridge `cert_command` works end-to-end in an integration + test tunnel. +5. All tests pass (`uv run pytest`) and lints pass (`uv run ruff check .`). 
+ +--- + +## Reference Documents + +| Document | Location | +|---|---| +| AccessManagementDirective | `wiki/AccessManagementDirective.md` | +| cert_command interface | `wiki/CertCommandInterface.md` | +| Config reference | `wiki/OpsWardenConfig.md` | +| ops-bridge alignment workplan | `../ops-bridge/workplans/BRIDGE-WP-0004-directive-alignment.md` | + +--- + +## Architecture Summary + +``` +~/.config/warden/warden.yaml # backend, ca_key, inventory_path, state_dir +~/.config/warden/inventory.yaml # actor registry (name → type, principals, ttl_hours) +~/.local/state/warden/ # signed certs (*-cert.pub); keypairs (keys/) +``` + +Two swappable CA backends — both expose the same `sign(spec) -> CertRecord` interface: +- `LocalCA` — `ssh-keygen -s`; no Vault dependency; default for dev/lab +- `VaultCA` — Vault SSH engine via httpx + +cert_command interface (consumed by ops-bridge): +``` +warden sign <actor> --pubkey <pubkey-path> # → cert text to stdout +``` + +--- + +## Tasks + +### T1 — Repository registration +- [ ] Register repo with state-hub (`register_repo`); assign Repo ID; update + `.claude/rules/repo-identity.md` +- [ ] Create state-hub workstream for this workplan + +### T2 — LocalCA integration test +- [ ] Generate a test CA key: `ssh-keygen -t ed25519 -f /tmp/test-ca -N ""` +- [ ] Run `warden sign` against a real pubkey with the test CA (requires `ssh-keygen` in PATH) +- [ ] Verify cert parses correctly with `ssh-keygen -L` +- [ ] Add to `tests/test_ca.py` as an integration test (skipped if `ssh-keygen` not in PATH) + +### T3 — VaultCA integration test +- [ ] Set up a local Vault dev server (`vault server -dev`) +- [ ] Enable SSH secrets engine: `vault secrets enable ssh` +- [ ] Configure a signing role for `agt` +- [ ] Run `warden sign` with `backend: vault` config +- [ ] Add to `tests/test_vault.py` as an integration test (skipped if Vault not reachable) + +### T4 — CLI end-to-end smoke tests +- [ ] `warden inventory add agt-test --type agt --principal agt-task-test` +- [ ]
`warden inventory list` shows the actor +- [ ] `warden issue agt-test` (local backend) produces keypair + cert +- [ ] `warden status agt-test` shows valid cert +- [ ] `warden scorecard` returns 4/4 +- [ ] `warden inventory remove agt-test` removes actor + +### T5 — ops-bridge cert_command integration +- [ ] Add `agt-state-hub-bridge` to inventory (or use existing from ops-bridge config) +- [ ] Set `cert_command: "warden sign agt-state-hub-bridge --pubkey ~/.ssh/agt-state-hub-bridge_ed25519.pub"` + in a test `tunnels.yaml` +- [ ] Run `bridge up state-hub-coulombcore`; confirm cert is present in + `~/.local/state/bridge/` and `cert_identity` appears in the audit log +- [ ] Document result in a progress event + +### T6 — CI/CD setup +- [ ] Add `.github/workflows/ci.yml` (or equivalent) running `uv run pytest` and + `uv run ruff check .` on push +- [ ] Tests must pass without Vault (VaultCA integration tests skipped via pytest marker) + +### T7 — Documentation +- [ ] `wiki/OpsWardenConfig.md` — annotated `warden.yaml` reference (already stubbed) +- [ ] `wiki/CertCommandInterface.md` — contract for `cert_command` callers (already stubbed) +- [ ] Ensure `wiki/AccessManagementDirective.md` is in sync with `ops-bridge/wiki/` + +--- + +## Acceptance Criteria + +- [ ] `warden sign agt-test --pubkey /tmp/test.pub` → valid cert on stdout (local backend) +- [ ] `warden status agt-test` → identity, principals, time-to-expiry shown correctly +- [ ] `warden scorecard` → 4/4 on clean inventory +- [ ] `warden sign` works as `cert_command` in ops-bridge tunnel config +- [ ] All unit tests pass: `uv run pytest` +- [ ] All lints pass: `uv run ruff check .` +- [ ] No secrets (CA private key, certs) committed to repo