# Files
# guide-board/src/guide_board/reports.py
#
# 205 lines
# 6.7 KiB
# Python

"""Report fragment loading for extension-contributed report content."""
from __future__ import annotations
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Any
from guide_board.errors import ValidationError
from guide_board.io import load_json
def build_report_fragments(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    findings: list[dict[str, Any]],
    mapping_records: list[dict[str, Any]],
    assessment_package: dict[str, Any],
) -> list[dict[str, Any]]:
    """Collect the report fragments declared by every extension in the plan.

    Each entry of ``plan["extension_snapshots"]`` is resolved to a directory,
    its ``extension.json`` manifest is loaded, and every descriptor listed
    under the manifest's ``report_fragments`` key is handed to the fragment
    loader.

    Args:
        root: Base directory used to resolve relative snapshot paths.
        run_dir: Run directory, passed through to the fragment loader.
        run_id: Run identifier, passed through to the fragment loader.
        plan: Plan object; must contain ``extension_snapshots``.
        evidence: Evidence records, passed through to the fragment loader.
        findings: Finding records, passed through to the fragment loader.
        mapping_records: Mapping records, passed through to the loader.
        assessment_package: Assessment package, passed through to the loader.

    Returns:
        Fragment records in plan order, then manifest order. Descriptors for
        which the loader returns ``None`` are omitted.
    """
    # Everything that is identical for every descriptor of every extension.
    run_inputs = (
        root,
        run_dir,
        run_id,
        plan,
        evidence,
        findings,
        mapping_records,
        assessment_package,
    )
    collected: list[dict[str, Any]] = []
    for snapshot in plan["extension_snapshots"]:
        snapshot_dir = _snapshot_path(root, snapshot)
        manifest = load_json(snapshot_dir / "extension.json")
        for entry in manifest.get("report_fragments", []):
            record = _load_fragment(*run_inputs, snapshot, snapshot_dir, entry)
            if record is None:
                continue
            collected.append(record)
    return collected
def markdown_for_fragments(fragments: list[dict[str, Any]]) -> str:
    """Join the markdown bodies of the given fragments into one document.

    Only fragments whose ``markdown`` value is a non-empty string contribute.

    Args:
        fragments: Fragment records, each optionally carrying ``markdown``.

    Returns:
        The contributing bodies separated by a blank line, or a placeholder
        bullet when no fragment contributes any markdown.
    """
    sections: list[str] = []
    for record in fragments:
        body = record.get("markdown")
        # Skip missing, non-string, and empty bodies alike.
        if isinstance(body, str) and body:
            sections.append(body)
    if sections:
        return "\n\n".join(sections)
    return "- no extension report fragments"
def _load_fragment(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
mapping_records: list[dict[str, Any]],
assessment_package: dict[str, Any],
extension: dict[str, Any],
extension_path: Path,
descriptor: Any,
) -> dict[str, Any] | None:
if isinstance(descriptor, str):
descriptor = {
"id": descriptor,
"kind": "markdown_file",
"path": f"reports/{descriptor}.md",
"title": descriptor,
}
if not isinstance(descriptor, dict):
return None
if descriptor["kind"] == "markdown_file":
markdown = _load_markdown_fragment(extension_path, descriptor)
return _fragment_record(extension["id"], descriptor, markdown, {})
if descriptor["kind"] == "python_module":
result = _run_python_fragment(
root,
run_dir,
run_id,
plan,
evidence,
findings,
mapping_records,
assessment_package,
extension_path,
descriptor,
)
return _fragment_record(
extension["id"],
descriptor,
result.get("markdown", ""),
_object_or_empty(result.get("structured")),
)
raise ValidationError(f"{descriptor['id']}: unsupported report fragment kind")
def _load_markdown_fragment(extension_path: Path, descriptor: dict[str, Any]) -> str:
    """Read the markdown file a ``markdown_file`` descriptor points at.

    The file location comes from the descriptor's ``path``; when that is
    absent or empty it defaults to ``reports/<id>.md``. The location is
    resolved inside the extension directory before it is read.

    Args:
        extension_path: Directory of the extension that owns the fragment.
        descriptor: Fragment descriptor; must contain ``id``.

    Returns:
        The file contents decoded as UTF-8.

    Raises:
        ValidationError: If the resolved path is not an existing file (or the
            path check performed by the resolver fails).
    """
    fragment_id = descriptor["id"]
    relative = descriptor.get("path") or f"reports/{fragment_id}.md"
    resolved = _safe_extension_path(extension_path, relative, fragment_id)
    if resolved.is_file():
        return resolved.read_text(encoding="utf-8")
    raise ValidationError(f"{fragment_id}: report fragment not found: {relative}")
def _run_python_fragment(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    findings: list[dict[str, Any]],
    mapping_records: list[dict[str, Any]],
    assessment_package: dict[str, Any],
    extension_path: Path,
    descriptor: dict[str, Any],
) -> dict[str, Any]:
    """Load an extension's Python module and invoke its fragment callable.

    The descriptor supplies ``module_path`` (resolved inside the extension
    directory) and ``callable`` (the attribute looked up on the loaded
    module). The callable is invoked with a single context dict built from
    this function's arguments.

    Args:
        root: Exposed to the callable as ``context["root"]`` (stringified).
        run_dir: Exposed as ``context["run_dir"]`` (stringified).
        run_id: Exposed as ``context["run_id"]``.
        plan: Exposed as ``context["plan"]``.
        evidence: Exposed as ``context["evidence"]``.
        findings: Exposed as ``context["findings"]``.
        mapping_records: Exposed as ``context["mappings"]``.
        assessment_package: Exposed as ``context["assessment_package"]``; its
            ``policy_summary`` and ``source_lock`` entries are also surfaced
            as top-level context keys, defaulting to ``{}``.
        extension_path: Extension directory; exposed stringified as
            ``context["extension_path"]``.
        descriptor: Fragment descriptor; exposed as
            ``context["report_fragment"]``.

    Returns:
        The dict returned by the fragment callable, unmodified.

    Raises:
        ValidationError: If ``module_path`` or ``callable`` is missing, the
            named attribute is absent or not callable, or the callable
            returns something other than a dict.
    """
    fragment_id = descriptor["id"]
    module_path = descriptor.get("module_path")
    callable_name = descriptor.get("callable")
    if not (module_path and callable_name):
        raise ValidationError(
            f"{fragment_id}: python_module report fragments need module_path and callable"
        )

    module = _load_module(
        _safe_extension_path(extension_path, module_path, fragment_id),
        fragment_id,
    )
    entry_point = getattr(module, callable_name, None)
    if not callable(entry_point):
        raise ValidationError(f"{fragment_id}: callable {callable_name!r} was not found")

    # Paths are passed as strings; everything else is handed over as-is.
    context = {
        "root": str(root),
        "run_dir": str(run_dir),
        "run_id": run_id,
        "plan": plan,
        "evidence": evidence,
        "findings": findings,
        "mappings": mapping_records,
        "assessment_package": assessment_package,
        "policy_summary": assessment_package.get("policy_summary", {}),
        "source_lock": assessment_package.get("source_lock", {}),
        "extension_path": str(extension_path),
        "report_fragment": descriptor,
    }
    outcome = entry_point(context)
    if isinstance(outcome, dict):
        return outcome
    raise ValidationError(f"{fragment_id}: report fragment must return an object")
def _fragment_record(
    extension_id: str,
    descriptor: dict[str, Any],
    markdown: str,
    structured: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the record that represents one loaded report fragment.

    Args:
        extension_id: Identifier of the extension that owns the fragment.
        descriptor: Fragment descriptor; must contain ``id`` and ``kind``.
        markdown: Rendered markdown; any non-string value is stored as ``""``.
        structured: Structured payload stored on the record unchanged.

    Returns:
        A dict with the keys ``id``, ``extension_id``, ``title``, ``kind``,
        ``markdown`` and ``structured``. The title falls back to the id when
        the descriptor has no (or an empty) ``title``.
    """
    fragment_id = descriptor["id"]
    title = descriptor.get("title") or fragment_id
    kind = descriptor["kind"]
    if not isinstance(markdown, str):
        markdown = ""
    return {
        "id": fragment_id,
        "extension_id": extension_id,
        "title": title,
        "kind": kind,
        "markdown": markdown,
        "structured": structured,
    }
def _safe_extension_path(extension_path: Path, raw_path: str, fragment_id: str) -> Path:
    """Resolve ``raw_path`` against the extension directory and confine it.

    Both the candidate and the extension directory are fully resolved before
    they are compared, so ``..`` segments and absolute paths that lead
    outside the extension directory are rejected.

    Args:
        extension_path: Directory the result must stay inside.
        raw_path: Path taken from the fragment descriptor.
        fragment_id: Fragment identifier used to prefix the error message.

    Returns:
        The resolved path.

    Raises:
        ValidationError: If the resolved path is not located under the
            resolved extension directory.
    """
    candidate = (extension_path / raw_path).resolve()
    base_dir = extension_path.resolve()
    try:
        # relative_to raises ValueError when candidate is not under base_dir.
        candidate.relative_to(base_dir)
    except ValueError as exc:
        raise ValidationError(
            f"{fragment_id}: report fragment path must stay inside the extension directory"
        ) from exc
    else:
        return candidate
def _load_module(path: Path, fragment_id: str) -> ModuleType:
    """Import the Python file at ``path`` as a standalone module object.

    The module is given a name derived from the fragment id (hyphens become
    underscores) and is executed immediately.

    Args:
        path: Location of the Python source file.
        fragment_id: Fragment identifier; used for the module name and to
            prefix error messages.

    Returns:
        The executed module.

    Raises:
        ValidationError: If ``path`` does not exist or no import spec/loader
            can be created for it.
    """
    if not path.exists():
        raise ValidationError(f"{fragment_id}: module not found: {path}")

    sanitized_id = fragment_id.replace("-", "_")
    spec = importlib.util.spec_from_file_location(
        f"_guide_board_report_fragment_{sanitized_id}", path
    )
    loader = None if spec is None else spec.loader
    if loader is None:
        raise ValidationError(f"{fragment_id}: unable to load module from {path}")

    loaded = importlib.util.module_from_spec(spec)
    loader.exec_module(loaded)
    return loaded
def _snapshot_path(root: Path, extension: dict[str, Any]) -> Path:
    """Return the directory recorded in an extension snapshot.

    Args:
        root: Base directory for snapshots that store a relative path.
        extension: Extension snapshot; must contain ``path``.

    Returns:
        The snapshot's ``path`` unchanged when it is absolute, otherwise that
        path joined onto ``root``.
    """
    recorded = Path(extension["path"])
    if recorded.is_absolute():
        return recorded
    return root / recorded
def _object_or_empty(value: Any) -> dict[str, Any]:
    """Return ``value`` itself when it is a dict, otherwise a new empty dict."""
    if isinstance(value, dict):
        return value
    return {}