#!/usr/bin/env python3
"""Ingest a repo's lockfile into the State Hub SBOM store.

Usage:
    python ingest_sbom.py --repo <slug> [--lockfile <path>] [--repo-path <dir>]
                          [--api-base <url>] [--dry-run]

Auto-detects lockfile type:
    uv.lock            → Python ecosystem
    requirements.txt   → Python ecosystem (basic)
    package-lock.json  → Node ecosystem
    yarn.lock          → Node ecosystem
    Cargo.lock         → Rust ecosystem
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path

API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")


# ---------------------------------------------------------------------------
# Lockfile parsers
# ---------------------------------------------------------------------------

# A TOML table header such as "[package.metadata]" or "[[manifest.x]]".
_TOML_TABLE_HEADER = re.compile(r"^\[\[?[^\[\]]+\]\]?\s*(?:#.*)?$")

# One requirements.txt line: name, optional extras, then either a version
# clause (first version captured) or a direct "@ url" reference.
_REQUIREMENT_RE = re.compile(
    r"^([A-Za-z0-9][A-Za-z0-9_.\-]*)"
    r"\s*(?:\[[^\]]*\])?"
    r"\s*(?:(?:===|==|~=|!=|>=|<=|>|<)\s*([^\s,;]+).*|@.*)?$"
)

# Package name at the start of a yarn.lock specifier. The optional leading
# "@" keeps scoped names such as "@babel/core" intact.
_YARN_SPEC_NAME = re.compile(r'"?(@?[^@"]+)@')

# Root-package keys in package-lock.json that declare direct dependencies.
_NPM_DIRECT_DEP_KEYS = (
    "dependencies",
    "devDependencies",
    "optionalDependencies",
    "peerDependencies",
)


def _parse_toml_package_blocks(path: Path, ecosystem: str) -> list[dict]:
    """Extract name/version from the ``[[package]]`` blocks of a TOML lockfile.

    This is a line-based scan, not a full TOML parser. Only ``name = `` and
    ``version = `` lines that appear directly inside a ``[[package]]`` block
    are read. Any other table header ends the current block, so that
    ``name``/``version`` keys of unrelated tables cannot overwrite a package.

    Args:
        path: Lockfile to read (decoded as UTF-8).
        ecosystem: Value stored in each entry's ``ecosystem`` field.

    Returns:
        One entry dict per ``[[package]]`` block that has a name.
    """
    blocks: list[dict] = []
    current: dict | None = None
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if stripped == "[[package]]":
            current = {}
            blocks.append(current)
        elif _TOML_TABLE_HEADER.match(stripped):
            # Sub-table or unrelated table: stop reading into the package.
            current = None
        elif current is not None:
            for key in ("name", "version"):
                if stripped.startswith(f"{key} = "):
                    value = stripped.split("=", 1)[1].strip().strip('"')
                    current.setdefault(f"package_{key}", value)
    return [
        {
            "package_name": block["package_name"],
            "package_version": block.get("package_version"),
            "ecosystem": ecosystem,
            "license_spdx": None,
            # These lockfiles don't distinguish; treat all as transitive.
            "is_direct": False,
            "is_dev": False,
        }
        for block in blocks
        if "package_name" in block
    ]


def _parse_uv_lock(path: Path) -> list[dict]:
    """Parse uv.lock TOML format (v0.1 — [[package]] blocks)."""
    return _parse_toml_package_blocks(path, "python")


def _parse_requirements_txt(path: Path) -> list[dict]:
    """Parse requirements.txt (basic — name==version lines).

    Blank lines, comments, and option lines (starting with ``-``) are
    skipped, as are lines that do not start with a plain package name
    (paths, VCS URLs). Inline comments and environment markers are ignored.

    NOTE(review): the original body of this function was lost when the file
    was extracted. It is reconstructed from the surviving header, skip logic
    and regex prefix. The entry fields marked below are assumptions — confirm
    against the original source.
    """
    entries = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or line.startswith("-"):
            continue
        # Drop inline comments (" #...") and environment markers ("; ...").
        requirement = line.split(" #", 1)[0].split(";", 1)[0].strip()
        # Handle: pkg==1.2.3, pkg>=1.2, pkg
        m = _REQUIREMENT_RE.match(requirement)
        if not m:
            continue
        entries.append({
            "package_name": m.group(1),
            # NOTE(review): for range specifiers this is the first bound
            # (e.g. "1.2" for "pkg>=1.2"), not a resolved version.
            "package_version": m.group(2),
            "ecosystem": "python",
            "license_spdx": None,
            # NOTE(review): assumed — listed requirements treated as direct.
            "is_direct": True,
            "is_dev": False,
        })
    return entries


def _parse_package_lock_json(path: Path) -> list[dict]:
    """Parse package-lock.json (npm) — packages dict.

    A package is reported as direct when it is installed at the top level of
    ``node_modules`` and the root package (the ``""`` key) declares it in one
    of its dependency maps. Lockfiles without a ``packages`` dict yield an
    empty list.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []

    packages = data.get("packages") if isinstance(data, dict) else None
    if not isinstance(packages, dict):
        return []

    root = packages.get("")
    if not isinstance(root, dict):
        root = {}
    direct_names: set[str] = set()
    for key in _NPM_DIRECT_DEP_KEYS:
        declared = root.get(key)
        if isinstance(declared, dict):
            direct_names.update(declared)

    entries = []
    for pkg_path, info in packages.items():
        if not pkg_path or not isinstance(info, dict):  # root package / junk
            continue
        installed_as = pkg_path.split("node_modules/")[-1]
        is_top_level = pkg_path == f"node_modules/{installed_as}"
        license_value = info.get("license")
        entries.append({
            "package_name": info.get("name") or installed_as,
            "package_version": info.get("version"),
            "ecosystem": "node",
            # Legacy packages may carry a non-string license object.
            "license_spdx": license_value if isinstance(license_value, str) else None,
            "is_direct": is_top_level and installed_as in direct_names,
            "is_dev": bool(info.get("dev", False)),
        })
    return entries


def _yarn_block_entries(names: list[str], version: str | None) -> list[dict]:
    """Build entries for one yarn.lock block; empty if it is incomplete."""
    if not names or not version:
        return []
    return [
        {
            "package_name": name,
            "package_version": version,
            "ecosystem": "node",
            "license_spdx": None,
            "is_direct": False,
            "is_dev": False,
        }
        # Several specifiers of the same package share one block: emit once.
        for name in dict.fromkeys(names)
    ]


def _parse_yarn_lock(path: Path) -> list[dict]:
    """Parse yarn.lock — basic name extraction.

    A block starts at an unindented line ending in ``:`` (which may list
    several comma-separated specifiers). It is emitted when the next block
    starts or the file ends.
    """
    entries: list[dict] = []
    current_names: list[str] = []
    current_version: str | None = None
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if not line.startswith(" ") and stripped.endswith(":"):
            # New package block header: "name@version:" or "\"name@version\":"
            # May list multiple versions: "name@^1.0, name@~1.0:"
            entries.extend(_yarn_block_entries(current_names, current_version))
            current_names = []
            current_version = None
            for part in stripped.rstrip(":").split(","):
                m = _YARN_SPEC_NAME.match(part.strip())
                if m:
                    current_names.append(m.group(1).strip())
        elif current_version is None and stripped.startswith("version "):
            # First "version" line wins, so a dependency that happens to be
            # called "version" further down the block cannot override it.
            current_version = stripped.split('"')[1] if '"' in stripped else None
    # The last block has no following header to trigger its flush.
    entries.extend(_yarn_block_entries(current_names, current_version))
    return entries


def _parse_cargo_lock(path: Path) -> list[dict]:
    """Parse Cargo.lock TOML format ([[package]] blocks)."""
    return _parse_toml_package_blocks(path, "rust")


_LOCKFILE_PARSERS = {
    "uv.lock": _parse_uv_lock,
    "requirements.txt": _parse_requirements_txt,
    "package-lock.json": _parse_package_lock_json,
    "yarn.lock": _parse_yarn_lock,
    "Cargo.lock": _parse_cargo_lock,
}


def detect_lockfile(repo_path: Path) -> tuple[Path, str] | None:
    """Return (lockfile_path, lockfile_name) for the first recognised lockfile.

    Candidates are tried in the order of ``_LOCKFILE_PARSERS``. Returns
    ``None`` when ``repo_path`` contains none of them.
    """
    for name in _LOCKFILE_PARSERS:
        candidate = repo_path / name
        if candidate.is_file():
            return candidate, name
    return None


def parse_lockfile(lockfile_path: Path) -> list[dict]:
    """Parse ``lockfile_path`` with the parser registered for its filename.

    Exits the process with status 1 if the filename is not a key of
    ``_LOCKFILE_PARSERS``.
    """
    filename = lockfile_path.name
    parser = _LOCKFILE_PARSERS.get(filename)
    if parser is None:
        print(f"Error: unsupported lockfile type '{filename}'", file=sys.stderr)
        sys.exit(1)
    return parser(lockfile_path)


# ---------------------------------------------------------------------------
# API submission
# ---------------------------------------------------------------------------

def post_ingest(api_base: str, repo_slug: str, entries: list[dict]) -> dict:
    """POST the parsed entries to ``{api_base}/sbom/ingest/``.

    Args:
        api_base: API base URL; a trailing slash is tolerated.
        repo_slug: Sent as ``repo_slug`` in the JSON payload.
        entries: Sent as ``entries`` in the JSON payload.

    Returns:
        The decoded JSON object from the response body.

    Exits the process with status 1 on an HTTP error, an unreachable API, or
    a response body that is not a JSON object.
    """
    payload = json.dumps({"repo_slug": repo_slug, "entries": entries}).encode()
    req = urllib.request.Request(
        f"{api_base.rstrip('/')}/sbom/ingest/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        print(f"HTTP {e.code} from API: {body}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # URLError is an OSError; this also covers socket timeouts and
        # connection resets raised while reading the body.
        print(f"API unreachable: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        result = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"Invalid JSON in API response: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(result, dict):
        print("Unexpected API response: expected a JSON object", file=sys.stderr)
        sys.exit(1)
    return result


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse CLI arguments, parse the lockfile, and submit (or preview) it."""
    parser = argparse.ArgumentParser(description="Ingest a lockfile into the State Hub SBOM store.")
    parser.add_argument("--repo", required=True, help="Managed-repo slug (e.g. 'the-custodian')")
    parser.add_argument("--lockfile", help="Path to lockfile (auto-detected if omitted)")
    parser.add_argument("--repo-path", default=".", help="Repo root for auto-detection (default: cwd)")
    parser.add_argument("--api-base", default=API_BASE, help="State Hub API base URL")
    parser.add_argument("--dry-run", action="store_true", help="Parse only — do not submit")
    args = parser.parse_args()

    if args.lockfile:
        lockfile_path = Path(args.lockfile).resolve()
        if not lockfile_path.is_file():
            print(f"Error: lockfile not found: {lockfile_path}", file=sys.stderr)
            sys.exit(1)
    else:
        found = detect_lockfile(Path(args.repo_path).resolve())
        if not found:
            print(
                f"No recognised lockfile found in '{args.repo_path}'. "
                "Supported: " + ", ".join(_LOCKFILE_PARSERS),
                file=sys.stderr,
            )
            sys.exit(1)
        lockfile_path, _ = found
        print(f"Auto-detected: {lockfile_path}")

    entries = parse_lockfile(lockfile_path)
    print(f"Parsed {len(entries)} packages from {lockfile_path.name}")

    if args.dry_run:
        print(json.dumps(entries[:5], indent=2))
        if len(entries) > 5:
            print(f"  … and {len(entries) - 5} more")
        return

    result = post_ingest(args.api_base, args.repo, entries)
    print(f"Ingested {result.get('ingested', '?')} entries for repo '{args.repo}'")
    print(f"Snapshot at: {result.get('snapshot_at', '?')}")


if __name__ == "__main__":
    main()