diff --git a/docs/asset-registry-implementation.md b/docs/asset-registry-implementation.md index 5c8ab7f..8bea070 100644 --- a/docs/asset-registry-implementation.md +++ b/docs/asset-registry-implementation.md @@ -66,6 +66,8 @@ and SQLite repositories are adapters behind those ports. - Transformation run and derived lineage persistence for traceable derived artifact creation. - Workflow template and workflow run persistence for durable job execution. +- Workflow review tasks and exception queue items are embedded in workflow run + payloads for reconstructable local-first state. - In-memory repository for deterministic tests. - SQLite repository for local-first durable asset registry state. - SQLite foreign-key enforcement for representation and metadata asset diff --git a/docs/transformation-implementation.md b/docs/transformation-implementation.md index 6912d17..7a23c17 100644 --- a/docs/transformation-implementation.md +++ b/docs/transformation-implementation.md @@ -56,6 +56,9 @@ rather than by bypassing existing asset governance. available. - `DerivedArtifactLineage` persistence and lookup by output asset, source asset, or transformation run. +- `derived_artifact.lineage.linked` audit events connect output assets to + transformation runs, source assets, source versions, and output + representations. - Audit events for queued, started, completed, failed, denied, and canceled transformation runs. diff --git a/docs/workflow-jobs-implementation.md b/docs/workflow-jobs-implementation.md index 7412ffd..0b9a790 100644 --- a/docs/workflow-jobs-implementation.md +++ b/docs/workflow-jobs-implementation.md @@ -2,7 +2,7 @@ Date: 2026-05-06 -Status: active implementation note for `KONT-WP-0008`. +Status: completed foundation implementation note for `KONT-WP-0008`. ## Purpose @@ -36,7 +36,8 @@ governance, transformation run records, derived lineage, or audit. - `WorkflowInputDefinition` supports asset, collection, query, source event, and payload inputs. - `WorkflowStepDefinition` captures step kind, operation ID, dependencies, - input/output bindings, preconditions, failure behavior, and metadata. + input/output bindings, preconditions, review gates, failure behavior, and + metadata. - Template registration validates duplicate step IDs, missing dependencies, dependency cycles, and unsupported failure behavior. - `WorkflowRun` and `WorkflowStepRun` persist queued, running, waiting, @@ -49,8 +50,17 @@ governance, transformation run records, derived lineage, or audit. - Retry and repeated invocation avoid silent overwrite by choosing a fresh output asset ID when a fixed template output ID already exists. - Workflow template and run persistence are implemented for memory and SQLite. +- Review-required outputs pause the step and workflow run with embedded + `WorkflowReviewTask` and `WorkflowExceptionRecord` state. +- Review decisions can continue, reject, correct, retry, or escalate workflow + runs. +- Exception queue listing exposes review-required, failed, blocked, + low-confidence, and policy-conflicted items. - Audit events are emitted for template registration, run queue/start/final - states, step start/final states, retry, and cancellation. + states, step start/final states, retry, cancellation, review requests, + review decisions, and exception opening. +- `WorkflowService.reconstruct_run` returns run state, template, audit events, + transformation runs, derived lineage, review tasks, and exceptions. ## Current Boundaries @@ -60,8 +70,8 @@ adapter or job worker handles them. Workflow inputs preserve collection, query, source event, and payload bindings, but the MVP runner only interprets asset bindings for transformation execution. -Query expansion, source-event ingestion, human tasks, and exception queues stay -in later WP-0008 tasks. +Query expansion, source-event ingestion, and external queue-worker adapters stay +in later implementation work. Markdown-specific transformations remain adapter-backed through markitect-tool. Workflow orchestration may invoke those operations once the adapter boundary is @@ -80,15 +90,10 @@ ID, queued timestamp, and updated timestamp. ## Not Yet Implemented -- Human review gates and approval tasks. -- Exception queues for blocked, failed, low-confidence, or policy-conflicted - items. - Queue-worker adapters beyond embedded execution. - Rich retry policies by operation type. - Query/input expansion into dynamic asset sets. -- Full workflow reconstruction views across all audit and lineage records. - -These remain in open tasks `KONT-WP-0008-T006` and `KONT-WP-0008-T007`. +- Product/API views for review queues and reconstruction records. ## Test Coverage @@ -100,5 +105,9 @@ These remain in open tasks `KONT-WP-0008-T006` and `KONT-WP-0008-T007`. - queue, cancel, resume denial, and retry without direct storage edits, - partial completion when a continue-on-failure step fails and another step succeeds, -- SQLite reload of workflow templates, workflow runs, step state, and derived - output representation state. +- review gate pause, continue, reject, correct, retry, and escalation decisions, +- failed step exception queue items, +- reconstruction across workflow audit, transformation runs, and derived + lineage, +- SQLite reload of workflow templates, workflow runs, step state, review state, + exception state, and derived output representation state. diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 295fac7..04c440f 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -58,8 +58,14 @@ from .core import ( TransformationRun, TransformationRunStatus, VersionChangeType, + WorkflowExceptionKind, + WorkflowExceptionRecord, + WorkflowExceptionStatus, WorkflowInputDefinition, WorkflowInputKind, + WorkflowReviewDecisionType, + WorkflowReviewStatus, + WorkflowReviewTask, WorkflowRun, WorkflowRunStatus, WorkflowStepDefinition, @@ -118,6 +124,7 @@ from .services import ( TransformationRunResult, TransformationService, WorkflowInvocation, + WorkflowRunReconstruction, WorkflowRunResult, WorkflowService, default_transformation_registry, @@ -243,10 +250,17 @@ __all__ = [ "TransformationService", "ValidationError", "VersionChangeType", + "WorkflowExceptionKind", + "WorkflowExceptionRecord", + "WorkflowExceptionStatus", "WorkflowInputDefinition", "WorkflowInputKind", "WorkflowInvocation", + "WorkflowReviewDecisionType", + "WorkflowReviewStatus", + "WorkflowReviewTask", "WorkflowRun", + "WorkflowRunReconstruction", "WorkflowRunResult", "WorkflowRunStatus", "WorkflowService", diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index c4c4708..52425f0 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -43,8 +43,14 @@ from .relationships import ( from .retrieval_feedback import RetrievalFeedbackLabel, RetrievalFeedbackRecord from .transformations import TransformationOperation, TransformationRun, TransformationRunStatus from .workflow_jobs import ( + WorkflowExceptionKind, + WorkflowExceptionRecord, + WorkflowExceptionStatus, WorkflowInputDefinition, WorkflowInputKind, + WorkflowReviewDecisionType, + WorkflowReviewStatus, + WorkflowReviewTask, WorkflowRun, WorkflowRunStatus, WorkflowStepDefinition, @@ -97,8 +103,14 @@ __all__ = [ "TransformationRun", "TransformationRunStatus", "VersionChangeType", + "WorkflowExceptionKind", + "WorkflowExceptionRecord", + "WorkflowExceptionStatus", "WorkflowInputDefinition", "WorkflowInputKind", + "WorkflowReviewDecisionType", + "WorkflowReviewStatus", + "WorkflowReviewTask", "WorkflowRun", "WorkflowRunStatus", "WorkflowStepDefinition", diff --git a/src/kontextual_engine/core/workflow_jobs.py b/src/kontextual_engine/core/workflow_jobs.py index a2e475a..dc371f5 100644 --- a/src/kontextual_engine/core/workflow_jobs.py +++ b/src/kontextual_engine/core/workflow_jobs.py @@ -38,6 +38,37 @@ class WorkflowStepRunStatus(str, Enum): CANCELED = "canceled" +class WorkflowReviewDecisionType(str, Enum): + CONTINUE = "continue" + REJECT = "reject" + CORRECT = "correct" + RETRY = "retry" + ESCALATE = "escalate" + + +class WorkflowReviewStatus(str, Enum): + OPEN = "open" + CONTINUED = "continued" + REJECTED = "rejected" + CORRECTED = "corrected" + RETRY_REQUESTED = "retry_requested" + ESCALATED = "escalated" + + +class WorkflowExceptionKind(str, Enum): + FAILED = "failed" + BLOCKED = "blocked" + LOW_CONFIDENCE = "low_confidence" + POLICY_CONFLICT = "policy_conflict" + REVIEW_REQUIRED = "review_required" + + +class WorkflowExceptionStatus(str, Enum): + OPEN = "open" + RESOLVED = "resolved" + ESCALATED = "escalated" + + @dataclass(frozen=True) class WorkflowInputDefinition: name: str @@ -81,6 +112,7 @@ class WorkflowStepDefinition: parameters: dict[str, Any] = field(default_factory=dict) outputs: dict[str, Any] = field(default_factory=dict) preconditions: tuple[dict[str, Any], ...] = () + review_gate: dict[str, Any] = field(default_factory=dict) failure_behavior: str = "fail_workflow" required_permissions: tuple[str, ...] = () metadata: dict[str, Any] = field(default_factory=dict) @@ -101,6 +133,7 @@ class WorkflowStepDefinition: "parameters": dict(self.parameters), "outputs": dict(self.outputs), "preconditions": list(self.preconditions), + "review_gate": dict(self.review_gate), "failure_behavior": self.failure_behavior, "required_permissions": list(self.required_permissions), "metadata": dict(self.metadata), @@ -118,6 +151,7 @@ class WorkflowStepDefinition: parameters=dict(data.get("parameters", {})), outputs=dict(data.get("outputs", {})), preconditions=tuple(dict(item) for item in data.get("preconditions", ())), + review_gate=dict(data.get("review_gate", {})), failure_behavior=data.get("failure_behavior", "fail_workflow"), required_permissions=tuple(data.get("required_permissions", ())), metadata=dict(data.get("metadata", {})), @@ -300,6 +334,198 @@ class WorkflowStepRun: ) +@dataclass(frozen=True) +class WorkflowReviewTask: + workflow_run_id: str + step_id: str + reason: str + requested_by: str + output_asset_ids: tuple[str, ...] = () + status: WorkflowReviewStatus = WorkflowReviewStatus.OPEN + queue: str = "default" + assigned_to: str | None = None + diagnostics: tuple[dict[str, Any], ...] = () + exception_id: str | None = None + decision: WorkflowReviewDecisionType | None = None + decision_note: str = "" + correction: dict[str, Any] = field(default_factory=dict) + decided_by: str | None = None + decided_at: str | None = None + review_id: str = field(default_factory=lambda: new_id("review")) + requested_at: str = field(default_factory=lambda: utc_now().isoformat()) + updated_at: str = field(default_factory=lambda: utc_now().isoformat()) + + def __post_init__(self) -> None: + object.__setattr__(self, "output_asset_ids", tuple(self.output_asset_ids)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + object.__setattr__(self, "status", WorkflowReviewStatus(self.status)) + if self.decision is not None: + object.__setattr__(self, "decision", WorkflowReviewDecisionType(self.decision)) + + def decide( + self, + decision: WorkflowReviewDecisionType | str, + *, + actor_id: str, + note: str = "", + correction: dict[str, Any] | None = None, + ) -> "WorkflowReviewTask": + decision = WorkflowReviewDecisionType(decision) + statuses = { + WorkflowReviewDecisionType.CONTINUE: WorkflowReviewStatus.CONTINUED, + WorkflowReviewDecisionType.REJECT: WorkflowReviewStatus.REJECTED, + WorkflowReviewDecisionType.CORRECT: WorkflowReviewStatus.CORRECTED, + WorkflowReviewDecisionType.RETRY: WorkflowReviewStatus.RETRY_REQUESTED, + WorkflowReviewDecisionType.ESCALATE: WorkflowReviewStatus.ESCALATED, + } + now = utc_now().isoformat() + return replace( + self, + status=statuses[decision], + decision=decision, + decision_note=note, + correction=dict(correction or {}), + decided_by=actor_id, + decided_at=now, + updated_at=now, + ) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "review_id": self.review_id, + "workflow_run_id": self.workflow_run_id, + "step_id": self.step_id, + "reason": self.reason, + "requested_by": self.requested_by, + "output_asset_ids": list(self.output_asset_ids), + "status": self.status.value, + "queue": self.queue, + "assigned_to": self.assigned_to, + "diagnostics": list(self.diagnostics), + "exception_id": self.exception_id, + "decision": self.decision.value if self.decision else None, + "decision_note": self.decision_note, + "correction": dict(self.correction), + "decided_by": self.decided_by, + "decided_at": self.decided_at, + "requested_at": self.requested_at, + "updated_at": self.updated_at, + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowReviewTask": + return cls( + review_id=data["review_id"], + workflow_run_id=data["workflow_run_id"], + step_id=data["step_id"], + reason=data["reason"], + requested_by=data["requested_by"], + output_asset_ids=tuple(data.get("output_asset_ids", ())), + status=WorkflowReviewStatus(data.get("status", WorkflowReviewStatus.OPEN.value)), + queue=data.get("queue", "default"), + assigned_to=data.get("assigned_to"), + diagnostics=tuple(data.get("diagnostics", ())), + exception_id=data.get("exception_id"), + decision=WorkflowReviewDecisionType(data["decision"]) if data.get("decision") else None, + decision_note=data.get("decision_note", ""), + correction=dict(data.get("correction", {})), + decided_by=data.get("decided_by"), + decided_at=data.get("decided_at"), + requested_at=data["requested_at"], + updated_at=data["updated_at"], + ) + + +@dataclass(frozen=True) +class WorkflowExceptionRecord: + workflow_run_id: str + kind: WorkflowExceptionKind | str + message: str + step_id: str | None = None + status: WorkflowExceptionStatus = WorkflowExceptionStatus.OPEN + diagnostics: tuple[dict[str, Any], ...] = () + output_asset_ids: tuple[str, ...] = () + review_id: str | None = None + assigned_to: str | None = None + resolved_by: str | None = None + resolution: str = "" + exception_id: str = field(default_factory=lambda: new_id("wfx")) + created_at: str = field(default_factory=lambda: utc_now().isoformat()) + updated_at: str = field(default_factory=lambda: utc_now().isoformat()) + resolved_at: str | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "kind", WorkflowExceptionKind(self.kind)) + object.__setattr__(self, "status", WorkflowExceptionStatus(self.status)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + object.__setattr__(self, "output_asset_ids", tuple(self.output_asset_ids)) + + def resolve(self, *, actor_id: str, resolution: str) -> "WorkflowExceptionRecord": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowExceptionStatus.RESOLVED, + resolved_by=actor_id, + resolution=resolution, + resolved_at=now, + updated_at=now, + ) + + def escalate(self, *, actor_id: str, resolution: str) -> "WorkflowExceptionRecord": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowExceptionStatus.ESCALATED, + resolved_by=actor_id, + resolution=resolution, + resolved_at=now, + updated_at=now, + ) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "exception_id": self.exception_id, + "workflow_run_id": self.workflow_run_id, + "kind": self.kind.value, + "message": self.message, + "step_id": self.step_id, + "status": self.status.value, + "diagnostics": list(self.diagnostics), + "output_asset_ids": list(self.output_asset_ids), + "review_id": self.review_id, + "assigned_to": self.assigned_to, + "resolved_by": self.resolved_by, + "resolution": self.resolution, + "created_at": self.created_at, + "updated_at": self.updated_at, + "resolved_at": self.resolved_at, + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowExceptionRecord": + return cls( + exception_id=data["exception_id"], + workflow_run_id=data["workflow_run_id"], + kind=WorkflowExceptionKind(data["kind"]), + message=data["message"], + step_id=data.get("step_id"), + status=WorkflowExceptionStatus(data.get("status", WorkflowExceptionStatus.OPEN.value)), + diagnostics=tuple(data.get("diagnostics", ())), + output_asset_ids=tuple(data.get("output_asset_ids", ())), + review_id=data.get("review_id"), + assigned_to=data.get("assigned_to"), + resolved_by=data.get("resolved_by"), + resolution=data.get("resolution", ""), + created_at=data["created_at"], + updated_at=data["updated_at"], + resolved_at=data.get("resolved_at"), + ) + + @dataclass(frozen=True) class WorkflowRun: template_id: str @@ -310,6 +536,8 @@ class WorkflowRun: policy_context: dict[str, Any] = field(default_factory=dict) status: WorkflowRunStatus = WorkflowRunStatus.QUEUED step_runs: tuple[WorkflowStepRun, ...] = () + review_tasks: tuple[WorkflowReviewTask, ...] = () + exceptions: tuple[WorkflowExceptionRecord, ...] = () output_asset_ids: tuple[str, ...] = () diagnostics: tuple[dict[str, Any], ...] = () retry_of_run_id: str | None = None @@ -323,6 +551,8 @@ class WorkflowRun: def __post_init__(self) -> None: object.__setattr__(self, "status", WorkflowRunStatus(self.status)) object.__setattr__(self, "step_runs", tuple(self.step_runs)) + object.__setattr__(self, "review_tasks", tuple(self.review_tasks)) + object.__setattr__(self, "exceptions", tuple(self.exceptions)) object.__setattr__(self, "output_asset_ids", tuple(self.output_asset_ids)) object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) @@ -416,6 +646,30 @@ class WorkflowRun: ordered = ordered + (step_run,) return replace(self, step_runs=ordered, updated_at=utc_now().isoformat()) + def with_review_task(self, review_task: WorkflowReviewTask) -> "WorkflowRun": + existing = {item.review_id: item for item in self.review_tasks} + existing[review_task.review_id] = review_task + ordered = tuple(existing[item.review_id] for item in self.review_tasks if item.review_id in existing) + if review_task.review_id not in {item.review_id for item in self.review_tasks}: + ordered = ordered + (review_task,) + return replace(self, review_tasks=ordered, updated_at=utc_now().isoformat()) + + def with_exception(self, exception: WorkflowExceptionRecord) -> "WorkflowRun": + existing = {item.exception_id: item for item in self.exceptions} + existing[exception.exception_id] = exception + ordered = tuple(existing[item.exception_id] for item in self.exceptions if item.exception_id in existing) + if exception.exception_id not in {item.exception_id for item in self.exceptions}: + ordered = ordered + (exception,) + return replace(self, exceptions=ordered, updated_at=utc_now().isoformat()) + + @property + def open_review_tasks(self) -> tuple[WorkflowReviewTask, ...]: + return tuple(item for item in self.review_tasks if item.status == WorkflowReviewStatus.OPEN) + + @property + def open_exceptions(self) -> tuple[WorkflowExceptionRecord, ...]: + return tuple(item for item in self.exceptions if item.status == WorkflowExceptionStatus.OPEN) + def to_dict(self) -> dict[str, Any]: return compact_dict( { @@ -428,6 +682,8 @@ class WorkflowRun: "policy_context": dict(self.policy_context), "status": self.status.value, "step_runs": [item.to_dict() for item in self.step_runs], + "review_tasks": [item.to_dict() for item in self.review_tasks], + "exceptions": [item.to_dict() for item in self.exceptions], "output_asset_ids": list(self.output_asset_ids), "diagnostics": list(self.diagnostics), "retry_of_run_id": self.retry_of_run_id, @@ -451,6 +707,8 @@ class WorkflowRun: policy_context=dict(data.get("policy_context", {})), status=WorkflowRunStatus(data.get("status", WorkflowRunStatus.QUEUED.value)), step_runs=tuple(WorkflowStepRun.from_dict(item) for item in data.get("step_runs", ())), + review_tasks=tuple(WorkflowReviewTask.from_dict(item) for item in data.get("review_tasks", ())), + exceptions=tuple(WorkflowExceptionRecord.from_dict(item) for item in data.get("exceptions", ())), output_asset_ids=tuple(data.get("output_asset_ids", ())), diagnostics=tuple(data.get("diagnostics", ())), retry_of_run_id=data.get("retry_of_run_id"), diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index b72d3ea..66a8839 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -32,7 +32,12 @@ from .transformation_service import ( TransformationService, default_transformation_registry, ) -from .workflow_service import WorkflowInvocation, WorkflowRunResult, WorkflowService +from .workflow_service import ( + WorkflowInvocation, + WorkflowRunReconstruction, + WorkflowRunResult, + WorkflowService, +) __all__ = [ "AssetChangeResult", @@ -62,6 +67,7 @@ __all__ = [ "TransformationRunResult", "TransformationService", "WorkflowInvocation", + "WorkflowRunReconstruction", "WorkflowRunResult", "WorkflowService", "default_transformation_registry", diff --git a/src/kontextual_engine/services/transformation_service.py b/src/kontextual_engine/services/transformation_service.py index 60c5e9d..7f957d8 100644 --- a/src/kontextual_engine/services/transformation_service.py +++ b/src/kontextual_engine/services/transformation_service.py @@ -515,6 +515,20 @@ class TransformationService: metadata_delta={"lineage_id": lineage.lineage_id, "operation_id": run.operation_id}, ) self.repository.save_derived_lineage(lineage) + self._audit( + "derived_artifact.lineage.linked", + f"asset:{output_asset_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={ + "lineage_id": lineage.lineage_id, + "transformation_run_id": run.run_id, + "source_asset_ids": list(lineage.source_asset_ids), + "source_version_ids": list(lineage.source_version_ids), + "output_representation_id": lineage.output_representation_id, + }, + ) return asset_change, output_representation, lineage def _authorize( diff --git a/src/kontextual_engine/services/workflow_service.py b/src/kontextual_engine/services/workflow_service.py index 2205ee7..052657a 100644 --- a/src/kontextual_engine/services/workflow_service.py +++ b/src/kontextual_engine/services/workflow_service.py @@ -8,8 +8,16 @@ from typing import Any from kontextual_engine.core import ( AuditEvent, AuditOutcome, + DerivedArtifactLineage, OperationContext, PolicyDecision, + TransformationRun, + WorkflowExceptionKind, + WorkflowExceptionRecord, + WorkflowExceptionStatus, + WorkflowReviewDecisionType, + WorkflowReviewStatus, + WorkflowReviewTask, WorkflowRun, WorkflowRunStatus, WorkflowStepDefinition, @@ -63,6 +71,35 @@ class WorkflowRunResult: } +@dataclass(frozen=True) +class WorkflowRunReconstruction: + run: WorkflowRun + template: WorkflowTemplate + audit_events: tuple[AuditEvent, ...] = () + transformation_runs: tuple[TransformationRun, ...] = () + derived_lineage: tuple[DerivedArtifactLineage, ...] = () + review_tasks: tuple[WorkflowReviewTask, ...] = () + exceptions: tuple[WorkflowExceptionRecord, ...] = () + + def __post_init__(self) -> None: + object.__setattr__(self, "audit_events", tuple(self.audit_events)) + object.__setattr__(self, "transformation_runs", tuple(self.transformation_runs)) + object.__setattr__(self, "derived_lineage", tuple(self.derived_lineage)) + object.__setattr__(self, "review_tasks", tuple(self.review_tasks)) + object.__setattr__(self, "exceptions", tuple(self.exceptions)) + + def to_dict(self) -> dict[str, Any]: + return { + "run": self.run.to_dict(), + "template": self.template.to_dict(), + "audit_events": [item.to_dict() for item in self.audit_events], + "transformation_runs": [item.to_dict() for item in self.transformation_runs], + "derived_lineage": [item.to_dict() for item in self.derived_lineage], + "review_tasks": [item.to_dict() for item in self.review_tasks], + "exceptions": [item.to_dict() for item in self.exceptions], + } + + class WorkflowService: def __init__( self, @@ -127,6 +164,41 @@ class WorkflowService: def list_templates(self, *, template_id: str | None = None) -> tuple[WorkflowTemplate, ...]: return tuple(self.repository.list_workflow_templates(template_id=template_id)) + def list_review_tasks( + self, + *, + status: WorkflowReviewStatus | str | None = WorkflowReviewStatus.OPEN, + workflow_run_id: str | None = None, + ) -> tuple[WorkflowReviewTask, ...]: + status = WorkflowReviewStatus(status) if status is not None else None + reviews = [ + review + for run in self.repository.list_workflow_runs(template_id=None) + if workflow_run_id is None or run.run_id == workflow_run_id + for review in run.review_tasks + if status is None or review.status == status + ] + return tuple(sorted(reviews, key=lambda item: (item.requested_at, item.review_id))) + + def list_exception_queue( + self, + *, + status: WorkflowExceptionStatus | str | None = WorkflowExceptionStatus.OPEN, + kind: WorkflowExceptionKind | str | None = None, + workflow_run_id: str | None = None, + ) -> tuple[WorkflowExceptionRecord, ...]: + status = WorkflowExceptionStatus(status) if status is not None else None + kind = WorkflowExceptionKind(kind) if kind is not None else None + exceptions = [ + exception + for run in self.repository.list_workflow_runs(template_id=None) + if workflow_run_id is None or run.run_id == workflow_run_id + for exception in run.exceptions + if (status is None or exception.status == status) + and (kind is None or exception.kind == kind) + ] + return tuple(sorted(exceptions, key=lambda item: (item.created_at, item.exception_id))) + def queue_template(self, invocation: WorkflowInvocation, context: OperationContext) -> WorkflowRunResult: template = self.repository.get_workflow_template( invocation.template_id, @@ -219,6 +291,17 @@ class WorkflowService: details={"run_id": run_id, "status": run.status.value}, ) return WorkflowRunResult(run=run, success=False, diagnostics=(diagnostic,)) + if run.open_review_tasks: + diagnostic = Diagnostic( + severity="warning", + code="workflow.review_open", + message="Workflow run is waiting for open review tasks", + details={ + "run_id": run_id, + "review_ids": [item.review_id for item in run.open_review_tasks], + }, + ) + return WorkflowRunResult(run=run, success=False, diagnostics=(diagnostic,)) template = self.repository.get_workflow_template(run.template_id, version=run.template_version) return self._execute_run(template, run, context) @@ -297,6 +380,189 @@ class WorkflowService: ) return canceled + def record_review_decision( + self, + run_id: str, + review_id: str, + decision: WorkflowReviewDecisionType | str, + context: OperationContext, + *, + note: str = "", + correction: dict[str, Any] | None = None, + ) -> WorkflowRunResult: + run = self.repository.get_workflow_run(run_id) + review = _find_review(run, review_id) + if review.status != WorkflowReviewStatus.OPEN: + diagnostic = Diagnostic( + severity="error", + code="workflow.review_not_open", + message="Workflow review task is not open", + details={"run_id": run_id, "review_id": review_id, "status": review.status.value}, + ) + return WorkflowRunResult(run=run, success=False, diagnostics=(diagnostic,)) + decision = WorkflowReviewDecisionType(decision) + policy_decision = self._authorize( + context, + "workflow.review.decide", + f"workflow_review:{review_id}", + resource_metadata={ + "workflow_run_id": run_id, + "step_id": review.step_id, + "decision": decision.value, + "correction": correction or {}, + }, + ) + if not policy_decision.allowed: + self._audit( + "workflow.review.decide", + f"workflow_review:{review_id}", + AuditOutcome.DENIED, + context, + policy_decision, + details={"workflow_run_id": run_id, "decision": decision.value}, + ) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": "workflow.review.decide", + "resource": f"workflow_review:{review_id}", + "correlation_id": context.correlation_id, + "policy_decision": policy_decision.to_dict(), + }, + ) + + updated_review = review.decide( + decision, + actor_id=context.actor.id, + note=note, + correction=correction, + ) + run = run.with_review_task(updated_review) + exception = _exception_for_review(run, review_id) + if exception is not None: + if decision == WorkflowReviewDecisionType.ESCALATE: + exception = exception.escalate(actor_id=context.actor.id, resolution=note or decision.value) + else: + exception = exception.resolve(actor_id=context.actor.id, resolution=note or decision.value) + run = run.with_exception(exception) + + self._audit( + f"workflow.review.{decision.value}", + f"workflow_review:{review_id}", + AuditOutcome.REVIEW_REQUIRED + if decision == WorkflowReviewDecisionType.ESCALATE + else AuditOutcome.SUCCESS, + context, + policy_decision, + details={ + "workflow_run_id": run_id, + "step_id": review.step_id, + "decision": decision.value, + "exception_id": exception.exception_id if exception else None, + "correction": correction or {}, + }, + ) + + step = _find_step_run(run, review.step_id) + if decision in (WorkflowReviewDecisionType.CONTINUE, WorkflowReviewDecisionType.CORRECT): + completed_step = step.completed( + transformation_run_id=step.transformation_run_id, + output_asset_ids=step.output_asset_ids, + ) + run = run.with_step_run(completed_step) + self.repository.save_workflow_run(run) + return self.resume_run(run.run_id, context) + if decision == WorkflowReviewDecisionType.RETRY: + self.repository.save_workflow_run(run) + return self.retry_run(run.run_id, context) + if decision == WorkflowReviewDecisionType.ESCALATE: + waiting = run.waiting( + ( + _diagnostic_dict( + Diagnostic( + severity="warning", + code="workflow.review_escalated", + message="Workflow review was escalated", + details={"review_id": review_id, "note": note}, + ) + ), + ) + ) + self.repository.save_workflow_run(waiting) + return WorkflowRunResult(run=waiting, success=False, policy_decision=policy_decision) + + diagnostic = Diagnostic( + severity="error", + code="workflow.review_rejected", + message="Workflow output was rejected during review", + details={"review_id": review_id, "note": note}, + ) + failed_step = step.failed((_diagnostic_dict(diagnostic),)) + run = run.with_step_run(failed_step) + final = _finalize_run(run) + self.repository.save_workflow_run(final) + event = self._audit( + _workflow_completion_operation(final), + f"workflow_run:{final.run_id}", + _workflow_audit_outcome(final), + context, + policy_decision, + details={ + "status": final.status.value, + "review_id": review_id, + "diagnostics": list(final.diagnostics), + }, + ) + return WorkflowRunResult( + run=final, + success=False, + diagnostics=tuple(Diagnostic(**item) for item in final.diagnostics), + audit_event=event, + policy_decision=policy_decision, + ) + + def reconstruct_run(self, run_id: str) -> WorkflowRunReconstruction: + run = self.repository.get_workflow_run(run_id) + template = self.repository.get_workflow_template(run.template_id, version=run.template_version) + transformation_runs: list[TransformationRun] = [] + for step in run.step_runs: + if step.transformation_run_id: + transformation_runs.append(self.repository.get_transformation_run(step.transformation_run_id)) + derived_lineage: list[DerivedArtifactLineage] = [] + for output_asset_id in run.output_asset_ids: + derived_lineage.extend(self.repository.list_derived_lineage(output_asset_id=output_asset_id)) + for step in run.step_runs: + for output_asset_id in step.output_asset_ids: + derived_lineage.extend(self.repository.list_derived_lineage(output_asset_id=output_asset_id)) + lineage_by_id = {item.lineage_id: item for item in derived_lineage} + transformation_targets = { + f"transformation_run:{item.run_id}" + for item in transformation_runs + } + workflow_targets = {f"workflow_run:{run.run_id}"} + workflow_targets.update(f"workflow_run:{run.run_id}:step:{item.step_id}" for item in run.step_runs) + workflow_targets.update(f"workflow_review:{item.review_id}" for item in run.review_tasks) + asset_targets = { + f"asset:{output_asset_id}" + for lineage in lineage_by_id.values() + for output_asset_id in (lineage.output_asset_id,) + } + relevant_targets = workflow_targets | transformation_targets | asset_targets + audit_events = tuple( + event + for event in self.repository.list_audit_events(correlation_id=run.correlation_id) + if event.target in relevant_targets + ) + return WorkflowRunReconstruction( + run=run, + template=template, + audit_events=audit_events, + transformation_runs=tuple(transformation_runs), + derived_lineage=tuple(lineage_by_id.values()), + review_tasks=run.review_tasks, + exceptions=run.exceptions, + ) + def _execute_run( self, template: WorkflowTemplate, @@ -329,6 +595,11 @@ class WorkflowService: WorkflowStepRunStatus.CANCELED, ): continue + if current.status == WorkflowStepRunStatus.WAITING and _step_has_open_review( + run, + step.step_id, + ): + continue if current.status == WorkflowStepRunStatus.FAILED and current.step_id in attempted_step_ids: continue dependency_diagnostic = _dependency_diagnostic(step, step_runs_by_id) @@ -355,14 +626,73 @@ class WorkflowService: executed, step_result = self._execute_step(step, run, context) if step_result is not None: step_results.append(step_result) + if executed.status == WorkflowStepRunStatus.COMPLETED: + review_task, exception = _review_gate_records(step, run, executed, context) + if review_task is not None and exception is not None: + review_task = replace(review_task, exception_id=exception.exception_id) + exception = replace(exception, review_id=review_task.review_id) + diagnostic = Diagnostic( + severity="warning", + code="workflow.review_required", + message="Workflow step output requires review before the run can continue", + details={ + "step_id": step.step_id, + "review_id": review_task.review_id, + "exception_id": exception.exception_id, + "queue": review_task.queue, + }, + ) + executed = replace( + executed, + status=WorkflowStepRunStatus.WAITING, + diagnostics=executed.diagnostics + (_diagnostic_dict(diagnostic),), + completed_at=None, + ) + run = run.with_review_task(review_task).with_exception(exception) + self.repository.save_workflow_run(run) + self._audit( + "workflow.review.requested", + f"workflow_review:{review_task.review_id}", + AuditOutcome.REVIEW_REQUIRED, + context, + decision, + details={ + "workflow_run_id": run.run_id, + "step_id": step.step_id, + "exception_id": exception.exception_id, + "output_asset_ids": list(review_task.output_asset_ids), + }, + ) + self._audit( + "workflow.exception.opened", + f"workflow_run:{run.run_id}", + AuditOutcome.REVIEW_REQUIRED, + context, + decision, + details=exception.to_dict(), + ) if executed.status == WorkflowStepRunStatus.FAILED and step.failure_behavior == "continue": executed = replace(executed, status=WorkflowStepRunStatus.SKIPPED) + if executed.status in (WorkflowStepRunStatus.FAILED, WorkflowStepRunStatus.SKIPPED): + exception = _step_exception_record(step, run, executed) + run = run.with_exception(exception) + self.repository.save_workflow_run(run) + self._audit( + "workflow.exception.opened", + f"workflow_run:{run.run_id}", + AuditOutcome.FAILED, + context, + decision, + details=exception.to_dict(), + ) run = run.with_step_run(executed) self.repository.save_workflow_run(run) + step_audit_operation = _step_audit_operation(executed) + step_audit_outcome = _step_audit_outcome(executed) self._audit( - "workflow.step.completed" if executed.status == WorkflowStepRunStatus.COMPLETED else "workflow.step.failed", + step_audit_operation, f"workflow_run:{run.run_id}:step:{step.step_id}", - AuditOutcome.SUCCESS if executed.status == WorkflowStepRunStatus.COMPLETED else AuditOutcome.FAILED, + step_audit_outcome, context, decision, details={ @@ -372,6 +702,8 @@ class WorkflowService: }, ) progress = True + if executed.status == WorkflowStepRunStatus.WAITING: + continue if executed.status == WorkflowStepRunStatus.FAILED and step.failure_behavior != "continue": progress = False break @@ -538,6 +870,122 @@ def _reset_step_run(step_run: WorkflowStepRun) -> WorkflowStepRun: return WorkflowStepRun(step_id=step_run.step_id, operation_id=step_run.operation_id) +def _find_review(run: WorkflowRun, review_id: str) -> WorkflowReviewTask: + for review in run.review_tasks: + if review.review_id == review_id: + return review + raise NotFoundError("Workflow review task not found", details={"run_id": run.run_id, "review_id": review_id}) + + +def _find_step_run(run: WorkflowRun, step_id: str) -> WorkflowStepRun: + for step_run in run.step_runs: + if step_run.step_id == step_id: + return step_run + raise NotFoundError("Workflow step run not found", details={"run_id": run.run_id, "step_id": step_id}) + + +def _exception_for_review(run: WorkflowRun, review_id: str) -> WorkflowExceptionRecord | None: + for exception in run.exceptions: + if exception.review_id == review_id: + return exception + return None + + +def _step_has_open_review(run: WorkflowRun, step_id: str) -> bool: + return any( + review.step_id == step_id and review.status == WorkflowReviewStatus.OPEN + for review in run.review_tasks + ) + + +def _review_gate_records( + step: WorkflowStepDefinition, + run: WorkflowRun, + step_run: WorkflowStepRun, + context: OperationContext, +) -> tuple[WorkflowReviewTask | None, WorkflowExceptionRecord | None]: + gate = dict(step.review_gate) + if not gate.get("required") and not step.metadata.get("review_required"): + return None, None + reason = str(gate.get("reason") or step.metadata.get("review_reason") or "Workflow step output requires review") + queue = str(gate.get("queue") or step.metadata.get("review_queue") or "human-review") + assigned_to = gate.get("assigned_to") or step.metadata.get("review_assignee") + review = WorkflowReviewTask( + workflow_run_id=run.run_id, + step_id=step.step_id, + reason=reason, + requested_by=context.actor.id, + output_asset_ids=step_run.output_asset_ids, + queue=queue, + assigned_to=assigned_to, + diagnostics=step_run.diagnostics, + ) + exception = WorkflowExceptionRecord( + workflow_run_id=run.run_id, + kind=WorkflowExceptionKind.REVIEW_REQUIRED, + message=reason, + step_id=step.step_id, + diagnostics=step_run.diagnostics, + output_asset_ids=step_run.output_asset_ids, + assigned_to=assigned_to, + ) + return review, exception + + +def _step_exception_record( + step: WorkflowStepDefinition, + run: WorkflowRun, + step_run: WorkflowStepRun, +) -> WorkflowExceptionRecord: + kind = _exception_kind_for_diagnostics(step_run.diagnostics) + message = "Workflow step failed" + if step_run.status == WorkflowStepRunStatus.SKIPPED: + message = "Workflow step was skipped after a recoverable failure" + if step_run.diagnostics: + message = str(step_run.diagnostics[-1].get("message") or message) + return WorkflowExceptionRecord( + workflow_run_id=run.run_id, + kind=kind, + message=message, + step_id=step.step_id, + diagnostics=step_run.diagnostics, + output_asset_ids=step_run.output_asset_ids, + ) + + +def _exception_kind_for_diagnostics(diagnostics: tuple[dict[str, Any], ...]) -> WorkflowExceptionKind: + codes = {str(item.get("code", "")) for item in diagnostics} + if any("permission_denied" in code or "policy" in code for code in codes): + return WorkflowExceptionKind.POLICY_CONFLICT + if any("low_confidence" in code for code in codes): + return WorkflowExceptionKind.LOW_CONFIDENCE + if any("dependency" in code or "blocked" in code or "precondition" in code for code in codes): + return WorkflowExceptionKind.BLOCKED + return WorkflowExceptionKind.FAILED + + +def _step_audit_operation(step_run: WorkflowStepRun) -> str: + if step_run.status == WorkflowStepRunStatus.COMPLETED: + return "workflow.step.completed" + if step_run.status == WorkflowStepRunStatus.WAITING: + return "workflow.step.waiting" + if step_run.status == WorkflowStepRunStatus.SKIPPED: + return "workflow.step.skipped" + if step_run.status == WorkflowStepRunStatus.CANCELED: + return "workflow.step.canceled" + return "workflow.step.failed" + + +def _step_audit_outcome(step_run: WorkflowStepRun) -> AuditOutcome: + if step_run.status == WorkflowStepRunStatus.COMPLETED: + return AuditOutcome.SUCCESS + if step_run.status == WorkflowStepRunStatus.WAITING: + return AuditOutcome.REVIEW_REQUIRED + if step_run.status == WorkflowStepRunStatus.SKIPPED: + return AuditOutcome.PARTIAL + return AuditOutcome.FAILED + + def _template_diagnostics(template: WorkflowTemplate) -> tuple[Diagnostic, ...]: diagnostics: list[Diagnostic] = [] step_ids = [step.step_id for step in template.steps] diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py index 7a7a823..60113c0 100644 --- a/tests/test_workflow_service.py +++ b/tests/test_workflow_service.py @@ -16,9 +16,13 @@ from kontextual_engine import ( TransformationRunStatus, TransformationService, ValidationError, + WorkflowExceptionKind, + WorkflowExceptionStatus, WorkflowInputDefinition, WorkflowInputKind, WorkflowInvocation, + WorkflowReviewDecisionType, + WorkflowReviewStatus, WorkflowRunStatus, WorkflowService, WorkflowStepDefinition, @@ -225,6 +229,180 @@ def test_sqlite_workflow_templates_and_runs_survive_reinstantiation(tmp_path: Pa assert reloaded.list_representations(asset_id="asset-single-output")[0].kind == RepresentationKind.DERIVED +def test_review_gate_pauses_output_then_continue_completes_and_reconstructs_lineage() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(review_gate_template(), context) + + waiting = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-review", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + assert waiting.success is False + assert waiting.run.status == WorkflowRunStatus.WAITING + assert waiting.run.step_runs[0].status == WorkflowStepRunStatus.WAITING + assert waiting.run.review_tasks[0].status == WorkflowReviewStatus.OPEN + assert waiting.run.exceptions[0].kind == WorkflowExceptionKind.REVIEW_REQUIRED + assert waiting.run.exceptions[0].status == WorkflowExceptionStatus.OPEN + assert workflow.list_review_tasks(workflow_run_id=waiting.run.run_id) == waiting.run.review_tasks + assert workflow.list_exception_queue(kind=WorkflowExceptionKind.REVIEW_REQUIRED) == waiting.run.exceptions + + continued = workflow.record_review_decision( + waiting.run.run_id, + waiting.run.review_tasks[0].review_id, + WorkflowReviewDecisionType.CONTINUE, + context, + note="approved for release", + ) + reconstruction = workflow.reconstruct_run(continued.run.run_id) + + assert continued.success is True + assert continued.run.status == WorkflowRunStatus.COMPLETED + assert continued.run.review_tasks[0].status == WorkflowReviewStatus.CONTINUED + assert continued.run.exceptions[0].status == WorkflowExceptionStatus.RESOLVED + assert continued.run.output_asset_ids == ("asset-reviewed-output",) + assert len(reconstruction.transformation_runs) == 1 + assert reconstruction.derived_lineage[0].output_asset_id == "asset-reviewed-output" + assert { + event.operation + for event in reconstruction.audit_events + } >= { + "workflow.review.requested", + "workflow.review.continue", + "derived_artifact.lineage.linked", + } + + +def test_review_decisions_can_reject_correct_retry_and_escalate_runs() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(review_gate_template(), context) + + rejected = _invoke_review_workflow(workflow, context) + rejected_result = workflow.record_review_decision( + rejected.run.run_id, + rejected.run.review_tasks[0].review_id, + WorkflowReviewDecisionType.REJECT, + context, + note="incorrect output", + ) + assert rejected_result.run.status == WorkflowRunStatus.FAILED + assert rejected_result.run.review_tasks[0].status == WorkflowReviewStatus.REJECTED + assert rejected_result.run.exceptions[0].status == WorkflowExceptionStatus.RESOLVED + + corrected = _invoke_review_workflow(workflow, context) + corrected_result = workflow.record_review_decision( + corrected.run.run_id, + corrected.run.review_tasks[0].review_id, + WorkflowReviewDecisionType.CORRECT, + context, + note="minor metadata correction", + correction={"label": "approved-after-edit"}, + ) + assert corrected_result.run.status == WorkflowRunStatus.COMPLETED + assert corrected_result.run.review_tasks[0].status == WorkflowReviewStatus.CORRECTED + assert corrected_result.run.review_tasks[0].correction == {"label": "approved-after-edit"} + + retry_requested = _invoke_review_workflow(workflow, context) + retry_result = workflow.record_review_decision( + retry_requested.run.run_id, + retry_requested.run.review_tasks[0].review_id, + WorkflowReviewDecisionType.RETRY, + context, + note="rerun with same inputs", + ) + assert repo.get_workflow_run(retry_requested.run.run_id).status == WorkflowRunStatus.RETRIED + assert retry_result.run.retry_of_run_id == retry_requested.run.run_id + assert retry_result.run.status == WorkflowRunStatus.WAITING + assert retry_result.run.review_tasks[0].status == WorkflowReviewStatus.OPEN + + escalated = _invoke_review_workflow(workflow, context) + escalated_result = workflow.record_review_decision( + escalated.run.run_id, + escalated.run.review_tasks[0].review_id, + WorkflowReviewDecisionType.ESCALATE, + context, + note="needs owner decision", + ) + assert escalated_result.run.status == WorkflowRunStatus.WAITING + assert escalated_result.run.review_tasks[0].status == WorkflowReviewStatus.ESCALATED + assert escalated_result.run.exceptions[0].status == WorkflowExceptionStatus.ESCALATED + assert workflow.list_exception_queue(status=WorkflowExceptionStatus.ESCALATED)[0].exception_id == ( + escalated_result.run.exceptions[0].exception_id + ) + + +def test_failed_workflow_step_creates_exception_queue_item() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(markdown_failure_template(), context) + + result = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-failure", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + exceptions = workflow.list_exception_queue(kind=WorkflowExceptionKind.FAILED) + + assert result.run.status == WorkflowRunStatus.FAILED + assert exceptions[0].workflow_run_id == result.run.run_id + assert exceptions[0].step_id == "markdown" + assert exceptions[0].diagnostics[0]["code"] == "transformation.operation_not_executable" + + +def test_sqlite_review_state_survives_reinstantiation(tmp_path: Path) -> None: + db_path = tmp_path / "registry.sqlite" + repo = SQLiteAssetRegistryRepository(db_path) + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(review_gate_template(), context) + + waiting = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-review", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + reloaded = WorkflowService(SQLiteAssetRegistryRepository(db_path)) + run = reloaded.reconstruct_run(waiting.run.run_id).run + + assert run.status == WorkflowRunStatus.WAITING + assert run.review_tasks[0].status == WorkflowReviewStatus.OPEN + assert run.exceptions[0].kind == WorkflowExceptionKind.REVIEW_REQUIRED + + def rich_input_template() -> WorkflowTemplate: return WorkflowTemplate( template_id="workflow-rich-inputs", @@ -309,6 +487,56 @@ def partial_template() -> WorkflowTemplate: ) +def review_gate_template() -> WorkflowTemplate: + return WorkflowTemplate( + template_id="workflow-review", + name="Review Workflow", + inputs=(WorkflowInputDefinition("source", WorkflowInputKind.ASSET),), + steps=( + WorkflowStepDefinition( + step_id="view", + operation_id="structured_view", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-reviewed-output", "title": "Reviewed Output"}, + review_gate={ + "required": True, + "reason": "sensitive output requires approval", + "queue": "content-review", + }, + ), + ), + ) + + +def markdown_failure_template() -> WorkflowTemplate: + return WorkflowTemplate( + template_id="workflow-failure", + name="Failure Workflow", + inputs=(WorkflowInputDefinition("source", WorkflowInputKind.ASSET),), + steps=( + WorkflowStepDefinition( + step_id="markdown", + operation_id="markdown_transform", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-markdown-failure"}, + ), + ), + ) + + +def _invoke_review_workflow( + workflow: WorkflowService, + context: OperationContext, +): + return workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-review", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + def create_source_asset( registry: AssetRegistryService, context: OperationContext, diff --git a/workplans/KONT-WP-0008-transformations-workflow-jobs.md b/workplans/KONT-WP-0008-transformations-workflow-jobs.md index 3b8605f..90c0d1c 100644 --- a/workplans/KONT-WP-0008-transformations-workflow-jobs.md +++ b/workplans/KONT-WP-0008-transformations-workflow-jobs.md @@ -4,7 +4,7 @@ type: workplan title: "Traceable Transformations And Workflow Jobs" domain: markitect repo: kontextual-engine -status: active +status: completed owner: codex topic_slug: markitect planning_priority: high @@ -47,13 +47,11 @@ audit events. ## Implementation Status -The first foundation slices are implemented for transformation operations, +The foundation slices are implemented for transformation operations, transformation run persistence, derived artifact lineage, workflow templates, -and the MVP durable job runner. See `docs/transformation-implementation.md` -and `docs/workflow-jobs-implementation.md`. - -Review gates, exception queues, and richer workflow audit reconstruction remain -open in this workplan. +the MVP durable job runner, review gates, exception queues, and audit +reconstruction. See `docs/transformation-implementation.md` and +`docs/workflow-jobs-implementation.md`. ## O8.1 - Implement transformation operation registry @@ -202,7 +200,7 @@ Implemented: ```task id: KONT-WP-0008-T006 -status: todo +status: done priority: medium state_hub_task_id: "5fae9005-4d64-4fca-8c51-a19405512377" ``` @@ -217,11 +215,20 @@ Acceptance: or review-required items. - Review decisions continue, reject, correct, retry, or escalate runs. +Implemented: + +- Workflow step definitions support `review_gate` settings. +- Review-required outputs pause step and run execution with embedded + `WorkflowReviewTask` and `WorkflowExceptionRecord` state. +- Review decisions can continue, reject, correct, retry, or escalate a run. +- Exception queue listing exposes review-required, failed, blocked, + low-confidence, and policy-conflicted workflow exceptions. + ## O8.7 - Audit workflow and transformation operations ```task id: KONT-WP-0008-T007 -status: todo +status: done priority: medium state_hub_task_id: "9e06aa46-3988-4389-99ec-0a934c68af1b" ``` @@ -236,6 +243,17 @@ Acceptance: policy context. - Derived artifact audit events connect to source lineage. +Implemented: + +- Workflow template registration, run queue/start/final states, step + executions, retries, cancellations, review requests, review decisions, + failures, and exceptions emit audit events. +- `WorkflowService.reconstruct_run` returns run state, template, audit events, + transformation runs, derived lineage, review tasks, and exceptions. +- Derived artifact lineage emits `derived_artifact.lineage.linked` audit events + connected to output asset, transformation run, source assets, source + versions, and output representation. + ## Definition Of Done - Transformations and workflows produce inspectable run records and audit