feat(prompts): implement Phase 7 - Quality & Validation (FR-9, FR-10)
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Add quality gate framework with schema validation (JSON Schema via jsonschema library), pattern validation (regex-based), multi-gate QualityValidator with SQLite persistence, HaltingPolicyEngine with budget/iteration/improvement checks, and RefinementLoop for iterative execute-validate-halt cycles. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
53
markitect/prompts/quality/__init__.py
Normal file
53
markitect/prompts/quality/__init__.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""
|
||||
Quality validation and halting policies for prompt artifacts.
|
||||
|
||||
Implements FR-9: QualityGate Validation
|
||||
Implements FR-10: Halting and Refinement Policy
|
||||
|
||||
- FR-9.1: Schema validation against generated artifacts
|
||||
- FR-9.2: Multiple QualityGates per artifact
|
||||
- FR-9.3: Record pass/fail results and diagnostics
|
||||
- FR-9.4: Halting policies based on QualityGate results
|
||||
- FR-10.1: Configurable QualityPolicies
|
||||
- FR-10.2: Halting decisions (quality, improvement, iterations, budget)
|
||||
- FR-10.3: Record halting decisions in RunManifest
|
||||
"""
|
||||
|
||||
from markitect.prompts.quality.models import (
|
||||
GateType,
|
||||
ValidationStatus,
|
||||
HaltDecision,
|
||||
ValidationDiagnostic,
|
||||
ValidationResult,
|
||||
QualityGate,
|
||||
QualityPolicy,
|
||||
HaltingRecord,
|
||||
RefinementResult,
|
||||
)
|
||||
from markitect.prompts.quality.gates.schema_gate import SchemaValidationGate
|
||||
from markitect.prompts.quality.gates.pattern_gate import PatternValidationGate
|
||||
from markitect.prompts.quality.validator import QualityValidator
|
||||
from markitect.prompts.quality.policy import HaltingPolicyEngine
|
||||
from markitect.prompts.quality.refinement import RefinementLoop
|
||||
|
||||
__all__ = [
|
||||
# Models
|
||||
"GateType",
|
||||
"ValidationStatus",
|
||||
"HaltDecision",
|
||||
"ValidationDiagnostic",
|
||||
"ValidationResult",
|
||||
"QualityGate",
|
||||
"QualityPolicy",
|
||||
"HaltingRecord",
|
||||
"RefinementResult",
|
||||
# Gates
|
||||
"SchemaValidationGate",
|
||||
"PatternValidationGate",
|
||||
# Validator
|
||||
"QualityValidator",
|
||||
# Policy
|
||||
"HaltingPolicyEngine",
|
||||
# Refinement
|
||||
"RefinementLoop",
|
||||
]
|
||||
13
markitect/prompts/quality/gates/__init__.py
Normal file
13
markitect/prompts/quality/gates/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""
|
||||
Quality gate implementations.
|
||||
|
||||
Provides SchemaValidationGate and PatternValidationGate.
|
||||
"""
|
||||
|
||||
from markitect.prompts.quality.gates.schema_gate import SchemaValidationGate
|
||||
from markitect.prompts.quality.gates.pattern_gate import PatternValidationGate
|
||||
|
||||
__all__ = [
|
||||
"SchemaValidationGate",
|
||||
"PatternValidationGate",
|
||||
]
|
||||
109
markitect/prompts/quality/gates/pattern_gate.py
Normal file
109
markitect/prompts/quality/gates/pattern_gate.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""
|
||||
Pattern validation quality gate.
|
||||
|
||||
Validates content against required and forbidden regex patterns.
|
||||
"""
|
||||
|
||||
import re
|
||||
import uuid
|
||||
from typing import List, Optional
|
||||
|
||||
from markitect.prompts.quality.models import (
|
||||
GateType,
|
||||
QualityGate,
|
||||
ValidationDiagnostic,
|
||||
ValidationResult,
|
||||
ValidationStatus,
|
||||
)
|
||||
|
||||
|
||||
class PatternValidationGate(QualityGate):
|
||||
"""
|
||||
Validates artifact content against regex patterns.
|
||||
|
||||
Checks that all required patterns are present and no forbidden
|
||||
patterns are found.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
required_patterns: Optional[List[str]] = None,
|
||||
forbidden_patterns: Optional[List[str]] = None,
|
||||
gate_id: Optional[str] = None,
|
||||
name: str = "pattern",
|
||||
):
|
||||
"""
|
||||
Initialize with pattern lists.
|
||||
|
||||
Args:
|
||||
required_patterns: Regex patterns that must be found in content
|
||||
forbidden_patterns: Regex patterns that must NOT be found in content
|
||||
gate_id: Optional gate identifier
|
||||
name: Human-readable gate name
|
||||
"""
|
||||
super().__init__(
|
||||
gate_id=gate_id or str(uuid.uuid4()),
|
||||
name=name,
|
||||
gate_type=GateType.PATTERN,
|
||||
)
|
||||
self.required_patterns = required_patterns or []
|
||||
self.forbidden_patterns = forbidden_patterns or []
|
||||
|
||||
def validate(self, content: str, artifact_id: str) -> ValidationResult:
|
||||
"""
|
||||
Validate content against required and forbidden patterns.
|
||||
|
||||
Args:
|
||||
content: Content string to validate
|
||||
artifact_id: ID of the artifact being validated
|
||||
|
||||
Returns:
|
||||
ValidationResult with status and diagnostics
|
||||
"""
|
||||
diagnostics = []
|
||||
total_checks = len(self.required_patterns) + len(self.forbidden_patterns)
|
||||
failures = 0
|
||||
|
||||
# Check required patterns
|
||||
for pattern in self.required_patterns:
|
||||
if not re.search(pattern, content):
|
||||
diagnostics.append(
|
||||
ValidationDiagnostic(
|
||||
code="MISSING_PATTERN",
|
||||
message=f"Required pattern not found: {pattern}",
|
||||
severity="error",
|
||||
)
|
||||
)
|
||||
failures += 1
|
||||
|
||||
# Check forbidden patterns
|
||||
for pattern in self.forbidden_patterns:
|
||||
match = re.search(pattern, content)
|
||||
if match:
|
||||
diagnostics.append(
|
||||
ValidationDiagnostic(
|
||||
code="FORBIDDEN_PATTERN",
|
||||
message=f"Forbidden pattern found: {pattern} (matched: '{match.group()}')",
|
||||
severity="error",
|
||||
)
|
||||
)
|
||||
failures += 1
|
||||
|
||||
if total_checks == 0:
|
||||
status = ValidationStatus.PASS
|
||||
score = 1.0
|
||||
elif failures == 0:
|
||||
status = ValidationStatus.PASS
|
||||
score = 1.0
|
||||
else:
|
||||
status = ValidationStatus.FAIL
|
||||
score = max(0.0, 1.0 - failures / total_checks)
|
||||
|
||||
return ValidationResult.create(
|
||||
gate_id=self.id,
|
||||
gate_type=self.gate_type,
|
||||
artifact_id=artifact_id,
|
||||
status=status,
|
||||
score=score,
|
||||
diagnostics=diagnostics,
|
||||
)
|
||||
123
markitect/prompts/quality/gates/schema_gate.py
Normal file
123
markitect/prompts/quality/gates/schema_gate.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""
|
||||
Schema validation quality gate.
|
||||
|
||||
Implements FR-9.1: Validate generated artifacts against JSON schemas.
|
||||
Uses the jsonschema library for validation.
|
||||
"""
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import jsonschema
|
||||
|
||||
from markitect.prompts.quality.models import (
|
||||
GateType,
|
||||
QualityGate,
|
||||
ValidationDiagnostic,
|
||||
ValidationResult,
|
||||
ValidationStatus,
|
||||
)
|
||||
|
||||
|
||||
class SchemaValidationGate(QualityGate):
|
||||
"""
|
||||
Validates artifact content against a JSON schema.
|
||||
|
||||
Parses content as JSON and validates against the provided schema
|
||||
using the jsonschema library.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
schema: Dict[str, Any],
|
||||
gate_id: Optional[str] = None,
|
||||
name: str = "schema",
|
||||
):
|
||||
"""
|
||||
Initialize with a JSON schema.
|
||||
|
||||
Args:
|
||||
schema: JSON Schema dictionary
|
||||
gate_id: Optional gate identifier (auto-generated if not provided)
|
||||
name: Human-readable gate name
|
||||
"""
|
||||
super().__init__(
|
||||
gate_id=gate_id or str(uuid.uuid4()),
|
||||
name=name,
|
||||
gate_type=GateType.SCHEMA,
|
||||
)
|
||||
self.schema = schema
|
||||
|
||||
def validate(self, content: str, artifact_id: str) -> ValidationResult:
|
||||
"""
|
||||
Validate content against the JSON schema.
|
||||
|
||||
Parses the content as JSON, then validates against the schema.
|
||||
Returns FAIL if content is not valid JSON or fails schema validation.
|
||||
|
||||
Args:
|
||||
content: JSON content string to validate
|
||||
artifact_id: ID of the artifact being validated
|
||||
|
||||
Returns:
|
||||
ValidationResult with status and diagnostics
|
||||
"""
|
||||
diagnostics = []
|
||||
|
||||
# Parse JSON
|
||||
try:
|
||||
data = json.loads(content)
|
||||
except (json.JSONDecodeError, TypeError) as e:
|
||||
diagnostics.append(
|
||||
ValidationDiagnostic(
|
||||
code="INVALID_JSON",
|
||||
message=f"Content is not valid JSON: {e}",
|
||||
severity="error",
|
||||
)
|
||||
)
|
||||
return ValidationResult.create(
|
||||
gate_id=self.id,
|
||||
gate_type=self.gate_type,
|
||||
artifact_id=artifact_id,
|
||||
status=ValidationStatus.FAIL,
|
||||
score=0.0,
|
||||
diagnostics=diagnostics,
|
||||
)
|
||||
|
||||
# Validate against schema
|
||||
validator = jsonschema.Draft7Validator(self.schema)
|
||||
errors = list(validator.iter_errors(data))
|
||||
|
||||
if not errors:
|
||||
return ValidationResult.create(
|
||||
gate_id=self.id,
|
||||
gate_type=self.gate_type,
|
||||
artifact_id=artifact_id,
|
||||
status=ValidationStatus.PASS,
|
||||
score=1.0,
|
||||
diagnostics=[],
|
||||
)
|
||||
|
||||
for error in errors:
|
||||
path = ".".join(str(p) for p in error.absolute_path) or "(root)"
|
||||
diagnostics.append(
|
||||
ValidationDiagnostic(
|
||||
code="SCHEMA_VIOLATION",
|
||||
message=f"At '{path}': {error.message}",
|
||||
severity="error",
|
||||
)
|
||||
)
|
||||
|
||||
# Score based on proportion of passing validations
|
||||
total_checks = len(errors) + 1 # approximate
|
||||
score = max(0.0, 1.0 - len(errors) / total_checks)
|
||||
|
||||
return ValidationResult.create(
|
||||
gate_id=self.id,
|
||||
gate_type=self.gate_type,
|
||||
artifact_id=artifact_id,
|
||||
status=ValidationStatus.FAIL,
|
||||
score=score,
|
||||
diagnostics=diagnostics,
|
||||
)
|
||||
283
markitect/prompts/quality/models.py
Normal file
283
markitect/prompts/quality/models.py
Normal file
@@ -0,0 +1,283 @@
|
||||
"""
|
||||
Data models for quality validation and halting policies.
|
||||
|
||||
Implements FR-9: QualityGate Validation
|
||||
Implements FR-10: Halting and Refinement Policy
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
class GateType(Enum):
|
||||
"""Type classification for quality gates."""
|
||||
SCHEMA = "schema"
|
||||
PATTERN = "pattern"
|
||||
CUSTOM = "custom"
|
||||
|
||||
|
||||
class ValidationStatus(Enum):
|
||||
"""Outcome status of a quality gate check."""
|
||||
PASS = "pass"
|
||||
FAIL = "fail"
|
||||
WARNING = "warning"
|
||||
SKIPPED = "skipped"
|
||||
|
||||
|
||||
class HaltDecision(Enum):
|
||||
"""Decision outcome from halting policy evaluation."""
|
||||
CONTINUE = "continue"
|
||||
HALT_QUALITY_MET = "halted_quality_met"
|
||||
HALT_ITERATION_LIMIT = "halted_iteration_limit"
|
||||
HALT_BUDGET_EXHAUSTED = "halted_budget_exhausted"
|
||||
HALT_NO_IMPROVEMENT = "halted_no_improvement"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationDiagnostic:
|
||||
"""
|
||||
Single diagnostic message from a quality gate.
|
||||
|
||||
Attributes:
|
||||
code: Machine-readable diagnostic code
|
||||
message: Human-readable description
|
||||
severity: Severity level (error, warning, info)
|
||||
"""
|
||||
code: str
|
||||
message: str
|
||||
severity: str = "error"
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"code": self.code,
|
||||
"message": self.message,
|
||||
"severity": self.severity,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "ValidationDiagnostic":
|
||||
"""Create from dictionary."""
|
||||
return cls(
|
||||
code=data["code"],
|
||||
message=data["message"],
|
||||
severity=data.get("severity", "error"),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationResult:
|
||||
"""
|
||||
Result of applying a quality gate to an artifact.
|
||||
|
||||
Implements FR-9.3: Record pass/fail results and diagnostics.
|
||||
|
||||
Attributes:
|
||||
id: Unique result identifier
|
||||
gate_id: ID of the quality gate that produced this result
|
||||
gate_type: Type of the quality gate
|
||||
artifact_id: ID of the validated artifact
|
||||
status: Pass/fail outcome
|
||||
score: Optional quality score (0.0-1.0)
|
||||
diagnostics: List of diagnostic messages
|
||||
validated_at: When validation occurred
|
||||
"""
|
||||
id: str
|
||||
gate_id: str
|
||||
gate_type: GateType
|
||||
artifact_id: str
|
||||
status: ValidationStatus
|
||||
score: Optional[float] = None
|
||||
diagnostics: List[ValidationDiagnostic] = field(default_factory=list)
|
||||
validated_at: datetime = field(default_factory=datetime.utcnow)
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
gate_id: str,
|
||||
gate_type: GateType,
|
||||
artifact_id: str,
|
||||
status: ValidationStatus,
|
||||
score: Optional[float] = None,
|
||||
diagnostics: Optional[List[ValidationDiagnostic]] = None,
|
||||
) -> "ValidationResult":
|
||||
"""Create a new ValidationResult."""
|
||||
return cls(
|
||||
id=str(uuid.uuid4()),
|
||||
gate_id=gate_id,
|
||||
gate_type=gate_type,
|
||||
artifact_id=artifact_id,
|
||||
status=status,
|
||||
score=score,
|
||||
diagnostics=diagnostics or [],
|
||||
)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary for serialization."""
|
||||
return {
|
||||
"id": self.id,
|
||||
"gate_id": self.gate_id,
|
||||
"gate_type": self.gate_type.value,
|
||||
"artifact_id": self.artifact_id,
|
||||
"status": self.status.value,
|
||||
"score": self.score,
|
||||
"diagnostics": [d.to_dict() for d in self.diagnostics],
|
||||
"validated_at": self.validated_at.isoformat(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "ValidationResult":
|
||||
"""Create from dictionary."""
|
||||
return cls(
|
||||
id=data["id"],
|
||||
gate_id=data["gate_id"],
|
||||
gate_type=GateType(data["gate_type"]),
|
||||
artifact_id=data["artifact_id"],
|
||||
status=ValidationStatus(data["status"]),
|
||||
score=data.get("score"),
|
||||
diagnostics=[
|
||||
ValidationDiagnostic.from_dict(d)
|
||||
for d in data.get("diagnostics", [])
|
||||
],
|
||||
validated_at=datetime.fromisoformat(data["validated_at"]),
|
||||
)
|
||||
|
||||
|
||||
class QualityGate(ABC):
|
||||
"""
|
||||
Abstract base class for quality gates.
|
||||
|
||||
Implements FR-9.1/FR-9.2: Pluggable validation framework
|
||||
supporting multiple gates per artifact.
|
||||
"""
|
||||
|
||||
def __init__(self, gate_id: str, name: str, gate_type: GateType):
|
||||
self.id = gate_id
|
||||
self.name = name
|
||||
self.gate_type = gate_type
|
||||
|
||||
@abstractmethod
|
||||
def validate(self, content: str, artifact_id: str) -> ValidationResult:
|
||||
"""
|
||||
Validate content against this quality gate.
|
||||
|
||||
Args:
|
||||
content: Content to validate
|
||||
artifact_id: ID of the artifact being validated
|
||||
|
||||
Returns:
|
||||
ValidationResult with status and diagnostics
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class QualityPolicy:
|
||||
"""
|
||||
Configuration for halting and refinement policy.
|
||||
|
||||
Implements FR-10.1: Configurable QualityPolicies.
|
||||
|
||||
Attributes:
|
||||
max_iterations: Maximum refinement iterations
|
||||
min_improvement: Minimum score improvement to continue
|
||||
fail_on_gate_failure: Whether any gate failure halts execution
|
||||
resource_budget: Maximum total runs allowed
|
||||
required_gate_ids: Gate IDs that must pass for quality to be met
|
||||
"""
|
||||
max_iterations: int = 3
|
||||
min_improvement: float = 0.05
|
||||
fail_on_gate_failure: bool = True
|
||||
resource_budget: int = 10
|
||||
required_gate_ids: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"max_iterations": self.max_iterations,
|
||||
"min_improvement": self.min_improvement,
|
||||
"fail_on_gate_failure": self.fail_on_gate_failure,
|
||||
"resource_budget": self.resource_budget,
|
||||
"required_gate_ids": self.required_gate_ids,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "QualityPolicy":
|
||||
"""Create from dictionary."""
|
||||
return cls(
|
||||
max_iterations=data.get("max_iterations", 3),
|
||||
min_improvement=data.get("min_improvement", 0.05),
|
||||
fail_on_gate_failure=data.get("fail_on_gate_failure", True),
|
||||
resource_budget=data.get("resource_budget", 10),
|
||||
required_gate_ids=data.get("required_gate_ids", []),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HaltingRecord:
|
||||
"""
|
||||
Record of a halting decision.
|
||||
|
||||
Implements FR-10.3: Record halting decisions in the RunManifest.
|
||||
|
||||
Attributes:
|
||||
decision: The halting decision
|
||||
iteration: Current iteration number
|
||||
max_iterations: Maximum allowed iterations
|
||||
scores: Score history across iterations
|
||||
reason: Human-readable reason for decision
|
||||
recorded_at: When the decision was made
|
||||
"""
|
||||
decision: HaltDecision
|
||||
iteration: int
|
||||
max_iterations: int
|
||||
scores: List[float] = field(default_factory=list)
|
||||
reason: str = ""
|
||||
recorded_at: datetime = field(default_factory=datetime.utcnow)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary for serialization."""
|
||||
return {
|
||||
"decision": self.decision.value,
|
||||
"iteration": self.iteration,
|
||||
"max_iterations": self.max_iterations,
|
||||
"scores": self.scores,
|
||||
"reason": self.reason,
|
||||
"recorded_at": self.recorded_at.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class RefinementResult:
|
||||
"""
|
||||
Result of a refinement loop execution.
|
||||
|
||||
Attributes:
|
||||
iterations_run: Number of iterations executed
|
||||
final_results: Validation results from the last iteration
|
||||
halting_record: Record of the halting decision
|
||||
all_results: Validation results from all iterations
|
||||
run_ids: List of run IDs produced during refinement
|
||||
"""
|
||||
iterations_run: int
|
||||
final_results: List[ValidationResult] = field(default_factory=list)
|
||||
halting_record: Optional[HaltingRecord] = None
|
||||
all_results: List[List[ValidationResult]] = field(default_factory=list)
|
||||
run_ids: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"iterations_run": self.iterations_run,
|
||||
"final_results": [r.to_dict() for r in self.final_results],
|
||||
"halting_record": self.halting_record.to_dict() if self.halting_record else None,
|
||||
"all_results": [
|
||||
[r.to_dict() for r in iteration]
|
||||
for iteration in self.all_results
|
||||
],
|
||||
"run_ids": self.run_ids,
|
||||
}
|
||||
153
markitect/prompts/quality/policy.py
Normal file
153
markitect/prompts/quality/policy.py
Normal file
@@ -0,0 +1,153 @@
|
||||
"""
|
||||
Halting policy engine for refinement control.
|
||||
|
||||
Implements FR-10: Halting and Refinement Policy
|
||||
Evaluates halting conditions based on quality gate results,
|
||||
marginal improvement, iteration limits, and resource budgets.
|
||||
"""
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
from markitect.prompts.quality.models import (
|
||||
HaltDecision,
|
||||
HaltingRecord,
|
||||
QualityPolicy,
|
||||
ValidationResult,
|
||||
ValidationStatus,
|
||||
)
|
||||
|
||||
|
||||
class HaltingPolicyEngine:
|
||||
"""
|
||||
Evaluates halting decisions based on configurable policies.
|
||||
|
||||
Implements FR-10.2: Evaluate halting based on quality gate results,
|
||||
marginal improvement metrics, iteration limits, and resource budgets.
|
||||
|
||||
Implements FR-10.3: Record halting decisions.
|
||||
"""
|
||||
|
||||
def __init__(self, policy: QualityPolicy):
|
||||
"""
|
||||
Initialize with a quality policy.
|
||||
|
||||
Args:
|
||||
policy: Quality policy configuration
|
||||
"""
|
||||
self.policy = policy
|
||||
|
||||
def evaluate(
|
||||
self,
|
||||
results: List[ValidationResult],
|
||||
iteration: int,
|
||||
score_history: Optional[List[float]] = None,
|
||||
total_runs: int = 0,
|
||||
) -> HaltingRecord:
|
||||
"""
|
||||
Evaluate whether to halt or continue refinement.
|
||||
|
||||
Checks in order:
|
||||
1. Resource budget exhaustion
|
||||
2. Iteration limit
|
||||
3. Quality met (all required gates pass)
|
||||
4. Marginal improvement below threshold
|
||||
|
||||
Args:
|
||||
results: Validation results from current iteration
|
||||
iteration: Current iteration number (1-based)
|
||||
score_history: Aggregate scores from previous iterations
|
||||
total_runs: Total number of runs consumed
|
||||
|
||||
Returns:
|
||||
HaltingRecord with the decision
|
||||
"""
|
||||
score_history = score_history or []
|
||||
current_score = self._aggregate_score(results)
|
||||
all_scores = score_history + [current_score]
|
||||
|
||||
# Check resource budget
|
||||
if total_runs >= self.policy.resource_budget:
|
||||
return HaltingRecord(
|
||||
decision=HaltDecision.HALT_BUDGET_EXHAUSTED,
|
||||
iteration=iteration,
|
||||
max_iterations=self.policy.max_iterations,
|
||||
scores=all_scores,
|
||||
reason=f"Resource budget exhausted: {total_runs}/{self.policy.resource_budget} runs used",
|
||||
)
|
||||
|
||||
# Check iteration limit
|
||||
if iteration >= self.policy.max_iterations:
|
||||
return HaltingRecord(
|
||||
decision=HaltDecision.HALT_ITERATION_LIMIT,
|
||||
iteration=iteration,
|
||||
max_iterations=self.policy.max_iterations,
|
||||
scores=all_scores,
|
||||
reason=f"Iteration limit reached: {iteration}/{self.policy.max_iterations}",
|
||||
)
|
||||
|
||||
# Check if quality is met
|
||||
if self._quality_met(results):
|
||||
return HaltingRecord(
|
||||
decision=HaltDecision.HALT_QUALITY_MET,
|
||||
iteration=iteration,
|
||||
max_iterations=self.policy.max_iterations,
|
||||
scores=all_scores,
|
||||
reason="All quality gates passed",
|
||||
)
|
||||
|
||||
# Check marginal improvement
|
||||
if len(all_scores) >= 2:
|
||||
improvement = all_scores[-1] - all_scores[-2]
|
||||
if improvement < self.policy.min_improvement:
|
||||
return HaltingRecord(
|
||||
decision=HaltDecision.HALT_NO_IMPROVEMENT,
|
||||
iteration=iteration,
|
||||
max_iterations=self.policy.max_iterations,
|
||||
scores=all_scores,
|
||||
reason=(
|
||||
f"Marginal improvement {improvement:.4f} below "
|
||||
f"threshold {self.policy.min_improvement}"
|
||||
),
|
||||
)
|
||||
|
||||
# Continue refinement
|
||||
return HaltingRecord(
|
||||
decision=HaltDecision.CONTINUE,
|
||||
iteration=iteration,
|
||||
max_iterations=self.policy.max_iterations,
|
||||
scores=all_scores,
|
||||
reason="Continuing refinement",
|
||||
)
|
||||
|
||||
def _quality_met(self, results: List[ValidationResult]) -> bool:
|
||||
"""
|
||||
Check if quality requirements are met.
|
||||
|
||||
If required_gate_ids is set, only those gates must pass.
|
||||
Otherwise, all gates must pass.
|
||||
|
||||
Args:
|
||||
results: Validation results to check
|
||||
|
||||
Returns:
|
||||
True if quality requirements are met
|
||||
"""
|
||||
if self.policy.required_gate_ids:
|
||||
for gate_id in self.policy.required_gate_ids:
|
||||
gate_results = [r for r in results if r.gate_id == gate_id]
|
||||
if not gate_results:
|
||||
return False
|
||||
if any(r.status == ValidationStatus.FAIL for r in gate_results):
|
||||
return False
|
||||
return True
|
||||
|
||||
return all(r.status == ValidationStatus.PASS for r in results)
|
||||
|
||||
def _aggregate_score(self, results: List[ValidationResult]) -> float:
|
||||
"""Calculate aggregate score from results."""
|
||||
if not results:
|
||||
return 0.0
|
||||
scores = [r.score for r in results if r.score is not None]
|
||||
if not scores:
|
||||
return 0.0
|
||||
return sum(scores) / len(scores)
|
||||
108
markitect/prompts/quality/refinement.py
Normal file
108
markitect/prompts/quality/refinement.py
Normal file
@@ -0,0 +1,108 @@
|
||||
"""
|
||||
Refinement loop for iterative quality improvement.
|
||||
|
||||
Implements FR-10: Halting and Refinement Policy
|
||||
Execute → Validate → Halt or Refine cycle.
|
||||
"""
|
||||
|
||||
from typing import Callable, List, Optional, Tuple
|
||||
|
||||
from markitect.prompts.quality.models import (
|
||||
HaltDecision,
|
||||
QualityPolicy,
|
||||
RefinementResult,
|
||||
ValidationResult,
|
||||
)
|
||||
from markitect.prompts.quality.policy import HaltingPolicyEngine
|
||||
from markitect.prompts.quality.validator import QualityValidator
|
||||
|
||||
|
||||
class RefinementLoop:
|
||||
"""
|
||||
Iterative refinement loop with quality gate checks.
|
||||
|
||||
Executes a cycle of: execute → validate → check halting → refine
|
||||
until a halting condition is met.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
validator: QualityValidator,
|
||||
policy: QualityPolicy,
|
||||
):
|
||||
"""
|
||||
Initialize with validator and policy.
|
||||
|
||||
Args:
|
||||
validator: Quality validator with configured gates
|
||||
policy: Halting policy configuration
|
||||
"""
|
||||
self.validator = validator
|
||||
self.policy = policy
|
||||
self.policy_engine = HaltingPolicyEngine(policy)
|
||||
|
||||
def run(
|
||||
self,
|
||||
execution_callback: Callable[[int, List[ValidationResult]], Tuple[str, str, str]],
|
||||
artifact_id: str,
|
||||
) -> RefinementResult:
|
||||
"""
|
||||
Execute the refinement loop.
|
||||
|
||||
The execution_callback is called each iteration with:
|
||||
- iteration number (1-based)
|
||||
- previous validation results (empty list on first iteration)
|
||||
|
||||
It should return a tuple of (run_id, content, artifact_id).
|
||||
|
||||
Args:
|
||||
execution_callback: Callable that executes/refines and returns
|
||||
(run_id, content, artifact_id)
|
||||
artifact_id: ID of the artifact being refined
|
||||
|
||||
Returns:
|
||||
RefinementResult with complete iteration history
|
||||
"""
|
||||
result = RefinementResult(iterations_run=0)
|
||||
score_history: List[float] = []
|
||||
prev_results: List[ValidationResult] = []
|
||||
|
||||
for iteration in range(1, self.policy.max_iterations + 1):
|
||||
# Execute / refine
|
||||
run_id, content, art_id = execution_callback(iteration, prev_results)
|
||||
result.run_ids.append(run_id)
|
||||
|
||||
# Validate
|
||||
current_results = self.validator.validate_artifact(
|
||||
content, art_id, run_id=run_id if self.validator.db_path else None,
|
||||
)
|
||||
result.all_results.append(current_results)
|
||||
result.iterations_run = iteration
|
||||
|
||||
# Evaluate halting
|
||||
halting_record = self.policy_engine.evaluate(
|
||||
results=current_results,
|
||||
iteration=iteration,
|
||||
score_history=score_history,
|
||||
total_runs=len(result.run_ids),
|
||||
)
|
||||
|
||||
current_score = self.policy_engine._aggregate_score(current_results)
|
||||
score_history.append(current_score)
|
||||
|
||||
if halting_record.decision != HaltDecision.CONTINUE:
|
||||
result.final_results = current_results
|
||||
result.halting_record = halting_record
|
||||
return result
|
||||
|
||||
prev_results = current_results
|
||||
|
||||
# Reached max iterations without explicit halt
|
||||
result.final_results = prev_results
|
||||
result.halting_record = self.policy_engine.evaluate(
|
||||
results=prev_results,
|
||||
iteration=self.policy.max_iterations,
|
||||
score_history=score_history,
|
||||
total_runs=len(result.run_ids),
|
||||
)
|
||||
return result
|
||||
294
markitect/prompts/quality/validator.py
Normal file
294
markitect/prompts/quality/validator.py
Normal file
@@ -0,0 +1,294 @@
|
||||
"""
|
||||
Quality validator for applying multiple gates to artifacts.
|
||||
|
||||
Implements FR-9.2: Multiple QualityGates per artifact.
|
||||
Implements FR-9.3: Record pass/fail results and diagnostics in RunManifest.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List, Optional
|
||||
|
||||
from markitect.prompts.quality.models import (
|
||||
GateType,
|
||||
QualityGate,
|
||||
ValidationResult,
|
||||
ValidationStatus,
|
||||
)
|
||||
|
||||
|
||||
# SQL schema for quality tables
|
||||
QUALITY_TABLES_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS quality_gates (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
gate_type TEXT NOT NULL,
|
||||
config JSON NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS validation_results (
|
||||
id TEXT PRIMARY KEY,
|
||||
run_id TEXT NOT NULL,
|
||||
gate_id TEXT NOT NULL,
|
||||
artifact_id TEXT,
|
||||
status TEXT NOT NULL,
|
||||
score REAL,
|
||||
diagnostics JSON,
|
||||
validated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_validations_run ON validation_results(run_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_validations_artifact ON validation_results(artifact_id);
|
||||
"""
|
||||
|
||||
|
||||
class QualityValidator:
|
||||
"""
|
||||
Applies multiple quality gates to artifacts and records results.
|
||||
|
||||
Implements FR-9.2 and FR-9.3.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
gates: Optional[List[QualityGate]] = None,
|
||||
db_path: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Initialize validator with quality gates.
|
||||
|
||||
Args:
|
||||
gates: List of quality gates to apply
|
||||
db_path: Optional database path for persisting results
|
||||
"""
|
||||
self.gates: List[QualityGate] = gates or []
|
||||
self.db_path = db_path
|
||||
if db_path:
|
||||
self._initialize_tables()
|
||||
|
||||
def _initialize_tables(self) -> None:
|
||||
"""Initialize quality tables if DB path is set."""
|
||||
db_dir = Path(self.db_path).parent
|
||||
if db_dir and not db_dir.exists():
|
||||
db_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
try:
|
||||
conn.executescript(QUALITY_TABLES_SQL)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _get_connection(self) -> sqlite3.Connection:
|
||||
"""Get a database connection."""
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def add_gate(self, gate: QualityGate) -> None:
|
||||
"""
|
||||
Add a quality gate to the validator.
|
||||
|
||||
Args:
|
||||
gate: Quality gate to add
|
||||
"""
|
||||
self.gates.append(gate)
|
||||
|
||||
def validate_artifact(
|
||||
self,
|
||||
content: str,
|
||||
artifact_id: str,
|
||||
run_id: Optional[str] = None,
|
||||
) -> List[ValidationResult]:
|
||||
"""
|
||||
Apply all quality gates to an artifact.
|
||||
|
||||
Args:
|
||||
content: Artifact content to validate
|
||||
artifact_id: ID of the artifact
|
||||
run_id: Optional run ID for persistence
|
||||
|
||||
Returns:
|
||||
List of ValidationResult from all gates
|
||||
"""
|
||||
results = []
|
||||
for gate in self.gates:
|
||||
result = gate.validate(content, artifact_id)
|
||||
results.append(result)
|
||||
|
||||
if run_id and self.db_path:
|
||||
self._persist_result(result, run_id)
|
||||
|
||||
return results
|
||||
|
||||
def all_passed(self, results: List[ValidationResult]) -> bool:
|
||||
"""
|
||||
Check if all validation results passed.
|
||||
|
||||
Args:
|
||||
results: List of validation results
|
||||
|
||||
Returns:
|
||||
True if all results have PASS status
|
||||
"""
|
||||
return all(r.status == ValidationStatus.PASS for r in results)
|
||||
|
||||
def aggregate_score(self, results: List[ValidationResult]) -> float:
|
||||
"""
|
||||
Calculate aggregate score across all results.
|
||||
|
||||
Args:
|
||||
results: List of validation results
|
||||
|
||||
Returns:
|
||||
Average score (0.0-1.0), or 1.0 if no results
|
||||
"""
|
||||
if not results:
|
||||
return 1.0
|
||||
scores = [r.score for r in results if r.score is not None]
|
||||
if not scores:
|
||||
return 1.0
|
||||
return sum(scores) / len(scores)
|
||||
|
||||
def get_failed_gates(
|
||||
self,
|
||||
results: List[ValidationResult],
|
||||
) -> List[ValidationResult]:
|
||||
"""
|
||||
Get only failed validation results.
|
||||
|
||||
Args:
|
||||
results: List of validation results
|
||||
|
||||
Returns:
|
||||
List of results with FAIL status
|
||||
"""
|
||||
return [r for r in results if r.status == ValidationStatus.FAIL]
|
||||
|
||||
def results_to_manifest_dict(
|
||||
self,
|
||||
results: List[ValidationResult],
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Convert validation results to RunManifest-compatible dict.
|
||||
|
||||
Implements FR-9.3: Results for RunManifest.
|
||||
|
||||
Args:
|
||||
results: List of validation results
|
||||
|
||||
Returns:
|
||||
Dictionary suitable for RunManifest.validation_results
|
||||
"""
|
||||
return {
|
||||
"quality_gates": [r.to_dict() for r in results],
|
||||
"all_passed": self.all_passed(results),
|
||||
"aggregate_score": self.aggregate_score(results),
|
||||
}
|
||||
|
||||
def _persist_result(
|
||||
self,
|
||||
result: ValidationResult,
|
||||
run_id: str,
|
||||
) -> None:
|
||||
"""Persist a validation result to the database."""
|
||||
import json
|
||||
|
||||
conn = self._get_connection()
|
||||
try:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO validation_results (
|
||||
id, run_id, gate_id, artifact_id,
|
||||
status, score, diagnostics, validated_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
result.id,
|
||||
run_id,
|
||||
result.gate_id,
|
||||
result.artifact_id,
|
||||
result.status.value,
|
||||
result.score,
|
||||
json.dumps([d.to_dict() for d in result.diagnostics]),
|
||||
result.validated_at.isoformat(),
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def get_results_for_run(self, run_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Retrieve persisted validation results for a run.
|
||||
|
||||
Args:
|
||||
run_id: Run identifier
|
||||
|
||||
Returns:
|
||||
List of result dictionaries
|
||||
"""
|
||||
if not self.db_path:
|
||||
return []
|
||||
|
||||
import json
|
||||
|
||||
conn = self._get_connection()
|
||||
try:
|
||||
cursor = conn.execute(
|
||||
"SELECT * FROM validation_results WHERE run_id = ?",
|
||||
(run_id,),
|
||||
)
|
||||
results = []
|
||||
for row in cursor.fetchall():
|
||||
results.append({
|
||||
"id": row["id"],
|
||||
"run_id": run_id,
|
||||
"gate_id": row["gate_id"],
|
||||
"artifact_id": row["artifact_id"],
|
||||
"status": row["status"],
|
||||
"score": row["score"],
|
||||
"diagnostics": json.loads(row["diagnostics"]) if row["diagnostics"] else [],
|
||||
"validated_at": row["validated_at"],
|
||||
})
|
||||
return results
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def get_results_for_artifact(self, artifact_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Retrieve persisted validation results for an artifact.
|
||||
|
||||
Args:
|
||||
artifact_id: Artifact identifier
|
||||
|
||||
Returns:
|
||||
List of result dictionaries
|
||||
"""
|
||||
if not self.db_path:
|
||||
return []
|
||||
|
||||
import json
|
||||
|
||||
conn = self._get_connection()
|
||||
try:
|
||||
cursor = conn.execute(
|
||||
"SELECT * FROM validation_results WHERE artifact_id = ?",
|
||||
(artifact_id,),
|
||||
)
|
||||
results = []
|
||||
for row in cursor.fetchall():
|
||||
results.append({
|
||||
"id": row["id"],
|
||||
"run_id": row["run_id"],
|
||||
"gate_id": row["gate_id"],
|
||||
"artifact_id": artifact_id,
|
||||
"status": row["status"],
|
||||
"score": row["score"],
|
||||
"diagnostics": json.loads(row["diagnostics"]) if row["diagnostics"] else [],
|
||||
"validated_at": row["validated_at"],
|
||||
})
|
||||
return results
|
||||
finally:
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user