feat(prompts): implement Phase 8 - Observability & Traceability (FR-11)

Complete implementation of Phase 8, the final phase of prompt dependency
resolution infrastructure, adding full observability and traceability.

## Features (FR-11)

### FR-11.1: Complete Artifact Provenance Tracing
- TraceabilityService: composition layer for full artifact lineage
- Trace any artifact to producing PromptTemplate, input artifacts,
  generator runs, and quality validation results
- ProvenanceTrace model with complete dependency chain reconstruction
- RunSummary and ArtifactLineage models for structured trace output

### FR-11.2: Recomputation Query Infrastructure
- PromptQueryService: cross-service complex queries
- Run history queries with template and status filters
- Stale artifact detection via impact debt analysis
- Dependency graph statistics (nodes, edges, cycles, roots, leaves)
- Content-based artifact lookups by digest

### Visualization Support
- GraphExporter: DOT (Graphviz) and Mermaid format export
- Supports all edge types (requires, generates, includes)
- Handles isolated nodes, linear chains, diamonds, and complex graphs

### CLI Commands (prompt group)
- `prompt trace <artifact_id>` - Full provenance trace as JSON
- `prompt graph <artifact_id>` - Dependency graph (DOT/Mermaid)
- `prompt runs` - List execution runs with filters
- `prompt debt` - Show impact debt and stale artifacts
- `prompt stats` - Dependency graph statistics

## Implementation

Source files (8):
- markitect/prompts/traceability/models.py - Trace data models
- markitect/prompts/traceability/service.py - TraceabilityService
- markitect/prompts/visualization/graph.py - Graph export
- markitect/prompts/queries/operations.py - PromptQueryService
- markitect/prompts/cli.py - Click CLI commands
- Package __init__.py files (3)

Tests (64 total, all passing):
- tests/unit/prompts/test_traceability_service.py (21 tests)
- tests/unit/prompts/test_visualization.py (14 tests)
- tests/unit/prompts/test_query_operations.py (12 tests)
- tests/integration/prompts/test_traceability_workflow.py (7 tests)
- tests/integration/prompts/test_prompt_cli.py (10 tests)

## Architecture

TraceabilityService is a composition layer that delegates to:
- DependencyQueryService (transitive dependency lookups)
- QualityValidator (validation history)
- IncrementalExecutionEngine (impact debt queries)
- Direct repository access (artifacts, edges)

No duplicate data storage - all data comes from existing Phase 1-7
infrastructure (artifact repo, dependency repo, validation DB, debt DB).

## Verification

All 2250 tests pass with 0 regressions.
Phase 8 completes the full 8-phase implementation roadmap.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-09 20:32:18 +01:00
parent 704272644c
commit 7b4bd461c9
14 changed files with 2012 additions and 0 deletions

View File

@@ -7088,6 +7088,13 @@ try:
except ImportError:
pass # Plugin not available
# Register prompt dependency resolution commands
try:
from markitect.prompts.cli import prompt_commands
cli.add_command(prompt_commands)
except ImportError:
pass # Prompts module not available
# Make cli function available as main entry point
main = cli

147
markitect/prompts/cli.py Normal file
View File

@@ -0,0 +1,147 @@
"""
CLI commands for prompt dependency resolution.
Provides Click commands for tracing provenance, visualizing graphs,
querying run history, and inspecting impact debt.
"""
import json
import os
import click
from markitect.prompts.dependencies.graph import GraphBuilder
from markitect.prompts.dependencies.repository import SQLiteDependencyRepository
from markitect.prompts.queries.operations import PromptQueryService
from markitect.prompts.repositories.sqlite import SQLiteArtifactRepository
from markitect.prompts.traceability.service import TraceabilityService
from markitect.prompts.visualization.graph import GraphExporter
def _default_db_path():
"""Return the default database path."""
return os.path.expanduser("~/.markitect/markitect.db")
def _get_repos(database):
"""Create artifact and dependency repos from database path."""
db = database or _default_db_path()
artifact_repo = SQLiteArtifactRepository(db)
dep_repo = SQLiteDependencyRepository(db)
return artifact_repo, dep_repo, db
@click.group(name="prompt")
def prompt_commands():
"""Prompt dependency resolution commands."""
pass
@prompt_commands.command("trace")
@click.argument("artifact_id")
@click.option("--database", type=click.Path(), help="Database file path")
def trace_artifact(artifact_id, database):
"""Trace provenance of an artifact."""
artifact_repo, dep_repo, db = _get_repos(database)
service = TraceabilityService(artifact_repo, dep_repo, db_path=db)
artifact = artifact_repo.get_by_id(artifact_id)
if not artifact:
click.echo(f"Artifact '{artifact_id}' not found.", err=True)
raise SystemExit(1)
trace = service.trace_artifact(artifact_id)
click.echo(json.dumps(trace.to_dict(), indent=2, default=str))
@prompt_commands.command("graph")
@click.argument("artifact_id")
@click.option(
"--format",
"fmt",
type=click.Choice(["dot", "mermaid"]),
default="mermaid",
help="Output format",
)
@click.option("--database", type=click.Path(), help="Database file path")
def show_graph(artifact_id, fmt, database):
"""Visualize dependency graph for an artifact."""
artifact_repo, dep_repo, db = _get_repos(database)
builder = GraphBuilder(dep_repo)
# Build graph from all edges involving this artifact
from markitect.prompts.dependencies.queries import DependencyQueryService
query_svc = DependencyQueryService(dep_repo)
deps = query_svc.find_transitive_dependencies(artifact_id)
dependents = query_svc.find_transitive_dependents(artifact_id)
all_ids = deps | dependents | {artifact_id}
graph = builder.build_graph(all_ids)
if fmt == "dot":
click.echo(GraphExporter.to_dot(graph, title=f"Dependencies: {artifact_id}"))
else:
click.echo(
GraphExporter.to_mermaid(graph, title=f"Dependencies: {artifact_id}")
)
@prompt_commands.command("runs")
@click.option("--template", help="Filter by template ID")
@click.option(
"--status",
type=click.Choice(["pending", "running", "success", "failed", "skipped"]),
help="Filter by status",
)
@click.option("--limit", default=20, type=int, help="Maximum results")
@click.option("--database", type=click.Path(), help="Database file path")
def list_runs(template, status, limit, database):
"""List prompt execution runs."""
artifact_repo, dep_repo, db = _get_repos(database)
query_svc = PromptQueryService(artifact_repo, dep_repo, db_path=db)
runs = query_svc.get_run_history(
template_id=template, status=status, limit=limit
)
if not runs:
click.echo("No runs found.")
return
click.echo(json.dumps(runs, indent=2, default=str))
@prompt_commands.command("debt")
@click.option("--artifact", help="Filter by artifact ID")
@click.option("--database", type=click.Path(), help="Database file path")
def show_debt(artifact, database):
"""Show impact debt (suppressed recomputations)."""
artifact_repo, dep_repo, db = _get_repos(database)
query_svc = PromptQueryService(artifact_repo, dep_repo, db_path=db)
if artifact:
# Get debt for specific artifact via traceability service
trace_svc = TraceabilityService(artifact_repo, dep_repo, db_path=db)
debts = trace_svc.get_impact_debt(artifact)
if not debts:
click.echo(f"No impact debt for artifact '{artifact}'.")
return
click.echo(json.dumps(debts, indent=2, default=str))
else:
stale = query_svc.get_stale_artifacts()
if not stale:
click.echo("No stale artifacts found.")
return
click.echo(json.dumps(stale, indent=2, default=str))
@prompt_commands.command("stats")
@click.option("--database", type=click.Path(), help="Database file path")
def show_stats(database):
"""Show dependency graph statistics."""
artifact_repo, dep_repo, db = _get_repos(database)
query_svc = PromptQueryService(artifact_repo, dep_repo, db_path=db)
stats = query_svc.get_dependency_stats()
click.echo(json.dumps(stats, indent=2))

View File

@@ -0,0 +1,11 @@
"""
Query operations package for cross-service prompt queries.
Provides higher-level query operations over the prompt infrastructure.
"""
from markitect.prompts.queries.operations import PromptQueryService
__all__ = [
"PromptQueryService",
]

View File

@@ -0,0 +1,165 @@
"""
Complex cross-service query operations for prompt infrastructure.
Combines artifact repository, dependency repository, and run data
into higher-level queries like run history, impact analysis, and stats.
"""
from typing import Any, Dict, List, Optional
from markitect.prompts.dependencies.graph import GraphBuilder
from markitect.prompts.dependencies.queries import DependencyQueryService
from markitect.prompts.dependencies.repository import IDependencyRepository
from markitect.prompts.execution.models import PromptRun
from markitect.prompts.incremental.engine import IncrementalExecutionEngine
from markitect.prompts.repositories.interfaces import IArtifactRepository
class PromptQueryService:
"""
Complex cross-service queries over prompt infrastructure.
Provides higher-level queries that span multiple data sources:
run history, stale artifact detection, dependency statistics,
and content-based artifact lookups.
"""
def __init__(
self,
artifact_repo: IArtifactRepository,
dependency_repo: IDependencyRepository,
db_path: Optional[str] = None,
):
"""
Initialize with data sources.
Args:
artifact_repo: Artifact repository
dependency_repo: Dependency edge repository
db_path: Optional database path for debt queries
"""
self._artifact_repo = artifact_repo
self._dependency_repo = dependency_repo
self._db_path = db_path
self._query_service = DependencyQueryService(dependency_repo)
self._graph_builder = GraphBuilder(dependency_repo)
self._engine = (
IncrementalExecutionEngine(db_path, self._query_service)
if db_path
else None
)
# Run registry: external code registers runs for querying
self._runs: Dict[str, PromptRun] = {}
def register_run(self, run: PromptRun) -> None:
"""Register a run for query lookups."""
self._runs[run.id] = run
def get_run_history(
self,
template_id: Optional[str] = None,
status: Optional[str] = None,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""
Query run history with optional filters.
Args:
template_id: Optional template ID filter
status: Optional status filter
limit: Maximum results to return
Returns:
List of run dictionaries sorted by start time (newest first)
"""
runs = list(self._runs.values())
if template_id:
runs = [r for r in runs if r.template_id == template_id]
if status:
runs = [r for r in runs if r.status.value == status]
runs.sort(key=lambda r: r.started_at, reverse=True)
runs = runs[:limit]
return [r.to_dict() for r in runs]
def get_stale_artifacts(self) -> List[Dict[str, Any]]:
"""
Find artifacts with outstanding impact debt.
Returns:
List of dicts with artifact info and debt details
"""
if not self._engine:
return []
all_debt = self._engine.get_all_debt()
if not all_debt:
return []
# Group debt by artifact
debt_by_artifact: Dict[str, list] = {}
for d in all_debt:
debt_by_artifact.setdefault(d.artifact_id, []).append(d)
results = []
for artifact_id, debts in debt_by_artifact.items():
artifact = self._artifact_repo.get_by_id(artifact_id)
results.append({
"artifact_id": artifact_id,
"artifact_name": artifact.name if artifact else "unknown",
"debt_count": len(debts),
"total_magnitude": sum(d.change_magnitude for d in debts),
"reasons": list({d.suppression_reason for d in debts}),
})
return results
def get_dependency_stats(self) -> Dict[str, Any]:
"""
Get summary statistics about the dependency graph.
Returns:
Dictionary with graph statistics
"""
graph = self._graph_builder.build_graph()
nodes = graph.nodes
edge_count = graph.edge_count
# Find root nodes (no predecessors) and leaf nodes (no successors)
roots = [n for n in nodes if not graph.get_predecessors(n)]
leaves = [n for n in nodes if not graph.get_successors(n)]
# Check for cycles
has_cycles = graph.has_cycle()
return {
"total_nodes": len(nodes),
"total_edges": edge_count,
"root_count": len(roots),
"leaf_count": len(leaves),
"has_cycles": has_cycles,
}
def find_artifacts_by_digest(self, digest: str) -> List[Dict[str, Any]]:
"""
Find all artifacts with a given content digest.
Args:
digest: Content digest to search for
Returns:
List of artifact dictionaries
"""
artifacts = self._artifact_repo.get_by_digest(digest)
return [
{
"artifact_id": a.id,
"name": a.name,
"space_id": a.space_id,
"artifact_type": a.artifact_type.value,
}
for a in artifacts
]

View File

@@ -0,0 +1,19 @@
"""
Traceability package for artifact provenance tracking.
Implements FR-11: Observability & Traceability.
"""
from markitect.prompts.traceability.models import (
ArtifactLineage,
ProvenanceTrace,
RunSummary,
)
from markitect.prompts.traceability.service import TraceabilityService
__all__ = [
"ArtifactLineage",
"ProvenanceTrace",
"RunSummary",
"TraceabilityService",
]

View File

@@ -0,0 +1,116 @@
"""
Traceability models for provenance tracking.
Implements FR-11.1: Trace artifacts to producing runs, inputs, and validation.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
@dataclass
class RunSummary:
"""Summary of a PromptRun for traceability output."""
run_id: str
template_id: str
status: str
stage: str
parent_run_id: Optional[str]
depth: int
input_bundle_hash: str
started_at: datetime
completed_at: Optional[datetime]
@classmethod
def create(
cls,
run_id: str,
template_id: str,
status: str,
stage: str,
input_bundle_hash: str,
started_at: datetime,
parent_run_id: Optional[str] = None,
depth: int = 0,
completed_at: Optional[datetime] = None,
) -> "RunSummary":
"""Create a RunSummary."""
return cls(
run_id=run_id,
template_id=template_id,
status=status,
stage=stage,
parent_run_id=parent_run_id,
depth=depth,
input_bundle_hash=input_bundle_hash,
started_at=started_at,
completed_at=completed_at,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"run_id": self.run_id,
"template_id": self.template_id,
"status": self.status,
"stage": self.stage,
"parent_run_id": self.parent_run_id,
"depth": self.depth,
"input_bundle_hash": self.input_bundle_hash,
"started_at": self.started_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
}
@dataclass
class ArtifactLineage:
"""Lineage record for a single artifact in a trace."""
artifact_id: str
name: str
space_id: str
artifact_type: str
content_digest: str
role: str # "input", "output", "template"
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"artifact_id": self.artifact_id,
"name": self.name,
"space_id": self.space_id,
"artifact_type": self.artifact_type,
"content_digest": self.content_digest,
"role": self.role,
}
@dataclass
class ProvenanceTrace:
"""Complete provenance trace for an artifact."""
artifact_id: str
producing_run: Optional[RunSummary] = None
template: Optional[ArtifactLineage] = None
input_artifacts: List[ArtifactLineage] = field(default_factory=list)
output_artifacts: List[ArtifactLineage] = field(default_factory=list)
generator_runs: List[RunSummary] = field(default_factory=list)
validation_results: List[Dict[str, Any]] = field(default_factory=list)
impact_debt: List[Dict[str, Any]] = field(default_factory=list)
dependency_chain: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"artifact_id": self.artifact_id,
"producing_run": self.producing_run.to_dict() if self.producing_run else None,
"template": self.template.to_dict() if self.template else None,
"input_artifacts": [a.to_dict() for a in self.input_artifacts],
"output_artifacts": [a.to_dict() for a in self.output_artifacts],
"generator_runs": [r.to_dict() for r in self.generator_runs],
"validation_results": self.validation_results,
"impact_debt": self.impact_debt,
"dependency_chain": self.dependency_chain,
}

View File

@@ -0,0 +1,315 @@
"""
Traceability service for artifact provenance tracking.
Implements FR-11.1: Trace any artifact to its producing PromptTemplate,
input artifacts, generator runs, and quality validation results.
Implements FR-11.2: Enable recomputation based on dependency changes.
Composition layer over existing services — does NOT duplicate data storage.
"""
from typing import Any, Dict, List, Optional
from markitect.prompts.dependencies.graph import GraphBuilder
from markitect.prompts.dependencies.queries import DependencyQueryService
from markitect.prompts.dependencies.repository import IDependencyRepository
from markitect.prompts.execution.models import PromptRun
from markitect.prompts.incremental.engine import IncrementalExecutionEngine
from markitect.prompts.quality.validator import QualityValidator
from markitect.prompts.repositories.interfaces import IArtifactRepository
from markitect.prompts.traceability.models import (
ArtifactLineage,
ProvenanceTrace,
RunSummary,
)
def _run_to_summary(run: PromptRun) -> RunSummary:
"""Convert a PromptRun to a RunSummary."""
return RunSummary.create(
run_id=run.id,
template_id=run.template_id,
status=run.status.value,
stage=run.stage.value,
input_bundle_hash=run.input_bundle_hash,
started_at=run.started_at,
parent_run_id=run.parent_run_id,
depth=run.depth,
completed_at=run.completed_at,
)
class TraceabilityService:
"""
Composition layer for full artifact provenance tracing.
Delegates to DependencyQueryService, QualityValidator,
IncrementalExecutionEngine, and direct repository access.
"""
def __init__(
self,
artifact_repo: IArtifactRepository,
dependency_repo: IDependencyRepository,
db_path: Optional[str] = None,
):
"""
Compose over existing repos and services.
Args:
artifact_repo: Repository for artifact lookups
dependency_repo: Repository for dependency edge lookups
db_path: Optional database path for quality/debt services
"""
self._artifact_repo = artifact_repo
self._dependency_repo = dependency_repo
self._db_path = db_path
self._query_service = DependencyQueryService(dependency_repo)
self._graph_builder = GraphBuilder(dependency_repo)
self._validator = QualityValidator(db_path=db_path) if db_path else None
self._engine = (
IncrementalExecutionEngine(db_path, self._query_service)
if db_path
else None
)
# Run registry: external code can register runs for tracing
self._runs: Dict[str, PromptRun] = {}
def register_run(self, run: PromptRun) -> None:
"""
Register a run for traceability lookups.
Args:
run: PromptRun to register
"""
self._runs[run.id] = run
def trace_artifact(self, artifact_id: str) -> ProvenanceTrace:
"""
Full provenance trace for an artifact (FR-11.1).
Args:
artifact_id: Artifact to trace
Returns:
ProvenanceTrace with all provenance data
"""
trace = ProvenanceTrace(artifact_id=artifact_id)
# Find producing run
producing_run = self.get_producing_run(artifact_id)
trace.producing_run = producing_run
if producing_run:
# Get template artifact
template_artifact = self._artifact_repo.get_by_id(
producing_run.template_id
)
if template_artifact:
trace.template = ArtifactLineage(
artifact_id=template_artifact.id,
name=template_artifact.name,
space_id=template_artifact.space_id,
artifact_type=template_artifact.artifact_type.value,
content_digest=template_artifact.content_digest,
role="template",
)
# Get input and output artifacts
trace.input_artifacts = self.get_input_artifacts(producing_run.run_id)
trace.output_artifacts = self.get_output_artifacts(producing_run.run_id)
# Get generator sub-runs
trace.generator_runs = self.get_generator_runs(producing_run.run_id)
# Get validation history
trace.validation_results = self.get_validation_history(artifact_id)
# Get impact debt
trace.impact_debt = self.get_impact_debt(artifact_id)
# Build dependency chain
deps = self._query_service.find_transitive_dependencies(artifact_id)
trace.dependency_chain = sorted(deps)
return trace
def get_producing_run(self, artifact_id: str) -> Optional[RunSummary]:
"""
Find the run that produced an artifact.
Searches registered runs for one whose manifest lists
this artifact as an output.
Args:
artifact_id: Artifact to find producer of
Returns:
RunSummary if found, None otherwise
"""
# Check dependency edges: find edges where this artifact is a target
# with edge_type "generates"
edges = self._dependency_repo.get_by_target(artifact_id)
for edge in edges:
# The source is the run or template that generated this artifact
run = self._runs.get(edge.source_artifact_id)
if run:
return _run_to_summary(run)
# Fallback: search registered runs by manifest metadata
for run in self._runs.values():
manifest = run.metadata.get("manifest", {})
outputs = manifest.get("output_artifacts", [])
for output in outputs:
if output.get("artifact_id") == artifact_id:
return _run_to_summary(run)
return None
def get_input_artifacts(self, run_id: str) -> List[ArtifactLineage]:
"""
Get all input artifacts for a run.
Uses dependency edges to find artifacts that the run depends on.
Args:
run_id: Run identifier
Returns:
List of ArtifactLineage for inputs
"""
result = []
# Find edges where this run is the target (artifacts -> run)
edges = self._dependency_repo.get_by_run(run_id)
for edge in edges:
if edge.edge_type.value == "requires":
artifact = self._artifact_repo.get_by_id(edge.source_artifact_id)
if artifact:
result.append(
ArtifactLineage(
artifact_id=artifact.id,
name=artifact.name,
space_id=artifact.space_id,
artifact_type=artifact.artifact_type.value,
content_digest=artifact.content_digest,
role="input",
)
)
# Also check manifest resolved_inputs
run = self._runs.get(run_id)
if run:
manifest = run.metadata.get("manifest", {})
seen_ids = {a.artifact_id for a in result}
for inp in manifest.get("resolved_inputs", []):
aid = inp.get("artifact_id", "")
if aid and aid not in seen_ids:
artifact = self._artifact_repo.get_by_id(aid)
if artifact:
result.append(
ArtifactLineage(
artifact_id=artifact.id,
name=artifact.name,
space_id=artifact.space_id,
artifact_type=artifact.artifact_type.value,
content_digest=artifact.content_digest,
role="input",
)
)
seen_ids.add(aid)
return result
def get_output_artifacts(self, run_id: str) -> List[ArtifactLineage]:
"""
Get all output artifacts produced by a run.
Args:
run_id: Run identifier
Returns:
List of ArtifactLineage for outputs
"""
result = []
# Find edges where this run is the source with "generates"
edges = self._dependency_repo.get_by_run(run_id)
for edge in edges:
if edge.edge_type.value == "generates":
artifact = self._artifact_repo.get_by_id(edge.target_artifact_id)
if artifact:
result.append(
ArtifactLineage(
artifact_id=artifact.id,
name=artifact.name,
space_id=artifact.space_id,
artifact_type=artifact.artifact_type.value,
content_digest=artifact.content_digest,
role="output",
)
)
# Also check manifest output_artifacts
run = self._runs.get(run_id)
if run:
manifest = run.metadata.get("manifest", {})
seen_ids = {a.artifact_id for a in result}
for out in manifest.get("output_artifacts", []):
aid = out.get("artifact_id", "")
if aid and aid not in seen_ids:
artifact = self._artifact_repo.get_by_id(aid)
if artifact:
result.append(
ArtifactLineage(
artifact_id=artifact.id,
name=artifact.name,
space_id=artifact.space_id,
artifact_type=artifact.artifact_type.value,
content_digest=artifact.content_digest,
role="output",
)
)
seen_ids.add(aid)
return result
def get_generator_runs(self, run_id: str) -> List[RunSummary]:
"""
Get nested generator runs spawned by a run.
Args:
run_id: Parent run identifier
Returns:
List of RunSummary for child runs
"""
return [
_run_to_summary(run)
for run in self._runs.values()
if run.parent_run_id == run_id
]
def get_validation_history(self, artifact_id: str) -> List[Dict[str, Any]]:
"""
Get validation results for an artifact across all runs.
Args:
artifact_id: Artifact identifier
Returns:
List of validation result dictionaries
"""
if self._validator:
return self._validator.get_results_for_artifact(artifact_id)
return []
def get_impact_debt(self, artifact_id: str) -> List[Dict[str, Any]]:
"""
Get suppressed recomputation records for an artifact.
Args:
artifact_id: Artifact identifier
Returns:
List of impact debt dictionaries
"""
if self._engine:
debts = self._engine.get_debt_for_artifact(artifact_id)
return [d.to_dict() for d in debts]
return []

View File

@@ -0,0 +1,11 @@
"""
Visualization package for dependency graph export.
Provides DOT and Mermaid format export for DependencyGraph instances.
"""
from markitect.prompts.visualization.graph import GraphExporter
__all__ = [
"GraphExporter",
]

View File

@@ -0,0 +1,106 @@
"""
Graph visualization export for dependency graphs.
Exports DependencyGraph to DOT (Graphviz) and Mermaid diagram formats.
"""
from markitect.prompts.dependencies.models import DependencyGraph, EdgeType
# Edge type to style mappings
_DOT_EDGE_STYLES = {
EdgeType.REQUIRES: 'style="solid"',
EdgeType.GENERATES: 'style="dashed"',
EdgeType.INCLUDES: 'style="dotted"',
}
_MERMAID_EDGE_STYLES = {
EdgeType.REQUIRES: "-->",
EdgeType.GENERATES: "-.->",
EdgeType.INCLUDES: "==>",
}
class GraphExporter:
"""Export DependencyGraph to DOT and Mermaid formats."""
@staticmethod
def to_dot(graph: DependencyGraph, title: str = "Dependencies") -> str:
"""
Export dependency graph to Graphviz DOT format.
Args:
graph: DependencyGraph to export
title: Graph title
Returns:
DOT format string
"""
lines = [
f'digraph "{title}" {{',
" rankdir=LR;",
f' label="{title}";',
" node [shape=box];",
]
# Add nodes
for node in sorted(graph.nodes):
safe_id = node.replace("-", "_")
lines.append(f' {safe_id} [label="{node}"];')
# Add edges
for source in sorted(graph.nodes):
for target in sorted(graph.get_successors(source)):
edge_type = graph.get_edge_type(source, target)
style = _DOT_EDGE_STYLES.get(edge_type, 'style="solid"')
safe_source = source.replace("-", "_")
safe_target = target.replace("-", "_")
label = edge_type.value if edge_type else "requires"
lines.append(
f' {safe_source} -> {safe_target} [{style} label="{label}"];'
)
lines.append("}")
return "\n".join(lines)
@staticmethod
def to_mermaid(graph: DependencyGraph, title: str = "Dependencies") -> str:
"""
Export dependency graph to Mermaid diagram format.
Args:
graph: DependencyGraph to export
title: Graph title (used as comment)
Returns:
Mermaid format string
"""
lines = [
f"%%{{ title: {title} }}%%",
"graph LR",
]
# Add edges
edges_added = False
for source in sorted(graph.nodes):
for target in sorted(graph.get_successors(source)):
edge_type = graph.get_edge_type(source, target)
arrow = _MERMAID_EDGE_STYLES.get(edge_type, "-->")
label = edge_type.value if edge_type else "requires"
lines.append(f" {source}{arrow}|{label}|{target}")
edges_added = True
# Add isolated nodes (no edges)
if not edges_added:
for node in sorted(graph.nodes):
lines.append(f" {node}")
else:
# Add any isolated nodes that have no edges
connected = set()
for source in graph.nodes:
if graph.get_successors(source) or graph.get_predecessors(source):
connected.add(source)
for node in sorted(graph.nodes - connected):
lines.append(f" {node}")
return "\n".join(lines)

View File

@@ -0,0 +1,179 @@
"""
Integration tests for prompt CLI commands.
Tests Click CLI commands with CliRunner and a real SQLite database.
"""
import json
import pytest
import tempfile
from pathlib import Path
from click.testing import CliRunner
from markitect.prompts.cli import prompt_commands
from markitect.prompts.dependencies.models import DependencyEdge, EdgeType
from markitect.prompts.dependencies.repository import SQLiteDependencyRepository
from markitect.prompts.models import Artifact, ArtifactType
from markitect.prompts.repositories.sqlite import SQLiteArtifactRepository
@pytest.fixture
def temp_db():
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
yield db_path
Path(db_path).unlink(missing_ok=True)
@pytest.fixture
def runner():
"""Click CLI test runner."""
return CliRunner()
@pytest.fixture
def seeded_db(temp_db):
"""Seed database with test data and return path."""
art_repo = SQLiteArtifactRepository(temp_db)
dep_repo = SQLiteDependencyRepository(temp_db)
a = Artifact.create(space_id="s", name="a", content="content-a",
artifact_type=ArtifactType.CONTENT)
b = Artifact.create(space_id="s", name="b", content="content-b",
artifact_type=ArtifactType.CONTENT)
a = art_repo.create(a)
b = art_repo.create(b)
edge = DependencyEdge.create(
source_artifact_id=a.id,
target_artifact_id=b.id,
run_id="run-1",
edge_type=EdgeType.REQUIRES,
)
dep_repo.create(edge)
return temp_db, a.id, b.id
class TestTraceCommand:
"""Tests for the 'trace' command."""
def test_trace_existing_artifact(self, runner, seeded_db):
"""Test tracing an existing artifact."""
db_path, art_a, art_b = seeded_db
result = runner.invoke(
prompt_commands, ["trace", art_a, "--database", db_path]
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["artifact_id"] == art_a
def test_trace_nonexistent_artifact(self, runner, temp_db):
"""Test tracing a non-existent artifact."""
# Initialize tables
SQLiteArtifactRepository(temp_db)
SQLiteDependencyRepository(temp_db)
result = runner.invoke(
prompt_commands, ["trace", "nonexistent", "--database", temp_db]
)
assert result.exit_code != 0
class TestGraphCommand:
"""Tests for the 'graph' command."""
def test_graph_mermaid(self, runner, seeded_db):
"""Test graph export in mermaid format."""
db_path, art_a, _ = seeded_db
result = runner.invoke(
prompt_commands,
["graph", art_a, "--format", "mermaid", "--database", db_path],
)
assert result.exit_code == 0
assert "graph LR" in result.output
def test_graph_dot(self, runner, seeded_db):
"""Test graph export in DOT format."""
db_path, art_a, _ = seeded_db
result = runner.invoke(
prompt_commands,
["graph", art_a, "--format", "dot", "--database", db_path],
)
assert result.exit_code == 0
assert "digraph" in result.output
class TestRunsCommand:
"""Tests for the 'runs' command."""
def test_runs_empty(self, runner, seeded_db):
"""Test listing runs when none are registered."""
db_path, _, _ = seeded_db
result = runner.invoke(
prompt_commands, ["runs", "--database", db_path]
)
assert result.exit_code == 0
assert "No runs found" in result.output
def test_runs_with_limit(self, runner, seeded_db):
"""Test listing runs with limit option."""
db_path, _, _ = seeded_db
result = runner.invoke(
prompt_commands, ["runs", "--limit", "5", "--database", db_path]
)
assert result.exit_code == 0
class TestDebtCommand:
"""Tests for the 'debt' command."""
def test_debt_no_stale(self, runner, seeded_db):
"""Test debt when no stale artifacts exist."""
db_path, _, _ = seeded_db
result = runner.invoke(
prompt_commands, ["debt", "--database", db_path]
)
assert result.exit_code == 0
assert "No stale artifacts" in result.output
def test_debt_for_artifact(self, runner, seeded_db):
"""Test debt for a specific artifact."""
db_path, art_a, _ = seeded_db
result = runner.invoke(
prompt_commands,
["debt", "--artifact", art_a, "--database", db_path],
)
assert result.exit_code == 0
assert "No impact debt" in result.output
class TestStatsCommand:
"""Tests for the 'stats' command."""
def test_stats_with_data(self, runner, seeded_db):
"""Test stats with real graph data."""
db_path, _, _ = seeded_db
result = runner.invoke(
prompt_commands, ["stats", "--database", db_path]
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["total_nodes"] == 2
assert data["total_edges"] == 1
assert data["has_cycles"] is False
def test_stats_empty_db(self, runner, temp_db):
"""Test stats on empty database."""
SQLiteArtifactRepository(temp_db)
SQLiteDependencyRepository(temp_db)
result = runner.invoke(
prompt_commands, ["stats", "--database", temp_db]
)
assert result.exit_code == 0
data = json.loads(result.output)
assert data["total_nodes"] == 0
assert data["total_edges"] == 0

View File

@@ -0,0 +1,255 @@
"""
Integration test for full traceability workflow.
Tests the complete flow: create artifacts + dependencies → trace provenance
→ verify lineage with a real SQLite database.
"""
import pytest
import tempfile
from pathlib import Path
from markitect.prompts.dependencies.models import DependencyEdge, EdgeType
from markitect.prompts.dependencies.repository import SQLiteDependencyRepository
from markitect.prompts.execution.models import PromptRun, RunConfig
from markitect.prompts.models import Artifact, ArtifactType
from markitect.prompts.repositories.sqlite import SQLiteArtifactRepository
from markitect.prompts.traceability.service import TraceabilityService
from markitect.prompts.visualization.graph import GraphExporter
from markitect.prompts.queries.operations import PromptQueryService
@pytest.fixture
def temp_db():
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
yield db_path
Path(db_path).unlink(missing_ok=True)
@pytest.fixture
def artifact_repo(temp_db):
return SQLiteArtifactRepository(temp_db)
@pytest.fixture
def dep_repo(temp_db):
return SQLiteDependencyRepository(temp_db)
@pytest.fixture
def trace_service(artifact_repo, dep_repo, temp_db):
return TraceabilityService(artifact_repo, dep_repo, db_path=temp_db)
@pytest.fixture
def query_service(artifact_repo, dep_repo, temp_db):
return PromptQueryService(artifact_repo, dep_repo, db_path=temp_db)
def _create_artifact(repo, space_id, name, content="content", atype=ArtifactType.CONTENT):
artifact = Artifact.create(
space_id=space_id, name=name, content=content, artifact_type=atype,
)
return repo.create(artifact)
def _create_edge(repo, src, tgt, run_id, edge_type=EdgeType.REQUIRES):
edge = DependencyEdge.create(
source_artifact_id=src,
target_artifact_id=tgt,
run_id=run_id,
edge_type=edge_type,
)
return repo.create(edge)
class TestFullTraceabilityWorkflow:
"""End-to-end traceability workflow test."""
def test_create_trace_verify(
self, artifact_repo, dep_repo, trace_service
):
"""
Full workflow:
1. Create template, input, output artifacts
2. Create dependency edges
3. Register a run
4. Trace the output artifact
5. Verify lineage data
"""
# Step 1: Create artifacts
template = _create_artifact(
artifact_repo, "space-1", "my-template",
content="Generate {{input}}", atype=ArtifactType.TEMPLATE,
)
input_art = _create_artifact(
artifact_repo, "space-1", "input-data",
content="raw input data",
)
output_art = _create_artifact(
artifact_repo, "space-1", "output-doc",
content="generated output", atype=ArtifactType.GENERATED,
)
# Step 2: Create a run
run = PromptRun.create(
template_id=template.id,
input_bundle_hash="test-hash-123",
)
run.mark_complete()
run.metadata["manifest"] = {
"resolved_inputs": [{"artifact_id": input_art.id}],
"output_artifacts": [{"artifact_id": output_art.id}],
}
# Step 3: Create dependency edges
_create_edge(dep_repo, input_art.id, template.id, run.id, EdgeType.REQUIRES)
_create_edge(dep_repo, run.id, output_art.id, run.id, EdgeType.GENERATES)
# Step 4: Register and trace
trace_service.register_run(run)
trace = trace_service.trace_artifact(output_art.id)
# Step 5: Verify
assert trace.artifact_id == output_art.id
assert trace.producing_run is not None
assert trace.producing_run.run_id == run.id
assert trace.producing_run.template_id == template.id
# Verify serialization
d = trace.to_dict()
assert d["artifact_id"] == output_art.id
assert d["producing_run"]["run_id"] == run.id
def test_multi_level_dependency_chain(
self, artifact_repo, dep_repo, trace_service
):
"""Test tracing across a multi-level dependency chain."""
# A -> B -> C (A depends on B, B depends on C)
a = _create_artifact(artifact_repo, "s", "a", "content-a")
b = _create_artifact(artifact_repo, "s", "b", "content-b")
c = _create_artifact(artifact_repo, "s", "c", "content-c")
_create_edge(dep_repo, a.id, b.id, "r1")
_create_edge(dep_repo, b.id, c.id, "r1")
trace = trace_service.trace_artifact(a.id)
assert b.id in trace.dependency_chain
assert c.id in trace.dependency_chain
def test_generator_run_tracing(self, trace_service):
"""Test tracing nested generator runs."""
parent = PromptRun.create(
template_id="tmpl-1", input_bundle_hash="hash-1"
)
parent.mark_complete()
child = PromptRun.create(
template_id="tmpl-2",
input_bundle_hash="hash-2",
parent_run_id=parent.id,
depth=1,
)
child.mark_complete()
trace_service.register_run(parent)
trace_service.register_run(child)
# Trace should find generator runs from manifest context
trace = trace_service.trace_artifact("dummy-artifact-id")
# Generator runs are found via parent_run_id match
generators = trace_service.get_generator_runs(parent.id)
assert len(generators) == 1
assert generators[0].run_id == child.id
assert generators[0].depth == 1
class TestVisualizationIntegration:
"""Test visualization with real dependency data."""
def test_graph_export_from_real_data(self, artifact_repo, dep_repo):
"""Test DOT and Mermaid export from real DB data."""
from markitect.prompts.dependencies.graph import GraphBuilder
a = _create_artifact(artifact_repo, "s", "a")
b = _create_artifact(artifact_repo, "s", "b")
c = _create_artifact(artifact_repo, "s", "c")
_create_edge(dep_repo, a.id, b.id, "r1", EdgeType.REQUIRES)
_create_edge(dep_repo, b.id, c.id, "r1", EdgeType.GENERATES)
builder = GraphBuilder(dep_repo)
graph = builder.build_graph()
dot = GraphExporter.to_dot(graph, "Test Graph")
assert "digraph" in dot
assert "requires" in dot
assert "generates" in dot
mermaid = GraphExporter.to_mermaid(graph, "Test Graph")
assert "graph LR" in mermaid
assert "requires" in mermaid
assert "generates" in mermaid
class TestQueryServiceIntegration:
"""Test PromptQueryService with real data."""
def test_dependency_stats_with_data(
self, artifact_repo, dep_repo, query_service
):
"""Test stats with actual artifacts and edges."""
a = _create_artifact(artifact_repo, "s", "root")
b = _create_artifact(artifact_repo, "s", "mid")
c = _create_artifact(artifact_repo, "s", "leaf")
_create_edge(dep_repo, a.id, b.id, "r1")
_create_edge(dep_repo, b.id, c.id, "r1")
stats = query_service.get_dependency_stats()
assert stats["total_nodes"] == 3
assert stats["total_edges"] == 2
assert stats["root_count"] == 1
assert stats["leaf_count"] == 1
assert stats["has_cycles"] is False
def test_find_artifacts_by_digest_integration(
self, artifact_repo, query_service
):
"""Test finding artifacts by digest across spaces."""
a1 = _create_artifact(artifact_repo, "space-1", "doc-a", "shared content")
a2 = _create_artifact(artifact_repo, "space-2", "doc-b", "shared content")
results = query_service.find_artifacts_by_digest(a1.content_digest)
assert len(results) == 2
ids = {r["artifact_id"] for r in results}
assert a1.id in ids
assert a2.id in ids
def test_run_history_with_filters(self, query_service):
"""Test run history with template and status filters."""
from markitect.prompts.execution.models import RunStatus
run_ok = PromptRun.create(template_id="t1", input_bundle_hash="h1")
run_ok.mark_complete()
run_fail = PromptRun.create(template_id="t2", input_bundle_hash="h2")
run_fail.mark_failed("error")
query_service.register_run(run_ok)
query_service.register_run(run_fail)
# All runs
all_runs = query_service.get_run_history()
assert len(all_runs) == 2
# Filter by template
t1_runs = query_service.get_run_history(template_id="t1")
assert len(t1_runs) == 1
assert t1_runs[0]["template_id"] == "t1"
# Filter by status
failed = query_service.get_run_history(status="failed")
assert len(failed) == 1
assert failed[0]["status"] == "failed"

View File

@@ -0,0 +1,184 @@
"""
Unit tests for PromptQueryService.
Tests run history, impact analysis queries, dependency stats,
and artifact digest lookups.
"""
import pytest
import tempfile
from pathlib import Path
from markitect.prompts.dependencies.models import DependencyEdge, EdgeType
from markitect.prompts.dependencies.repository import SQLiteDependencyRepository
from markitect.prompts.execution.models import PromptRun, RunStatus
from markitect.prompts.models import Artifact, ArtifactType
from markitect.prompts.repositories.sqlite import SQLiteArtifactRepository
from markitect.prompts.queries.operations import PromptQueryService
@pytest.fixture
def temp_db():
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
yield db_path
Path(db_path).unlink(missing_ok=True)
@pytest.fixture
def artifact_repo(temp_db):
return SQLiteArtifactRepository(temp_db)
@pytest.fixture
def dep_repo(temp_db):
return SQLiteDependencyRepository(temp_db)
@pytest.fixture
def query_service(artifact_repo, dep_repo, temp_db):
return PromptQueryService(artifact_repo, dep_repo, db_path=temp_db)
def _make_artifact(repo, space_id, name, content="content"):
artifact = Artifact.create(
space_id=space_id, name=name, content=content,
artifact_type=ArtifactType.CONTENT,
)
return repo.create(artifact)
def _make_run(template_id, status=RunStatus.SUCCESS):
run = PromptRun.create(
template_id=template_id,
input_bundle_hash="hash-abc",
)
if status == RunStatus.SUCCESS:
run.mark_complete()
elif status == RunStatus.FAILED:
run.mark_failed("error")
return run
def _make_edge(repo, source, target, run_id):
edge = DependencyEdge.create(
source_artifact_id=source,
target_artifact_id=target,
run_id=run_id,
edge_type=EdgeType.REQUIRES,
)
return repo.create(edge)
class TestRunHistory:
"""Tests for get_run_history."""
def test_returns_all_runs(self, query_service):
"""Test returning all registered runs."""
run1 = _make_run("tmpl-1")
run2 = _make_run("tmpl-2")
query_service.register_run(run1)
query_service.register_run(run2)
history = query_service.get_run_history()
assert len(history) == 2
def test_filter_by_template(self, query_service):
"""Test filtering by template_id."""
run1 = _make_run("tmpl-1")
run2 = _make_run("tmpl-2")
query_service.register_run(run1)
query_service.register_run(run2)
history = query_service.get_run_history(template_id="tmpl-1")
assert len(history) == 1
assert history[0]["template_id"] == "tmpl-1"
def test_filter_by_status(self, query_service):
"""Test filtering by status."""
run_ok = _make_run("tmpl-1", RunStatus.SUCCESS)
run_fail = _make_run("tmpl-1", RunStatus.FAILED)
query_service.register_run(run_ok)
query_service.register_run(run_fail)
history = query_service.get_run_history(status="success")
assert len(history) == 1
assert history[0]["status"] == "success"
def test_limit(self, query_service):
"""Test limit parameter."""
for i in range(10):
run = _make_run(f"tmpl-{i}")
query_service.register_run(run)
history = query_service.get_run_history(limit=3)
assert len(history) == 3
def test_empty_history(self, query_service):
"""Test empty run history."""
assert query_service.get_run_history() == []
class TestStaleArtifacts:
"""Tests for get_stale_artifacts."""
def test_no_debt_returns_empty(self, query_service):
"""Test returns empty when no debt exists."""
assert query_service.get_stale_artifacts() == []
def test_no_engine_returns_empty(self, artifact_repo, dep_repo):
"""Test returns empty without db_path."""
svc = PromptQueryService(artifact_repo, dep_repo, db_path=None)
assert svc.get_stale_artifacts() == []
class TestDependencyStats:
"""Tests for get_dependency_stats."""
def test_empty_graph(self, query_service):
"""Test stats for empty graph."""
stats = query_service.get_dependency_stats()
assert stats["total_nodes"] == 0
assert stats["total_edges"] == 0
assert stats["has_cycles"] is False
def test_simple_graph(self, query_service, artifact_repo, dep_repo):
"""Test stats for a simple graph."""
a = _make_artifact(artifact_repo, "s", "a")
b = _make_artifact(artifact_repo, "s", "b")
c = _make_artifact(artifact_repo, "s", "c")
_make_edge(dep_repo, a.id, b.id, "r1")
_make_edge(dep_repo, b.id, c.id, "r1")
stats = query_service.get_dependency_stats()
assert stats["total_nodes"] == 3
assert stats["total_edges"] == 2
assert stats["root_count"] == 1 # 'a' is root
assert stats["leaf_count"] == 1 # 'c' is leaf
assert stats["has_cycles"] is False
class TestFindArtifactsByDigest:
"""Tests for find_artifacts_by_digest."""
def test_finds_matching(self, query_service, artifact_repo):
"""Test finding artifacts by content digest."""
art = _make_artifact(artifact_repo, "s", "test-art", "hello")
results = query_service.find_artifacts_by_digest(art.content_digest)
assert len(results) == 1
assert results[0]["artifact_id"] == art.id
def test_no_match(self, query_service):
"""Test returns empty for non-matching digest."""
assert query_service.find_artifacts_by_digest("nonexistent") == []
def test_multiple_matches(self, query_service, artifact_repo):
"""Test finding multiple artifacts with same digest."""
_make_artifact(artifact_repo, "s1", "a", "same-content")
_make_artifact(artifact_repo, "s2", "a", "same-content")
art = _make_artifact(artifact_repo, "s3", "a", "same-content")
results = query_service.find_artifacts_by_digest(art.content_digest)
assert len(results) == 3

View File

@@ -0,0 +1,356 @@
"""
Unit tests for TraceabilityService.
Tests trace_artifact, get_producing_run, get_input_artifacts,
get_generator_runs, and get_validation_history.
"""
import pytest
import tempfile
from datetime import datetime
from pathlib import Path
from unittest.mock import MagicMock
from markitect.prompts.dependencies.models import DependencyEdge, EdgeType
from markitect.prompts.dependencies.repository import SQLiteDependencyRepository
from markitect.prompts.execution.models import (
ExecutionStage,
PromptRun,
RunConfig,
RunStatus,
)
from markitect.prompts.models import Artifact, ArtifactType
from markitect.prompts.repositories.sqlite import SQLiteArtifactRepository
from markitect.prompts.traceability.models import (
ArtifactLineage,
ProvenanceTrace,
RunSummary,
)
from markitect.prompts.traceability.service import TraceabilityService
@pytest.fixture
def temp_db():
"""Create temporary database for testing."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
yield db_path
Path(db_path).unlink(missing_ok=True)
@pytest.fixture
def artifact_repo(temp_db):
"""Create artifact repository."""
return SQLiteArtifactRepository(temp_db)
@pytest.fixture
def dep_repo(temp_db):
"""Create dependency repository."""
return SQLiteDependencyRepository(temp_db)
@pytest.fixture
def service(artifact_repo, dep_repo, temp_db):
"""Create TraceabilityService."""
return TraceabilityService(artifact_repo, dep_repo, db_path=temp_db)
def _make_artifact(repo, space_id, name, content="content", atype=ArtifactType.CONTENT):
"""Helper to create and persist an artifact."""
artifact = Artifact.create(
space_id=space_id, name=name, content=content, artifact_type=atype
)
return repo.create(artifact)
def _make_run(template_id, status=RunStatus.SUCCESS, parent_run_id=None, depth=0):
"""Helper to create a PromptRun."""
run = PromptRun.create(
template_id=template_id,
input_bundle_hash="hash-abc",
parent_run_id=parent_run_id,
depth=depth,
)
if status == RunStatus.SUCCESS:
run.mark_complete()
return run
def _make_edge(repo, source, target, run_id, edge_type=EdgeType.REQUIRES):
"""Helper to create a dependency edge."""
edge = DependencyEdge.create(
source_artifact_id=source,
target_artifact_id=target,
run_id=run_id,
edge_type=edge_type,
)
return repo.create(edge)
class TestTraceArtifact:
"""Tests for trace_artifact."""
def test_trace_returns_provenance(self, service, artifact_repo, dep_repo):
"""Test trace_artifact returns a ProvenanceTrace."""
art = _make_artifact(artifact_repo, "space-1", "output-doc")
trace = service.trace_artifact(art.id)
assert isinstance(trace, ProvenanceTrace)
assert trace.artifact_id == art.id
def test_trace_with_producing_run(self, service, artifact_repo, dep_repo):
"""Test trace finds producing run via manifest metadata."""
template = _make_artifact(
artifact_repo, "space-1", "tmpl", atype=ArtifactType.TEMPLATE
)
output = _make_artifact(artifact_repo, "space-1", "output")
run = _make_run(template.id)
run.metadata["manifest"] = {
"output_artifacts": [{"artifact_id": output.id}],
"resolved_inputs": [],
}
service.register_run(run)
trace = service.trace_artifact(output.id)
assert trace.producing_run is not None
assert trace.producing_run.run_id == run.id
def test_trace_no_producing_run(self, service, artifact_repo):
"""Test trace with no producing run returns None."""
art = _make_artifact(artifact_repo, "space-1", "standalone")
trace = service.trace_artifact(art.id)
assert trace.producing_run is None
def test_trace_includes_dependency_chain(self, service, artifact_repo, dep_repo):
"""Test trace includes transitive dependency chain."""
a = _make_artifact(artifact_repo, "s", "a")
b = _make_artifact(artifact_repo, "s", "b")
c = _make_artifact(artifact_repo, "s", "c")
_make_edge(dep_repo, a.id, b.id, "r1")
_make_edge(dep_repo, b.id, c.id, "r1")
trace = service.trace_artifact(a.id)
assert b.id in trace.dependency_chain
assert c.id in trace.dependency_chain
def test_trace_to_dict(self, service, artifact_repo):
"""Test ProvenanceTrace serialization."""
art = _make_artifact(artifact_repo, "s", "x")
trace = service.trace_artifact(art.id)
d = trace.to_dict()
assert d["artifact_id"] == art.id
assert "producing_run" in d
assert "input_artifacts" in d
assert "dependency_chain" in d
class TestGetProducingRun:
"""Tests for get_producing_run."""
def test_finds_run_via_generates_edge(self, service, artifact_repo, dep_repo):
"""Test finding run via generates dependency edge."""
output = _make_artifact(artifact_repo, "s", "output")
run = _make_run("tmpl-1")
service.register_run(run)
# Create a "generates" edge from run.id -> output.id
_make_edge(dep_repo, run.id, output.id, run.id, EdgeType.GENERATES)
result = service.get_producing_run(output.id)
assert result is not None
assert result.run_id == run.id
def test_finds_run_via_manifest(self, service, artifact_repo):
"""Test finding run via manifest output_artifacts."""
output = _make_artifact(artifact_repo, "s", "output")
run = _make_run("tmpl-1")
run.metadata["manifest"] = {
"output_artifacts": [{"artifact_id": output.id}],
}
service.register_run(run)
result = service.get_producing_run(output.id)
assert result is not None
assert result.run_id == run.id
def test_returns_none_when_not_found(self, service):
"""Test returns None when no producing run exists."""
result = service.get_producing_run("nonexistent")
assert result is None
class TestGetInputArtifacts:
"""Tests for get_input_artifacts."""
def test_finds_inputs_via_edges(self, service, artifact_repo, dep_repo):
"""Test finding input artifacts via dependency edges."""
inp = _make_artifact(artifact_repo, "s", "input-data")
run = _make_run("tmpl-1")
service.register_run(run)
_make_edge(dep_repo, inp.id, run.id, run.id, EdgeType.REQUIRES)
inputs = service.get_input_artifacts(run.id)
assert len(inputs) == 1
assert inputs[0].artifact_id == inp.id
assert inputs[0].role == "input"
def test_finds_inputs_via_manifest(self, service, artifact_repo, dep_repo):
"""Test finding input artifacts via manifest resolved_inputs."""
inp = _make_artifact(artifact_repo, "s", "input-data")
run = _make_run("tmpl-1")
run.metadata["manifest"] = {
"resolved_inputs": [{"artifact_id": inp.id}],
"output_artifacts": [],
}
service.register_run(run)
inputs = service.get_input_artifacts(run.id)
assert len(inputs) == 1
assert inputs[0].artifact_id == inp.id
def test_no_duplicates(self, service, artifact_repo, dep_repo):
"""Test inputs are deduplicated across edges and manifest."""
inp = _make_artifact(artifact_repo, "s", "input-data")
run = _make_run("tmpl-1")
run.metadata["manifest"] = {
"resolved_inputs": [{"artifact_id": inp.id}],
"output_artifacts": [],
}
service.register_run(run)
_make_edge(dep_repo, inp.id, run.id, run.id, EdgeType.REQUIRES)
inputs = service.get_input_artifacts(run.id)
assert len(inputs) == 1
class TestGetOutputArtifacts:
"""Tests for get_output_artifacts."""
def test_finds_outputs_via_edges(self, service, artifact_repo, dep_repo):
"""Test finding output artifacts via generates edges."""
output = _make_artifact(artifact_repo, "s", "output")
run = _make_run("tmpl-1")
service.register_run(run)
_make_edge(dep_repo, run.id, output.id, run.id, EdgeType.GENERATES)
outputs = service.get_output_artifacts(run.id)
assert len(outputs) == 1
assert outputs[0].artifact_id == output.id
assert outputs[0].role == "output"
class TestGetGeneratorRuns:
"""Tests for get_generator_runs."""
def test_finds_child_runs(self, service):
"""Test finding nested generator runs."""
parent = _make_run("tmpl-1")
child1 = _make_run("tmpl-2", parent_run_id=parent.id, depth=1)
child2 = _make_run("tmpl-3", parent_run_id=parent.id, depth=1)
unrelated = _make_run("tmpl-4")
service.register_run(parent)
service.register_run(child1)
service.register_run(child2)
service.register_run(unrelated)
children = service.get_generator_runs(parent.id)
child_ids = {c.run_id for c in children}
assert child1.id in child_ids
assert child2.id in child_ids
assert unrelated.id not in child_ids
def test_no_children(self, service):
"""Test returns empty when no child runs exist."""
parent = _make_run("tmpl-1")
service.register_run(parent)
assert service.get_generator_runs(parent.id) == []
class TestGetValidationHistory:
"""Tests for get_validation_history."""
def test_returns_empty_without_db(self, artifact_repo, dep_repo):
"""Test returns empty list when no db_path."""
svc = TraceabilityService(artifact_repo, dep_repo, db_path=None)
assert svc.get_validation_history("art-1") == []
def test_returns_results_with_db(self, service):
"""Test returns results from validator when db_path is set."""
# Without actually writing validation data, should return empty
results = service.get_validation_history("art-1")
assert results == []
class TestGetImpactDebt:
"""Tests for get_impact_debt."""
def test_returns_empty_without_db(self, artifact_repo, dep_repo):
"""Test returns empty list when no db_path."""
svc = TraceabilityService(artifact_repo, dep_repo, db_path=None)
assert svc.get_impact_debt("art-1") == []
def test_returns_empty_no_debt(self, service):
"""Test returns empty when no debt exists."""
assert service.get_impact_debt("nonexistent") == []
class TestRunSummary:
"""Tests for RunSummary model."""
def test_create_and_to_dict(self):
"""Test RunSummary creation and serialization."""
now = datetime.utcnow()
summary = RunSummary.create(
run_id="r1",
template_id="t1",
status="success",
stage="complete",
input_bundle_hash="hash",
started_at=now,
completed_at=now,
)
d = summary.to_dict()
assert d["run_id"] == "r1"
assert d["status"] == "success"
assert d["completed_at"] is not None
def test_optional_fields(self):
"""Test RunSummary with optional fields as None."""
now = datetime.utcnow()
summary = RunSummary.create(
run_id="r1",
template_id="t1",
status="pending",
stage="pending",
input_bundle_hash="hash",
started_at=now,
)
d = summary.to_dict()
assert d["parent_run_id"] is None
assert d["completed_at"] is None
assert d["depth"] == 0
class TestArtifactLineage:
"""Tests for ArtifactLineage model."""
def test_to_dict(self):
"""Test ArtifactLineage serialization."""
lineage = ArtifactLineage(
artifact_id="a1",
name="test",
space_id="s1",
artifact_type="content",
content_digest="abc123",
role="input",
)
d = lineage.to_dict()
assert d["artifact_id"] == "a1"
assert d["role"] == "input"

View File

@@ -0,0 +1,141 @@
"""
Unit tests for GraphExporter.
Tests DOT and Mermaid export with various graph shapes.
"""
import pytest
from markitect.prompts.dependencies.models import DependencyGraph, EdgeType
from markitect.prompts.visualization.graph import GraphExporter
@pytest.fixture
def empty_graph():
"""Empty graph with no nodes."""
return DependencyGraph()
@pytest.fixture
def single_node_graph():
"""Graph with a single isolated node."""
g = DependencyGraph()
g._forward["node-a"] = set()
g._reverse["node-a"] = set()
return g
@pytest.fixture
def linear_graph():
"""Linear chain: A -> B -> C."""
g = DependencyGraph()
g.add_edge("A", "B", EdgeType.REQUIRES)
g.add_edge("B", "C", EdgeType.GENERATES)
return g
@pytest.fixture
def diamond_graph():
"""Diamond: A -> B, A -> C, B -> D, C -> D."""
g = DependencyGraph()
g.add_edge("A", "B", EdgeType.REQUIRES)
g.add_edge("A", "C", EdgeType.REQUIRES)
g.add_edge("B", "D", EdgeType.GENERATES)
g.add_edge("C", "D", EdgeType.INCLUDES)
return g
class TestToDot:
"""Tests for to_dot export."""
def test_empty_graph(self, empty_graph):
"""Test DOT output for empty graph."""
dot = GraphExporter.to_dot(empty_graph)
assert 'digraph "Dependencies"' in dot
assert "rankdir=LR" in dot
def test_single_node(self, single_node_graph):
"""Test DOT output with single node."""
dot = GraphExporter.to_dot(single_node_graph)
assert "node_a" in dot
assert 'label="node-a"' in dot
def test_linear_graph(self, linear_graph):
"""Test DOT output for linear chain."""
dot = GraphExporter.to_dot(linear_graph)
assert "A -> B" in dot
assert "B -> C" in dot
assert 'label="requires"' in dot
assert 'label="generates"' in dot
def test_diamond_graph(self, diamond_graph):
"""Test DOT output for diamond graph."""
dot = GraphExporter.to_dot(diamond_graph)
assert "A -> B" in dot
assert "A -> C" in dot
assert "B -> D" in dot
assert "C -> D" in dot
def test_custom_title(self, linear_graph):
"""Test DOT output with custom title."""
dot = GraphExporter.to_dot(linear_graph, title="My Graph")
assert 'digraph "My Graph"' in dot
assert 'label="My Graph"' in dot
def test_edge_styles(self, linear_graph):
"""Test DOT edge styles for different edge types."""
dot = GraphExporter.to_dot(linear_graph)
assert 'style="solid"' in dot # REQUIRES
assert 'style="dashed"' in dot # GENERATES
def test_dot_is_valid_structure(self, diamond_graph):
"""Test DOT output has valid opening/closing braces."""
dot = GraphExporter.to_dot(diamond_graph)
assert dot.startswith('digraph')
assert dot.endswith("}")
class TestToMermaid:
"""Tests for to_mermaid export."""
def test_empty_graph(self, empty_graph):
"""Test Mermaid output for empty graph."""
mermaid = GraphExporter.to_mermaid(empty_graph)
assert "graph LR" in mermaid
def test_single_node(self, single_node_graph):
"""Test Mermaid output with single node."""
mermaid = GraphExporter.to_mermaid(single_node_graph)
assert "node-a" in mermaid
def test_linear_graph(self, linear_graph):
"""Test Mermaid output for linear chain."""
mermaid = GraphExporter.to_mermaid(linear_graph)
assert "A-->|requires|B" in mermaid
assert "B-.->|generates|C" in mermaid
def test_diamond_graph(self, diamond_graph):
"""Test Mermaid output for diamond graph."""
mermaid = GraphExporter.to_mermaid(diamond_graph)
assert "A-->|requires|B" in mermaid
assert "A-->|requires|C" in mermaid
assert "B-.->|generates|D" in mermaid
assert "C==>|includes|D" in mermaid
def test_custom_title(self, linear_graph):
"""Test Mermaid output with custom title."""
mermaid = GraphExporter.to_mermaid(linear_graph, title="Build Graph")
assert "Build Graph" in mermaid
def test_edge_arrows(self, diamond_graph):
"""Test Mermaid edge arrows for different types."""
mermaid = GraphExporter.to_mermaid(diamond_graph)
assert "-->" in mermaid # REQUIRES
assert "-.->" in mermaid # GENERATES
assert "==>" in mermaid # INCLUDES
def test_mermaid_starts_with_graph(self, linear_graph):
"""Test Mermaid output starts with graph directive."""
mermaid = GraphExporter.to_mermaid(linear_graph)
lines = mermaid.strip().split("\n")
assert "graph LR" in lines[1]