Files
state-hub/scripts/ingest_sbom.py
tegwick 7d3487d4fe feat(state-hub): v0.3 registration workflow + ingest-sbom + CLAUDE.md template update
- scripts/ingest_sbom.py: lockfile parser + API poster for uv.lock, requirements.txt,
  package-lock.json, yarn.lock, Cargo.lock; auto-detects from repo root
- Makefile: make ingest-sbom REPO=<slug> [LOCKFILE=<path>] target
- scripts/register_project.sh: adds {REPO_SLUG} template substitution + optional
  SBOM ingest prompt at end of registration (non-fatal if venv not ready)
- scripts/project_claude_md.template: adds Contribution Tracking + SBOM sections
  documenting register_contribution(), update_contribution_status(), ingest-sbom,
  and the contrib/ directory layout
- workplans/CUST-WP-0002: all 15 tasks → done, status → completed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:28:49 +01:00

277 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""Ingest a repo's lockfile into the State Hub SBOM store.
Usage:
python ingest_sbom.py --repo <slug> [--lockfile <path>] [--repo-path <dir>] [--api-base <url>] [--dry-run]
Auto-detects lockfile type:
uv.lock → Python ecosystem
requirements.txt → Python ecosystem (basic)
package-lock.json → Node ecosystem
yarn.lock → Node ecosystem
Cargo.lock → Rust ecosystem
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Default State Hub API base URL: read from the API_BASE environment variable,
# falling back to the local address below; trailing slashes are stripped.
API_BASE = os.environ.get("API_BASE", "http://127.0.0.1:8000").rstrip("/")
# ---------------------------------------------------------------------------
# Lockfile parsers
# ---------------------------------------------------------------------------
def _parse_uv_lock(path: Path) -> list[dict]:
    """Parse a uv.lock file by scanning its ``[[package]]`` TOML blocks.

    This is a line scanner, not a TOML parser: only ``name = "..."`` and
    ``version = "..."`` lines that sit directly inside a ``[[package]]`` table
    are read. Any other table header (for example ``[package.metadata]``)
    closes the current package, so keys belonging to later tables can never
    overwrite the package's own name or version.

    Args:
        path: Location of the uv.lock file; read as UTF-8.

    Returns:
        One entry dict per package that has a name, in file order.
        ``package_version`` is ``None`` when the block has no version line.
    """
    found: list[dict] = []
    current: dict | None = None
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if stripped.startswith("[") and stripped.endswith("]"):
            # Every table header closes the package being collected ...
            if current:
                found.append(current)
            # ... but only a [[package]] header opens a new one.
            current = {} if stripped == "[[package]]" else None
        elif current is not None:
            if stripped.startswith("name = "):
                current["package_name"] = stripped.split("=", 1)[1].strip().strip('"')
            elif stripped.startswith("version = "):
                current["package_version"] = stripped.split("=", 1)[1].strip().strip('"')
    if current:
        found.append(current)
    return [
        {
            "package_name": pkg["package_name"],
            "package_version": pkg.get("package_version"),
            "ecosystem": "python",
            "license_spdx": None,
            "is_direct": False,  # uv.lock doesn't distinguish; treat all as transitive
            "is_dev": False,
        }
        for pkg in found
        if "package_name" in pkg
    ]
def _parse_requirements_txt(path: Path) -> list[dict]:
    """Parse a requirements.txt file into SBOM entries (basic support).

    Handled forms: ``pkg``, ``pkg==1.2.3``, ``pkg >= 1.2``, ``pkg[extra]==1.0``,
    ``pkg>=1.0,<2.0`` (the first clause is kept) and ``pkg @ <url>`` (no
    version). Blank lines, comments, option lines (starting with ``-``) and
    bare URL / path requirements such as ``git+https://...`` or ``./local``
    are skipped because they carry no usable package name.

    Args:
        path: Location of the requirements file; read as UTF-8.

    Returns:
        One entry dict per recognised requirement line, in file order.
        ``package_version`` is the version of the first specifier clause, or
        ``None`` when the line has no specifier.
    """
    requirement = re.compile(
        r"^([A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?)"  # project name
        r"(?:\s*\[[^\]]*\])?"                              # optional [extras]
        r"(?:\s*[>=<!~^]+\s*([^\s;,]+))?"                  # first version clause
    )
    entries = []
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "-")):
            continue
        m = requirement.match(line)
        if m is None:
            continue
        # A name glued to '+', ':' or '/' is really a URL scheme or a path
        # (e.g. "git+https://...", "C:/pkgs/x"), not a package name.
        if line.startswith(("+", ":", "/"), m.end(1)):
            continue
        entries.append({
            "package_name": m.group(1),
            "package_version": m.group(2),
            "ecosystem": "python",
            "license_spdx": None,
            "is_direct": True,
            "is_dev": False,
        })
    return entries
def _parse_package_lock_json(path: Path) -> list[dict]:
    """Parse an npm package-lock.json via its top-level ``packages`` mapping.

    A package counts as direct when it is installed at the top level
    (``node_modules/<name>``) and is listed in one of the root package's
    dependency sections. The ``indirect`` key is only consulted as a fallback
    when the lockfile has no root (``""``) entry to compare against.

    Args:
        path: Location of the lockfile; read as UTF-8.

    Returns:
        One entry dict per non-root item of ``packages``, in file order.
        An empty list is returned (with a warning on stderr) when the file is
        not valid JSON, and silently when it has no ``packages`` mapping.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []
    packages = data.get("packages", {}) if isinstance(data, dict) else {}
    if not isinstance(packages, dict):
        return []

    # Names the project itself depends on, taken from the root ("") entry.
    root = packages.get("")
    direct_names = None
    if isinstance(root, dict):
        direct_names = set()
        for section in (
            "dependencies",
            "devDependencies",
            "optionalDependencies",
            "peerDependencies",
        ):
            direct_names.update(root.get(section) or {})

    entries = []
    for pkg_path, info in packages.items():
        if not pkg_path or not isinstance(info, dict):  # root package / malformed
            continue
        installed_as = pkg_path.split("node_modules/")[-1]
        if direct_names is None:
            is_direct = not info.get("indirect", False)
        else:
            # Nested copies (node_modules/a/node_modules/b) are never direct.
            is_direct = (
                pkg_path == f"node_modules/{installed_as}"
                and installed_as in direct_names
            )
        entries.append({
            "package_name": info.get("name") or installed_as,
            "package_version": info.get("version"),
            "ecosystem": "node",
            "license_spdx": info.get("license"),
            "is_direct": is_direct,
            "is_dev": bool(info.get("dev", False)),
        })
    return entries
def _parse_yarn_lock(path: Path) -> list[dict]:
    """Parse a yarn.lock file into one entry per package block.

    A block starts at an unindented line ending in ``:`` that lists one or
    more ``name@range`` specifiers separated by commas; the block's first
    ``version`` line supplies the resolved version. Both the ``version "1.2.3"``
    and the ``version: 1.2.3`` spellings are accepted. Scoped names
    (``@scope/pkg``) are supported, and a name repeated with several ranges in
    one header is emitted once. Blocks without any ``name@`` specifier or
    without a version line (such as a ``__metadata:`` header) produce nothing.

    Args:
        path: Location of the yarn.lock file; read as UTF-8.

    Returns:
        Entry dicts in file order.
    """
    version_line = re.compile(r'^version:?\s+"?([^"\s]+)"?')
    blocks: list[dict] = []
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if not line.startswith(" ") and stripped.endswith(":"):
            # Header: '"@scope/pkg@^1.0", "@scope/pkg@^1.1":' or 'pkg@^1.0:'
            names: list[str] = []
            for spec in stripped.rstrip(":").split(","):
                spec = spec.strip().strip('"')
                # Search from index 1 so the leading '@' of a scope is kept.
                at = spec.find("@", 1)
                if at > 0 and spec[:at] not in names:
                    names.append(spec[:at])
            blocks.append({"names": names, "version": None})
        elif blocks and blocks[-1]["version"] is None:
            # Only the first version line of a block is the resolved version.
            m = version_line.match(stripped)
            if m:
                blocks[-1]["version"] = m.group(1)
    return [
        {
            "package_name": name,
            "package_version": block["version"],
            "ecosystem": "node",
            "license_spdx": None,
            "is_direct": False,
            "is_dev": False,
        }
        for block in blocks
        if block["version"]
        for name in block["names"]
    ]
def _parse_cargo_lock(path: Path) -> list[dict]:
    """Parse a Cargo.lock file by scanning its ``[[package]]`` TOML blocks.

    This is a line scanner, not a TOML parser: only ``name = "..."`` and
    ``version = "..."`` lines that sit directly inside a ``[[package]]`` table
    are read. Any other table header (for example ``[[patch.unused]]`` or
    ``[metadata]``) closes the current package, so keys belonging to those
    tables can never overwrite a real package's name or version.

    Args:
        path: Location of the Cargo.lock file; read as UTF-8.

    Returns:
        One entry dict per package that has a name, in file order.
        ``package_version`` is ``None`` when the block has no version line.
    """
    found: list[dict] = []
    current: dict | None = None
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if stripped.startswith("[") and stripped.endswith("]"):
            # Every table header closes the package being collected ...
            if current:
                found.append(current)
            # ... but only a [[package]] header opens a new one.
            current = {} if stripped == "[[package]]" else None
        elif current is not None:
            if stripped.startswith("name = "):
                current["package_name"] = stripped.split("=", 1)[1].strip().strip('"')
            elif stripped.startswith("version = "):
                current["package_version"] = stripped.split("=", 1)[1].strip().strip('"')
    if current:
        found.append(current)
    return [
        {
            "package_name": pkg["package_name"],
            "package_version": pkg.get("package_version"),
            "ecosystem": "rust",
            "license_spdx": None,
            "is_direct": False,
            "is_dev": False,
        }
        for pkg in found
        if "package_name" in pkg
    ]
# Maps a supported lockfile's exact file name to the function that parses it.
# Order matters: detect_lockfile() walks this dict in order and returns the
# first file that exists, so earlier names win during auto-detection.
_LOCKFILE_PARSERS = {
"uv.lock": _parse_uv_lock,
"requirements.txt": _parse_requirements_txt,
"package-lock.json": _parse_package_lock_json,
"yarn.lock": _parse_yarn_lock,
"Cargo.lock": _parse_cargo_lock,
}
def detect_lockfile(repo_path: Path) -> tuple[Path, str] | None:
    """Find the first supported lockfile that exists directly under *repo_path*.

    Candidate names are tried in the iteration order of ``_LOCKFILE_PARSERS``.

    Returns:
        ``(lockfile_path, lockfile_name)`` for the first candidate that exists,
        or ``None`` when none of the supported lockfiles is present.
    """
    candidates = ((repo_path / filename, filename) for filename in _LOCKFILE_PARSERS)
    return next((hit for hit in candidates if hit[0].exists()), None)
def parse_lockfile(lockfile_path: Path) -> list[dict]:
    """Parse *lockfile_path* with the parser registered for its file name.

    The parser is looked up in ``_LOCKFILE_PARSERS`` by the path's final
    component. When no parser is registered for that name, an error is
    printed to stderr and the process exits with status 1.

    Returns:
        The entry dicts produced by the selected parser.
    """
    filename = lockfile_path.name
    handler = _LOCKFILE_PARSERS.get(filename)
    if handler is not None:
        return handler(lockfile_path)
    print(f"Error: unsupported lockfile type '{filename}'", file=sys.stderr)
    sys.exit(1)
# ---------------------------------------------------------------------------
# API submission
# ---------------------------------------------------------------------------
def post_ingest(api_base: str, repo_slug: str, entries: list[dict]) -> dict:
    """POST parsed SBOM entries to ``<api_base>/sbom/ingest/``.

    The request body is JSON of the form
    ``{"repo_slug": ..., "entries": [...]}`` and the call uses a 30-second
    timeout. Every failure mode is reported on stderr and ends the process
    with exit status 1: an HTTP error status, an unreachable API, a
    connection problem or timeout while reading the response, and a response
    body that is not valid JSON.

    Args:
        api_base: Base URL of the API; a trailing slash is tolerated.
        repo_slug: Slug sent as ``repo_slug`` in the payload.
        entries: Entry dicts sent as ``entries`` in the payload.

    Returns:
        The decoded JSON response body.
    """
    payload = json.dumps({"repo_slug": repo_slug, "entries": entries}).encode()
    req = urllib.request.Request(
        # rstrip avoids a "//sbom/ingest/" URL when the caller passes "http://host/".
        f"{api_base.rstrip('/')}/sbom/ingest/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        print(f"HTTP {e.code} from API: {body}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"API unreachable: {e}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Timeouts and resets while reading the body are raised as plain
        # OSError subclasses rather than being wrapped in URLError.
        print(f"API unreachable: {e}", file=sys.stderr)
        sys.exit(1)
    try:
        return json.loads(raw)
    except ValueError as e:  # JSONDecodeError and UnicodeDecodeError
        print(f"Invalid JSON from API: {e}", file=sys.stderr)
        sys.exit(1)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Command-line entry point: locate, parse and submit a lockfile.

    The lockfile is taken from ``--lockfile`` when given, otherwise it is
    auto-detected under ``--repo-path``. With ``--dry-run`` the first five
    parsed entries are printed and nothing is submitted; otherwise the entries
    are posted to the API and the reported result is printed.

    Exits with status 1 when the explicit lockfile does not exist or when no
    supported lockfile can be auto-detected.
    """
    parser = argparse.ArgumentParser(description="Ingest a lockfile into the State Hub SBOM store.")
    parser.add_argument("--repo", required=True, help="Managed-repo slug (e.g. 'the-custodian')")
    parser.add_argument("--lockfile", help="Path to lockfile (auto-detected if omitted)")
    parser.add_argument("--repo-path", default=".", help="Repo root for auto-detection (default: cwd)")
    parser.add_argument("--api-base", default=API_BASE, help="State Hub API base URL")
    parser.add_argument("--dry-run", action="store_true", help="Parse only — do not submit")
    args = parser.parse_args()

    if args.lockfile:
        lockfile_path = Path(args.lockfile).resolve()
        # Fail with a clear message instead of a FileNotFoundError traceback
        # from the parser's read_text() call.
        if not lockfile_path.is_file():
            print(f"Error: lockfile not found: {lockfile_path}", file=sys.stderr)
            sys.exit(1)
    else:
        found = detect_lockfile(Path(args.repo_path).resolve())
        if not found:
            print(
                f"No recognised lockfile found in '{args.repo_path}'. "
                "Supported: " + ", ".join(_LOCKFILE_PARSERS),
                file=sys.stderr,
            )
            sys.exit(1)
        lockfile_path, _ = found
        print(f"Auto-detected: {lockfile_path}")

    entries = parse_lockfile(lockfile_path)
    print(f"Parsed {len(entries)} packages from {lockfile_path.name}")

    if args.dry_run:
        # Show only a small sample so large lockfiles don't flood the terminal.
        print(json.dumps(entries[:5], indent=2))
        if len(entries) > 5:
            print(f" … and {len(entries) - 5} more")
        return

    result = post_ingest(args.api_base, args.repo, entries)
    print(f"Ingested {result.get('ingested', '?')} entries for repo '{args.repo}'")
    print(f"Snapshot at: {result.get('snapshot_at', '?')}")


if __name__ == "__main__":
    main()