generated from coulomb/repo-seed
Parses go.sum lockfiles for Go projects. Reads go.mod alongside to mark direct vs indirect dependencies. Deduplicates by (module, version), skipping go.mod hash lines. Used to ingest key-cape (netkingdom domain): 23 Go modules. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
564 lines
20 KiB
Python
564 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store.
|
|
|
|
Usage:
|
|
python ingest_sbom.py --repo <slug> [--repo-path <path>] [--dry-run]
|
|
|
|
Auto-detects all of the following in one scan:
|
|
uv.lock → python
|
|
requirements.txt → python
|
|
package-lock.json → node
|
|
yarn.lock → node
|
|
Cargo.lock → rust
|
|
go.sum → go (reads go.mod alongside for direct/indirect)
|
|
.terraform.lock.hcl → terraform (anywhere in tree)
|
|
ansible/requirements.yml → ansible (anywhere under ansible/ dirs)
|
|
ansible/requirements.yaml → ansible
|
|
sbom-tools.yaml → tool (repo root; agent-generated)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml # optional; only needed for sbom-tools.yaml and ansible parsers
|
|
_YAML_AVAILABLE = True
|
|
except ImportError:
|
|
_YAML_AVAILABLE = False
|
|
|
|
# Default State Hub API root. It can be overridden with the API_BASE
# environment variable; trailing slashes are removed so that endpoint paths
# can be appended directly.
API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8000").rstrip("/")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lockfile parsers — each returns list[dict]
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parse_uv_lock(path: Path) -> list[dict]:
    """Extract package names and versions from a uv.lock file.

    The file is scanned line by line instead of being parsed as TOML. Each
    ``[[package]]`` header opens a new record; any later line that starts
    with ``name = `` or ``version = `` fills in (or overwrites) the matching
    field of the most recent record. Lines that appear before the first
    header are ignored, and records that never received a name are dropped.

    Returns one dict per named package, tagged with ecosystem ``"python"``.
    ``is_direct`` and ``is_dev`` are always False and ``license_spdx`` is
    always None because this scan does not look at those details.
    """
    field_prefixes = (
        ("name = ", "package_name"),
        ("version = ", "package_version"),
    )
    tables: list[dict] = []

    for raw in path.read_text().splitlines():
        text = raw.strip()
        if text == "[[package]]":
            tables.append({})
            continue
        if not tables:
            # Still in the preamble, before the first package table.
            continue
        for prefix, key in field_prefixes:
            if text.startswith(prefix):
                tables[-1][key] = text.split("=", 1)[1].strip().strip('"')
                break

    records = []
    for table in tables:
        if "package_name" not in table:
            continue
        records.append({
            "package_name": table["package_name"],
            "package_version": table.get("package_version"),
            "ecosystem": "python",
            "license_spdx": None,
            "is_direct": False,
            "is_dev": False,
        })
    return records
|
|
|
|
|
|
def _parse_requirements_txt(path: Path) -> list[dict]:
    """Parse a requirements.txt file into SBOM entries.

    Handles the common requirement shapes: ``name``, ``name==1.0``,
    ``name >= 1.0`` (whitespace around the operator), ``name[extra]==1.0``
    (extras are ignored), and trailing environment markers or comments.

    Skipped lines:
      * blank lines and ``#`` comments;
      * pip options and includes (anything starting with ``-``);
      * VCS / URL / filesystem references such as ``git+https://...``,
        ``https://...``, ``./pkg`` or ``C:\\pkg``, which do not name a
        distribution and would otherwise be recorded under a bogus name.

    Returns one dict per requirement with ecosystem ``"python"`` and
    ``is_direct`` True. ``package_version`` is the text following the first
    version operator, or None when the line carries no specifier.
    """
    requirement = re.compile(
        r"^([A-Za-z0-9][A-Za-z0-9_.\-]*)"  # distribution name
        r"\s*(?:\[[^\]]*\])?"              # optional [extras]
        r"\s*(?:[>=<!~^]+\s*([^\s;]+))?"   # optional operator + version
    )
    # A name directly followed by one of these is a URL scheme or a path.
    reference_markers = ("+", ":", "/", "\\")

    entries = []
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "-")):
            continue
        m = requirement.match(line)
        if m is None:
            continue
        name_end = m.end(1)
        if line[name_end:name_end + 1] in reference_markers:
            continue
        version = m.group(2)
        if version is not None:
            # "pkg >= 1.0, < 2" leaves a dangling comma on the first bound.
            version = version.rstrip(",") or None
        entries.append({
            "package_name": m.group(1),
            "package_version": version,
            "ecosystem": "python",
            "license_spdx": None,
            "is_direct": True,
            "is_dev": False,
        })
    return entries
|
|
|
|
|
|
def _parse_package_lock_json(path: Path) -> list[dict]:
    """Parse the ``packages`` map of an npm package-lock.json.

    Every key of ``packages`` except the root project (``""``) becomes one
    entry. The package name is the entry's own ``name`` field when present,
    otherwise the text after the last ``node_modules/`` in the key.

    Direct vs. transitive: npm does not write an ``indirect`` flag, so a
    package is treated as direct when it is installed at the top level
    (``node_modules/<name>``) and that install name is declared in the
    ``dependencies`` / ``devDependencies`` / ``optionalDependencies`` /
    ``peerDependencies`` of the root project or of any other project folder
    listed outside ``node_modules/`` (e.g. a workspace member). When the
    lockfile has no root project entry to consult, the previous behaviour is
    kept (direct unless an ``indirect`` key says otherwise).

    Returns an empty list, after a warning on stderr, when the file is not
    valid JSON; also returns an empty list when there is no usable
    ``packages`` object (for example lockfileVersion 1 files).
    """
    try:
        data = json.loads(path.read_text())
    except json.JSONDecodeError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []

    packages = data.get("packages") if isinstance(data, dict) else None
    if not isinstance(packages, dict):
        return []

    dependency_fields = (
        "dependencies",
        "devDependencies",
        "optionalDependencies",
        "peerDependencies",
    )
    declared: set[str] | None = None
    if isinstance(packages.get(""), dict):
        declared = set()
        for location, manifest in packages.items():
            # Only project folders (root, workspaces) declare direct deps.
            if "node_modules/" in location or not isinstance(manifest, dict):
                continue
            for field in dependency_fields:
                wanted = manifest.get(field)
                if isinstance(wanted, dict):
                    declared.update(wanted)

    entries = []
    for pkg_path, info in packages.items():
        if not pkg_path or not isinstance(info, dict):
            continue
        install_name = pkg_path.split("node_modules/")[-1]
        name = info.get("name") or install_name
        if declared is None:
            is_direct = not info.get("indirect", False)
        else:
            is_direct = (
                pkg_path == f"node_modules/{install_name}"
                and install_name in declared
            )
        entries.append({
            "package_name": name,
            "package_version": info.get("version"),
            "ecosystem": "node",
            "license_spdx": info.get("license"),
            "is_direct": is_direct,
            "is_dev": bool(info.get("dev", False)),
        })
    return entries
|
|
|
|
|
|
def _parse_yarn_lock(path: Path) -> list[dict]:
    """Parse a yarn.lock file into one entry per resolved package name.

    A block starts at an unindented line ending in ``:`` whose comma
    separated descriptors look like ``name@range`` (optionally quoted, and
    optionally scoped as ``@scope/name@range``). The block's version is the
    first indented ``version "x"`` (classic format) or ``version: x``
    (Berry format) line that follows. A block is emitted once the next
    header or the end of the file is reached, so a missing trailing blank
    line does not lose the last package.

    Headers without any ``name@range`` descriptor (such as ``__metadata:``)
    and blocks without a version produce no entries. If several descriptors
    of one block share a name, that name is emitted once.

    Returns dicts with ecosystem ``"node"``; ``is_direct`` and ``is_dev`` are
    always False because yarn.lock does not record that information.
    """
    descriptor = re.compile(r'^"?(@?[^@"]+)@')
    version_field = re.compile(r'^version:?\s+"?([^"\s]+)"?')

    # Each item is [names, version]; version stays None until it is seen.
    blocks: list[list] = []
    for line in path.read_text().splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        indented = line.startswith((" ", "\t"))
        if not indented and stripped.endswith(":"):
            names: list[str] = []
            for part in stripped.rstrip(":").split(","):
                m = descriptor.match(part.strip())
                if m:
                    name = m.group(1).strip()
                    if name not in names:
                        names.append(name)
            blocks.append([names, None])
        elif indented and blocks and blocks[-1][1] is None:
            # Only the first version line counts: it precedes the nested
            # dependency lists, which could otherwise be mistaken for it.
            vm = version_field.match(stripped)
            if vm:
                blocks[-1][1] = vm.group(1)

    entries = []
    for names, version in blocks:
        if not names or not version:
            continue
        for name in names:
            entries.append({
                "package_name": name,
                "package_version": version,
                "ecosystem": "node",
                "license_spdx": None,
                "is_direct": False,
                "is_dev": False,
            })
    return entries
|
|
|
|
|
|
def _parse_cargo_lock(path: Path) -> list[dict]:
    """Collect crate names and versions from a Cargo.lock file.

    Works on raw lines rather than a TOML parser: a ``[[package]]`` line
    starts a new table, and subsequent ``name = `` / ``version = `` lines
    populate it. Anything before the first table is ignored, and tables that
    end up without a name are discarded.

    Returns one dict per named crate with ecosystem ``"rust"``;
    ``is_direct`` and ``is_dev`` are always False and ``license_spdx`` is
    always None.
    """

    def unquote(assignment: str) -> str:
        # Right-hand side of "key = value", without the surrounding quotes.
        return assignment.split("=", 1)[1].strip().strip('"')

    def package_tables(lines):
        table = None
        for text in lines:
            if text == "[[package]]":
                if table:
                    yield table
                table = {}
            elif table is not None:
                if text.startswith("name = "):
                    table["package_name"] = unquote(text)
                elif text.startswith("version = "):
                    table["package_version"] = unquote(text)
        if table:
            yield table

    stripped_lines = (raw.strip() for raw in path.read_text().splitlines())
    return [
        {
            "package_name": table["package_name"],
            "package_version": table.get("package_version"),
            "ecosystem": "rust",
            "license_spdx": None,
            "is_direct": False,
            "is_dev": False,
        }
        for table in package_tables(stripped_lines)
        if "package_name" in table
    ]
|
|
|
|
|
|
def _parse_terraform_lock_hcl(path: Path) -> list[dict]:
    """Read provider names and versions out of a .terraform.lock.hcl file.

    A ``provider "<address>" {`` line opens a provider; inside it, a
    ``version = "<x>"`` line sets the version, and a line consisting solely
    of ``}`` closes the provider and emits its entry. The full provider
    address from the header is used as the package name.

    Returns one dict per closed provider block with ecosystem
    ``"terraform"`` and ``is_direct`` True. ``package_version`` is None when
    the block had no version line.
    """
    provider_header = re.compile(r'^provider\s+"([^"]+)"\s*\{')
    version_attr = re.compile(r'version\s*=\s*"([^"]+)"')

    providers = []
    name: str | None = None
    version: str | None = None

    for raw in path.read_text().splitlines():
        text = raw.strip()

        opened = provider_header.match(text)
        if opened:
            name, version = opened.group(1), None
            continue
        if name is None:
            # Outside any provider block: nothing to collect.
            continue

        pinned = version_attr.match(text)
        if pinned:
            version = pinned.group(1)
        elif text == "}":
            providers.append({
                "package_name": name,
                "package_version": version,
                "ecosystem": "terraform",
                "license_spdx": None,
                "is_direct": True,
                "is_dev": False,
            })
            name = version = None

    return providers
|
|
|
|
|
|
def _parse_ansible_requirements(path: Path) -> list[dict]:
    """Parse an Ansible Galaxy requirements file (collections and roles).

    Two layouts are accepted:
      * a mapping with optional ``collections`` and ``roles`` lists;
      * the legacy layout where the whole document is a list of roles.

    Each list item may be a plain string (used as the name, with no version)
    or a mapping. Collections are named by ``name``; roles by ``name`` and,
    when that is missing or empty, by ``src``. Items without a usable name,
    and ``collections`` / ``roles`` values that are not lists, are skipped.

    Returns dicts with ecosystem ``"ansible"`` and ``is_direct`` True,
    collections first and roles after them. Returns an empty list (after a
    warning on stderr) when PyYAML is not installed or the file is not valid
    YAML, and an empty list for any other document shape.
    """
    if not _YAML_AVAILABLE:
        print(f"Warning: PyYAML not available; skipping {path}", file=sys.stderr)
        return []

    try:
        data = yaml.safe_load(path.read_text())
    except yaml.YAMLError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []

    if isinstance(data, dict):
        collections = data.get("collections")
        roles = data.get("roles")
    elif isinstance(data, list):
        # Legacy requirements file: a bare list of roles.
        collections, roles = None, data
    else:
        return []

    def to_entry(item: object, name_keys: tuple[str, ...]) -> dict | None:
        """Turn one requirements item into an entry, or None to skip it."""
        if isinstance(item, str):
            name, version = item, None
        elif isinstance(item, dict):
            name = next((item[key] for key in name_keys if item.get(key)), "")
            version = str(item["version"]) if item.get("version") else None
        else:
            return None
        if not name:
            return None
        return {
            "package_name": name,
            "package_version": version,
            "ecosystem": "ansible",
            "license_spdx": None,
            "is_direct": True,
            "is_dev": False,
        }

    entries = []
    sections = ((collections, ("name",)), (roles, ("name", "src")))
    for items, name_keys in sections:
        # A scalar here would otherwise be iterated character by character.
        if not isinstance(items, list):
            continue
        for item in items:
            entry = to_entry(item, name_keys)
            if entry is not None:
                entries.append(entry)

    return entries
|
|
|
|
|
|
def _parse_go_sum(path: Path) -> list[dict]:
    """Build a de-duplicated Go module list from go.sum.

    Each go.sum line with at least three whitespace-separated fields yields
    a ``(module, version)`` pair. Lines whose version field contains
    ``/go.mod`` are skipped, and a pair is emitted only the first time it is
    seen.

    If a ``go.mod`` file sits next to ``go.sum``, its ``require`` directives
    (both the parenthesised block form and the single-line form) are read and
    every module not tagged ``// indirect`` is reported with ``is_direct``
    True. Without a ``go.mod`` every module is reported as indirect.

    Returns dicts with ecosystem ``"go"`` in go.sum order.
    """

    def direct_modules(go_mod: Path) -> set[str]:
        # Modules required by go.mod without an "// indirect" marker.
        found: set[str] = set()
        inside_block = False
        for raw in go_mod.read_text().splitlines():
            text = raw.strip()
            if text.startswith("require ("):
                inside_block = True
            elif inside_block:
                if text == ")":
                    inside_block = False
                elif text and not text.startswith("//") and "// indirect" not in text:
                    found.add(text.split()[0])
            elif text.startswith("require ") and "(" not in text:
                # single-line require without parens
                remainder = text[len("require "):].strip()
                if "// indirect" not in remainder:
                    tokens = remainder.split()
                    if tokens:
                        found.add(tokens[0])
        return found

    go_mod_path = path.parent / "go.mod"
    direct = direct_modules(go_mod_path) if go_mod_path.exists() else set()

    # Keyed by (module, version); dicts preserve first-seen order.
    modules: dict[tuple[str, str], dict] = {}
    for raw in path.read_text().splitlines():
        text = raw.strip()
        if not text or text.startswith("//"):
            continue
        fields = text.split()
        if len(fields) < 3:
            continue
        module, version = fields[:2]
        # The "<version>/go.mod" lines hash only the go.mod file, not the module.
        if "/go.mod" in version or (module, version) in modules:
            continue
        modules[(module, version)] = {
            "package_name": module,
            "package_version": version,
            "ecosystem": "go",
            "license_spdx": None,
            "is_direct": module in direct,
            "is_dev": False,
        }
    return list(modules.values())
|
|
|
|
|
|
def _parse_sbom_tools_yaml(path: Path) -> list[dict]:
    """Parse the sbom-tools.yaml tool manifest into SBOM entries.

    The document must be a mapping with a ``tools`` list; each list item
    that is a mapping with a non-empty ``name`` becomes one entry. Per item:

      * ``version`` is stringified; a missing/empty value, or the literal
        ``"unknown"`` (which also triggers a warning), becomes None;
      * ``ecosystem`` defaults to ``"tool"``; values outside the known set
        (including non-string values) are replaced by ``"tool"`` with a
        warning;
      * ``license_spdx`` is passed through, with empty values mapped to None;
      * ``is_direct`` defaults to True and ``is_dev`` to False.

    Items without a name are skipped with a warning instead of being
    ingested under an empty package name. Returns an empty list (after a
    warning on stderr) when PyYAML is not installed or the file is not valid
    YAML, and an empty list when the document or its ``tools`` value has an
    unexpected shape.
    """
    if not _YAML_AVAILABLE:
        print(f"Warning: PyYAML not available; skipping {path}", file=sys.stderr)
        return []

    try:
        data = yaml.safe_load(path.read_text())
    except yaml.YAMLError as e:
        print(f"Warning: cannot parse {path}: {e}", file=sys.stderr)
        return []

    if not isinstance(data, dict):
        return []

    tools = data.get("tools")
    if not isinstance(tools, list):
        return []

    valid_ecosystems = {
        "python", "node", "rust", "go", "java",
        "terraform", "ansible", "tool", "other",
    }

    entries = []
    for item in tools:
        if not isinstance(item, dict):
            continue
        name = item.get("name", "")
        if not name:
            print(f" Warning: tool entry without a name in {path}; skipped", file=sys.stderr)
            continue
        version = str(item.get("version", "")) if item.get("version") else None
        if version == "unknown":
            print(f" Warning: tool '{name}' has version=unknown — flagged for review", file=sys.stderr)
            version = None
        ecosystem = item.get("ecosystem", "tool")
        # The isinstance check keeps unhashable YAML values (lists, mappings)
        # from raising TypeError in the set lookup.
        if not isinstance(ecosystem, str) or ecosystem not in valid_ecosystems:
            print(f" Warning: unknown ecosystem '{ecosystem}' for '{name}'; using 'tool'", file=sys.stderr)
            ecosystem = "tool"
        license_spdx = item.get("license_spdx") or None
        entries.append({
            "package_name": name,
            "package_version": version,
            "ecosystem": ecosystem,
            "license_spdx": license_spdx,
            "is_direct": bool(item.get("is_direct", True)),
            "is_dev": bool(item.get("is_dev", False)),
        })

    return entries
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Detection helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Lockfile basename -> parser function. A file is recognised purely by its
# name, wherever it sits in the tree. The insertion order is significant: it
# is the order in which detect_lockfile() probes the repo root and in which
# the scanners report several lockfiles found in the same directory.
_LOCKFILE_PARSERS: dict[str, object] = {
    # Python
    "uv.lock": _parse_uv_lock,
    "requirements.txt": _parse_requirements_txt,
    # Node
    "package-lock.json": _parse_package_lock_json,
    "yarn.lock": _parse_yarn_lock,
    # Rust
    "Cargo.lock": _parse_cargo_lock,
    # Terraform
    ".terraform.lock.hcl": _parse_terraform_lock_hcl,
    # Go
    "go.sum": _parse_go_sum,
}
|
|
|
|
# Directory names pruned from every tree walk: they hold VCS metadata,
# virtual environments, installed dependencies, caches or build output
# rather than project-level lockfiles. Matching is by directory name only,
# at any depth.
_SKIP_DIRS = {
    # version control
    ".git",
    ".hg",
    ".svn",
    # virtual environments
    ".venv",
    "venv",
    ".env",
    # installed dependencies
    "node_modules",
    # tool caches
    "__pycache__",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    # build output
    "dist",
    "build",
    ".build",
    "target",
    # test-runner environments
    ".tox",
    ".nox",
}
|
|
|
|
|
|
def detect_all(repo_path: Path) -> list[tuple[Path, str, object]]:
    """Find every dependency source under ``repo_path``.

    The tree is walked top-down with directories named in ``_SKIP_DIRS``
    pruned and the remaining sub-directories visited in sorted order. Three
    kinds of source are reported, each as ``(path, label, parser_fn)``:

    - files whose name is a key of ``_LOCKFILE_PARSERS``, in any directory
      (label is the file name);
    - ``requirements.yml`` / ``requirements.yaml`` located directly inside a
      directory named ``ansible`` (label ``ansible/<file name>``);
    - ``sbom-tools.yaml`` directly in ``repo_path``, appended last.
    """
    discovered: list[tuple[Path, str, object]] = []
    recorded: set[Path] = set()

    def record(candidate: Path, label: str, parser_fn: object) -> None:
        # Register a source once, keeping first-seen order.
        if candidate in recorded:
            return
        recorded.add(candidate)
        discovered.append((candidate, label, parser_fn))

    for current_dir, subdirs, files in os.walk(repo_path):
        # In-place assignment is what makes os.walk honour the pruning.
        subdirs[:] = sorted(name for name in subdirs if name not in _SKIP_DIRS)
        here = Path(current_dir)

        for lock_name, parser_fn in _LOCKFILE_PARSERS.items():
            if lock_name in files:
                record(here / lock_name, lock_name, parser_fn)

        if here.name != "ansible":
            continue
        for req_name in ("requirements.yml", "requirements.yaml"):
            if req_name in files:
                record(here / req_name, f"ansible/{req_name}", _parse_ansible_requirements)

    manifest = repo_path / "sbom-tools.yaml"
    if manifest.exists() and manifest not in recorded:
        discovered.append((manifest, "sbom-tools.yaml", _parse_sbom_tools_yaml))

    return discovered
|
|
|
|
|
|
def detect_lockfile(repo_path: Path) -> tuple[Path, str] | None:
    """Return ``(path, filename)`` of the first known lockfile in ``repo_path``.

    Only ``repo_path`` itself is checked, not its sub-directories. File
    names are probed in the order of ``_LOCKFILE_PARSERS``; None is returned
    when none of them exists.
    """
    candidates = ((repo_path / name, name) for name in _LOCKFILE_PARSERS)
    return next((hit for hit in candidates if hit[0].exists()), None)
|
|
|
|
|
|
def detect_lockfiles_recursive(repo_path: Path) -> list[Path]:
    """List every recognised lockfile below ``repo_path``.

    Directories named in ``_SKIP_DIRS`` are not descended into and the
    remaining sub-directories are visited in sorted order. Within one
    directory, files are reported in ``_LOCKFILE_PARSERS`` order.

    Kept for backwards compatibility; prefer detect_all() for new code.
    """
    matches: list[Path] = []
    for current_dir, subdirs, files in os.walk(repo_path):
        # Slice assignment prunes the walk in place.
        subdirs[:] = sorted(set(subdirs).difference(_SKIP_DIRS))
        here = Path(current_dir)
        matches.extend(here / name for name in _LOCKFILE_PARSERS if name in files)
    return matches
|
|
|
|
|
|
def parse_lockfile(lockfile_path: Path) -> list[dict]:
    """Parse one lockfile with the parser registered for its file name.

    The parser is looked up in ``_LOCKFILE_PARSERS`` by the path's basename.
    If the name is not registered, an error is printed to stderr and the
    process exits with status 1.
    """
    try:
        parse = _LOCKFILE_PARSERS[lockfile_path.name]
    except KeyError:
        print(f"Error: unsupported lockfile type '{lockfile_path.name}'", file=sys.stderr)
        sys.exit(1)
    return parse(lockfile_path)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# API submission
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def post_ingest(api_base: str, repo_slug: str, entries: list[dict]) -> dict:
    """POST the collected entries to ``{api_base}/sbom/ingest/``.

    The request body is the JSON object
    ``{"repo_slug": repo_slug, "entries": entries}`` and the call uses a
    30 second timeout.

    Returns the decoded JSON object from the response.

    Every failure is reported on stderr and ends the process with status 1:
    an HTTP error status (the response body is included in the message), an
    unreachable or unsupported URL, any other OS-level I/O error such as a
    timeout while the body is being read, a response that is not valid
    JSON, and a JSON response that is not an object.
    """
    payload = json.dumps({"repo_slug": repo_slug, "entries": entries}).encode()
    req = urllib.request.Request(
        f"{api_base}/sbom/ingest/",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        print(f"HTTP {e.code} from API: {body}", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"API unreachable: {e}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Errors raised while reading the body (e.g. a socket timeout) are
        # not wrapped in URLError by urllib.
        print(f"API request failed: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        result = json.loads(raw)
    except ValueError as e:
        # Covers json.JSONDecodeError and undecodable response bytes.
        print(f"Invalid JSON in API response: {e}", file=sys.stderr)
        sys.exit(1)

    if not isinstance(result, dict):
        print(
            f"Unexpected API response (expected a JSON object): {result!r}",
            file=sys.stderr,
        )
        sys.exit(1)
    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """Command-line entry point: gather entries, then submit or preview them.

    With one or more ``--lockfile`` options, exactly those files are parsed
    (the parser is chosen by file name). Otherwise ``--repo-path`` is scanned
    with detect_all(); if nothing is found the process exits with status 1.
    A per-source count and a total are printed. With ``--dry-run`` the first
    five entries are printed as JSON and nothing is submitted; otherwise all
    entries are posted via post_ingest() and the API's ``ingested`` and
    ``snapshot_at`` values are reported. ``--scan`` is accepted but has no
    effect of its own.
    """
    cli = argparse.ArgumentParser(
        description="Ingest a repo's lockfiles and tool manifests into the State Hub SBOM store."
    )
    cli.add_argument("--repo", required=True, help="Managed-repo slug (e.g. 'the-custodian')")
    cli.add_argument(
        "--lockfile",
        action="append",
        dest="lockfiles",
        metavar="PATH",
        help="Path to a specific lockfile (repeatable)",
    )
    cli.add_argument("--repo-path", default=".", help="Repo root for auto-detection/scan (default: cwd)")
    cli.add_argument(
        "--scan",
        action="store_true",
        help="Recursively find ALL lockfiles under --repo-path (deprecated; now default behaviour)",
    )
    cli.add_argument("--api-base", default=API_BASE, help="State Hub API base URL")
    cli.add_argument("--dry-run", action="store_true", help="Parse only — do not submit")
    args = cli.parse_args()

    repo_root = Path(args.repo_path).resolve()

    def display(target: Path) -> Path:
        # Show paths relative to the repo root whenever they live inside it.
        if target.is_relative_to(repo_root):
            return target.relative_to(repo_root)
        return target

    collected: list[dict] = []

    if args.lockfiles:
        # Explicit paths: parse each, detect parser by filename
        for raw_path in args.lockfiles:
            lockfile = Path(raw_path).resolve()
            packages = parse_lockfile(lockfile)
            print(f" {display(lockfile)}: {len(packages)} packages")
            collected += packages
    else:
        # Comprehensive auto-detection: all mechanisms in one scan
        sources = detect_all(repo_root)
        if not sources:
            print(
                f"No recognised dependency sources found in '{repo_root}'.",
                file=sys.stderr,
            )
            sys.exit(1)

        for src_path, label, parser_fn in sources:
            found = parser_fn(src_path)
            print(f" {label} ({display(src_path)}): {len(found)} entries")
            collected += found

    total = len(collected)
    print(f"Total: {total} entries")

    if args.dry_run:
        print(json.dumps(collected[:5], indent=2))
        if total > 5:
            print(f" … and {total - 5} more")
        return

    result = post_ingest(args.api_base, args.repo, collected)
    print(f"Ingested {result.get('ingested', '?')} entries for repo '{args.repo}'")
    print(f"Snapshot at: {result.get('snapshot_at', '?')}")


if __name__ == "__main__":
    main()
|