generated from coulomb/repo-seed
109 lines
3.8 KiB
Python
109 lines
3.8 KiB
Python
"""Evidence-to-capability/control mapping."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from guide_board.io import load_json
|
|
from guide_board.schema import assert_valid
|
|
|
|
|
|
def build_mapping_records(
    root: Path,
    run_id: str,
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Join evidence items against the mapping index and emit flat records.

    The index is built once from ``plan`` via ``_mapping_index``. Each
    evidence item is then looked up by ``(extension_id, requirement_ref)``
    for every entry in its ``requirement_refs``; every mapping found
    produces one output record.

    Args:
        root: Base directory forwarded to ``_mapping_index``.
        run_id: Value copied unchanged into each record's ``run_id``.
        plan: Plan dictionary forwarded to ``_mapping_index``.
        evidence: Evidence items. ``extension_id`` is required on every
            item; ``requirement_refs`` is optional and defaults to empty.
            ``id``, ``check_id`` and ``result`` are only read when at
            least one mapping matches.

    Returns:
        Records in evidence order, then requirement-ref order, then the
        order in which mappings were indexed.

    Raises:
        KeyError: If a required key is missing from an evidence item or
            from a matched mapping.
    """
    lookup = _mapping_index(root, plan)
    collected: list[dict[str, Any]] = []
    for item in evidence:
        # Read eagerly so a missing extension_id fails even when the item
        # carries no requirement refs.
        ext_id = item["extension_id"]
        for ref in item.get("requirement_refs", []):
            # .get() keeps the lookup read-only; unmatched refs simply
            # contribute no records.
            collected.extend(
                {
                    "id": _record_id(item["id"], target),
                    "run_id": run_id,
                    "evidence_id": item["id"],
                    "check_id": item["check_id"],
                    "extension_id": ext_id,
                    "requirement_ref": ref,
                    "result": item["result"],
                    "target_type": target["target_type"],
                    "target_id": target["target_id"],
                    "label": target["label"],
                    "description": target["description"],
                }
                for target in lookup.get((ext_id, ref), [])
            )
    return collected
|
|
|
|
|
|
def summarize_mappings(mapping_records: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate mapping records into one summary entry per target.

    Records are grouped by ``(target_type, target_id)``. For each group the
    summary keeps the label of the first record seen, a count of records
    per ``result`` value, and the distinct ``requirement_ref`` values in
    first-seen order.

    Args:
        mapping_records: Records carrying ``target_type``, ``target_id``,
            ``result`` and ``requirement_ref``. ``label`` is read only from
            the first record of each target.

    Returns:
        ``{"targets": [...]}`` with entries sorted by target type and then
        target id.
    """
    by_target: dict[tuple[str, str], dict[str, Any]] = {}

    for rec in mapping_records:
        group_key = (rec["target_type"], rec["target_id"])
        entry = by_target.get(group_key)
        if entry is None:
            # First sighting of this target: its label becomes the
            # summary label; later records do not override it.
            entry = {
                "target_type": rec["target_type"],
                "target_id": rec["target_id"],
                "label": rec["label"],
                "results": {},
                "requirement_refs": [],
            }
            by_target[group_key] = entry

        counts = entry["results"]
        counts[rec["result"]] = counts.get(rec["result"], 0) + 1

        seen_refs = entry["requirement_refs"]
        if rec["requirement_ref"] not in seen_refs:
            seen_refs.append(rec["requirement_ref"])

    def _order(summary: dict[str, Any]) -> tuple[str, str]:
        return (summary["target_type"], summary["target_id"])

    ordered = list(by_target.values())
    ordered.sort(key=_order)
    return {"targets": ordered}
|
|
|
|
|
|
def _mapping_index(
    root: Path,
    plan: dict[str, Any],
) -> dict[tuple[str, str], list[dict[str, Any]]]:
    """Load every mapping set referenced by the plan and index its entries.

    For each entry of ``plan["extension_snapshots"]`` the snapshot directory
    is resolved with ``_snapshot_path``, its ``extension.json`` manifest is
    loaded, and each id listed under the manifest's optional ``mappings``
    key is read from ``mappings/<id>.json``. Mapping files that do not
    exist on disk are skipped without error. Each loaded mapping set is
    checked with ``assert_valid(..., "mapping-set")`` before use.

    Args:
        root: Base directory used to resolve relative snapshot paths.
        plan: Plan dictionary containing ``extension_snapshots``.

    Returns:
        Mapping entries grouped by ``(extension_id, requirement_ref)``,
        where ``extension_id`` is taken from the mapping-set file itself.
        The returned object is a ``defaultdict``, so indexing a missing key
        inserts an empty list; use ``.get`` for read-only lookups.
    """
    index: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)

    for snapshot in plan["extension_snapshots"]:
        snapshot_dir = _snapshot_path(root, snapshot)
        manifest = load_json(snapshot_dir / "extension.json")
        mappings_dir = snapshot_dir / "mappings"

        for mapping_id in manifest.get("mappings", []):
            candidate = mappings_dir / f"{mapping_id}.json"
            if candidate.exists():
                mapping_set = load_json(candidate)
                assert_valid(mapping_set, "mapping-set")
                for entry in mapping_set["mappings"]:
                    group_key = (
                        mapping_set["extension_id"],
                        entry["requirement_ref"],
                    )
                    index[group_key].append(entry)

    return index
|
|
|
|
|
|
def _record_id(evidence_id: str, mapping: dict[str, Any]) -> str:
    """Build the identifier for one evidence-to-target mapping record.

    The evidence id and the mapping's ``requirement_ref``, ``target_type``
    and ``target_id`` are joined with ``:``, passed through ``_safe_id``,
    and prefixed with the literal ``mapping:``.

    Args:
        evidence_id: Identifier of the evidence item.
        mapping: Mapping entry supplying the remaining three components.

    Returns:
        The prefixed, sanitised record identifier.
    """
    components = (
        evidence_id,
        mapping["requirement_ref"],
        mapping["target_type"],
        mapping["target_id"],
    )
    sanitized = _safe_id(":".join(components))
    return "mapping:" + sanitized
|
|
|
|
|
|
def _snapshot_path(root: Path, extension: dict[str, Any]) -> Path:
    """Resolve the directory recorded in an extension snapshot entry.

    Args:
        root: Base directory for relative snapshot paths.
        extension: Snapshot entry whose ``path`` value is the location.

    Returns:
        ``extension["path"]`` unchanged when it is absolute, otherwise that
        path joined onto ``root``.
    """
    candidate = Path(extension["path"])
    if candidate.is_absolute():
        return candidate
    return root / candidate
|
|
|
|
|
|
def _safe_id(value: str) -> str:
    """Replace every character that is not identifier-safe with ``_``.

    A character is kept when ``str.isalnum()`` is true for it or when it
    is ``-`` or ``_``; every other character becomes a single underscore,
    so the result has the same length as the input.

    Args:
        value: Raw identifier text.

    Returns:
        The sanitised string.
    """
    sanitized: list[str] = []
    for ch in value:
        if ch.isalnum() or ch == "-" or ch == "_":
            sanitized.append(ch)
        else:
            sanitized.append("_")
    return "".join(sanitized)
|