first working guide-board architecture core

This commit is contained in:
2026-05-07 11:56:14 +02:00
parent 360236ff71
commit be3ab87c6a
34 changed files with 1536 additions and 2 deletions

View File

@@ -0,0 +1,3 @@
"""Guide Board core package."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,5 @@
from guide_board.cli import main
if __name__ == "__main__":
raise SystemExit(main())

138
src/guide_board/cli.py Normal file
View File

@@ -0,0 +1,138 @@
"""Guide Board command line interface."""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
from guide_board.discovery import discover_extensions
from guide_board.errors import GuideBoardError
from guide_board.execution import run_assessment
from guide_board.io import load_json, write_json
from guide_board.planning import (
build_run_plan,
validate_assessment_profile,
validate_target_profile,
)
from guide_board.schema import assert_valid
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
try:
result = args.func(args)
except GuideBoardError as exc:
print(f"guide-board: {exc}", file=sys.stderr)
return 2
except (OSError, ValueError) as exc:
print(f"guide-board: {exc}", file=sys.stderr)
return 1
if result is not None:
print_json(result)
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="guide-board")
parser.add_argument("--root", type=Path, default=Path.cwd(), help="repository root")
subcommands = parser.add_subparsers(required=True)
extensions = subcommands.add_parser("extensions", help="extension operations")
extension_commands = extensions.add_subparsers(required=True)
list_extensions = extension_commands.add_parser("list", help="list discovered extensions")
list_extensions.set_defaults(func=cmd_extensions_list)
validate_extensions = extension_commands.add_parser(
"validate", help="validate discovered extension manifests"
)
validate_extensions.set_defaults(func=cmd_extensions_validate)
profile = subcommands.add_parser("profile", help="profile validation")
profile_commands = profile.add_subparsers(required=True)
target = profile_commands.add_parser("validate-target", help="validate a target profile")
target.add_argument("path", type=Path)
target.set_defaults(func=cmd_validate_target)
assessment = profile_commands.add_parser(
"validate-assessment", help="validate an assessment profile"
)
assessment.add_argument("path", type=Path)
assessment.set_defaults(func=cmd_validate_assessment)
plan = subcommands.add_parser("plan", help="build a run plan")
plan.add_argument("--target", type=Path, required=True)
plan.add_argument("--assessment", type=Path, required=True)
plan.add_argument("--output", type=Path)
plan.set_defaults(func=cmd_plan)
run = subcommands.add_parser("run", help="run the baseline assessment executor")
run.add_argument("--target", type=Path, required=True)
run.add_argument("--assessment", type=Path, required=True)
run.add_argument("--output-dir", type=Path)
run.set_defaults(func=cmd_run)
schema = subcommands.add_parser("schema", help="schema validation")
schema.add_argument("schema_name")
schema.add_argument("path", type=Path)
schema.set_defaults(func=cmd_schema_validate)
return parser
def cmd_extensions_list(args: argparse.Namespace) -> dict[str, Any]:
extensions = discover_extensions(args.root)
return {
"extensions": [
{
"id": extension.id,
"name": extension.manifest["name"],
"version": extension.manifest["version"],
"type": extension.manifest["extension_type"],
"path": str(extension.path.relative_to(args.root)),
}
for extension in extensions
]
}
def cmd_extensions_validate(args: argparse.Namespace) -> dict[str, Any]:
extensions = discover_extensions(args.root)
return {
"status": "valid",
"extensions": [extension.id for extension in extensions],
}
def cmd_validate_target(args: argparse.Namespace) -> dict[str, Any]:
profile = validate_target_profile(args.path)
return {"status": "valid", "target_profile": profile["id"]}
def cmd_validate_assessment(args: argparse.Namespace) -> dict[str, Any]:
profile = validate_assessment_profile(args.path)
return {"status": "valid", "assessment_profile": profile["id"]}
def cmd_plan(args: argparse.Namespace) -> dict[str, Any] | None:
plan = build_run_plan(args.root, args.target, args.assessment)
if args.output:
write_json(args.output, plan)
return {"status": "written", "path": str(args.output)}
return plan
def cmd_run(args: argparse.Namespace) -> dict[str, Any]:
return run_assessment(args.root, args.target, args.assessment, args.output_dir)
def cmd_schema_validate(args: argparse.Namespace) -> dict[str, Any]:
document = load_json(args.path)
assert_valid(document, args.schema_name)
return {"status": "valid", "schema": args.schema_name, "path": str(args.path)}
def print_json(value: Any) -> None:
print(json.dumps(value, indent=2, sort_keys=True))

View File

@@ -0,0 +1,48 @@
"""Extension discovery."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from guide_board.errors import DiscoveryError, ValidationError
from guide_board.io import load_json
from guide_board.schema import assert_valid
@dataclass(frozen=True)
class Extension:
id: str
path: Path
manifest: dict[str, Any]
def discover_extensions(root: Path) -> list[Extension]:
extension_root = root / "extensions"
if not extension_root.exists():
raise DiscoveryError(f"extension directory not found: {extension_root}")
extensions: list[Extension] = []
for child in sorted(extension_root.iterdir()):
if not child.is_dir() or child.name.startswith("_"):
continue
manifest_path = child / "extension.json"
if not manifest_path.exists():
continue
manifest = load_json(manifest_path)
assert_valid(manifest, "extension-manifest")
extension_id = manifest["id"]
if extension_id != child.name:
raise ValidationError(
f"{manifest_path}: extension id {extension_id!r} must match directory {child.name!r}"
)
extensions.append(Extension(id=extension_id, path=child, manifest=manifest))
return extensions
def find_extension(root: Path, extension_id: str) -> Extension:
for extension in discover_extensions(root):
if extension.id == extension_id:
return extension
raise DiscoveryError(f"extension not found: {extension_id}")

13
src/guide_board/errors.py Normal file
View File

@@ -0,0 +1,13 @@
"""Shared exceptions for guide-board core."""
class GuideBoardError(Exception):
"""Base exception for user-facing guide-board errors."""
class ValidationError(GuideBoardError):
"""Raised when a document does not match its contract."""
class DiscoveryError(GuideBoardError):
"""Raised when extension discovery fails."""

View File

@@ -0,0 +1,211 @@
"""Baseline assessment execution."""
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.io import write_json
from guide_board.planning import build_run_plan
from guide_board.schema import assert_valid
def run_assessment(
root: Path,
target_path: Path,
assessment_path: Path,
output_dir: Path | None = None,
) -> dict[str, Any]:
plan = build_run_plan(root, target_path, assessment_path)
run_id = f"run-{_timestamp()}"
run_dir = output_dir or root / "runs" / run_id
created_at = _now()
evidence = [_evidence_for_step(run_id, plan, step) for step in plan["ordered_steps"]]
for item in evidence:
assert_valid(item, "evidence-item")
findings = _findings_for_evidence(run_id, evidence)
for finding in findings:
assert_valid(finding, "finding")
assessment_package = _assessment_package(run_id, plan, evidence, findings, created_at)
assert_valid(assessment_package, "assessment-package")
run_metadata = {
"id": run_id,
"status": _run_status(evidence),
"created_at": created_at,
"plan_id": plan["id"],
"target_profile_ref": plan["target_profile_snapshot"]["id"],
"assessment_profile_ref": plan["assessment_profile_snapshot"]["id"],
}
_write_run_directory(run_dir, run_metadata, plan, evidence, findings, assessment_package)
return {
"status": run_metadata["status"],
"run_id": run_id,
"run_dir": str(run_dir),
"assessment_package": str(run_dir / "reports" / "assessment-package.json"),
"report": str(run_dir / "reports" / "report.md"),
}
def _evidence_for_step(run_id: str, plan: dict[str, Any], step: dict[str, Any]) -> dict[str, Any]:
now = _now()
runner_ref = step.get("runner_ref")
if runner_ref is None:
result = "manual" if step["kind"] == "check_group" else "skipped"
observations = [
"No runner is configured for this step in the baseline core."
]
else:
result = "blocked"
observations = [
f"Runner {runner_ref!r} is declared but not implemented by the baseline core."
]
return {
"id": f"evidence:{step['id']}",
"run_id": run_id,
"extension_id": step["extension_id"],
"check_id": step["id"],
"subject_ref": plan["target_profile_snapshot"]["id"],
"result": result,
"observations": observations,
"facts": {
"step_kind": step["kind"],
"runner_ref": runner_ref,
},
"requirement_refs": _requirement_refs(plan, step),
"artifact_refs": [],
"started_at": now,
"completed_at": now,
}
def _requirement_refs(plan: dict[str, Any], step: dict[str, Any]) -> list[str]:
if step["kind"] != "check_group":
return []
return list(step.get("requirement_refs", []))
def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
findings: list[dict[str, Any]] = []
for item in evidence:
if item["result"] != "blocked":
continue
findings.append(
{
"id": f"finding:{item['check_id']}",
"run_id": run_id,
"status": "blocked",
"severity": "info",
"classification": "runner_not_implemented",
"requirement_refs": item["requirement_refs"],
"evidence_refs": [item["id"]],
"expected": True,
"waiver_ref": None,
"remediation": "Implement or configure the declared extension runner.",
}
)
return findings
def _assessment_package(
run_id: str,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
created_at: str,
) -> dict[str, Any]:
summary = dict(Counter(item["result"] for item in evidence))
return {
"id": f"assessment-package:{run_id}",
"run_id": run_id,
"target": plan["target_profile_snapshot"],
"frameworks": [
{"id": framework_id} for framework_id in plan["source_lock"]["framework_refs"]
],
"extensions": plan["extension_snapshots"],
"source_lock": plan["source_lock"],
"summary": summary,
"findings": findings,
"evidence_refs": [item["id"] for item in evidence],
"artifact_manifest": [],
"waivers": [],
"certification_boundary": "Guide Board produces preparation evidence only and does not issue certifications or audit assurance.",
"created_at": created_at,
}
def _write_run_directory(
run_dir: Path,
run_metadata: dict[str, Any],
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
assessment_package: dict[str, Any],
) -> None:
write_json(run_dir / "run.json", run_metadata)
write_json(run_dir / "plan.json", plan)
write_json(run_dir / "sources.lock.json", plan["source_lock"])
write_json(run_dir / "target-profile.snapshot.json", plan["target_profile_snapshot"])
write_json(
run_dir / "assessment-profile.snapshot.json",
plan["assessment_profile_snapshot"],
)
write_json(run_dir / "normalized" / "evidence.json", {"evidence": evidence})
write_json(run_dir / "normalized" / "findings.json", {"findings": findings})
write_json(run_dir / "normalized" / "mappings.json", {"mappings": []})
write_json(run_dir / "reports" / "assessment-package.json", assessment_package)
(run_dir / "reports").mkdir(parents=True, exist_ok=True)
(run_dir / "reports" / "report.md").write_text(
_markdown_report(run_metadata, assessment_package),
encoding="utf-8",
)
def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> str:
summary_lines = "\n".join(
f"- {status}: {count}" for status, count in sorted(package["summary"].items())
)
if not summary_lines:
summary_lines = "- no evidence produced"
return "\n".join(
[
f"# Guide Board Assessment Report: {run_metadata['id']}",
"",
f"Status: {run_metadata['status']}",
f"Target: {run_metadata['target_profile_ref']}",
f"Assessment: {run_metadata['assessment_profile_ref']}",
"",
"## Summary",
"",
summary_lines,
"",
"## Boundary",
"",
package["certification_boundary"],
"",
]
)
def _run_status(evidence: list[dict[str, Any]]) -> str:
if any(item["result"] == "fail" for item in evidence):
return "failed"
if any(item["result"] == "blocked" for item in evidence):
return "blocked"
return "completed"
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _timestamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

22
src/guide_board/io.py Normal file
View File

@@ -0,0 +1,22 @@
"""Small file-loading helpers."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
def load_json(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
value = json.load(handle)
if not isinstance(value, dict):
raise ValueError(f"{path} must contain a JSON object")
return value
def write_json(path: Path, value: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
json.dump(value, handle, indent=2, sort_keys=True)
handle.write("\n")

109
src/guide_board/planning.py Normal file
View File

@@ -0,0 +1,109 @@
"""Assessment planning."""
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.discovery import discover_extensions
from guide_board.errors import ValidationError
from guide_board.io import load_json
from guide_board.schema import assert_valid
def validate_target_profile(path: Path) -> dict[str, Any]:
document = load_json(path)
assert_valid(document, "target-profile")
return document
def validate_assessment_profile(path: Path) -> dict[str, Any]:
document = load_json(path)
assert_valid(document, "assessment-profile")
return document
def build_run_plan(root: Path, target_path: Path, assessment_path: Path) -> dict[str, Any]:
target = validate_target_profile(target_path)
assessment = validate_assessment_profile(assessment_path)
extensions = {extension.id: extension for extension in discover_extensions(root)}
selected_extensions = assessment["extension_refs"]
missing = [extension_id for extension_id in selected_extensions if extension_id not in extensions]
if missing:
raise ValidationError(f"assessment references unknown extension(s): {', '.join(missing)}")
if assessment["target_profile_ref"] != target["id"]:
raise ValidationError(
"assessment target_profile_ref "
f"{assessment['target_profile_ref']!r} does not match target profile {target['id']!r}"
)
ordered_steps: list[dict[str, Any]] = []
for extension_id in selected_extensions:
extension = extensions[extension_id]
selected_groups = assessment["selected_check_groups"].get(extension_id, [])
available_groups = {group["id"]: group for group in extension.manifest["check_groups"]}
unknown_groups = [group_id for group_id in selected_groups if group_id not in available_groups]
if unknown_groups:
raise ValidationError(
f"{extension_id}: unknown check group(s): {', '.join(unknown_groups)}"
)
ordered_steps.append(
{
"id": f"preflight:{extension_id}",
"extension_id": extension_id,
"kind": "preflight",
"check_groups": selected_groups,
"runner_ref": extension.manifest.get("preflight_runner"),
}
)
for group_id in selected_groups:
group = available_groups[group_id]
ordered_steps.append(
{
"id": f"check-group:{extension_id}:{group_id}",
"extension_id": extension_id,
"kind": "check_group",
"check_group": group_id,
"runner_ref": group.get("runner_ref"),
"requirement_refs": group.get("requirement_refs", []),
}
)
plan = {
"id": f"plan-{_timestamp()}",
"assessment_profile_snapshot": assessment,
"target_profile_snapshot": target,
"extension_snapshots": [
{
"id": extension_id,
"version": extensions[extension_id].manifest["version"],
"path": str(extensions[extension_id].path.relative_to(root)),
}
for extension_id in selected_extensions
],
"source_lock": {
"framework_refs": assessment["framework_refs"],
"extension_refs": selected_extensions,
},
"ordered_steps": ordered_steps,
"credential_refs": _credential_refs(target),
"artifact_policy": assessment["output_policy"],
"runtime_policy": assessment.get("runtime_policy", {}),
}
assert_valid(plan, "run-plan")
return plan
def _credential_refs(target: dict[str, Any]) -> list[str]:
credential_ref = target.get("credentials_ref")
if isinstance(credential_ref, str) and credential_ref:
return [credential_ref]
return []
def _timestamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

108
src/guide_board/schema.py Normal file
View File

@@ -0,0 +1,108 @@
"""Minimal JSON-schema-like validation for guide-board contracts.
The first core should work from a clean checkout without pulling dependencies.
This validator intentionally supports only the schema features used by the
project's own draft contracts.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from guide_board.errors import ValidationError
from guide_board.io import load_json
SCHEMA_DIR = Path(__file__).resolve().parents[2] / "docs" / "schemas"
def load_schema(schema_name: str) -> dict[str, Any]:
return load_json(SCHEMA_DIR / f"{schema_name}.schema.json")
def validate_document(document: Any, schema: dict[str, Any], path: str = "$") -> list[str]:
errors: list[str] = []
_validate(document, schema, path, errors)
return errors
def assert_valid(document: Any, schema_name: str) -> None:
schema = load_schema(schema_name)
errors = validate_document(document, schema)
if errors:
formatted = "\n".join(f"- {error}" for error in errors)
raise ValidationError(f"{schema_name} validation failed:\n{formatted}")
def _validate(value: Any, schema: dict[str, Any], path: str, errors: list[str]) -> None:
if "type" in schema and not _matches_type(value, schema["type"]):
errors.append(f"{path}: expected {schema['type']}, got {_type_name(value)}")
return
if "enum" in schema and value not in schema["enum"]:
allowed = ", ".join(repr(item) for item in schema["enum"])
errors.append(f"{path}: expected one of {allowed}, got {value!r}")
if isinstance(value, dict):
required = schema.get("required", [])
for key in required:
if key not in value:
errors.append(f"{path}: missing required property {key!r}")
properties = schema.get("properties", {})
additional_allowed = schema.get("additionalProperties", True)
for key, child in value.items():
child_path = f"{path}.{key}"
if key in properties:
_validate(child, properties[key], child_path, errors)
elif additional_allowed is False:
errors.append(f"{child_path}: unexpected property")
if isinstance(value, list):
min_items = schema.get("minItems")
if isinstance(min_items, int) and len(value) < min_items:
errors.append(f"{path}: expected at least {min_items} item(s)")
item_schema = schema.get("items")
if isinstance(item_schema, dict):
for index, child in enumerate(value):
_validate(child, item_schema, f"{path}[{index}]", errors)
def _matches_type(value: Any, expected: str | list[str]) -> bool:
if isinstance(expected, list):
return any(_matches_type(value, item) for item in expected)
if expected == "object":
return isinstance(value, dict)
if expected == "array":
return isinstance(value, list)
if expected == "string":
return isinstance(value, str)
if expected == "integer":
return isinstance(value, int) and not isinstance(value, bool)
if expected == "number":
return isinstance(value, (int, float)) and not isinstance(value, bool)
if expected == "boolean":
return isinstance(value, bool)
if expected == "null":
return value is None
return True
def _type_name(value: Any) -> str:
if isinstance(value, bool):
return "boolean"
if isinstance(value, dict):
return "object"
if isinstance(value, list):
return "array"
if isinstance(value, str):
return "string"
if isinstance(value, int):
return "integer"
if isinstance(value, float):
return "number"
if value is None:
return "null"
return type(value).__name__