feat(sbom): CUST-WP-0013 — expand SBOM infra to terraform, ansible, and tool manifests

- Migration d6e7f8a9b0c1: add terraform, ansible, tool to Ecosystem enum
- ingest_sbom.py: new Ansible Galaxy requirements.yml parser (collections + roles)
- ingest_sbom.py: new sbom-tools.yaml manifest parser (agent-generated tool deps)
- ingest_sbom.py: promote .terraform.lock.hcl parser from ecosystem=other → terraform
- ingest_sbom.py: detect_all() runs all four parsers in one comprehensive scan
- capture_sbom_tools.py: agent-assisted tool manifest generator (claude -p)
- prompts/sbom-capture-agent.md: parameterised prompt for repo tool discovery
- Makefile: capture-tools target; ingest-sbom updated docs and DRY_RUN support
- 29 unit tests covering all new parsers and detect_all() behaviour
- canon/standards/sbom-convention_v0.1.md: updated with four-mechanism model and workflow

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-12 04:40:26 +01:00
parent 4a8942f310
commit df083b1840
7 changed files with 920 additions and 52 deletions

View File

@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""Invoke the SBOM capture agent to generate/update sbom-tools.yaml for a repo.
Usage:
python capture_sbom_tools.py --repo <slug> [--repo-path <path>] [--dry-run]
The script:
1. Resolves repo path from the state-hub API (if --repo-path is not given)
2. Loads the agent prompt from prompts/sbom-capture-agent.md
3. Substitutes {repo_slug}, {repo_path}, {date} placeholders
4. Invokes `claude -p "<prompt>"` non-interactively
5. Extracts the YAML block from the response
6. Writes (or shows diff of) sbom-tools.yaml in the repo root
Requirements:
- `claude` CLI must be on PATH (Claude Code)
- PyYAML must be available in the active venv
"""
from __future__ import annotations
import argparse
import datetime
import difflib
import json
import os
import re
import subprocess
import sys
import urllib.error
import urllib.request
from pathlib import Path
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
SCRIPT_DIR = Path(__file__).parent
PROMPT_FILE = SCRIPT_DIR.parent / "prompts" / "sbom-capture-agent.md"
def resolve_repo_path(repo_slug: str) -> Path | None:
"""Look up the registered path for a repo slug via the state-hub API."""
url = f"{API_BASE}/repos/{repo_slug}/"
try:
with urllib.request.urlopen(url, timeout=10) as resp:
data = json.loads(resp.read())
path_str = data.get("local_path")
if path_str:
return Path(path_str)
except (urllib.error.URLError, KeyError):
pass
return None
def load_prompt(repo_slug: str, repo_path: Path) -> str:
if not PROMPT_FILE.exists():
print(f"Error: prompt file not found at {PROMPT_FILE}", file=sys.stderr)
sys.exit(1)
template = PROMPT_FILE.read_text()
today = datetime.date.today().isoformat()
return (
template
.replace("{repo_slug}", repo_slug)
.replace("{repo_path}", str(repo_path))
.replace("{date}", today)
)
def invoke_agent(prompt: str) -> str:
"""Run `claude -p <prompt>` and return stdout."""
try:
result = subprocess.run(
["claude", "-p", prompt],
capture_output=True,
text=True,
timeout=120,
)
except FileNotFoundError:
print("Error: `claude` CLI not found on PATH. Install Claude Code.", file=sys.stderr)
sys.exit(1)
except subprocess.TimeoutExpired:
print("Error: claude invocation timed out after 120s.", file=sys.stderr)
sys.exit(1)
if result.returncode != 0:
print(f"Error: claude exited with code {result.returncode}", file=sys.stderr)
if result.stderr:
print(result.stderr, file=sys.stderr)
sys.exit(1)
return result.stdout
def extract_yaml(response: str) -> str:
"""Extract YAML content from the agent response.
Accepts:
- Raw YAML (starts with # or 'tools:')
- YAML wrapped in ```yaml ... ``` fences
"""
# Try fenced block first
m = re.search(r"```(?:yaml)?\s*\n(.*?)```", response, re.DOTALL)
if m:
return m.group(1).strip()
# Otherwise treat entire response as YAML
stripped = response.strip()
if stripped.startswith("#") or stripped.startswith("tools:"):
return stripped
print("Warning: could not extract YAML from agent response.", file=sys.stderr)
print("Raw response:", file=sys.stderr)
print(response[:500], file=sys.stderr)
sys.exit(1)
def show_diff(old: str | None, new: str, target: Path) -> None:
if old is None:
print(f"[new file] {target}")
for line in new.splitlines():
print(f" + {line}")
else:
diff = list(difflib.unified_diff(
old.splitlines(keepends=True),
new.splitlines(keepends=True),
fromfile=f"a/{target.name}",
tofile=f"b/{target.name}",
))
if diff:
print("".join(diff))
else:
print(f"[no changes] {target}")
def main() -> None:
parser = argparse.ArgumentParser(
description="Generate/update sbom-tools.yaml for a repo using the SBOM capture agent."
)
parser.add_argument("--repo", required=True, help="Repo slug (e.g. 'railiance-infra')")
parser.add_argument("--repo-path", help="Path to repo root (auto-resolved from state-hub if omitted)")
parser.add_argument("--dry-run", action="store_true",
help="Show prompt and diff without writing sbom-tools.yaml")
parser.add_argument("--print-prompt", action="store_true",
help="Print the rendered prompt and exit (useful for inspection)")
args = parser.parse_args()
# Resolve repo path
if args.repo_path:
repo_path = Path(args.repo_path).resolve()
else:
repo_path = resolve_repo_path(args.repo)
if repo_path is None:
# Fall back to ~/repo_slug convention
repo_path = Path.home() / args.repo
print(f"Could not resolve path from API; trying {repo_path}", file=sys.stderr)
if not repo_path.exists():
print(f"Error: repo path does not exist: {repo_path}", file=sys.stderr)
sys.exit(1)
target = repo_path / "sbom-tools.yaml"
existing_content = target.read_text() if target.exists() else None
prompt = load_prompt(args.repo, repo_path)
if args.print_prompt:
print(prompt)
return
print(f"Running SBOM capture agent for {args.repo} ({repo_path})…")
response = invoke_agent(prompt)
yaml_content = extract_yaml(response)
# Ensure trailing newline
if not yaml_content.endswith("\n"):
yaml_content += "\n"
show_diff(existing_content, yaml_content, target)
if args.dry_run:
print("\n[dry-run] sbom-tools.yaml not written.")
return
target.write_text(yaml_content)
print(f"\nWritten: {target}")
print("Review the file, correct any 'confidence: low' entries, then commit.")
if __name__ == "__main__":
main()

View File

@@ -1,15 +1,19 @@
#!/usr/bin/env python3
"""Ingest a repo's lockfile into the State Hub SBOM store.
"""Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store.
Usage:
python ingest_sbom.py --repo <slug> [--lockfile <path>] [--api-base <url>]
python ingest_sbom.py --repo <slug> [--repo-path <path>] [--dry-run]
Auto-detects lockfile type:
uv.lock → Python ecosystem
requirements.txt → Python ecosystem (basic)
package-lock.json → Node ecosystem
yarn.lock → Node ecosystem
Cargo.lock → Rust ecosystem
Auto-detects all of the following in one scan:
uv.lock python
requirements.txt python
package-lock.json node
yarn.lock node
Cargo.lock rust
.terraform.lock.hcl → terraform (anywhere in tree)
ansible/requirements.yml → ansible (anywhere under ansible/ dirs)
ansible/requirements.yaml → ansible
sbom-tools.yaml → tool (repo root; agent-generated)
"""
from __future__ import annotations
@@ -22,11 +26,17 @@ import urllib.error
import urllib.request
from pathlib import Path
try:
import yaml # optional; only needed for sbom-tools.yaml and ansible parsers
_YAML_AVAILABLE = True
except ImportError:
_YAML_AVAILABLE = False
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
# ---------------------------------------------------------------------------
# Lockfile parsers
# Lockfile parsers — each returns list[dict]
# ---------------------------------------------------------------------------
def _parse_uv_lock(path: Path) -> list[dict]:
@@ -55,7 +65,7 @@ def _parse_uv_lock(path: Path) -> list[dict]:
"package_version": e.get("package_version"),
"ecosystem": "python",
"license_spdx": None,
"is_direct": False, # uv.lock doesn't distinguish; treat all as transitive
"is_direct": False,
"is_dev": False,
}
for e in entries
@@ -70,7 +80,6 @@ def _parse_requirements_txt(path: Path) -> list[dict]:
line = line.strip()
if not line or line.startswith("#") or line.startswith("-"):
continue
# Handle: pkg==1.2.3, pkg>=1.2, pkg
m = re.match(r"^([A-Za-z0-9_.\-]+)(?:[>=<!~^]+([^\s;]+))?", line)
if m:
entries.append({
@@ -95,7 +104,7 @@ def _parse_package_lock_json(path: Path) -> list[dict]:
packages = data.get("packages", {})
entries = []
for pkg_path, info in packages.items():
if not pkg_path: # root package
if not pkg_path:
continue
name = info.get("name") or pkg_path.split("node_modules/")[-1]
entries.append({
@@ -120,8 +129,6 @@ def _parse_yarn_lock(path: Path) -> list[dict]:
if not stripped or stripped.startswith("#"):
continue
if not line.startswith(" ") and stripped.endswith(":"):
# New package block header: "name@version::" or "\"name@version\":"
# May list multiple versions: "name@^1.0, name@~1.0:"
current_names = []
current_version = None
for part in stripped.rstrip(":").split(","):
@@ -188,12 +195,10 @@ def _parse_terraform_lock_hcl(path: Path) -> list[dict]:
for line in path.read_text().splitlines():
stripped = line.strip()
# e.g.: provider "registry.terraform.io/hetznercloud/hcloud" {
m = re.match(r'^provider\s+"([^"]+)"\s*\{', stripped)
if m:
# Use full provider address as package_name, short name as display
full = m.group(1)
current_name = full # e.g. "registry.terraform.io/hetznercloud/hcloud"
current_name = full
current_version = None
elif current_name is not None:
vm = re.match(r'version\s*=\s*"([^"]+)"', stripped)
@@ -203,7 +208,7 @@ def _parse_terraform_lock_hcl(path: Path) -> list[dict]:
entries.append({
"package_name": current_name,
"package_version": current_version,
"ecosystem": "other", # "terraform" not yet in ENUM; tracked as other
"ecosystem": "terraform",
"license_spdx": None,
"is_direct": True,
"is_dev": False,
@@ -214,7 +219,114 @@ def _parse_terraform_lock_hcl(path: Path) -> list[dict]:
return entries
_LOCKFILE_PARSERS = {
def _parse_ansible_requirements(path: Path) -> list[dict]:
"""Parse ansible/requirements.yml — collections and roles from Ansible Galaxy."""
if not _YAML_AVAILABLE:
print(f"Warning: PyYAML not available; skipping {path}", file=sys.stderr)
return []
try:
data = yaml.safe_load(path.read_text())
except yaml.YAMLError as e:
print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
return []
if not isinstance(data, dict):
return []
entries = []
for item in data.get("collections", []) or []:
if isinstance(item, str):
name, version = item, None
elif isinstance(item, dict):
name = item.get("name", "")
version = str(item.get("version", "")) if item.get("version") else None
else:
continue
if name:
entries.append({
"package_name": name,
"package_version": version,
"ecosystem": "ansible",
"license_spdx": None,
"is_direct": True,
"is_dev": False,
})
for item in data.get("roles", []) or []:
if isinstance(item, str):
name, version = item, None
elif isinstance(item, dict):
name = item.get("name", item.get("src", ""))
version = str(item.get("version", "")) if item.get("version") else None
else:
continue
if name:
entries.append({
"package_name": name,
"package_version": version,
"ecosystem": "ansible",
"license_spdx": None,
"is_direct": True,
"is_dev": False,
})
return entries
def _parse_sbom_tools_yaml(path: Path) -> list[dict]:
"""Parse sbom-tools.yaml — agent-generated tool manifest at repo root."""
if not _YAML_AVAILABLE:
print(f"Warning: PyYAML not available; skipping {path}", file=sys.stderr)
return []
try:
data = yaml.safe_load(path.read_text())
except yaml.YAMLError as e:
print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
return []
if not isinstance(data, dict):
return []
entries = []
valid_ecosystems = {
"python", "node", "rust", "go", "java",
"terraform", "ansible", "tool", "other",
}
for item in data.get("tools", []) or []:
if not isinstance(item, dict):
continue
name = item.get("name", "")
version = str(item.get("version", "")) if item.get("version") else None
if version == "unknown":
print(f" Warning: tool '{name}' has version=unknown — flagged for review", file=sys.stderr)
version = None
ecosystem = item.get("ecosystem", "tool")
if ecosystem not in valid_ecosystems:
print(f" Warning: unknown ecosystem '{ecosystem}' for '{name}'; using 'tool'", file=sys.stderr)
ecosystem = "tool"
license_spdx = item.get("license_spdx") or None
entries.append({
"package_name": name,
"package_version": version,
"ecosystem": ecosystem,
"license_spdx": license_spdx,
"is_direct": bool(item.get("is_direct", True)),
"is_dev": bool(item.get("is_dev", False)),
})
return entries
# ---------------------------------------------------------------------------
# Detection helpers
# ---------------------------------------------------------------------------
# Filename → parser for standard lockfiles (detected by filename anywhere in tree)
_LOCKFILE_PARSERS: dict[str, object] = {
"uv.lock": _parse_uv_lock,
"requirements.txt": _parse_requirements_txt,
"package-lock.json": _parse_package_lock_json,
@@ -234,6 +346,47 @@ _SKIP_DIRS = {
}
def detect_all(repo_path: Path) -> list[tuple[Path, str, object]]:
"""Scan repo_path and return all discovered dependency sources.
Returns list of (path, label, parser_fn) tuples covering:
- Standard lockfiles (anywhere in tree)
- Ansible requirements files (in ansible/ subdirs)
- sbom-tools.yaml at repo root
"""
found: list[tuple[Path, str, object]] = []
seen_paths: set[Path] = set()
# Walk tree for all source types
for dirpath, dirnames, filenames in os.walk(repo_path):
dirnames[:] = sorted(d for d in dirnames if d not in _SKIP_DIRS)
dirpath_p = Path(dirpath)
# Standard lockfiles
for fname, parser in _LOCKFILE_PARSERS.items():
if fname in filenames:
p = dirpath_p / fname
if p not in seen_paths:
found.append((p, fname, parser))
seen_paths.add(p)
# Ansible requirements files — only under directories named "ansible"
if dirpath_p.name == "ansible":
for fname in ("requirements.yml", "requirements.yaml"):
if fname in filenames:
p = dirpath_p / fname
if p not in seen_paths:
found.append((p, f"ansible/{fname}", _parse_ansible_requirements))
seen_paths.add(p)
# sbom-tools.yaml at repo root only
tools_manifest = repo_path / "sbom-tools.yaml"
if tools_manifest.exists() and tools_manifest not in seen_paths:
found.append((tools_manifest, "sbom-tools.yaml", _parse_sbom_tools_yaml))
return found
def detect_lockfile(repo_path: Path) -> tuple[Path, str] | None:
"""Return (lockfile_path, filename) for the first recognised lockfile at repo root."""
for name in _LOCKFILE_PARSERS:
@@ -244,7 +397,10 @@ def detect_lockfile(repo_path: Path) -> tuple[Path, str] | None:
def detect_lockfiles_recursive(repo_path: Path) -> list[Path]:
"""Walk repo_path and return all recognised lockfiles, skipping non-dep dirs."""
"""Walk repo_path and return all recognised lockfiles, skipping non-dep dirs.
Kept for backwards compatibility; prefer detect_all() for new code.
"""
found: list[Path] = []
for dirpath, dirnames, filenames in os.walk(repo_path):
dirnames[:] = sorted(d for d in dirnames if d not in _SKIP_DIRS)
@@ -292,52 +448,47 @@ def post_ingest(api_base: str, repo_slug: str, entries: list[dict]) -> dict:
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="Ingest a repo's lockfiles into the State Hub SBOM store.")
parser = argparse.ArgumentParser(
description="Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store."
)
parser.add_argument("--repo", required=True, help="Managed-repo slug (e.g. 'the-custodian')")
parser.add_argument("--lockfile", action="append", dest="lockfiles",
metavar="PATH", help="Path to a specific lockfile (repeatable)")
parser.add_argument("--repo-path", default=".", help="Repo root for auto-detection/scan (default: cwd)")
parser.add_argument("--scan", action="store_true",
help="Recursively find ALL lockfiles under --repo-path (handles multi-ecosystem repos)")
help="Recursively find ALL lockfiles under --repo-path (deprecated; now default behaviour)")
parser.add_argument("--api-base", default=API_BASE, help="State Hub API base URL")
parser.add_argument("--dry-run", action="store_true", help="Parse only — do not submit")
args = parser.parse_args()
repo_root = Path(args.repo_path).resolve()
lockfile_paths: list[Path] = []
all_entries: list[dict] = []
if args.lockfiles:
lockfile_paths = [Path(lf).resolve() for lf in args.lockfiles]
elif args.scan:
lockfile_paths = detect_lockfiles_recursive(repo_root)
if not lockfile_paths:
print(f"No lockfiles found under '{repo_root}'.", file=sys.stderr)
sys.exit(1)
print(f"Scan found {len(lockfile_paths)} lockfile(s):")
for lf in lockfile_paths:
print(f" {lf.relative_to(repo_root) if lf.is_relative_to(repo_root) else lf}")
# Explicit paths: parse each, detect parser by filename
for lf_str in args.lockfiles:
lf = Path(lf_str).resolve()
parsed = parse_lockfile(lf)
rel = lf.relative_to(repo_root) if lf.is_relative_to(repo_root) else lf
print(f" {rel}: {len(parsed)} packages")
all_entries.extend(parsed)
else:
found = detect_lockfile(repo_root)
if not found:
# Comprehensive auto-detection: all mechanisms in one scan
sources = detect_all(repo_root)
if not sources:
print(
f"No recognised lockfile found in '{repo_root}'. "
f"Supported: {', '.join(_LOCKFILE_PARSERS)}. "
"Use --scan to search subdirectories.",
f"No recognised dependency sources found in '{repo_root}'.",
file=sys.stderr,
)
sys.exit(1)
lockfile_path, _ = found
print(f"Auto-detected: {lockfile_path}")
lockfile_paths = [lockfile_path]
all_entries: list[dict] = []
for lf in lockfile_paths:
parsed = parse_lockfile(lf)
rel = lf.relative_to(repo_root) if lf.is_relative_to(repo_root) else lf
print(f" {rel}: {len(parsed)} packages")
all_entries.extend(parsed)
for src_path, label, parser_fn in sources:
parsed = parser_fn(src_path)
rel = src_path.relative_to(repo_root) if src_path.is_relative_to(repo_root) else src_path
print(f" {label} ({rel}): {len(parsed)} entries")
all_entries.extend(parsed)
print(f"Total: {len(all_entries)} packages across {len(lockfile_paths)} lockfile(s)")
print(f"Total: {len(all_entries)} entries")
if args.dry_run:
print(json.dumps(all_entries[:5], indent=2))