Files
guide-board/src/guide_board/execution.py

394 lines
13 KiB
Python

"""Baseline assessment execution."""
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.artifacts import build_artifact_manifest
from guide_board.io import write_json
from guide_board.mapping import build_mapping_records, summarize_mappings
from guide_board.planning import build_run_plan
from guide_board.policy import apply_policy
from guide_board.retention import build_retention_summary
from guide_board.runners import run_step
from guide_board.schema import assert_valid
def run_assessment(
    root: Path,
    target_path: Path,
    assessment_path: Path,
    output_dir: Path | None = None,
    extension_dirs: list[Path] | None = None,
) -> dict[str, Any]:
    """Run one assessment end to end and persist its run directory.

    The run plan is built, every ordered step is turned into evidence,
    findings are derived and passed through policy, and the evidence items,
    findings, assessment package and retention summary are each checked with
    ``assert_valid`` before ``_write_run_directory`` persists the run.

    Args:
        root: Project root handed to planning, policy, mapping and runners.
        target_path: Target profile path forwarded to ``build_run_plan``.
        assessment_path: Assessment profile path forwarded to
            ``build_run_plan``.
        output_dir: Directory to write the run into; when not given the run
            goes to ``root / "runs" / <run id>``.
        extension_dirs: Optional extension directories forwarded to
            ``build_run_plan``.

    Returns:
        A dict holding the run status, the run id, and the string paths of
        the run directory, assessment package, markdown report and retention
        summary.
    """
    run_plan = build_run_plan(root, target_path, assessment_path, extension_dirs)
    run_id = f"run-{_timestamp()}"
    run_dir = output_dir if output_dir else root / "runs" / run_id
    created_at = _now()

    # Collect evidence first; everything downstream is derived from it.
    evidence_items = _execute_steps(root, run_dir, run_id, run_plan)
    for evidence_item in evidence_items:
        assert_valid(evidence_item, "evidence-item")

    raw_findings = _findings_for_evidence(run_id, evidence_items)
    policy_outcome = apply_policy(root, run_plan, raw_findings)
    final_findings, policy_summary, applied_waivers = policy_outcome
    for entry in final_findings:
        assert_valid(entry, "finding")

    manifest = build_artifact_manifest(run_dir, run_id, evidence_items)
    mappings = build_mapping_records(root, run_id, run_plan, evidence_items)
    mapping_summary = summarize_mappings(mappings)

    package = _assessment_package(
        run_id,
        run_plan,
        evidence_items,
        final_findings,
        manifest,
        mapping_summary,
        policy_summary,
        applied_waivers,
        created_at,
    )
    assert_valid(package, "assessment-package")

    metadata = {
        "id": run_id,
        "status": _run_status(evidence_items),
        "created_at": created_at,
        "plan_id": run_plan["id"],
        "target_profile_ref": run_plan["target_profile_snapshot"]["id"],
        "assessment_profile_ref": run_plan["assessment_profile_snapshot"]["id"],
    }
    retention = build_retention_summary(metadata, run_plan, package)
    assert_valid(retention, "retention-summary")

    _write_run_directory(
        run_dir,
        metadata,
        run_plan,
        evidence_items,
        final_findings,
        mappings,
        package,
        retention,
    )

    reports_dir = run_dir / "reports"
    return {
        "status": metadata["status"],
        "run_id": run_id,
        "run_dir": str(run_dir),
        "assessment_package": str(reports_dir / "assessment-package.json"),
        "report": str(reports_dir / "report.md"),
        "retention_summary": str(run_dir / "retention-summary.json"),
    }
def _execute_steps(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
) -> list[dict[str, Any]]:
    """Produce one evidence item per step of ``plan["ordered_steps"]``.

    Steps are processed in plan order. When a ``preflight`` step yields a
    result that ``_blocks_downstream`` accepts, it is remembered under its
    extension id; any later ``check_group`` step of that extension is not
    executed and receives synthetic "blocked" evidence instead.

    Returns:
        The evidence items, in the same order as the plan's steps.
    """
    collected: list[dict[str, Any]] = []
    failed_preflights: dict[str, dict[str, Any]] = {}

    for step in plan["ordered_steps"]:
        owner = step["extension_id"]
        # Only check groups are gated by their extension's preflight.
        blocking_preflight = (
            failed_preflights.get(owner) if step["kind"] == "check_group" else None
        )
        if blocking_preflight is None:
            item = _evidence_for_step(root, run_dir, run_id, plan, step)
        else:
            item = _blocked_by_preflight_evidence(
                run_id, plan, step, blocking_preflight
            )
        collected.append(item)

        if step["kind"] != "preflight":
            continue
        if _blocks_downstream(item):
            failed_preflights[owner] = item

    return collected
def _blocked_by_preflight_evidence(
    run_id: str,
    plan: dict[str, Any],
    step: dict[str, Any],
    preflight: dict[str, Any],
) -> dict[str, Any]:
    """Build "blocked" evidence for a check group skipped after a bad preflight.

    No runner is invoked here, so the start and completion timestamps are the
    same instant and no artifacts are referenced. The facts point back at the
    preflight evidence item that caused the block.
    """
    stamp = _now()
    facts = {
        "step_kind": step["kind"],
        "runner_ref": step.get("runner_ref"),
        "blocked_reason": "preflight_failed",
        "preflight_evidence_ref": preflight["id"],
        "preflight_result": preflight["result"],
    }
    step_id = step["id"]
    return {
        "id": f"evidence:{step_id}",
        "run_id": run_id,
        "extension_id": step["extension_id"],
        "check_id": step_id,
        "subject_ref": plan["target_profile_snapshot"]["id"],
        "result": "blocked",
        "observations": [
            "Check group was not executed because extension preflight did not pass."
        ],
        "facts": facts,
        "requirement_refs": _requirement_refs(plan, step),
        "artifact_refs": [],
        "started_at": stamp,
        "completed_at": stamp,
    }
def _blocks_downstream(evidence: dict[str, Any]) -> bool:
    """Tell whether an evidence item's result should block later steps.

    True for the results ``fail``, ``blocked`` and ``infrastructure_error``.
    """
    blocking_results = {"fail", "blocked", "infrastructure_error"}
    outcome = evidence["result"]
    return outcome in blocking_results
def _evidence_for_step(
    root: Path,
    run_dir: Path,
    run_id: str,
    plan: dict[str, Any],
    step: dict[str, Any],
) -> dict[str, Any]:
    """Execute one plan step through ``run_step`` and wrap the outcome as evidence.

    Args:
        root: Project root forwarded to the runner.
        run_dir: Run directory forwarded to the runner.
        run_id: Identifier of the current run, stored on the evidence item.
        plan: Run plan; supplies the target profile snapshot id used as the
            evidence subject.
        step: The plan step to execute.

    Returns:
        An evidence item carrying the runner's result, observations, facts
        and artifact refs, plus the timestamps taken immediately before and
        immediately after the runner call.
    """
    started_at = _now()
    runner_result = run_step(root, run_dir, run_id, plan, step)
    # Read the clock again only after the runner has returned; reusing the
    # start timestamp would record every step as taking zero time.
    completed_at = _now()
    return {
        "id": f"evidence:{step['id']}",
        "run_id": run_id,
        "extension_id": step["extension_id"],
        "check_id": step["id"],
        "subject_ref": plan["target_profile_snapshot"]["id"],
        "result": runner_result["result"],
        "observations": runner_result["observations"],
        "facts": {
            "step_kind": step["kind"],
            "runner_ref": step.get("runner_ref"),
            # Runner facts are merged last, so a runner-supplied key with the
            # same name overrides the two defaults above.
            **runner_result["facts"],
        },
        "requirement_refs": _requirement_refs(plan, step),
        "artifact_refs": runner_result["artifact_refs"],
        "started_at": started_at,
        "completed_at": completed_at,
    }
def _requirement_refs(plan: dict[str, Any], step: dict[str, Any]) -> list[str]:
    """Return a copy of a check group's requirement refs.

    Steps of any other kind, and check groups without a ``requirement_refs``
    entry, yield an empty list. ``plan`` is accepted but not consulted.
    """
    is_check_group = step["kind"] == "check_group"
    return list(step.get("requirement_refs", [])) if is_check_group else []
def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Derive one finding per evidence item that did not succeed.

    Only items whose result is ``blocked``, ``fail`` or
    ``infrastructure_error`` produce a finding; all other items are skipped.
    Waiver and policy references start out as ``None``.
    """
    reportable = {"blocked", "fail", "infrastructure_error"}

    def _as_finding(entry: dict[str, Any]) -> dict[str, Any]:
        # Translate a single non-successful evidence item into a finding.
        check_id = entry["check_id"]
        return {
            "id": f"finding:{check_id}",
            "run_id": run_id,
            "check_id": check_id,
            "status": entry["result"],
            "severity": _severity_for_item(entry),
            "classification": _classification_for_item(entry),
            "requirement_refs": entry["requirement_refs"],
            "evidence_refs": [entry["id"]],
            "expected": _expected_for_item(entry),
            "waiver_ref": None,
            "policy_ref": None,
            "remediation": _remediation_for_item(entry),
        }

    return [_as_finding(entry) for entry in evidence if entry["result"] in reportable]
def _classification_for_item(item: dict[str, Any]) -> str:
    """Classify an evidence item for its finding.

    ``fail`` maps to ``check_failed``. ``blocked`` maps to the string
    ``blocked_reason`` found in the item's facts, or to
    ``runner_not_implemented`` when no string reason is present. Any other
    result maps to ``infrastructure_error``.
    """
    outcome = item["result"]
    if outcome == "fail":
        return "check_failed"
    if outcome != "blocked":
        return "infrastructure_error"
    reason = item.get("facts", {}).get("blocked_reason")
    return reason if isinstance(reason, str) else "runner_not_implemented"
def _severity_for_item(item: dict[str, Any]) -> str:
    """Return ``info`` for blocked evidence and ``medium`` for anything else."""
    return "info" if item["result"] == "blocked" else "medium"
def _expected_for_item(item: dict[str, Any]) -> bool:
    """Tell whether a blocked item carries one of the anticipated block reasons.

    Non-blocked items are never expected. A blocked item is expected only
    when the ``blocked_reason`` in its facts is one of the reasons listed
    below.
    """
    if item["result"] == "blocked":
        anticipated_reasons = {
            "missing_command",
            "missing_dependency",
            "preflight_failed",
            "tck_invocation_not_configured",
        }
        reason = item.get("facts", {}).get("blocked_reason")
        return reason in anticipated_reasons
    return False
def _remediation_for_item(item: dict[str, Any]) -> str:
    """Pick the remediation sentence that matches an evidence item.

    Blocked items get advice specific to their ``blocked_reason`` where one
    is defined and generic runner advice otherwise; infrastructure errors and
    all remaining results each have a single fixed sentence.
    """
    outcome = item["result"]
    if outcome == "infrastructure_error":
        return "Fix the target, network, credentials, or harness runtime and rerun the assessment."
    if outcome != "blocked":
        return "Review the failed check and target implementation."

    reason = item.get("facts", {}).get("blocked_reason")
    advice_by_reason = (
        (
            "missing_dependency",
            "Install the missing runner dependencies and rerun the assessment.",
        ),
        (
            "preflight_failed",
            "Fix the preflight failure and rerun downstream checks.",
        ),
        (
            "tck_invocation_not_configured",
            "Configure the final harness invocation, group mapping, and raw artifact capture.",
        ),
    )
    for known_reason, advice in advice_by_reason:
        if reason == known_reason:
            return advice
    return "Implement or configure the declared extension runner."
def _assessment_package(
    run_id: str,
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    findings: list[dict[str, Any]],
    artifact_manifest: list[dict[str, Any]],
    mapping_summary: dict[str, Any],
    policy_summary: dict[str, Any],
    applied_waivers: list[dict[str, Any]],
    created_at: str,
) -> dict[str, Any]:
    """Assemble the assessment package dict for a run.

    The package combines snapshots taken from the plan (target, extensions,
    source lock), a per-result count of the evidence items, the mapping and
    policy summaries, the findings, the evidence ids, the artifact manifest,
    the applied waivers and a fixed certification-boundary statement.
    """
    # Tally evidence results; keys appear in order of first occurrence.
    outcome_counts = Counter()
    for entry in evidence:
        outcome_counts[entry["result"]] += 1

    source_lock = plan["source_lock"]
    frameworks = [{"id": ref} for ref in source_lock["framework_refs"]]
    evidence_ids = [entry["id"] for entry in evidence]

    return {
        "id": f"assessment-package:{run_id}",
        "run_id": run_id,
        "target": plan["target_profile_snapshot"],
        "frameworks": frameworks,
        "extensions": plan["extension_snapshots"],
        "source_lock": source_lock,
        "summary": dict(outcome_counts),
        "mapping_summary": mapping_summary,
        "policy_summary": policy_summary,
        "findings": findings,
        "evidence_refs": evidence_ids,
        "artifact_manifest": artifact_manifest,
        "waivers": applied_waivers,
        "certification_boundary": "Guide Board produces preparation evidence only and does not issue certifications or audit assurance.",
        "created_at": created_at,
    }
def _write_run_directory(
    run_dir: Path,
    run_metadata: dict[str, Any],
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    findings: list[dict[str, Any]],
    mapping_records: list[dict[str, Any]],
    assessment_package: dict[str, Any],
    retention_summary: dict[str, Any],
) -> None:
    """Persist every run output beneath ``run_dir``.

    Top-level JSON files hold the run metadata, retention summary, plan,
    source lock and the two profile snapshots; ``normalized/`` holds the
    evidence, findings and mappings; ``reports/`` holds the assessment
    package and the markdown report.
    """
    normalized_dir = run_dir / "normalized"
    reports_dir = run_dir / "reports"

    # Run-level documents and plan snapshots.
    write_json(run_dir / "run.json", run_metadata)
    write_json(run_dir / "retention-summary.json", retention_summary)
    write_json(run_dir / "plan.json", plan)
    write_json(run_dir / "sources.lock.json", plan["source_lock"])
    write_json(
        run_dir / "target-profile.snapshot.json",
        plan["target_profile_snapshot"],
    )
    write_json(
        run_dir / "assessment-profile.snapshot.json",
        plan["assessment_profile_snapshot"],
    )

    # Normalized records, each wrapped under a single top-level key.
    write_json(normalized_dir / "evidence.json", {"evidence": evidence})
    write_json(normalized_dir / "findings.json", {"findings": findings})
    write_json(normalized_dir / "mappings.json", {"mappings": mapping_records})

    # Reports: the JSON package first, then the human-readable markdown.
    write_json(reports_dir / "assessment-package.json", assessment_package)
    reports_dir.mkdir(parents=True, exist_ok=True)
    report_text = _markdown_report(run_metadata, assessment_package)
    (reports_dir / "report.md").write_text(report_text, encoding="utf-8")
def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> str:
    """Render the markdown report for a run.

    The report has a title and three header lines (status, target,
    assessment) followed by the Summary, Mappings, Policy and Boundary
    sections. The Summary lists result counts sorted by result name, or a
    placeholder line when the package summary is empty.
    """
    tallies = sorted(package["summary"].items())
    summary_block = (
        "\n".join(f"- {status}: {count}" for status, count in tallies)
        or "- no evidence produced"
    )
    mapping_block = _mapping_summary_lines(package)
    policy_block = _policy_summary_lines(package)

    parts = [
        f"# Guide Board Assessment Report: {run_metadata['id']}",
        "",
        f"Status: {run_metadata['status']}",
        f"Target: {run_metadata['target_profile_ref']}",
        f"Assessment: {run_metadata['assessment_profile_ref']}",
        "",
    ]
    sections = (
        ("## Summary", summary_block),
        ("## Mappings", mapping_block),
        ("## Policy", policy_block),
        ("## Boundary", package["certification_boundary"]),
    )
    for heading, content in sections:
        # Each section is: heading, blank line, content, blank line.
        parts.extend([heading, "", content, ""])
    return "\n".join(parts)
def _mapping_summary_lines(package: dict[str, Any]) -> str:
    """Render one bullet per mapped target with its result counts.

    Targets come from ``package["mapping_summary"]["targets"]``; when that
    list is missing or empty a single placeholder bullet is returned. Counts
    within a bullet are sorted by result name.
    """
    mapped_targets = package.get("mapping_summary", {}).get("targets", [])
    if mapped_targets:
        bullets = []
        for entry in mapped_targets:
            counts = sorted(entry.get("results", {}).items())
            tally = ", ".join(f"{status}: {count}" for status, count in counts)
            bullets.append(f"- {entry['label']} ({entry['target_id']}): {tally}")
        return "\n".join(bullets)
    return "- no mapped evidence"
def _policy_summary_lines(package: dict[str, Any]) -> str:
    """Render the three policy counters as bullets, defaulting each to 0.

    The counters are read from ``package["policy_summary"]``; a missing
    summary or missing counter is shown as ``0``.
    """
    counters = package.get("policy_summary", {})
    expectations_line = f"- applied expectations: {counters.get('applied_expectations', 0)}"
    waivers_line = f"- applied waivers: {counters.get('applied_waivers', 0)}"
    unexpected_line = f"- unexpected findings: {counters.get('unexpected_findings', 0)}"
    return "\n".join((expectations_line, waivers_line, unexpected_line))
def _run_status(evidence: list[dict[str, Any]]) -> str:
    """Reduce all evidence results to a single run status.

    The first matching rule wins: any ``fail`` gives ``failed``, then any
    ``infrastructure_error`` gives ``infrastructure_error``, then any
    ``blocked`` gives ``blocked``; otherwise the run is ``completed``.
    """
    # Ordered from highest to lowest priority.
    status_by_result = (
        ("fail", "failed"),
        ("infrastructure_error", "infrastructure_error"),
        ("blocked", "blocked"),
    )
    for result, status in status_by_result:
        if any(entry["result"] == result for entry in evidence):
            return status
    return "completed"
def _now() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def _timestamp() -> str:
    """Return the current UTC time in compact ``YYYYMMDDTHHMMSSZ`` form."""
    current = datetime.now(tz=timezone.utc)
    return current.strftime("%Y%m%dT%H%M%SZ")