"""Unit tests for ingest_sbom.py parsers and auto-detection.""" from __future__ import annotations import json import sys import textwrap from pathlib import Path import pytest # Make scripts/ importable sys.path.insert(0, str(Path(__file__).parent.parent / "scripts")) import ingest_sbom as sb # --------------------------------------------------------------------------- # Terraform parser # --------------------------------------------------------------------------- TERRAFORM_LOCK = textwrap.dedent("""\ provider "registry.terraform.io/hashicorp/template" { version = "2.2.0" constraints = ">= 2.0.0" hashes = [ "h1:abc123", ] } provider "registry.terraform.io/hetznercloud/hcloud" { version = "1.52.0" constraints = ">= 1.40.0" } """) def test_terraform_parser_ecosystem(tmp_path): lock = tmp_path / ".terraform.lock.hcl" lock.write_text(TERRAFORM_LOCK) entries = sb._parse_terraform_lock_hcl(lock) assert len(entries) == 2 for e in entries: assert e["ecosystem"] == "terraform", f"expected terraform, got {e['ecosystem']}" names = {e["package_name"] for e in entries} assert "registry.terraform.io/hashicorp/template" in names assert "registry.terraform.io/hetznercloud/hcloud" in names def test_terraform_parser_versions(tmp_path): lock = tmp_path / ".terraform.lock.hcl" lock.write_text(TERRAFORM_LOCK) entries = sb._parse_terraform_lock_hcl(lock) by_name = {e["package_name"]: e for e in entries} assert by_name["registry.terraform.io/hashicorp/template"]["package_version"] == "2.2.0" assert by_name["registry.terraform.io/hetznercloud/hcloud"]["package_version"] == "1.52.0" def test_terraform_parser_is_direct(tmp_path): lock = tmp_path / ".terraform.lock.hcl" lock.write_text(TERRAFORM_LOCK) entries = sb._parse_terraform_lock_hcl(lock) assert all(e["is_direct"] for e in entries) def test_terraform_parser_empty(tmp_path): lock = tmp_path / ".terraform.lock.hcl" lock.write_text("# no providers\n") entries = sb._parse_terraform_lock_hcl(lock) assert entries == [] # --------------------------------------------------------------------------- # Ansible Galaxy parser # --------------------------------------------------------------------------- ANSIBLE_REQUIREMENTS_FULL = textwrap.dedent("""\ collections: - name: community.general version: "9.5.0" - name: ansible.posix version: "1.6.0" - community.crypto roles: - name: geerlingguy.docker version: "6.1.0" - geerlingguy.pip """) ANSIBLE_REQUIREMENTS_EMPTY = textwrap.dedent("""\ collections: [] roles: [] """) ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY = textwrap.dedent("""\ collections: - name: community.general version: "9.0.0" """) def test_ansible_parser_collections_and_roles(tmp_path): req = tmp_path / "requirements.yml" req.write_text(ANSIBLE_REQUIREMENTS_FULL) entries = sb._parse_ansible_requirements(req) assert len(entries) == 5 names = {e["package_name"] for e in entries} assert "community.general" in names assert "ansible.posix" in names assert "community.crypto" in names assert "geerlingguy.docker" in names assert "geerlingguy.pip" in names def test_ansible_parser_ecosystem(tmp_path): req = tmp_path / "requirements.yml" req.write_text(ANSIBLE_REQUIREMENTS_FULL) entries = sb._parse_ansible_requirements(req) for e in entries: assert e["ecosystem"] == "ansible" def test_ansible_parser_versions(tmp_path): req = tmp_path / "requirements.yml" req.write_text(ANSIBLE_REQUIREMENTS_FULL) entries = sb._parse_ansible_requirements(req) by_name = {e["package_name"]: e for e in entries} assert by_name["community.general"]["package_version"] == "9.5.0" assert by_name["ansible.posix"]["package_version"] == "1.6.0" assert by_name["community.crypto"]["package_version"] is None # no version specified assert by_name["geerlingguy.docker"]["package_version"] == "6.1.0" assert by_name["geerlingguy.pip"]["package_version"] is None def test_ansible_parser_is_direct(tmp_path): req = tmp_path / "requirements.yml" req.write_text(ANSIBLE_REQUIREMENTS_FULL) entries = sb._parse_ansible_requirements(req) assert all(e["is_direct"] for e in entries) def test_ansible_parser_empty(tmp_path): req = tmp_path / "requirements.yml" req.write_text(ANSIBLE_REQUIREMENTS_EMPTY) entries = sb._parse_ansible_requirements(req) assert entries == [] def test_ansible_parser_collections_only(tmp_path): req = tmp_path / "requirements.yml" req.write_text(ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY) entries = sb._parse_ansible_requirements(req) assert len(entries) == 1 assert entries[0]["package_name"] == "community.general" def test_ansible_parser_yaml_extension(tmp_path): """Both .yml and .yaml extensions must work.""" req = tmp_path / "requirements.yaml" req.write_text(ANSIBLE_REQUIREMENTS_COLLECTIONS_ONLY) entries = sb._parse_ansible_requirements(req) assert len(entries) == 1 def test_ansible_parser_invalid_yaml(tmp_path, capsys): req = tmp_path / "requirements.yml" req.write_text("collections: [unclosed") entries = sb._parse_ansible_requirements(req) assert entries == [] captured = capsys.readouterr() assert "Warning" in captured.err # --------------------------------------------------------------------------- # sbom-tools.yaml parser # --------------------------------------------------------------------------- SBOM_TOOLS_YAML = textwrap.dedent("""\ tools: - name: ansible version: "12.3.0" ecosystem: ansible license_spdx: GPL-3.0-only is_direct: true is_dev: false - name: terraform version: "1.10.5" ecosystem: terraform license_spdx: BSL-1.1 is_direct: true is_dev: false - name: helm version: "3.17.1" ecosystem: tool license_spdx: Apache-2.0 is_direct: true is_dev: false - name: k3s version: unknown ecosystem: other license_spdx: Apache-2.0 is_direct: true is_dev: false """) SBOM_TOOLS_YAML_MINIMAL = textwrap.dedent("""\ tools: - name: kubectl ecosystem: tool """) def test_sbom_tools_parser_basic(tmp_path): manifest = tmp_path / "sbom-tools.yaml" manifest.write_text(SBOM_TOOLS_YAML) entries = sb._parse_sbom_tools_yaml(manifest) assert len(entries) == 4 names = {e["package_name"] for e in entries} assert {"ansible", "terraform", "helm", "k3s"} == names def test_sbom_tools_parser_ecosystems(tmp_path): manifest = tmp_path / "sbom-tools.yaml" manifest.write_text(SBOM_TOOLS_YAML) entries = sb._parse_sbom_tools_yaml(manifest) by_name = {e["package_name"]: e for e in entries} assert by_name["ansible"]["ecosystem"] == "ansible" assert by_name["terraform"]["ecosystem"] == "terraform" assert by_name["helm"]["ecosystem"] == "tool" assert by_name["k3s"]["ecosystem"] == "other" def test_sbom_tools_parser_licenses(tmp_path): manifest = tmp_path / "sbom-tools.yaml" manifest.write_text(SBOM_TOOLS_YAML) entries = sb._parse_sbom_tools_yaml(manifest) by_name = {e["package_name"]: e for e in entries} assert by_name["ansible"]["license_spdx"] == "GPL-3.0-only" assert by_name["terraform"]["license_spdx"] == "BSL-1.1" assert by_name["helm"]["license_spdx"] == "Apache-2.0" def test_sbom_tools_parser_unknown_version_becomes_none(tmp_path, capsys): """version: unknown must be converted to None and emit a warning.""" manifest = tmp_path / "sbom-tools.yaml" manifest.write_text(SBOM_TOOLS_YAML) entries = sb._parse_sbom_tools_yaml(manifest) by_name = {e["package_name"]: e for e in entries} assert by_name["k3s"]["package_version"] is None captured = capsys.readouterr() assert "unknown" in captured.err def test_sbom_tools_parser_minimal_entry(tmp_path): """Only 'name' and 'ecosystem' required; version and license default to None.""" manifest = tmp_path / "sbom-tools.yaml" manifest.write_text(SBOM_TOOLS_YAML_MINIMAL) entries = sb._parse_sbom_tools_yaml(manifest) assert len(entries) == 1 e = entries[0] assert e["package_name"] == "kubectl" assert e["ecosystem"] == "tool" assert e["package_version"] is None assert e["license_spdx"] is None assert e["is_direct"] is True assert e["is_dev"] is False def test_sbom_tools_parser_invalid_ecosystem_falls_back(tmp_path, capsys): manifest = tmp_path / "sbom-tools.yaml" manifest.write_text("tools:\n - name: foo\n ecosystem: nonsense\n") entries = sb._parse_sbom_tools_yaml(manifest) assert entries[0]["ecosystem"] == "tool" captured = capsys.readouterr() assert "Warning" in captured.err def test_sbom_tools_parser_empty_tools(tmp_path): manifest = tmp_path / "sbom-tools.yaml" manifest.write_text("tools: []\n") entries = sb._parse_sbom_tools_yaml(manifest) assert entries == [] def test_sbom_tools_parser_invalid_yaml(tmp_path, capsys): manifest = tmp_path / "sbom-tools.yaml" manifest.write_text("tools: {bad yaml: [unclosed") entries = sb._parse_sbom_tools_yaml(manifest) assert entries == [] captured = capsys.readouterr() assert "Warning" in captured.err # --------------------------------------------------------------------------- # detect_all — comprehensive multi-parser scan # --------------------------------------------------------------------------- def test_detect_all_uv_lock(tmp_path): (tmp_path / "uv.lock").write_text("[[package]]\nname = \"typer\"\nversion = \"0.12.0\"\n") sources = sb.detect_all(tmp_path) labels = {label for _, label, _ in sources} assert "uv.lock" in labels def test_detect_all_terraform_lock(tmp_path): tf_dir = tmp_path / "terraform" / "hetzner" tf_dir.mkdir(parents=True) (tf_dir / ".terraform.lock.hcl").write_text( 'provider "registry.terraform.io/hetznercloud/hcloud" {\n version = "1.52.0"\n}\n' ) sources = sb.detect_all(tmp_path) labels = {label for _, label, _ in sources} assert ".terraform.lock.hcl" in labels def test_detect_all_ansible_requirements(tmp_path): ansible_dir = tmp_path / "ansible" ansible_dir.mkdir() (ansible_dir / "requirements.yml").write_text("collections:\n - name: community.general\n") sources = sb.detect_all(tmp_path) labels = {label for _, label, _ in sources} assert "ansible/requirements.yml" in labels def test_detect_all_sbom_tools_yaml(tmp_path): (tmp_path / "sbom-tools.yaml").write_text("tools:\n - name: helm\n ecosystem: tool\n") sources = sb.detect_all(tmp_path) labels = {label for _, label, _ in sources} assert "sbom-tools.yaml" in labels def test_detect_all_multi_ecosystem(tmp_path): """A repo with Python + Terraform + Ansible + tools manifest yields all four.""" # Python (tmp_path / "uv.lock").write_text("[[package]]\nname = \"typer\"\nversion = \"0.12.0\"\n") # Terraform tf_dir = tmp_path / "terraform" tf_dir.mkdir() (tf_dir / ".terraform.lock.hcl").write_text( 'provider "registry.terraform.io/hashicorp/null" {\n version = "3.2.3"\n}\n' ) # Ansible ansible_dir = tmp_path / "ansible" ansible_dir.mkdir() (ansible_dir / "requirements.yml").write_text("collections:\n - name: ansible.posix\n version: \"1.6.0\"\n") # Tool manifest (tmp_path / "sbom-tools.yaml").write_text("tools:\n - name: helm\n ecosystem: tool\n version: \"3.17.1\"\n") sources = sb.detect_all(tmp_path) labels = {label for _, label, _ in sources} assert "uv.lock" in labels assert ".terraform.lock.hcl" in labels assert "ansible/requirements.yml" in labels assert "sbom-tools.yaml" in labels # Parse all and verify merged entries all_entries = [] for path, label, parser_fn in sources: all_entries.extend(parser_fn(path)) ecosystems = {e["ecosystem"] for e in all_entries} assert "python" in ecosystems assert "terraform" in ecosystems assert "ansible" in ecosystems assert "tool" in ecosystems def test_detect_all_skips_venv(tmp_path): """Lockfiles inside .venv must be ignored.""" venv_dir = tmp_path / ".venv" / "lib" venv_dir.mkdir(parents=True) (venv_dir / "requirements.txt").write_text("requests==2.31.0\n") sources = sb.detect_all(tmp_path) paths = {str(p) for p, _, _ in sources} assert not any(".venv" in p for p in paths) def test_detect_all_ansible_req_only_in_ansible_dir(tmp_path): """requirements.yml at repo root (not in ansible/) should not be picked up as ansible.""" (tmp_path / "requirements.yml").write_text("collections:\n - name: community.general\n") sources = sb.detect_all(tmp_path) labels = {label for _, label, _ in sources} # Should NOT be detected since it's not under an 'ansible/' directory assert "ansible/requirements.yml" not in labels assert "ansible/requirements.yaml" not in labels def test_detect_all_no_duplicates(tmp_path): """Same file should not appear twice.""" (tmp_path / "uv.lock").write_text("[[package]]\nname = \"x\"\nversion = \"1.0\"\n") sources = sb.detect_all(tmp_path) paths = [p for p, _, _ in sources] assert len(paths) == len(set(paths)) def test_detect_all_empty_repo(tmp_path): sources = sb.detect_all(tmp_path) assert sources == []