Files
ops-warden/src/warden/proxy.py
tegwick d6088e4e16 Implement WP-0022 audit trail and WP-0023 INTENT–SCOPE closeout
Add unified metadata-only audit.jsonl with secret-material guard, instrument
sign/access/worker paths, and expose warden activity CLI. Surface broker hint
when VAULT_TOKEN is unset, refresh INTENT/SCOPE docs, and add production
integration checklists plus catalog lane promotion playbook.
2026-07-01 23:32:38 +02:00

202 lines
7.4 KiB
Python

"""Operator access proxy — transparent, audited fetch of a non-SSH credential.
WP-0014 T3. ops-warden does not own these secrets; the proxy lane lets an operator
obtain one *through* the `warden access` front door while keeping the security model
intact. Three guardrails are enforced here in code:
* **G1 — caller identity, never warden's.** The proxy runs the owner's tool with the
caller's own environment. ops-warden injects no token of its own; if the caller has
no credential, the underlying tool fails and we surface the auth pointer. We never
add a `*_TOKEN` warden owns to the child environment.
* **G2 — transit only, no persistence/logging of values.** ``proxy_fetch`` runs the
tool with **inherited** stdout/stderr (never a pipe), so the value streams to the
caller and never enters warden's memory. ``proxy_exec`` reads the value solely to
place it in a child process's environment (the accepted proxy tradeoff) and never
writes it to disk or log. The audit record is metadata only.
* **G3 — policy gate before fetch.** The CLI runs ``check_fetch_policy`` before
calling anything here; this module refuses to run an unresolved command template.
This module shells out but never *interprets* secret bytes in the ``--fetch`` path.
"""
from __future__ import annotations
import json
import os
import re
import shlex
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional
from warden.routing.models import RouteEntry
# Matches any angle-bracket template token such as ``<domain>``: a ``<``, one or
# more characters other than ``>``, then ``>``. Used to detect placeholders that are
# still unresolved after substitution.
_PLACEHOLDER = re.compile(r"<[^>]+>")
class ProxyError(Exception):
    """Signal that a proxy fetch was refused or could not be carried out safely."""
def resolve_fetch_command(
    entry: RouteEntry,
    *,
    domain: Optional[str] = None,
    field: Optional[str] = None,
    path: Optional[str] = None,
) -> List[str]:
    """Build the concrete argv for an entry's fetch, or raise if under-specified.

    Starts from the catalog ``fetch_command`` template (with ``<path_template>``
    inlined), substitutes ``<domain>``/``<FIELD>`` and an explicit ``--path`` override,
    then **refuses** if any ``<…>`` placeholder remains. We never run a half-templated
    command — an unresolved placeholder means the operator has not named the owner-side
    resource, and guessing it is exactly the failure mode we avoid.

    The catalog template is tokenised with ``shlex.split`` *before* caller-supplied
    values are substituted, so each value stays inside the argv element that carried
    its placeholder. Whitespace, quotes or backslashes in ``domain``/``field``/``path``
    can therefore neither add arguments to the owner's tool nor be altered by shell
    quoting rules.

    Args:
        entry: Catalog route entry; ``exec_capable``, ``fetch_command``,
            ``path_template`` and ``id`` are read.
        domain: Replacement for ``<domain>``; ignored when empty.
        field: Replacement for ``<FIELD>``; ignored when empty.
        path: Override for the entry's ``path_template``; only accepted when the
            template carries a ``<path_template>`` token.

    Returns:
        The fully resolved argument vector.

    Raises:
        ProxyError: The entry is not proxyable, ``path`` was given but cannot be
            applied, or a placeholder is still unresolved after substitution.
    """
    if not entry.exec_capable or not entry.fetch_command:
        raise ProxyError(
            f"{entry.id!r} is not exec_capable — it has no proxyable fetch command. "
            "Use `warden access` (advisory) and obtain it from the owner directly."
        )

    template = entry.fetch_command
    has_path_token = bool(entry.path_template) and "<path_template>" in template
    if path and not has_path_token:
        # No <path_template> token but caller supplied a path — append/override is
        # ambiguous, so require the template to carry the token.
        raise ProxyError(
            f"{entry.id!r} fetch_command has no <path_template> token to override with --path."
        )
    if has_path_token and not path:
        # The default path comes from the catalog itself, so it is inlined at the
        # template level and tokenised together with the rest of the command.
        template = template.replace("<path_template>", entry.path_template)

    # Caller-supplied values, applied in the same order as before: path, domain, field.
    substitutions = [
        (token, value)
        for token, value in (
            ("<path_template>", path),
            ("<domain>", domain),
            ("<FIELD>", field),
        )
        if value
    ]

    argv: List[str] = []
    for arg in shlex.split(template):
        for token, value in substitutions:
            arg = arg.replace(token, value)
        argv.append(arg)

    # Scan the joined command so a placeholder spanning several tokens is still caught.
    leftover = _PLACEHOLDER.findall(" ".join(argv))
    if leftover:
        raise ProxyError(
            f"unresolved placeholder(s) {', '.join(sorted(set(leftover)))} in fetch command. "
            "Supply --domain/--field (and --path for owner-side names) — warden will not "
            "guess owner-confirmed resource names."
        )
    return argv
def caller_auth_present(token_envs: tuple[str, ...] = ("VAULT_TOKEN", "BAO_TOKEN")) -> bool:
    """True if the *caller* appears to hold an auth token (G1 sanity check).

    Best-effort: a non-blank value in any of ``token_envs`` counts, and so does a
    regular ``~/.vault-token`` file. We do not validate the credential — the owner's
    tool does that — we only avoid proxying when the caller clearly has no credential,
    so the failure is a clear auth pointer rather than a confusing tool error.

    Args:
        token_envs: Names of environment variables that may carry the caller's token.

    Returns:
        ``True`` when a token env var is set to a non-blank value or the token file
        exists; ``False`` otherwise, including when the home directory cannot be
        resolved or inspected.
    """
    if any(os.environ.get(name, "").strip() for name in token_envs):
        return True
    try:
        # is_file(): a directory that happens to be named .vault-token is not a token.
        return (Path.home() / ".vault-token").is_file()
    except (OSError, RuntimeError):
        # Path.home() raises RuntimeError when no home directory can be determined and
        # the stat can fail with OSError; for a best-effort check both mean "absent".
        return False
def write_audit(
    state_dir: Path,
    *,
    need_id: str,
    owner_repo: str,
    domain: Optional[str],
    action: str,
    decision_id: Optional[str],
    exit_code: Optional[int] = None,
) -> Path:
    """Append one metadata-only JSON line to ``access-audit.log`` (G2: no secret values).

    ``state_dir`` is created if missing. The subject is taken from the
    ``WARDEN_POLICY_SUBJECT`` environment variable, falling back to ``"operator"``
    when it is unset or blank. After the line is written, the same event is passed
    to ``warden.audit.record_event``; any exception from that import or call is
    swallowed, so this step is best-effort.

    Returns:
        The path of the audit log that was appended to.
    """
    state_dir.mkdir(parents=True, exist_ok=True)
    log_path = state_dir / "access-audit.log"

    subject = os.environ.get("WARDEN_POLICY_SUBJECT", "").strip() or "operator"
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": action,  # "fetch" | "exec"
        "need_id": need_id,
        "owner_repo": owner_repo,
        "domain": domain,
        "subject": subject,
        "policy_decision_id": decision_id,
        "exit_code": exit_code,
    }
    line = json.dumps(record) + "\n"
    with log_path.open("a") as handle:
        handle.write(line)

    try:
        from warden.audit import record_event

        record_event(
            state_dir,
            kind="access",
            action=action,
            subject=subject,
            target=need_id,
            decision_id=decision_id,
            outcome="ok" if exit_code in (None, 0) else "error",
            source="access",
            owner_repo=owner_repo,
            domain=domain,
        )
    except Exception:
        # Best-effort mirror: the line written above already records the event.
        pass
    return log_path
def _caller_env() -> dict:
    """Return a fresh copy of the current process environment for a child process.

    Nothing is added or removed: the child sees exactly what the caller has, so no
    warden-owned credential is introduced here (G1).
    """
    return os.environ.copy()
def proxy_fetch(argv: List[str]) -> int:
    """Run the owner's tool and hand its exit code back to the caller.

    All three standard streams are left as ``None`` so the child inherits them from
    this process instead of writing into a pipe: the tool's output goes straight to
    the caller and is never captured here (G2). The child environment is a copy of
    the caller's own.

    Returns:
        The exit code of the tool.
    """
    child_env = _caller_env()
    result = subprocess.run(  # noqa: S603 — argv is shlex-split from a validated template
        argv,
        env=child_env,
        stdin=None,
        stdout=None,
        stderr=None,
        check=False,
    )
    return result.returncode
def proxy_exec(argv: List[str], *, env_var: str, child_argv: List[str]) -> int:
    """Fetch the value and inject it into a child command's environment only.

    The value transits warden's memory here (the accepted proxy tradeoff for `--exec`)
    but is never written to disk or log and never enters the caller's own shell env.
    Captures the fetch tool's stdout to obtain the value, strips a single trailing
    newline, and runs ``child_argv`` with ``env_var`` set in its environment.

    ``env_var`` and ``child_argv`` are validated *before* the fetch runs, so an
    unusable request is refused without the secret ever being read.

    Args:
        argv: Resolved argv of the owner's fetch tool.
        env_var: Name of the environment variable that receives the value.
        child_argv: Command to run with ``env_var`` set.

    Returns:
        The exit code of the child command.

    Raises:
        ProxyError: ``env_var`` is empty or not a legal variable name, ``child_argv``
            is empty, or the fetch tool exits non-zero.
    """
    if not env_var:
        raise ProxyError("--exec requires --field (the env var name to inject), e.g. NPM_AUTH_TOKEN")
    if "=" in env_var or "\0" in env_var:
        # subprocess would reject such a name only after the secret has been fetched.
        raise ProxyError(
            f"{env_var!r} is not a usable environment variable name (contains '=' or NUL)."
        )
    if not child_argv:
        raise ProxyError("--exec requires a child command to run with the fetched value.")

    fetched = subprocess.run(  # noqa: S603
        argv,
        stdout=subprocess.PIPE,
        stderr=None,
        stdin=None,
        env=_caller_env(),
        check=False,
        text=True,
    )
    if fetched.returncode != 0:
        raise ProxyError(
            f"fetch failed (exit {fetched.returncode}) — check caller auth and the path."
        )
    value = fetched.stdout
    # Drop the CompletedProcess so it does not keep a second reference to the value.
    del fetched
    if value.endswith("\n"):
        value = value[:-1]

    child_env = _caller_env()
    child_env[env_var] = value
    try:
        child = subprocess.run(  # noqa: S603
            child_argv, stdout=None, stderr=None, stdin=None, env=child_env, check=False
        )
        return child.returncode
    finally:
        # Best-effort scrub of the local references; do not log the value.
        value = ""  # noqa: F841
        del child_env[env_var]