Files
marki-docx/src/markidocx/evidence.py
Bernd Worsch 1f3dddf7d6 feat: WP-0001 + WP-0002 complete — LEVEL1 core + service interfaces
WP-0001 (Foundation & LEVEL1 Core):
- manifest model (FR-100), MD→DOCX builder (FR-200), DOCX→MD importer
  (FR-300/400), template family registry (FR-600), drift detector (FR-700),
  CLI wiring, pre-commit config, CI skeleton, regression harness

WP-0002 (Service Interfaces & Workflow Orchestration):
- REST service via FastAPI (FR-900): /health, /version, /capabilities,
  /templates, /styles, /validate, /build, /import, /compare,
  /templates/register, /workflows/{name}, /evidence/{run_id}
- Evidence & report store (FR-1400): JSON-backed, per-run, retrievable
  through all interfaces, classification (pass/warnings/failed)
- Composite workflow orchestration (FR-1300): single-file-roundtrip,
  multi-file-roundtrip, release-regression, family-switch-build
- MCP server via FastMCP (FR-1000): all tools + resources
- CLI additions: `markidocx serve`, `markidocx workflow`, `markidocx mcp`
- Interface parity tests: CLI / REST / MCP produce equivalent results

135 tests passing, ruff + mypy clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 07:46:31 +00:00

170 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Evidence and report storage for markidocx (FR-1400)."""
from __future__ import annotations
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal
# Report kinds enumerated by this module. Note that RunReport.report_type and
# EvidenceStore.save_report annotate their report type as plain ``str``.
ReportType = Literal["validation", "build", "import", "drift"]
# Overall outcome of an evidence set, as returned by EvidenceSet.classification.
EvidenceClassification = Literal["pass", "pass-with-warnings", "failed"]
@dataclass
class ReportContext:
    """Optional descriptive metadata attached to a stored report.

    Every field is an optional string that defaults to ``None``, so a bare
    ``ReportContext()`` is a valid "no context supplied" value.
    """

    # Each field is free-form text; nothing in this class validates the values.
    project: str | None = None
    family: str | None = None
    feature_level: str | None = None
    workflow: str | None = None
    run_context: str | None = None
@dataclass
class RunReport:
    """A single report produced during a run, together with its context.

    Instances round-trip through plain dictionaries via :meth:`to_dict` and
    :meth:`from_dict`.
    """

    run_id: str
    report_type: str
    data: dict[str, Any]
    created_at: str
    context: ReportContext = field(default_factory=ReportContext)

    def to_dict(self) -> dict[str, Any]:
        """Return the report as a plain dictionary.

        The nested ``context`` dataclass is converted to a dictionary as well,
        because ``dataclasses.asdict`` recurses into dataclass fields.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> RunReport:
        """Build a report from a dictionary shaped like :meth:`to_dict` output.

        Args:
            d: Mapping of field names to values. It is copied first, so the
                caller's dictionary is left unmodified.

        Returns:
            The reconstructed report. A missing or non-dict ``context`` entry
            yields a default :class:`ReportContext`.
        """
        payload = dict(d)
        raw_context = payload.pop("context", {})
        if isinstance(raw_context, dict):
            context = ReportContext(**raw_context)
        else:
            context = ReportContext()
        return cls(**payload, context=context)
@dataclass
class EvidenceSet:
    """Assembled evidence from one or more runs (FR-1406 to FR-1414).

    Attributes:
        run_ids: Identifiers of the runs this set was assembled from.
        reports: Reports collected for those runs. Each report is read through
            its ``run_id``, ``report_type`` and ``data`` attributes.
    """

    run_ids: list[str]
    reports: list[RunReport]

    @property
    def classification(self) -> EvidenceClassification:
        """pass / pass-with-warnings / failed (FR-1414).

        A set is ``"failed"`` if any report has ``status == "error"`` or a
        truthy ``errors`` entry; otherwise ``"pass-with-warnings"`` if any
        report has a truthy ``warnings`` entry; otherwise ``"pass"``.
        """
        # Failures take precedence over warnings, so check every report for
        # errors before considering warnings at all.
        if any(
            r.data.get("status") == "error" or r.data.get("errors")
            for r in self.reports
        ):
            return "failed"
        if any(r.data.get("warnings") for r in self.reports):
            return "pass-with-warnings"
        return "pass"

    @property
    def composition(self) -> list[dict[str, str]]:
        """Which reports/artifacts are in this set (FR-1407)."""
        return [{"run_id": r.run_id, "type": r.report_type} for r in self.reports]

    @property
    def complete(self) -> bool:
        """False when some expected reports are missing (FR-1413).

        The set is complete only when it holds at least one report and every
        identifier in ``run_ids`` is represented by at least one report.
        """
        if not self.reports:
            return False
        covered = {r.run_id for r in self.reports}
        # A requested run that contributed no report means evidence is missing.
        return all(run_id in covered for run_id in self.run_ids)

    def summary(self) -> dict[str, Any]:
        """Status summary across the set (FR-1408).

        Returns:
            A dictionary with the classification, run and report counts, the
            completeness flag, total warning and error counts, and the
            composition list.
        """
        # ``or []`` tolerates reports whose "warnings"/"errors" key is present
        # but None, matching the truthiness checks used by ``classification``.
        warnings_count = sum(len(r.data.get("warnings") or []) for r in self.reports)
        errors_count = sum(len(r.data.get("errors") or []) for r in self.reports)
        return {
            "classification": self.classification,
            "run_count": len(self.run_ids),
            "report_count": len(self.reports),
            "complete": self.complete,
            "warnings_count": warnings_count,
            "errors_count": errors_count,
            "composition": self.composition,
        }
class EvidenceStore:
"""Persistent evidence layer for markidocx operations (FR-1400)."""
def __init__(self, base_dir: Path | None = None) -> None:
self.base_dir = base_dir or Path(".markidocx") / "evidence"
self.base_dir.mkdir(parents=True, exist_ok=True)
def new_run_id(self) -> str:
"""Generate a fresh run identifier."""
return str(uuid.uuid4())
def save_report(
self,
run_id: str,
report_type: str,
data: dict[str, Any],
context: ReportContext | None = None,
) -> Path:
"""Persist a report keyed by run_id and type (FR-14011404)."""
run_dir = self.base_dir / run_id
run_dir.mkdir(parents=True, exist_ok=True)
report = RunReport(
run_id=run_id,
report_type=report_type,
data=data,
created_at=datetime.now(UTC).isoformat(),
context=context or ReportContext(),
)
path = run_dir / f"{report_type}.json"
path.write_text(json.dumps(report.to_dict(), indent=2), encoding="utf-8")
return path
def get_report(self, run_id: str, report_type: str) -> RunReport | None:
"""Retrieve a specific report (FR-1409)."""
path = self.base_dir / run_id / f"{report_type}.json"
if not path.exists():
return None
return RunReport.from_dict(json.loads(path.read_text(encoding="utf-8")))
def list_runs(self) -> list[str]:
"""List all run IDs in the store."""
if not self.base_dir.exists():
return []
return sorted(d.name for d in self.base_dir.iterdir() if d.is_dir())
def list_reports(self, run_id: str) -> list[RunReport]:
"""List all reports for a run (FR-1409)."""
run_dir = self.base_dir / run_id
if not run_dir.exists():
return []
reports = []
for p in sorted(run_dir.glob("*.json")):
reports.append(RunReport.from_dict(json.loads(p.read_text(encoding="utf-8"))))
return reports
def assemble_set(self, run_ids: list[str]) -> EvidenceSet:
"""Assemble an evidence set from multiple runs (FR-1406)."""
reports: list[RunReport] = []
for run_id in run_ids:
reports.extend(self.list_reports(run_id))
return EvidenceSet(run_ids=run_ids, reports=reports)
def to_markdown(self, run_id: str) -> str:
"""Human-readable Markdown report for a run (FR-1411)."""
reports = self.list_reports(run_id)
lines = [f"# Evidence Run: {run_id}\n"]
for r in reports:
lines.append(f"## {r.report_type.title()} Report")
lines.append(f"- Status: {r.data.get('status', 'unknown')}")
for w in r.data.get("warnings", []):
lines.append(f"- Warning: {w}")
for e in r.data.get("errors", []):
lines.append(f"- Error: {e}")
lines.append("")
return "\n".join(lines)
def to_json(self, run_id: str) -> str:
"""Machine-readable JSON report for a run (FR-1412)."""
reports = self.list_reports(run_id)
return json.dumps(
{"run_id": run_id, "reports": [r.to_dict() for r in reports]},
indent=2,
)