generated from coulomb/repo-seed
requirement refs map to capability groups
This commit is contained in:
@@ -9,6 +9,7 @@ from typing import Any
|
||||
|
||||
from guide_board.artifacts import build_artifact_manifest
|
||||
from guide_board.io import write_json
|
||||
from guide_board.mapping import build_mapping_records, summarize_mappings
|
||||
from guide_board.planning import build_run_plan
|
||||
from guide_board.runners import run_step
|
||||
from guide_board.schema import assert_valid
|
||||
@@ -37,6 +38,8 @@ def run_assessment(
|
||||
assert_valid(finding, "finding")
|
||||
|
||||
artifact_manifest = build_artifact_manifest(run_dir, run_id, evidence)
|
||||
mapping_records = build_mapping_records(root, run_id, plan, evidence)
|
||||
mapping_summary = summarize_mappings(mapping_records)
|
||||
|
||||
assessment_package = _assessment_package(
|
||||
run_id,
|
||||
@@ -44,6 +47,7 @@ def run_assessment(
|
||||
evidence,
|
||||
findings,
|
||||
artifact_manifest,
|
||||
mapping_summary,
|
||||
created_at,
|
||||
)
|
||||
assert_valid(assessment_package, "assessment-package")
|
||||
@@ -57,7 +61,15 @@ def run_assessment(
|
||||
"assessment_profile_ref": plan["assessment_profile_snapshot"]["id"],
|
||||
}
|
||||
|
||||
_write_run_directory(run_dir, run_metadata, plan, evidence, findings, assessment_package)
|
||||
_write_run_directory(
|
||||
run_dir,
|
||||
run_metadata,
|
||||
plan,
|
||||
evidence,
|
||||
findings,
|
||||
mapping_records,
|
||||
assessment_package,
|
||||
)
|
||||
return {
|
||||
"status": run_metadata["status"],
|
||||
"run_id": run_id,
|
||||
@@ -175,6 +187,7 @@ def _assessment_package(
|
||||
evidence: list[dict[str, Any]],
|
||||
findings: list[dict[str, Any]],
|
||||
artifact_manifest: list[dict[str, Any]],
|
||||
mapping_summary: dict[str, Any],
|
||||
created_at: str,
|
||||
) -> dict[str, Any]:
|
||||
summary = dict(Counter(item["result"] for item in evidence))
|
||||
@@ -188,6 +201,7 @@ def _assessment_package(
|
||||
"extensions": plan["extension_snapshots"],
|
||||
"source_lock": plan["source_lock"],
|
||||
"summary": summary,
|
||||
"mapping_summary": mapping_summary,
|
||||
"findings": findings,
|
||||
"evidence_refs": [item["id"] for item in evidence],
|
||||
"artifact_manifest": artifact_manifest,
|
||||
@@ -203,6 +217,7 @@ def _write_run_directory(
|
||||
plan: dict[str, Any],
|
||||
evidence: list[dict[str, Any]],
|
||||
findings: list[dict[str, Any]],
|
||||
mapping_records: list[dict[str, Any]],
|
||||
assessment_package: dict[str, Any],
|
||||
) -> None:
|
||||
write_json(run_dir / "run.json", run_metadata)
|
||||
@@ -215,7 +230,7 @@ def _write_run_directory(
|
||||
)
|
||||
write_json(run_dir / "normalized" / "evidence.json", {"evidence": evidence})
|
||||
write_json(run_dir / "normalized" / "findings.json", {"findings": findings})
|
||||
write_json(run_dir / "normalized" / "mappings.json", {"mappings": []})
|
||||
write_json(run_dir / "normalized" / "mappings.json", {"mappings": mapping_records})
|
||||
write_json(run_dir / "reports" / "assessment-package.json", assessment_package)
|
||||
(run_dir / "reports").mkdir(parents=True, exist_ok=True)
|
||||
(run_dir / "reports" / "report.md").write_text(
|
||||
@@ -230,6 +245,7 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
|
||||
)
|
||||
if not summary_lines:
|
||||
summary_lines = "- no evidence produced"
|
||||
mapping_lines = _mapping_summary_lines(package)
|
||||
|
||||
return "\n".join(
|
||||
[
|
||||
@@ -243,6 +259,10 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
|
||||
"",
|
||||
summary_lines,
|
||||
"",
|
||||
"## Mappings",
|
||||
"",
|
||||
mapping_lines,
|
||||
"",
|
||||
"## Boundary",
|
||||
"",
|
||||
package["certification_boundary"],
|
||||
@@ -251,6 +271,20 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
|
||||
)
|
||||
|
||||
|
||||
def _mapping_summary_lines(package: dict[str, Any]) -> str:
|
||||
targets = package.get("mapping_summary", {}).get("targets", [])
|
||||
if not targets:
|
||||
return "- no mapped evidence"
|
||||
lines = []
|
||||
for target in targets:
|
||||
results = ", ".join(
|
||||
f"{status}: {count}"
|
||||
for status, count in sorted(target.get("results", {}).items())
|
||||
)
|
||||
lines.append(f"- {target['label']} ({target['target_id']}): {results}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _run_status(evidence: list[dict[str, Any]]) -> str:
|
||||
if any(item["result"] == "fail" for item in evidence):
|
||||
return "failed"
|
||||
|
||||
103
src/guide_board/mapping.py
Normal file
103
src/guide_board/mapping.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Evidence-to-capability/control mapping."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from guide_board.io import load_json
|
||||
from guide_board.schema import assert_valid
|
||||
|
||||
|
||||
def build_mapping_records(
|
||||
root: Path,
|
||||
run_id: str,
|
||||
plan: dict[str, Any],
|
||||
evidence: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
index = _mapping_index(root, plan)
|
||||
records: list[dict[str, Any]] = []
|
||||
for item in evidence:
|
||||
extension_id = item["extension_id"]
|
||||
for requirement_ref in item.get("requirement_refs", []):
|
||||
mappings = index.get((extension_id, requirement_ref), [])
|
||||
for mapping in mappings:
|
||||
records.append(
|
||||
{
|
||||
"id": _record_id(item["id"], mapping),
|
||||
"run_id": run_id,
|
||||
"evidence_id": item["id"],
|
||||
"check_id": item["check_id"],
|
||||
"extension_id": extension_id,
|
||||
"requirement_ref": requirement_ref,
|
||||
"result": item["result"],
|
||||
"target_type": mapping["target_type"],
|
||||
"target_id": mapping["target_id"],
|
||||
"label": mapping["label"],
|
||||
"description": mapping["description"],
|
||||
}
|
||||
)
|
||||
return records
|
||||
|
||||
|
||||
def summarize_mappings(mapping_records: list[dict[str, Any]]) -> dict[str, Any]:
|
||||
targets: dict[tuple[str, str], dict[str, Any]] = {}
|
||||
for record in mapping_records:
|
||||
key = (record["target_type"], record["target_id"])
|
||||
if key not in targets:
|
||||
targets[key] = {
|
||||
"target_type": record["target_type"],
|
||||
"target_id": record["target_id"],
|
||||
"label": record["label"],
|
||||
"results": {},
|
||||
"requirement_refs": [],
|
||||
}
|
||||
target = targets[key]
|
||||
target["results"][record["result"]] = target["results"].get(record["result"], 0) + 1
|
||||
if record["requirement_ref"] not in target["requirement_refs"]:
|
||||
target["requirement_refs"].append(record["requirement_ref"])
|
||||
return {
|
||||
"targets": sorted(
|
||||
targets.values(),
|
||||
key=lambda item: (item["target_type"], item["target_id"]),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
def _mapping_index(
|
||||
root: Path,
|
||||
plan: dict[str, Any],
|
||||
) -> dict[tuple[str, str], list[dict[str, Any]]]:
|
||||
by_requirement: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)
|
||||
for extension in plan["extension_snapshots"]:
|
||||
extension_path = root / extension["path"]
|
||||
manifest = load_json(extension_path / "extension.json")
|
||||
for mapping_id in manifest.get("mappings", []):
|
||||
mapping_path = extension_path / "mappings" / f"{mapping_id}.json"
|
||||
if not mapping_path.exists():
|
||||
continue
|
||||
mapping_set = load_json(mapping_path)
|
||||
assert_valid(mapping_set, "mapping-set")
|
||||
for mapping in mapping_set["mappings"]:
|
||||
by_requirement[
|
||||
(mapping_set["extension_id"], mapping["requirement_ref"])
|
||||
].append(mapping)
|
||||
return by_requirement
|
||||
|
||||
|
||||
def _record_id(evidence_id: str, mapping: dict[str, Any]) -> str:
|
||||
return "mapping:" + _safe_id(
|
||||
":".join(
|
||||
[
|
||||
evidence_id,
|
||||
mapping["requirement_ref"],
|
||||
mapping["target_type"],
|
||||
mapping["target_id"],
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _safe_id(value: str) -> str:
|
||||
return "".join(char if char.isalnum() or char in {"-", "_"} else "_" for char in value)
|
||||
Reference in New Issue
Block a user