feat(prompts): implement Phase 4 - Execution Engine (FR-4, FR-5)
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled

Implement three-stage execution lifecycle with idempotent runs and complete
provenance tracking via RunManifest.

Core Features:
- PromptRun model with execution lifecycle stages:
  1. Analysis: Template analysis and macro extraction
  2. Compilation: Macro resolution and context compilation
  3. Processing: LLM execution and output generation
- InputBundleHash for deterministic idempotency (FR-4.3)
- RunManifest for complete execution provenance (FR-5)
- LLMAdapter interface for pluggable model providers
- MockLLMAdapter for testing without API calls
- PromptExecutionEngine orchestrating full lifecycle

Idempotent Execution (FR-4.4):
- Calculate SHA-256 hash of complete input context
- Skip execution if identical hash exists
- Cache successful runs by hash
- Support force re-execution via config flag

RunManifest Tracking (FR-5.2):
- Template metadata (id, name, digest)
- Resolved input artifacts and digests
- Compiled prompt digest
- Model configuration
- Output artifacts
- Dependency edges for graph construction
- Timing metadata for performance analysis

Tests (27 passing):
- 17 execution model tests (config, bundle, runs, stages)
- 10 engine tests (execution, idempotency, errors, caching)

Implements:
- FR-4.1: Three-stage execution lifecycle
- FR-4.2: CompiledPrompt during compilation
- FR-4.3: InputBundleHash calculation
- FR-4.4: Skip execution for identical hashes
- FR-5.1: RunManifest persistence
- FR-5.2: Complete manifest contents
- FR-5.3: Nested run linking (foundation)

Files Created:
- markitect/prompts/execution/models.py
- markitect/prompts/execution/manifest.py
- markitect/prompts/execution/llm_adapter.py
- markitect/prompts/execution/engine.py
- migrations/prompts/003_create_runs_and_manifests.sql
- tests/unit/prompts/test_execution_models.py
- tests/unit/prompts/test_execution_engine.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-08 23:15:33 +01:00
parent 5f463e5b20
commit c56c92c815
8 changed files with 1739 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
"""
Execution engine for Prompt Dependency Resolution.
This package provides the core execution infrastructure for running
PromptTemplates with idempotent execution and complete provenance tracking.
"""
from markitect.prompts.execution.models import (
PromptRun,
ExecutionStage,
RunConfig,
InputBundle,
LLMResponse,
)
from markitect.prompts.execution.manifest import RunManifest
from markitect.prompts.execution.engine import PromptExecutionEngine
from markitect.prompts.execution.llm_adapter import LLMAdapter, MockLLMAdapter
__all__ = [
"PromptRun",
"ExecutionStage",
"RunConfig",
"InputBundle",
"LLMResponse",
"RunManifest",
"PromptExecutionEngine",
"LLMAdapter",
"MockLLMAdapter",
]

View File

@@ -0,0 +1,268 @@
"""
Prompt execution engine.
Implements FR-4: PromptRun Lifecycle
Three-stage execution with idempotent runs.
"""
import time
from typing import Optional, Dict
from markitect.prompts.templates.models import PromptTemplate
from markitect.prompts.templates.analyzer import TemplateAnalyzer
from markitect.prompts.resolver.resolver import PromptResolver
from markitect.prompts.resolver.compiler import ContextCompiler
from markitect.prompts.resolver.strategy import ResolutionConfig
from markitect.prompts.execution.models import (
PromptRun,
ExecutionStage,
RunConfig,
RunStatus,
InputBundle,
)
from markitect.prompts.execution.manifest import RunManifest
from markitect.prompts.execution.llm_adapter import LLMAdapter
from markitect.prompts.models import calculate_content_digest, ArtifactType
from markitect.prompts.services.artifact_service import ArtifactService
class PromptExecutionEngine:
"""
Engine for executing prompt templates.
Implements FR-4: PromptRun Lifecycle
- Stage 1: Template Analysis
- Stage 2: Context Compilation
- Stage 3: Prompt Processing
Implements FR-4.4: Idempotent execution via InputBundleHash
"""
def __init__(
self,
artifact_service: ArtifactService,
template_analyzer: TemplateAnalyzer,
resolver: PromptResolver,
compiler: ContextCompiler,
llm_adapter: LLMAdapter,
):
"""
Initialize execution engine.
Args:
artifact_service: For artifact operations
template_analyzer: For template analysis
resolver: For macro resolution
compiler: For context compilation
llm_adapter: For LLM execution
"""
self.artifact_service = artifact_service
self.template_analyzer = template_analyzer
self.resolver = resolver
self.compiler = compiler
self.llm_adapter = llm_adapter
self.run_cache: Dict[str, PromptRun] = {} # Cache by input_bundle_hash
def execute(
self,
template: PromptTemplate,
template_content: str,
resolution_config: ResolutionConfig,
run_config: Optional[RunConfig] = None,
) -> PromptRun:
"""
Execute a prompt template.
Implements FR-4.1: Three-stage execution
1. Analysis: Analyze template and extract macros
2. Compilation: Resolve macros and compile prompt
3. Processing: Execute with LLM
Implements FR-4.4: Skip if identical InputBundleHash exists
Args:
template: Template to execute
template_content: Template content
resolution_config: Resolution configuration
run_config: Execution configuration
Returns:
PromptRun with execution results
"""
config = run_config or RunConfig()
# Stage 1: Template Analysis
start_time = time.time()
if not template.analyzed:
self.template_analyzer.analyze(template, template_content)
analysis_time = time.time() - start_time
# Stage 2: Context Compilation
start_time = time.time()
resolution_result = self.resolver.resolve_template(template, resolution_config)
if not resolution_result.success:
# Resolution failed - create failed run
run = self._create_failed_run(
template,
"Resolution failed: " + ", ".join(resolution_result.context.errors),
config,
)
return run
compiled = self.compiler.compile(template, template_content, resolution_result)
compilation_time = time.time() - start_time
# Calculate InputBundleHash (FR-4.3)
input_bundle = InputBundle(
template_digest=template.content_digest,
dependency_digests=compiled.dependency_digests,
resolution_config_hash=calculate_content_digest(
str(resolution_config.to_dict())
),
model_config={
"model_name": config.model_name,
"temperature": config.temperature,
"max_tokens": config.max_tokens,
},
)
input_bundle_hash = input_bundle.calculate_hash()
# Check for existing run (FR-4.4)
if config.skip_if_exists and input_bundle_hash in self.run_cache:
existing_run = self.run_cache[input_bundle_hash]
# Create skipped run referencing existing
skipped_run = PromptRun.create(
template_id=template.id,
input_bundle_hash=input_bundle_hash,
config=config,
)
skipped_run.mark_skipped()
skipped_run.metadata["skipped_due_to"] = existing_run.id
return skipped_run
# Create run
run = PromptRun.create(
template_id=template.id,
input_bundle_hash=input_bundle_hash,
config=config,
)
# Create manifest
manifest = RunManifest.create(
run_id=run.id,
template_id=template.id,
template_name=template.name,
template_digest=template.content_digest,
)
manifest.compiled_prompt_digest = compiled.content_digest
manifest.model_config = config.to_dict()
# Add resolved inputs to manifest
for resolved in resolution_result.context.resolved_macros:
if resolved.artifact:
manifest.add_resolved_input(
name=resolved.artifact.name,
artifact_id=resolved.artifact.id,
space_id=resolved.space_id or "",
digest=resolved.artifact.content_digest,
)
# Add dependency edge
manifest.add_dependency_edge(
source_id=resolved.artifact.id,
target_id=run.id,
edge_type="requires",
)
# Record timing
manifest.set_timing("analysis", analysis_time)
manifest.set_timing("compilation", compilation_time)
# Stage 3: Prompt Processing
run.advance_stage(ExecutionStage.PROCESSING)
try:
start_time = time.time()
llm_response = self.llm_adapter.execute_prompt(
compiled.content,
config,
)
processing_time = time.time() - start_time
manifest.set_timing("processing", processing_time)
# Store output as generated artifact
output_artifact = self.artifact_service.create_artifact(
space_id=template.space_id,
name=f"{template.name}-output-{run.id[:8]}",
content=llm_response.content,
artifact_type=ArtifactType.GENERATED,
)
manifest.add_output_artifact(
artifact_id=output_artifact.id,
name=output_artifact.name,
digest=output_artifact.content_digest,
artifact_type=output_artifact.artifact_type.value,
)
# Add generation edge
manifest.add_dependency_edge(
source_id=run.id,
target_id=output_artifact.id,
edge_type="generates",
)
# Mark complete
run.mark_complete()
run.metadata["manifest"] = manifest.to_dict()
run.metadata["output_artifact_id"] = output_artifact.id
# Cache run
self.run_cache[input_bundle_hash] = run
except Exception as e:
run.mark_failed(str(e))
run.metadata["manifest"] = manifest.to_dict()
return run
def _create_failed_run(
self,
template: PromptTemplate,
error: str,
config: RunConfig,
) -> PromptRun:
"""
Create a failed run.
Args:
template: Template
error: Error message
config: Config
Returns:
Failed PromptRun
"""
run = PromptRun.create(
template_id=template.id,
input_bundle_hash="failed",
config=config,
)
run.mark_failed(error)
return run
def get_run_by_hash(self, input_bundle_hash: str) -> Optional[PromptRun]:
"""
Retrieve cached run by input bundle hash.
Args:
input_bundle_hash: Hash to lookup
Returns:
PromptRun if found, None otherwise
"""
return self.run_cache.get(input_bundle_hash)
def clear_cache(self) -> None:
"""Clear the run cache."""
self.run_cache.clear()

View File

@@ -0,0 +1,169 @@
"""
LLM adapter interface for pluggable model providers.
Implements abstraction layer for LLM integration, supporting
multiple providers (OpenAI, Anthropic, local models, etc.).
"""
from abc import ABC, abstractmethod
from typing import Dict, Any
from markitect.prompts.execution.models import RunConfig, LLMResponse
class LLMAdapter(ABC):
"""
Abstract base class for LLM providers.
Enables pluggable LLM backends without prescribing implementation.
Implementations can wrap OpenAI, Anthropic, or other APIs.
"""
@abstractmethod
def execute_prompt(
self,
prompt: str,
config: RunConfig,
) -> LLMResponse:
"""
Execute a prompt with the LLM.
Args:
prompt: Compiled prompt text
config: Execution configuration
Returns:
LLMResponse with generated content
Raises:
Exception: On LLM API errors
"""
pass
@abstractmethod
def validate_config(self, config: RunConfig) -> bool:
"""
Validate that configuration is supported.
Args:
config: Configuration to validate
Returns:
True if valid, False otherwise
"""
pass
class MockLLMAdapter(LLMAdapter):
"""
Mock LLM adapter for testing.
Returns deterministic responses without calling external APIs.
"""
def __init__(self, mock_response: str = "Mock LLM response"):
"""
Initialize mock adapter.
Args:
mock_response: Response to return
"""
self.mock_response = mock_response
self.call_count = 0
self.last_prompt = None
self.last_config = None
def execute_prompt(
self,
prompt: str,
config: RunConfig,
) -> LLMResponse:
"""
Return mock response.
Args:
prompt: Prompt (stored for inspection)
config: Config (stored for inspection)
Returns:
Mock LLMResponse
"""
self.call_count += 1
self.last_prompt = prompt
self.last_config = config
return LLMResponse(
content=self.mock_response,
model=config.model_name,
usage={
"prompt_tokens": len(prompt.split()),
"completion_tokens": len(self.mock_response.split()),
"total_tokens": len(prompt.split()) + len(self.mock_response.split()),
},
finish_reason="stop",
metadata={"mock": True},
)
def validate_config(self, config: RunConfig) -> bool:
"""
Mock validation always succeeds.
Args:
config: Configuration
Returns:
Always True
"""
return True
def reset(self) -> None:
"""Reset mock state."""
self.call_count = 0
self.last_prompt = None
self.last_config = None
class ErrorLLMAdapter(LLMAdapter):
"""
Mock adapter that always raises an error.
Useful for testing error handling.
"""
def __init__(self, error_message: str = "Mock LLM error"):
"""
Initialize error adapter.
Args:
error_message: Error message to raise
"""
self.error_message = error_message
def execute_prompt(
self,
prompt: str,
config: RunConfig,
) -> LLMResponse:
"""
Raise error.
Args:
prompt: Prompt
config: Config
Raises:
RuntimeError: Always
"""
raise RuntimeError(self.error_message)
def validate_config(self, config: RunConfig) -> bool:
"""
Validation succeeds.
Args:
config: Configuration
Returns:
True
"""
return True

View File

@@ -0,0 +1,291 @@
"""
RunManifest for execution provenance tracking.
Implements FR-5: RunManifest Persistence
Complete record of execution with all inputs, outputs, and metadata.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List, Optional
@dataclass
class ResolvedInput:
"""
Record of a resolved input artifact.
Attributes:
name: Artifact name
artifact_id: Artifact ID
space_id: Space where artifact was found
digest: Content digest
"""
name: str
artifact_id: str
space_id: str
digest: str
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"name": self.name,
"artifact_id": self.artifact_id,
"space_id": self.space_id,
"digest": self.digest,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ResolvedInput":
"""Create from dictionary."""
return cls(
name=data["name"],
artifact_id=data["artifact_id"],
space_id=data["space_id"],
digest=data["digest"],
)
@dataclass
class DependencyEdge:
"""
Dependency edge in execution graph.
Attributes:
source_id: Source artifact/run ID
target_id: Target artifact/run ID
edge_type: Type of dependency (requires, generates, includes)
"""
source_id: str
target_id: str
edge_type: str
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"source_id": self.source_id,
"target_id": self.target_id,
"edge_type": self.edge_type,
}
@dataclass
class OutputArtifact:
"""
Artifact produced by execution.
Attributes:
artifact_id: Artifact ID
name: Artifact name
digest: Content digest
artifact_type: Type of artifact
"""
artifact_id: str
name: str
digest: str
artifact_type: str
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"artifact_id": self.artifact_id,
"name": self.name,
"digest": self.digest,
"artifact_type": self.artifact_type,
}
@dataclass
class RunManifest:
"""
Complete execution manifest with provenance.
Implements FR-5: RunManifest Persistence
The RunManifest provides complete traceability for a prompt execution,
capturing all inputs, outputs, configuration, and metadata.
Implements FR-5.2: RunManifest contents:
- Template metadata
- Resolved inputs and their digests
- CompiledPrompt digest
- Model configuration
- Output artifacts and digests
- Dependency edges
- Validation results
- Impact debt records (if applicable)
Attributes:
run_id: ID of associated run
template_metadata: Template information
resolved_inputs: List of resolved input artifacts
compiled_prompt_digest: Digest of compiled prompt
model_config: Model configuration used
output_artifacts: List of produced artifacts
dependency_edges: Dependency graph edges
validation_results: Quality validation results
impact_debt: Suppressed recomputation records
timing_metadata: Execution timing information
created_at: Manifest creation time
"""
run_id: str
template_metadata: Dict[str, Any]
resolved_inputs: List[ResolvedInput] = field(default_factory=list)
compiled_prompt_digest: str = ""
model_config: Dict[str, Any] = field(default_factory=dict)
output_artifacts: List[OutputArtifact] = field(default_factory=list)
dependency_edges: List[DependencyEdge] = field(default_factory=list)
validation_results: Dict[str, Any] = field(default_factory=dict)
impact_debt: List[Dict[str, Any]] = field(default_factory=list)
timing_metadata: Dict[str, float] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.utcnow)
@classmethod
def create(
cls,
run_id: str,
template_id: str,
template_name: str,
template_digest: str,
) -> "RunManifest":
"""
Create a new manifest.
Args:
run_id: Run ID
template_id: Template ID
template_name: Template name
template_digest: Template content digest
Returns:
New RunManifest instance
"""
return cls(
run_id=run_id,
template_metadata={
"template_id": template_id,
"template_name": template_name,
"template_digest": template_digest,
},
)
def add_resolved_input(
self,
name: str,
artifact_id: str,
space_id: str,
digest: str,
) -> None:
"""
Add a resolved input artifact.
Args:
name: Artifact name
artifact_id: Artifact ID
space_id: Space ID
digest: Content digest
"""
self.resolved_inputs.append(
ResolvedInput(
name=name,
artifact_id=artifact_id,
space_id=space_id,
digest=digest,
)
)
def add_output_artifact(
self,
artifact_id: str,
name: str,
digest: str,
artifact_type: str,
) -> None:
"""
Add an output artifact.
Args:
artifact_id: Artifact ID
name: Artifact name
digest: Content digest
artifact_type: Artifact type
"""
self.output_artifacts.append(
OutputArtifact(
artifact_id=artifact_id,
name=name,
digest=digest,
artifact_type=artifact_type,
)
)
def add_dependency_edge(
self,
source_id: str,
target_id: str,
edge_type: str,
) -> None:
"""
Add a dependency edge.
Args:
source_id: Source ID
target_id: Target ID
edge_type: Edge type
"""
self.dependency_edges.append(
DependencyEdge(
source_id=source_id,
target_id=target_id,
edge_type=edge_type,
)
)
def set_timing(self, stage: str, duration_seconds: float) -> None:
"""
Record timing for a stage.
Args:
stage: Stage name
duration_seconds: Duration in seconds
"""
self.timing_metadata[stage] = duration_seconds
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"run_id": self.run_id,
"template_metadata": self.template_metadata,
"resolved_inputs": [inp.to_dict() for inp in self.resolved_inputs],
"compiled_prompt_digest": self.compiled_prompt_digest,
"model_config": self.model_config,
"output_artifacts": [out.to_dict() for out in self.output_artifacts],
"dependency_edges": [edge.to_dict() for edge in self.dependency_edges],
"validation_results": self.validation_results,
"impact_debt": self.impact_debt,
"timing_metadata": self.timing_metadata,
"created_at": self.created_at.isoformat(),
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "RunManifest":
"""Create from dictionary."""
return cls(
run_id=data["run_id"],
template_metadata=data["template_metadata"],
resolved_inputs=[
ResolvedInput.from_dict(inp) for inp in data.get("resolved_inputs", [])
],
compiled_prompt_digest=data.get("compiled_prompt_digest", ""),
model_config=data.get("model_config", {}),
output_artifacts=[
OutputArtifact(**out) for out in data.get("output_artifacts", [])
],
dependency_edges=[
DependencyEdge(**edge) for edge in data.get("dependency_edges", [])
],
validation_results=data.get("validation_results", {}),
impact_debt=data.get("impact_debt", []),
timing_metadata=data.get("timing_metadata", {}),
created_at=datetime.fromisoformat(data["created_at"]),
)

View File

@@ -0,0 +1,303 @@
"""
Models for prompt execution.
Implements FR-4: PromptRun Lifecycle
Defines execution stages, run configurations, and input bundles.
"""
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List, Optional
from enum import Enum
from markitect.prompts.models import calculate_bundle_digest
class ExecutionStage(Enum):
"""
Execution lifecycle stages.
Implements FR-4.1: PromptRun execution stages
"""
PENDING = "pending" # Not started
ANALYSIS = "analysis" # Template analysis
COMPILATION = "compilation" # Context compilation
PROCESSING = "processing" # LLM execution
COMPLETE = "complete" # Successfully finished
FAILED = "failed" # Execution failed
class RunStatus(Enum):
"""Overall status of a run."""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped" # Skipped due to identical InputBundleHash
@dataclass
class RunConfig:
"""
Configuration for prompt execution.
Attributes:
model_name: LLM model to use
temperature: Model temperature (0.0-1.0)
max_tokens: Maximum tokens to generate
model_params: Additional model parameters
max_depth: Maximum generation depth for nested runs
skip_if_exists: Skip if identical InputBundleHash exists (FR-4.4)
timeout_seconds: Execution timeout
"""
model_name: str = "gpt-4"
temperature: float = 0.7
max_tokens: int = 2000
model_params: Dict[str, Any] = field(default_factory=dict)
max_depth: int = 3
skip_if_exists: bool = True
timeout_seconds: int = 300
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"model_name": self.model_name,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"model_params": self.model_params,
"max_depth": self.max_depth,
"skip_if_exists": self.skip_if_exists,
"timeout_seconds": self.timeout_seconds,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "RunConfig":
"""Create from dictionary."""
return cls(
model_name=data.get("model_name", "gpt-4"),
temperature=data.get("temperature", 0.7),
max_tokens=data.get("max_tokens", 2000),
model_params=data.get("model_params", {}),
max_depth=data.get("max_depth", 3),
skip_if_exists=data.get("skip_if_exists", True),
timeout_seconds=data.get("timeout_seconds", 300),
)
@dataclass
class InputBundle:
"""
Complete input context for execution.
Implements FR-4.3: InputBundleHash calculation
The InputBundle captures all inputs that affect execution output,
enabling idempotent execution through content-based hashing.
Attributes:
template_digest: SHA-256 digest of template content
dependency_digests: Map of dependency name -> digest
resolution_config_hash: Hash of resolution configuration
model_config: Model configuration
compilation_options: Compilation settings
"""
template_digest: str
dependency_digests: Dict[str, str]
resolution_config_hash: str
model_config: Dict[str, Any]
compilation_options: Dict[str, Any] = field(default_factory=dict)
def calculate_hash(self) -> str:
"""
Calculate deterministic hash of input bundle.
Implements FR-4.3: InputBundleHash calculation
Components (sorted for determinism):
1. Template content digest
2. Sorted dependency digests by name
3. Resolution configuration hash
4. Model settings (name, temperature, etc.)
5. Compilation options
Returns:
SHA-256 hash of complete input bundle
"""
components = {
"template": self.template_digest,
"dependencies": ":".join(
f"{k}={v}" for k, v in sorted(self.dependency_digests.items())
),
"resolution_config": self.resolution_config_hash,
"model": ":".join(
f"{k}={v}" for k, v in sorted(self.model_config.items())
),
"compilation": ":".join(
f"{k}={v}" for k, v in sorted(self.compilation_options.items())
),
}
return calculate_bundle_digest(components)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"template_digest": self.template_digest,
"dependency_digests": self.dependency_digests,
"resolution_config_hash": self.resolution_config_hash,
"model_config": self.model_config,
"compilation_options": self.compilation_options,
"input_bundle_hash": self.calculate_hash(),
}
@dataclass
class LLMResponse:
"""
Response from LLM execution.
Attributes:
content: Generated content
model: Model used
usage: Token usage statistics
finish_reason: Why generation stopped
metadata: Additional response metadata
"""
content: str
model: str
usage: Dict[str, int] = field(default_factory=dict)
finish_reason: str = "stop"
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"content": self.content,
"model": self.model,
"usage": self.usage,
"finish_reason": self.finish_reason,
"metadata": self.metadata,
}
@dataclass
class PromptRun:
"""
Record of a prompt template execution.
Implements FR-4: PromptRun Lifecycle
Tracks complete execution state through all stages:
Analysis → Compilation → Processing → Complete/Failed
Attributes:
id: Unique run identifier
template_id: ID of template being executed
input_bundle_hash: Hash of input bundle for idempotency
status: Overall run status
stage: Current execution stage
parent_run_id: Parent run ID (for nested generators)
depth: Nesting depth (0 for top-level)
config: Execution configuration
started_at: Execution start time
completed_at: Execution completion time
error_message: Error message if failed
metadata: Additional run metadata
"""
id: str
template_id: str
input_bundle_hash: str
status: RunStatus = RunStatus.PENDING
stage: ExecutionStage = ExecutionStage.PENDING
parent_run_id: Optional[str] = None
depth: int = 0
config: RunConfig = field(default_factory=RunConfig)
started_at: datetime = field(default_factory=datetime.utcnow)
completed_at: Optional[datetime] = None
error_message: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@classmethod
def create(
cls,
template_id: str,
input_bundle_hash: str,
config: Optional[RunConfig] = None,
parent_run_id: Optional[str] = None,
depth: int = 0,
) -> "PromptRun":
"""
Create a new run.
Args:
template_id: Template being executed
input_bundle_hash: Hash of input bundle
config: Execution configuration
parent_run_id: Parent run ID for nested execution
depth: Nesting depth
Returns:
New PromptRun instance
"""
return cls(
id=str(uuid.uuid4()),
template_id=template_id,
input_bundle_hash=input_bundle_hash,
config=config or RunConfig(),
parent_run_id=parent_run_id,
depth=depth,
)
def advance_stage(self, stage: ExecutionStage) -> None:
"""
Advance to next execution stage.
Args:
stage: New stage
"""
self.stage = stage
if stage == ExecutionStage.PROCESSING:
self.status = RunStatus.RUNNING
def mark_complete(self) -> None:
"""Mark run as successfully completed."""
self.stage = ExecutionStage.COMPLETE
self.status = RunStatus.SUCCESS
self.completed_at = datetime.utcnow()
def mark_failed(self, error: str) -> None:
"""
Mark run as failed.
Args:
error: Error message
"""
self.stage = ExecutionStage.FAILED
self.status = RunStatus.FAILED
self.error_message = error
self.completed_at = datetime.utcnow()
def mark_skipped(self) -> None:
"""Mark run as skipped (identical hash exists)."""
self.status = RunStatus.SKIPPED
self.completed_at = datetime.utcnow()
def is_complete(self) -> bool:
"""Check if run is complete."""
return self.status in (RunStatus.SUCCESS, RunStatus.FAILED, RunStatus.SKIPPED)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"id": self.id,
"template_id": self.template_id,
"input_bundle_hash": self.input_bundle_hash,
"status": self.status.value,
"stage": self.stage.value,
"parent_run_id": self.parent_run_id,
"depth": self.depth,
"config": self.config.to_dict(),
"started_at": self.started_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"error_message": self.error_message,
}

View File

@@ -0,0 +1,71 @@
-- Migration 003: Create prompt runs and manifests tables
-- Implements FR-4: PromptRun Lifecycle and FR-5: RunManifest Persistence
-- Date: 2026-02-08
-- Prompt runs table
CREATE TABLE IF NOT EXISTS prompt_runs (
id TEXT PRIMARY KEY,
template_id TEXT NOT NULL REFERENCES prompt_artifacts(id),
input_bundle_hash TEXT NOT NULL,
status TEXT NOT NULL,
stage TEXT NOT NULL,
parent_run_id TEXT REFERENCES prompt_runs(id),
depth INTEGER DEFAULT 0,
config JSON,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT,
metadata JSON
);
-- Run manifests table
CREATE TABLE IF NOT EXISTS run_manifests (
run_id TEXT PRIMARY KEY REFERENCES prompt_runs(id),
template_metadata JSON NOT NULL,
resolved_inputs JSON NOT NULL,
compiled_prompt_digest TEXT NOT NULL,
model_config JSON NOT NULL,
output_artifacts JSON,
dependency_edges JSON,
validation_results JSON,
impact_debt JSON,
timing_metadata JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_runs_template ON prompt_runs(template_id);
CREATE INDEX IF NOT EXISTS idx_runs_bundle_hash ON prompt_runs(input_bundle_hash);
CREATE INDEX IF NOT EXISTS idx_runs_parent ON prompt_runs(parent_run_id);
CREATE INDEX IF NOT EXISTS idx_runs_status ON prompt_runs(status);
-- Unique constraint for idempotency (FR-4.4)
-- Note: Allows multiple failed runs, but only one successful run per hash
CREATE UNIQUE INDEX IF NOT EXISTS idx_runs_successful_hash
ON prompt_runs(input_bundle_hash)
WHERE status = 'success';
-- Comments (for documentation)
-- prompt_runs.id: Unique UUID for this run
-- prompt_runs.template_id: Template being executed
-- prompt_runs.input_bundle_hash: SHA-256 hash for idempotency
-- prompt_runs.status: pending, running, success, failed, skipped
-- prompt_runs.stage: pending, analysis, compilation, processing, complete, failed
-- prompt_runs.parent_run_id: Parent run for nested generators
-- prompt_runs.depth: Nesting depth (0 for top-level)
-- prompt_runs.config: RunConfig JSON
-- prompt_runs.started_at: Execution start time
-- prompt_runs.completed_at: Execution completion time
-- prompt_runs.error_message: Error message if failed
-- prompt_runs.metadata: Additional metadata
-- run_manifests.run_id: Associated run ID
-- run_manifests.template_metadata: Template info (id, name, digest)
-- run_manifests.resolved_inputs: Array of resolved input artifacts
-- run_manifests.compiled_prompt_digest: Digest of compiled prompt
-- run_manifests.model_config: Model configuration used
-- run_manifests.output_artifacts: Array of produced artifacts
-- run_manifests.dependency_edges: Dependency graph edges
-- run_manifests.validation_results: Quality validation results
-- run_manifests.impact_debt: Suppressed recomputation records
-- run_manifests.timing_metadata: Stage timing information

View File

@@ -0,0 +1,368 @@
"""Unit tests for PromptExecutionEngine."""
import pytest
import tempfile
from pathlib import Path
from markitect.prompts.templates.models import PromptTemplate
from markitect.prompts.templates.analyzer import TemplateAnalyzer
from markitect.prompts.resolver.resolver import PromptResolver
from markitect.prompts.resolver.compiler import ContextCompiler
from markitect.prompts.resolver.strategy import (
MultiSpaceResolutionStrategy,
ResolutionConfig,
)
from markitect.prompts.execution.engine import PromptExecutionEngine
from markitect.prompts.execution.models import RunConfig, RunStatus, ExecutionStage
from markitect.prompts.execution.llm_adapter import MockLLMAdapter, ErrorLLMAdapter
from markitect.prompts.services.artifact_service import ArtifactService
from markitect.prompts.repositories.sqlite import SQLiteArtifactRepository
@pytest.fixture
def temp_db():
"""Create temporary database."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
db_path = f.name
yield db_path
Path(db_path).unlink(missing_ok=True)
@pytest.fixture
def artifact_service(temp_db):
"""Create artifact service."""
repository = SQLiteArtifactRepository(temp_db)
return ArtifactService(repository)
@pytest.fixture
def analyzer():
"""Create template analyzer."""
return TemplateAnalyzer()
@pytest.fixture
def resolver(artifact_service):
"""Create resolver."""
strategy = MultiSpaceResolutionStrategy()
return PromptResolver(artifact_service, strategy)
@pytest.fixture
def compiler():
"""Create compiler."""
return ContextCompiler()
@pytest.fixture
def mock_llm():
"""Create mock LLM adapter."""
return MockLLMAdapter(mock_response="Mock LLM output")
@pytest.fixture
def engine(artifact_service, analyzer, resolver, compiler, mock_llm):
"""Create execution engine."""
return PromptExecutionEngine(
artifact_service,
analyzer,
resolver,
compiler,
mock_llm,
)
class TestPromptExecutionEngine:
"""Tests for PromptExecutionEngine."""
def test_execute_simple_template(
self, engine, artifact_service, mock_llm
):
"""Test executing simple template."""
# Create dependency
artifact_service.create_artifact(
space_id="space-1",
name="intro",
content="Introduction text",
)
# Create template
content = "# Document\n{{require:intro}}\nMore content"
template = PromptTemplate.create(
space_id="space-1",
name="doc",
content=content,
)
# Execute
resolution_config = ResolutionConfig(space_id="space-1")
run = engine.execute(template, content, resolution_config)
# Verify run
assert run.status == RunStatus.SUCCESS
assert run.stage == ExecutionStage.COMPLETE
assert run.completed_at is not None
assert "output_artifact_id" in run.metadata
# Verify LLM was called
assert mock_llm.call_count == 1
assert mock_llm.last_prompt is not None
def test_execute_with_failed_resolution(self, engine):
"""Test execution with missing required dependency."""
# Create template with missing dependency
content = "{{require:missing-dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
run = engine.execute(template, content, resolution_config)
# Should fail during resolution
assert run.status == RunStatus.FAILED
assert "Resolution failed" in run.error_message
def test_idempotent_execution_skips_duplicate(
self, engine, artifact_service
):
"""Test idempotent execution (FR-4.4)."""
# Create dependency
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
config = RunConfig(skip_if_exists=True)
# First execution
run1 = engine.execute(template, content, resolution_config, config)
assert run1.status == RunStatus.SUCCESS
# Second execution with same inputs
run2 = engine.execute(template, content, resolution_config, config)
# Should be skipped
assert run2.status == RunStatus.SKIPPED
assert run2.id != run1.id # Different run
assert run2.input_bundle_hash == run1.input_bundle_hash # Same hash
assert "skipped_due_to" in run2.metadata
def test_execution_without_skip_reruns(
self, engine, artifact_service
):
"""Test execution without skip flag reruns."""
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
config = RunConfig(skip_if_exists=False)
# First execution
run1 = engine.execute(template, content, resolution_config, config)
assert run1.status == RunStatus.SUCCESS
# Second execution
run2 = engine.execute(template, content, resolution_config, config)
# Should execute again (not skipped)
assert run2.status == RunStatus.SUCCESS
assert run2.id != run1.id
def test_input_bundle_hash_changes_with_template(
self, engine, artifact_service, mock_llm
):
"""Test hash changes when template changes."""
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
# First template
content1 = "{{require:dep}} version 1"
template1 = PromptTemplate.create(
space_id="space-1",
name="test",
content=content1,
)
resolution_config = ResolutionConfig(space_id="space-1")
run1 = engine.execute(template1, content1, resolution_config)
# Different template content
content2 = "{{require:dep}} version 2"
template2 = PromptTemplate.create(
space_id="space-1",
name="test",
content=content2,
)
run2 = engine.execute(template2, content2, resolution_config)
# Different hashes, both execute
assert run1.input_bundle_hash != run2.input_bundle_hash
assert run1.status == RunStatus.SUCCESS
assert run2.status == RunStatus.SUCCESS
def test_input_bundle_hash_changes_with_dependencies(
self, engine, artifact_service
):
"""Test hash changes when dependency content changes."""
# Create initial dependency
dep = artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Original content",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
run1 = engine.execute(template, content, resolution_config)
# Update dependency
artifact_service.update_artifact_content(dep.id, "Modified content")
run2 = engine.execute(template, content, resolution_config)
# Hashes should differ
assert run1.input_bundle_hash != run2.input_bundle_hash
def test_execution_creates_manifest(
self, engine, artifact_service
):
"""Test execution creates RunManifest."""
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
run = engine.execute(template, content, resolution_config)
# Check manifest in metadata
assert "manifest" in run.metadata
manifest_data = run.metadata["manifest"]
assert manifest_data["run_id"] == run.id
assert len(manifest_data["resolved_inputs"]) == 1
assert len(manifest_data["output_artifacts"]) == 1
assert "timing_metadata" in manifest_data
def test_execution_with_llm_error(
self, artifact_service, analyzer, resolver, compiler
):
"""Test execution handles LLM errors."""
# Create engine with error adapter
error_llm = ErrorLLMAdapter("LLM service unavailable")
engine = PromptExecutionEngine(
artifact_service,
analyzer,
resolver,
compiler,
error_llm,
)
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
run = engine.execute(template, content, resolution_config)
# Should fail during processing
assert run.status == RunStatus.FAILED
assert "LLM service unavailable" in run.error_message
def test_get_run_by_hash(self, engine, artifact_service):
"""Test retrieving cached run by hash."""
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
run = engine.execute(template, content, resolution_config)
# Retrieve by hash
cached = engine.get_run_by_hash(run.input_bundle_hash)
assert cached is not None
assert cached.id == run.id
def test_clear_cache(self, engine, artifact_service):
"""Test clearing run cache."""
artifact_service.create_artifact(
space_id="space-1",
name="dep",
content="Dependency",
)
content = "{{require:dep}}"
template = PromptTemplate.create(
space_id="space-1",
name="test",
content=content,
)
resolution_config = ResolutionConfig(space_id="space-1")
run = engine.execute(template, content, resolution_config)
# Cache should have run
assert engine.get_run_by_hash(run.input_bundle_hash) is not None
# Clear cache
engine.clear_cache()
# Should be gone
assert engine.get_run_by_hash(run.input_bundle_hash) is None

View File

@@ -0,0 +1,240 @@
"""Unit tests for execution models."""
import pytest
from markitect.prompts.execution.models import (
RunConfig,
InputBundle,
LLMResponse,
PromptRun,
ExecutionStage,
RunStatus,
)
class TestRunConfig:
"""Tests for RunConfig."""
def test_create_default_config(self):
"""Test creating default config."""
config = RunConfig()
assert config.model_name == "gpt-4"
assert config.temperature == 0.7
assert config.max_tokens == 2000
assert config.max_depth == 3
assert config.skip_if_exists is True
def test_create_custom_config(self):
"""Test creating custom config."""
config = RunConfig(
model_name="gpt-3.5-turbo",
temperature=0.5,
max_tokens=1000,
max_depth=5,
skip_if_exists=False,
)
assert config.model_name == "gpt-3.5-turbo"
assert config.temperature == 0.5
assert config.max_tokens == 1000
assert config.max_depth == 5
assert config.skip_if_exists is False
def test_config_to_dict(self):
"""Test serialization."""
config = RunConfig(model_name="test-model")
data = config.to_dict()
assert data["model_name"] == "test-model"
assert "temperature" in data
def test_config_from_dict(self):
"""Test deserialization."""
data = {
"model_name": "custom-model",
"temperature": 0.9,
"max_tokens": 500,
}
config = RunConfig.from_dict(data)
assert config.model_name == "custom-model"
assert config.temperature == 0.9
assert config.max_tokens == 500
class TestInputBundle:
"""Tests for InputBundle."""
def test_create_input_bundle(self):
"""Test creating input bundle."""
bundle = InputBundle(
template_digest="abc123",
dependency_digests={"dep1": "def456", "dep2": "ghi789"},
resolution_config_hash="config123",
model_config={"model": "gpt-4", "temp": 0.7},
)
assert bundle.template_digest == "abc123"
assert len(bundle.dependency_digests) == 2
def test_calculate_hash_deterministic(self):
"""Test hash calculation is deterministic."""
bundle1 = InputBundle(
template_digest="abc",
dependency_digests={"a": "1", "b": "2"},
resolution_config_hash="conf",
model_config={"model": "gpt-4"},
)
bundle2 = InputBundle(
template_digest="abc",
dependency_digests={"b": "2", "a": "1"}, # Different order
resolution_config_hash="conf",
model_config={"model": "gpt-4"},
)
# Should produce same hash regardless of dict order
assert bundle1.calculate_hash() == bundle2.calculate_hash()
def test_calculate_hash_changes_with_content(self):
"""Test hash changes when content changes."""
bundle1 = InputBundle(
template_digest="abc",
dependency_digests={},
resolution_config_hash="conf",
model_config={},
)
bundle2 = InputBundle(
template_digest="xyz", # Different template
dependency_digests={},
resolution_config_hash="conf",
model_config={},
)
assert bundle1.calculate_hash() != bundle2.calculate_hash()
def test_to_dict_includes_hash(self):
"""Test dictionary includes hash."""
bundle = InputBundle(
template_digest="abc",
dependency_digests={},
resolution_config_hash="conf",
model_config={},
)
data = bundle.to_dict()
assert "input_bundle_hash" in data
assert data["input_bundle_hash"] == bundle.calculate_hash()
class TestLLMResponse:
"""Tests for LLMResponse."""
def test_create_response(self):
"""Test creating LLM response."""
response = LLMResponse(
content="Generated text",
model="gpt-4",
usage={"total_tokens": 100},
finish_reason="stop",
)
assert response.content == "Generated text"
assert response.model == "gpt-4"
assert response.usage["total_tokens"] == 100
def test_response_to_dict(self):
"""Test serialization."""
response = LLMResponse(
content="Text",
model="gpt-4",
)
data = response.to_dict()
assert data["content"] == "Text"
assert data["model"] == "gpt-4"
class TestPromptRun:
"""Tests for PromptRun."""
def test_create_run(self):
"""Test creating run."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash123",
)
assert run.id # Has UUID
assert run.template_id == "template-1"
assert run.input_bundle_hash == "hash123"
assert run.status == RunStatus.PENDING
assert run.stage == ExecutionStage.PENDING
assert run.depth == 0
def test_create_nested_run(self):
"""Test creating nested run."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash",
parent_run_id="parent-123",
depth=2,
)
assert run.parent_run_id == "parent-123"
assert run.depth == 2
def test_advance_stage(self):
"""Test advancing execution stage."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash",
)
assert run.stage == ExecutionStage.PENDING
run.advance_stage(ExecutionStage.ANALYSIS)
assert run.stage == ExecutionStage.ANALYSIS
run.advance_stage(ExecutionStage.PROCESSING)
assert run.stage == ExecutionStage.PROCESSING
assert run.status == RunStatus.RUNNING
def test_mark_complete(self):
"""Test marking run as complete."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash",
)
run.mark_complete()
assert run.status == RunStatus.SUCCESS
assert run.stage == ExecutionStage.COMPLETE
assert run.completed_at is not None
assert run.is_complete()
def test_mark_failed(self):
"""Test marking run as failed."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash",
)
run.mark_failed("Error message")
assert run.status == RunStatus.FAILED
assert run.stage == ExecutionStage.FAILED
assert run.error_message == "Error message"
assert run.completed_at is not None
assert run.is_complete()
def test_mark_skipped(self):
"""Test marking run as skipped."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash",
)
run.mark_skipped()
assert run.status == RunStatus.SKIPPED
assert run.completed_at is not None
assert run.is_complete()
def test_run_to_dict(self):
"""Test serialization."""
run = PromptRun.create(
template_id="template-1",
input_bundle_hash="hash",
)
data = run.to_dict()
assert data["id"] == run.id
assert data["template_id"] == "template-1"
assert data["input_bundle_hash"] == "hash"
assert data["status"] == "pending"
assert data["stage"] == "pending"