diff --git a/docs/asset-registry-implementation.md b/docs/asset-registry-implementation.md index c443999..5c8ab7f 100644 --- a/docs/asset-registry-implementation.md +++ b/docs/asset-registry-implementation.md @@ -65,6 +65,7 @@ and SQLite repositories are adapters behind those ports. - Idempotency-key reuse with a different payload raises a validation error. - Transformation run and derived lineage persistence for traceable derived artifact creation. +- Workflow template and workflow run persistence for durable job execution. - In-memory repository for deterministic tests. - SQLite repository for local-first durable asset registry state. - SQLite foreign-key enforcement for representation and metadata asset @@ -89,12 +90,14 @@ and SQLite repositories are adapters behind those ports. - `ingestion_jobs` - `transformation_runs` - `derived_lineage` +- `workflow_templates` +- `workflow_runs` Payloads are stored as compact JSON envelopes while indexed columns carry stable lookup fields such as asset ID, lifecycle, representation kind, digest, sequence, relationship source/target, actor ID, target, correlation ID, -idempotency key, transformation status, operation ID, and derived output asset -ID. +idempotency key, transformation status, operation ID, derived output asset ID, +workflow template ID, workflow version, and workflow run status. ## Not Yet Implemented diff --git a/docs/workflow-jobs-implementation.md b/docs/workflow-jobs-implementation.md new file mode 100644 index 0000000..7412ffd --- /dev/null +++ b/docs/workflow-jobs-implementation.md @@ -0,0 +1,104 @@ +# Workflow Jobs Implementation Note + +Date: 2026-05-06 + +Status: active implementation note for `KONT-WP-0008`. + +## Purpose + +This note records the first durable workflow and job-runner foundation. It +extends traceable transformations into reusable workflow templates with +dependency-ordered execution and recoverable run state. + +## Implemented Package Shape + +```text +src/kontextual_engine/ + core/ + workflow_jobs.py + services/ + workflow_service.py + ports/ + repositories.py + adapters/ + memory/asset_registry.py + sqlite/asset_registry.py +``` + +The workflow service uses the existing repository and policy ports. Executable +steps delegate to `TransformationService`, so workflow runs do not bypass asset +governance, transformation run records, derived lineage, or audit. + +## Implemented Capabilities + +- `WorkflowTemplate` for reusable template identity, version, metadata, + inputs, steps, policy checks, and failure behavior. +- `WorkflowInputDefinition` supports asset, collection, query, source event, + and payload inputs. +- `WorkflowStepDefinition` captures step kind, operation ID, dependencies, + input/output bindings, preconditions, failure behavior, and metadata. +- Template registration validates duplicate step IDs, missing dependencies, + dependency cycles, and unsupported failure behavior. +- `WorkflowRun` and `WorkflowStepRun` persist queued, running, waiting, + completed, partially completed, failed, retried, and canceled states. +- `WorkflowService` can register templates, queue runs, invoke runs, resume + runs, retry runs, and cancel runs programmatically. +- The MVP runner executes transformation-backed steps in dependency order. +- Step inputs can resolve source asset IDs from invocation inputs or completed + upstream step outputs. +- Retry and repeated invocation avoid silent overwrite by choosing a fresh + output asset ID when a fixed template output ID already exists. +- Workflow template and run persistence are implemented for memory and SQLite. +- Audit events are emitted for template registration, run queue/start/final + states, step start/final states, retry, and cancellation. + +## Current Boundaries + +The runner executes only `kind="transformation"` steps. Non-transformation +steps are represented and fail with structured diagnostics until a later +adapter or job worker handles them. + +Workflow inputs preserve collection, query, source event, and payload bindings, +but the MVP runner only interprets asset bindings for transformation execution. +Query expansion, source-event ingestion, human tasks, and exception queues stay +in later WP-0008 tasks. + +Markdown-specific transformations remain adapter-backed through markitect-tool. +Workflow orchestration may invoke those operations once the adapter boundary is +wired, but the engine does not reimplement Markdown semantics. + +## Current SQLite Tables + +WP-0008 adds shared registry persistence for: + +- `workflow_templates` +- `workflow_runs` + +The tables store compact JSON payloads with indexed lookup columns for template +ID, template version, template name, run ID, run status, actor ID, correlation +ID, queued timestamp, and updated timestamp. + +## Not Yet Implemented + +- Human review gates and approval tasks. +- Exception queues for blocked, failed, low-confidence, or policy-conflicted + items. +- Queue-worker adapters beyond embedded execution. +- Rich retry policies by operation type. +- Query/input expansion into dynamic asset sets. +- Full workflow reconstruction views across all audit and lineage records. + +These remain in open tasks `KONT-WP-0008-T006` and `KONT-WP-0008-T007`. + +## Test Coverage + +`tests/test_workflow_service.py` covers: + +- template registration, input-kind persistence, missing dependency rejection, + and dependency-cycle rejection, +- dependency-ordered execution of two transformation steps, +- queue, cancel, resume denial, and retry without direct storage edits, +- partial completion when a continue-on-failure step fails and another step + succeeds, +- SQLite reload of workflow templates, workflow runs, step state, and derived + output representation state. diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 9fdafa8..295fac7 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -58,6 +58,14 @@ from .core import ( TransformationRun, TransformationRunStatus, VersionChangeType, + WorkflowInputDefinition, + WorkflowInputKind, + WorkflowRun, + WorkflowRunStatus, + WorkflowStepDefinition, + WorkflowStepRun, + WorkflowStepRunStatus, + WorkflowTemplate, ) from .errors import ( AdapterUnavailableError, @@ -109,6 +117,9 @@ from .services import ( TransformationRequest, TransformationRunResult, TransformationService, + WorkflowInvocation, + WorkflowRunResult, + WorkflowService, default_transformation_registry, ) from .storage import InMemoryKnowledgeRepository @@ -232,7 +243,18 @@ __all__ = [ "TransformationService", "ValidationError", "VersionChangeType", + "WorkflowInputDefinition", + "WorkflowInputKind", + "WorkflowInvocation", + "WorkflowRun", + "WorkflowRunResult", + "WorkflowRunStatus", + "WorkflowService", "WorkflowStep", + "WorkflowStepDefinition", + "WorkflowStepRun", + "WorkflowStepRunStatus", + "WorkflowTemplate", "bundle_digest", "content_digest", "default_transformation_registry", diff --git a/src/kontextual_engine/adapters/memory/asset_registry.py b/src/kontextual_engine/adapters/memory/asset_registry.py index 4d7c9d1..678227e 100644 --- a/src/kontextual_engine/adapters/memory/asset_registry.py +++ b/src/kontextual_engine/adapters/memory/asset_registry.py @@ -26,6 +26,9 @@ from kontextual_engine.core import ( Sensitivity, TransformationRun, TransformationRunStatus, + WorkflowRun, + WorkflowRunStatus, + WorkflowTemplate, ) from kontextual_engine.errors import NotFoundError, ValidationError @@ -47,6 +50,8 @@ class InMemoryAssetRegistryRepository: ingestion_jobs: dict[str, IngestionJob] = field(default_factory=dict) transformation_runs: dict[str, TransformationRun] = field(default_factory=dict) derived_lineage: dict[str, DerivedArtifactLineage] = field(default_factory=dict) + workflow_templates: dict[tuple[str, str], WorkflowTemplate] = field(default_factory=dict) + workflow_runs: dict[str, WorkflowRun] = field(default_factory=dict) def save_actor(self, actor: Actor) -> Actor: self.actors[actor.id] = actor @@ -357,6 +362,64 @@ class InMemoryAssetRegistryRepository: records = [record for record in records if record.transformation_run_id == transformation_run_id] return sorted(records, key=lambda record: (record.transformation_run_id, record.lineage_id)) + def save_workflow_template(self, template: WorkflowTemplate) -> WorkflowTemplate: + self.workflow_templates[(template.template_id, template.version)] = template + return template + + def get_workflow_template( + self, + template_id: str, + *, + version: str | None = None, + ) -> WorkflowTemplate: + if version is not None: + try: + return self.workflow_templates[(template_id, version)] + except KeyError as exc: + raise NotFoundError( + "Workflow template not found", + details={"template_id": template_id, "version": version}, + ) from exc + candidates = [item for key, item in self.workflow_templates.items() if key[0] == template_id] + if not candidates: + raise NotFoundError("Workflow template not found", details={"template_id": template_id}) + return sorted(candidates, key=lambda item: (item.updated_at, item.version))[-1] + + def list_workflow_templates( + self, + *, + template_id: str | None = None, + ) -> list[WorkflowTemplate]: + templates: Iterable[WorkflowTemplate] = self.workflow_templates.values() + if template_id is not None: + templates = [item for item in templates if item.template_id == template_id] + return sorted(templates, key=lambda item: (item.name, item.version, item.template_id)) + + def save_workflow_run(self, run: WorkflowRun) -> WorkflowRun: + self.get_actor(run.actor_id) + self.get_workflow_template(run.template_id, version=run.template_version) + self.workflow_runs[run.run_id] = run + return run + + def get_workflow_run(self, run_id: str) -> WorkflowRun: + try: + return self.workflow_runs[run_id] + except KeyError as exc: + raise NotFoundError("Workflow run not found", details={"run_id": run_id}) from exc + + def list_workflow_runs( + self, + *, + status: WorkflowRunStatus | None = None, + template_id: str | None = None, + ) -> list[WorkflowRun]: + runs: Iterable[WorkflowRun] = self.workflow_runs.values() + if status is not None: + runs = [run for run in runs if run.status == status] + if template_id is not None: + runs = [run for run in runs if run.template_id == template_id] + return sorted(runs, key=lambda run: (run.queued_at, run.run_id)) + def _metadata_matches( records: list[MetadataRecord], diff --git a/src/kontextual_engine/adapters/sqlite/asset_registry.py b/src/kontextual_engine/adapters/sqlite/asset_registry.py index 5d14d66..ea0b0f5 100644 --- a/src/kontextual_engine/adapters/sqlite/asset_registry.py +++ b/src/kontextual_engine/adapters/sqlite/asset_registry.py @@ -29,6 +29,9 @@ from kontextual_engine.core import ( Sensitivity, TransformationRun, TransformationRunStatus, + WorkflowRun, + WorkflowRunStatus, + WorkflowTemplate, ) from kontextual_engine.errors import NotFoundError, ValidationError @@ -723,6 +726,139 @@ class SQLiteAssetRegistryRepository: records = [record for record in records if source_asset_id in record.source_asset_ids] return records + def save_workflow_template(self, template: WorkflowTemplate) -> WorkflowTemplate: + with self._connect() as conn: + conn.execute( + """ + insert into workflow_templates + (id, version, name, updated_at, payload) + values (?, ?, ?, ?, ?) + on conflict(id, version) do update set + name=excluded.name, + updated_at=excluded.updated_at, + payload=excluded.payload + """, + ( + template.template_id, + template.version, + template.name, + template.updated_at, + _json(template.to_dict()), + ), + ) + return template + + def get_workflow_template( + self, + template_id: str, + *, + version: str | None = None, + ) -> WorkflowTemplate: + if version is None: + row = self._one( + """ + select payload from workflow_templates + where id = ? + order by updated_at desc, version desc + limit 1 + """, + (template_id,), + ) + else: + row = self._one( + "select payload from workflow_templates where id = ? and version = ?", + (template_id, version), + ) + if row is None: + details = {"template_id": template_id} + if version is not None: + details["version"] = version + raise NotFoundError("Workflow template not found", details=details) + return WorkflowTemplate.from_dict(_loads(row["payload"])) + + def list_workflow_templates( + self, + *, + template_id: str | None = None, + ) -> list[WorkflowTemplate]: + if template_id is None: + rows = self._all("select payload from workflow_templates order by name, version, id", ()) + else: + rows = self._all( + "select payload from workflow_templates where id = ? order by name, version, id", + (template_id,), + ) + return [WorkflowTemplate.from_dict(_loads(row["payload"])) for row in rows] + + def save_workflow_run(self, run: WorkflowRun) -> WorkflowRun: + try: + with self._connect() as conn: + conn.execute( + """ + insert into workflow_runs + (id, template_id, template_version, status, actor_id, correlation_id, + queued_at, updated_at, payload) + values (?, ?, ?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + template_id=excluded.template_id, + template_version=excluded.template_version, + status=excluded.status, + actor_id=excluded.actor_id, + correlation_id=excluded.correlation_id, + queued_at=excluded.queued_at, + updated_at=excluded.updated_at, + payload=excluded.payload + """, + ( + run.run_id, + run.template_id, + run.template_version, + run.status.value, + run.actor_id, + run.correlation_id, + run.queued_at, + run.updated_at, + _json(run.to_dict()), + ), + ) + except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Workflow run references an unknown actor or template", + details={ + "actor_id": run.actor_id, + "template_id": run.template_id, + "template_version": run.template_version, + "run_id": run.run_id, + }, + ) from exc + raise + return run + + def get_workflow_run(self, run_id: str) -> WorkflowRun: + row = self._one("select payload from workflow_runs where id = ?", (run_id,)) + if row is None: + raise NotFoundError("Workflow run not found", details={"run_id": run_id}) + return WorkflowRun.from_dict(_loads(row["payload"])) + + def list_workflow_runs( + self, + *, + status: WorkflowRunStatus | None = None, + template_id: str | None = None, + ) -> list[WorkflowRun]: + clauses = [] + params: list[Any] = [] + if status is not None: + clauses.append("status = ?") + params.append(status.value) + if template_id is not None: + clauses.append("template_id = ?") + params.append(template_id) + where = f" where {' and '.join(clauses)}" if clauses else "" + rows = self._all(f"select payload from workflow_runs{where} order by queued_at, id", tuple(params)) + return [WorkflowRun.from_dict(_loads(row["payload"])) for row in rows] + def _initialize(self) -> None: with self._connect() as conn: conn.executescript( @@ -839,6 +975,28 @@ class SQLiteAssetRegistryRepository: transformation_run_id text not null references transformation_runs(id) on delete cascade, payload text not null ); + create table if not exists workflow_templates ( + id text not null, + version text not null, + name text not null, + updated_at text not null, + payload text not null, + primary key(id, version) + ); + create table if not exists workflow_runs ( + id text primary key, + template_id text not null, + template_version text not null, + status text not null, + actor_id text not null, + correlation_id text not null, + queued_at text not null, + updated_at text not null, + payload text not null, + foreign key(actor_id) references actors(id), + foreign key(template_id, template_version) + references workflow_templates(id, version) + ); create index if not exists idx_assets_lifecycle on assets(lifecycle); create index if not exists idx_representations_asset on representations(asset_id); create index if not exists idx_metadata_asset on metadata_records(asset_id); @@ -858,6 +1016,10 @@ class SQLiteAssetRegistryRepository: create index if not exists idx_transformation_runs_correlation on transformation_runs(correlation_id); create index if not exists idx_derived_lineage_output on derived_lineage(output_asset_id); create index if not exists idx_derived_lineage_run on derived_lineage(transformation_run_id); + create index if not exists idx_workflow_templates_name on workflow_templates(name); + create index if not exists idx_workflow_runs_status on workflow_runs(status); + create index if not exists idx_workflow_runs_template on workflow_runs(template_id); + create index if not exists idx_workflow_runs_correlation on workflow_runs(correlation_id); """ ) diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index 722919f..c4c4708 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -42,6 +42,16 @@ from .relationships import ( ) from .retrieval_feedback import RetrievalFeedbackLabel, RetrievalFeedbackRecord from .transformations import TransformationOperation, TransformationRun, TransformationRunStatus +from .workflow_jobs import ( + WorkflowInputDefinition, + WorkflowInputKind, + WorkflowRun, + WorkflowRunStatus, + WorkflowStepDefinition, + WorkflowStepRun, + WorkflowStepRunStatus, + WorkflowTemplate, +) __all__ = [ "Actor", @@ -87,6 +97,14 @@ __all__ = [ "TransformationRun", "TransformationRunStatus", "VersionChangeType", + "WorkflowInputDefinition", + "WorkflowInputKind", + "WorkflowRun", + "WorkflowRunStatus", + "WorkflowStepDefinition", + "WorkflowStepRun", + "WorkflowStepRunStatus", + "WorkflowTemplate", "content_digest", "mapping_digest", "new_id", diff --git a/src/kontextual_engine/core/workflow_jobs.py b/src/kontextual_engine/core/workflow_jobs.py new file mode 100644 index 0000000..a2e475a --- /dev/null +++ b/src/kontextual_engine/core/workflow_jobs.py @@ -0,0 +1,462 @@ +"""Workflow template and durable run primitives.""" + +from __future__ import annotations + +from dataclasses import dataclass, field, replace +from enum import Enum +from typing import Any + +from .primitives import compact_dict, mapping_digest, new_id, utc_now + + +class WorkflowInputKind(str, Enum): + ASSET = "asset" + COLLECTION = "collection" + QUERY = "query" + SOURCE_EVENT = "source_event" + PAYLOAD = "payload" + + +class WorkflowRunStatus(str, Enum): + QUEUED = "queued" + RUNNING = "running" + WAITING = "waiting" + COMPLETED = "completed" + PARTIALLY_COMPLETED = "partially_completed" + FAILED = "failed" + RETRIED = "retried" + CANCELED = "canceled" + + +class WorkflowStepRunStatus(str, Enum): + QUEUED = "queued" + RUNNING = "running" + WAITING = "waiting" + COMPLETED = "completed" + FAILED = "failed" + SKIPPED = "skipped" + CANCELED = "canceled" + + +@dataclass(frozen=True) +class WorkflowInputDefinition: + name: str + kind: WorkflowInputKind | str + required: bool = True + description: str = "" + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "kind", WorkflowInputKind(self.kind)) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "name": self.name, + "kind": self.kind.value, + "required": self.required, + "description": self.description, + "metadata": dict(self.metadata), + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowInputDefinition": + return cls( + name=data["name"], + kind=WorkflowInputKind(data["kind"]), + required=bool(data.get("required", True)), + description=data.get("description", ""), + metadata=dict(data.get("metadata", {})), + ) + + +@dataclass(frozen=True) +class WorkflowStepDefinition: + step_id: str + kind: str = "transformation" + operation_id: str | None = None + depends_on: tuple[str, ...] = () + inputs: dict[str, Any] = field(default_factory=dict) + parameters: dict[str, Any] = field(default_factory=dict) + outputs: dict[str, Any] = field(default_factory=dict) + preconditions: tuple[dict[str, Any], ...] = () + failure_behavior: str = "fail_workflow" + required_permissions: tuple[str, ...] = () + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "depends_on", tuple(self.depends_on)) + object.__setattr__(self, "preconditions", tuple(dict(item) for item in self.preconditions)) + object.__setattr__(self, "required_permissions", tuple(self.required_permissions)) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "step_id": self.step_id, + "kind": self.kind, + "operation_id": self.operation_id, + "depends_on": list(self.depends_on), + "inputs": dict(self.inputs), + "parameters": dict(self.parameters), + "outputs": dict(self.outputs), + "preconditions": list(self.preconditions), + "failure_behavior": self.failure_behavior, + "required_permissions": list(self.required_permissions), + "metadata": dict(self.metadata), + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowStepDefinition": + return cls( + step_id=data["step_id"], + kind=data.get("kind", "transformation"), + operation_id=data.get("operation_id"), + depends_on=tuple(data.get("depends_on", ())), + inputs=dict(data.get("inputs", {})), + parameters=dict(data.get("parameters", {})), + outputs=dict(data.get("outputs", {})), + preconditions=tuple(dict(item) for item in data.get("preconditions", ())), + failure_behavior=data.get("failure_behavior", "fail_workflow"), + required_permissions=tuple(data.get("required_permissions", ())), + metadata=dict(data.get("metadata", {})), + ) + + +@dataclass(frozen=True) +class WorkflowTemplate: + name: str + inputs: tuple[WorkflowInputDefinition, ...] = () + steps: tuple[WorkflowStepDefinition, ...] = () + version: str = "1" + description: str = "" + policy_checks: tuple[dict[str, Any], ...] = () + failure_behavior: str = "fail_workflow" + metadata: dict[str, Any] = field(default_factory=dict) + created_by: str | None = None + template_id: str = field(default_factory=lambda: new_id("wftpl")) + created_at: str = field(default_factory=lambda: utc_now().isoformat()) + updated_at: str = field(default_factory=lambda: utc_now().isoformat()) + + def __post_init__(self) -> None: + object.__setattr__(self, "inputs", tuple(self.inputs)) + object.__setattr__(self, "steps", tuple(self.steps)) + object.__setattr__(self, "policy_checks", tuple(dict(item) for item in self.policy_checks)) + + @property + def template_hash(self) -> str: + return mapping_digest(self.to_dict(include_hash=False)) + + def with_actor(self, actor_id: str) -> "WorkflowTemplate": + return replace(self, created_by=self.created_by or actor_id, updated_at=utc_now().isoformat()) + + def to_dict(self, *, include_hash: bool = True) -> dict[str, Any]: + data = compact_dict( + { + "template_id": self.template_id, + "name": self.name, + "version": self.version, + "description": self.description, + "inputs": [item.to_dict() for item in self.inputs], + "steps": [item.to_dict() for item in self.steps], + "policy_checks": list(self.policy_checks), + "failure_behavior": self.failure_behavior, + "metadata": dict(self.metadata), + "created_by": self.created_by, + "created_at": self.created_at, + "updated_at": self.updated_at, + } + ) + if include_hash: + data["template_hash"] = self.template_hash + return data + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowTemplate": + return cls( + template_id=data["template_id"], + name=data["name"], + version=data.get("version", "1"), + description=data.get("description", ""), + inputs=tuple(WorkflowInputDefinition.from_dict(item) for item in data.get("inputs", ())), + steps=tuple(WorkflowStepDefinition.from_dict(item) for item in data.get("steps", ())), + policy_checks=tuple(dict(item) for item in data.get("policy_checks", ())), + failure_behavior=data.get("failure_behavior", "fail_workflow"), + metadata=dict(data.get("metadata", {})), + created_by=data.get("created_by"), + created_at=data["created_at"], + updated_at=data["updated_at"], + ) + + +@dataclass(frozen=True) +class WorkflowStepRun: + step_id: str + operation_id: str | None = None + status: WorkflowStepRunStatus = WorkflowStepRunStatus.QUEUED + transformation_run_id: str | None = None + output_asset_ids: tuple[str, ...] = () + diagnostics: tuple[dict[str, Any], ...] = () + started_at: str | None = None + completed_at: str | None = None + updated_at: str = field(default_factory=lambda: utc_now().isoformat()) + + def __post_init__(self) -> None: + object.__setattr__(self, "status", WorkflowStepRunStatus(self.status)) + object.__setattr__(self, "output_asset_ids", tuple(self.output_asset_ids)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + def running(self) -> "WorkflowStepRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowStepRunStatus.RUNNING, + started_at=self.started_at or now, + updated_at=now, + ) + + def waiting(self, diagnostics: tuple[dict[str, Any], ...] = ()) -> "WorkflowStepRun": + return replace( + self, + status=WorkflowStepRunStatus.WAITING, + diagnostics=self.diagnostics + tuple(diagnostics), + updated_at=utc_now().isoformat(), + ) + + def completed( + self, + *, + transformation_run_id: str | None = None, + output_asset_ids: tuple[str, ...] = (), + ) -> "WorkflowStepRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowStepRunStatus.COMPLETED, + transformation_run_id=transformation_run_id or self.transformation_run_id, + output_asset_ids=tuple(output_asset_ids), + completed_at=now, + updated_at=now, + ) + + def failed(self, diagnostics: tuple[dict[str, Any], ...]) -> "WorkflowStepRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowStepRunStatus.FAILED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def skipped(self, diagnostics: tuple[dict[str, Any], ...]) -> "WorkflowStepRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowStepRunStatus.SKIPPED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def canceled(self, diagnostics: tuple[dict[str, Any], ...] = ()) -> "WorkflowStepRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowStepRunStatus.CANCELED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "step_id": self.step_id, + "operation_id": self.operation_id, + "status": self.status.value, + "transformation_run_id": self.transformation_run_id, + "output_asset_ids": list(self.output_asset_ids), + "diagnostics": list(self.diagnostics), + "started_at": self.started_at, + "completed_at": self.completed_at, + "updated_at": self.updated_at, + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowStepRun": + return cls( + step_id=data["step_id"], + operation_id=data.get("operation_id"), + status=WorkflowStepRunStatus(data.get("status", WorkflowStepRunStatus.QUEUED.value)), + transformation_run_id=data.get("transformation_run_id"), + output_asset_ids=tuple(data.get("output_asset_ids", ())), + diagnostics=tuple(data.get("diagnostics", ())), + started_at=data.get("started_at"), + completed_at=data.get("completed_at"), + updated_at=data["updated_at"], + ) + + +@dataclass(frozen=True) +class WorkflowRun: + template_id: str + template_version: str + input_bindings: dict[str, Any] + actor_id: str + correlation_id: str + policy_context: dict[str, Any] = field(default_factory=dict) + status: WorkflowRunStatus = WorkflowRunStatus.QUEUED + step_runs: tuple[WorkflowStepRun, ...] = () + output_asset_ids: tuple[str, ...] = () + diagnostics: tuple[dict[str, Any], ...] = () + retry_of_run_id: str | None = None + attempt: int = 1 + run_id: str = field(default_factory=lambda: new_id("wfrun")) + queued_at: str = field(default_factory=lambda: utc_now().isoformat()) + started_at: str | None = None + completed_at: str | None = None + updated_at: str = field(default_factory=lambda: utc_now().isoformat()) + + def __post_init__(self) -> None: + object.__setattr__(self, "status", WorkflowRunStatus(self.status)) + object.__setattr__(self, "step_runs", tuple(self.step_runs)) + object.__setattr__(self, "output_asset_ids", tuple(self.output_asset_ids)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + def running(self) -> "WorkflowRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowRunStatus.RUNNING, + started_at=self.started_at or now, + updated_at=now, + ) + + def waiting(self, diagnostics: tuple[dict[str, Any], ...] = ()) -> "WorkflowRun": + return replace( + self, + status=WorkflowRunStatus.WAITING, + diagnostics=self.diagnostics + tuple(diagnostics), + updated_at=utc_now().isoformat(), + ) + + def completed(self, *, output_asset_ids: tuple[str, ...] = ()) -> "WorkflowRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowRunStatus.COMPLETED, + output_asset_ids=tuple(output_asset_ids), + completed_at=now, + updated_at=now, + ) + + def partially_completed( + self, + *, + output_asset_ids: tuple[str, ...] = (), + diagnostics: tuple[dict[str, Any], ...] = (), + ) -> "WorkflowRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowRunStatus.PARTIALLY_COMPLETED, + output_asset_ids=tuple(output_asset_ids), + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def failed(self, diagnostics: tuple[dict[str, Any], ...]) -> "WorkflowRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowRunStatus.FAILED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def canceled(self, diagnostics: tuple[dict[str, Any], ...] = ()) -> "WorkflowRun": + now = utc_now().isoformat() + return replace( + self, + status=WorkflowRunStatus.CANCELED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def retried(self) -> "WorkflowRun": + return replace( + self, + status=WorkflowRunStatus.RETRIED, + updated_at=utc_now().isoformat(), + ) + + def retry(self, *, actor_id: str, correlation_id: str) -> "WorkflowRun": + return WorkflowRun( + template_id=self.template_id, + template_version=self.template_version, + input_bindings=dict(self.input_bindings), + actor_id=actor_id, + correlation_id=correlation_id, + policy_context=dict(self.policy_context), + retry_of_run_id=self.run_id, + attempt=self.attempt + 1, + ) + + def with_step_run(self, step_run: WorkflowStepRun) -> "WorkflowRun": + existing = {item.step_id: item for item in self.step_runs} + existing[step_run.step_id] = step_run + ordered = tuple(existing[item.step_id] for item in self.step_runs if item.step_id in existing) + if step_run.step_id not in {item.step_id for item in self.step_runs}: + ordered = ordered + (step_run,) + return replace(self, step_runs=ordered, updated_at=utc_now().isoformat()) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "run_id": self.run_id, + "template_id": self.template_id, + "template_version": self.template_version, + "input_bindings": dict(self.input_bindings), + "actor_id": self.actor_id, + "correlation_id": self.correlation_id, + "policy_context": dict(self.policy_context), + "status": self.status.value, + "step_runs": [item.to_dict() for item in self.step_runs], + "output_asset_ids": list(self.output_asset_ids), + "diagnostics": list(self.diagnostics), + "retry_of_run_id": self.retry_of_run_id, + "attempt": self.attempt, + "queued_at": self.queued_at, + "started_at": self.started_at, + "completed_at": self.completed_at, + "updated_at": self.updated_at, + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "WorkflowRun": + return cls( + run_id=data["run_id"], + template_id=data["template_id"], + template_version=data.get("template_version", "1"), + input_bindings=dict(data.get("input_bindings", {})), + actor_id=data["actor_id"], + correlation_id=data["correlation_id"], + policy_context=dict(data.get("policy_context", {})), + status=WorkflowRunStatus(data.get("status", WorkflowRunStatus.QUEUED.value)), + step_runs=tuple(WorkflowStepRun.from_dict(item) for item in data.get("step_runs", ())), + output_asset_ids=tuple(data.get("output_asset_ids", ())), + diagnostics=tuple(data.get("diagnostics", ())), + retry_of_run_id=data.get("retry_of_run_id"), + attempt=int(data.get("attempt", 1)), + queued_at=data["queued_at"], + started_at=data.get("started_at"), + completed_at=data.get("completed_at"), + updated_at=data["updated_at"], + ) diff --git a/src/kontextual_engine/ports/repositories.py b/src/kontextual_engine/ports/repositories.py index e987fcc..034eb41 100644 --- a/src/kontextual_engine/ports/repositories.py +++ b/src/kontextual_engine/ports/repositories.py @@ -25,6 +25,9 @@ from kontextual_engine.core import ( Sensitivity, TransformationRun, TransformationRunStatus, + WorkflowRun, + WorkflowRunStatus, + WorkflowTemplate, ) @@ -131,3 +134,25 @@ class AssetRegistryRepository(Protocol): source_asset_id: str | None = None, transformation_run_id: str | None = None, ) -> list[DerivedArtifactLineage]: ... + + def save_workflow_template(self, template: WorkflowTemplate) -> WorkflowTemplate: ... + def get_workflow_template( + self, + template_id: str, + *, + version: str | None = None, + ) -> WorkflowTemplate: ... + def list_workflow_templates( + self, + *, + template_id: str | None = None, + ) -> list[WorkflowTemplate]: ... + + def save_workflow_run(self, run: WorkflowRun) -> WorkflowRun: ... + def get_workflow_run(self, run_id: str) -> WorkflowRun: ... + def list_workflow_runs( + self, + *, + status: WorkflowRunStatus | None = None, + template_id: str | None = None, + ) -> list[WorkflowRun]: ... diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index 018d772..b72d3ea 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -32,6 +32,7 @@ from .transformation_service import ( TransformationService, default_transformation_registry, ) +from .workflow_service import WorkflowInvocation, WorkflowRunResult, WorkflowService __all__ = [ "AssetChangeResult", @@ -60,5 +61,8 @@ __all__ = [ "TransformationRequest", "TransformationRunResult", "TransformationService", + "WorkflowInvocation", + "WorkflowRunResult", + "WorkflowService", "default_transformation_registry", ] diff --git a/src/kontextual_engine/services/workflow_service.py b/src/kontextual_engine/services/workflow_service.py new file mode 100644 index 0000000..2205ee7 --- /dev/null +++ b/src/kontextual_engine/services/workflow_service.py @@ -0,0 +1,853 @@ +"""Durable workflow templates and MVP job runner.""" + +from __future__ import annotations + +from dataclasses import dataclass, field, replace +from typing import Any + +from kontextual_engine.core import ( + AuditEvent, + AuditOutcome, + OperationContext, + PolicyDecision, + WorkflowRun, + WorkflowRunStatus, + WorkflowStepDefinition, + WorkflowStepRun, + WorkflowStepRunStatus, + WorkflowTemplate, +) +from kontextual_engine.errors import AuthorizationError, Diagnostic, NotFoundError, ValidationError +from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway + +from .transformation_service import TransformationRequest, TransformationRunResult, TransformationService + + +@dataclass(frozen=True) +class WorkflowInvocation: + template_id: str + inputs: dict[str, Any] = field(default_factory=dict) + template_version: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "template_id": self.template_id, + "template_version": self.template_version, + "inputs": dict(self.inputs), + "metadata": dict(self.metadata), + } + + +@dataclass(frozen=True) +class WorkflowRunResult: + run: WorkflowRun + success: bool + diagnostics: tuple[Diagnostic, ...] = () + audit_event: AuditEvent | None = None + policy_decision: PolicyDecision | None = None + step_results: tuple[TransformationRunResult, ...] = () + + def __post_init__(self) -> None: + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + object.__setattr__(self, "step_results", tuple(self.step_results)) + + def to_dict(self) -> dict[str, Any]: + return { + "success": self.success, + "run": self.run.to_dict(), + "diagnostics": [item.to_dict() for item in self.diagnostics], + "audit_event": self.audit_event.to_dict() if self.audit_event else None, + "policy_decision": self.policy_decision.to_dict() if self.policy_decision else None, + "step_results": [item.to_dict() for item in self.step_results], + } + + +class WorkflowService: + def __init__( + self, + repository: AssetRegistryRepository, + *, + transformation_service: TransformationService | None = None, + policy_gateway: PolicyGateway | None = None, + ) -> None: + self.repository = repository + self.policy_gateway = policy_gateway or AllowAllPolicyGateway() + self.transformation_service = transformation_service or TransformationService( + repository, + policy_gateway=self.policy_gateway, + ) + + def register_template(self, template: WorkflowTemplate, context: OperationContext) -> WorkflowTemplate: + self.repository.save_actor(context.actor) + diagnostics = _template_diagnostics(template) + if diagnostics: + raise ValidationError( + "Workflow template is invalid", + details={"diagnostics": [item.to_dict() for item in diagnostics]}, + ) + template = template.with_actor(context.actor.id) + decision = self._authorize( + context, + "workflow.template.register", + f"workflow_template:{template.template_id}", + resource_metadata={"template": template.to_dict()}, + ) + if not decision.allowed: + self._audit( + "workflow.template.register", + f"workflow_template:{template.template_id}", + AuditOutcome.DENIED, + context, + decision, + ) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": "workflow.template.register", + "resource": f"workflow_template:{template.template_id}", + "correlation_id": context.correlation_id, + "policy_decision": decision.to_dict(), + }, + ) + saved = self.repository.save_workflow_template(template) + self._audit( + "workflow.template.register", + f"workflow_template:{template.template_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"template_version": template.version, "template_hash": template.template_hash}, + ) + return saved + + def get_template(self, template_id: str, *, version: str | None = None) -> WorkflowTemplate: + return self.repository.get_workflow_template(template_id, version=version) + + def list_templates(self, *, template_id: str | None = None) -> tuple[WorkflowTemplate, ...]: + return tuple(self.repository.list_workflow_templates(template_id=template_id)) + + def queue_template(self, invocation: WorkflowInvocation, context: OperationContext) -> WorkflowRunResult: + template = self.repository.get_workflow_template( + invocation.template_id, + version=invocation.template_version, + ) + self.repository.save_actor(context.actor) + decision = self._authorize( + context, + "workflow.run.execute", + f"workflow_template:{template.template_id}", + resource_metadata={"template": template.to_dict(), "invocation": invocation.to_dict()}, + ) + run = WorkflowRun( + template_id=template.template_id, + template_version=template.version, + input_bindings=dict(invocation.inputs), + actor_id=context.actor.id, + correlation_id=context.correlation_id, + policy_context={"run_execute": decision.to_dict()}, + step_runs=_initial_step_runs(template), + ) + self.repository.save_workflow_run(run) + self._audit( + "workflow.run.queued", + f"workflow_run:{run.run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"template_id": template.template_id, "template_version": template.version}, + ) + if not decision.allowed: + diagnostic = _permission_diagnostic(decision) + failed = run.failed((_diagnostic_dict(diagnostic),)) + self.repository.save_workflow_run(failed) + event = self._audit( + "workflow.run.execute", + f"workflow_run:{run.run_id}", + AuditOutcome.DENIED, + context, + decision, + details={"template_id": template.template_id}, + ) + return WorkflowRunResult( + run=failed, + success=False, + diagnostics=(diagnostic,), + audit_event=event, + policy_decision=decision, + ) + + input_diagnostics = _input_diagnostics(template, invocation) + if input_diagnostics: + failed = run.failed(tuple(_diagnostic_dict(item) for item in input_diagnostics)) + self.repository.save_workflow_run(failed) + event = self._audit( + "workflow.run.failed", + f"workflow_run:{run.run_id}", + AuditOutcome.FAILED, + context, + decision, + details={"diagnostics": [item.to_dict() for item in input_diagnostics]}, + ) + return WorkflowRunResult( + run=failed, + success=False, + diagnostics=tuple(input_diagnostics), + audit_event=event, + policy_decision=decision, + ) + + return WorkflowRunResult(run=run, success=True, policy_decision=decision) + + def invoke_template(self, invocation: WorkflowInvocation, context: OperationContext) -> WorkflowRunResult: + queued = self.queue_template(invocation, context) + if not queued.success: + return queued + return self.resume_run(queued.run.run_id, context) + + def resume_run(self, run_id: str, context: OperationContext) -> WorkflowRunResult: + run = self.repository.get_workflow_run(run_id) + if run.status in ( + WorkflowRunStatus.COMPLETED, + WorkflowRunStatus.CANCELED, + WorkflowRunStatus.RETRIED, + ): + diagnostic = Diagnostic( + severity="error", + code="workflow.run_not_resumable", + message="Workflow run cannot be resumed from its current status", + details={"run_id": run_id, "status": run.status.value}, + ) + return WorkflowRunResult(run=run, success=False, diagnostics=(diagnostic,)) + template = self.repository.get_workflow_template(run.template_id, version=run.template_version) + return self._execute_run(template, run, context) + + def retry_run(self, run_id: str, context: OperationContext) -> WorkflowRunResult: + previous = self.repository.get_workflow_run(run_id) + marked = previous.retried() + self.repository.save_workflow_run(marked) + retry = previous.retry(actor_id=context.actor.id, correlation_id=context.correlation_id) + retry = replace(retry, step_runs=tuple(_reset_step_run(item) for item in previous.step_runs)) + self.repository.save_actor(context.actor) + self.repository.save_workflow_run(retry) + decision = PolicyDecision.allow( + context.actor.id, + "workflow.run.retry", + f"workflow_run:{run_id}", + context={"previous_run_id": run_id, "retry_run_id": retry.run_id}, + ) + self._audit( + "workflow.run.retried", + f"workflow_run:{run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"retry_run_id": retry.run_id}, + ) + return self.resume_run(retry.run_id, context) + + def cancel_run(self, run_id: str, context: OperationContext, *, reason: str | None = None) -> WorkflowRun: + run = self.repository.get_workflow_run(run_id) + diagnostic = Diagnostic( + severity="warning", + code="workflow.run_canceled", + message="Workflow run was canceled", + details={"reason": reason} if reason else {}, + ) + canceled_steps = tuple( + step.canceled((_diagnostic_dict(diagnostic),)) + if step.status in (WorkflowStepRunStatus.QUEUED, WorkflowStepRunStatus.WAITING) + else step + for step in run.step_runs + ) + canceled = replace(run, step_runs=canceled_steps).canceled((_diagnostic_dict(diagnostic),)) + self.repository.save_actor(context.actor) + decision = self._authorize( + context, + "workflow.run.cancel", + f"workflow_run:{run_id}", + resource_metadata={"reason": reason, "status": run.status.value}, + ) + if not decision.allowed: + self._audit( + "workflow.run.canceled", + f"workflow_run:{run_id}", + AuditOutcome.DENIED, + context, + decision, + details={"reason": reason} if reason else {}, + ) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": "workflow.run.cancel", + "resource": f"workflow_run:{run_id}", + "correlation_id": context.correlation_id, + "policy_decision": decision.to_dict(), + }, + ) + self.repository.save_workflow_run(canceled) + self._audit( + "workflow.run.canceled", + f"workflow_run:{run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"reason": reason} if reason else {}, + ) + return canceled + + def _execute_run( + self, + template: WorkflowTemplate, + run: WorkflowRun, + context: OperationContext, + ) -> WorkflowRunResult: + decision = PolicyDecision.from_dict(run.policy_context["run_execute"]) + run = run.running() + self.repository.save_workflow_run(run) + self._audit( + "workflow.run.started", + f"workflow_run:{run.run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"template_id": template.template_id, "template_version": template.version}, + ) + + step_results: list[TransformationRunResult] = [] + attempted_step_ids: set[str] = set() + progress = True + while progress: + progress = False + step_runs_by_id = {item.step_id: item for item in run.step_runs} + for step in template.steps: + current = step_runs_by_id[step.step_id] + if current.status in ( + WorkflowStepRunStatus.COMPLETED, + WorkflowStepRunStatus.SKIPPED, + WorkflowStepRunStatus.CANCELED, + ): + continue + if current.status == WorkflowStepRunStatus.FAILED and current.step_id in attempted_step_ids: + continue + dependency_diagnostic = _dependency_diagnostic(step, step_runs_by_id) + if dependency_diagnostic is not None: + if _dependency_terminal(step, step_runs_by_id): + current = _step_failure_by_behavior(step, current, dependency_diagnostic) + run = run.with_step_run(current) + self.repository.save_workflow_run(run) + progress = True + continue + + current = current.running() + run = run.with_step_run(current) + self.repository.save_workflow_run(run) + self._audit( + "workflow.step.started", + f"workflow_run:{run.run_id}:step:{step.step_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"operation_id": step.operation_id, "kind": step.kind}, + ) + attempted_step_ids.add(step.step_id) + executed, step_result = self._execute_step(step, run, context) + if step_result is not None: + step_results.append(step_result) + if executed.status == WorkflowStepRunStatus.FAILED and step.failure_behavior == "continue": + executed = replace(executed, status=WorkflowStepRunStatus.SKIPPED) + run = run.with_step_run(executed) + self.repository.save_workflow_run(run) + self._audit( + "workflow.step.completed" if executed.status == WorkflowStepRunStatus.COMPLETED else "workflow.step.failed", + f"workflow_run:{run.run_id}:step:{step.step_id}", + AuditOutcome.SUCCESS if executed.status == WorkflowStepRunStatus.COMPLETED else AuditOutcome.FAILED, + context, + decision, + details={ + "operation_id": step.operation_id, + "status": executed.status.value, + "diagnostics": list(executed.diagnostics), + }, + ) + progress = True + if executed.status == WorkflowStepRunStatus.FAILED and step.failure_behavior != "continue": + progress = False + break + + final = _finalize_run(run) + self.repository.save_workflow_run(final) + event = self._audit( + _workflow_completion_operation(final), + f"workflow_run:{final.run_id}", + _workflow_audit_outcome(final), + context, + decision, + details={ + "template_id": template.template_id, + "status": final.status.value, + "output_asset_ids": list(final.output_asset_ids), + "diagnostics": list(final.diagnostics), + }, + ) + return WorkflowRunResult( + run=final, + success=final.status == WorkflowRunStatus.COMPLETED, + diagnostics=tuple(Diagnostic(**item) for item in final.diagnostics), + audit_event=event, + policy_decision=decision, + step_results=tuple(step_results), + ) + + def _execute_step( + self, + step: WorkflowStepDefinition, + run: WorkflowRun, + context: OperationContext, + ) -> tuple[WorkflowStepRun, TransformationRunResult | None]: + current = {item.step_id: item for item in run.step_runs}[step.step_id] + precondition_diagnostics = _precondition_diagnostics(step, run) + if precondition_diagnostics: + return current.failed(tuple(_diagnostic_dict(item) for item in precondition_diagnostics)), None + if step.kind != "transformation": + diagnostic = Diagnostic( + severity="error", + code="workflow.step_kind_unsupported", + message="Workflow step kind is not executable by the MVP runner", + details={"step_id": step.step_id, "kind": step.kind}, + ) + return current.failed((_diagnostic_dict(diagnostic),)), None + if not step.operation_id: + diagnostic = Diagnostic( + severity="error", + code="workflow.operation_missing", + message="Transformation workflow step requires an operation ID", + details={"step_id": step.step_id}, + ) + return current.failed((_diagnostic_dict(diagnostic),)), None + + source_asset_ids, resolve_diagnostics = _resolve_source_asset_ids(step, run) + if resolve_diagnostics: + return current.failed(tuple(_diagnostic_dict(item) for item in resolve_diagnostics)), None + + result = self.transformation_service.execute_transformation( + TransformationRequest( + operation_id=step.operation_id, + source_asset_ids=source_asset_ids, + parameters=dict(step.parameters), + output_asset_id=self._available_output_asset_id( + _resolve_value(step.outputs.get("asset_id"), run), + run, + ), + output_title=_resolve_value(step.outputs.get("title"), run), + output_asset_type=str(step.outputs.get("asset_type", "derived_artifact")), + output_media_type=step.outputs.get("media_type"), + metadata=dict(step.metadata), + ), + context, + ) + if result.success and result.run is not None: + return ( + current.completed( + transformation_run_id=result.run.run_id, + output_asset_ids=result.run.output_asset_ids, + ), + result, + ) + diagnostics = result.diagnostics or ( + Diagnostic( + severity="error", + code="workflow.transformation_failed", + message="Transformation step failed without diagnostics", + details={"step_id": step.step_id, "operation_id": step.operation_id}, + ), + ) + transformation_run_id = result.run.run_id if result.run else None + return ( + replace(current, transformation_run_id=transformation_run_id).failed( + tuple(_diagnostic_dict(item) for item in diagnostics) + ), + result, + ) + + def _available_output_asset_id(self, output_asset_id: Any, run: WorkflowRun) -> str | None: + if output_asset_id is None: + return None + output_asset_id = str(output_asset_id) + try: + self.repository.get_asset(output_asset_id) + except NotFoundError: + return output_asset_id + return f"{output_asset_id}-{run.run_id}" + + def _authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, Any] | None = None, + ) -> PolicyDecision: + self.repository.save_actor(context.actor) + try: + return self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata=resource_metadata, + ) + except Exception as exc: + return PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason=str(exc) or "Workflow policy gateway failed", + context={"resource_metadata": resource_metadata or {}, "gateway_error": type(exc).__name__}, + ) + + def _audit( + self, + operation: str, + target: str, + outcome: AuditOutcome, + context: OperationContext, + policy_decision: PolicyDecision, + *, + details: dict[str, Any] | None = None, + ) -> AuditEvent: + event = AuditEvent.from_context( + operation, + target, + outcome, + context, + policy_decision=policy_decision, + details=details, + ) + return self.repository.save_audit_event(event) + + +def _initial_step_runs(template: WorkflowTemplate) -> tuple[WorkflowStepRun, ...]: + return tuple( + WorkflowStepRun(step_id=step.step_id, operation_id=step.operation_id) + for step in template.steps + ) + + +def _reset_step_run(step_run: WorkflowStepRun) -> WorkflowStepRun: + return WorkflowStepRun(step_id=step_run.step_id, operation_id=step_run.operation_id) + + +def _template_diagnostics(template: WorkflowTemplate) -> tuple[Diagnostic, ...]: + diagnostics: list[Diagnostic] = [] + step_ids = [step.step_id for step in template.steps] + duplicate_ids = sorted({step_id for step_id in step_ids if step_ids.count(step_id) > 1}) + for step_id in duplicate_ids: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.step_id_duplicate", + message="Workflow template contains duplicate step IDs", + details={"template_id": template.template_id, "step_id": step_id}, + ) + ) + known = set(step_ids) + for step in template.steps: + if step.failure_behavior not in ("fail_workflow", "continue"): + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.failure_behavior_unsupported", + message="Workflow step declares unsupported failure behavior", + details={"step_id": step.step_id, "failure_behavior": step.failure_behavior}, + ) + ) + for dependency in step.depends_on: + if dependency not in known: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.dependency_missing", + message="Workflow step depends on an unknown step", + details={"step_id": step.step_id, "dependency": dependency}, + ) + ) + cycle = _dependency_cycle(template.steps) + if cycle: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.dependency_cycle", + message="Workflow template contains a dependency cycle", + details={"cycle": cycle}, + ) + ) + return tuple(diagnostics) + + +def _dependency_cycle(steps: tuple[WorkflowStepDefinition, ...]) -> list[str]: + graph = {step.step_id: tuple(step.depends_on) for step in steps} + visiting: set[str] = set() + visited: set[str] = set() + stack: list[str] = [] + + def visit(step_id: str) -> list[str] | None: + if step_id in visited: + return None + if step_id in visiting: + if step_id in stack: + return stack[stack.index(step_id):] + [step_id] + return [step_id] + visiting.add(step_id) + stack.append(step_id) + for dependency in graph.get(step_id, ()): + result = visit(dependency) + if result: + return result + visiting.remove(step_id) + visited.add(step_id) + stack.pop() + return None + + for step_id in graph: + result = visit(step_id) + if result: + return result + return [] + + +def _input_diagnostics(template: WorkflowTemplate, invocation: WorkflowInvocation) -> list[Diagnostic]: + diagnostics: list[Diagnostic] = [] + specs = {item.name: item for item in template.inputs} + for spec in template.inputs: + if spec.required and spec.name not in invocation.inputs: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.input_missing", + message="Workflow invocation is missing a required input", + details={"template_id": template.template_id, "input": spec.name, "kind": spec.kind.value}, + ) + ) + for name, binding in invocation.inputs.items(): + spec = specs.get(name) + declared_kind = binding.get("kind") if isinstance(binding, dict) else None + if spec is not None and declared_kind is not None and declared_kind != spec.kind.value: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.input_kind_mismatch", + message="Workflow invocation input kind does not match the template declaration", + details={ + "template_id": template.template_id, + "input": name, + "expected": spec.kind.value, + "actual": declared_kind, + }, + ) + ) + return diagnostics + + +def _dependency_diagnostic( + step: WorkflowStepDefinition, + step_runs_by_id: dict[str, WorkflowStepRun], +) -> Diagnostic | None: + for dependency in step.depends_on: + dependency_run = step_runs_by_id[dependency] + if dependency_run.status != WorkflowStepRunStatus.COMPLETED: + return Diagnostic( + severity="error", + code="workflow.dependency_not_completed", + message="Workflow step dependency is not completed", + details={ + "step_id": step.step_id, + "dependency": dependency, + "dependency_status": dependency_run.status.value, + }, + ) + return None + + +def _dependency_terminal( + step: WorkflowStepDefinition, + step_runs_by_id: dict[str, WorkflowStepRun], +) -> bool: + return any( + step_runs_by_id[dependency].status + in ( + WorkflowStepRunStatus.FAILED, + WorkflowStepRunStatus.SKIPPED, + WorkflowStepRunStatus.CANCELED, + ) + for dependency in step.depends_on + ) + + +def _step_failure_by_behavior( + step: WorkflowStepDefinition, + step_run: WorkflowStepRun, + diagnostic: Diagnostic, +) -> WorkflowStepRun: + if step.failure_behavior == "continue": + return step_run.skipped((_diagnostic_dict(diagnostic),)) + return step_run.failed((_diagnostic_dict(diagnostic),)) + + +def _precondition_diagnostics(step: WorkflowStepDefinition, run: WorkflowRun) -> list[Diagnostic]: + diagnostics: list[Diagnostic] = [] + for precondition in step.preconditions: + precondition_type = precondition.get("type") + if precondition_type == "input_present": + name = precondition.get("name") + if name not in run.input_bindings: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.precondition_failed", + message="Workflow step precondition failed", + details={"step_id": step.step_id, "type": precondition_type, "name": name}, + ) + ) + elif precondition_type: + diagnostics.append( + Diagnostic( + severity="error", + code="workflow.precondition_unsupported", + message="Workflow step precondition type is unsupported", + details={"step_id": step.step_id, "type": precondition_type}, + ) + ) + return diagnostics + + +def _resolve_source_asset_ids( + step: WorkflowStepDefinition, + run: WorkflowRun, +) -> tuple[tuple[str, ...], list[Diagnostic]]: + source_asset_ids = _resolve_value(step.inputs.get("source_asset_ids", ()), run) + try: + return _normalise_asset_ids(source_asset_ids), [] + except ValueError as exc: + return (), [ + Diagnostic( + severity="error", + code="workflow.source_asset_resolution_failed", + message="Workflow step source assets could not be resolved", + details={"step_id": step.step_id, "error": str(exc), "binding": source_asset_ids}, + ) + ] + + +def _resolve_value(value: Any, run: WorkflowRun) -> Any: + if isinstance(value, str): + if value.startswith("$inputs."): + return run.input_bindings.get(value.removeprefix("$inputs.")) + if value.startswith("$steps."): + parts = value.split(".") + if len(parts) == 3: + step_id = parts[1] + attr = parts[2] + step_run = {item.step_id: item for item in run.step_runs}.get(step_id) + if step_run is None: + return None + if attr == "output_asset_ids": + return step_run.output_asset_ids + if attr == "output_asset_id": + return step_run.output_asset_ids[0] if step_run.output_asset_ids else None + if attr == "transformation_run_id": + return step_run.transformation_run_id + return None + return value + if isinstance(value, list): + return [_resolve_value(item, run) for item in value] + if isinstance(value, tuple): + return tuple(_resolve_value(item, run) for item in value) + if isinstance(value, dict): + return {key: _resolve_value(item, run) for key, item in value.items()} + return value + + +def _normalise_asset_ids(value: Any) -> tuple[str, ...]: + if value is None: + return () + if isinstance(value, str): + return (value,) + if isinstance(value, dict): + if "asset_id" in value: + return _normalise_asset_ids(value["asset_id"]) + if "asset_ids" in value: + return _normalise_asset_ids(value["asset_ids"]) + raise ValueError("asset binding dictionary must include asset_id or asset_ids") + if isinstance(value, (list, tuple)): + asset_ids: list[str] = [] + for item in value: + asset_ids.extend(_normalise_asset_ids(item)) + return tuple(asset_ids) + raise ValueError(f"unsupported asset binding type: {type(value).__name__}") + + +def _finalize_run(run: WorkflowRun) -> WorkflowRun: + output_asset_ids = tuple( + output_asset_id + for step_run in run.step_runs + if step_run.status == WorkflowStepRunStatus.COMPLETED + for output_asset_id in step_run.output_asset_ids + ) + diagnostics = tuple( + diagnostic + for step_run in run.step_runs + if step_run.status + in ( + WorkflowStepRunStatus.FAILED, + WorkflowStepRunStatus.SKIPPED, + WorkflowStepRunStatus.CANCELED, + WorkflowStepRunStatus.WAITING, + ) + for diagnostic in step_run.diagnostics + ) + statuses = {step_run.status for step_run in run.step_runs} + if not run.step_runs or statuses == {WorkflowStepRunStatus.COMPLETED}: + return run.completed(output_asset_ids=output_asset_ids) + if WorkflowStepRunStatus.FAILED in statuses or WorkflowStepRunStatus.CANCELED in statuses: + if output_asset_ids: + return run.partially_completed(output_asset_ids=output_asset_ids, diagnostics=diagnostics) + return run.failed(diagnostics) + if WorkflowStepRunStatus.SKIPPED in statuses: + if output_asset_ids: + return run.partially_completed(output_asset_ids=output_asset_ids, diagnostics=diagnostics) + return run.failed(diagnostics) + return run.waiting(diagnostics) + + +def _workflow_completion_operation(run: WorkflowRun) -> str: + if run.status == WorkflowRunStatus.COMPLETED: + return "workflow.run.completed" + if run.status == WorkflowRunStatus.PARTIALLY_COMPLETED: + return "workflow.run.partially_completed" + if run.status == WorkflowRunStatus.WAITING: + return "workflow.run.waiting" + return "workflow.run.failed" + + +def _workflow_audit_outcome(run: WorkflowRun) -> AuditOutcome: + if run.status == WorkflowRunStatus.COMPLETED: + return AuditOutcome.SUCCESS + if run.status == WorkflowRunStatus.PARTIALLY_COMPLETED: + return AuditOutcome.PARTIAL + if run.status == WorkflowRunStatus.WAITING: + return AuditOutcome.REVIEW_REQUIRED + return AuditOutcome.FAILED + + +def _permission_diagnostic(decision: PolicyDecision) -> Diagnostic: + return Diagnostic( + severity="error", + code="workflow.permission_denied", + message="Workflow operation denied by policy", + details={"policy_decision": decision.to_dict()}, + ) + + +def _diagnostic_dict(diagnostic: Diagnostic) -> dict[str, Any]: + return diagnostic.to_dict() diff --git a/tests/test_workflow_service.py b/tests/test_workflow_service.py new file mode 100644 index 0000000..7a7a823 --- /dev/null +++ b/tests/test_workflow_service.py @@ -0,0 +1,345 @@ +from pathlib import Path + +import pytest + +from kontextual_engine import ( + Actor, + ActorType, + AssetRegistryService, + AssetRepresentation, + Classification, + InMemoryAssetRegistryRepository, + OperationContext, + RepresentationKind, + Sensitivity, + SQLiteAssetRegistryRepository, + TransformationRunStatus, + TransformationService, + ValidationError, + WorkflowInputDefinition, + WorkflowInputKind, + WorkflowInvocation, + WorkflowRunStatus, + WorkflowService, + WorkflowStepDefinition, + WorkflowStepRunStatus, + WorkflowTemplate, +) + + +def test_workflow_template_registration_persists_input_kinds_and_rejects_bad_dependencies() -> None: + repo = InMemoryAssetRegistryRepository() + service = WorkflowService(repo) + context = operation_context() + + template = service.register_template(rich_input_template(), context) + + assert template.created_by == "user-test" + assert [item.kind for item in repo.get_workflow_template("workflow-rich-inputs").inputs] == [ + WorkflowInputKind.ASSET, + WorkflowInputKind.COLLECTION, + WorkflowInputKind.QUERY, + WorkflowInputKind.SOURCE_EVENT, + WorkflowInputKind.PAYLOAD, + ] + assert repo.list_audit_events(target="workflow_template:workflow-rich-inputs")[0].operation == ( + "workflow.template.register" + ) + + with pytest.raises(ValidationError) as missing_dependency: + service.register_template( + WorkflowTemplate( + template_id="workflow-bad", + name="Bad Workflow", + steps=( + WorkflowStepDefinition( + step_id="late", + operation_id="structured_view", + depends_on=("missing",), + ), + ), + ), + context, + ) + assert missing_dependency.value.details["diagnostics"][0]["code"] == "workflow.dependency_missing" + + with pytest.raises(ValidationError) as cycle: + service.register_template( + WorkflowTemplate( + template_id="workflow-cycle", + name="Cycle", + steps=( + WorkflowStepDefinition( + step_id="a", + operation_id="structured_view", + depends_on=("b",), + ), + WorkflowStepDefinition( + step_id="b", + operation_id="structured_view", + depends_on=("a",), + ), + ), + ), + context, + ) + assert cycle.value.details["diagnostics"][0]["code"] == "workflow.dependency_cycle" + + +def test_workflow_invocation_executes_dependent_transformations_in_order() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(dependent_transformation_template(), context) + + result = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-dependent", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + assert result.success is True + assert result.run.status == WorkflowRunStatus.COMPLETED + step_runs = {step.step_id: step for step in result.run.step_runs} + assert step_runs["view"].status == WorkflowStepRunStatus.COMPLETED + assert step_runs["view_again"].status == WorkflowStepRunStatus.COMPLETED + assert result.run.output_asset_ids == ("asset-view-1", "asset-view-2") + first_transformation = repo.get_transformation_run(step_runs["view"].transformation_run_id) + second_transformation = repo.get_transformation_run(step_runs["view_again"].transformation_run_id) + assert first_transformation.source_asset_ids == ("asset-source",) + assert second_transformation.source_asset_ids == ("asset-view-1",) + assert second_transformation.status == TransformationRunStatus.COMPLETED + assert [ + event.operation + for event in repo.list_audit_events(target=f"workflow_run:{result.run.run_id}") + ] == [ + "workflow.run.queued", + "workflow.run.started", + "workflow.run.completed", + ] + + +def test_workflow_queue_cancel_resume_and_retry_do_not_require_storage_edits() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(single_step_template(), context) + + queued = workflow.queue_template( + WorkflowInvocation( + template_id="workflow-single", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + canceled = workflow.cancel_run(queued.run.run_id, context, reason="operator pause") + resumed = workflow.resume_run(canceled.run_id, context) + completed = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-single", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + retry = workflow.retry_run(completed.run.run_id, context) + + assert queued.run.status == WorkflowRunStatus.QUEUED + assert canceled.status == WorkflowRunStatus.CANCELED + assert {step.status for step in canceled.step_runs} == {WorkflowStepRunStatus.CANCELED} + assert resumed.success is False + assert resumed.diagnostics[0].code == "workflow.run_not_resumable" + assert repo.get_workflow_run(completed.run.run_id).status == WorkflowRunStatus.RETRIED + assert retry.run.retry_of_run_id == completed.run.run_id + assert retry.run.attempt == 2 + assert retry.run.status == WorkflowRunStatus.COMPLETED + + +def test_workflow_continue_failure_finishes_partially_completed() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(partial_template(), context) + + result = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-partial", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + step_runs = {step.step_id: step for step in result.run.step_runs} + assert result.success is False + assert result.run.status == WorkflowRunStatus.PARTIALLY_COMPLETED + assert result.run.output_asset_ids == ("asset-view-success",) + assert step_runs["markdown"].status == WorkflowStepRunStatus.SKIPPED + assert step_runs["view"].status == WorkflowStepRunStatus.COMPLETED + assert result.diagnostics[0].code == "transformation.operation_not_executable" + assert repo.list_assets(asset_type="derived_artifact")[0].id == "asset-view-success" + + +def test_sqlite_workflow_templates_and_runs_survive_reinstantiation(tmp_path: Path) -> None: + db_path = tmp_path / "registry.sqlite" + repo = SQLiteAssetRegistryRepository(db_path) + registry = AssetRegistryService(repo) + context = operation_context() + create_source_asset(registry, context, asset_id="asset-source") + workflow = WorkflowService( + repo, + transformation_service=TransformationService(repo, asset_service=registry), + ) + workflow.register_template(single_step_template(), context) + + result = workflow.invoke_template( + WorkflowInvocation( + template_id="workflow-single", + inputs={"source": {"kind": "asset", "asset_id": "asset-source"}}, + ), + context, + ) + + reloaded = SQLiteAssetRegistryRepository(db_path) + run = reloaded.get_workflow_run(result.run.run_id) + + assert reloaded.get_workflow_template("workflow-single").template_id == "workflow-single" + assert run.status == WorkflowRunStatus.COMPLETED + assert run.step_runs[0].status == WorkflowStepRunStatus.COMPLETED + assert reloaded.list_workflow_runs(template_id="workflow-single")[0].run_id == result.run.run_id + assert reloaded.list_representations(asset_id="asset-single-output")[0].kind == RepresentationKind.DERIVED + + +def rich_input_template() -> WorkflowTemplate: + return WorkflowTemplate( + template_id="workflow-rich-inputs", + name="Rich Inputs", + inputs=( + WorkflowInputDefinition("source", WorkflowInputKind.ASSET), + WorkflowInputDefinition("bundle", WorkflowInputKind.COLLECTION, required=False), + WorkflowInputDefinition("query", WorkflowInputKind.QUERY, required=False), + WorkflowInputDefinition("event", WorkflowInputKind.SOURCE_EVENT, required=False), + WorkflowInputDefinition("payload", WorkflowInputKind.PAYLOAD, required=False), + ), + steps=( + WorkflowStepDefinition( + step_id="view", + operation_id="structured_view", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-rich-view"}, + ), + ), + ) + + +def dependent_transformation_template() -> WorkflowTemplate: + return WorkflowTemplate( + template_id="workflow-dependent", + name="Dependent Transformations", + inputs=(WorkflowInputDefinition("source", WorkflowInputKind.ASSET),), + steps=( + WorkflowStepDefinition( + step_id="view", + operation_id="structured_view", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-view-1", "title": "First View"}, + ), + WorkflowStepDefinition( + step_id="view_again", + operation_id="structured_view", + depends_on=("view",), + inputs={"source_asset_ids": "$steps.view.output_asset_ids"}, + outputs={"asset_id": "asset-view-2", "title": "Second View"}, + ), + ), + ) + + +def single_step_template() -> WorkflowTemplate: + return WorkflowTemplate( + template_id="workflow-single", + name="Single Step", + inputs=(WorkflowInputDefinition("source", WorkflowInputKind.ASSET),), + steps=( + WorkflowStepDefinition( + step_id="view", + operation_id="structured_view", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-single-output", "title": "Single Output"}, + ), + ), + ) + + +def partial_template() -> WorkflowTemplate: + return WorkflowTemplate( + template_id="workflow-partial", + name="Partial Workflow", + inputs=(WorkflowInputDefinition("source", WorkflowInputKind.ASSET),), + steps=( + WorkflowStepDefinition( + step_id="markdown", + operation_id="markdown_transform", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-markdown-output"}, + failure_behavior="continue", + ), + WorkflowStepDefinition( + step_id="view", + operation_id="structured_view", + inputs={"source_asset_ids": "$inputs.source"}, + outputs={"asset_id": "asset-view-success"}, + ), + ), + ) + + +def create_source_asset( + registry: AssetRegistryService, + context: OperationContext, + *, + asset_id: str, +) -> None: + registry.create_asset( + "Source", + Classification( + asset_type="document", + sensitivity=Sensitivity.INTERNAL, + owner="Platform Knowledge", + ), + context, + asset_id=asset_id, + representations=[ + AssetRepresentation.from_content( + asset_id, + RepresentationKind.SOURCE, + "text/markdown", + "# Source\n", + ) + ], + ) + + +def operation_context() -> OperationContext: + actor = Actor.create( + ActorType.HUMAN, + actor_id="user-test", + display_name="Test User", + groups=["engineering"], + ) + return OperationContext.create(actor, correlation_id="corr-test") diff --git a/workplans/KONT-WP-0008-transformations-workflow-jobs.md b/workplans/KONT-WP-0008-transformations-workflow-jobs.md index 00e18aa..3b8605f 100644 --- a/workplans/KONT-WP-0008-transformations-workflow-jobs.md +++ b/workplans/KONT-WP-0008-transformations-workflow-jobs.md @@ -47,12 +47,13 @@ audit events. ## Implementation Status -The first foundation slice is implemented for transformation operations, -transformation run persistence, and derived artifact lineage. See -`docs/transformation-implementation.md`. +The first foundation slices are implemented for transformation operations, +transformation run persistence, derived artifact lineage, workflow templates, +and the MVP durable job runner. See `docs/transformation-implementation.md` +and `docs/workflow-jobs-implementation.md`. -Workflow templates, job runner orchestration, review gates, exception queues, -and richer workflow audit reconstruction remain open in this workplan. +Review gates, exception queues, and richer workflow audit reconstruction remain +open in this workplan. ## O8.1 - Implement transformation operation registry @@ -144,7 +145,7 @@ Implemented: ```task id: KONT-WP-0008-T004 -status: todo +status: done priority: high state_hub_task_id: "2c55c5dd-f07b-466b-85a5-f229e41fd124" ``` @@ -159,11 +160,22 @@ Acceptance: - Workflow inputs can be assets, collections, queries, source events, or submitted payloads. +Implemented: + +- `WorkflowTemplate`, `WorkflowInputDefinition`, and + `WorkflowStepDefinition` capture reusable templates, dependencies, inputs, + outputs, preconditions, policy checks, and failure behavior. +- Template registration validates missing dependencies, duplicate step IDs, + unsupported failure behavior, and dependency cycles. +- Workflow invocations preserve asset, collection, query, source event, and + payload input bindings; the MVP runner resolves asset bindings for + transformation-backed steps. + ## O8.5 - Implement job runner status retry resume and cancel behavior ```task id: KONT-WP-0008-T005 -status: todo +status: done priority: high state_hub_task_id: "5f4d6c88-904d-4369-90d5-eaa4d27e3010" ``` @@ -177,6 +189,15 @@ Acceptance: - Safe retry, resume, and cancellation behavior is defined per operation. - Recovery actions do not require direct storage edits. +Implemented: + +- `WorkflowService` can queue, invoke, resume, retry, and cancel workflow + runs programmatically. +- The MVP runner executes transformation-backed steps in dependency order and + persists workflow run/step state in memory and SQLite repositories. +- Retry creates fresh governed outputs when fixed template output IDs would + collide with existing assets. + ## O8.6 - Implement review gates human tasks and exception queues ```task