generated from coulomb/repo-seed
first extension execution path
This commit is contained in:
@@ -9,6 +9,7 @@ from typing import Any
|
||||
|
||||
from guide_board.io import write_json
|
||||
from guide_board.planning import build_run_plan
|
||||
from guide_board.runners import run_step
|
||||
from guide_board.schema import assert_valid
|
||||
|
||||
|
||||
@@ -23,7 +24,10 @@ def run_assessment(
|
||||
run_dir = output_dir or root / "runs" / run_id
|
||||
created_at = _now()
|
||||
|
||||
evidence = [_evidence_for_step(run_id, plan, step) for step in plan["ordered_steps"]]
|
||||
evidence = [
|
||||
_evidence_for_step(root, run_dir, run_id, plan, step)
|
||||
for step in plan["ordered_steps"]
|
||||
]
|
||||
for item in evidence:
|
||||
assert_valid(item, "evidence-item")
|
||||
|
||||
@@ -53,19 +57,16 @@ def run_assessment(
|
||||
}
|
||||
|
||||
|
||||
def _evidence_for_step(run_id: str, plan: dict[str, Any], step: dict[str, Any]) -> dict[str, Any]:
|
||||
def _evidence_for_step(
|
||||
root: Path,
|
||||
run_dir: Path,
|
||||
run_id: str,
|
||||
plan: dict[str, Any],
|
||||
step: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
now = _now()
|
||||
runner_ref = step.get("runner_ref")
|
||||
if runner_ref is None:
|
||||
result = "manual" if step["kind"] == "check_group" else "skipped"
|
||||
observations = [
|
||||
"No runner is configured for this step in the baseline core."
|
||||
]
|
||||
else:
|
||||
result = "blocked"
|
||||
observations = [
|
||||
f"Runner {runner_ref!r} is declared but not implemented by the baseline core."
|
||||
]
|
||||
runner_result = run_step(root, run_dir, run_id, plan, step)
|
||||
|
||||
return {
|
||||
"id": f"evidence:{step['id']}",
|
||||
@@ -73,14 +74,15 @@ def _evidence_for_step(run_id: str, plan: dict[str, Any], step: dict[str, Any])
|
||||
"extension_id": step["extension_id"],
|
||||
"check_id": step["id"],
|
||||
"subject_ref": plan["target_profile_snapshot"]["id"],
|
||||
"result": result,
|
||||
"observations": observations,
|
||||
"result": runner_result["result"],
|
||||
"observations": runner_result["observations"],
|
||||
"facts": {
|
||||
"step_kind": step["kind"],
|
||||
"runner_ref": runner_ref,
|
||||
**runner_result["facts"],
|
||||
},
|
||||
"requirement_refs": _requirement_refs(plan, step),
|
||||
"artifact_refs": [],
|
||||
"artifact_refs": runner_result["artifact_refs"],
|
||||
"started_at": now,
|
||||
"completed_at": now,
|
||||
}
|
||||
@@ -95,25 +97,38 @@ def _requirement_refs(plan: dict[str, Any], step: dict[str, Any]) -> list[str]:
|
||||
def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
findings: list[dict[str, Any]] = []
|
||||
for item in evidence:
|
||||
if item["result"] != "blocked":
|
||||
if item["result"] not in {"blocked", "fail", "infrastructure_error"}:
|
||||
continue
|
||||
classification = {
|
||||
"blocked": "runner_not_implemented",
|
||||
"fail": "check_failed",
|
||||
"infrastructure_error": "infrastructure_error",
|
||||
}[item["result"]]
|
||||
findings.append(
|
||||
{
|
||||
"id": f"finding:{item['check_id']}",
|
||||
"run_id": run_id,
|
||||
"status": "blocked",
|
||||
"severity": "info",
|
||||
"classification": "runner_not_implemented",
|
||||
"status": item["result"],
|
||||
"severity": "info" if item["result"] == "blocked" else "medium",
|
||||
"classification": classification,
|
||||
"requirement_refs": item["requirement_refs"],
|
||||
"evidence_refs": [item["id"]],
|
||||
"expected": True,
|
||||
"expected": item["result"] == "blocked",
|
||||
"waiver_ref": None,
|
||||
"remediation": "Implement or configure the declared extension runner.",
|
||||
"remediation": _remediation_for_result(item["result"]),
|
||||
}
|
||||
)
|
||||
return findings
|
||||
|
||||
|
||||
def _remediation_for_result(result: str) -> str:
|
||||
if result == "blocked":
|
||||
return "Implement or configure the declared extension runner."
|
||||
if result == "infrastructure_error":
|
||||
return "Fix the target, network, credentials, or harness runtime and rerun the assessment."
|
||||
return "Review the failed check and target implementation."
|
||||
|
||||
|
||||
def _assessment_package(
|
||||
run_id: str,
|
||||
plan: dict[str, Any],
|
||||
@@ -198,6 +213,8 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
|
||||
def _run_status(evidence: list[dict[str, Any]]) -> str:
|
||||
if any(item["result"] == "fail" for item in evidence):
|
||||
return "failed"
|
||||
if any(item["result"] == "infrastructure_error" for item in evidence):
|
||||
return "infrastructure_error"
|
||||
if any(item["result"] == "blocked" for item in evidence):
|
||||
return "blocked"
|
||||
return "completed"
|
||||
|
||||
162
src/guide_board/runners.py
Normal file
162
src/guide_board/runners.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""Runner bridge for extension-provided checks."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Any, Callable
|
||||
|
||||
from guide_board.errors import ValidationError
|
||||
from guide_board.io import load_json
|
||||
|
||||
|
||||
RunnerCallable = Callable[[dict[str, Any]], dict[str, Any]]
|
||||
|
||||
|
||||
def run_step(
|
||||
root: Path,
|
||||
run_dir: Path,
|
||||
run_id: str,
|
||||
plan: dict[str, Any],
|
||||
step: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
runner_ref = step.get("runner_ref")
|
||||
if runner_ref is None:
|
||||
return _no_runner_result(step)
|
||||
|
||||
extension = _extension_snapshot(plan, step["extension_id"])
|
||||
extension_path = root / extension["path"]
|
||||
manifest = load_json(extension_path / "extension.json")
|
||||
entrypoint = _runner_entrypoint(manifest, runner_ref)
|
||||
if entrypoint["kind"] == "python_module":
|
||||
return _run_python_module(root, run_dir, run_id, plan, step, extension_path, entrypoint)
|
||||
if entrypoint["kind"] == "external":
|
||||
return {
|
||||
"result": "blocked",
|
||||
"observations": [
|
||||
f"Runner {runner_ref!r} is declared as an external runner and is not implemented by the core."
|
||||
],
|
||||
"facts": {
|
||||
"runner_ref": runner_ref,
|
||||
"runner_kind": "external",
|
||||
},
|
||||
"artifact_refs": [],
|
||||
}
|
||||
if entrypoint["kind"] == "command":
|
||||
return {
|
||||
"result": "blocked",
|
||||
"observations": [
|
||||
f"Runner {runner_ref!r} is declared as a command runner; command execution is not enabled yet."
|
||||
],
|
||||
"facts": {
|
||||
"runner_ref": runner_ref,
|
||||
"runner_kind": "command",
|
||||
},
|
||||
"artifact_refs": [],
|
||||
}
|
||||
raise ValidationError(f"{runner_ref}: unsupported runner kind {entrypoint['kind']!r}")
|
||||
|
||||
|
||||
def _no_runner_result(step: dict[str, Any]) -> dict[str, Any]:
|
||||
result = "manual" if step["kind"] == "check_group" else "skipped"
|
||||
return {
|
||||
"result": result,
|
||||
"observations": [
|
||||
"No runner is configured for this step in the baseline core."
|
||||
],
|
||||
"facts": {
|
||||
"runner_ref": None,
|
||||
"runner_kind": None,
|
||||
},
|
||||
"artifact_refs": [],
|
||||
}
|
||||
|
||||
|
||||
def _run_python_module(
|
||||
root: Path,
|
||||
run_dir: Path,
|
||||
run_id: str,
|
||||
plan: dict[str, Any],
|
||||
step: dict[str, Any],
|
||||
extension_path: Path,
|
||||
entrypoint: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
module_path = entrypoint.get("module_path")
|
||||
callable_name = entrypoint.get("callable")
|
||||
if not module_path or not callable_name:
|
||||
raise ValidationError(f"{entrypoint['id']}: python_module runners need module_path and callable")
|
||||
|
||||
module_file = (extension_path / module_path).resolve()
|
||||
try:
|
||||
module_file.relative_to(extension_path.resolve())
|
||||
except ValueError as exc:
|
||||
raise ValidationError(
|
||||
f"{entrypoint['id']}: module_path must stay inside the extension directory"
|
||||
) from exc
|
||||
|
||||
module = _load_module(module_file, entrypoint["id"])
|
||||
runner = getattr(module, callable_name, None)
|
||||
if not callable(runner):
|
||||
raise ValidationError(f"{entrypoint['id']}: callable {callable_name!r} was not found")
|
||||
|
||||
context = {
|
||||
"root": str(root),
|
||||
"run_dir": str(run_dir),
|
||||
"run_id": run_id,
|
||||
"plan": plan,
|
||||
"step": step,
|
||||
"target_profile": plan["target_profile_snapshot"],
|
||||
"assessment_profile": plan["assessment_profile_snapshot"],
|
||||
"extension_path": str(extension_path),
|
||||
"runner": entrypoint,
|
||||
}
|
||||
try:
|
||||
result = runner(context)
|
||||
except Exception as exc: # noqa: BLE001 - extension failures become evidence.
|
||||
return {
|
||||
"result": "infrastructure_error",
|
||||
"observations": [
|
||||
f"Runner {entrypoint['id']!r} failed before producing evidence: {exc}"
|
||||
],
|
||||
"facts": {
|
||||
"runner_ref": entrypoint["id"],
|
||||
"runner_kind": "python_module",
|
||||
"error_type": type(exc).__name__,
|
||||
},
|
||||
"artifact_refs": [],
|
||||
}
|
||||
if not isinstance(result, dict):
|
||||
raise ValidationError(f"{entrypoint['id']}: runner must return an object")
|
||||
return {
|
||||
"result": result.get("result", "unknown"),
|
||||
"observations": result.get("observations", []),
|
||||
"facts": result.get("facts", {}),
|
||||
"artifact_refs": result.get("artifact_refs", []),
|
||||
}
|
||||
|
||||
|
||||
def _load_module(path: Path, runner_id: str) -> ModuleType:
|
||||
if not path.exists():
|
||||
raise ValidationError(f"{runner_id}: module not found: {path}")
|
||||
module_name = f"_guide_board_runner_{runner_id.replace('-', '_')}"
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
if spec is None or spec.loader is None:
|
||||
raise ValidationError(f"{runner_id}: unable to load module from {path}")
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _extension_snapshot(plan: dict[str, Any], extension_id: str) -> dict[str, Any]:
|
||||
for extension in plan["extension_snapshots"]:
|
||||
if extension["id"] == extension_id:
|
||||
return extension
|
||||
raise ValidationError(f"step references unknown extension {extension_id!r}")
|
||||
|
||||
|
||||
def _runner_entrypoint(manifest: dict[str, Any], runner_ref: str) -> dict[str, Any]:
|
||||
for entrypoint in manifest.get("runner_entrypoints", []):
|
||||
if entrypoint["id"] == runner_ref:
|
||||
return entrypoint
|
||||
raise ValidationError(f"{manifest['id']}: runner {runner_ref!r} is not declared")
|
||||
16
src/guide_board/sdk.py
Normal file
16
src/guide_board/sdk.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Public helper types for extension runners.
|
||||
|
||||
Extension Python runners are called with one dictionary context and should return
|
||||
one dictionary shaped like `RunnerResult`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, TypedDict
|
||||
|
||||
|
||||
class RunnerResult(TypedDict, total=False):
|
||||
result: str
|
||||
observations: list[str]
|
||||
facts: dict[str, Any]
|
||||
artifact_refs: list[str]
|
||||
Reference in New Issue
Block a user