Files
open-cmis-tck/src/guide_board/execution.py

270 lines
8.8 KiB
Python

"""Baseline assessment execution."""
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.artifacts import build_artifact_manifest
from guide_board.io import write_json
from guide_board.planning import build_run_plan
from guide_board.runners import run_step
from guide_board.schema import assert_valid
def run_assessment(
root: Path,
target_path: Path,
assessment_path: Path,
output_dir: Path | None = None,
) -> dict[str, Any]:
plan = build_run_plan(root, target_path, assessment_path)
run_id = f"run-{_timestamp()}"
run_dir = output_dir or root / "runs" / run_id
created_at = _now()
evidence = [
_evidence_for_step(root, run_dir, run_id, plan, step)
for step in plan["ordered_steps"]
]
for item in evidence:
assert_valid(item, "evidence-item")
findings = _findings_for_evidence(run_id, evidence)
for finding in findings:
assert_valid(finding, "finding")
artifact_manifest = build_artifact_manifest(run_dir, run_id, evidence)
assessment_package = _assessment_package(
run_id,
plan,
evidence,
findings,
artifact_manifest,
created_at,
)
assert_valid(assessment_package, "assessment-package")
run_metadata = {
"id": run_id,
"status": _run_status(evidence),
"created_at": created_at,
"plan_id": plan["id"],
"target_profile_ref": plan["target_profile_snapshot"]["id"],
"assessment_profile_ref": plan["assessment_profile_snapshot"]["id"],
}
_write_run_directory(run_dir, run_metadata, plan, evidence, findings, assessment_package)
return {
"status": run_metadata["status"],
"run_id": run_id,
"run_dir": str(run_dir),
"assessment_package": str(run_dir / "reports" / "assessment-package.json"),
"report": str(run_dir / "reports" / "report.md"),
}
def _evidence_for_step(
root: Path,
run_dir: Path,
run_id: str,
plan: dict[str, Any],
step: dict[str, Any],
) -> dict[str, Any]:
now = _now()
runner_ref = step.get("runner_ref")
runner_result = run_step(root, run_dir, run_id, plan, step)
return {
"id": f"evidence:{step['id']}",
"run_id": run_id,
"extension_id": step["extension_id"],
"check_id": step["id"],
"subject_ref": plan["target_profile_snapshot"]["id"],
"result": runner_result["result"],
"observations": runner_result["observations"],
"facts": {
"step_kind": step["kind"],
"runner_ref": runner_ref,
**runner_result["facts"],
},
"requirement_refs": _requirement_refs(plan, step),
"artifact_refs": runner_result["artifact_refs"],
"started_at": now,
"completed_at": now,
}
def _requirement_refs(plan: dict[str, Any], step: dict[str, Any]) -> list[str]:
if step["kind"] != "check_group":
return []
return list(step.get("requirement_refs", []))
def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
findings: list[dict[str, Any]] = []
for item in evidence:
if item["result"] not in {"blocked", "fail", "infrastructure_error"}:
continue
findings.append(
{
"id": f"finding:{item['check_id']}",
"run_id": run_id,
"status": item["result"],
"severity": _severity_for_item(item),
"classification": _classification_for_item(item),
"requirement_refs": item["requirement_refs"],
"evidence_refs": [item["id"]],
"expected": _expected_for_item(item),
"waiver_ref": None,
"remediation": _remediation_for_item(item),
}
)
return findings
def _classification_for_item(item: dict[str, Any]) -> str:
result = item["result"]
if result == "blocked":
blocked_reason = item.get("facts", {}).get("blocked_reason")
if isinstance(blocked_reason, str):
return blocked_reason
return "runner_not_implemented"
if result == "fail":
return "check_failed"
return "infrastructure_error"
def _severity_for_item(item: dict[str, Any]) -> str:
if item["result"] == "blocked":
return "info"
return "medium"
def _expected_for_item(item: dict[str, Any]) -> bool:
if item["result"] != "blocked":
return False
blocked_reason = item.get("facts", {}).get("blocked_reason")
return blocked_reason in {
"missing_command",
"missing_dependency",
"tck_invocation_not_configured",
}
def _remediation_for_item(item: dict[str, Any]) -> str:
result = item["result"]
if result == "blocked":
blocked_reason = item.get("facts", {}).get("blocked_reason")
if blocked_reason == "missing_dependency":
return "Install the missing runner dependencies and rerun the assessment."
if blocked_reason == "tck_invocation_not_configured":
return "Configure the final harness invocation, group mapping, and raw artifact capture."
return "Implement or configure the declared extension runner."
if result == "infrastructure_error":
return "Fix the target, network, credentials, or harness runtime and rerun the assessment."
return "Review the failed check and target implementation."
def _assessment_package(
run_id: str,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
artifact_manifest: list[dict[str, Any]],
created_at: str,
) -> dict[str, Any]:
summary = dict(Counter(item["result"] for item in evidence))
return {
"id": f"assessment-package:{run_id}",
"run_id": run_id,
"target": plan["target_profile_snapshot"],
"frameworks": [
{"id": framework_id} for framework_id in plan["source_lock"]["framework_refs"]
],
"extensions": plan["extension_snapshots"],
"source_lock": plan["source_lock"],
"summary": summary,
"findings": findings,
"evidence_refs": [item["id"] for item in evidence],
"artifact_manifest": artifact_manifest,
"waivers": [],
"certification_boundary": "Guide Board produces preparation evidence only and does not issue certifications or audit assurance.",
"created_at": created_at,
}
def _write_run_directory(
run_dir: Path,
run_metadata: dict[str, Any],
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
assessment_package: dict[str, Any],
) -> None:
write_json(run_dir / "run.json", run_metadata)
write_json(run_dir / "plan.json", plan)
write_json(run_dir / "sources.lock.json", plan["source_lock"])
write_json(run_dir / "target-profile.snapshot.json", plan["target_profile_snapshot"])
write_json(
run_dir / "assessment-profile.snapshot.json",
plan["assessment_profile_snapshot"],
)
write_json(run_dir / "normalized" / "evidence.json", {"evidence": evidence})
write_json(run_dir / "normalized" / "findings.json", {"findings": findings})
write_json(run_dir / "normalized" / "mappings.json", {"mappings": []})
write_json(run_dir / "reports" / "assessment-package.json", assessment_package)
(run_dir / "reports").mkdir(parents=True, exist_ok=True)
(run_dir / "reports" / "report.md").write_text(
_markdown_report(run_metadata, assessment_package),
encoding="utf-8",
)
def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> str:
summary_lines = "\n".join(
f"- {status}: {count}" for status, count in sorted(package["summary"].items())
)
if not summary_lines:
summary_lines = "- no evidence produced"
return "\n".join(
[
f"# Guide Board Assessment Report: {run_metadata['id']}",
"",
f"Status: {run_metadata['status']}",
f"Target: {run_metadata['target_profile_ref']}",
f"Assessment: {run_metadata['assessment_profile_ref']}",
"",
"## Summary",
"",
summary_lines,
"",
"## Boundary",
"",
package["certification_boundary"],
"",
]
)
def _run_status(evidence: list[dict[str, Any]]) -> str:
if any(item["result"] == "fail" for item in evidence):
return "failed"
if any(item["result"] == "infrastructure_error" for item in evidence):
return "infrastructure_error"
if any(item["result"] == "blocked" for item in evidence):
return "blocked"
return "completed"
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _timestamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")