generated from coulomb/repo-seed
Implement infospace scaffold and service baseline
This commit is contained in:
262
src/info_tech_canon/service.py
Normal file
262
src/info_tech_canon/service.py
Normal file
@@ -0,0 +1,262 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import Counter
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
from .bench import (
|
||||
Infospace,
|
||||
KnowledgeArtifact,
|
||||
export_mermaid,
|
||||
load_infospace,
|
||||
relationship_summary,
|
||||
run_collection_checks,
|
||||
)
|
||||
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[2]
|
||||
DEFAULT_INFOSPACE_ROOT = REPO_ROOT / "infospace"
|
||||
|
||||
|
||||
class CanonServiceError(Exception):
|
||||
def __init__(
|
||||
self,
|
||||
code: str,
|
||||
message: str,
|
||||
details: dict[str, Any] | None = None,
|
||||
) -> None:
|
||||
super().__init__(message)
|
||||
self.code = code
|
||||
self.message = message
|
||||
self.details = details or {}
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"ok": False,
|
||||
"error": {
|
||||
"code": self.code,
|
||||
"message": self.message,
|
||||
"details": self.details,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CanonContext:
|
||||
repo_root: Path
|
||||
infospace_root: Path
|
||||
infospace: Infospace
|
||||
|
||||
|
||||
def load_context(root: Path | str | None = None) -> CanonContext:
|
||||
infospace_root = Path(root) if root else DEFAULT_INFOSPACE_ROOT
|
||||
try:
|
||||
infospace = load_infospace(infospace_root)
|
||||
except Exception as exc:
|
||||
raise CanonServiceError(
|
||||
"infospace_load_failed",
|
||||
f"Unable to load infospace at {infospace_root}",
|
||||
{"root": str(infospace_root), "reason": str(exc)},
|
||||
) from exc
|
||||
return CanonContext(
|
||||
repo_root=REPO_ROOT,
|
||||
infospace_root=infospace_root,
|
||||
infospace=infospace,
|
||||
)
|
||||
|
||||
|
||||
def inspect_canon(root: Path | str | None = None) -> dict[str, Any]:
|
||||
context = load_context(root)
|
||||
artifacts = context.infospace.artifacts
|
||||
kinds = Counter(artifact.kind for artifact in artifacts)
|
||||
return {
|
||||
"ok": True,
|
||||
"repo": {
|
||||
"slug": "info-tech-canon",
|
||||
"root": str(context.repo_root),
|
||||
},
|
||||
"infospace": {
|
||||
"slug": context.infospace.config.slug,
|
||||
"name": context.infospace.config.name,
|
||||
"root": str(context.infospace_root),
|
||||
"artifact_count": len(artifacts),
|
||||
"kinds": dict(sorted(kinds.items())),
|
||||
},
|
||||
"service": {
|
||||
"package": "info_tech_canon",
|
||||
"contract": "cli-json-api",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def list_artifacts(
|
||||
root: Path | str | None = None,
|
||||
*,
|
||||
kind: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
context = load_context(root)
|
||||
artifacts = [
|
||||
_artifact_to_dict(artifact, context.infospace_root)
|
||||
for artifact in context.infospace.artifacts
|
||||
if kind is None or artifact.kind == kind
|
||||
]
|
||||
return {
|
||||
"ok": True,
|
||||
"count": len(artifacts),
|
||||
"artifacts": artifacts,
|
||||
}
|
||||
|
||||
|
||||
def list_models(root: Path | str | None = None) -> dict[str, Any]:
|
||||
return list_artifacts(root, kind="model")
|
||||
|
||||
|
||||
def list_standards(root: Path | str | None = None) -> dict[str, Any]:
|
||||
return list_artifacts(root, kind="standard")
|
||||
|
||||
|
||||
def validate_canon(root: Path | str | None = None) -> dict[str, Any]:
|
||||
context = load_context(root)
|
||||
errors: list[dict[str, Any]] = []
|
||||
|
||||
artifact_ids = {artifact.id for artifact in context.infospace.artifacts}
|
||||
for artifact in context.infospace.artifacts:
|
||||
artifact_path = context.infospace_root / artifact.path
|
||||
if not artifact_path.is_file():
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_artifact_path",
|
||||
"artifact_id": artifact.id,
|
||||
"path": artifact.path,
|
||||
}
|
||||
)
|
||||
for relationship in artifact.relationships:
|
||||
target = relationship.get("target")
|
||||
if target not in artifact_ids:
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_relationship_target",
|
||||
"artifact_id": artifact.id,
|
||||
"target": target,
|
||||
}
|
||||
)
|
||||
|
||||
for discipline in context.infospace.config.disciplines:
|
||||
discipline_path = context.infospace_root / discipline.path
|
||||
if not discipline_path.is_file():
|
||||
errors.append(
|
||||
{
|
||||
"code": "missing_discipline_path",
|
||||
"discipline": discipline.name,
|
||||
"path": discipline.path,
|
||||
}
|
||||
)
|
||||
|
||||
checks = run_collection_checks(context.infospace.artifacts)
|
||||
threshold_errors = _evaluate_thresholds(
|
||||
checks.metrics,
|
||||
context.infospace.config.viability,
|
||||
)
|
||||
errors.extend(threshold_errors)
|
||||
|
||||
return {
|
||||
"ok": not errors,
|
||||
"errors": errors,
|
||||
"metrics": checks.metrics,
|
||||
"details": checks.details,
|
||||
}
|
||||
|
||||
|
||||
def artifact_graph(
|
||||
root: Path | str | None = None,
|
||||
*,
|
||||
output_format: str = "json",
|
||||
) -> dict[str, Any]:
|
||||
context = load_context(root)
|
||||
summary = relationship_summary(context.infospace.artifacts)
|
||||
if output_format == "mermaid":
|
||||
return {"ok": True, "format": "mermaid", "graph": export_mermaid(summary)}
|
||||
if output_format != "json":
|
||||
raise CanonServiceError(
|
||||
"unsupported_graph_format",
|
||||
f"Unsupported graph format: {output_format}",
|
||||
{"supported": ["json", "mermaid"]},
|
||||
)
|
||||
return {
|
||||
"ok": True,
|
||||
"format": "json",
|
||||
"graph": {
|
||||
"node_count": summary.node_count,
|
||||
"edge_count": summary.edge_count,
|
||||
"nodes": summary.nodes,
|
||||
"edges": [asdict(edge) for edge in summary.edges],
|
||||
"relationship_types": summary.relationship_types,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def profile_inspect(
|
||||
profile: str,
|
||||
root: Path | str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
context = load_context(root)
|
||||
profile_path = context.infospace_root / "profiles" / profile / "profile.yaml"
|
||||
if not profile_path.is_file():
|
||||
raise CanonServiceError(
|
||||
"missing_profile",
|
||||
f"Profile not found: {profile}",
|
||||
{"profile": profile, "path": str(profile_path)},
|
||||
)
|
||||
with profile_path.open("r", encoding="utf-8") as handle:
|
||||
data = yaml.safe_load(handle) or {}
|
||||
if not isinstance(data, dict):
|
||||
raise CanonServiceError(
|
||||
"invalid_profile",
|
||||
f"Profile must be a YAML mapping: {profile}",
|
||||
{"profile": profile, "path": str(profile_path)},
|
||||
)
|
||||
return {"ok": True, "profile": data, "path": str(profile_path)}
|
||||
|
||||
|
||||
def _artifact_to_dict(
|
||||
artifact: KnowledgeArtifact,
|
||||
infospace_root: Path,
|
||||
) -> dict[str, Any]:
|
||||
data = artifact.to_dict()
|
||||
data["exists"] = (infospace_root / artifact.path).is_file()
|
||||
return data
|
||||
|
||||
|
||||
def _evaluate_thresholds(
|
||||
metrics: dict[str, float],
|
||||
thresholds: dict[str, Any],
|
||||
) -> list[dict[str, Any]]:
|
||||
errors: list[dict[str, Any]] = []
|
||||
for metric, threshold in thresholds.items():
|
||||
value = metrics.get(metric)
|
||||
if value is None:
|
||||
continue
|
||||
min_value = getattr(threshold, "min", None)
|
||||
max_value = getattr(threshold, "max", None)
|
||||
if min_value is not None and value < min_value:
|
||||
errors.append(
|
||||
{
|
||||
"code": "metric_below_threshold",
|
||||
"metric": metric,
|
||||
"value": value,
|
||||
"min": min_value,
|
||||
}
|
||||
)
|
||||
if max_value is not None and value > max_value:
|
||||
errors.append(
|
||||
{
|
||||
"code": "metric_above_threshold",
|
||||
"metric": metric,
|
||||
"value": value,
|
||||
"max": max_value,
|
||||
}
|
||||
)
|
||||
return errors
|
||||
Reference in New Issue
Block a user