diff --git a/Makefile b/Makefile index 52db38f..9e6607a 100644 --- a/Makefile +++ b/Makefile @@ -133,16 +133,26 @@ list-repos: @test -n "$(DOMAIN)" || (echo "ERROR: DOMAIN is required."; exit 1) curl -sf "http://127.0.0.1:8000/repos/?domain=$(DOMAIN)" | python3 -m json.tool -## Ingest SBOM data for a repo. +## Ingest SBOM data for a repo (all mechanisms: lockfiles + ansible + sbom-tools.yaml). +## Auto-detect all sources: make ingest-sbom REPO=the-custodian REPO_PATH=/home/worsch/the-custodian ## Single lockfile (explicit): make ingest-sbom REPO=the-custodian LOCKFILE=/path/to/uv.lock -## Scan all lockfiles in tree: make ingest-sbom REPO=the-custodian SCAN=1 REPO_PATH=/home/worsch/the-custodian -## Auto-detect at repo root: make ingest-sbom REPO=the-custodian REPO_PATH=/home/worsch/the-custodian +## Dry-run (no submit): make ingest-sbom REPO=the-custodian REPO_PATH=... DRY_RUN=1 +## Tip: run capture-tools first for repos with system-level tool dependencies. ingest-sbom: @test -n "$(REPO)" || (echo "ERROR: REPO is required."; exit 1) uv run python scripts/ingest_sbom.py --repo "$(REPO)" \ $(if $(LOCKFILE),--lockfile "$(LOCKFILE)") \ - $(if $(SCAN),--scan) \ - $(if $(REPO_PATH),--repo-path "$(REPO_PATH)") + $(if $(REPO_PATH),--repo-path "$(REPO_PATH)") \ + $(if $(DRY_RUN),--dry-run) + +## Run SBOM capture agent for a repo — generates/updates sbom-tools.yaml. +## Usage: make capture-tools REPO=railiance-infra [REPO_PATH=/home/worsch/railiance-infra] +## Add DRY_RUN=1 to preview without writing. 
def upgrade() -> None:
    """Add the ``terraform``, ``ansible`` and ``tool`` labels to the ``ecosystem`` enum.

    Each label is added with ``ADD VALUE IF NOT EXISTS`` so re-running the
    migration against a database that already has some of the labels is a
    no-op for those labels.
    """
    # ALTER TYPE ... ADD VALUE cannot run inside a transaction block on
    # PostgreSQL < 12, and on 12+ the new label cannot be used until the
    # adding transaction has committed. Alembic wraps migrations in a
    # transaction by default, so run these statements in an autocommit block
    # to make the migration work regardless of server version.
    with op.get_context().autocommit_block():
        for label in ("terraform", "ansible", "tool"):
            op.execute(f"ALTER TYPE ecosystem ADD VALUE IF NOT EXISTS '{label}'")
Document the limitation and do nothing — reverting this migration + # requires a full type recreation if needed. + pass diff --git a/prompts/sbom-capture-agent.md b/prompts/sbom-capture-agent.md new file mode 100644 index 0000000..2061821 --- /dev/null +++ b/prompts/sbom-capture-agent.md @@ -0,0 +1,90 @@ +# SBOM Capture Agent Prompt + +**Task:** Generate or update `sbom-tools.yaml` for the repository at `{repo_path}` (slug: `{repo_slug}`). + +This file captures system-level tool dependencies that are not tracked by any package manager lockfile — tools that are installed via provisioning, Homebrew, system packages, or assumed present in the environment. + +--- + +## Instructions + +1. **Read the following files** in `{repo_path}` (read each that exists; skip gracefully if absent): + - `CLAUDE.md` — look for stack declarations, tool prerequisites, dev commands + - `README.md` / `QUICKSTART.md` — prerequisites sections, tool version requirements + - `Makefile` — tool invocations, version variables (e.g. `ANSIBLE_VERSION := 12.3`) + - `pyproject.toml` — Python tool dependencies (already covered by uv.lock; note but don't duplicate) + - `.tool-versions` — asdf version pins + - `.terraform-version` — tfenv pin + - `.ansible-version` — if present + - `Dockerfile` / `docker-compose.yml` — base image versions, tool installs + - `.github/workflows/*.yml` / `.gitlab-ci.yml` — CI tool install steps, version pins + - `ansible/requirements.yml` — **already captured by lockfile parser; do NOT include Galaxy collections here** + - Any `scripts/setup*.sh`, `scripts/bootstrap*.sh`, or `tools/` directory + +2. **Identify system-level tools only** — tools that: + - Are invoked as CLI commands (e.g. `ansible-playbook`, `terraform`, `helm`, `kubectl`, `k3s`, `goss`, `age`, `sops`) + - Are NOT installed via `uv`/`pip`/`npm`/`cargo` into a project virtualenv (those are in lockfiles) + - Note: `ansible` itself as a CLI tool is a system dep even if `ansible-core` appears in `uv.lock` + +3. 
**For each tool, determine**: + - `name`: canonical tool name (e.g. `ansible`, `terraform`, `helm`, `kubectl`, `k3s`, `goss`, `age`, `sops`, `cloud-init`) + - `version`: the pinned or documented version. Use `unknown` only if no evidence found anywhere. + - `ecosystem`: one of `python`, `node`, `rust`, `go`, `java`, `terraform`, `ansible`, `tool`, `other` + - Use `ansible` for Ansible itself; `terraform` for Terraform itself; `tool` for generic CLI tools + - `license_spdx`: the SPDX identifier. Common known licences (use these exact strings): + - ansible / ansible-core: `GPL-3.0-only` + - terraform ≤ 1.5.7: `MPL-2.0`; terraform ≥ 1.6.0: `BUSL-1.1` + - helm: `Apache-2.0` + - kubectl: `Apache-2.0` + - k3s: `Apache-2.0` + - goss: `Apache-2.0` + - age: `BSD-3-Clause` + - sops: `MPL-2.0` + - cloud-init: `Apache-2.0` (or `GPL-3.0-only` for older versions — check) + - docker: `Apache-2.0` + - If unknown, use `null` + - `is_direct`: `true` if this repo directly declares/uses it; `false` if it's a transitive dependency of another tool + - `is_dev`: `true` only if the tool is only used for development/testing, not production operation + +4. **Confidence annotation**: Add a `# confidence: high/medium/low` comment on the `version:` line of each entry: + - `high`: version found explicitly pinned in a file + - `medium`: version inferred from context (e.g. "Ansible 12" in README) + - `low`: version not found; using `unknown` or a reasonable guess + +5. **Do NOT include**: + - Python packages already covered by `uv.lock` or `requirements.txt` + - Ansible Galaxy collections (covered by `ansible/requirements.yml`) + - Terraform providers (covered by `.terraform.lock.hcl`) + - Node packages, Rust crates, etc. (covered by their lockfiles) + - Operating system packages unless the repo explicitly declares them + +6. **Output format**: Emit ONLY the YAML block below — no prose, no markdown fences, no explanation. The output must be valid YAML that can be written directly to `sbom-tools.yaml`. 
+ +--- + +## Output format + +```yaml +# sbom-tools.yaml — system-level tool dependencies for {repo_slug} +# Generated by sbom-capture-agent on {date} +# Review each entry before committing. Entries with confidence: low need human verification. +tools: + - name: example-tool + version: "1.2.3" # confidence: high + ecosystem: tool + license_spdx: Apache-2.0 + is_direct: true + is_dev: false +``` + +If no system-level tools are found, output: +```yaml +# sbom-tools.yaml — system-level tool dependencies for {repo_slug} +# Generated by sbom-capture-agent on {date} +# No system-level tools identified — all dependencies are covered by lockfiles. +tools: [] +``` + +--- + +Now read `{repo_path}` and produce the `sbom-tools.yaml` content. diff --git a/scripts/capture_sbom_tools.py b/scripts/capture_sbom_tools.py new file mode 100644 index 0000000..a42ed6f --- /dev/null +++ b/scripts/capture_sbom_tools.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +"""Invoke the SBOM capture agent to generate/update sbom-tools.yaml for a repo. + +Usage: + python capture_sbom_tools.py --repo [--repo-path ] [--dry-run] + +The script: +1. Resolves repo path from the state-hub API (if --repo-path is not given) +2. Loads the agent prompt from prompts/sbom-capture-agent.md +3. Substitutes {repo_slug}, {repo_path}, {date} placeholders +4. Invokes `claude -p ""` non-interactively +5. Extracts the YAML block from the response +6. 
def resolve_repo_path(repo_slug: str) -> Path | None:
    """Look up the registered path for a repo slug via the state-hub API.

    Issues ``GET {API_BASE}/repos/<repo_slug>/`` and returns the payload's
    ``local_path`` as a ``Path``.

    Returns ``None`` whenever the lookup cannot produce a usable path: the
    API is unreachable or answers with an HTTP error, the request times out,
    the body is not valid JSON, the payload is not a JSON object, or
    ``local_path`` is missing/empty/not a string. The lookup is best-effort,
    so none of these conditions raise.
    """
    url = f"{API_BASE}/repos/{repo_slug}/"
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = json.loads(resp.read())
    except (urllib.error.URLError, OSError, ValueError):
        # URLError/HTTPError: connection or HTTP failure.
        # OSError: socket-level errors and timeouts raised while reading.
        # ValueError: JSONDecodeError / UnicodeDecodeError on a bad body.
        return None

    if not isinstance(data, dict):
        return None

    path_str = data.get("local_path")
    if isinstance(path_str, str) and path_str:
        return Path(path_str)
    return None
def extract_yaml(response: str) -> str:
    """Extract YAML content from the agent response.

    Accepts, in order of preference:
      - YAML wrapped in a Markdown fence. The fence may be untagged or tagged
        ``yaml``/``yml`` in any letter case; the first such fence wins and
        any prose around it is discarded.
      - Raw YAML: the whole response, once stripped, starts with ``#`` or
        ``tools:``.

    Returns the extracted text with surrounding whitespace stripped.

    If neither form is found, prints a diagnostic (including the first 500
    characters of the response) to stderr and terminates the process with
    exit status 1.
    """
    # Fenced block first: a response that opens with a Markdown heading
    # ("# ...") followed by a fence must not be mistaken for raw YAML.
    fenced = re.search(
        r"```(?:ya?ml)?\s*\n(.*?)```",
        response,
        re.DOTALL | re.IGNORECASE,
    )
    if fenced:
        return fenced.group(1).strip()

    # Otherwise treat the entire response as YAML if it looks like the manifest.
    stripped = response.strip()
    if stripped.startswith(("#", "tools:")):
        return stripped

    print("Warning: could not extract YAML from agent response.", file=sys.stderr)
    print("Raw response:", file=sys.stderr)
    print(response[:500], file=sys.stderr)
    sys.exit(1)
'railiance-infra')") + parser.add_argument("--repo-path", help="Path to repo root (auto-resolved from state-hub if omitted)") + parser.add_argument("--dry-run", action="store_true", + help="Show prompt and diff without writing sbom-tools.yaml") + parser.add_argument("--print-prompt", action="store_true", + help="Print the rendered prompt and exit (useful for inspection)") + args = parser.parse_args() + + # Resolve repo path + if args.repo_path: + repo_path = Path(args.repo_path).resolve() + else: + repo_path = resolve_repo_path(args.repo) + if repo_path is None: + # Fall back to ~/repo_slug convention + repo_path = Path.home() / args.repo + print(f"Could not resolve path from API; trying {repo_path}", file=sys.stderr) + + if not repo_path.exists(): + print(f"Error: repo path does not exist: {repo_path}", file=sys.stderr) + sys.exit(1) + + target = repo_path / "sbom-tools.yaml" + existing_content = target.read_text() if target.exists() else None + + prompt = load_prompt(args.repo, repo_path) + + if args.print_prompt: + print(prompt) + return + + print(f"Running SBOM capture agent for {args.repo} ({repo_path})…") + response = invoke_agent(prompt) + yaml_content = extract_yaml(response) + + # Ensure trailing newline + if not yaml_content.endswith("\n"): + yaml_content += "\n" + + show_diff(existing_content, yaml_content, target) + + if args.dry_run: + print("\n[dry-run] sbom-tools.yaml not written.") + return + + target.write_text(yaml_content) + print(f"\nWritten: {target}") + print("Review the file, correct any 'confidence: low' entries, then commit.") + + +if __name__ == "__main__": + main() diff --git a/scripts/ingest_sbom.py b/scripts/ingest_sbom.py index c024ac8..6696234 100644 --- a/scripts/ingest_sbom.py +++ b/scripts/ingest_sbom.py @@ -1,15 +1,19 @@ #!/usr/bin/env python3 -"""Ingest a repo's lockfile into the State Hub SBOM store. +"""Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store. 
Usage: - python ingest_sbom.py --repo [--lockfile ] [--api-base ] + python ingest_sbom.py --repo [--repo-path ] [--dry-run] -Auto-detects lockfile type: - uv.lock → Python ecosystem - requirements.txt → Python ecosystem (basic) - package-lock.json → Node ecosystem - yarn.lock → Node ecosystem - Cargo.lock → Rust ecosystem +Auto-detects all of the following in one scan: + uv.lock → python + requirements.txt → python + package-lock.json → node + yarn.lock → node + Cargo.lock → rust + .terraform.lock.hcl → terraform (anywhere in tree) + ansible/requirements.yml → ansible (anywhere under ansible/ dirs) + ansible/requirements.yaml → ansible + sbom-tools.yaml → tool (repo root; agent-generated) """ from __future__ import annotations @@ -22,11 +26,17 @@ import urllib.error import urllib.request from pathlib import Path +try: + import yaml # optional; only needed for sbom-tools.yaml and ansible parsers + _YAML_AVAILABLE = True +except ImportError: + _YAML_AVAILABLE = False + API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/") # --------------------------------------------------------------------------- -# Lockfile parsers +# Lockfile parsers — each returns list[dict] # --------------------------------------------------------------------------- def _parse_uv_lock(path: Path) -> list[dict]: @@ -55,7 +65,7 @@ def _parse_uv_lock(path: Path) -> list[dict]: "package_version": e.get("package_version"), "ecosystem": "python", "license_spdx": None, - "is_direct": False, # uv.lock doesn't distinguish; treat all as transitive + "is_direct": False, "is_dev": False, } for e in entries @@ -70,7 +80,6 @@ def _parse_requirements_txt(path: Path) -> list[dict]: line = line.strip() if not line or line.startswith("#") or line.startswith("-"): continue - # Handle: pkg==1.2.3, pkg>=1.2, pkg m = re.match(r"^([A-Za-z0-9_.\-]+)(?:[>= list[dict]: packages = data.get("packages", {}) entries = [] for pkg_path, info in packages.items(): - if not pkg_path: # root package + if not 
def _parse_ansible_requirements(path: Path) -> list[dict]:
    """Parse an Ansible Galaxy requirements file into SBOM entries.

    Two file layouts are recognised:
      - a mapping with ``collections:`` and/or ``roles:`` lists;
      - the legacy roles-only layout, where the top level is itself the
        list of roles.

    Each list item may be a bare string (the name, no version) or a mapping.
    For collections the name comes from ``name``; for roles it comes from
    ``name`` or, failing that, ``src``. ``version`` is optional and is
    stringified when present. Items without a usable name, and items of any
    other type, are skipped.

    Returns a list of entry dicts with ``ecosystem`` set to ``"ansible"``,
    ``is_direct`` True and ``is_dev`` False. Returns ``[]`` (after printing a
    warning to stderr) when PyYAML is unavailable or the file is not valid
    YAML, and ``[]`` silently when the document is neither a mapping nor a
    list.
    """
    if not _YAML_AVAILABLE:
        print(f"Warning: PyYAML not available; skipping {path}", file=sys.stderr)
        return []

    try:
        data = yaml.safe_load(path.read_text())
    except yaml.YAMLError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []

    if isinstance(data, list):
        # Legacy roles-only file: the whole document is the role list.
        collections, roles = [], data
    elif isinstance(data, dict):
        collections = data.get("collections") or []
        roles = data.get("roles") or []
    else:
        return []

    entries = []
    # (items, keys that may carry the item's name, in priority order)
    sections = ((collections, ("name",)), (roles, ("name", "src")))
    for items, name_keys in sections:
        if not isinstance(items, list):
            # A scalar here (e.g. `collections: community.general`) would
            # otherwise be iterated character by character.
            continue
        for item in items:
            if isinstance(item, str):
                name, version = item, None
            elif isinstance(item, dict):
                name = next((item[k] for k in name_keys if item.get(k)), "")
                version = str(item["version"]) if item.get("version") else None
            else:
                continue
            if not name:
                continue
            entries.append({
                "package_name": name,
                "package_version": version,
                "ecosystem": "ansible",
                "license_spdx": None,
                "is_direct": True,
                "is_dev": False,
            })

    return entries
def detect_all(repo_path: Path) -> list[tuple[Path, str, object]]:
    """Collect every dependency source found under ``repo_path``.

    The result is a list of ``(path, label, parser_fn)`` tuples, in discovery
    order, covering:
      - files whose name is a key of ``_LOCKFILE_PARSERS``, in any directory
        of the tree (label: the filename);
      - ``requirements.yml`` / ``requirements.yaml`` located directly inside
        a directory named ``ansible`` (label: ``ansible/<filename>``);
      - ``sbom-tools.yaml`` at the repo root (label: ``sbom-tools.yaml``),
        appended last.

    Directories listed in ``_SKIP_DIRS`` are pruned from the walk, and the
    remaining subdirectories are visited in sorted order.
    """
    sources: list[tuple[Path, str, object]] = []
    registered: set[Path] = set()

    def register(candidate: Path, label: str, parse_fn: object) -> None:
        # Record a source once, keyed by its path.
        if candidate in registered:
            return
        sources.append((candidate, label, parse_fn))
        registered.add(candidate)

    for current, subdirs, files in os.walk(repo_path):
        # In-place assignment so os.walk honours the pruning and ordering.
        subdirs[:] = sorted(name for name in subdirs if name not in _SKIP_DIRS)
        here = Path(current)

        for lock_name, parse_fn in _LOCKFILE_PARSERS.items():
            if lock_name in files:
                register(here / lock_name, lock_name, parse_fn)

        if here.name != "ansible":
            continue
        for req_name in ("requirements.yml", "requirements.yaml"):
            if req_name in files:
                register(here / req_name, f"ansible/{req_name}", _parse_ansible_requirements)

    manifest = repo_path / "sbom-tools.yaml"
    if manifest.exists() and manifest not in registered:
        sources.append((manifest, "sbom-tools.yaml", _parse_sbom_tools_yaml))

    return sources
'the-custodian')") parser.add_argument("--lockfile", action="append", dest="lockfiles", metavar="PATH", help="Path to a specific lockfile (repeatable)") parser.add_argument("--repo-path", default=".", help="Repo root for auto-detection/scan (default: cwd)") parser.add_argument("--scan", action="store_true", - help="Recursively find ALL lockfiles under --repo-path (handles multi-ecosystem repos)") + help="Recursively find ALL lockfiles under --repo-path (deprecated; now default behaviour)") parser.add_argument("--api-base", default=API_BASE, help="State Hub API base URL") parser.add_argument("--dry-run", action="store_true", help="Parse only — do not submit") args = parser.parse_args() repo_root = Path(args.repo_path).resolve() - lockfile_paths: list[Path] = [] + all_entries: list[dict] = [] if args.lockfiles: - lockfile_paths = [Path(lf).resolve() for lf in args.lockfiles] - elif args.scan: - lockfile_paths = detect_lockfiles_recursive(repo_root) - if not lockfile_paths: - print(f"No lockfiles found under '{repo_root}'.", file=sys.stderr) - sys.exit(1) - print(f"Scan found {len(lockfile_paths)} lockfile(s):") - for lf in lockfile_paths: - print(f" {lf.relative_to(repo_root) if lf.is_relative_to(repo_root) else lf}") + # Explicit paths: parse each, detect parser by filename + for lf_str in args.lockfiles: + lf = Path(lf_str).resolve() + parsed = parse_lockfile(lf) + rel = lf.relative_to(repo_root) if lf.is_relative_to(repo_root) else lf + print(f" {rel}: {len(parsed)} packages") + all_entries.extend(parsed) else: - found = detect_lockfile(repo_root) - if not found: + # Comprehensive auto-detection: all mechanisms in one scan + sources = detect_all(repo_root) + if not sources: print( - f"No recognised lockfile found in '{repo_root}'. " - f"Supported: {', '.join(_LOCKFILE_PARSERS)}. 
" - "Use --scan to search subdirectories.", + f"No recognised dependency sources found in '{repo_root}'.", file=sys.stderr, ) sys.exit(1) - lockfile_path, _ = found - print(f"Auto-detected: {lockfile_path}") - lockfile_paths = [lockfile_path] - all_entries: list[dict] = [] - for lf in lockfile_paths: - parsed = parse_lockfile(lf) - rel = lf.relative_to(repo_root) if lf.is_relative_to(repo_root) else lf - print(f" {rel}: {len(parsed)} packages") - all_entries.extend(parsed) + for src_path, label, parser_fn in sources: + parsed = parser_fn(src_path) + rel = src_path.relative_to(repo_root) if src_path.is_relative_to(repo_root) else src_path + print(f" {label} ({rel}): {len(parsed)} entries") + all_entries.extend(parsed) - print(f"Total: {len(all_entries)} packages across {len(lockfile_paths)} lockfile(s)") + print(f"Total: {len(all_entries)} entries") if args.dry_run: print(json.dumps(all_entries[:5], indent=2)) diff --git a/tests/test_ingest_sbom.py b/tests/test_ingest_sbom.py new file mode 100644 index 0000000..51dc48f --- /dev/null +++ b/tests/test_ingest_sbom.py @@ -0,0 +1,397 @@ +"""Unit tests for ingest_sbom.py parsers and auto-detection.""" +from __future__ import annotations + +import json +import sys +import textwrap +from pathlib import Path + +import pytest + +# Make scripts/ importable +sys.path.insert(0, str(Path(__file__).parent.parent / "scripts")) +import ingest_sbom as sb + + +# --------------------------------------------------------------------------- +# Terraform parser +# --------------------------------------------------------------------------- + +TERRAFORM_LOCK = textwrap.dedent("""\ + provider "registry.terraform.io/hashicorp/template" { + version = "2.2.0" + constraints = ">= 2.0.0" + hashes = [ + "h1:abc123", + ] + } + + provider "registry.terraform.io/hetznercloud/hcloud" { + version = "1.52.0" + constraints = ">= 1.40.0" + } +""") + + +def test_terraform_parser_ecosystem(tmp_path): + lock = tmp_path / ".terraform.lock.hcl" + 
lock.write_text(TERRAFORM_LOCK) + entries = sb._parse_terraform_lock_hcl(lock) + assert len(entries) == 2 + for e in entries: + assert e["ecosystem"] == "terraform", f"expected terraform, got {e['ecosystem']}" + names = {e["package_name"] for e in entries} + assert "registry.terraform.io/hashicorp/template" in names + assert "registry.terraform.io/hetznercloud/hcloud" in names + + +def test_terraform_parser_versions(tmp_path): + lock = tmp_path / ".terraform.lock.hcl" + lock.write_text(TERRAFORM_LOCK) + entries = sb._parse_terraform_lock_hcl(lock) + by_name = {e["package_name"]: e for e in entries} + assert by_name["registry.terraform.io/hashicorp/template"]["package_version"] == "2.2.0" + assert by_name["registry.terraform.io/hetznercloud/hcloud"]["package_version"] == "1.52.0" + + +def test_terraform_parser_is_direct(tmp_path): + lock = tmp_path / ".terraform.lock.hcl" + lock.write_text(TERRAFORM_LOCK) + entries = sb._parse_terraform_lock_hcl(lock) + assert all(e["is_direct"] for e in entries) + + +def test_terraform_parser_empty(tmp_path): + lock = tmp_path / ".terraform.lock.hcl" + lock.write_text("# no providers\n") + entries = sb._parse_terraform_lock_hcl(lock) + assert entries == [] + + +# --------------------------------------------------------------------------- +# Ansible Galaxy parser +# --------------------------------------------------------------------------- + +ANSIBLE_REQUIREMENTS_FULL = textwrap.dedent("""\ + collections: + - name: community.general + version: "9.5.0" + - name: ansible.posix + version: "1.6.0" + - community.crypto + + roles: + - name: geerlingguy.docker + version: "6.1.0" + - geerlingguy.pip +""") + +ANSIBLE_REQUIREMENTS_EMPTY = textwrap.dedent("""\ + collections: [] + roles: [] +""") + +ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY = textwrap.dedent("""\ + collections: + - name: community.general + version: "9.0.0" +""") + + +def test_ansible_parser_collections_and_roles(tmp_path): + req = tmp_path / "requirements.yml" + 
req.write_text(ANSIBLE_REQUIREMENTS_FULL) + entries = sb._parse_ansible_requirements(req) + assert len(entries) == 5 + names = {e["package_name"] for e in entries} + assert "community.general" in names + assert "ansible.posix" in names + assert "community.crypto" in names + assert "geerlingguy.docker" in names + assert "geerlingguy.pip" in names + + +def test_ansible_parser_ecosystem(tmp_path): + req = tmp_path / "requirements.yml" + req.write_text(ANSIBLE_REQUIREMENTS_FULL) + entries = sb._parse_ansible_requirements(req) + for e in entries: + assert e["ecosystem"] == "ansible" + + +def test_ansible_parser_versions(tmp_path): + req = tmp_path / "requirements.yml" + req.write_text(ANSIBLE_REQUIREMENTS_FULL) + entries = sb._parse_ansible_requirements(req) + by_name = {e["package_name"]: e for e in entries} + assert by_name["community.general"]["package_version"] == "9.5.0" + assert by_name["ansible.posix"]["package_version"] == "1.6.0" + assert by_name["community.crypto"]["package_version"] is None # no version specified + assert by_name["geerlingguy.docker"]["package_version"] == "6.1.0" + assert by_name["geerlingguy.pip"]["package_version"] is None + + +def test_ansible_parser_is_direct(tmp_path): + req = tmp_path / "requirements.yml" + req.write_text(ANSIBLE_REQUIREMENTS_FULL) + entries = sb._parse_ansible_requirements(req) + assert all(e["is_direct"] for e in entries) + + +def test_ansible_parser_empty(tmp_path): + req = tmp_path / "requirements.yml" + req.write_text(ANSIBLE_REQUIREMENTS_EMPTY) + entries = sb._parse_ansible_requirements(req) + assert entries == [] + + +def test_ansible_parser_collections_only(tmp_path): + req = tmp_path / "requirements.yml" + req.write_text(ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY) + entries = sb._parse_ansible_requirements(req) + assert len(entries) == 1 + assert entries[0]["package_name"] == "community.general" + + +def test_ansible_parser_yaml_extension(tmp_path): + """Both .yml and .yaml extensions must work.""" + req = tmp_path 
/ "requirements.yaml" + req.write_text(ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY) + entries = sb._parse_ansible_requirements(req) + assert len(entries) == 1 + + +def test_ansible_parser_invalid_yaml(tmp_path, capsys): + req = tmp_path / "requirements.yml" + req.write_text("collections: [unclosed") + entries = sb._parse_ansible_requirements(req) + assert entries == [] + captured = capsys.readouterr() + assert "Warning" in captured.err + + +# --------------------------------------------------------------------------- +# sbom-tools.yaml parser +# --------------------------------------------------------------------------- + +SBOM_TOOLS_YAML = textwrap.dedent("""\ + tools: + - name: ansible + version: "12.3.0" + ecosystem: ansible + license_spdx: GPL-3.0-only + is_direct: true + is_dev: false + - name: terraform + version: "1.10.5" + ecosystem: terraform + license_spdx: BSL-1.1 + is_direct: true + is_dev: false + - name: helm + version: "3.17.1" + ecosystem: tool + license_spdx: Apache-2.0 + is_direct: true + is_dev: false + - name: k3s + version: unknown + ecosystem: other + license_spdx: Apache-2.0 + is_direct: true + is_dev: false +""") + +SBOM_TOOLS_YAML_MINIMAL = textwrap.dedent("""\ + tools: + - name: kubectl + ecosystem: tool +""") + + +def test_sbom_tools_parser_basic(tmp_path): + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text(SBOM_TOOLS_YAML) + entries = sb._parse_sbom_tools_yaml(manifest) + assert len(entries) == 4 + names = {e["package_name"] for e in entries} + assert {"ansible", "terraform", "helm", "k3s"} == names + + +def test_sbom_tools_parser_ecosystems(tmp_path): + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text(SBOM_TOOLS_YAML) + entries = sb._parse_sbom_tools_yaml(manifest) + by_name = {e["package_name"]: e for e in entries} + assert by_name["ansible"]["ecosystem"] == "ansible" + assert by_name["terraform"]["ecosystem"] == "terraform" + assert by_name["helm"]["ecosystem"] == "tool" + assert by_name["k3s"]["ecosystem"] == 
"other" + + +def test_sbom_tools_parser_licenses(tmp_path): + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text(SBOM_TOOLS_YAML) + entries = sb._parse_sbom_tools_yaml(manifest) + by_name = {e["package_name"]: e for e in entries} + assert by_name["ansible"]["license_spdx"] == "GPL-3.0-only" + assert by_name["terraform"]["license_spdx"] == "BSL-1.1" + assert by_name["helm"]["license_spdx"] == "Apache-2.0" + + +def test_sbom_tools_parser_unknown_version_becomes_none(tmp_path, capsys): + """version: unknown must be converted to None and emit a warning.""" + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text(SBOM_TOOLS_YAML) + entries = sb._parse_sbom_tools_yaml(manifest) + by_name = {e["package_name"]: e for e in entries} + assert by_name["k3s"]["package_version"] is None + captured = capsys.readouterr() + assert "unknown" in captured.err + + +def test_sbom_tools_parser_minimal_entry(tmp_path): + """Only 'name' and 'ecosystem' required; version and license default to None.""" + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text(SBOM_TOOLS_YAML_MINIMAL) + entries = sb._parse_sbom_tools_yaml(manifest) + assert len(entries) == 1 + e = entries[0] + assert e["package_name"] == "kubectl" + assert e["ecosystem"] == "tool" + assert e["package_version"] is None + assert e["license_spdx"] is None + assert e["is_direct"] is True + assert e["is_dev"] is False + + +def test_sbom_tools_parser_invalid_ecosystem_falls_back(tmp_path, capsys): + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text("tools:\n - name: foo\n ecosystem: nonsense\n") + entries = sb._parse_sbom_tools_yaml(manifest) + assert entries[0]["ecosystem"] == "tool" + captured = capsys.readouterr() + assert "Warning" in captured.err + + +def test_sbom_tools_parser_empty_tools(tmp_path): + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text("tools: []\n") + entries = sb._parse_sbom_tools_yaml(manifest) + assert entries == [] + + +def 
test_sbom_tools_parser_invalid_yaml(tmp_path, capsys): + manifest = tmp_path / "sbom-tools.yaml" + manifest.write_text("tools: {bad yaml: [unclosed") + entries = sb._parse_sbom_tools_yaml(manifest) + assert entries == [] + captured = capsys.readouterr() + assert "Warning" in captured.err + + +# --------------------------------------------------------------------------- +# detect_all — comprehensive multi-parser scan +# --------------------------------------------------------------------------- + +def test_detect_all_uv_lock(tmp_path): + (tmp_path / "uv.lock").write_text("[[package]]\nname = \"typer\"\nversion = \"0.12.0\"\n") + sources = sb.detect_all(tmp_path) + labels = {label for _, label, _ in sources} + assert "uv.lock" in labels + + +def test_detect_all_terraform_lock(tmp_path): + tf_dir = tmp_path / "terraform" / "hetzner" + tf_dir.mkdir(parents=True) + (tf_dir / ".terraform.lock.hcl").write_text( + 'provider "registry.terraform.io/hetznercloud/hcloud" {\n version = "1.52.0"\n}\n' + ) + sources = sb.detect_all(tmp_path) + labels = {label for _, label, _ in sources} + assert ".terraform.lock.hcl" in labels + + +def test_detect_all_ansible_requirements(tmp_path): + ansible_dir = tmp_path / "ansible" + ansible_dir.mkdir() + (ansible_dir / "requirements.yml").write_text("collections:\n - name: community.general\n") + sources = sb.detect_all(tmp_path) + labels = {label for _, label, _ in sources} + assert "ansible/requirements.yml" in labels + + +def test_detect_all_sbom_tools_yaml(tmp_path): + (tmp_path / "sbom-tools.yaml").write_text("tools:\n - name: helm\n ecosystem: tool\n") + sources = sb.detect_all(tmp_path) + labels = {label for _, label, _ in sources} + assert "sbom-tools.yaml" in labels + + +def test_detect_all_multi_ecosystem(tmp_path): + """A repo with Python + Terraform + Ansible + tools manifest yields all four.""" + # Python + (tmp_path / "uv.lock").write_text("[[package]]\nname = \"typer\"\nversion = \"0.12.0\"\n") + # Terraform + tf_dir = 
tmp_path / "terraform" + tf_dir.mkdir() + (tf_dir / ".terraform.lock.hcl").write_text( + 'provider "registry.terraform.io/hashicorp/null" {\n version = "3.2.3"\n}\n' + ) + # Ansible + ansible_dir = tmp_path / "ansible" + ansible_dir.mkdir() + (ansible_dir / "requirements.yml").write_text("collections:\n - name: ansible.posix\n version: \"1.6.0\"\n") + # Tool manifest + (tmp_path / "sbom-tools.yaml").write_text("tools:\n - name: helm\n ecosystem: tool\n version: \"3.17.1\"\n") + + sources = sb.detect_all(tmp_path) + labels = {label for _, label, _ in sources} + assert "uv.lock" in labels + assert ".terraform.lock.hcl" in labels + assert "ansible/requirements.yml" in labels + assert "sbom-tools.yaml" in labels + + # Parse all and verify merged entries + all_entries = [] + for path, label, parser_fn in sources: + all_entries.extend(parser_fn(path)) + + ecosystems = {e["ecosystem"] for e in all_entries} + assert "python" in ecosystems + assert "terraform" in ecosystems + assert "ansible" in ecosystems + assert "tool" in ecosystems + + +def test_detect_all_skips_venv(tmp_path): + """Lockfiles inside .venv must be ignored.""" + venv_dir = tmp_path / ".venv" / "lib" + venv_dir.mkdir(parents=True) + (venv_dir / "requirements.txt").write_text("requests==2.31.0\n") + sources = sb.detect_all(tmp_path) + paths = {str(p) for p, _, _ in sources} + assert not any(".venv" in p for p in paths) + + +def test_detect_all_ansible_req_only_in_ansible_dir(tmp_path): + """requirements.yml at repo root (not in ansible/) should not be picked up as ansible.""" + (tmp_path / "requirements.yml").write_text("collections:\n - name: community.general\n") + sources = sb.detect_all(tmp_path) + labels = {label for _, label, _ in sources} + # Should NOT be detected since it's not under an 'ansible/' directory + assert "ansible/requirements.yml" not in labels + assert "ansible/requirements.yaml" not in labels + + +def test_detect_all_no_duplicates(tmp_path): + """Same file should not appear twice.""" 
+ (tmp_path / "uv.lock").write_text("[[package]]\nname = \"x\"\nversion = \"1.0\"\n") + sources = sb.detect_all(tmp_path) + paths = [p for p, _, _ in sources] + assert len(paths) == len(set(paths)) + + +def test_detect_all_empty_repo(tmp_path): + sources = sb.detect_all(tmp_path) + assert sources == []