#!/usr/bin/env python3
"""Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store.

Usage:
    python ingest_sbom.py --repo SLUG [--repo-path PATH] [--dry-run]

Auto-detects all of the following in one scan:
    uv.lock                   → python
    requirements.txt          → python
    package-lock.json         → node
    yarn.lock                 → node
    Cargo.lock                → rust
    go.sum                    → go        (reads go.mod alongside for direct/indirect)
    .terraform.lock.hcl       → terraform (anywhere in tree)
    ansible/requirements.yml  → ansible   (anywhere under ansible/ dirs)
    ansible/requirements.yaml → ansible
    sbom-tools.yaml           → tool      (repo root; agent-generated)
"""

from __future__ import annotations

import argparse
import http.client
import json
import os
import re
import socket
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Callable

try:
    import yaml  # optional; only needed for sbom-tools.yaml and ansible parsers
    _YAML_AVAILABLE = True
except ImportError:
    _YAML_AVAILABLE = False

API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")

# Seconds to wait for the hub when looking up a repo's registered path.
_HUB_LOOKUP_TIMEOUT = 10


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _read_text(path: Path) -> str:
    """Read *path* as UTF-8; undecodable bytes are replaced, not fatal."""
    # An explicit encoding keeps parsing independent of the platform locale.
    return path.read_text(encoding="utf-8", errors="replace")


def _entry(
    name: str,
    version: str | None,
    ecosystem: str,
    *,
    license_spdx: str | None = None,
    is_direct: bool = False,
    is_dev: bool = False,
) -> dict:
    """Build one SBOM entry dict with the keys every parser emits."""
    return {
        "package_name": name,
        "package_version": version,
        "ecosystem": ecosystem,
        "license_spdx": license_spdx,
        "is_direct": is_direct,
        "is_dev": is_dev,
    }


def _load_yaml(path: Path) -> object:
    """Return the parsed YAML document at *path*, or None if it cannot be loaded.

    Prints a warning to stderr when PyYAML is missing or the file is invalid.
    """
    if not _YAML_AVAILABLE:
        print(f"Warning: PyYAML not available; skipping {path}", file=sys.stderr)
        return None
    try:
        return yaml.safe_load(_read_text(path))
    except yaml.YAMLError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return None


def _display_path(path: Path, root: Path) -> Path:
    """Return *path* relative to *root* when it lies beneath it, else *path*."""
    try:
        return path.relative_to(root)
    except ValueError:
        return path


# ---------------------------------------------------------------------------
# Lockfile parsers — each returns list[dict]
# ---------------------------------------------------------------------------

def _parse_toml_package_blocks(path: Path, ecosystem: str) -> list[dict]:
    """Collect name/version from every ``[[package]]`` table of a TOML lockfile.

    Shared by the uv.lock and Cargo.lock parsers. This is a line scanner, not
    a TOML parser: only top-of-line ``name = ...`` / ``version = ...`` keys
    inside a ``[[package]]`` table are read.
    """
    packages: list[dict] = []
    current: dict | None = None
    for line in _read_text(path).splitlines():
        stripped = line.strip()
        if stripped == "[[package]]":
            current = {}
            packages.append(current)
        elif stripped.startswith("[") and stripped.endswith("]"):
            # Any other table header ends the package table, so its keys
            # cannot overwrite the package just read.
            current = None
        elif current is not None:
            key, sep, value = stripped.partition("=")
            key = key.strip()
            if sep and key in ("name", "version"):
                current[key] = value.strip().strip('"')
    return [
        _entry(pkg["name"], pkg.get("version"), ecosystem)
        for pkg in packages
        if "name" in pkg
    ]


def _parse_uv_lock(path: Path) -> list[dict]:
    """Parse uv.lock TOML format (v0.1 — [[package]] blocks)."""
    return _parse_toml_package_blocks(path, "python")


# name, optional [extras], optional comparison operator + first version.
_REQUIREMENT_RE = re.compile(
    r"^([A-Za-z0-9_.\-]+)\s*(?:\[[^\]]*\])?\s*(?:[=!~<>]=?=?\s*([^\s;#,]+))?"
)


def _parse_requirements_txt(path: Path) -> list[dict]:
    """Parse requirements.txt (basic — name==version lines).

    Blank lines, comments and option lines (``-r``, ``-e``, ``--hash`` ...)
    are skipped. The version is the first one following a comparison
    operator, or None for an unpinned requirement.
    """
    # NOTE(review): the tail of this function was truncated in the reviewed
    # copy; the regex and is_direct=True are reconstructed — confirm both.
    entries = []
    for line in _read_text(path).splitlines():
        line = line.strip()
        if not line or line.startswith("#") or line.startswith("-"):
            continue
        m = _REQUIREMENT_RE.match(line)
        if m:
            entries.append(_entry(m.group(1), m.group(2), "python", is_direct=True))
    return entries


def _parse_package_lock_json(path: Path) -> list[dict]:
    """Parse package-lock.json (npm) — packages dict."""
    try:
        data = json.loads(_read_text(path))
    except json.JSONDecodeError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []
    packages = data.get("packages") if isinstance(data, dict) else None
    if not isinstance(packages, dict):
        return []
    entries = []
    for pkg_path, info in packages.items():
        # The "" key is skipped, as are malformed (non-dict) records.
        if not pkg_path or not isinstance(info, dict):
            continue
        name = info.get("name") or pkg_path.split("node_modules/")[-1]
        entries.append(_entry(
            name,
            info.get("version"),
            "node",
            license_spdx=info.get("license"),
            # NOTE(review): relies on an "indirect" key in the lock record;
            # when absent every package is reported as direct — confirm.
            is_direct=not info.get("indirect", False),
            is_dev=bool(info.get("dev", False)),
        ))
    return entries


# Package name in a yarn specifier; a leading "@" marks a scoped package.
_YARN_SPEC_RE = re.compile(r'"?(@?[^@"]+)@')
# Matches both `version "1.2.3"` and `version: 1.2.3`.
_YARN_VERSION_RE = re.compile(r'version:?\s+"?([^"\s]+)"?\s*$')


def _parse_yarn_lock(path: Path) -> list[dict]:
    """Parse yarn.lock — one entry per distinct (package name, version).

    A block starts with an unindented header of comma-separated specifiers
    ending in ``:``; its entries are emitted when the block's ``version``
    line is reached.
    """
    entries = []
    seen: set[tuple[str, str]] = set()
    names: list[str] = []
    for line in _read_text(path).splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if not line.startswith(" ") and stripped.endswith(":"):
            names = []
            for spec in stripped[:-1].split(","):
                m = _YARN_SPEC_RE.match(spec.strip())
                if m and m.group(1).strip() not in names:
                    names.append(m.group(1).strip())
        elif names:
            vm = _YARN_VERSION_RE.match(stripped)
            if vm is None:
                continue
            for name in names:
                key = (name, vm.group(1))
                if key not in seen:
                    seen.add(key)
                    entries.append(_entry(name, vm.group(1), "node"))
            # Only the first version line of a block belongs to the package.
            names = []
    return entries


def _parse_cargo_lock(path: Path) -> list[dict]:
    """Parse Cargo.lock TOML format ([[package]] blocks)."""
    return _parse_toml_package_blocks(path, "rust")


_TF_PROVIDER_RE = re.compile(r'^provider\s+"([^"]+)"\s*\{')
_TF_VERSION_RE = re.compile(r'version\s*=\s*"([^"]+)"')


def _parse_terraform_lock_hcl(path: Path) -> list[dict]:
    """Parse .terraform.lock.hcl — extract Terraform provider name + version."""
    entries = []
    current_name: str | None = None
    current_version: str | None = None
    for line in _read_text(path).splitlines():
        stripped = line.strip()
        m = _TF_PROVIDER_RE.match(stripped)
        if m:
            current_name = m.group(1)
            current_version = None
        elif current_name is not None:
            vm = _TF_VERSION_RE.match(stripped)
            if vm:
                current_version = vm.group(1)
            elif stripped == "}":
                # Closing brace of the provider block: emit what was seen.
                entries.append(_entry(
                    current_name, current_version, "terraform", is_direct=True,
                ))
                current_name = None
                current_version = None
    return entries


def _galaxy_entries(items: object, name_keys: tuple[str, ...]) -> list[dict]:
    """Turn a list of requirement items (str or dict) into entries."""
    entries = []
    if not isinstance(items, list):
        return entries
    for item in items:
        if isinstance(item, str):
            name, version = item, None
        elif isinstance(item, dict):
            name = next((item[k] for k in name_keys if item.get(k)), "")
            version = str(item["version"]) if item.get("version") else None
        else:
            continue
        if name:
            entries.append(_entry(str(name), version, "ansible", is_direct=True))
    return entries


def _parse_ansible_requirements(path: Path) -> list[dict]:
    """Parse ansible/requirements.yml — collections and roles.

    Handles the mapping form (``collections:`` / ``roles:`` keys) and the
    legacy form where the document is a bare list of roles.
    """
    data = _load_yaml(path)
    if isinstance(data, list):
        return _galaxy_entries(data, ("name", "src"))
    if not isinstance(data, dict):
        return []
    return (
        _galaxy_entries(data.get("collections"), ("name",))
        + _galaxy_entries(data.get("roles"), ("name", "src"))
    )


def _go_mod_direct_modules(go_mod: Path) -> set[str]:
    """Return modules required by *go_mod* that are not marked ``// indirect``."""
    direct: set[str] = set()
    if not go_mod.is_file():
        return direct
    in_require = False
    for line in _read_text(go_mod).splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("//"):
            continue
        if in_require:
            if stripped == ")":
                in_require = False
            elif "// indirect" not in stripped:
                direct.add(stripped.split()[0])
        elif stripped.startswith("require ("):
            in_require = True
        elif stripped.startswith("require "):
            # single-line require without parens
            rest = stripped[len("require "):].strip()
            if rest and "// indirect" not in rest:
                direct.add(rest.split()[0])
    return direct


def _parse_go_sum(path: Path) -> list[dict]:
    """Parse go.sum — deduplicated Go module list with direct/indirect from go.mod."""
    direct = _go_mod_direct_modules(path.parent / "go.mod")
    seen: set[tuple[str, str]] = set()
    entries = []
    for line in _read_text(path).splitlines():
        parts = line.split()
        if len(parts) < 3 or parts[0].startswith("//"):
            continue
        module, version = parts[0], parts[1]
        # Skip go.mod hash lines — only ingest the module itself
        if "/go.mod" in version:
            continue
        key = (module, version)
        if key in seen:
            continue
        seen.add(key)
        entries.append(_entry(module, version, "go", is_direct=module in direct))
    return entries


_VALID_ECOSYSTEMS = frozenset({
    "python", "node", "rust", "go", "java",
    "terraform", "ansible", "tool", "other",
})


def _parse_sbom_tools_yaml(path: Path) -> list[dict]:
    """Parse sbom-tools.yaml — agent-generated tool manifest at repo root."""
    data = _load_yaml(path)
    if not isinstance(data, dict):
        return []
    entries = []
    for item in data.get("tools", []) or []:
        if not isinstance(item, dict):
            continue
        name = str(item.get("name") or "").strip()
        if not name:
            print(" Warning: tool entry without a name — skipped", file=sys.stderr)
            continue
        version = str(item.get("version", "")) if item.get("version") else None
        if version == "unknown":
            print(f" Warning: tool '{name}' has version=unknown — flagged for review", file=sys.stderr)
            version = None
        ecosystem = item.get("ecosystem", "tool")
        if ecosystem not in _VALID_ECOSYSTEMS:
            print(f" Warning: unknown ecosystem '{ecosystem}' for '{name}'; using 'tool'", file=sys.stderr)
            ecosystem = "tool"
        entries.append(_entry(
            name,
            version,
            ecosystem,
            license_spdx=item.get("license_spdx") or None,
            is_direct=bool(item.get("is_direct", True)),
            is_dev=bool(item.get("is_dev", False)),
        ))
    return entries


# ---------------------------------------------------------------------------
# Detection helpers
# ---------------------------------------------------------------------------

# Filename → parser for standard lockfiles (detected by filename anywhere in tree)
_LOCKFILE_PARSERS: dict[str, Callable[[Path], list[dict]]] = {
    "uv.lock": _parse_uv_lock,
    "requirements.txt": _parse_requirements_txt,
    "package-lock.json": _parse_package_lock_json,
    "yarn.lock": _parse_yarn_lock,
    "Cargo.lock": _parse_cargo_lock,
    ".terraform.lock.hcl": _parse_terraform_lock_hcl,
    "go.sum": _parse_go_sum,
}

_ANSIBLE_REQUIREMENTS_NAMES = ("requirements.yml", "requirements.yaml")
_TOOLS_MANIFEST_NAME = "sbom-tools.yaml"

# Directories that never contain project-level lockfiles
_SKIP_DIRS = {
    ".git", ".hg", ".svn",
    ".venv", "venv", ".env",
    "node_modules",
    "__pycache__", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "dist", "build", ".build", "target",
    ".tox", ".nox",
}


def detect_all(repo_path: Path) -> list[tuple[Path, str, Callable[[Path], list[dict]]]]:
    """Scan repo_path and return all discovered dependency sources.

    Returns list of (path, label, parser_fn) tuples covering:
      - Standard lockfiles (anywhere in tree)
      - Ansible requirements files (in ansible/ subdirs)
      - sbom-tools.yaml at repo root
    """
    found: list[tuple[Path, str, Callable[[Path], list[dict]]]] = []

    for dirpath, dirnames, filenames in os.walk(repo_path):
        # Prune in place so os.walk never descends into skipped directories;
        # sorting keeps the result order deterministic.
        dirnames[:] = sorted(d for d in dirnames if d not in _SKIP_DIRS)
        here = Path(dirpath)
        present = set(filenames)

        # Standard lockfiles
        for fname, parser in _LOCKFILE_PARSERS.items():
            if fname in present:
                found.append((here / fname, fname, parser))

        # Ansible requirements files — only under directories named "ansible"
        if here.name == "ansible":
            for fname in _ANSIBLE_REQUIREMENTS_NAMES:
                if fname in present:
                    found.append(
                        (here / fname, f"ansible/{fname}", _parse_ansible_requirements)
                    )

    # sbom-tools.yaml at repo root only
    tools_manifest = repo_path / _TOOLS_MANIFEST_NAME
    if tools_manifest.is_file():
        found.append((tools_manifest, _TOOLS_MANIFEST_NAME, _parse_sbom_tools_yaml))

    return found


def detect_lockfile(repo_path: Path) -> tuple[Path, str] | None:
    """Return (lockfile_path, filename) for the first recognised lockfile at repo root."""
    for name in _LOCKFILE_PARSERS:
        candidate = repo_path / name
        if candidate.exists():
            return candidate, name
    return None


def detect_lockfiles_recursive(repo_path: Path) -> list[Path]:
    """Walk repo_path and return all recognised lockfiles, skipping non-dep dirs.

    Kept for backwards compatibility; prefer detect_all() for new code.
    """
    found: list[Path] = []
    for dirpath, dirnames, filenames in os.walk(repo_path):
        dirnames[:] = sorted(d for d in dirnames if d not in _SKIP_DIRS)
        found.extend(
            Path(dirpath) / name for name in _LOCKFILE_PARSERS if name in filenames
        )
    return found


def _parser_for(path: Path) -> Callable[[Path], list[dict]] | None:
    """Pick the parser for an explicitly named file from its filename."""
    name = path.name
    if name in _LOCKFILE_PARSERS:
        return _LOCKFILE_PARSERS[name]
    if name == _TOOLS_MANIFEST_NAME:
        return _parse_sbom_tools_yaml
    if name in _ANSIBLE_REQUIREMENTS_NAMES:
        return _parse_ansible_requirements
    return None


def parse_lockfile(lockfile_path: Path) -> list[dict]:
    """Parse one explicitly named dependency file.

    The parser is chosen from the filename. Exits the process with status 1
    when the filename is not a supported type or the file does not exist.
    """
    parser = _parser_for(lockfile_path)
    if parser is None:
        print(f"Error: unsupported lockfile type '{lockfile_path.name}'", file=sys.stderr)
        sys.exit(1)
    if not lockfile_path.is_file():
        print(f"Error: lockfile not found: '{lockfile_path}'", file=sys.stderr)
        sys.exit(1)
    return parser(lockfile_path)


# ---------------------------------------------------------------------------
# API submission
# ---------------------------------------------------------------------------

def post_ingest(api_base: str, repo_slug: str, entries: list[dict]) -> dict:
    """POST *entries* for *repo_slug* to ``{api_base}/sbom/ingest/``.

    Returns the decoded JSON response. Exits the process with status 1 on an
    HTTP error, an unreachable API, or a response that is not valid JSON.
    """
    payload = json.dumps({"repo_slug": repo_slug, "entries": entries}).encode()
    req = urllib.request.Request(
        f"{api_base}/sbom/ingest/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        print(f"HTTP {e.code} from API: {body}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"API unreachable: {e}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Invalid JSON response from API: {e}", file=sys.stderr)
        sys.exit(1)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def _resolve_repo_path_from_hub(api_base: str, repo_slug: str) -> Path | None:
    """Query the hub for this host's registered path for repo_slug.

    Best-effort: returns None (after a warning on stderr) when the hub cannot
    be reached or answers with something unusable, and None when no existing
    path is registered for this hostname.
    """
    url = f"{api_base}/repos/{urllib.parse.quote(repo_slug, safe='')}"
    try:
        # A timeout keeps an unresponsive hub from hanging the CLI.
        with urllib.request.urlopen(url, timeout=_HUB_LOOKUP_TIMEOUT) as resp:
            data = json.loads(resp.read())
    except (OSError, ValueError, http.client.HTTPException) as e:
        print(f"Warning: hub lookup for '{repo_slug}' failed: {e}", file=sys.stderr)
        return None

    host_paths = data.get("host_paths") if isinstance(data, dict) else None
    if not isinstance(host_paths, dict):
        return None
    registered = host_paths.get(socket.gethostname())
    if not registered:
        return None
    candidate = Path(str(registered))
    return candidate if candidate.exists() else None


def main() -> None:
    """CLI entry point: collect entries, then print them (dry run) or submit."""
    parser = argparse.ArgumentParser(
        description="Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store."
    )
    parser.add_argument("--repo", required=True, help="Managed-repo slug (e.g. 'the-custodian')")
    parser.add_argument("--lockfile", action="append", dest="lockfiles", metavar="PATH",
                        help="Path to a specific lockfile (repeatable)")
    parser.add_argument("--repo-path", default=None,
                        help="Repo root for auto-detection/scan (default: resolved from hub host_paths)")
    parser.add_argument("--scan", action="store_true",
                        help="Recursively find ALL lockfiles under --repo-path (deprecated; now default behaviour)")
    parser.add_argument("--api-base", default=API_BASE, help="State Hub API base URL")
    parser.add_argument("--dry-run", action="store_true", help="Parse only — do not submit")
    args = parser.parse_args()

    if args.repo_path is not None:
        repo_root = Path(args.repo_path).resolve()
    else:
        resolved = _resolve_repo_path_from_hub(args.api_base, args.repo)
        if not resolved:
            print(
                f"ERROR: --repo-path not given and hub lookup failed for '{args.repo}'.\n"
                f" Register the repo first or pass --repo-path explicitly.",
                file=sys.stderr,
            )
            sys.exit(1)
        repo_root = resolved
        print(f" Repo path resolved from hub: {repo_root}")

    all_entries: list[dict] = []

    if args.lockfiles:
        # Explicit paths: parse each, detect parser by filename
        for lf_str in args.lockfiles:
            lf = Path(lf_str).resolve()
            parsed = parse_lockfile(lf)
            print(f" {_display_path(lf, repo_root)}: {len(parsed)} packages")
            all_entries.extend(parsed)
    else:
        # Comprehensive auto-detection: all mechanisms in one scan
        sources = detect_all(repo_root)
        if not sources:
            print(
                f"No recognised dependency sources found in '{repo_root}'.",
                file=sys.stderr,
            )
            sys.exit(1)
        for src_path, label, parser_fn in sources:
            parsed = parser_fn(src_path)
            print(f" {label} ({_display_path(src_path, repo_root)}): {len(parsed)} entries")
            all_entries.extend(parsed)

    print(f"Total: {len(all_entries)} entries")

    if args.dry_run:
        # Show a small sample only; the full list can be very long.
        print(json.dumps(all_entries[:5], indent=2))
        if len(all_entries) > 5:
            print(f" … and {len(all_entries) - 5} more")
        return

    result = post_ingest(args.api_base, args.repo, all_entries)
    print(f"Ingested {result.get('ingested', '?')} entries for repo '{args.repo}'")
    print(f"Snapshot at: {result.get('snapshot_at', '?')}")


if __name__ == "__main__":
    main()