feat(prompts): implement Phase 6 - Incremental Execution (FR-7, FR-8)

Add change detection, structural diff-based impact analysis,
configurable-depth incremental recomputation with circular suppression,
and impact debt tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-09 13:18:27 +01:00
parent 9ce157400e
commit bd1d05ba79
13 changed files with 2446 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
"""
Incremental execution for prompt artifacts.
Implements FR-7: Incremental Recomputation
Implements FR-8: Impact Analysis
- FR-7.1: Change detection with persistent records
- FR-7.2: Configurable-depth incremental recomputation
- FR-8.1: Structural diff-based impact analysis
- FR-8.2: Impact threshold decisions
"""
from markitect.prompts.incremental.models import (
ChangeType,
ArtifactChange,
ImpactDebt,
RecomputeConfig,
RecomputeResult,
)
from markitect.prompts.incremental.detector import ChangeDetector
from markitect.prompts.incremental.metrics import (
structural_diff_ratio,
line_diff_ratio,
calculate_change_magnitude,
)
from markitect.prompts.incremental.impact import ImpactAnalyzer
from markitect.prompts.incremental.engine import IncrementalExecutionEngine
__all__ = [
# Models
"ChangeType",
"ArtifactChange",
"ImpactDebt",
"RecomputeConfig",
"RecomputeResult",
# Detector
"ChangeDetector",
# Metrics
"structural_diff_ratio",
"line_diff_ratio",
"calculate_change_magnitude",
# Impact
"ImpactAnalyzer",
# Engine
"IncrementalExecutionEngine",
]

View File

@@ -0,0 +1,232 @@
"""
Change detector for artifact content changes.
Implements FR-7.1: Change detection with persistent records.
Detects changes by comparing content digests and persists
ArtifactChange records to the artifact_changes table.
"""
import sqlite3
from pathlib import Path
from typing import List, Optional
from markitect.prompts.models import Artifact, calculate_content_digest
from markitect.prompts.incremental.models import ArtifactChange, ChangeType
# SQL schema for artifact_changes table
CHANGES_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS artifact_changes (
id TEXT PRIMARY KEY,
artifact_id TEXT NOT NULL,
old_digest TEXT,
new_digest TEXT NOT NULL,
change_type TEXT NOT NULL,
detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_changes_artifact ON artifact_changes(artifact_id);
CREATE INDEX IF NOT EXISTS idx_changes_type ON artifact_changes(change_type);
"""
class ChangeDetector:
"""
Detects and records artifact content changes.
Uses digest comparison to detect changes and persists
ArtifactChange records to SQLite for auditing and
incremental recomputation triggering.
"""
def __init__(self, db_path: str):
"""
Initialize with database path and ensure tables exist.
Args:
db_path: Path to SQLite database file
"""
self.db_path = db_path
self._initialize_tables()
def _initialize_tables(self) -> None:
"""Initialize artifact_changes table if not exists."""
db_dir = Path(self.db_path).parent
if db_dir and not db_dir.exists():
db_dir.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(self.db_path)
try:
conn.executescript(CHANGES_TABLE_SQL)
conn.commit()
finally:
conn.close()
def _get_connection(self) -> sqlite3.Connection:
"""Get a database connection."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def detect_change(
self,
artifact: Artifact,
new_content: str,
) -> Optional[ArtifactChange]:
"""
Detect if artifact content has changed.
Compares the artifact's stored digest against the new content digest.
If changed, creates and returns an ArtifactChange record (not yet persisted).
Args:
artifact: Existing artifact to check
new_content: New content to compare against
Returns:
ArtifactChange if content changed, None otherwise
"""
new_digest = calculate_content_digest(new_content)
if not artifact.has_changed(new_digest):
return None
return ArtifactChange.create(
artifact_id=artifact.id,
old_digest=artifact.content_digest,
new_digest=new_digest,
change_type=ChangeType.MODIFIED,
)
def detect_creation(
self,
artifact_id: str,
content: str,
) -> ArtifactChange:
"""
Record creation of a new artifact.
Args:
artifact_id: ID of newly created artifact
content: Content of new artifact
Returns:
ArtifactChange record for the creation
"""
new_digest = calculate_content_digest(content)
return ArtifactChange.create(
artifact_id=artifact_id,
old_digest=None,
new_digest=new_digest,
change_type=ChangeType.CREATED,
)
def detect_deletion(
self,
artifact: Artifact,
) -> ArtifactChange:
"""
Record deletion of an artifact.
Args:
artifact: Artifact being deleted
Returns:
ArtifactChange record for the deletion
"""
return ArtifactChange.create(
artifact_id=artifact.id,
old_digest=artifact.content_digest,
new_digest=artifact.content_digest,
change_type=ChangeType.DELETED,
)
def record_change(self, change: ArtifactChange) -> ArtifactChange:
"""
Persist an ArtifactChange record to the database.
Args:
change: Change record to persist
Returns:
Persisted change record
"""
conn = self._get_connection()
try:
conn.execute(
"""
INSERT INTO artifact_changes (
id, artifact_id, old_digest, new_digest,
change_type, detected_at
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
change.id,
change.artifact_id,
change.old_digest,
change.new_digest,
change.change_type.value,
change.detected_at.isoformat(),
),
)
conn.commit()
return change
finally:
conn.close()
def get_changes_for_artifact(self, artifact_id: str) -> List[ArtifactChange]:
"""
Get all recorded changes for an artifact.
Args:
artifact_id: Artifact identifier
Returns:
List of change records, ordered by detection time
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"""
SELECT * FROM artifact_changes
WHERE artifact_id = ?
ORDER BY detected_at
""",
(artifact_id,),
)
return [self._row_to_change(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_changes_by_type(self, change_type: ChangeType) -> List[ArtifactChange]:
"""
Get all recorded changes of a specific type.
Args:
change_type: Type of change to filter by
Returns:
List of change records
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM artifact_changes WHERE change_type = ?",
(change_type.value,),
)
return [self._row_to_change(row) for row in cursor.fetchall()]
finally:
conn.close()
def _row_to_change(self, row: sqlite3.Row) -> ArtifactChange:
"""Convert database row to ArtifactChange instance."""
from datetime import datetime
return ArtifactChange(
id=row["id"],
artifact_id=row["artifact_id"],
old_digest=row["old_digest"],
new_digest=row["new_digest"],
change_type=ChangeType(row["change_type"]),
detected_at=datetime.fromisoformat(row["detected_at"]),
)

View File

@@ -0,0 +1,327 @@
"""
Incremental execution engine.
Implements FR-7: Incremental Recomputation
Implements FR-8: Impact Analysis
Orchestrates: find dependents → check cycles → assess impact →
execute or suppress → record debt.
"""
import sqlite3
from collections import deque
from pathlib import Path
from typing import Callable, List, Optional, Set
from markitect.prompts.dependencies.queries import DependencyQueryService
from markitect.prompts.execution.models import PromptRun
from markitect.prompts.incremental.impact import ImpactAnalyzer
from markitect.prompts.incremental.models import (
ArtifactChange,
ImpactDebt,
RecomputeConfig,
RecomputeResult,
)
# SQL schema for impact_debt table
DEBT_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS impact_debt (
id TEXT PRIMARY KEY,
artifact_id TEXT NOT NULL,
dependent_run_id TEXT NOT NULL,
change_magnitude REAL NOT NULL,
suppression_reason TEXT NOT NULL,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_debt_artifact ON impact_debt(artifact_id);
CREATE INDEX IF NOT EXISTS idx_debt_run ON impact_debt(dependent_run_id);
"""
class IncrementalExecutionEngine:
"""
Engine for incremental recomputation of dependent artifacts.
Orchestrates the full recompute flow:
1. Find dependents at configured depth (BFS)
2. Check for circular dependencies
3. Assess impact magnitude
4. Execute or suppress recomputation
5. Record impact debt for suppressed items
"""
def __init__(
self,
db_path: str,
query_service: DependencyQueryService,
impact_analyzer: Optional[ImpactAnalyzer] = None,
):
"""
Initialize engine with dependencies.
Args:
db_path: Path to SQLite database for debt persistence
query_service: Service for dependency graph queries
impact_analyzer: Impact analyzer (created if not provided)
"""
self.db_path = db_path
self.query_service = query_service
self.impact_analyzer = impact_analyzer or ImpactAnalyzer()
self._initialize_tables()
def _initialize_tables(self) -> None:
"""Initialize impact_debt table if not exists."""
db_dir = Path(self.db_path).parent
if db_dir and not db_dir.exists():
db_dir.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(self.db_path)
try:
conn.executescript(DEBT_TABLE_SQL)
conn.commit()
finally:
conn.close()
def _get_connection(self) -> sqlite3.Connection:
"""Get a database connection."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def find_dependents_at_depth(
self,
artifact_id: str,
max_depth: int = 1,
) -> Set[str]:
"""
Find dependents up to a configured BFS depth.
Implements FR-7.2: Configurable depth control.
Args:
artifact_id: Artifact to find dependents of
max_depth: Maximum BFS layers to traverse
Returns:
Set of dependent artifact IDs within depth limit
"""
visited: Set[str] = set()
queue: deque[tuple] = deque([(artifact_id, 0)])
while queue:
current, depth = queue.popleft()
if depth >= max_depth:
continue
direct_dependents = self.query_service.find_dependents(current)
for dep in direct_dependents:
if dep not in visited:
visited.add(dep)
queue.append((dep, depth + 1))
return visited
def recompute(
self,
change: ArtifactChange,
config: Optional[RecomputeConfig] = None,
execution_callback: Optional[Callable[[str], Optional[PromptRun]]] = None,
old_content: Optional[str] = None,
new_content: Optional[str] = None,
) -> RecomputeResult:
"""
Perform incremental recomputation for a detected change.
Orchestrates the full recompute flow:
1. Find dependents at configured depth
2. For each dependent, check cycles, assess impact
3. Execute or suppress and record debt
Args:
change: Detected artifact change
config: Recompute configuration
execution_callback: Callable that takes a run_id and re-executes.
If None, records what would be recomputed without executing.
old_content: Old content for magnitude calculation
new_content: New content for magnitude calculation
Returns:
RecomputeResult summarizing the recomputation
"""
config = config or RecomputeConfig()
# Calculate change magnitude
magnitude = self.impact_analyzer.calculate_magnitude(
old_content, new_content
)
# Find dependents within configured depth
dependents = self.find_dependents_at_depth(
change.artifact_id, config.max_depth
)
result = RecomputeResult(
changed_artifact_id=change.artifact_id,
change=change,
total_dependents=len(dependents),
)
recompute_count = 0
for dependent_id in dependents:
# Budget check
if recompute_count >= config.max_recomputes:
debt = ImpactDebt.create(
artifact_id=change.artifact_id,
dependent_run_id=dependent_id,
change_magnitude=magnitude,
suppression_reason="budget_exhausted",
)
self._record_debt(debt)
result.suppressed.append(debt)
result.suppressed_count += 1
continue
# Circular dependency check
if config.suppress_circular:
if self.query_service.would_create_cycle(
dependent_id, change.artifact_id
):
debt = ImpactDebt.create(
artifact_id=change.artifact_id,
dependent_run_id=dependent_id,
change_magnitude=magnitude,
suppression_reason="circular_dependency",
)
self._record_debt(debt)
result.suppressed.append(debt)
result.suppressed_count += 1
continue
# Impact threshold check
if not self.impact_analyzer.should_recompute(magnitude, config):
debt = ImpactDebt.create(
artifact_id=change.artifact_id,
dependent_run_id=dependent_id,
change_magnitude=magnitude,
suppression_reason="below_threshold",
)
self._record_debt(debt)
result.suppressed.append(debt)
result.suppressed_count += 1
continue
# Execute recomputation
if execution_callback is not None:
run = execution_callback(dependent_id)
if run is not None:
result.executed_run_ids.append(run.id)
else:
# Dry-run mode: just record the ID
result.executed_run_ids.append(dependent_id)
result.recomputed_count += 1
recompute_count += 1
return result
def _record_debt(self, debt: ImpactDebt) -> ImpactDebt:
"""
Persist an ImpactDebt record to the database.
Args:
debt: Debt record to persist
Returns:
Persisted debt record
"""
conn = self._get_connection()
try:
conn.execute(
"""
INSERT INTO impact_debt (
id, artifact_id, dependent_run_id,
change_magnitude, suppression_reason, recorded_at
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
debt.id,
debt.artifact_id,
debt.dependent_run_id,
debt.change_magnitude,
debt.suppression_reason,
debt.recorded_at.isoformat(),
),
)
conn.commit()
return debt
finally:
conn.close()
def get_debt_for_artifact(self, artifact_id: str) -> List[ImpactDebt]:
"""
Get all impact debt records for a given artifact.
Args:
artifact_id: Artifact identifier
Returns:
List of ImpactDebt records
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM impact_debt WHERE artifact_id = ?",
(artifact_id,),
)
return [self._row_to_debt(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_debt_for_run(self, run_id: str) -> List[ImpactDebt]:
"""
Get all impact debt records for a given run.
Args:
run_id: Run identifier
Returns:
List of ImpactDebt records
"""
conn = self._get_connection()
try:
cursor = conn.execute(
"SELECT * FROM impact_debt WHERE dependent_run_id = ?",
(run_id,),
)
return [self._row_to_debt(row) for row in cursor.fetchall()]
finally:
conn.close()
def get_all_debt(self) -> List[ImpactDebt]:
"""
Get all impact debt records.
Returns:
List of all ImpactDebt records
"""
conn = self._get_connection()
try:
cursor = conn.execute("SELECT * FROM impact_debt")
return [self._row_to_debt(row) for row in cursor.fetchall()]
finally:
conn.close()
def _row_to_debt(self, row: sqlite3.Row) -> ImpactDebt:
"""Convert database row to ImpactDebt instance."""
from datetime import datetime
return ImpactDebt(
id=row["id"],
artifact_id=row["artifact_id"],
dependent_run_id=row["dependent_run_id"],
change_magnitude=row["change_magnitude"],
suppression_reason=row["suppression_reason"],
recorded_at=datetime.fromisoformat(row["recorded_at"]),
)

View File

@@ -0,0 +1,57 @@
"""
Impact analyzer for assessing change significance.
Implements FR-8: Impact Analysis
Stateless analyzer that computes change magnitude and decides
whether recomputation is warranted based on configurable thresholds.
"""
from typing import Optional
from markitect.prompts.incremental.metrics import calculate_change_magnitude
from markitect.prompts.incremental.models import RecomputeConfig
class ImpactAnalyzer:
"""
Stateless impact analyzer for artifact changes.
Computes change magnitude using diff metrics and compares against
configurable thresholds to determine whether recomputation is needed.
"""
def calculate_magnitude(
self,
old_content: Optional[str],
new_content: Optional[str],
method: str = "structural",
) -> float:
"""
Calculate the magnitude of a change.
Args:
old_content: Previous content (None for creation)
new_content: Current content (None for deletion)
method: Diff method ("structural" or "line")
Returns:
Change magnitude from 0.0 to 1.0
"""
return calculate_change_magnitude(old_content, new_content, method)
def should_recompute(
self,
magnitude: float,
config: RecomputeConfig,
) -> bool:
"""
Determine whether a dependent should be recomputed based on magnitude.
Args:
magnitude: Change magnitude (0.0-1.0)
config: Recompute configuration with threshold
Returns:
True if magnitude meets or exceeds threshold
"""
return magnitude >= config.impact_threshold

View File

@@ -0,0 +1,95 @@
"""
Pure diff functions for impact analysis.
Implements FR-8.2: Structural diff-based impact measurement.
Uses difflib.SequenceMatcher for computing change magnitude between
old and new content versions.
"""
import difflib
from typing import Optional
def structural_diff_ratio(old_content: str, new_content: str) -> float:
"""
Calculate structural similarity ratio between old and new content.
Uses SequenceMatcher to compute a ratio of matching blocks.
Returns the *change* ratio (1.0 - similarity), so higher values
mean more change.
Args:
old_content: Previous content
new_content: Current content
Returns:
Change ratio from 0.0 (identical) to 1.0 (completely different)
"""
if old_content == new_content:
return 0.0
if not old_content and not new_content:
return 0.0
if not old_content or not new_content:
return 1.0
matcher = difflib.SequenceMatcher(None, old_content, new_content)
similarity = matcher.ratio()
return 1.0 - similarity
def line_diff_ratio(old_content: str, new_content: str) -> float:
"""
Calculate line-level change ratio between old and new content.
Splits content into lines and computes SequenceMatcher ratio
at the line level. Returns the change ratio.
Args:
old_content: Previous content
new_content: Current content
Returns:
Change ratio from 0.0 (identical) to 1.0 (completely different)
"""
if old_content == new_content:
return 0.0
if not old_content and not new_content:
return 0.0
if not old_content or not new_content:
return 1.0
old_lines = old_content.splitlines()
new_lines = new_content.splitlines()
matcher = difflib.SequenceMatcher(None, old_lines, new_lines)
similarity = matcher.ratio()
return 1.0 - similarity
def calculate_change_magnitude(
old_content: Optional[str],
new_content: Optional[str],
method: str = "structural",
) -> float:
"""
Calculate the magnitude of a change between two content versions.
This is the primary entry point for impact measurement (FR-8.2).
Args:
old_content: Previous content (None for created artifacts)
new_content: Current content (None for deleted artifacts)
method: Diff method to use ("structural" or "line")
Returns:
Change magnitude from 0.0 (no change) to 1.0 (complete change)
"""
# Handle None cases (creation/deletion)
if old_content is None and new_content is None:
return 0.0
if old_content is None or new_content is None:
return 1.0
if method == "line":
return line_diff_ratio(old_content, new_content)
return structural_diff_ratio(old_content, new_content)

View File

@@ -0,0 +1,241 @@
"""
Data models for incremental execution.
Implements FR-7: Incremental Recomputation
Implements FR-8: Impact Analysis
Provides ChangeType enum, ArtifactChange, ImpactDebt, RecomputeConfig,
and RecomputeResult dataclasses.
"""
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
class ChangeType(Enum):
"""Type of artifact change detected."""
CREATED = "created"
MODIFIED = "modified"
DELETED = "deleted"
@dataclass
class ArtifactChange:
"""
Record of a detected artifact change.
Attributes:
id: Unique change identifier
artifact_id: ID of changed artifact
old_digest: Previous content digest (None for created)
new_digest: New content digest
change_type: Type of change
detected_at: When change was detected
"""
id: str
artifact_id: str
old_digest: Optional[str]
new_digest: str
change_type: ChangeType
detected_at: datetime = field(default_factory=datetime.utcnow)
@classmethod
def create(
cls,
artifact_id: str,
old_digest: Optional[str],
new_digest: str,
change_type: ChangeType,
) -> "ArtifactChange":
"""
Create a new ArtifactChange record.
Args:
artifact_id: ID of changed artifact
old_digest: Previous digest (None for created)
new_digest: New digest
change_type: Type of change
Returns:
New ArtifactChange instance
"""
return cls(
id=str(uuid.uuid4()),
artifact_id=artifact_id,
old_digest=old_digest,
new_digest=new_digest,
change_type=change_type,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"id": self.id,
"artifact_id": self.artifact_id,
"old_digest": self.old_digest,
"new_digest": self.new_digest,
"change_type": self.change_type.value,
"detected_at": self.detected_at.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ArtifactChange":
"""Create from dictionary."""
return cls(
id=data["id"],
artifact_id=data["artifact_id"],
old_digest=data.get("old_digest"),
new_digest=data["new_digest"],
change_type=ChangeType(data["change_type"]),
detected_at=datetime.fromisoformat(data["detected_at"]),
)
@dataclass
class ImpactDebt:
"""
Record of a suppressed recomputation.
When a dependent is not recomputed (due to circular dependency,
below-threshold impact, or budget exhaustion), an ImpactDebt
record is created to track the outstanding work.
Attributes:
id: Unique debt identifier
artifact_id: ID of the changed artifact that triggered this
dependent_run_id: Run ID of the dependent that was not recomputed
change_magnitude: Measured impact magnitude (0.0-1.0)
suppression_reason: Why recomputation was suppressed
recorded_at: When debt was recorded
"""
id: str
artifact_id: str
dependent_run_id: str
change_magnitude: float
suppression_reason: str
recorded_at: datetime = field(default_factory=datetime.utcnow)
@classmethod
def create(
cls,
artifact_id: str,
dependent_run_id: str,
change_magnitude: float,
suppression_reason: str,
) -> "ImpactDebt":
"""
Create a new ImpactDebt record.
Args:
artifact_id: Changed artifact ID
dependent_run_id: Dependent run ID
change_magnitude: Impact magnitude (0.0-1.0)
suppression_reason: Reason for suppression
Returns:
New ImpactDebt instance
"""
return cls(
id=str(uuid.uuid4()),
artifact_id=artifact_id,
dependent_run_id=dependent_run_id,
change_magnitude=change_magnitude,
suppression_reason=suppression_reason,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"id": self.id,
"artifact_id": self.artifact_id,
"dependent_run_id": self.dependent_run_id,
"change_magnitude": self.change_magnitude,
"suppression_reason": self.suppression_reason,
"recorded_at": self.recorded_at.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ImpactDebt":
"""Create from dictionary."""
return cls(
id=data["id"],
artifact_id=data["artifact_id"],
dependent_run_id=data["dependent_run_id"],
change_magnitude=data["change_magnitude"],
suppression_reason=data["suppression_reason"],
recorded_at=datetime.fromisoformat(data["recorded_at"]),
)
@dataclass
class RecomputeConfig:
"""
Configuration for incremental recomputation.
Attributes:
max_depth: Maximum BFS depth for finding dependents (FR-7.2)
impact_threshold: Minimum magnitude to trigger recompute (FR-8.2)
max_recomputes: Budget limit on recomputations per change
suppress_circular: Whether to suppress circular dependencies
"""
max_depth: int = 1
impact_threshold: float = 0.0
max_recomputes: int = 100
suppress_circular: bool = True
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"max_depth": self.max_depth,
"impact_threshold": self.impact_threshold,
"max_recomputes": self.max_recomputes,
"suppress_circular": self.suppress_circular,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "RecomputeConfig":
"""Create from dictionary."""
return cls(
max_depth=data.get("max_depth", 1),
impact_threshold=data.get("impact_threshold", 0.0),
max_recomputes=data.get("max_recomputes", 100),
suppress_circular=data.get("suppress_circular", True),
)
@dataclass
class RecomputeResult:
"""
Result of an incremental recomputation triggered by a change.
Attributes:
changed_artifact_id: ID of the artifact that changed
change: The detected change record
executed_run_ids: Run IDs that were recomputed
suppressed: List of suppressed recomputation debt records
total_dependents: Total number of dependents found
recomputed_count: Number actually recomputed
suppressed_count: Number suppressed
"""
changed_artifact_id: str
change: ArtifactChange
executed_run_ids: List[str] = field(default_factory=list)
suppressed: List[ImpactDebt] = field(default_factory=list)
total_dependents: int = 0
recomputed_count: int = 0
suppressed_count: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"changed_artifact_id": self.changed_artifact_id,
"change": self.change.to_dict(),
"executed_run_ids": self.executed_run_ids,
"suppressed": [d.to_dict() for d in self.suppressed],
"total_dependents": self.total_dependents,
"recomputed_count": self.recomputed_count,
"suppressed_count": self.suppressed_count,
}