Files
the-custodian/state-hub/tests/test_ingest_sbom.py
tegwick 1c94f5545c feat(sbom): CUST-WP-0013 — expand SBOM infra to terraform, ansible, and tool manifests
- Migration d6e7f8a9b0c1: add terraform, ansible, tool to Ecosystem enum
- ingest_sbom.py: new Ansible Galaxy requirements.yml parser (collections + roles)
- ingest_sbom.py: new sbom-tools.yaml manifest parser (agent-generated tool deps)
- ingest_sbom.py: promote .terraform.lock.hcl parser from ecosystem=other → terraform
- ingest_sbom.py: detect_all() runs all four parsers in one comprehensive scan
- capture_sbom_tools.py: agent-assisted tool manifest generator (claude -p)
- prompts/sbom-capture-agent.md: parameterised prompt for repo tool discovery
- Makefile: capture-tools target; ingest-sbom updated docs and DRY_RUN support
- 29 unit tests covering all new parsers and detect_all() behaviour
- canon/standards/sbom-convention_v0.1.md: updated with four-mechanism model and workflow

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 04:40:26 +01:00

398 lines
13 KiB
Python

"""Unit tests for ingest_sbom.py parsers and auto-detection."""
from __future__ import annotations
import json
import sys
import textwrap
from pathlib import Path
import pytest
# Make scripts/ importable
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
import ingest_sbom as sb
# ---------------------------------------------------------------------------
# Terraform parser
# ---------------------------------------------------------------------------
# Lock-file fixture with two provider blocks: one carrying a ``hashes`` list,
# one without. Attributes are nested under their block so the fixture has the
# indented shape of a real lock file; ``textwrap.dedent`` strips the common
# four-space margin.
TERRAFORM_LOCK = textwrap.dedent("""\
    provider "registry.terraform.io/hashicorp/template" {
      version = "2.2.0"
      constraints = ">= 2.0.0"
      hashes = [
        "h1:abc123",
      ]
    }
    provider "registry.terraform.io/hetznercloud/hcloud" {
      version = "1.52.0"
      constraints = ">= 1.40.0"
    }
""")
def test_terraform_parser_ecosystem(tmp_path):
    """Both fixture providers are returned and each is tagged ``terraform``."""
    lock_file = tmp_path / ".terraform.lock.hcl"
    lock_file.write_text(TERRAFORM_LOCK)

    parsed = sb._parse_terraform_lock_hcl(lock_file)

    assert len(parsed) == 2
    for entry in parsed:
        assert entry["ecosystem"] == "terraform", f"expected terraform, got {entry['ecosystem']}"

    found = {entry["package_name"] for entry in parsed}
    for provider in (
        "registry.terraform.io/hashicorp/template",
        "registry.terraform.io/hetznercloud/hcloud",
    ):
        assert provider in found
def test_terraform_parser_versions(tmp_path):
    """Each provider's ``version`` attribute is reported as ``package_version``."""
    lock_file = tmp_path / ".terraform.lock.hcl"
    lock_file.write_text(TERRAFORM_LOCK)

    index = {
        entry["package_name"]: entry
        for entry in sb._parse_terraform_lock_hcl(lock_file)
    }

    expected_versions = {
        "registry.terraform.io/hashicorp/template": "2.2.0",
        "registry.terraform.io/hetznercloud/hcloud": "1.52.0",
    }
    for provider, version in expected_versions.items():
        assert index[provider]["package_version"] == version
def test_terraform_parser_is_direct(tmp_path):
    """Every provider entry carries a truthy ``is_direct`` flag."""
    lock_file = tmp_path / ".terraform.lock.hcl"
    lock_file.write_text(TERRAFORM_LOCK)

    for entry in sb._parse_terraform_lock_hcl(lock_file):
        assert entry["is_direct"]
def test_terraform_parser_empty(tmp_path):
    """A lock file containing only a comment yields an empty list."""
    lock_file = tmp_path / ".terraform.lock.hcl"
    lock_file.write_text("# no providers\n")

    assert sb._parse_terraform_lock_hcl(lock_file) == []
# ---------------------------------------------------------------------------
# Ansible Galaxy parser
# ---------------------------------------------------------------------------
# Galaxy requirements fixture mixing mapping-style entries (name + version)
# with bare-string entries (name only) in both the collections and roles
# sections. ``version`` must be indented under its ``- name`` item; at column 0
# it would become a top-level key and the document would no longer parse.
ANSIBLE_REQUIREMENTS_FULL = textwrap.dedent("""\
    collections:
      - name: community.general
        version: "9.5.0"
      - name: ansible.posix
        version: "1.6.0"
      - community.crypto
    roles:
      - name: geerlingguy.docker
        version: "6.1.0"
      - geerlingguy.pip
""")
# Requirements fixture in which both sections exist but list nothing.
ANSIBLE_REQUIREMENTS_EMPTY = "collections: []\nroles: []\n"
# Requirements fixture with a single pinned collection and no ``roles`` key.
# ``version`` is nested under the list item so it belongs to the collection
# rather than to the top-level mapping.
ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY = textwrap.dedent("""\
    collections:
      - name: community.general
        version: "9.0.0"
""")
def test_ansible_parser_collections_and_roles(tmp_path):
    """All five collections and roles from the full fixture are returned."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_FULL)

    parsed = sb._parse_ansible_requirements(requirements)

    assert len(parsed) == 5
    found = {item["package_name"] for item in parsed}
    for expected in (
        "community.general",
        "ansible.posix",
        "community.crypto",
        "geerlingguy.docker",
        "geerlingguy.pip",
    ):
        assert expected in found
def test_ansible_parser_ecosystem(tmp_path):
    """Every entry parsed from the full fixture is tagged ``ansible``."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_FULL)

    parsed = sb._parse_ansible_requirements(requirements)

    assert all(item["ecosystem"] == "ansible" for item in parsed)
def test_ansible_parser_versions(tmp_path):
    """Pinned entries report their version; bare-string entries report ``None``."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_FULL)

    index = {
        item["package_name"]: item
        for item in sb._parse_ansible_requirements(requirements)
    }

    pinned = {
        "community.general": "9.5.0",
        "ansible.posix": "1.6.0",
        "geerlingguy.docker": "6.1.0",
    }
    for name, version in pinned.items():
        assert index[name]["package_version"] == version

    # Entries written as a bare name carry no version at all.
    for name in ("community.crypto", "geerlingguy.pip"):
        assert index[name]["package_version"] is None
def test_ansible_parser_is_direct(tmp_path):
    """Every Galaxy entry carries a truthy ``is_direct`` flag."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_FULL)

    for item in sb._parse_ansible_requirements(requirements):
        assert item["is_direct"]
def test_ansible_parser_empty(tmp_path):
    """Empty ``collections`` and ``roles`` lists produce no entries."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_EMPTY)

    assert sb._parse_ansible_requirements(requirements) == []
def test_ansible_parser_collections_only(tmp_path):
    """A manifest without a ``roles`` key still yields its single collection."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY)

    parsed = sb._parse_ansible_requirements(requirements)

    assert len(parsed) == 1
    first = parsed[0]
    assert first["package_name"] == "community.general"
def test_ansible_parser_yaml_extension(tmp_path):
    """A manifest named ``requirements.yaml`` is parsed like the ``.yml`` form."""
    requirements = tmp_path / "requirements.yaml"
    requirements.write_text(ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY)

    parsed = sb._parse_ansible_requirements(requirements)

    assert len(parsed) == 1
def test_ansible_parser_invalid_yaml(tmp_path, capsys):
    """Unparseable YAML yields no entries and writes a warning to stderr."""
    requirements = tmp_path / "requirements.yml"
    requirements.write_text("collections: [unclosed")

    parsed = sb._parse_ansible_requirements(requirements)
    stderr_text = capsys.readouterr().err

    assert parsed == []
    assert "Warning" in stderr_text
# ---------------------------------------------------------------------------
# sbom-tools.yaml parser
# ---------------------------------------------------------------------------
# Tool-manifest fixture with four tools, one per ecosystem value used by the
# tests (ansible, terraform, tool, other). The k3s entry uses the literal
# version ``unknown``. Each tool's fields are nested under its ``- name`` item;
# at column 0 they would be top-level keys and the document would not parse.
SBOM_TOOLS_YAML = textwrap.dedent("""\
    tools:
      - name: ansible
        version: "12.3.0"
        ecosystem: ansible
        license_spdx: GPL-3.0-only
        is_direct: true
        is_dev: false
      - name: terraform
        version: "1.10.5"
        ecosystem: terraform
        license_spdx: BSL-1.1
        is_direct: true
        is_dev: false
      - name: helm
        version: "3.17.1"
        ecosystem: tool
        license_spdx: Apache-2.0
        is_direct: true
        is_dev: false
      - name: k3s
        version: unknown
        ecosystem: other
        license_spdx: Apache-2.0
        is_direct: true
        is_dev: false
""")
# Smallest manifest the tests use: one tool with only ``name`` and
# ``ecosystem``. ``ecosystem`` is nested under the list item so it is a field
# of the tool rather than a top-level key.
SBOM_TOOLS_YAML_MINIMAL = textwrap.dedent("""\
    tools:
      - name: kubectl
        ecosystem: tool
""")
def test_sbom_tools_parser_basic(tmp_path):
    """The four tools in the fixture are returned, and nothing else."""
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text(SBOM_TOOLS_YAML)

    parsed = sb._parse_sbom_tools_yaml(tools_file)

    assert len(parsed) == 4
    found = {tool["package_name"] for tool in parsed}
    assert {"ansible", "terraform", "helm", "k3s"} == found
def test_sbom_tools_parser_ecosystems(tmp_path):
    """Each tool keeps the ecosystem declared for it in the manifest."""
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text(SBOM_TOOLS_YAML)

    index = {
        tool["package_name"]: tool
        for tool in sb._parse_sbom_tools_yaml(tools_file)
    }

    expected = {
        "ansible": "ansible",
        "terraform": "terraform",
        "helm": "tool",
        "k3s": "other",
    }
    for name, ecosystem in expected.items():
        assert index[name]["ecosystem"] == ecosystem
def test_sbom_tools_parser_licenses(tmp_path):
    """``license_spdx`` values are carried through from the manifest."""
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text(SBOM_TOOLS_YAML)

    index = {
        tool["package_name"]: tool
        for tool in sb._parse_sbom_tools_yaml(tools_file)
    }

    expected = {
        "ansible": "GPL-3.0-only",
        "terraform": "BSL-1.1",
        "helm": "Apache-2.0",
    }
    for name, spdx in expected.items():
        assert index[name]["license_spdx"] == spdx
def test_sbom_tools_parser_unknown_version_becomes_none(tmp_path, capsys):
    """``version: unknown`` is reported as ``None`` and mentioned on stderr."""
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text(SBOM_TOOLS_YAML)

    index = {
        tool["package_name"]: tool
        for tool in sb._parse_sbom_tools_yaml(tools_file)
    }
    stderr_text = capsys.readouterr().err

    assert index["k3s"]["package_version"] is None
    assert "unknown" in stderr_text
def test_sbom_tools_parser_minimal_entry(tmp_path):
    """An entry with only name and ecosystem gets defaults for the other fields.

    Version and licence default to ``None``, ``is_direct`` to ``True`` and
    ``is_dev`` to ``False``.
    """
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text(SBOM_TOOLS_YAML_MINIMAL)

    parsed = sb._parse_sbom_tools_yaml(tools_file)

    assert len(parsed) == 1
    tool = parsed[0]
    # Fields given explicitly in the manifest.
    assert tool["package_name"] == "kubectl"
    assert tool["ecosystem"] == "tool"
    # Fields left out of the manifest.
    assert tool["package_version"] is None
    assert tool["license_spdx"] is None
    assert tool["is_direct"] is True
    assert tool["is_dev"] is False
def test_sbom_tools_parser_invalid_ecosystem_falls_back(tmp_path, capsys):
    """An unrecognised ecosystem is replaced by ``tool`` with a stderr warning."""
    tools_file = tmp_path / "sbom-tools.yaml"
    # ``ecosystem`` has to sit deeper than the dash so it belongs to the list
    # item. Aligned with the dash the document is malformed, and the test
    # would exercise the invalid-YAML path instead of the fallback path.
    tools_file.write_text("tools:\n  - name: foo\n    ecosystem: nonsense\n")

    parsed = sb._parse_sbom_tools_yaml(tools_file)
    stderr_text = capsys.readouterr().err

    assert parsed[0]["ecosystem"] == "tool"
    assert "Warning" in stderr_text
def test_sbom_tools_parser_empty_tools(tmp_path):
    """An empty ``tools`` list produces no entries."""
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text("tools: []\n")

    assert sb._parse_sbom_tools_yaml(tools_file) == []
def test_sbom_tools_parser_invalid_yaml(tmp_path, capsys):
    """Unparseable YAML yields no entries and writes a warning to stderr."""
    tools_file = tmp_path / "sbom-tools.yaml"
    tools_file.write_text("tools: {bad yaml: [unclosed")

    parsed = sb._parse_sbom_tools_yaml(tools_file)
    stderr_text = capsys.readouterr().err

    assert parsed == []
    assert "Warning" in stderr_text
# ---------------------------------------------------------------------------
# detect_all — comprehensive multi-parser scan
# ---------------------------------------------------------------------------
def test_detect_all_uv_lock(tmp_path):
    """A ``uv.lock`` at the repo root is reported under the label ``uv.lock``."""
    uv_lock = tmp_path / "uv.lock"
    uv_lock.write_text("[[package]]\nname = \"typer\"\nversion = \"0.12.0\"\n")

    found_labels = {label for _path, label, _parser in sb.detect_all(tmp_path)}

    assert "uv.lock" in found_labels
def test_detect_all_terraform_lock(tmp_path):
    """A lock file two directories below the root is still detected."""
    module_dir = tmp_path / "terraform" / "hetzner"
    module_dir.mkdir(parents=True)
    lock_file = module_dir / ".terraform.lock.hcl"
    lock_file.write_text(
        'provider "registry.terraform.io/hetznercloud/hcloud" {\n version = "1.52.0"\n}\n'
    )

    found_labels = {label for _path, label, _parser in sb.detect_all(tmp_path)}

    assert ".terraform.lock.hcl" in found_labels
def test_detect_all_ansible_requirements(tmp_path):
    """``ansible/requirements.yml`` is reported under that exact label."""
    playbook_dir = tmp_path / "ansible"
    playbook_dir.mkdir()
    requirements = playbook_dir / "requirements.yml"
    requirements.write_text("collections:\n - name: community.general\n")

    found_labels = {label for _path, label, _parser in sb.detect_all(tmp_path)}

    assert "ansible/requirements.yml" in found_labels
def test_detect_all_sbom_tools_yaml(tmp_path):
    """A root-level ``sbom-tools.yaml`` is reported under that label."""
    tools_file = tmp_path / "sbom-tools.yaml"
    # Well-formed manifest: ``ecosystem`` is nested under the list item, so the
    # fixture remains valid even if detection inspects file content.
    tools_file.write_text("tools:\n  - name: helm\n    ecosystem: tool\n")

    found_labels = {label for _path, label, _parser in sb.detect_all(tmp_path)}

    assert "sbom-tools.yaml" in found_labels
def test_detect_all_multi_ecosystem(tmp_path):
    """A repo with Python, Terraform, Ansible and a tool manifest yields all four.

    Checks that ``detect_all`` reports one source per mechanism, then runs each
    returned parser on its path and checks that the merged entries cover the
    ``python``, ``terraform``, ``ansible`` and ``tool`` ecosystems.
    """
    # Python
    (tmp_path / "uv.lock").write_text("[[package]]\nname = \"typer\"\nversion = \"0.12.0\"\n")

    # Terraform
    tf_dir = tmp_path / "terraform"
    tf_dir.mkdir()
    (tf_dir / ".terraform.lock.hcl").write_text(
        'provider "registry.terraform.io/hashicorp/null" {\n version = "3.2.3"\n}\n'
    )

    # Ansible: ``version`` is nested under the list item; aligned with the
    # dash the YAML is malformed and the parser could not return the entry.
    ansible_dir = tmp_path / "ansible"
    ansible_dir.mkdir()
    (ansible_dir / "requirements.yml").write_text(
        "collections:\n  - name: ansible.posix\n    version: \"1.6.0\"\n"
    )

    # Tool manifest: same nesting requirement for ``ecosystem`` and ``version``.
    (tmp_path / "sbom-tools.yaml").write_text(
        "tools:\n  - name: helm\n    ecosystem: tool\n    version: \"3.17.1\"\n"
    )

    sources = sb.detect_all(tmp_path)

    labels = {label for _path, label, _parser in sources}
    for expected_label in (
        "uv.lock",
        ".terraform.lock.hcl",
        "ansible/requirements.yml",
        "sbom-tools.yaml",
    ):
        assert expected_label in labels

    # Parse every detected source and verify the merged entries.
    all_entries = []
    for path, _label, parser_fn in sources:
        all_entries.extend(parser_fn(path))

    ecosystems = {entry["ecosystem"] for entry in all_entries}
    for expected_ecosystem in ("python", "terraform", "ansible", "tool"):
        assert expected_ecosystem in ecosystems
def test_detect_all_skips_venv(tmp_path):
    """No detected source path may contain ``.venv``."""
    site_dir = tmp_path / ".venv" / "lib"
    site_dir.mkdir(parents=True)
    (site_dir / "requirements.txt").write_text("requests==2.31.0\n")

    for path, _label, _parser in sb.detect_all(tmp_path):
        assert ".venv" not in str(path)
def test_detect_all_ansible_req_only_in_ansible_dir(tmp_path):
    """A root-level ``requirements.yml`` is not labelled as an Ansible manifest."""
    root_requirements = tmp_path / "requirements.yml"
    root_requirements.write_text("collections:\n - name: community.general\n")

    found_labels = {label for _path, label, _parser in sb.detect_all(tmp_path)}

    # The file is outside any 'ansible/' directory, so neither Ansible label
    # may appear.
    for ansible_label in ("ansible/requirements.yml", "ansible/requirements.yaml"):
        assert ansible_label not in found_labels
def test_detect_all_no_duplicates(tmp_path):
    """No path appears more than once in the detected sources."""
    (tmp_path / "uv.lock").write_text("[[package]]\nname = \"x\"\nversion = \"1.0\"\n")

    seen = set()
    for path, _label, _parser in sb.detect_all(tmp_path):
        assert path not in seen
        seen.add(path)
def test_detect_all_empty_repo(tmp_path):
    """An empty directory yields an empty source list."""
    detected = sb.detect_all(tmp_path)

    assert detected == []