Build dependency graph and dependency aware propagation of changes

This commit is contained in:
2026-05-03 01:20:58 +02:00
parent 66bc60a7a5
commit 85766be5bc
6 changed files with 554 additions and 6 deletions

View File

@@ -150,6 +150,54 @@ class SourceReference:
line: int | None = None
@dataclass(frozen=True)
class DependencyEdge:
source_kind: str
source_id: int | None
source_key: str
target_kind: str
target_id: int
target_key: str
dependency_type: str
strength: str
source: str
target_ownership: str
same_layer: bool = False
@dataclass(frozen=True)
class DependencyGraph:
repository: Repository
scope: "Scope"
edges: list[DependencyEdge]
@dataclass(frozen=True)
class DependencyImpactItem:
item_kind: str
item_id: int
item_key: str
name: str
freshness_state: str
ownership: str
recommended_action: str
impact_depth: int
reasons: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class DependencyImpactAnalysis:
repository: Repository
base_run: AnalysisRun
target_run: AnalysisRun
changed_fact_keys: list[str]
impacts: list[DependencyImpactItem]
max_depth: int
scope_impacted: bool
propagation_breadth: int
graph: DependencyGraph
@dataclass(frozen=True)
class CandidateEvidence:
id: int

View File

@@ -17,6 +17,10 @@ from repo_registry.core.models import (
CandidateGraph,
CharacteristicRebuildResult,
ContentChunk,
DependencyEdge,
DependencyGraph,
DependencyImpactAnalysis,
DependencyImpactItem,
ExpectationGap,
ObservedFact,
Repository,
@@ -946,6 +950,146 @@ class RegistryService:
),
)
def build_dependency_graph(self, repository_id: int) -> DependencyGraph:
repository = self.store.get_repository(repository_id)
ability_map = self.store.get_ability_map(repository_id)
edges: list[DependencyEdge] = []
scope_key = self._dependency_key("scope", ability_map.scope.id)
for ability in ability_map.abilities:
ability_key = self._dependency_key("ability", ability.id)
edges.append(
self._dependency_edge(
source_kind="ability",
source_id=ability.id,
source_key=ability_key,
target_kind="scope",
target_id=ability_map.scope.id,
target_key=scope_key,
dependency_type="summarizes",
strength="strong",
source="approved_characteristic",
)
)
for capability in ability.capabilities:
capability_key = self._dependency_key("capability", capability.id)
edges.append(
self._dependency_edge(
source_kind="capability",
source_id=capability.id,
source_key=capability_key,
target_kind="ability",
target_id=ability.id,
target_key=ability_key,
dependency_type="realizes",
strength="strong",
source="approved_characteristic",
)
)
edges.extend(
self._capability_dependency_edges(
capability,
capability_key=capability_key,
)
)
return DependencyGraph(
repository=repository,
scope=ability_map.scope,
edges=edges,
)
def analyze_dependency_impact(
self,
repository_id: int,
base_analysis_run_id: int,
target_analysis_run_id: int,
) -> DependencyImpactAnalysis:
diff = self.diff_analysis_runs(
repository_id,
base_analysis_run_id,
target_analysis_run_id,
)
graph = self.build_dependency_graph(repository_id)
changed_facts = [
item
for section in (
diff.facts.added,
diff.facts.removed,
diff.facts.changed,
diff.facts.weakened,
)
for item in section
]
changed_fact_keys = [item.key for item in changed_facts]
fact_reasons = {
item.key: f"{item.change_type} fact {item.key}" for item in changed_facts
}
adjacency: dict[str, list[DependencyEdge]] = {}
for edge in graph.edges:
adjacency.setdefault(edge.source_key, []).append(edge)
queue: list[tuple[str, int, str]] = [
(key, 0, fact_reasons[key]) for key in changed_fact_keys
]
impacts_by_key: dict[str, DependencyImpactItem] = {}
visited_edges: set[tuple[str, str]] = set()
while queue:
source_key, depth, inherited_reason = queue.pop(0)
for edge in adjacency.get(source_key, []):
edge_marker = (edge.source_key, edge.target_key)
if edge_marker in visited_edges:
continue
visited_edges.add(edge_marker)
impact_depth = depth + 1
reason = (
f"{inherited_reason} -> {edge.target_kind} depends on "
f"{edge.source_kind} via {edge.dependency_type}"
)
current = impacts_by_key.get(edge.target_key)
if current is None:
impacts_by_key[edge.target_key] = DependencyImpactItem(
item_kind=edge.target_kind,
item_id=edge.target_id,
item_key=edge.target_key,
name=self._dependency_display_name(
repository_id,
edge.target_kind,
edge.target_id,
),
freshness_state="stale",
ownership=edge.target_ownership,
recommended_action=self._recommended_action(
edge.target_ownership
),
impact_depth=impact_depth,
reasons=[reason],
)
else:
impacts_by_key[edge.target_key] = replace(
current,
impact_depth=min(current.impact_depth, impact_depth),
reasons=[*current.reasons, reason],
)
queue.append((edge.target_key, impact_depth, reason))
impacts = sorted(
impacts_by_key.values(),
key=lambda item: (item.impact_depth, item.item_kind, item.item_id),
)
max_depth = max((item.impact_depth for item in impacts), default=0)
return DependencyImpactAnalysis(
repository=diff.repository,
base_run=diff.base_run,
target_run=diff.target_run,
changed_fact_keys=changed_fact_keys,
impacts=impacts,
max_depth=max_depth,
scope_impacted=any(item.item_kind == "scope" for item in impacts),
propagation_breadth=len(impacts),
graph=graph,
)
def approve_analysis_run_changes(
self,
repository_id: int,
@@ -1989,6 +2133,7 @@ class RegistryService:
return {
f"fact:{fact.kind}:{fact.path}:{fact.name}": {
"item_type": "fact",
"id": fact.id,
"kind": fact.kind,
"path": fact.path,
"name": fact.name,
@@ -1998,6 +2143,163 @@ class RegistryService:
for fact in facts
}
def _capability_dependency_edges(
self,
capability,
*,
capability_key: str,
) -> list[DependencyEdge]:
edges: list[DependencyEdge] = []
for feature in capability.features:
feature_key = self._dependency_key("feature", feature.id)
edges.append(
self._dependency_edge(
source_kind="feature",
source_id=feature.id,
source_key=feature_key,
target_kind="capability",
target_id=capability.id,
target_key=capability_key,
dependency_type="supports",
strength="medium",
source="approved_characteristic",
)
)
for source_ref in feature.source_refs:
edges.append(
self._dependency_edge(
source_kind="fact",
source_id=source_ref.fact_id,
source_key=self._source_ref_fact_key(source_ref),
target_kind="feature",
target_id=feature.id,
target_key=feature_key,
dependency_type="observes",
strength="strong",
source="source_ref",
)
)
for evidence in capability.evidence:
evidence_key = self._dependency_key("evidence", evidence.id)
edges.append(
self._dependency_edge(
source_kind="evidence",
source_id=evidence.id,
source_key=evidence_key,
target_kind="capability",
target_id=capability.id,
target_key=capability_key,
dependency_type="supports",
strength=evidence.strength or "medium",
source="approved_characteristic",
)
)
for source_ref in evidence.source_refs:
edges.append(
self._dependency_edge(
source_kind="fact",
source_id=source_ref.fact_id,
source_key=self._source_ref_fact_key(source_ref),
target_kind="evidence",
target_id=evidence.id,
target_key=evidence_key,
dependency_type="observes",
strength=evidence.strength or "medium",
source="source_ref",
)
)
if evidence.reference_kind in {"feature", "capability", "ability", "scope"}:
reference_id = evidence.reference_id
if reference_id is not None:
edges.append(
self._dependency_edge(
source_kind=evidence.reference_kind,
source_id=reference_id,
source_key=self._dependency_key(
evidence.reference_kind,
reference_id,
),
target_kind=evidence.target_kind,
target_id=evidence.target_id or capability.id,
target_key=self._dependency_key(
evidence.target_kind,
evidence.target_id or capability.id,
),
dependency_type="relates",
strength=evidence.strength or "medium",
source="approved_evidence",
)
)
return edges
def _dependency_edge(
self,
*,
source_kind: str,
source_id: int | None,
source_key: str,
target_kind: str,
target_id: int,
target_key: str,
dependency_type: str,
strength: str,
source: str,
) -> DependencyEdge:
return DependencyEdge(
source_kind=source_kind,
source_id=source_id,
source_key=source_key,
target_kind=target_kind,
target_id=target_id,
target_key=target_key,
dependency_type=dependency_type,
strength=strength,
source=source,
target_ownership=self._ownership_for_kind(target_kind),
same_layer=source_kind == target_kind,
)
def _dependency_key(self, kind: str, item_id: int) -> str:
return f"{kind}:{item_id}"
def _source_ref_fact_key(self, source_ref) -> str:
return f"fact:{source_ref.kind}:{source_ref.path}:{source_ref.name}"
def _ownership_for_kind(self, kind: str) -> str:
if kind == "fact":
return "deterministic"
if kind in {"evidence", "feature", "capability"}:
return "mixed"
return "curator_owned"
def _recommended_action(self, ownership: str) -> str:
if ownership == "deterministic":
return "recalculate"
return "review"
def _dependency_display_name(
self,
repository_id: int,
kind: str,
item_id: int,
) -> str:
ability_map = self.store.get_ability_map(repository_id)
if kind == "scope" and ability_map.scope.id == item_id:
return ability_map.scope.name
for ability in ability_map.abilities:
if kind == "ability" and ability.id == item_id:
return ability.name
for capability in ability.capabilities:
if kind == "capability" and capability.id == item_id:
return capability.name
for feature in capability.features:
if kind == "feature" and feature.id == item_id:
return feature.name
for evidence in capability.evidence:
if kind == "evidence" and evidence.id == item_id:
return evidence.reference
return f"{kind}:{item_id}"
def _chunk_index(
self,
chunks: Sequence[ContentChunk],