Add report fragments and export manifest

This commit is contained in:
2026-05-16 03:11:56 +02:00
parent 2a1a53c140
commit 6c467dd1f4
22 changed files with 630 additions and 24 deletions

View File

@@ -8,11 +8,13 @@ from pathlib import Path
from typing import Any
from guide_board.artifacts import build_artifact_manifest, build_submission_manifest
from guide_board.exports import build_export_manifest
from guide_board.io import load_json, write_json
from guide_board.mapping import build_mapping_records, summarize_mappings
from guide_board.normalizers import normalize_step_result
from guide_board.planning import build_run_plan
from guide_board.policy import apply_policy
from guide_board.reports import build_report_fragments, markdown_for_fragments
from guide_board.retention import build_retention_summary
from guide_board.runners import run_step
from guide_board.schema import assert_valid
@@ -64,7 +66,19 @@ def run_assessment(
applied_exclusions,
created_at,
)
report_fragments = build_report_fragments(
root,
run_dir,
run_id,
plan,
evidence,
findings,
mapping_records,
assessment_package,
)
assessment_package["report_fragments"] = report_fragments
assert_valid(assessment_package, "assessment-package")
export_manifest = build_export_manifest(assessment_package)
run_metadata = {
"id": run_id,
@@ -85,6 +99,7 @@ def run_assessment(
findings,
mapping_records,
assessment_package,
export_manifest,
retention_summary,
)
return {
@@ -94,6 +109,7 @@ def run_assessment(
"assessment_package": str(run_dir / "reports" / "assessment-package.json"),
"report": str(run_dir / "reports" / "report.md"),
"submission_package": str(run_dir / "reports" / "submission-package.json"),
"export_manifest": str(run_dir / "exports" / "export-manifest.json"),
"retention_summary": str(run_dir / "retention-summary.json"),
}
@@ -419,6 +435,7 @@ def _assessment_package(
"waivers": applied_waivers,
"challenges": applied_challenges,
"exclusions": applied_exclusions,
"report_fragments": [],
"certification_boundary": "Guide Board produces preparation evidence only and does not issue certifications or audit assurance.",
"created_at": created_at,
}
@@ -432,6 +449,7 @@ def _write_run_directory(
findings: list[dict[str, Any]],
mapping_records: list[dict[str, Any]],
assessment_package: dict[str, Any],
export_manifest: dict[str, Any],
retention_summary: dict[str, Any],
) -> None:
write_json(run_dir / "run.json", run_metadata)
@@ -447,6 +465,8 @@ def _write_run_directory(
write_json(run_dir / "normalized" / "findings.json", {"findings": findings})
write_json(run_dir / "normalized" / "mappings.json", {"mappings": mapping_records})
write_json(run_dir / "reports" / "assessment-package.json", assessment_package)
write_json(run_dir / "reports" / "fragments.json", {"fragments": assessment_package["report_fragments"]})
write_json(run_dir / "exports" / "export-manifest.json", export_manifest)
(run_dir / "reports").mkdir(parents=True, exist_ok=True)
(run_dir / "reports" / "report.md").write_text(
_markdown_report(run_metadata, assessment_package),
@@ -471,6 +491,7 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
mapping_lines = _mapping_summary_lines(package)
policy_lines = _policy_summary_lines(package)
review_lines = _review_summary_lines(package)
fragment_lines = markdown_for_fragments(package.get("report_fragments", []))
return "\n".join(
[
@@ -496,6 +517,10 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
"",
review_lines,
"",
"## Extension Fragments",
"",
fragment_lines,
"",
"## Boundary",
"",
package["certification_boundary"],

View File

@@ -0,0 +1,50 @@
"""Portable export builders derived from assessment packages."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from guide_board.schema import assert_valid
def build_export_manifest(assessment_package: dict[str, Any]) -> dict[str, Any]:
manifest = {
"id": f"export-manifest:{assessment_package['run_id']}",
"schema_version": "guide-board.export-manifest.v1",
"export_type": "guide-board.generic-json.v1",
"run_id": assessment_package["run_id"],
"created_at": datetime.now(timezone.utc).isoformat(),
"source_package_ref": "reports/assessment-package.json",
"source_lock_ref": "sources.lock.json",
"summary": assessment_package.get("summary", {}),
"policy_summary": assessment_package.get("policy_summary", {}),
"mapping_summary": assessment_package.get("mapping_summary", {}),
"report_fragments": [
_export_fragment(fragment)
for fragment in assessment_package.get("report_fragments", [])
if isinstance(fragment, dict)
],
"counts": {
"evidence_refs": len(assessment_package.get("evidence_refs", [])),
"findings": len(assessment_package.get("findings", [])),
"artifacts": len(assessment_package.get("artifact_manifest", [])),
"waivers": len(assessment_package.get("waivers", [])),
"challenges": len(assessment_package.get("challenges", [])),
"exclusions": len(assessment_package.get("exclusions", [])),
"report_fragments": len(assessment_package.get("report_fragments", [])),
},
"certification_boundary": assessment_package["certification_boundary"],
}
assert_valid(manifest, "export-manifest")
return manifest
def _export_fragment(fragment: dict[str, Any]) -> dict[str, Any]:
return {
"id": fragment.get("id"),
"extension_id": fragment.get("extension_id"),
"title": fragment.get("title"),
"kind": fragment.get("kind"),
"structured": fragment.get("structured", {}),
}

204
src/guide_board/reports.py Normal file
View File

@@ -0,0 +1,204 @@
"""Report fragment loading for extension-contributed report content."""
from __future__ import annotations
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Any
from guide_board.errors import ValidationError
from guide_board.io import load_json
def build_report_fragments(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
mapping_records: list[dict[str, Any]],
assessment_package: dict[str, Any],
) -> list[dict[str, Any]]:
fragments: list[dict[str, Any]] = []
for extension in plan["extension_snapshots"]:
extension_path = _snapshot_path(root, extension)
manifest = load_json(extension_path / "extension.json")
for descriptor in manifest.get("report_fragments", []):
fragment = _load_fragment(
root,
run_dir,
run_id,
plan,
evidence,
findings,
mapping_records,
assessment_package,
extension,
extension_path,
descriptor,
)
if fragment is not None:
fragments.append(fragment)
return fragments
def markdown_for_fragments(fragments: list[dict[str, Any]]) -> str:
markdown_blocks = [
fragment.get("markdown", "")
for fragment in fragments
if isinstance(fragment.get("markdown"), str) and fragment.get("markdown")
]
if not markdown_blocks:
return "- no extension report fragments"
return "\n\n".join(markdown_blocks)
def _load_fragment(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
mapping_records: list[dict[str, Any]],
assessment_package: dict[str, Any],
extension: dict[str, Any],
extension_path: Path,
descriptor: Any,
) -> dict[str, Any] | None:
if isinstance(descriptor, str):
descriptor = {
"id": descriptor,
"kind": "markdown_file",
"path": f"reports/{descriptor}.md",
"title": descriptor,
}
if not isinstance(descriptor, dict):
return None
if descriptor["kind"] == "markdown_file":
markdown = _load_markdown_fragment(extension_path, descriptor)
return _fragment_record(extension["id"], descriptor, markdown, {})
if descriptor["kind"] == "python_module":
result = _run_python_fragment(
root,
run_dir,
run_id,
plan,
evidence,
findings,
mapping_records,
assessment_package,
extension_path,
descriptor,
)
return _fragment_record(
extension["id"],
descriptor,
result.get("markdown", ""),
_object_or_empty(result.get("structured")),
)
raise ValidationError(f"{descriptor['id']}: unsupported report fragment kind")
def _load_markdown_fragment(extension_path: Path, descriptor: dict[str, Any]) -> str:
raw_path = descriptor.get("path") or f"reports/{descriptor['id']}.md"
fragment_path = _safe_extension_path(extension_path, raw_path, descriptor["id"])
if not fragment_path.is_file():
raise ValidationError(f"{descriptor['id']}: report fragment not found: {raw_path}")
return fragment_path.read_text(encoding="utf-8")
def _run_python_fragment(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
mapping_records: list[dict[str, Any]],
assessment_package: dict[str, Any],
extension_path: Path,
descriptor: dict[str, Any],
) -> dict[str, Any]:
module_path = descriptor.get("module_path")
callable_name = descriptor.get("callable")
if not module_path or not callable_name:
raise ValidationError(
f"{descriptor['id']}: python_module report fragments need module_path and callable"
)
module_file = _safe_extension_path(extension_path, module_path, descriptor["id"])
module = _load_module(module_file, descriptor["id"])
fragment_callable = getattr(module, callable_name, None)
if not callable(fragment_callable):
raise ValidationError(f"{descriptor['id']}: callable {callable_name!r} was not found")
context = {
"root": str(root),
"run_dir": str(run_dir),
"run_id": run_id,
"plan": plan,
"evidence": evidence,
"findings": findings,
"mappings": mapping_records,
"assessment_package": assessment_package,
"policy_summary": assessment_package.get("policy_summary", {}),
"source_lock": assessment_package.get("source_lock", {}),
"extension_path": str(extension_path),
"report_fragment": descriptor,
}
result = fragment_callable(context)
if not isinstance(result, dict):
raise ValidationError(f"{descriptor['id']}: report fragment must return an object")
return result
def _fragment_record(
extension_id: str,
descriptor: dict[str, Any],
markdown: str,
structured: dict[str, Any],
) -> dict[str, Any]:
return {
"id": descriptor["id"],
"extension_id": extension_id,
"title": descriptor.get("title") or descriptor["id"],
"kind": descriptor["kind"],
"markdown": markdown if isinstance(markdown, str) else "",
"structured": structured,
}
def _safe_extension_path(extension_path: Path, raw_path: str, fragment_id: str) -> Path:
path = (extension_path / raw_path).resolve()
try:
path.relative_to(extension_path.resolve())
except ValueError as exc:
raise ValidationError(
f"{fragment_id}: report fragment path must stay inside the extension directory"
) from exc
return path
def _load_module(path: Path, fragment_id: str) -> ModuleType:
if not path.exists():
raise ValidationError(f"{fragment_id}: module not found: {path}")
module_name = f"_guide_board_report_fragment_{fragment_id.replace('-', '_')}"
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
raise ValidationError(f"{fragment_id}: unable to load module from {path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _snapshot_path(root: Path, extension: dict[str, Any]) -> Path:
path = Path(extension["path"])
return path if path.is_absolute() else root / path
def _object_or_empty(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}

View File

@@ -49,8 +49,12 @@ def build_retention_summary(
"report_refs": [
"reports/assessment-package.json",
"reports/report.md",
"reports/fragments.json",
"reports/submission-package.json",
],
"export_refs": [
"exports/export-manifest.json",
],
"artifact_retention": {
"policy": plan["assessment_profile_snapshot"].get("retention_policy", {}),
"output_artifact_retention": plan["assessment_profile_snapshot"]
@@ -200,6 +204,7 @@ def _run_projection(run: dict[str, Any]) -> dict[str, Any]:
"status": summary.get("status", "unknown"),
"unexpected_findings": _summary_int(summary, "unexpected_findings"),
"finding_count": _summary_int(summary, "finding_count"),
"mapping_target_count": _summary_int(summary, "mapping_target_count"),
"artifact_count": _summary_int(summary, "artifact_count"),
"challenged_findings": _summary_int(summary, "challenged_findings"),
"authority_exclusions": _summary_int(summary, "authority_exclusions"),
@@ -217,12 +222,18 @@ def _trend_between(
return {
"direction": "insufficient-history",
"status_changed": False,
"status_change": {
"from": None,
"to": _status_for(latest),
},
"unexpected_findings_delta": 0,
"finding_count_delta": 0,
"artifact_count_delta": 0,
"unresolved_review_items_delta": 0,
"evidence_result_deltas": {},
}
"artifact_count_delta": 0,
"unresolved_review_items_delta": 0,
"mapping_target_count_delta": 0,
"evidence_result_deltas": {},
"summary_text": "No previous retained run is available for comparison.",
}
previous_summary = previous.get("summary", {})
latest_summary = latest.get("summary", {})
@@ -242,17 +253,34 @@ def _trend_between(
review_delta = _summary_int(latest_summary, "unresolved_review_items") - _summary_int(
previous_summary, "unresolved_review_items"
)
mapping_target_delta = _summary_int(latest_summary, "mapping_target_count") - _summary_int(
previous_summary, "mapping_target_count"
)
previous_status = _status_for(previous)
latest_status = _status_for(latest)
direction = _trend_direction(previous_status, latest_status, unexpected_delta)
return {
"direction": _trend_direction(previous_status, latest_status, unexpected_delta),
"direction": direction,
"status_changed": previous_status != latest_status,
"status_change": {
"from": previous_status,
"to": latest_status,
},
"unexpected_findings_delta": unexpected_delta,
"finding_count_delta": finding_delta,
"artifact_count_delta": artifact_delta,
"unresolved_review_items_delta": review_delta,
"mapping_target_count_delta": mapping_target_delta,
"evidence_result_deltas": evidence_deltas,
"summary_text": _trend_summary_text(
direction,
previous_status,
latest_status,
unexpected_delta,
review_delta,
mapping_target_delta,
),
}
@@ -294,6 +322,24 @@ def _summary_int(summary: dict[str, Any], key: str) -> int:
return value if isinstance(value, int) and not isinstance(value, bool) else 0
def _trend_summary_text(
direction: str,
previous_status: str,
latest_status: str,
unexpected_delta: int,
review_delta: int,
mapping_target_delta: int,
) -> str:
parts = [
f"Trend {direction}",
f"status {previous_status} -> {latest_status}",
f"unexpected findings delta {unexpected_delta}",
f"unresolved review delta {review_delta}",
f"mapping target delta {mapping_target_delta}",
]
return "; ".join(parts) + "."
def _dict_deltas(previous: Any, latest: Any) -> dict[str, int]:
previous_dict = previous if isinstance(previous, dict) else {}
latest_dict = latest if isinstance(latest, dict) else {}

View File

@@ -285,6 +285,12 @@ class GuideBoardRequestHandler(BaseHTTPRequestHandler):
if isinstance(submission_value, str) and submission_value
else None
)
export_value = result.get("export_manifest")
export_path = (
Path(export_value)
if isinstance(export_value, str) and export_value
else None
)
try:
report_markdown = report_path.read_text(encoding="utf-8")
assessment_package = load_json(package_path)
@@ -294,6 +300,11 @@ class GuideBoardRequestHandler(BaseHTTPRequestHandler):
if submission_path is not None and submission_path.is_file()
else None
)
export_manifest = (
load_json(export_path)
if export_path is not None and export_path.is_file()
else None
)
except OSError as exc:
raise HttpProblem(404, f"run report artifact is missing: {exc}") from exc
@@ -307,6 +318,7 @@ class GuideBoardRequestHandler(BaseHTTPRequestHandler):
"assessment_package": str(package_path),
"retention_summary": str(retention_path),
"submission_package": str(submission_path) if submission_path is not None else None,
"export_manifest": str(export_path) if export_path is not None else None,
},
"report": {
"path": str(report_path),
@@ -324,6 +336,10 @@ class GuideBoardRequestHandler(BaseHTTPRequestHandler):
"path": str(submission_path) if submission_package else None,
"json": submission_package,
},
"export_manifest": {
"path": str(export_path) if export_manifest else None,
"json": export_manifest,
},
}
def _retained_runs(self, query: dict[str, str]) -> dict[str, Any]: