Files
info-tech-canon/src/info_tech_canon/service.py

422 lines
13 KiB
Python

from __future__ import annotations
from collections import Counter
from dataclasses import asdict, dataclass
from pathlib import Path
import json
from typing import Any
import yaml
from . import generation
from . import profiles
from .bench import (
Infospace,
KnowledgeArtifact,
export_mermaid,
load_infospace,
relationship_summary,
run_collection_checks,
)
from .validation import structural_checks
REPO_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_INFOSPACE_ROOT = REPO_ROOT / "infospace"
REVIEW_KIT_COMPONENTS = {
"manifest": "agent/review-kit/review-kit.yaml",
"workflow": "agent/review-kit/review-workflow.yaml",
"scorecard": "agent/review-kit/scorecard.yaml",
"model_selection_guide": "agent/review-kit/model-selection-guide.yaml",
"schema": "schemas/alignment-review.schema.yaml",
}
ALIGNMENT_TEMPLATE_PATH = "agent/templates/consumer-alignment-workplan.template.md"
class CanonServiceError(Exception):
def __init__(
self,
code: str,
message: str,
details: dict[str, Any] | None = None,
) -> None:
super().__init__(message)
self.code = code
self.message = message
self.details = details or {}
def to_dict(self) -> dict[str, Any]:
return {
"ok": False,
"error": {
"code": self.code,
"message": self.message,
"details": self.details,
},
}
@dataclass(frozen=True)
class CanonContext:
repo_root: Path
infospace_root: Path
infospace: Infospace
def load_context(root: Path | str | None = None) -> CanonContext:
infospace_root = Path(root) if root else DEFAULT_INFOSPACE_ROOT
try:
infospace = load_infospace(infospace_root)
except Exception as exc:
raise CanonServiceError(
"infospace_load_failed",
f"Unable to load infospace at {infospace_root}",
{"root": str(infospace_root), "reason": str(exc)},
) from exc
return CanonContext(
repo_root=REPO_ROOT,
infospace_root=infospace_root,
infospace=infospace,
)
def inspect_canon(root: Path | str | None = None) -> dict[str, Any]:
context = load_context(root)
artifacts = context.infospace.artifacts
kinds = Counter(artifact.kind for artifact in artifacts)
return {
"ok": True,
"repo": {
"slug": "info-tech-canon",
"root": str(context.repo_root),
},
"infospace": {
"slug": context.infospace.config.slug,
"name": context.infospace.config.name,
"root": str(context.infospace_root),
"artifact_count": len(artifacts),
"kinds": dict(sorted(kinds.items())),
},
"service": {
"package": "info_tech_canon",
"contract": "cli-json-api",
},
}
def list_artifacts(
root: Path | str | None = None,
*,
kind: str | None = None,
) -> dict[str, Any]:
context = load_context(root)
artifacts = [
_artifact_to_dict(artifact, context.infospace_root)
for artifact in context.infospace.artifacts
if kind is None or artifact.kind == kind
]
return {
"ok": True,
"count": len(artifacts),
"artifacts": artifacts,
}
def list_models(root: Path | str | None = None) -> dict[str, Any]:
return list_artifacts(root, kind="model")
def list_standards(root: Path | str | None = None) -> dict[str, Any]:
return list_artifacts(root, kind="standard")
def review_kit(root: Path | str | None = None) -> dict[str, Any]:
context = load_context(root)
components = {
name: {
"path": relative,
"content": _read_yaml_component(context.infospace_root, relative),
}
for name, relative in REVIEW_KIT_COMPONENTS.items()
}
template = alignment_template(root)
return {
"ok": True,
"review_kit": components["manifest"]["content"],
"components": components,
"template": {
"path": template["path"],
"content": template["content"],
},
}
def alignment_template(root: Path | str | None = None) -> dict[str, Any]:
context = load_context(root)
path = context.infospace_root / ALIGNMENT_TEMPLATE_PATH
try:
content = path.read_text(encoding="utf-8")
except FileNotFoundError as exc:
raise CanonServiceError(
"missing_alignment_template",
"Consumer alignment workplan template not found.",
{"path": str(path)},
) from exc
return {
"ok": True,
"path": ALIGNMENT_TEMPLATE_PATH,
"content": content,
}
def validate_canon(root: Path | str | None = None) -> dict[str, Any]:
context = load_context(root)
errors: list[dict[str, Any]] = []
warnings: list[dict[str, Any]] = []
artifact_ids = {artifact.id for artifact in context.infospace.artifacts}
for artifact in context.infospace.artifacts:
artifact_path = context.infospace_root / artifact.path
if not artifact_path.is_file():
errors.append(
{
"code": "missing_artifact_path",
"artifact_id": artifact.id,
"path": artifact.path,
}
)
for relationship in artifact.relationships:
target = relationship.get("target")
if target not in artifact_ids:
errors.append(
{
"code": "missing_relationship_target",
"artifact_id": artifact.id,
"target": target,
}
)
for discipline in context.infospace.config.disciplines:
discipline_path = context.infospace_root / discipline.path
if not discipline_path.is_file():
errors.append(
{
"code": "missing_discipline_path",
"discipline": discipline.name,
"path": discipline.path,
}
)
checks = run_collection_checks(context.infospace.artifacts)
threshold_errors = _evaluate_thresholds(
checks.metrics,
context.infospace.config.viability,
)
errors.extend(threshold_errors)
structural = structural_checks(context)
errors.extend(structural["errors"])
warnings.extend(structural["warnings"])
return {
"ok": not errors,
"errors": errors,
"warnings": warnings,
"metrics": checks.metrics,
"details": checks.details,
}
def write_validation_report(
destination: Path | str,
root: Path | str | None = None,
) -> dict[str, Any]:
payload = validate_canon(root)
path = Path(destination)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
payload["report_path"] = str(path)
return payload
def artifact_graph(
root: Path | str | None = None,
*,
output_format: str = "json",
) -> dict[str, Any]:
context = load_context(root)
summary = relationship_summary(context.infospace.artifacts)
if output_format == "mermaid":
return {"ok": True, "format": "mermaid", "graph": export_mermaid(summary)}
if output_format != "json":
raise CanonServiceError(
"unsupported_graph_format",
f"Unsupported graph format: {output_format}",
{"supported": ["json", "mermaid"]},
)
return {
"ok": True,
"format": "json",
"graph": {
"node_count": summary.node_count,
"edge_count": summary.edge_count,
"nodes": summary.nodes,
"edges": [asdict(edge) for edge in summary.edges],
"relationship_types": summary.relationship_types,
},
}
def profile_inspect(
profile: str,
root: Path | str | None = None,
) -> dict[str, Any]:
context = load_context(root)
profile_path = context.infospace_root / "profiles" / profile / "profile.yaml"
if not profile_path.is_file():
raise CanonServiceError(
"missing_profile",
f"Profile not found: {profile}",
{"profile": profile, "path": str(profile_path)},
)
try:
return profiles.inspect_profile(context, profile)
except ValueError as exc:
raise CanonServiceError(
"invalid_profile",
f"Profile must be a YAML mapping: {profile}",
{"profile": profile, "path": str(profile_path)},
) from exc
def profile_validate(
profile: str,
root: Path | str | None = None,
) -> dict[str, Any]:
context = load_context(root)
profile_path = context.infospace_root / "profiles" / profile / "profile.yaml"
if not profile_path.is_file():
raise CanonServiceError(
"missing_profile",
f"Profile not found: {profile}",
{"profile": profile, "path": str(profile_path)},
)
return profiles.validate_profile(context, profile)
def profile_graph(
profile: str,
root: Path | str | None = None,
*,
output_format: str = "json",
) -> dict[str, Any]:
context = load_context(root)
profile_path = context.infospace_root / "profiles" / profile / "profile.yaml"
if not profile_path.is_file():
raise CanonServiceError(
"missing_profile",
f"Profile not found: {profile}",
{"profile": profile, "path": str(profile_path)},
)
try:
return profiles.profile_graph(context, profile, output_format=output_format)
except ValueError as exc:
raise CanonServiceError(
"unsupported_graph_format",
str(exc),
{"supported": ["json", "mermaid"]},
) from exc
def generate_indexes(root: Path | str | None = None) -> dict[str, Any]:
return generation.generate_indexes(load_context(root))
def generate_tree(root: Path | str | None = None) -> dict[str, Any]:
return generation.generate_tree(load_context(root))
def generate_agent_briefs(root: Path | str | None = None) -> dict[str, Any]:
return generation.generate_agent_briefs(load_context(root))
def list_views(root: Path | str | None = None) -> dict[str, Any]:
return generation.list_generated_views(load_context(root))
def read_view(name: str, root: Path | str | None = None) -> dict[str, Any]:
try:
return generation.read_generated_view(load_context(root), name)
except FileNotFoundError as exc:
raise CanonServiceError(
"missing_view",
f"View not found: {name}",
{"view": name},
) from exc
except ValueError as exc:
raise CanonServiceError(
"invalid_view_name",
str(exc),
{"view": name},
) from exc
def _read_yaml_component(infospace_root: Path, relative: str) -> Any:
path = infospace_root / relative
try:
with path.open("r", encoding="utf-8") as handle:
return yaml.safe_load(handle) or {}
except FileNotFoundError as exc:
raise CanonServiceError(
"missing_review_kit_component",
f"Review kit component not found: {relative}",
{"path": str(path)},
) from exc
except yaml.YAMLError as exc:
raise CanonServiceError(
"invalid_review_kit_component",
f"Review kit component is not valid YAML: {relative}",
{"path": str(path), "reason": str(exc)},
) from exc
def _artifact_to_dict(
artifact: KnowledgeArtifact,
infospace_root: Path,
) -> dict[str, Any]:
data = artifact.to_dict()
data["exists"] = (infospace_root / artifact.path).is_file()
return data
def _evaluate_thresholds(
metrics: dict[str, float],
thresholds: dict[str, Any],
) -> list[dict[str, Any]]:
errors: list[dict[str, Any]] = []
for metric, threshold in thresholds.items():
value = metrics.get(metric)
if value is None:
continue
min_value = getattr(threshold, "min", None)
max_value = getattr(threshold, "max", None)
if min_value is not None and value < min_value:
errors.append(
{
"code": "metric_below_threshold",
"metric": metric,
"value": value,
"min": min_value,
}
)
if max_value is not None and value > max_value:
errors.append(
{
"code": "metric_above_threshold",
"metric": metric,
"value": value,
"max": max_value,
}
)
return errors