from __future__ import annotations

from collections import Counter
from dataclasses import dataclass
from math import log2

from .models import KnowledgeArtifact


@dataclass(frozen=True)
class CollectionCheckReport:
    """Immutable result of :func:`run_collection_checks`.

    Attributes:
        metrics: Metric name mapped to its numeric value.
        details: Supporting raw counts describing the checked collection.
    """

    metrics: dict[str, float]
    details: dict[str, object]


def run_collection_checks(artifacts: list[KnowledgeArtifact]) -> CollectionCheckReport:
    """Compute collection-level metrics for *artifacts*.

    The report's ``metrics`` contain:

    * ``redundancy_ratio`` -- share of artifacts whose label repeats.
    * ``coverage_ratio`` -- share of artifacts with both a title and a path.
    * ``coherence_components`` -- number of weakly connected components in
      the relationship graph.
    * ``consistency_cycles`` -- number of back edges found by a depth-first
      traversal of the relationship graph.
    * ``granularity_entropy`` -- Shannon entropy (bits) of artifact kinds.

    ``details`` holds ``artifact_count`` and ``relationship_count`` (the sum
    of ``len(item.relationships)`` over all artifacts, including
    relationships whose target is not part of the collection).
    """
    graph = _directed_graph(artifacts)
    metrics = {
        "redundancy_ratio": _redundancy_ratio(artifacts),
        "coverage_ratio": _coverage_ratio(artifacts),
        "coherence_components": float(_component_count(graph)),
        "consistency_cycles": float(_cycle_count(graph)),
        "granularity_entropy": _kind_entropy(artifacts),
    }
    return CollectionCheckReport(
        metrics=metrics,
        details={
            "artifact_count": len(artifacts),
            "relationship_count": sum(len(item.relationships) for item in artifacts),
        },
    )


def _redundancy_ratio(artifacts: list[KnowledgeArtifact]) -> float:
    """Return the fraction of artifacts whose label duplicates an earlier one.

    An artifact's label is its title, falling back to its id when the title
    is falsy. An empty collection yields ``0.0``.
    """
    if not artifacts:
        return 0.0
    labels = [item.title or item.id for item in artifacts]
    duplicate_count = len(labels) - len(set(labels))
    return duplicate_count / len(artifacts)


def _coverage_ratio(artifacts: list[KnowledgeArtifact]) -> float:
    """Return the fraction of artifacts with both a truthy title and path.

    An empty collection yields ``0.0``.
    """
    if not artifacts:
        return 0.0
    covered = sum(1 for item in artifacts if item.title and item.path)
    return covered / len(artifacts)


def _kind_entropy(artifacts: list[KnowledgeArtifact]) -> float:
    """Return the Shannon entropy, in bits, of the artifacts' ``kind`` values.

    An empty collection yields ``0.0``; so does a collection in which every
    artifact has the same kind.
    """
    if not artifacts:
        return 0.0
    counts = Counter(artifact.kind for artifact in artifacts)
    total = len(artifacts)
    entropy = -sum((count / total) * log2(count / total) for count in counts.values())
    # Negating a zero sum produces -0.0 for a single-kind collection; adding
    # 0.0 normalises that to +0.0 and leaves every other value unchanged.
    return entropy + 0.0


def _directed_graph(artifacts: list[KnowledgeArtifact]) -> dict[str, set[str]]:
    """Build an adjacency map from artifact id to the ids it points at.

    Every artifact id becomes a node. An edge is added only when a
    relationship's ``"target"`` is a string naming an artifact in the same
    collection; other relationships are ignored. Edges are stored in sets,
    so repeated relationships to the same target collapse into one edge.
    """
    # NOTE(review): assumes each relationship is a mapping exposing ``.get``.
    ids = {item.id for item in artifacts}
    graph: dict[str, set[str]] = {item.id: set() for item in artifacts}
    for item in artifacts:
        for relationship in item.relationships:
            target = relationship.get("target")
            if isinstance(target, str) and target in ids:
                graph[item.id].add(target)
    return graph


def _component_count(graph: dict[str, set[str]]) -> int:
    """Return the number of weakly connected components in *graph*.

    Edge direction is ignored. Targets that are not themselves keys of
    *graph* are still counted as nodes. An empty graph has ``0`` components.
    """
    if not graph:
        return 0

    # Symmetrise the adjacency map so traversal can follow edges both ways.
    undirected = {node: set(edges) for node, edges in graph.items()}
    for node, edges in graph.items():
        for target in edges:
            undirected.setdefault(target, set()).add(node)

    seen: set[str] = set()
    count = 0
    for node in undirected:
        if node in seen:
            continue
        count += 1
        stack = [node]
        while stack:
            current = stack.pop()
            if current in seen:
                continue
            seen.add(current)
            stack.extend(undirected[current] - seen)
    return count


def _cycle_count(graph: dict[str, set[str]]) -> int:
    """Return the number of back edges found by a depth-first traversal.

    A back edge is an edge whose target is still on the current DFS path
    (self-loops included). This counts back edges, not distinct simple
    cycles; it is ``0`` exactly when the graph is acyclic.

    The traversal is iterative so that long relationship chains cannot
    exceed the interpreter's recursion limit. A target that is not a key of
    *graph* is treated as a node without outgoing edges.
    """
    cycles = 0
    visited: set[str] = set()
    active: set[str] = set()  # nodes on the current DFS path

    for root in graph:
        if root in visited:
            continue
        visited.add(root)
        active.add(root)
        # Each frame keeps its own edge iterator so exploration of a node
        # resumes where it left off after a child has been fully processed.
        stack = [(root, iter(graph[root]))]
        while stack:
            node, targets = stack[-1]
            for target in targets:
                if target not in visited:
                    visited.add(target)
                    active.add(target)
                    stack.append((target, iter(graph.get(target, ()))))
                    break
                if target in active:
                    cycles += 1
            else:
                # All outgoing edges handled: the node leaves the DFS path.
                active.remove(node)
                stack.pop()
    return cycles