feat(prompts): implement Phase 5 - Dependency Tracking (FR-6)

Add directed dependency graph with cycle detection, topological sort,
and query service for finding dependents/dependencies transitively.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-09 13:18:18 +01:00
parent c56c92c815
commit 9ce157400e
13 changed files with 3021 additions and 0 deletions

View File

@@ -0,0 +1,40 @@
"""
Dependency tracking for prompt artifacts.
Implements FR-6: Dependency Tracking
- FR-6.1: Directed dependency edges between artifacts
- FR-6.2: Cross-space dependency graph
- FR-6.3: Circular dependency detection
"""
from markitect.prompts.dependencies.models import (
EdgeType,
DependencyEdge,
DependencyGraph,
CircularDependencyError,
)
from markitect.prompts.dependencies.repository import (
IDependencyRepository,
SQLiteDependencyRepository,
DependencyRepositoryError,
DuplicateDependencyError,
)
from markitect.prompts.dependencies.graph import GraphBuilder
from markitect.prompts.dependencies.queries import DependencyQueryService
__all__ = [
# Models
"EdgeType",
"DependencyEdge",
"DependencyGraph",
"CircularDependencyError",
# Repository
"IDependencyRepository",
"SQLiteDependencyRepository",
"DependencyRepositoryError",
"DuplicateDependencyError",
# Graph
"GraphBuilder",
# Queries
"DependencyQueryService",
]

View File

@@ -0,0 +1,145 @@
"""
Graph builder for bridging ephemeral manifest edges to persistent storage.
Bridges RunManifest ephemeral DependencyEdge instances (3-field) to
persistent DependencyEdge storage (6-field), and builds DependencyGraph
from repository data.
"""
from typing import List, Optional, Set
from markitect.prompts.dependencies.models import (
DependencyEdge,
DependencyGraph,
EdgeType,
)
from markitect.prompts.dependencies.repository import IDependencyRepository
from markitect.prompts.execution.manifest import (
DependencyEdge as ManifestDependencyEdge,
RunManifest,
)
class GraphBuilder:
"""
Bridges ephemeral manifest edges to persistent dependency storage.
Extracts dependency edges from RunManifest instances, persists them
via IDependencyRepository, and builds in-memory DependencyGraph
instances for analysis.
"""
def __init__(self, repository: IDependencyRepository):
"""
Initialize with dependency repository.
Args:
repository: Repository for persisting dependency edges
"""
self._repository = repository
def extract_edges(self, manifest: RunManifest) -> List[DependencyEdge]:
"""
Extract persistent DependencyEdge instances from a RunManifest.
Converts the ephemeral 3-field DependencyEdge from the manifest
into persistent 6-field DependencyEdge instances.
Args:
manifest: RunManifest containing ephemeral edges
Returns:
List of persistent DependencyEdge instances
"""
edges = []
for manifest_edge in manifest.dependency_edges:
edge_type = EdgeType(manifest_edge.edge_type)
edge = DependencyEdge.create(
source_artifact_id=manifest_edge.source_id,
target_artifact_id=manifest_edge.target_id,
run_id=manifest.run_id,
edge_type=edge_type,
)
edges.append(edge)
return edges
def persist_edges(self, manifest: RunManifest) -> List[DependencyEdge]:
"""
Extract edges from manifest and persist them to the repository.
Args:
manifest: RunManifest containing ephemeral edges
Returns:
List of persisted DependencyEdge instances
"""
edges = self.extract_edges(manifest)
persisted = []
for edge in edges:
created = self._repository.create(edge)
persisted.append(created)
return persisted
def build_graph(
self,
artifact_ids: Optional[Set[str]] = None,
) -> DependencyGraph:
"""
Build an in-memory DependencyGraph from repository data.
Args:
artifact_ids: Optional set of artifact IDs to scope the graph.
If None, builds graph from all edges.
Returns:
DependencyGraph instance
"""
graph = DependencyGraph()
if artifact_ids is not None:
# Build scoped subgraph
for artifact_id in artifact_ids:
edges = self._repository.get_by_source(artifact_id)
for edge in edges:
if edge.target_artifact_id in artifact_ids:
graph.add_edge(
edge.source_artifact_id,
edge.target_artifact_id,
edge.edge_type,
)
# Ensure node exists even if it has no edges
if artifact_id not in graph._forward:
graph._forward[artifact_id] = set()
if artifact_id not in graph._reverse:
graph._reverse[artifact_id] = set()
else:
# Build full graph
all_edges = self._repository.get_all()
for edge in all_edges:
graph.add_edge(
edge.source_artifact_id,
edge.target_artifact_id,
edge.edge_type,
)
return graph
def build_graph_for_run(self, run_id: str) -> DependencyGraph:
"""
Build a DependencyGraph from edges of a specific run.
Args:
run_id: Run identifier
Returns:
DependencyGraph for the run's edges
"""
graph = DependencyGraph()
edges = self._repository.get_by_run(run_id)
for edge in edges:
graph.add_edge(
edge.source_artifact_id,
edge.target_artifact_id,
edge.edge_type,
)
return graph

View File

@@ -0,0 +1,308 @@
"""
Dependency tracking models for prompt artifact graphs.
Implements FR-6: Dependency Tracking
- FR-6.1: Directed dependency edges between artifacts
- FR-6.2: Cross-space dependency graph
- FR-6.3: Circular dependency detection
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, Any, List, Optional, Set, Tuple
import uuid
class EdgeType(Enum):
"""Type of dependency relationship between artifacts."""
REQUIRES = "requires"
GENERATES = "generates"
INCLUDES = "includes"
class CircularDependencyError(Exception):
"""
Raised when a circular dependency is detected in the graph.
Attributes:
cycle: List of node IDs forming the cycle
"""
def __init__(self, message: str, cycle: Optional[List[str]] = None):
super().__init__(message)
self.cycle = cycle or []
@dataclass
class DependencyEdge:
"""
Persistent dependency edge between artifacts.
Extends the ephemeral DependencyEdge from execution/manifest.py
with persistence fields (id, run_id, created_at).
Attributes:
id: Unique edge identifier
source_artifact_id: Source artifact ID
target_artifact_id: Target artifact ID
run_id: ID of the run that created this edge
edge_type: Type of dependency relationship
created_at: When this edge was created
"""
id: str
source_artifact_id: str
target_artifact_id: str
run_id: str
edge_type: EdgeType
created_at: datetime = field(default_factory=datetime.utcnow)
@classmethod
def create(
cls,
source_artifact_id: str,
target_artifact_id: str,
run_id: str,
edge_type: EdgeType,
) -> "DependencyEdge":
"""
Create a new dependency edge.
Args:
source_artifact_id: Source artifact ID
target_artifact_id: Target artifact ID
run_id: Run that established this dependency
edge_type: Type of dependency
Returns:
New DependencyEdge instance
"""
return cls(
id=str(uuid.uuid4()),
source_artifact_id=source_artifact_id,
target_artifact_id=target_artifact_id,
run_id=run_id,
edge_type=edge_type,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"id": self.id,
"source_artifact_id": self.source_artifact_id,
"target_artifact_id": self.target_artifact_id,
"run_id": self.run_id,
"edge_type": self.edge_type.value,
"created_at": self.created_at.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DependencyEdge":
"""Create from dictionary."""
return cls(
id=data["id"],
source_artifact_id=data["source_artifact_id"],
target_artifact_id=data["target_artifact_id"],
run_id=data["run_id"],
edge_type=EdgeType(data["edge_type"]),
created_at=datetime.fromisoformat(data["created_at"]),
)
# DFS node coloring for cycle detection
_WHITE = 0 # Unvisited
_GRAY = 1 # In current DFS path
_BLACK = 2 # Fully processed
class DependencyGraph:
"""
In-memory directed graph for dependency analysis.
Maintains forward and reverse adjacency lists for efficient
traversal in both directions. Supports cycle detection via
3-color DFS and topological sorting.
Implements FR-6.2: Cross-space dependency graph
Implements FR-6.3: Circular dependency detection
"""
def __init__(self) -> None:
self._forward: Dict[str, Set[str]] = {}
self._reverse: Dict[str, Set[str]] = {}
self._edge_types: Dict[Tuple[str, str], EdgeType] = {}
@property
def nodes(self) -> Set[str]:
"""All nodes in the graph."""
return set(self._forward.keys()) | set(self._reverse.keys())
@property
def edge_count(self) -> int:
"""Total number of edges."""
return sum(len(targets) for targets in self._forward.values())
def add_edge(
self,
source: str,
target: str,
edge_type: EdgeType = EdgeType.REQUIRES,
) -> None:
"""
Add a directed edge from source to target.
Args:
source: Source node ID
target: Target node ID
edge_type: Type of dependency
"""
if source not in self._forward:
self._forward[source] = set()
if target not in self._forward:
self._forward[target] = set()
if source not in self._reverse:
self._reverse[source] = set()
if target not in self._reverse:
self._reverse[target] = set()
self._forward[source].add(target)
self._reverse[target].add(source)
self._edge_types[(source, target)] = edge_type
def get_successors(self, node: str) -> Set[str]:
"""Get direct successors (dependencies) of a node."""
return set(self._forward.get(node, set()))
def get_predecessors(self, node: str) -> Set[str]:
"""Get direct predecessors (dependents) of a node."""
return set(self._reverse.get(node, set()))
def get_edge_type(self, source: str, target: str) -> Optional[EdgeType]:
"""Get the edge type between two nodes."""
return self._edge_types.get((source, target))
def has_edge(self, source: str, target: str) -> bool:
"""Check if an edge exists between source and target."""
return target in self._forward.get(source, set())
def detect_cycles(self) -> List[List[str]]:
"""
Detect all cycles in the graph using 3-color DFS.
Returns:
List of cycles, where each cycle is a list of node IDs
"""
color: Dict[str, int] = {node: _WHITE for node in self.nodes}
parent: Dict[str, Optional[str]] = {node: None for node in self.nodes}
cycles: List[List[str]] = []
def _dfs(node: str) -> None:
color[node] = _GRAY
for neighbor in self._forward.get(node, set()):
if color.get(neighbor, _WHITE) == _GRAY:
# Back edge found - extract cycle
cycle = [neighbor]
current = node
while current != neighbor:
cycle.append(current)
current = parent[current]
cycle.append(neighbor)
cycle.reverse()
cycles.append(cycle)
elif color.get(neighbor, _WHITE) == _WHITE:
parent[neighbor] = node
_dfs(neighbor)
color[node] = _BLACK
for node in self.nodes:
if color.get(node, _WHITE) == _WHITE:
_dfs(node)
return cycles
def has_cycle(self) -> bool:
"""Check if the graph contains any cycles."""
color: Dict[str, int] = {node: _WHITE for node in self.nodes}
def _dfs(node: str) -> bool:
color[node] = _GRAY
for neighbor in self._forward.get(node, set()):
if color.get(neighbor, _WHITE) == _GRAY:
return True
if color.get(neighbor, _WHITE) == _WHITE and _dfs(neighbor):
return True
color[node] = _BLACK
return False
for node in self.nodes:
if color.get(node, _WHITE) == _WHITE:
if _dfs(node):
return True
return False
def topological_sort(self) -> List[str]:
"""
Topologically sort the graph nodes.
Returns:
Nodes in topological order (dependencies before dependents)
Raises:
CircularDependencyError: If graph contains cycles
"""
color: Dict[str, int] = {node: _WHITE for node in self.nodes}
result: List[str] = []
cycle_path: List[str] = []
def _dfs(node: str) -> bool:
color[node] = _GRAY
cycle_path.append(node)
for neighbor in sorted(self._forward.get(node, set())):
if color.get(neighbor, _WHITE) == _GRAY:
# Extract cycle for error reporting
idx = cycle_path.index(neighbor)
cycle = cycle_path[idx:] + [neighbor]
raise CircularDependencyError(
f"Circular dependency detected: {' -> '.join(cycle)}",
cycle=cycle,
)
if color.get(neighbor, _WHITE) == _WHITE:
_dfs(neighbor)
cycle_path.pop()
color[node] = _BLACK
result.append(node)
for node in sorted(self.nodes):
if color.get(node, _WHITE) == _WHITE:
_dfs(node)
result.reverse()
return result
def get_subgraph(self, node_ids: Set[str]) -> "DependencyGraph":
"""
Extract a subgraph containing only the specified nodes.
Args:
node_ids: Set of node IDs to include
Returns:
New DependencyGraph with only the specified nodes and their edges
"""
subgraph = DependencyGraph()
for source in node_ids:
for target in self._forward.get(source, set()):
if target in node_ids:
edge_type = self._edge_types.get(
(source, target), EdgeType.REQUIRES
)
subgraph.add_edge(source, target, edge_type)
# Ensure isolated nodes are present
for node_id in node_ids:
if node_id not in subgraph._forward:
subgraph._forward[node_id] = set()
if node_id not in subgraph._reverse:
subgraph._reverse[node_id] = set()
return subgraph

View File

@@ -0,0 +1,221 @@
"""
Dependency query service for analyzing artifact dependency graphs.
Provides operations for finding dependents, dependencies, transitive
closures, dependency chains, cycle detection, and build ordering.
"""
from collections import deque
from typing import List, Optional, Set
from markitect.prompts.dependencies.models import (
CircularDependencyError,
DependencyGraph,
EdgeType,
)
from markitect.prompts.dependencies.repository import IDependencyRepository
from markitect.prompts.dependencies.graph import GraphBuilder
class DependencyQueryService:
"""
Service for querying dependency relationships between artifacts.
Provides direct and transitive dependency lookups, dependency chain
computation via BFS, cycle detection, pre-validation, and build
order (topological sort).
"""
def __init__(self, repository: IDependencyRepository):
"""
Initialize with dependency repository.
Args:
repository: Repository for reading dependency edges
"""
self._repository = repository
self._graph_builder = GraphBuilder(repository)
def find_dependents(self, artifact_id: str) -> Set[str]:
"""
Find direct dependents of an artifact (who depends on it).
Args:
artifact_id: Artifact to find dependents of
Returns:
Set of artifact IDs that directly depend on this artifact
"""
edges = self._repository.get_by_target(artifact_id)
return {edge.source_artifact_id for edge in edges}
def find_dependencies(self, artifact_id: str) -> Set[str]:
"""
Find direct dependencies of an artifact (what it depends on).
Args:
artifact_id: Artifact to find dependencies of
Returns:
Set of artifact IDs that this artifact directly depends on
"""
edges = self._repository.get_by_source(artifact_id)
return {edge.target_artifact_id for edge in edges}
def find_transitive_dependents(self, artifact_id: str) -> Set[str]:
"""
Find all transitive dependents of an artifact (full upstream impact).
Uses BFS to traverse the reverse dependency graph.
Args:
artifact_id: Artifact to find transitive dependents of
Returns:
Set of all artifact IDs that transitively depend on this artifact
"""
graph = self._graph_builder.build_graph()
visited: Set[str] = set()
queue = deque([artifact_id])
while queue:
current = queue.popleft()
for predecessor in graph.get_predecessors(current):
if predecessor not in visited:
visited.add(predecessor)
queue.append(predecessor)
return visited
def find_transitive_dependencies(self, artifact_id: str) -> Set[str]:
"""
Find all transitive dependencies of an artifact (full dependency tree).
Uses BFS to traverse the forward dependency graph.
Args:
artifact_id: Artifact to find transitive dependencies of
Returns:
Set of all artifact IDs this artifact transitively depends on
"""
graph = self._graph_builder.build_graph()
visited: Set[str] = set()
queue = deque([artifact_id])
while queue:
current = queue.popleft()
for successor in graph.get_successors(current):
if successor not in visited:
visited.add(successor)
queue.append(successor)
return visited
def get_dependency_chain(
self,
source_id: str,
target_id: str,
) -> Optional[List[str]]:
"""
Find a dependency chain between two artifacts using BFS.
Args:
source_id: Starting artifact ID
target_id: Target artifact ID
Returns:
List of artifact IDs forming the chain from source to target,
or None if no path exists
"""
graph = self._graph_builder.build_graph()
if source_id == target_id:
return [source_id]
visited: Set[str] = {source_id}
queue: deque[List[str]] = deque([[source_id]])
while queue:
path = queue.popleft()
current = path[-1]
for successor in graph.get_successors(current):
if successor == target_id:
return path + [successor]
if successor not in visited:
visited.add(successor)
queue.append(path + [successor])
return None
def detect_circular_dependencies(self) -> List[List[str]]:
"""
Detect all circular dependencies in the graph.
Returns:
List of cycles, where each cycle is a list of artifact IDs
"""
graph = self._graph_builder.build_graph()
return graph.detect_cycles()
def would_create_cycle(
self,
source_id: str,
target_id: str,
) -> bool:
"""
Check if adding an edge would create a cycle.
Pre-validation check before persisting a new dependency edge.
Args:
source_id: Proposed source artifact ID
target_id: Proposed target artifact ID
Returns:
True if adding this edge would create a cycle
"""
graph = self._graph_builder.build_graph()
# Adding source -> target creates a cycle if target can reach source
# (i.e., there's already a path from target to source)
if source_id == target_id:
return True
visited: Set[str] = set()
queue = deque([target_id])
while queue:
current = queue.popleft()
if current == source_id:
return True
for successor in graph.get_successors(current):
if successor not in visited:
visited.add(successor)
queue.append(successor)
return False
def get_build_order(
self,
artifact_ids: Optional[Set[str]] = None,
) -> List[str]:
"""
Get artifacts in build order (topological sort).
Dependencies appear before the artifacts that depend on them.
Args:
artifact_ids: Optional set of artifact IDs to include.
If None, returns build order for all artifacts.
Returns:
List of artifact IDs in build order
Raises:
CircularDependencyError: If graph contains cycles
"""
graph = self._graph_builder.build_graph(artifact_ids)
order = graph.topological_sort()
order.reverse() # Dependencies before dependents
return order

View File

@@ -0,0 +1,429 @@
"""
Repository interfaces and SQLite implementation for dependency persistence.
Implements FR-6.1: Directed dependency edges between artifacts.
Mirrors the SQLiteArtifactRepository pattern for consistency.
"""
import sqlite3
import json
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional
from datetime import datetime
from markitect.prompts.dependencies.models import DependencyEdge, EdgeType
class DependencyRepositoryError(Exception):
"""Base exception for dependency repository errors."""
pass
class DuplicateDependencyError(DependencyRepositoryError):
"""Raised when attempting to create a duplicate dependency edge."""
pass
# SQL Schema for dependency tables
DEPENDENCY_TABLES_SQL = """
CREATE TABLE IF NOT EXISTS prompt_dependencies (
id TEXT PRIMARY KEY,
source_artifact_id TEXT NOT NULL,
target_artifact_id TEXT NOT NULL,
run_id TEXT NOT NULL,
edge_type TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(source_artifact_id, target_artifact_id, run_id)
);
CREATE INDEX IF NOT EXISTS idx_deps_source ON prompt_dependencies(source_artifact_id);
CREATE INDEX IF NOT EXISTS idx_deps_target ON prompt_dependencies(target_artifact_id);
CREATE INDEX IF NOT EXISTS idx_deps_run ON prompt_dependencies(run_id);
CREATE INDEX IF NOT EXISTS idx_deps_type ON prompt_dependencies(edge_type);
CREATE INDEX IF NOT EXISTS idx_deps_source_target ON prompt_dependencies(source_artifact_id, target_artifact_id);
"""
def initialize_dependency_tables(db_path: str) -> None:
"""
Initialize the dependency-related database tables.
Args:
db_path: Path to the SQLite database file
"""
db_dir = Path(db_path).parent
if db_dir and not db_dir.exists():
db_dir.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path)
try:
conn.executescript(DEPENDENCY_TABLES_SQL)
conn.commit()
finally:
conn.close()
class IDependencyRepository(ABC):
"""
Abstract interface for dependency edge persistence.
Implements FR-6.1: Directed dependency edges between artifacts.
"""
@abstractmethod
def create(self, edge: DependencyEdge) -> DependencyEdge:
"""
Persist a new dependency edge.
Args:
edge: Dependency edge to create
Returns:
Created edge
Raises:
DuplicateDependencyError: If edge with same source+target+run exists
DependencyRepositoryError: On other persistence errors
"""
pass
@abstractmethod
def get_by_id(self, edge_id: str) -> Optional[DependencyEdge]:
"""
Retrieve edge by ID.
Args:
edge_id: Edge identifier
Returns:
Edge if found, None otherwise
"""
pass
@abstractmethod
def get_by_source(self, source_artifact_id: str) -> List[DependencyEdge]:
"""
Get all edges originating from a source artifact.
Args:
source_artifact_id: Source artifact ID
Returns:
List of edges from this source
"""
pass
@abstractmethod
def get_by_target(self, target_artifact_id: str) -> List[DependencyEdge]:
"""
Get all edges pointing to a target artifact.
Args:
target_artifact_id: Target artifact ID
Returns:
List of edges to this target
"""
pass
@abstractmethod
def get_by_run(self, run_id: str) -> List[DependencyEdge]:
"""
Get all edges created by a specific run.
Args:
run_id: Run identifier
Returns:
List of edges from this run
"""
pass
@abstractmethod
def get_by_type(self, edge_type: EdgeType) -> List[DependencyEdge]:
"""
Get all edges of a specific type.
Args:
edge_type: Edge type to filter by
Returns:
List of edges of this type
"""
pass
@abstractmethod
def get_all(self) -> List[DependencyEdge]:
"""
Get all dependency edges.
Returns:
List of all edges
"""
pass
@abstractmethod
def delete(self, edge_id: str) -> bool:
"""
Delete a dependency edge.
Args:
edge_id: Edge identifier
Returns:
True if deleted, False if not found
"""
pass
@abstractmethod
def delete_by_run(self, run_id: str) -> int:
"""
Delete all edges created by a specific run.
Args:
run_id: Run identifier
Returns:
Number of edges deleted
"""
pass
class SQLiteDependencyRepository(IDependencyRepository):
"""
SQLite implementation of dependency repository.
Provides persistent storage for dependency edges with query support
by source, target, run, and edge type.
"""
def __init__(self, db_path: str):
"""
Initialize repository with database path.
Args:
db_path: Path to SQLite database file
"""
self.db_path = db_path
initialize_dependency_tables(db_path)
def _get_connection(self) -> sqlite3.Connection:
"""Get a database connection."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def _row_to_edge(self, row: sqlite3.Row) -> DependencyEdge:
"""Convert database row to DependencyEdge instance."""
return DependencyEdge(
id=row["id"],
source_artifact_id=row["source_artifact_id"],
target_artifact_id=row["target_artifact_id"],
run_id=row["run_id"],
edge_type=EdgeType(row["edge_type"]),
created_at=datetime.fromisoformat(row["created_at"]),
)
def create(self, edge: DependencyEdge) -> DependencyEdge:
"""
Persist a new dependency edge.
Args:
edge: Dependency edge to create
Returns:
Created edge
Raises:
DuplicateDependencyError: If edge with same source+target+run exists
DependencyRepositoryError: On other persistence errors
"""
conn = self._get_connection()
try:
conn.execute(
"""
INSERT INTO prompt_dependencies (
id, source_artifact_id, target_artifact_id,
run_id, edge_type, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
edge.id,
edge.source_artifact_id,
edge.target_artifact_id,
edge.run_id,
edge.edge_type.value,
edge.created_at.isoformat(),
),
)
conn.commit()
return edge
except sqlite3.IntegrityError as e:
if "UNIQUE constraint" in str(e):
raise DuplicateDependencyError(
f"Dependency edge from '{edge.source_artifact_id}' to "
f"'{edge.target_artifact_id}' already exists for run '{edge.run_id}'"
)
raise DependencyRepositoryError(f"Database integrity error: {e}")
except Exception as e:
raise DependencyRepositoryError(f"Failed to create dependency edge: {e}")
finally:
conn.close()
def get_by_id(self, edge_id: str) -> Optional[DependencyEdge]:
"""
Retrieve edge by ID.
Args:
edge_id: Edge identifier
Returns:
Edge if found, None otherwise
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM prompt_dependencies WHERE id = ?",
(edge_id,),
)
row = cursor.fetchone()
return self._row_to_edge(row) if row else None
finally:
conn.close()
def get_by_source(self, source_artifact_id: str) -> List[DependencyEdge]:
"""
Get all edges originating from a source artifact.
Args:
source_artifact_id: Source artifact ID
Returns:
List of edges from this source
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM prompt_dependencies WHERE source_artifact_id = ?",
(source_artifact_id,),
)
return [self._row_to_edge(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_by_target(self, target_artifact_id: str) -> List[DependencyEdge]:
"""
Get all edges pointing to a target artifact.
Args:
target_artifact_id: Target artifact ID
Returns:
List of edges to this target
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM prompt_dependencies WHERE target_artifact_id = ?",
(target_artifact_id,),
)
return [self._row_to_edge(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_by_run(self, run_id: str) -> List[DependencyEdge]:
"""
Get all edges created by a specific run.
Args:
run_id: Run identifier
Returns:
List of edges from this run
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM prompt_dependencies WHERE run_id = ?",
(run_id,),
)
return [self._row_to_edge(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_by_type(self, edge_type: EdgeType) -> List[DependencyEdge]:
"""
Get all edges of a specific type.
Args:
edge_type: Edge type to filter by
Returns:
List of edges of this type
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM prompt_dependencies WHERE edge_type = ?",
(edge_type.value,),
)
return [self._row_to_edge(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_all(self) -> List[DependencyEdge]:
"""
Get all dependency edges.
Returns:
List of all edges
"""
conn = self._get_connection()
try:
cursor = conn.execute("SELECT * FROM prompt_dependencies")
return [self._row_to_edge(row) for row in cursor.fetchall()]
finally:
conn.close()
def delete(self, edge_id: str) -> bool:
"""
Delete a dependency edge.
Args:
edge_id: Edge identifier
Returns:
True if deleted, False if not found
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"DELETE FROM prompt_dependencies WHERE id = ?",
(edge_id,),
)
conn.commit()
return cursor.rowcount > 0
finally:
conn.close()
def delete_by_run(self, run_id: str) -> int:
"""
Delete all edges created by a specific run.
Args:
run_id: Run identifier
Returns:
Number of edges deleted
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"DELETE FROM prompt_dependencies WHERE run_id = ?",
(run_id,),
)
conn.commit()
return cursor.rowcount
finally:
conn.close()