repository-scoped dependency graph view profile persistence and interactive exploration features

This commit is contained in:
2026-05-04 11:26:25 +02:00
parent 98a65581ac
commit 4f04491734
11 changed files with 1365 additions and 32 deletions

View File

@@ -165,6 +165,19 @@ class DependencyEdge:
same_layer: bool = False
@dataclass(frozen=True)
class DependencyGraphViewProfile:
id: int
repository_id: int
name: str
description: str
default_mode: str
filter_rules: list[dict[str, Any]]
manual_overrides: dict[str, str]
created_at: str
updated_at: str
@dataclass(frozen=True)
class DependencyGraph:
repository: Repository

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from collections.abc import Sequence
from dataclasses import asdict, replace
from typing import Any
from repo_registry.core.models import (
AbilitySummary,
@@ -19,6 +20,7 @@ from repo_registry.core.models import (
ContentChunk,
DependencyEdge,
DependencyGraph,
DependencyGraphViewProfile,
DependencyImpactAnalysis,
DependencyImpactItem,
ExpectationGap,
@@ -1096,6 +1098,9 @@ class RegistryService:
*,
base_analysis_run_id: int | None = None,
target_analysis_run_id: int | None = None,
profile_id: int | None = None,
rules: list[dict[str, Any]] | None = None,
manual_overrides: dict[str, str] | None = None,
) -> dict[str, object]:
impact = None
if base_analysis_run_id is not None or target_analysis_run_id is not None:
@@ -1116,18 +1121,66 @@ class RegistryService:
{item.item_key: item for item in impact.impacts} if impact is not None else {}
)
changed_fact_keys = set(impact.changed_fact_keys) if impact is not None else set()
ability_map = self.store.get_ability_map(repository_id)
facts_by_id = {
fact.id: fact for fact in self.store.list_observed_facts(repository_id)
}
characteristic_index = self._dependency_characteristic_index(ability_map)
nodes: dict[str, dict[str, object]] = {}
edge_sources: dict[str, DependencyEdge] = {}
profile = (
self.store.get_dependency_graph_profile(repository_id, profile_id)
if profile_id is not None
else None
)
merged_rules = [*(profile.filter_rules if profile is not None else []), *(rules or [])]
merged_overrides = {
**(profile.manual_overrides if profile is not None else {}),
**(manual_overrides or {}),
}
def ensure_node(kind: str, key: str, item_id: int | None) -> None:
if key in nodes:
return
impact_item = impact_by_key.get(key)
is_changed_fact = key in changed_fact_keys
detail = characteristic_index.get(key, {})
fact = facts_by_id.get(item_id) if kind == "fact" and item_id else None
if fact is not None:
detail = {
"name": fact.name,
"description": fact.value,
"primaryClass": fact.metadata.get("source_role", fact.kind),
"attributes": self._dependency_fact_attributes(fact),
"confidence": fact.metadata.get("confidence"),
"path": fact.path,
"value": fact.value,
"metadata": fact.metadata,
"sourceReferences": [
{
"fact_id": fact.id,
"path": fact.path,
"kind": fact.kind,
"name": fact.name,
"line": fact.metadata.get("line"),
}
],
}
nodes[key] = {
"data": {
"id": key,
"key": key,
"stableKey": key,
"kind": kind,
"layer": self._dependency_layer(kind),
"label": self._dependency_node_label(repository_id, kind, key, item_id),
"name": detail.get("name")
or self._dependency_node_label(repository_id, kind, key, item_id),
"description": detail.get("description", ""),
"primaryClass": detail.get("primaryClass", kind),
"attributes": detail.get("attributes", []),
"confidence": detail.get("confidence"),
"ownership": self._ownership_for_kind(kind),
"freshnessState": (
impact_item.freshness_state
@@ -1143,6 +1196,10 @@ class RegistryService:
impact_item.impact_depth if impact_item is not None else None
),
"reasons": impact_item.reasons if impact_item is not None else [],
"path": detail.get("path", ""),
"value": detail.get("value", ""),
"metadata": detail.get("metadata", {}),
"sourceReferences": detail.get("sourceReferences", []),
},
"classes": " ".join(
class_name
@@ -1159,16 +1216,49 @@ class RegistryService:
ensure_node(edge.source_kind, edge.source_key, edge.source_id)
ensure_node(edge.target_kind, edge.target_key, edge.target_id)
edges = [
{
edges = []
for index, edge in enumerate(graph.edges):
edge_id = f"{edge.source_key}->{edge.target_key}:{index}"
source_data = nodes[edge.source_key]["data"]
target_data = nodes[edge.target_key]["data"]
edge_sources[edge_id] = edge
edges.append(
{
"data": {
"id": f"{edge.source_key}->{edge.target_key}:{index}",
"id": edge_id,
"key": edge_id,
"stableKey": edge_id,
"kind": "edge",
"layer": "dependency",
"source": edge.source_key,
"target": edge.target_key,
"sourceKind": edge.source_kind,
"targetKind": edge.target_kind,
"sourceLayer": self._dependency_layer(edge.source_kind),
"targetLayer": self._dependency_layer(edge.target_kind),
"dependencyType": edge.dependency_type,
"strength": edge.strength,
"edgeSource": edge.source,
"sameLayer": edge.same_layer,
"freshnessState": (
"stale"
if edge.target_key in impact_by_key
else "changed"
if edge.source_key in changed_fact_keys
else "current"
),
"sourceMetadata": {
"key": edge.source_key,
"kind": edge.source_kind,
"layer": self._dependency_layer(edge.source_kind),
"name": source_data.get("name", edge.source_key),
},
"targetMetadata": {
"key": edge.target_key,
"kind": edge.target_kind,
"layer": self._dependency_layer(edge.target_kind),
"name": target_data.get("name", edge.target_key),
},
"label": edge.dependency_type,
},
"classes": " ".join(
@@ -1181,24 +1271,195 @@ class RegistryService:
if class_name
),
}
for index, edge in enumerate(graph.edges)
]
)
elements = [*nodes.values(), *edges]
visibility = self._evaluate_dependency_visibility(
elements,
merged_rules,
merged_overrides,
)
hidden_node_ids = {
element["data"]["id"]
for element in nodes.values()
if visibility[element["data"]["id"]]["displayState"] == "hide"
}
visible_elements: list[dict[str, object]] = []
hidden_elements: list[dict[str, object]] = []
orphaned_overrides = sorted(
key for key in merged_overrides if key not in visibility
)
for element in elements:
element_id = element["data"]["id"]
state = visibility[element_id]
if "source" in element["data"] and (
element["data"]["source"] in hidden_node_ids
or element["data"]["target"] in hidden_node_ids
):
state = {
**state,
"displayState": "hide",
"visibilityReason": "connected-node-hidden",
}
element["data"].update(state)
element["classes"] = " ".join(
part
for part in (
element.get("classes", ""),
f"display-{state['displayState']}",
"manual-override" if state["visibilitySource"] == "manual" else "",
"rule-derived" if state["visibilitySource"] == "rule" else "",
)
if part
)
if state["displayState"] == "hide":
hidden_elements.append(element)
else:
visible_elements.append(element)
return {
"repository": asdict(graph.repository),
"scope": asdict(graph.scope),
"mode": "impact" if impact is not None else "full",
"mode": (
profile.default_mode
if profile is not None and impact is None
else "impact"
if impact is not None
else "full"
),
"profile": asdict(profile) if profile is not None else None,
"metrics": {
"node_count": len(nodes),
"edge_count": len(edges),
"node_count": len(
[
element
for element in visible_elements
if "source" not in element["data"]
]
),
"edge_count": len(
[
element
for element in visible_elements
if "source" in element["data"]
]
),
"hidden_count": len(hidden_elements),
"blurred_count": len(
[
element
for element in visible_elements
if element["data"]["displayState"] == "blur"
]
),
"propagation_breadth": impact.propagation_breadth if impact else 0,
"max_depth": impact.max_depth if impact else 0,
"scope_impacted": impact.scope_impacted if impact else False,
},
"filter": {
"rules": merged_rules,
"manual_overrides": merged_overrides,
"orphaned_overrides": orphaned_overrides,
"precedence": "later rules override earlier rules; manual overrides win last",
"connected_edge_behavior": "edges connected to hidden nodes are hidden",
},
"changed_fact_keys": impact.changed_fact_keys if impact else [],
"elements": [*nodes.values(), *edges],
"elements": visible_elements,
"hidden_elements": hidden_elements,
"impacts": [asdict(item) for item in impact.impacts] if impact else [],
}
def list_dependency_graph_profiles(
self,
repository_id: int,
) -> list[DependencyGraphViewProfile]:
return self.store.list_dependency_graph_profiles(repository_id)
def get_dependency_graph_profile(
self,
repository_id: int,
profile_id: int,
) -> DependencyGraphViewProfile:
return self.store.get_dependency_graph_profile(repository_id, profile_id)
def create_dependency_graph_profile(
self,
repository_id: int,
*,
name: str,
description: str = "",
default_mode: str = "full",
filter_rules: list[dict[str, Any]] | None = None,
manual_overrides: dict[str, str] | None = None,
) -> DependencyGraphViewProfile:
self._validate_dependency_graph_profile_payload(
default_mode,
filter_rules or [],
manual_overrides or {},
)
return self.store.create_dependency_graph_profile(
repository_id,
name=name,
description=description,
default_mode=default_mode,
filter_rules=filter_rules or [],
manual_overrides=manual_overrides or {},
)
def update_dependency_graph_profile(
self,
repository_id: int,
profile_id: int,
*,
name: str | None = None,
description: str | None = None,
default_mode: str | None = None,
filter_rules: list[dict[str, Any]] | None = None,
manual_overrides: dict[str, str] | None = None,
) -> DependencyGraphViewProfile:
if default_mode is not None or filter_rules is not None or manual_overrides is not None:
current = self.store.get_dependency_graph_profile(repository_id, profile_id)
self._validate_dependency_graph_profile_payload(
default_mode or current.default_mode,
filter_rules if filter_rules is not None else current.filter_rules,
manual_overrides
if manual_overrides is not None
else current.manual_overrides,
)
return self.store.update_dependency_graph_profile(
repository_id,
profile_id,
name=name,
description=description,
default_mode=default_mode,
filter_rules=filter_rules,
manual_overrides=manual_overrides,
)
def duplicate_dependency_graph_profile(
self,
repository_id: int,
profile_id: int,
*,
name: str | None = None,
) -> DependencyGraphViewProfile:
profile = self.store.get_dependency_graph_profile(repository_id, profile_id)
return self.store.create_dependency_graph_profile(
repository_id,
name=name or f"{profile.name} Copy",
description=profile.description,
default_mode=profile.default_mode,
filter_rules=profile.filter_rules,
manual_overrides=profile.manual_overrides,
)
def delete_dependency_graph_profile(
self,
repository_id: int,
profile_id: int,
) -> None:
self.store.delete_dependency_graph_profile(repository_id, profile_id)
def approve_analysis_run_changes(
self,
repository_id: int,
@@ -2252,6 +2513,180 @@ class RegistryService:
for fact in facts
}
def _dependency_characteristic_index(
self,
ability_map: RepositoryAbilityMap,
) -> dict[str, dict[str, object]]:
index: dict[str, dict[str, object]] = {
self._dependency_key("scope", ability_map.scope.id): {
"name": ability_map.scope.name,
"description": ability_map.scope.description,
"primaryClass": "scope",
"attributes": ["scope"],
"confidence": ability_map.scope.confidence,
"sourceReferences": [],
}
}
for ability in ability_map.abilities:
index[self._dependency_key("ability", ability.id)] = {
"name": ability.name,
"description": ability.description,
"primaryClass": ability.primary_class,
"attributes": ability.attributes,
"confidence": ability.confidence,
"sourceReferences": [],
}
for capability in ability.capabilities:
index[self._dependency_key("capability", capability.id)] = {
"name": capability.name,
"description": capability.description,
"primaryClass": capability.primary_class,
"attributes": capability.attributes,
"confidence": capability.confidence,
"sourceReferences": [],
}
for feature in capability.features:
index[self._dependency_key("feature", feature.id)] = {
"name": feature.name,
"description": feature.location,
"primaryClass": feature.primary_class or feature.type,
"attributes": feature.attributes,
"confidence": feature.confidence,
"path": feature.location,
"sourceReferences": [
asdict(source_ref) for source_ref in feature.source_refs
],
}
for evidence in capability.evidence:
index[self._dependency_key("evidence", evidence.id)] = {
"name": evidence.reference,
"description": evidence.type,
"primaryClass": evidence.type,
"attributes": [evidence.type, evidence.strength],
"confidence": self._evidence_confidence(evidence.strength),
"sourceReferences": [
asdict(source_ref) for source_ref in evidence.source_refs
],
}
return index
def _dependency_fact_attributes(self, fact: ObservedFact) -> list[str]:
attributes = [fact.kind]
for key in ("source_role", "classification", "language", "framework"):
value = fact.metadata.get(key)
if isinstance(value, str) and value:
attributes.append(value)
return sorted(set(attributes))
def _dependency_layer(self, kind: str) -> str:
if kind in {"fact", "evidence", "feature", "capability", "ability", "scope"}:
return kind
return "dependency"
def _evaluate_dependency_visibility(
self,
elements: list[dict[str, object]],
rules: list[dict[str, Any]],
manual_overrides: dict[str, str],
) -> dict[str, dict[str, str]]:
visibility: dict[str, dict[str, str]] = {}
for element in elements:
data = element["data"]
element_id = str(data["id"])
state = "show"
source = "default"
reason = "default"
for index, rule in enumerate(rules):
action = str(rule.get("action", "show"))
if action not in {"show", "blur", "hide"}:
continue
if self._dependency_rule_matches(data, rule):
state = action
source = "rule"
reason = str(rule.get("name") or f"rule:{index}")
override = manual_overrides.get(element_id)
if override in {"show", "blur", "hide"}:
state = override
source = "manual"
reason = "manual-override"
visibility[element_id] = {
"displayState": state,
"visibilitySource": source,
"visibilityReason": reason,
}
return visibility
def _dependency_rule_matches(
self,
data: dict[str, Any],
rule: dict[str, Any],
) -> bool:
match = rule.get("match", rule)
if not isinstance(match, dict):
return False
for key, expected in match.items():
if key in {"action", "name", "description"}:
continue
if key == "text":
text = " ".join(
str(data.get(part, ""))
for part in ("label", "name", "description", "path", "value")
).lower()
if str(expected).lower() not in text:
return False
continue
if key == "path":
if str(expected).lower() not in str(data.get("path", "")).lower():
return False
continue
actual = data.get(key)
if key == "dependencyType":
actual = data.get("dependencyType")
elif key == "sameLayer":
actual = bool(data.get("sameLayer"))
elif key == "attributes":
actual_values = set(data.get("attributes") or [])
expected_values = expected if isinstance(expected, list) else [expected]
if not set(expected_values).intersection(actual_values):
return False
continue
elif key == "confidence":
if actual is None:
return False
threshold = float(expected)
if float(actual) < threshold:
return False
continue
if isinstance(expected, list):
if actual not in expected:
return False
elif actual != expected:
return False
return True
def _validate_dependency_graph_profile_payload(
self,
default_mode: str,
filter_rules: list[dict[str, Any]],
manual_overrides: dict[str, str],
) -> None:
if default_mode not in {"full", "impact", "path"}:
raise ValueError("default_mode must be one of full, impact, or path")
for rule in filter_rules:
action = rule.get("action")
if action not in {"show", "blur", "hide"}:
raise ValueError("filter rule action must be show, blur, or hide")
invalid = {
key: value
for key, value in manual_overrides.items()
if value not in {"show", "blur", "hide"}
}
if invalid:
raise ValueError("manual override values must be show, blur, or hide")
def _evidence_confidence(self, strength: str) -> float:
return {"strong": 0.9, "medium": 0.6, "weak": 0.3}.get(strength, 0.5)
def _capability_dependency_edges(
self,
capability,
@@ -2290,14 +2725,19 @@ class RegistryService:
)
for evidence in capability.evidence:
evidence_key = self._dependency_key("evidence", evidence.id)
evidence_target_kind = evidence.target_kind or "capability"
evidence_target_id = evidence.target_id or capability.id
edges.append(
self._dependency_edge(
source_kind="evidence",
source_id=evidence.id,
source_key=evidence_key,
target_kind="capability",
target_id=capability.id,
target_key=capability_key,
target_kind=evidence_target_kind,
target_id=evidence_target_id,
target_key=self._dependency_key(
evidence_target_kind,
evidence_target_id,
),
dependency_type="supports",
strength=evidence.strength or "medium",
source="approved_characteristic",