Initial implementation

This commit is contained in:
2026-05-14 11:32:25 +02:00
parent 6fd1ff7581
commit 916a895a85
31 changed files with 1461 additions and 21 deletions

View File

@@ -0,0 +1,28 @@
from .errors import InfospaceError
from .evaluation import EntityEvaluation, EvaluationSnapshot, MetricValue, ScoreEntry
from .lifecycle import add_artifact, create_infospace, load_infospace
from .models import (
DisciplineBinding,
Infospace,
InfospaceConfig,
KnowledgeArtifact,
TopicConfig,
ViabilityThreshold,
)
# Names exported by ``from <package> import *``; each one is re-exported
# from a submodule by the imports at the top of this file.
__all__ = [
    "DisciplineBinding",
    "EntityEvaluation",
    "EvaluationSnapshot",
    "Infospace",
    "InfospaceConfig",
    "InfospaceError",
    "KnowledgeArtifact",
    "MetricValue",
    "ScoreEntry",
    "TopicConfig",
    "ViabilityThreshold",
    "add_artifact",
    "create_infospace",
    "load_infospace",
]

View File

@@ -0,0 +1,5 @@
from .cli import main
# Script entry point: run the CLI and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,112 @@
from __future__ import annotations
from dataclasses import dataclass
from math import log2
from .models import KnowledgeArtifact
@dataclass(frozen=True)
class CollectionCheckReport:
    """Immutable result returned by ``run_collection_checks``."""

    # Metric name -> numeric value.
    metrics: dict[str, float]
    # Supplementary values reported next to the metrics (e.g. counts).
    details: dict[str, object]
def run_collection_checks(artifacts: list[KnowledgeArtifact]) -> CollectionCheckReport:
    """Compute the collection-level metrics for ``artifacts``.

    Args:
        artifacts: The artifacts to analyse.

    Returns:
        A report whose ``metrics`` hold the redundancy, coverage, coherence,
        consistency and granularity values, and whose ``details`` hold the
        artifact and relationship counts.
    """
    link_graph = _directed_graph(artifacts)

    relationship_total = 0
    for artifact in artifacts:
        relationship_total += len(artifact.relationships)

    metrics: dict[str, float] = {}
    metrics["redundancy_ratio"] = _redundancy_ratio(artifacts)
    metrics["coverage_ratio"] = _coverage_ratio(artifacts)
    # Graph counts are integers; the metrics mapping stores floats.
    metrics["coherence_components"] = float(_component_count(link_graph))
    metrics["consistency_cycles"] = float(_cycle_count(link_graph))
    metrics["granularity_entropy"] = _kind_entropy(artifacts)

    details: dict[str, object] = {
        "artifact_count": len(artifacts),
        "relationship_count": relationship_total,
    }
    return CollectionCheckReport(metrics=metrics, details=details)
def _redundancy_ratio(artifacts: list[KnowledgeArtifact]) -> float:
    """Return the fraction of artifacts whose label duplicates another one.

    An artifact's label is its title, or its id when the title is empty.
    An empty collection yields ``0.0``.
    """
    if not artifacts:
        return 0.0
    total = len(artifacts)
    distinct_labels = {artifact.title or artifact.id for artifact in artifacts}
    return (total - len(distinct_labels)) / total
def _coverage_ratio(artifacts: list[KnowledgeArtifact]) -> float:
    """Return the fraction of artifacts that have both a title and a path.

    An empty collection yields ``0.0``.
    """
    if not artifacts:
        return 0.0
    described = [artifact for artifact in artifacts if artifact.title and artifact.path]
    return len(described) / len(artifacts)
def _kind_entropy(artifacts: list[KnowledgeArtifact]) -> float:
    """Return the Shannon entropy (in bits) of the artifact ``kind`` distribution.

    Args:
        artifacts: The artifacts whose ``kind`` values are tallied.

    Returns:
        ``0.0`` for an empty or single-kind collection, otherwise a positive
        number of bits.
    """
    if not artifacts:
        return 0.0
    counts: dict[str, int] = {}
    for artifact in artifacts:
        counts[artifact.kind] = counts.get(artifact.kind, 0) + 1
    total = len(artifacts)
    # Summing p * log2(1/p) instead of negating sum(p * log2(p)) keeps the
    # single-kind result at +0.0; the negated form produced -0.0, which shows
    # up as "-0.0" when the metric is serialised.
    return sum((count / total) * log2(total / count) for count in counts.values())
def _directed_graph(artifacts: list[KnowledgeArtifact]) -> dict[str, set[str]]:
    """Build an adjacency map from artifact relationships.

    Every artifact id becomes a key. An edge is recorded only when the
    relationship's ``"target"`` is a string naming another artifact in the
    same collection; all other relationships are ignored.
    """
    known_ids = {artifact.id for artifact in artifacts}
    adjacency: dict[str, set[str]] = {artifact.id: set() for artifact in artifacts}
    for artifact in artifacts:
        outgoing = adjacency[artifact.id]
        for link in artifact.relationships:
            destination = link.get("target")
            if not isinstance(destination, str):
                continue
            if destination in known_ids:
                outgoing.add(destination)
    return adjacency
def _component_count(graph: dict[str, set[str]]) -> int:
    """Count connected components of ``graph`` with edge direction ignored.

    Returns ``0`` for an empty graph. Edge targets that are not keys of
    ``graph`` are still treated as nodes.
    """
    if not graph:
        return 0

    # Mirror every edge so that reachability no longer depends on direction.
    neighbours: dict[str, set[str]] = {
        node: set(targets) for node, targets in graph.items()
    }
    for source, targets in graph.items():
        for target in targets:
            neighbours.setdefault(target, set()).add(source)

    unvisited = set(neighbours)
    components = 0
    while unvisited:
        # Each pass floods one whole component out of ``unvisited``.
        components += 1
        frontier = [unvisited.pop()]
        while frontier:
            node = frontier.pop()
            for adjacent in neighbours[node]:
                if adjacent in unvisited:
                    unvisited.remove(adjacent)
                    frontier.append(adjacent)
    return components
def _cycle_count(graph: dict[str, set[str]]) -> int:
    """Count back edges found by a depth-first traversal of ``graph``.

    A back edge is an edge that points at a node still on the current DFS
    path, which means it closes a cycle. Self-loops count as one.

    The traversal uses an explicit stack rather than recursion, so a long
    chain of relationships cannot exceed the interpreter's recursion limit.
    Nodes and edges are visited in the same order a recursive DFS would use.

    Args:
        graph: Adjacency map; every edge target must also be a key.

    Returns:
        The number of back edges encountered.
    """
    cycles = 0
    visited: set[str] = set()
    active: set[str] = set()  # nodes on the current DFS path

    for root in graph:
        if root in visited:
            continue
        visited.add(root)
        active.add(root)
        # Each frame pairs a node with an iterator over its unexplored edges.
        stack = [(root, iter(graph[root]))]
        while stack:
            node, pending = stack[-1]
            descended = False
            for target in pending:
                if target not in visited:
                    visited.add(target)
                    active.add(target)
                    stack.append((target, iter(graph[target])))
                    descended = True
                    break
                if target in active:
                    cycles += 1
            if not descended:
                # All edges of ``node`` are explored: leave the DFS path.
                active.remove(node)
                stack.pop()
    return cycles

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from .errors import InfospaceError
from .lifecycle import add_artifact, create_infospace, load_infospace
def build_parser() -> argparse.ArgumentParser:
    """Create the ``infospace-bench`` argument parser.

    The parser requires one of four sub-commands (``create``, ``inspect``,
    ``add-artifact``, ``export``) and stores its name in ``command``.
    """
    parser = argparse.ArgumentParser(prog="infospace-bench")
    commands = parser.add_subparsers(dest="command", required=True)

    create_cmd = commands.add_parser("create", help="Create an infospace")
    for positional in ("workspace", "slug"):
        create_cmd.add_argument(positional)
    create_cmd.add_argument("--name", required=True)
    create_cmd.add_argument("--topic-domain", default="")

    inspect_cmd = commands.add_parser("inspect", help="Inspect an infospace")
    inspect_cmd.add_argument("root")

    add_cmd = commands.add_parser(
        "add-artifact", help="Add an artifact to an infospace"
    )
    for positional in ("root", "source"):
        add_cmd.add_argument(positional)
    add_cmd.add_argument("--kind", required=True)
    add_cmd.add_argument("--title", default="")

    export_cmd = commands.add_parser(
        "export", help="Print the infospace representation"
    )
    export_cmd.add_argument("root")

    return parser
def main(argv: list[str] | None = None) -> int:
    """Run the command-line interface.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` on success, ``2`` when an ``InfospaceError`` was raised. On
        success the command's result is printed to stdout as JSON; on failure
        the structured error is printed to stderr as JSON.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    command = args.command
    try:
        if command == "create":
            created = create_infospace(
                Path(args.workspace),
                args.slug,
                name=args.name,
                topic_domain=args.topic_domain,
            )
            payload = {"slug": created.config.slug, "root": str(created.root)}
        elif command in ("inspect", "export"):
            # Both commands print the same full representation.
            payload = load_infospace(Path(args.root)).to_dict()
        elif command == "add-artifact":
            added = add_artifact(
                Path(args.root),
                Path(args.source),
                kind=args.kind,
                title=args.title,
            )
            payload = {"artifact": added.to_dict()}
        else:
            # parser.error() exits the process, so ``payload`` is always
            # bound when the line below runs.
            parser.error(f"Unhandled command: {args.command}")
        _write_json(payload)
    except InfospaceError as exc:
        print(json.dumps(exc.to_dict(), indent=2), file=sys.stderr)
        return 2
    return 0
def _write_json(payload: dict) -> None:
    """Print ``payload`` to stdout as JSON indented by two spaces."""
    rendered = json.dumps(payload, indent=2)
    print(rendered)

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class InfospaceError(Exception):
    """Structured application error suitable for CLI and API surfaces.

    Attributes are a machine-readable ``code``, a human-readable ``message``
    (also used as the exception text) and an optional ``detail`` mapping with
    extra context.
    """

    code: str
    message: str
    detail: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        super().__init__(self.message)

    def __hash__(self) -> int:
        # The generated __eq__ would otherwise set __hash__ to None and make
        # the exception unhashable. ``detail`` is a dict, so only the two
        # string fields take part; equal errors still hash equally.
        return hash((self.code, self.message))

    def __reduce__(self) -> tuple[Any, ...]:
        # Exception's default reduce rebuilds from ``args`` (only the
        # message), which does not match this constructor and breaks
        # copy/pickle. Rebuild from the dataclass fields instead.
        return (type(self), (self.code, self.message, self.detail))

    def to_dict(self) -> dict[str, Any]:
        """Return the error as a JSON-serialisable ``{"error": {...}}`` mapping."""
        return {
            "error": {
                "code": self.code,
                "message": self.message,
                "detail": self.detail,
            }
        }

View File

@@ -0,0 +1,210 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass(frozen=True)
class ScoreEntry:
    """One named score with its scale maximum and an optional rationale."""

    name: str
    value: float
    max_value: float = 5.0
    rationale: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Serialise the entry; ``rationale`` is omitted when empty."""
        payload: dict[str, Any] = dict(
            name=self.name,
            value=self.value,
            max_value=self.max_value,
        )
        if not self.rationale:
            return payload
        payload["rationale"] = self.rationale
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ScoreEntry":
        """Build an entry from a mapping.

        ``name`` and ``value`` are required; ``max_value`` defaults to 5.0
        and a missing or empty ``rationale`` becomes ``""``.
        """
        name = str(data["name"])
        value = float(data["value"])
        max_value = float(data.get("max_value", 5.0))
        raw_rationale = data.get("rationale")
        rationale = str(raw_rationale) if raw_rationale else ""
        return cls(name, value, max_value, rationale)
@dataclass(frozen=True)
class EntityEvaluation:
    """Scores that one evaluator assigned to one artifact at a point in time."""

    artifact_id: str
    evaluator: str
    scores: list[ScoreEntry]
    evaluated_at: datetime
    notes: list[str] = field(default_factory=list)

    @property
    def overall_score(self) -> float:
        """Arithmetic mean of the raw score values, or ``0.0`` without scores."""
        if not self.scores:
            return 0.0
        return sum(score.value for score in self.scores) / len(self.scores)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the evaluation, with ``overall_score`` rounded to 4 places."""
        return {
            "artifact_id": self.artifact_id,
            "evaluator": self.evaluator,
            "evaluated_at": self.evaluated_at.isoformat(),
            "overall_score": round(self.overall_score, 4),
            "scores": [score.to_dict() for score in self.scores],
            "notes": self.notes,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "EntityEvaluation":
        """Build an evaluation from a mapping.

        ``artifact_id``, ``evaluator`` and ``evaluated_at`` (ISO 8601) are
        required. ``scores`` and ``notes`` may be missing or null and then
        become empty lists.
        """
        return cls(
            artifact_id=str(data["artifact_id"]),
            evaluator=str(data["evaluator"]),
            # ``or []`` also covers an explicit null, matching ``notes``.
            scores=[ScoreEntry.from_dict(item) for item in data.get("scores") or []],
            evaluated_at=datetime.fromisoformat(str(data["evaluated_at"])),
            notes=list(data.get("notes") or []),
        )
@dataclass(frozen=True)
class MetricValue:
    """A named numeric metric with an optional concern label and details."""

    name: str
    value: float
    concern: str = ""
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the metric; empty ``concern`` and ``details`` are omitted."""
        payload: dict[str, Any] = {"name": self.name, "value": self.value}
        optional = (("concern", self.concern), ("details", self.details))
        for key, content in optional:
            if content:
                payload[key] = content
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MetricValue":
        """Build a metric from a mapping; ``name`` and ``value`` are required."""
        raw_concern = data.get("concern")
        raw_details = data.get("details")
        return cls(
            str(data["name"]),
            float(data["value"]),
            str(raw_concern) if raw_concern else "",
            dict(raw_details) if raw_details else {},
        )
@dataclass(frozen=True)
class EvaluationSnapshot:
    """A point-in-time record of artifact evaluations and collection metrics."""

    snapshot_id: str
    created_at: datetime
    schema_name: str
    artifact_count: int
    artifact_evaluations: list[EntityEvaluation] = field(default_factory=list)
    collection_metrics: list[MetricValue] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the snapshot, including nested evaluations and metrics."""
        return {
            "snapshot_id": self.snapshot_id,
            "created_at": self.created_at.isoformat(),
            "schema_name": self.schema_name,
            "artifact_count": self.artifact_count,
            "artifact_evaluations": [
                evaluation.to_dict() for evaluation in self.artifact_evaluations
            ],
            "collection_metrics": [
                metric.to_dict() for metric in self.collection_metrics
            ],
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "EvaluationSnapshot":
        """Build a snapshot from a mapping.

        ``snapshot_id``, ``created_at`` (ISO 8601), ``schema_name`` and
        ``artifact_count`` are required. The list and mapping fields may be
        missing or null and then become empty.
        """
        # ``or []`` also covers an explicit null, matching ``metadata``.
        raw_evaluations = data.get("artifact_evaluations") or []
        raw_metrics = data.get("collection_metrics") or []
        return cls(
            snapshot_id=str(data["snapshot_id"]),
            created_at=datetime.fromisoformat(str(data["created_at"])),
            schema_name=str(data["schema_name"]),
            artifact_count=int(data["artifact_count"]),
            artifact_evaluations=[
                EntityEvaluation.from_dict(item) for item in raw_evaluations
            ],
            collection_metrics=[MetricValue.from_dict(item) for item in raw_metrics],
            metadata=dict(data.get("metadata") or {}),
        )
@dataclass(frozen=True)
class ScoreChange:
    """Before/after values of one score dimension for one artifact."""

    artifact_id: str
    dimension: str
    before: float
    after: float

    @property
    def delta(self) -> float:
        """Signed change: ``after`` minus ``before``."""
        change = self.after - self.before
        return change
@dataclass(frozen=True)
class MetricChange:
    """Before/after values of one collection metric."""

    name: str
    before: float
    after: float

    @property
    def delta(self) -> float:
        """Signed change: ``after`` minus ``before``."""
        change = self.after - self.before
        return change
@dataclass(frozen=True)
class SnapshotDiff:
    """Differences between two evaluation snapshots, as built by ``diff_snapshots``."""

    # Identifiers of the two snapshots that were compared.
    before_id: str
    after_id: str
    # Artifact ids present only in the later / only in the earlier snapshot.
    added_artifacts: list[str] = field(default_factory=list)
    removed_artifacts: list[str] = field(default_factory=list)
    # Per-artifact score values and collection metrics that differ.
    score_changes: list[ScoreChange] = field(default_factory=list)
    metric_changes: list[MetricChange] = field(default_factory=list)
def diff_snapshots(
    before: EvaluationSnapshot,
    after: EvaluationSnapshot,
) -> SnapshotDiff:
    """Compare two snapshots and report what changed between them.

    Args:
        before: The earlier snapshot.
        after: The later snapshot.

    Returns:
        A ``SnapshotDiff`` listing artifacts evaluated in only one of the
        snapshots, score dimensions present in both whose value differs, and
        collection metrics present in both whose value differs. All lists are
        sorted.
    """
    before_scores = _score_index(before)
    after_scores = _score_index(after)

    # Take artifact ids from the evaluations themselves, not from the score
    # index: an evaluation with an empty score list has no index entries and
    # would otherwise look as if the artifact were absent.
    before_artifacts = {
        evaluation.artifact_id for evaluation in before.artifact_evaluations
    }
    after_artifacts = {
        evaluation.artifact_id for evaluation in after.artifact_evaluations
    }

    shared_scores = sorted(before_scores.keys() & after_scores.keys())
    score_changes = [
        ScoreChange(
            artifact_id,
            dimension,
            before_scores[(artifact_id, dimension)],
            after_scores[(artifact_id, dimension)],
        )
        for artifact_id, dimension in shared_scores
        if before_scores[(artifact_id, dimension)]
        != after_scores[(artifact_id, dimension)]
    ]

    before_metrics = {metric.name: metric.value for metric in before.collection_metrics}
    after_metrics = {metric.name: metric.value for metric in after.collection_metrics}
    metric_changes = [
        MetricChange(name, before_metrics[name], after_metrics[name])
        for name in sorted(before_metrics.keys() & after_metrics.keys())
        if before_metrics[name] != after_metrics[name]
    ]

    return SnapshotDiff(
        before_id=before.snapshot_id,
        after_id=after.snapshot_id,
        added_artifacts=sorted(after_artifacts - before_artifacts),
        removed_artifacts=sorted(before_artifacts - after_artifacts),
        score_changes=score_changes,
        metric_changes=metric_changes,
    )
def _score_index(snapshot: EvaluationSnapshot) -> dict[tuple[str, str], float]:
    """Map ``(artifact_id, score name)`` to the score value in ``snapshot``.

    When the same pair occurs more than once, the last occurrence wins.
    """
    index: dict[tuple[str, str], float] = {}
    for evaluation in snapshot.artifact_evaluations:
        artifact_id = evaluation.artifact_id
        for score in evaluation.scores:
            index[(artifact_id, score.name)] = score.value
    return index

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass, field
from .models import KnowledgeArtifact
@dataclass(frozen=True)
class RelationshipEdge:
    """A directed, typed relationship between two artifacts."""

    # Id of the artifact that declares the relationship.
    source: str
    # Id of the artifact the relationship points at.
    target: str
    # Relationship type label.
    type: str
@dataclass(frozen=True)
class RelationshipSummary:
    """Nodes, edges and per-type edge counts of an artifact relationship graph."""

    nodes: list[str]
    edges: list[RelationshipEdge]
    relationship_types: dict[str, int] = field(default_factory=dict)

    @property
    def node_count(self) -> int:
        """Number of entries in ``nodes``."""
        total_nodes = len(self.nodes)
        return total_nodes

    @property
    def edge_count(self) -> int:
        """Number of entries in ``edges``."""
        total_edges = len(self.edges)
        return total_edges
def relationship_summary(artifacts: list[KnowledgeArtifact]) -> RelationshipSummary:
    """Summarise the relationships between ``artifacts``.

    Only relationships whose ``"target"`` is a string naming another artifact
    in the collection are kept. A missing or empty ``"type"`` is reported as
    ``"related"``.

    Returns:
        A summary with sorted node ids, edges in encounter order, and edge
        counts per type sorted by type name.
    """
    known_ids = {artifact.id for artifact in artifacts}
    collected: list[RelationshipEdge] = []
    tally: dict[str, int] = {}

    for artifact in artifacts:
        for link in artifact.relationships:
            destination = link.get("target")
            label = str(link.get("type") or "related")
            if not isinstance(destination, str) or destination not in known_ids:
                continue
            collected.append(RelationshipEdge(artifact.id, destination, label))
            tally[label] = tally.get(label, 0) + 1

    ordered_tally = {label: tally[label] for label in sorted(tally)}
    return RelationshipSummary(
        nodes=sorted(known_ids),
        edges=collected,
        relationship_types=ordered_tally,
    )
def _is_plain_mermaid_id(text: str) -> bool:
    """Return True when ``text`` can be used verbatim as a Mermaid node id."""
    # "end" is a reserved word in Mermaid flowcharts.
    if not text or text == "end":
        return False
    return all(ch == "_" or (ch.isascii() and ch.isalnum()) for ch in text)


def export_mermaid(summary: RelationshipSummary) -> str:
    """Render ``summary`` as a Mermaid ``graph TD`` flowchart.

    Node ids made only of ASCII letters, digits and underscores are emitted
    unchanged. Any other id (for example one containing ``/`` or ``.``) is
    not a reliable Mermaid identifier, so it is declared under a generated
    alias with the original id as its quoted label, and edges refer to that
    alias.

    Args:
        summary: The nodes and edges to render.

    Returns:
        The diagram source, terminated by a newline.
    """
    plain_ids = {node for node in summary.nodes if _is_plain_mermaid_id(node)}
    aliases: dict[str, str] = {}
    lines = ["graph TD"]
    next_alias = 0
    for node in summary.nodes:
        if node in plain_ids:
            aliases[node] = node
            lines.append(f" {node}")
            continue
        # Skip generated names that collide with a real plain node id.
        alias = f"n{next_alias}"
        while alias in plain_ids:
            next_alias += 1
            alias = f"n{next_alias}"
        next_alias += 1
        aliases[node] = alias
        # Mermaid writes a literal double quote inside a label as #quot;.
        label = node.replace('"', "#quot;")
        lines.append(f' {alias}["{label}"]')
    for edge in summary.edges:
        # Endpoints missing from ``nodes`` fall back to their raw id.
        source = aliases.get(edge.source, edge.source)
        target = aliases.get(edge.target, edge.target)
        lines.append(f" {source} -->|{edge.type}| {target}")
    return "\n".join(lines) + "\n"

View File

@@ -0,0 +1,170 @@
from __future__ import annotations
import re
import shutil
from pathlib import Path
from typing import Any
import yaml
from .errors import InfospaceError
from .models import Infospace, InfospaceConfig, KnowledgeArtifact, TopicConfig
# Slug pattern: lowercase letters and digits, with hyphens allowed only
# between the first and last character; a single character is also accepted.
SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$")
# Configuration file name, relative to the infospace root.
CONFIG_FILE = "infospace.yaml"
# Artifact index file, relative to the infospace root.
ARTIFACT_INDEX = "artifacts/index.yaml"
# Directories created under the root of every new infospace.
LAYOUT_DIRS = (
    "artifacts/sources",
    "artifacts/generated",
    "output/evaluations",
    "output/metrics",
    "reports",
    "exports",
)
# Supported artifact kinds -> sub-directory of "artifacts/" that stores them.
KIND_DIRS = {"source": "sources", "generated": "generated"}
def create_infospace(
    workspace: Path | str,
    slug: str,
    *,
    name: str,
    topic_domain: str = "",
) -> Infospace:
    """Create a new, empty infospace at ``<workspace>/infospaces/<slug>``.

    Args:
        workspace: Directory that contains the ``infospaces`` folder.
        slug: Identifier of the infospace; also its directory name.
        name: Display name, also used as the topic name.
        topic_domain: Optional topic domain.

    Returns:
        The newly created infospace, with no artifacts.

    Raises:
        InfospaceError: If the slug is invalid or the target path exists.
    """
    _validate_slug(slug)
    root = Path(workspace) / "infospaces" / slug
    if root.exists():
        raise InfospaceError(
            "infospace_exists",
            f"Infospace already exists: {root}",
            {"root": str(root)},
        )

    for layout_dir in LAYOUT_DIRS:
        directory = root / layout_dir
        directory.mkdir(parents=True, exist_ok=True)

    topic = TopicConfig(name=name, domain=topic_domain)
    config = InfospaceConfig(slug=slug, name=name, topic=topic)

    # Persist the configuration and an empty artifact index.
    _write_yaml(root / CONFIG_FILE, config.to_dict())
    _write_yaml(root / ARTIFACT_INDEX, {"artifacts": []})
    return Infospace(root=root, config=config, artifacts=[])
def load_infospace(root: Path | str) -> Infospace:
    """Load an infospace from its root directory.

    Args:
        root: Path of the infospace root (the directory holding
            ``infospace.yaml``).

    Returns:
        The infospace with its parsed configuration and indexed artifacts.

    Raises:
        InfospaceError: With code ``missing_infospace`` when ``root`` does
            not exist, ``missing_config`` when the configuration file is
            absent, or ``invalid_config`` when a required field is missing
            or a value has an unusable type.
    """
    root_path = Path(root)
    if not root_path.exists():
        raise InfospaceError(
            "missing_infospace",
            f"Infospace path does not exist: {root_path}",
            {"root": str(root_path)},
        )
    config_path = root_path / CONFIG_FILE
    if not config_path.is_file():
        raise InfospaceError(
            "missing_config",
            f"Missing infospace.yaml at {config_path}",
            {"config_path": str(config_path)},
        )
    raw_config = _read_yaml(config_path)
    try:
        config = InfospaceConfig.from_dict(raw_config)
    except KeyError as exc:
        raise InfospaceError(
            "invalid_config",
            f"Missing required config field: {exc.args[0]}",
            {"config_path": str(config_path), "field": exc.args[0]},
        ) from exc
    except (AttributeError, TypeError, ValueError) as exc:
        # Wrongly typed values (e.g. a list where a mapping is expected, or
        # a non-numeric threshold) are reported as a structured error
        # rather than escaping as a raw exception.
        raise InfospaceError(
            "invalid_config",
            f"Malformed infospace config: {exc}",
            {"config_path": str(config_path), "reason": str(exc)},
        ) from exc
    return Infospace(root=root_path, config=config, artifacts=_read_artifacts(root_path))
def add_artifact(
    root: Path | str,
    source: Path | str,
    *,
    kind: str,
    title: str = "",
    relationships: list[dict[str, Any]] | None = None,
) -> KnowledgeArtifact:
    """Copy a file into an infospace and record it in the artifact index.

    Args:
        root: Root directory of an existing infospace.
        source: File to copy into the infospace.
        kind: Artifact kind; must be a key of ``KIND_DIRS``.
        title: Optional title.
        relationships: Optional relationship mappings stored with the
            artifact.

    Returns:
        The artifact that was added. Its id is ``"<kind>/<file name>"``.

    Raises:
        InfospaceError: If the infospace cannot be loaded, the kind is
            unsupported, the source is not a file, or an artifact with the
            same id is already indexed.
    """
    infospace = load_infospace(root)

    kind_dir = KIND_DIRS.get(kind)
    if kind_dir is None:
        raise InfospaceError(
            "invalid_artifact_kind",
            f"Unsupported artifact kind: {kind}",
            {"kind": kind, "valid_kinds": sorted(KIND_DIRS)},
        )

    source_path = Path(source)
    if not source_path.is_file():
        raise InfospaceError(
            "missing_artifact_source",
            f"Artifact source does not exist: {source_path}",
            {"source": str(source_path)},
        )

    artifact_id = f"{kind}/{source_path.name}"
    existing_ids = [item.id for item in infospace.artifacts]
    if artifact_id in existing_ids:
        raise InfospaceError(
            "duplicate_artifact",
            f"Artifact already exists: {artifact_id}",
            {"artifact_id": artifact_id},
        )

    destination = infospace.root / "artifacts" / kind_dir / source_path.name
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(source_path, destination)

    artifact = KnowledgeArtifact(
        id=artifact_id,
        # Stored relative to the infospace root.
        path=str(destination.relative_to(infospace.root)),
        kind=kind,
        title=title,
        provenance={"source_path": str(source_path)},
        relationships=relationships or [],
    )

    # Rewrite the whole index with the new artifact appended.
    index_entries = [item.to_dict() for item in infospace.artifacts]
    index_entries.append(artifact.to_dict())
    _write_yaml(infospace.root / ARTIFACT_INDEX, {"artifacts": index_entries})
    return artifact
def _validate_slug(slug: str) -> None:
    """Raise ``InfospaceError`` unless ``slug`` is a well-formed slug.

    Args:
        slug: Candidate slug, checked against ``SLUG_RE``.

    Raises:
        InfospaceError: With code ``invalid_slug`` when the slug does not
            match.
    """
    # fullmatch() rather than match(): "$" also matches just before a
    # trailing newline, so match() would accept a slug such as "demo\n".
    if not SLUG_RE.fullmatch(slug):
        raise InfospaceError(
            "invalid_slug",
            "Slug must contain only lowercase letters, numbers, and hyphens",
            {"slug": slug},
        )
def _read_artifacts(root: Path) -> list[KnowledgeArtifact]:
    """Read the artifact index of the infospace at ``root``.

    Args:
        root: Root directory of the infospace.

    Returns:
        The indexed artifacts, or an empty list when the index file is
        missing or lists no artifacts.

    Raises:
        InfospaceError: With code ``invalid_artifact_index`` when an entry
            lacks a required field.
    """
    path = root / ARTIFACT_INDEX
    if not path.exists():
        return []
    # ``or []`` also covers an explicit ``artifacts: null`` in the index.
    entries = _read_yaml(path).get("artifacts") or []
    try:
        return [KnowledgeArtifact.from_dict(item) for item in entries]
    except KeyError as exc:
        # Same structured reporting as for a missing config field.
        raise InfospaceError(
            "invalid_artifact_index",
            f"Missing required artifact field: {exc.args[0]}",
            {"path": str(path), "field": exc.args[0]},
        ) from exc
def _read_yaml(path: Path) -> dict[str, Any]:
    """Load a YAML file that must contain a mapping.

    Args:
        path: File to read (UTF-8).

    Returns:
        The parsed mapping; an empty document yields ``{}``.

    Raises:
        InfospaceError: With code ``invalid_yaml`` when the file cannot be
            parsed or its top-level value is not a mapping.
    """
    try:
        with path.open("r", encoding="utf-8") as handle:
            data = yaml.safe_load(handle) or {}
    except yaml.YAMLError as exc:
        # Report syntax errors as a structured error, like every other
        # failure in this module.
        raise InfospaceError(
            "invalid_yaml",
            f"Could not parse YAML file: {path}",
            {"path": str(path), "reason": str(exc)},
        ) from exc
    if not isinstance(data, dict):
        raise InfospaceError(
            "invalid_yaml",
            f"Expected mapping in YAML file: {path}",
            {"path": str(path)},
        )
    return data
def _write_yaml(path: Path, data: dict[str, Any]) -> None:
    """Write ``data`` to ``path`` as YAML, creating parent directories.

    Keys are written in the mapping's own order.
    """
    directory = path.parent
    directory.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as stream:
        yaml.safe_dump(data, stream, sort_keys=False)

View File

@@ -0,0 +1,143 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class TopicConfig:
    """Topic settings of an infospace: name, domain and sources location."""

    name: str
    domain: str = ""
    sources: str = "artifacts/sources"

    @classmethod
    def from_dict(cls, data: dict[str, Any] | None) -> "TopicConfig":
        """Build a topic from a mapping; ``None`` and empty values fall back to defaults."""
        source = data if data else {}

        def text(key: str, fallback: str) -> str:
            return str(source.get(key) or fallback)

        return cls(
            name=text("name", ""),
            domain=text("domain", ""),
            sources=text("sources", "artifacts/sources"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialise the topic to a plain mapping."""
        return dict(name=self.name, domain=self.domain, sources=self.sources)
@dataclass(frozen=True)
class DisciplineBinding:
    """Associates a discipline name with a path."""

    name: str
    path: str

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DisciplineBinding":
        """Build a binding from a mapping; ``name`` and ``path`` are required."""
        name = str(data["name"])
        path = str(data["path"])
        return cls(name, path)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the binding to a plain mapping."""
        return dict(name=self.name, path=self.path)
@dataclass(frozen=True)
class ViabilityThreshold:
    """Optional lower and upper bounds for a metric; ``None`` means unbounded."""

    min: float | None = None
    max: float | None = None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ViabilityThreshold":
        """Build a threshold from a mapping; missing or null bounds stay ``None``."""

        def bound(key: str) -> float | None:
            raw = data.get(key)
            return None if raw is None else float(raw)

        return cls(min=bound("min"), max=bound("max"))

    def to_dict(self) -> dict[str, float]:
        """Serialise the threshold, leaving out bounds that are not set."""
        bounds = (("min", self.min), ("max", self.max))
        return {key: limit for key, limit in bounds if limit is not None}
@dataclass(frozen=True)
class InfospaceConfig:
    """Configuration of an infospace as stored in its configuration file."""

    slug: str
    name: str
    topic: TopicConfig
    disciplines: list[DisciplineBinding] = field(default_factory=list)
    schemas: dict[str, str] = field(default_factory=dict)
    workflows: list[dict[str, Any]] = field(default_factory=list)
    viability: dict[str, ViabilityThreshold] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "InfospaceConfig":
        """Build a configuration from a mapping.

        Only ``slug`` is required. ``name`` falls back to the slug, and every
        collection field may be missing or null and then becomes empty.

        Raises:
            KeyError: If ``slug`` (or a required field of a nested entry) is
                missing.
        """
        return cls(
            slug=str(data["slug"]),
            name=str(data.get("name") or data["slug"]),
            topic=TopicConfig.from_dict(data.get("topic")),
            disciplines=[
                DisciplineBinding.from_dict(item)
                # ``or []`` also covers an explicit null, as for the other
                # collection fields below.
                for item in data.get("disciplines") or []
            ],
            schemas={str(k): str(v) for k, v in (data.get("schemas") or {}).items()},
            workflows=list(data.get("workflows") or []),
            viability={
                str(k): ViabilityThreshold.from_dict(v)
                for k, v in (data.get("viability") or {}).items()
            },
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialise the configuration, including nested objects."""
        return {
            "slug": self.slug,
            "name": self.name,
            "topic": self.topic.to_dict(),
            "disciplines": [item.to_dict() for item in self.disciplines],
            "schemas": self.schemas,
            "workflows": self.workflows,
            "viability": {k: v.to_dict() for k, v in self.viability.items()},
        }
@dataclass(frozen=True)
class KnowledgeArtifact:
    """One indexed artifact: identity, location, kind and optional metadata."""

    id: str
    path: str
    kind: str
    title: str = ""
    provenance: dict[str, Any] = field(default_factory=dict)
    relationships: list[dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "KnowledgeArtifact":
        """Build an artifact from a mapping.

        ``id``, ``path`` and ``kind`` are required; the remaining fields may
        be missing or null and then take their empty defaults.
        """
        required = [str(data[key]) for key in ("id", "path", "kind")]
        raw_title = data.get("title")
        raw_provenance = data.get("provenance")
        raw_relationships = data.get("relationships")
        return cls(
            *required,
            title=str(raw_title) if raw_title else "",
            provenance=dict(raw_provenance) if raw_provenance else {},
            relationships=list(raw_relationships) if raw_relationships else [],
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialise the artifact to a plain mapping."""
        field_names = ("id", "path", "kind", "title", "provenance", "relationships")
        return {key: getattr(self, key) for key in field_names}
@dataclass(frozen=True)
class Infospace:
    """An infospace: its root path, configuration and artifacts."""

    root: Path
    config: InfospaceConfig
    artifacts: list[KnowledgeArtifact] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the infospace, with ``root`` rendered as a string."""
        serialised_artifacts = []
        for artifact in self.artifacts:
            serialised_artifacts.append(artifact.to_dict())
        return dict(
            root=str(self.root),
            config=self.config.to_dict(),
            artifacts=serialised_artifacts,
        )

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from dataclasses import dataclass
from .models import ViabilityThreshold
@dataclass(frozen=True)
class ViabilityResult:
    """Outcome of checking one metric against its threshold."""

    # Name of the metric that was checked.
    metric: str
    # Observed value; None when the metric was not supplied.
    value: float | None
    # Threshold the value was compared against.
    threshold: ViabilityThreshold
    # Whether the check succeeded.
    passed: bool
@dataclass(frozen=True)
class ViabilityReport:
    """Aggregate outcome of a viability evaluation."""

    # Overall verdict for the whole evaluation.
    passed: bool
    # Individual results keyed by metric name.
    results: dict[str, ViabilityResult]
def evaluate_viability(
    metrics: dict[str, float],
    thresholds: dict[str, ViabilityThreshold],
) -> ViabilityReport:
    """Check ``metrics`` against ``thresholds``.

    A metric passes when it is present and lies within every bound its
    threshold defines; a metric without a value fails. Metrics that have no
    threshold are ignored.

    Args:
        metrics: Observed metric values by name.
        thresholds: Thresholds to enforce, by metric name.

    Returns:
        A report with one result per threshold; it passes only when every
        result passes (and therefore also when there are no thresholds).
    """
    outcomes: dict[str, ViabilityResult] = {}
    for metric_name, limits in thresholds.items():
        observed = metrics.get(metric_name)
        if observed is None:
            ok = False
        else:
            above_floor = limits.min is None or observed >= limits.min
            below_ceiling = limits.max is None or observed <= limits.max
            ok = above_floor and below_ceiling
        outcomes[metric_name] = ViabilityResult(
            metric=metric_name,
            value=observed,
            threshold=limits,
            passed=ok,
        )

    overall = True
    for outcome in outcomes.values():
        if not outcome.passed:
            overall = False
            break
    return ViabilityReport(passed=overall, results=outcomes)