Implement source lock and submission package baseline

This commit is contained in:
2026-05-16 02:51:00 +02:00
parent d73a73b455
commit c8ac42154c
18 changed files with 852 additions and 22 deletions

View File

@@ -46,6 +46,107 @@ def build_artifact_manifest(
return artifacts
def build_submission_manifest(
    run_dir: Path,
    run_metadata: dict[str, Any],
    plan: dict[str, Any],
    evidence: list[dict[str, Any]],
    assessment_package: dict[str, Any],
) -> dict[str, Any]:
    """Describe the files and provenance of one run as a submission manifest.

    Args:
        run_dir: Run directory whose files are referenced by relative path.
        run_metadata: Must provide ``id``, ``target_profile_ref`` and
            ``assessment_profile_ref``.
        plan: Run plan; its ``source_lock`` entry is summarised here.
        evidence: Evidence records scanned for reported source metadata.
        assessment_package: Supplies ``artifact_manifest`` (optional) and
            ``certification_boundary`` (required).

    Returns:
        The manifest dictionary, after it has been passed to ``assert_valid``
        under the ``"submission-package"`` name.

    Raises:
        KeyError: If a required key is missing from ``plan``,
            ``run_metadata`` or ``assessment_package``.
    """
    lock = plan["source_lock"]
    run_id = run_metadata["id"]

    manifest: dict[str, Any] = {
        "id": f"submission-package:{run_id}",
        "schema_version": "guide-board.submission-package.v1",
        "run_id": run_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    manifest["package_identity"] = {
        "target_profile_ref": run_metadata["target_profile_ref"],
        "assessment_profile_ref": run_metadata["assessment_profile_ref"],
        "framework_refs": lock.get("framework_refs", []),
        "extension_refs": lock.get("extension_refs", []),
    }

    # The lock file is referenced by path and summarised inline; its checksum
    # is None when the file is not present in the run directory.
    manifest["source_lock_ref"] = "sources.lock.json"
    manifest["source_lock"] = {
        "id": lock.get("id"),
        "schema_version": lock.get("schema_version"),
        "checksum": _file_entry(run_dir, "sources.lock.json").get("checksum"),
        "framework_refs": lock.get("framework_refs", []),
        "extension_refs": lock.get("extension_refs", []),
    }

    # Only files that actually exist end up in each of these sections.
    report_files = [
        ("assessment-package", "reports/assessment-package.json"),
        ("markdown-report", "reports/report.md"),
    ]
    manifest["reports"] = _existing_file_entries(run_dir, report_files)

    normalized_files = [
        ("evidence", "normalized/evidence.json"),
        ("findings", "normalized/findings.json"),
        ("mappings", "normalized/mappings.json"),
    ]
    manifest["normalized_outputs"] = _existing_file_entries(run_dir, normalized_files)

    snapshot_files = [
        ("target-profile", "target-profile.snapshot.json"),
        ("assessment-profile", "assessment-profile.snapshot.json"),
    ]
    manifest["profile_snapshots"] = _existing_file_entries(run_dir, snapshot_files)

    manifest["artifact_manifest"] = assessment_package.get("artifact_manifest", [])
    manifest["reported_metadata"] = _reported_metadata(evidence)
    manifest["certification_boundary"] = assessment_package["certification_boundary"]

    assert_valid(manifest, "submission-package")
    return manifest
def _existing_file_entries(run_dir: Path, refs: list[tuple[str, str]]) -> list[dict[str, Any]]:
    """Return file entries for the refs that resolve to real files.

    Each element of ``refs`` is an ``(entry_id, relative_path)`` pair. Refs
    for which ``_file_entry`` yields an empty dict are skipped; the others
    are tagged with their ``id`` and returned in input order.
    """
    found: list[dict[str, Any]] = []
    for label, relative_ref in refs:
        described = _file_entry(run_dir, relative_ref)
        if not described:
            continue
        described["id"] = label
        found.append(described)
    return found
def _file_entry(run_dir: Path, ref: str) -> dict[str, Any]:
    """Describe the file at ``ref`` inside ``run_dir``.

    Returns an empty dict when the resolved path falls outside the resolved
    run directory or is not a regular file. Otherwise returns the original
    ``ref`` as ``path`` together with media type, ``sha256:`` checksum and
    size in bytes.
    """
    candidate = (run_dir / ref).resolve()
    try:
        # relative_to raises ValueError when candidate is not under run_dir.
        candidate.relative_to(run_dir.resolve())
    except ValueError:
        return {}

    if candidate.is_file():
        media_type = _media_type(candidate)
        digest = _sha256(candidate)
        return {
            "path": ref,
            "media_type": media_type,
            "checksum": f"sha256:{digest}",
            "size_bytes": candidate.stat().st_size,
        }
    return {}
def _reported_metadata(evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
records = []
for item in evidence:
metadata = item.get("facts", {}).get("source_metadata")
if not isinstance(metadata, dict) or not metadata:
continue
records.append(
{
"evidence_ref": item["id"],
"check_id": item["check_id"],
"extension_id": item["extension_id"],
"metadata": metadata,
}
)
return records
def _sha256(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:

View File

@@ -7,8 +7,8 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.artifacts import build_artifact_manifest
from guide_board.io import write_json
from guide_board.artifacts import build_artifact_manifest, build_submission_manifest
from guide_board.io import load_json, write_json
from guide_board.mapping import build_mapping_records, summarize_mappings
from guide_board.normalizers import normalize_step_result
from guide_board.planning import build_run_plan
@@ -83,6 +83,7 @@ def run_assessment(
"run_dir": str(run_dir),
"assessment_package": str(run_dir / "reports" / "assessment-package.json"),
"report": str(run_dir / "reports" / "report.md"),
"submission_package": str(run_dir / "reports" / "submission-package.json"),
"retention_summary": str(run_dir / "retention-summary.json"),
}
@@ -155,6 +156,14 @@ def _evidence_for_step(
runner_ref = step.get("runner_ref")
runner_result = run_step(root, run_dir, run_id, plan, step)
runner_result = normalize_step_result(root, run_dir, run_id, plan, step, runner_result)
facts = {
"step_kind": step["kind"],
"runner_ref": runner_ref,
**runner_result["facts"],
}
source_metadata = _source_metadata_for_step(root, plan, step, runner_result)
if source_metadata:
facts["source_metadata"] = source_metadata
return {
"id": f"evidence:{step['id']}",
@@ -164,11 +173,7 @@ def _evidence_for_step(
"subject_ref": plan["target_profile_snapshot"]["id"],
"result": runner_result["result"],
"observations": runner_result["observations"],
"facts": {
"step_kind": step["kind"],
"runner_ref": runner_ref,
**runner_result["facts"],
},
"facts": facts,
"requirement_refs": _requirement_refs(plan, step, runner_result),
"artifact_refs": runner_result["artifact_refs"],
"started_at": now,
@@ -198,6 +203,95 @@ def _runner_requirement_refs(runner_result: dict[str, Any] | None) -> list[str]:
return [ref for ref in refs if isinstance(ref, str)]
def _source_metadata_for_step(
    root: Path,
    plan: dict[str, Any],
    step: dict[str, Any],
    runner_result: dict[str, Any],
) -> dict[str, Any]:
    """Gather declared and reported source metadata for one plan step.

    The extension manifest (``extension.json`` under the snapshot path) is
    loaded and combined into up to four sections:

    * ``extension``: id, snapshot version and manifest-level metadata;
    * ``runner``: only when the step names a ``runner_ref``;
    * ``normalizers``: manifest normalizers whose id appears in the result's
      ``facts.normalizer_refs``;
    * ``reported``: the ``metadata`` object carried by the runner result.

    The combined structure is passed through ``_drop_empty_metadata`` before
    being returned.
    """
    runner_ref = step.get("runner_ref")
    extension = _extension_snapshot(plan, step["extension_id"])
    manifest = load_json(_snapshot_path(root, extension) / "extension.json")

    collected: dict[str, Any] = {
        "extension": {
            "id": step["extension_id"],
            "version": extension.get("version"),
            "metadata": _object_or_empty(manifest.get("metadata")),
        }
    }

    if runner_ref:
        entrypoint = _runner_entrypoint(manifest, runner_ref)
        collected["runner"] = {
            "id": runner_ref,
            "kind": entrypoint.get("kind"),
            "metadata": _object_or_empty(entrypoint.get("metadata")),
        }

    applied = runner_result.get("facts", {}).get("normalizer_refs", [])
    if isinstance(applied, list):
        wanted_ids = {ref for ref in applied if isinstance(ref, str)}
        matched = [
            {
                "id": declared["id"],
                "kind": declared.get("kind"),
                "runner_ref": declared.get("runner_ref"),
                "metadata": _object_or_empty(declared.get("metadata")),
            }
            for declared in manifest.get("normalizers", [])
            if isinstance(declared, dict) and declared.get("id") in wanted_ids
        ]
        if matched:
            collected["normalizers"] = matched

    reported = _object_or_empty(runner_result.get("metadata"))
    if reported:
        collected["reported"] = reported

    return _drop_empty_metadata(collected)
def _extension_snapshot(plan: dict[str, Any], extension_id: str) -> dict[str, Any]:
for extension in plan["extension_snapshots"]:
if extension["id"] == extension_id:
return extension
return {}
def _snapshot_path(root: Path, extension: dict[str, Any]) -> Path:
path = Path(extension["path"])
return path if path.is_absolute() else root / path
def _runner_entrypoint(manifest: dict[str, Any], runner_ref: str) -> dict[str, Any]:
for entrypoint in manifest.get("runner_entrypoints", []):
if entrypoint.get("id") == runner_ref:
return entrypoint
return {}
def _object_or_empty(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _drop_empty_metadata(value: dict[str, Any]) -> dict[str, Any]:
compact = {}
for key, child in value.items():
if isinstance(child, dict):
child = _drop_empty_metadata(child)
if isinstance(child, list):
child = [
_drop_empty_metadata(item) if isinstance(item, dict) else item
for item in child
]
child = [item for item in child if item]
if child:
compact[key] = child
return compact
def _dedupe(values: list[str]) -> list[str]:
seen = set()
deduped = []
@@ -340,6 +434,14 @@ def _write_run_directory(
_markdown_report(run_metadata, assessment_package),
encoding="utf-8",
)
submission_manifest = build_submission_manifest(
run_dir,
run_metadata,
plan,
evidence,
assessment_package,
)
write_json(run_dir / "reports" / "submission-package.json", submission_manifest)
def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> str:

View File

@@ -127,6 +127,7 @@ def _run_normalizer(
},
"artifact_refs": runner_result.get("artifact_refs", []),
"requirement_refs": runner_result.get("requirement_refs", []),
"metadata": runner_result.get("metadata", {}),
}
if not isinstance(result, dict):
@@ -160,6 +161,12 @@ def _merge_result(
_string_list(base.get("requirement_refs", []))
+ _string_list(update.get("requirement_refs", []))
)
if "metadata" in update:
metadata = dict(base.get("metadata", {}))
update_metadata = update.get("metadata", {})
if isinstance(update_metadata, dict):
metadata.update(update_metadata)
merged["metadata"] = metadata
return _coerce_result(merged)
@@ -173,6 +180,7 @@ def _coerce_result(value: dict[str, Any]) -> dict[str, Any]:
"facts": facts,
"artifact_refs": _string_list(value.get("artifact_refs", [])),
"requirement_refs": _string_list(value.get("requirement_refs", [])),
"metadata": _object_or_empty(value.get("metadata")),
}
@@ -189,6 +197,10 @@ def _string_list(value: Any) -> list[str]:
return [item for item in value if isinstance(item, str)]
def _object_or_empty(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _dedupe(values: list[str]) -> list[str]:
seen = set()
deduped = []

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@@ -96,6 +97,16 @@ def build_run_plan(
}
)
source_lock = _build_source_lock(
root,
target_path,
assessment_path,
target,
assessment,
[extensions[extension_id] for extension_id in selected_extensions],
)
assert_valid(source_lock, "source-lock")
plan = {
"id": f"plan-{_timestamp()}",
"assessment_profile_snapshot": assessment,
@@ -109,10 +120,7 @@ def build_run_plan(
}
for extension_id in selected_extensions
],
"source_lock": {
"framework_refs": assessment["framework_refs"],
"extension_refs": selected_extensions,
},
"source_lock": source_lock,
"profile_paths": {
"target_profile_path": str(target_path.resolve()),
"assessment_profile_path": str(assessment_path.resolve()),
@@ -208,6 +216,270 @@ def _load_extension_profile_schema(
return load_json(schema_path)
def _build_source_lock(
    root: Path,
    target_path: Path,
    assessment_path: Path,
    target: dict[str, Any],
    assessment: dict[str, Any],
    extensions: list[Extension],
) -> dict[str, Any]:
    """Build the source-lock record describing every input of a run plan.

    The lock lists the framework and extension refs, per-framework and
    per-extension records, mapping sets, both profile files, the optional
    expectation/waiver policy files, declared authorities, and the metadata
    declared by runner entrypoints and normalizers.

    Raises:
        KeyError: If ``assessment`` lacks ``framework_refs``/``id`` or
            ``target`` lacks ``id``.
    """
    framework_refs = assessment["framework_refs"]
    extension_refs = [selected.id for selected in extensions]

    lock: dict[str, Any] = {
        "id": f"source-lock:{assessment['id']}:{target['id']}",
        "schema_version": "guide-board.source-lock.v1",
        "created_at": _now(),
        "framework_refs": framework_refs,
        "extension_refs": extension_refs,
    }
    lock["frameworks"] = _framework_records(framework_refs, extensions)
    lock["extensions"] = [
        _extension_source_record(root, selected) for selected in extensions
    ]
    lock["mapping_sets"] = _mapping_source_records(root, extensions)

    target_record = _file_source_record(
        "target-profile",
        target["id"],
        target_path,
        "target-profile.snapshot.json",
    )
    assessment_record = _file_source_record(
        "assessment-profile",
        assessment["id"],
        assessment_path,
        "assessment-profile.snapshot.json",
    )
    lock["profiles"] = {"target": target_record, "assessment": assessment_record}

    # Each policy entry is None when the assessment does not reference one.
    expectations_record = _optional_policy_source_record(
        root,
        assessment_path,
        assessment.get("expectations_ref"),
        "expectation-set",
    )
    waivers_record = _optional_policy_source_record(
        root,
        assessment_path,
        assessment.get("waivers_ref"),
        "waiver-set",
    )
    lock["policy_refs"] = {
        "expectations": expectations_record,
        "waivers": waivers_record,
    }

    lock["authorities"] = _authority_source_records(extensions)
    lock["metadata_hooks"] = {
        "runner_entrypoints": _entrypoint_metadata_records(extensions),
        "normalizers": _normalizer_metadata_records(extensions),
    }
    return lock
def _framework_records(
    framework_refs: list[str],
    extensions: list[Extension],
) -> list[dict[str, Any]]:
    """Return one record per framework ref, in input order.

    Each record carries the ref as ``id``, a version hint derived from the
    ref text, and the ids of the extensions whose manifest lists that
    framework as supported.
    """

    def _declared_by(framework_ref: str) -> list[str]:
        return [
            candidate.id
            for candidate in extensions
            if framework_ref in _manifest_framework_ids(candidate.manifest)
        ]

    described = []
    for framework_ref in framework_refs:
        declaring = _declared_by(framework_ref)
        described.append(
            {
                "id": framework_ref,
                "version": _version_hint(framework_ref),
                "declared_by_extensions": declaring,
            }
        )
    return described
def _extension_source_record(root: Path, extension: Extension) -> dict[str, Any]:
    """Describe one extension for the source lock.

    Records the extension's id, version, location, source, the path and
    checksum of its ``extension.json``, and the supported frameworks,
    authority ids, certification boundary and metadata from its manifest.

    Raises:
        KeyError: If the manifest lacks ``version`` or
            ``certification_boundary``.
    """
    manifest = extension.manifest
    manifest_file = extension.path / "extension.json"
    return {
        "id": extension.id,
        "version": manifest["version"],
        "path": _extension_path_ref(root, extension.path),
        "source": extension.source,
        "manifest_path": _display_path(root, manifest_file),
        "manifest_checksum": _checksum_if_file(manifest_file),
        "supported_frameworks": _manifest_framework_ids(manifest),
        "authorities": _authority_ids(manifest.get("authorities", [])),
        "certification_boundary": manifest["certification_boundary"],
        "metadata": _object_or_empty(manifest.get("metadata")),
    }
def _mapping_source_records(root: Path, extensions: list[Extension]) -> list[dict[str, Any]]:
    """Describe every mapping set declared by the given extensions.

    Each string id in a manifest's ``mappings`` list is expected at
    ``<extension>/mappings/<id>.json``. The record notes whether that file
    exists, its checksum, and the string ``framework_refs`` it declares
    (empty when the file is absent). Non-string ids are ignored.
    """
    collected: list[dict[str, Any]] = []
    for extension in extensions:
        declared_ids = (
            mapping_id
            for mapping_id in extension.manifest.get("mappings", [])
            if isinstance(mapping_id, str)
        )
        for mapping_id in declared_ids:
            mapping_file = extension.path / "mappings" / f"{mapping_id}.json"
            shown_path = _display_path(root, mapping_file)
            present = mapping_file.is_file()
            checksum = _checksum_if_file(mapping_file)

            framework_refs: list[str] = []
            if mapping_file.is_file():
                loaded = load_json(mapping_file)
                framework_refs = _string_list(loaded.get("framework_refs", []))

            collected.append(
                {
                    "id": mapping_id,
                    "extension_id": extension.id,
                    "path": shown_path,
                    "exists": present,
                    "checksum": checksum,
                    "framework_refs": framework_refs,
                }
            )
    return collected
def _file_source_record(
    kind: str,
    profile_id: str,
    path: Path,
    snapshot_ref: str,
) -> dict[str, Any]:
    """Describe a profile file: kind, id, resolved path, checksum, snapshot ref.

    The checksum is ``None`` when ``path`` is not a regular file.
    """
    resolved_location = str(path.resolve())
    checksum = _checksum_if_file(path)
    return {
        "kind": kind,
        "id": profile_id,
        "path": resolved_location,
        "checksum": checksum,
        "snapshot_ref": snapshot_ref,
    }
def _optional_policy_source_record(
    root: Path,
    assessment_path: Path,
    ref: Any,
    kind: str,
) -> dict[str, Any] | None:
    """Describe an optional policy file referenced by the assessment profile.

    Returns ``None`` when ``ref`` is not a non-empty string. Otherwise the
    ref is resolved via ``_resolve_assessment_ref`` and described by its
    resolved path, whether it exists as a file, and its checksum.
    """
    if isinstance(ref, str) and ref:
        location = _resolve_assessment_ref(root, assessment_path, ref)
        return {
            "kind": kind,
            "ref": ref,
            "path": str(location.resolve()),
            "exists": location.is_file(),
            "checksum": _checksum_if_file(location),
        }
    return None
def _authority_source_records(extensions: list[Extension]) -> list[dict[str, Any]]:
records = []
for extension in extensions:
for authority in extension.manifest.get("authorities", []):
if isinstance(authority, str):
records.append({"id": authority, "extension_id": extension.id})
elif isinstance(authority, dict):
record = {
"id": authority.get("id"),
"extension_id": extension.id,
}
for key in ("name", "version", "source_url", "license", "access"):
if key in authority:
record[key] = authority[key]
records.append(record)
return [record for record in records if isinstance(record.get("id"), str)]
def _entrypoint_metadata_records(extensions: list[Extension]) -> list[dict[str, Any]]:
    """List id, kind and metadata for every runner entrypoint object.

    Entries of ``runner_entrypoints`` that are not dicts are skipped; each
    record is tagged with the id of the declaring extension.
    """
    return [
        {
            "extension_id": extension.id,
            "id": entrypoint.get("id"),
            "kind": entrypoint.get("kind"),
            "metadata": _object_or_empty(entrypoint.get("metadata")),
        }
        for extension in extensions
        for entrypoint in extension.manifest.get("runner_entrypoints", [])
        if isinstance(entrypoint, dict)
    ]
def _normalizer_metadata_records(extensions: list[Extension]) -> list[dict[str, Any]]:
    """List id, kind, runner ref and metadata for every normalizer object.

    Entries of ``normalizers`` that are not dicts are skipped; each record
    is tagged with the id of the declaring extension.
    """
    return [
        {
            "extension_id": extension.id,
            "id": normalizer.get("id"),
            "kind": normalizer.get("kind"),
            "runner_ref": normalizer.get("runner_ref"),
            "metadata": _object_or_empty(normalizer.get("metadata")),
        }
        for extension in extensions
        for normalizer in extension.manifest.get("normalizers", [])
        if isinstance(normalizer, dict)
    ]
def _manifest_framework_ids(manifest: dict[str, Any]) -> list[str]:
values = []
for framework in manifest.get("supported_frameworks", []):
if isinstance(framework, str):
values.append(framework)
elif isinstance(framework, dict) and isinstance(framework.get("id"), str):
values.append(framework["id"])
return values
def _authority_ids(authorities: list[Any]) -> list[str]:
values = []
for authority in authorities:
if isinstance(authority, str):
values.append(authority)
elif isinstance(authority, dict) and isinstance(authority.get("id"), str):
values.append(authority["id"])
return values
def _resolve_assessment_ref(root: Path, assessment_path: Path, ref: str) -> Path:
ref_path = Path(ref)
if ref_path.is_absolute():
return ref_path
root_relative = root / ref_path
if root_relative.exists():
return root_relative
return assessment_path.resolve().parent / ref_path
def _checksum_if_file(path: Path) -> str | None:
if not path.is_file():
return None
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return f"sha256:{digest.hexdigest()}"
def _version_hint(ref: str) -> str | None:
for part in reversed(ref.replace("-", ".").split(".")):
if len(part) > 1 and part[0].lower() == "v" and any(char.isdigit() for char in part[1:]):
return part
return None
def _object_or_empty(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _string_list(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, str)]
def _display_path(root: Path, path: Path) -> str:
try:
return str(path.resolve().relative_to(root.resolve()))
except ValueError:
return str(path.resolve())
def _extension_path_ref(root: Path, path: Path) -> str:
try:
return str(path.resolve().relative_to(root.resolve()))
@@ -215,5 +487,9 @@ def _extension_path_ref(root: Path, path: Path) -> str:
return str(path.resolve())
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _timestamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

View File

@@ -45,6 +45,7 @@ def build_retention_summary(
"report_refs": [
"reports/assessment-package.json",
"reports/report.md",
"reports/submission-package.json",
],
"artifact_retention": {
"policy": plan["assessment_profile_snapshot"].get("retention_policy", {}),

View File

@@ -45,6 +45,8 @@ def run_step(
"runner_kind": "external",
},
"artifact_refs": [],
"requirement_refs": [],
"metadata": _object_or_empty(entrypoint.get("metadata")),
}
if entrypoint["kind"] == "command":
return _run_command(root, run_dir, run_id, plan, step, extension_path, entrypoint)
@@ -63,6 +65,8 @@ def _no_runner_result(step: dict[str, Any]) -> dict[str, Any]:
"runner_kind": None,
},
"artifact_refs": [],
"requirement_refs": [],
"metadata": {},
}
@@ -118,6 +122,8 @@ def _run_python_module(
"error_type": type(exc).__name__,
},
"artifact_refs": [],
"requirement_refs": [],
"metadata": _object_or_empty(entrypoint.get("metadata")),
}
if not isinstance(result, dict):
raise ValidationError(f"{entrypoint['id']}: runner must return an object")
@@ -126,6 +132,8 @@ def _run_python_module(
"observations": result.get("observations", []),
"facts": result.get("facts", {}),
"artifact_refs": result.get("artifact_refs", []),
"requirement_refs": result.get("requirement_refs", []),
"metadata": _merge_metadata(entrypoint.get("metadata"), result.get("metadata")),
}
@@ -192,6 +200,8 @@ def _run_command(
"command": command,
},
"artifact_refs": [str(context_path.relative_to(run_dir))],
"requirement_refs": [],
"metadata": _object_or_empty(entrypoint.get("metadata")),
}
except subprocess.TimeoutExpired:
return {
@@ -206,6 +216,8 @@ def _run_command(
"command": command,
},
"artifact_refs": [str(context_path.relative_to(run_dir))],
"requirement_refs": [],
"metadata": _object_or_empty(entrypoint.get("metadata")),
}
parsed = _parse_runner_stdout(completed.stdout)
@@ -225,6 +237,8 @@ def _run_command(
"command": command,
},
"artifact_refs": [str(context_path.relative_to(run_dir))],
"requirement_refs": [],
"metadata": _object_or_empty(entrypoint.get("metadata")),
}
facts = parsed.get("facts", {})
@@ -245,6 +259,9 @@ def _run_command(
if not isinstance(artifact_refs, list):
artifact_refs = []
artifact_refs.append(str(context_path.relative_to(run_dir)))
requirement_refs = parsed.get("requirement_refs", [])
if not isinstance(requirement_refs, list):
requirement_refs = []
result = parsed.get("result", "unknown")
if completed.returncode != 0 and result in {"pass", "warning", "manual", "skipped"}:
@@ -258,6 +275,8 @@ def _run_command(
"observations": observations,
"facts": facts,
"artifact_refs": artifact_refs,
"requirement_refs": requirement_refs,
"metadata": _merge_metadata(entrypoint.get("metadata"), parsed.get("metadata")),
}
@@ -328,5 +347,17 @@ def _parse_runner_stdout(stdout: str) -> dict[str, Any] | None:
return parsed
def _merge_metadata(*values: Any) -> dict[str, Any]:
merged: dict[str, Any] = {}
for value in values:
if isinstance(value, dict):
merged.update(value)
return merged
def _object_or_empty(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _safe_id(value: str) -> str:
return "".join(char if char.isalnum() or char in {"-", "_"} else "_" for char in value)