From 43c06d6024df12999507f2b3ed0c74cbc2f447bc Mon Sep 17 00:00:00 2001 From: tegwick Date: Wed, 6 May 2026 18:05:44 +0200 Subject: [PATCH] transformation registry, transformation runs, and derived artifact lineage --- docs/asset-registry-implementation.md | 10 +- docs/transformation-implementation.md | 108 +++ src/kontextual_engine/__init__.py | 20 + .../adapters/memory/asset_registry.py | 57 ++ .../adapters/sqlite/asset_registry.py | 147 ++++ src/kontextual_engine/core/__init__.py | 4 + src/kontextual_engine/core/provenance.py | 15 + src/kontextual_engine/core/transformations.py | 228 ++++++ src/kontextual_engine/ports/repositories.py | 22 + src/kontextual_engine/services/__init__.py | 16 + .../services/asset_service.py | 13 +- .../services/transformation_service.py | 728 ++++++++++++++++++ tests/test_transformation_service.py | 289 +++++++ ...T-WP-0008-transformations-workflow-jobs.md | 46 +- 14 files changed, 1695 insertions(+), 8 deletions(-) create mode 100644 docs/transformation-implementation.md create mode 100644 src/kontextual_engine/core/transformations.py create mode 100644 src/kontextual_engine/services/transformation_service.py create mode 100644 tests/test_transformation_service.py diff --git a/docs/asset-registry-implementation.md b/docs/asset-registry-implementation.md index cb21fc3..c443999 100644 --- a/docs/asset-registry-implementation.md +++ b/docs/asset-registry-implementation.md @@ -63,6 +63,8 @@ and SQLite repositories are adapters behind those ports. - Relationship changes create source-asset version records and audit events. - Idempotency records for safe asset creation retries. - Idempotency-key reuse with a different payload raises a validation error. +- Transformation run and derived lineage persistence for traceable derived + artifact creation. - In-memory repository for deterministic tests. - SQLite repository for local-first durable asset registry state. - SQLite foreign-key enforcement for representation and metadata asset @@ -84,11 +86,15 @@ and SQLite repositories are adapters behind those ports. - `audit_events` - `retrieval_feedback` - `idempotency_records` +- `ingestion_jobs` +- `transformation_runs` +- `derived_lineage` Payloads are stored as compact JSON envelopes while indexed columns carry stable lookup fields such as asset ID, lifecycle, representation kind, digest, -sequence, relationship source/target, actor ID, target, correlation ID, and -idempotency key. +sequence, relationship source/target, actor ID, target, correlation ID, +idempotency key, transformation status, operation ID, and derived output asset +ID. ## Not Yet Implemented diff --git a/docs/transformation-implementation.md b/docs/transformation-implementation.md new file mode 100644 index 0000000..6912d17 --- /dev/null +++ b/docs/transformation-implementation.md @@ -0,0 +1,108 @@ +# Transformation Implementation Note + +Date: 2026-05-06 + +Status: active implementation note for `KONT-WP-0008`. + +## Purpose + +This note records the first traceable transformation foundation. The goal is to +turn governed source assets into governed derived assets without losing actor, +policy, run, version, and lineage context. + +## Implemented Package Shape + +```text +src/kontextual_engine/ + core/ + transformations.py + provenance.py + services/ + transformation_service.py + ports/ + repositories.py + adapters/ + memory/asset_registry.py + sqlite/asset_registry.py +``` + +The transformation service depends on the asset registry repository and policy +gateway ports. Derived outputs are persisted through `AssetRegistryService` +rather than by bypassing existing asset governance. + +## Implemented Capabilities + +- `TransformationOperation` descriptors with input specs, output specs, + parameter schema, required permissions, supported asset types, adapter + references, and metadata. +- `TransformationOperationRegistry` with a default operation catalog. +- Executable engine-owned `structured_view` operation that produces a JSON + derived representation from source asset metadata and representation refs. +- Adapter-backed operation descriptors for `summarize`, `classify`, + `generate_report`, and Markdown operations. +- Markdown compose/include/transform/validate operations are registered with + `adapter_ref="markitect-tool"` and no engine-local handler. +- Structured capability diagnostics for unsupported operations, unsupported + asset types, missing parameters, missing executable adapters, policy denial, + and execution failure. +- `TransformationRun` records with queued, running, waiting, completed, + partially completed, failed, retried, and canceled states. +- Run persistence in memory and SQLite repositories. +- Source-read and run-execute policy decisions captured in run policy context + before transformation handler execution. +- Derived artifact creation through the governed asset registry service. +- Derived output versions use `VersionChangeType.DERIVED_OUTPUT`, store the + transformation run ID, and link to the first source parent version when + available. +- `DerivedArtifactLineage` persistence and lookup by output asset, source + asset, or transformation run. +- Audit events for queued, started, completed, failed, denied, and canceled + transformation runs. + +## markitect-tool Boundary + +Kontextual Engine owns run state, governance, derived artifact identity, +lineage, policy context, and audit. Markitect owns Markdown-specific semantics. + +The engine therefore registers Markdown operations as adapter-backed +capabilities but does not implement Markdown syntax processing itself. Once the +adapter integration is wired, these operations should call Markitect functions +through a stable boundary and keep the resulting run/output/lineage state in +Kontextual Engine. + +## Current SQLite Tables + +WP-0008 adds shared registry persistence for: + +- `transformation_runs` +- `derived_lineage` + +The tables store compact JSON payloads with indexed lookup columns for run ID, +operation ID, status, actor ID, correlation ID, queued/updated timestamps, +output asset ID, and transformation run lineage. + +## Not Yet Implemented + +- Workflow template persistence and invocation. +- Step dependencies, preconditions, and workflow input binding. +- Durable job runner execution model with resume semantics. +- Rich retry policy by operation type. +- Review gates, human tasks, and exception queues. +- Full workflow reconstruction across template, step, run, derived output, and + audit records. +- Concrete adapter execution for Markitect and provider-backed operations. + +These remain in open tasks `KONT-WP-0008-T004` through +`KONT-WP-0008-T007`. + +## Test Coverage + +`tests/test_transformation_service.py` covers: + +- default operation registration and the Markitect adapter boundary, +- structured view execution with derived asset, version, lineage, and audit + persistence, +- SQLite reload of transformation runs and derived lineage, +- adapter-backed operation failure without local Markdown reimplementation, +- unsupported operation diagnostics without creating a run, +- source-read policy denial before handler execution or output creation. diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 9548541..9fdafa8 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -54,6 +54,9 @@ from .core import ( Sensitivity, SourceReference, SourcePayload, + TransformationOperation, + TransformationRun, + TransformationRunStatus, VersionChangeType, ) from .errors import ( @@ -100,6 +103,13 @@ from .services import ( RetrievalFeedbackResult, RetrievalQualityMetrics, RetrievalSnippet, + TransformationExecutionContext, + TransformationOperationRegistry, + TransformationOutput, + TransformationRequest, + TransformationRunResult, + TransformationService, + default_transformation_registry, ) from .storage import InMemoryKnowledgeRepository from .workflows import ( @@ -211,11 +221,21 @@ __all__ = [ "SourceConnector", "SourcePayload", "SQLiteAssetRegistryRepository", + "TransformationExecutionContext", + "TransformationOperation", + "TransformationOperationRegistry", + "TransformationOutput", + "TransformationRequest", + "TransformationRun", + "TransformationRunResult", + "TransformationRunStatus", + "TransformationService", "ValidationError", "VersionChangeType", "WorkflowStep", "bundle_digest", "content_digest", + "default_transformation_registry", ] __version__ = "0.1.0" diff --git a/src/kontextual_engine/adapters/memory/asset_registry.py b/src/kontextual_engine/adapters/memory/asset_registry.py index bad31dd..4d7c9d1 100644 --- a/src/kontextual_engine/adapters/memory/asset_registry.py +++ b/src/kontextual_engine/adapters/memory/asset_registry.py @@ -12,6 +12,7 @@ from kontextual_engine.core import ( AuditEvent, ContextEntity, CoreRelationship, + DerivedArtifactLineage, IdempotencyRecord, IngestionJob, IngestionJobStatus, @@ -23,6 +24,8 @@ from kontextual_engine.core import ( RepresentationKind, RetrievalFeedbackRecord, Sensitivity, + TransformationRun, + TransformationRunStatus, ) from kontextual_engine.errors import NotFoundError, ValidationError @@ -42,6 +45,8 @@ class InMemoryAssetRegistryRepository: retrieval_feedback: dict[str, RetrievalFeedbackRecord] = field(default_factory=dict) idempotency_records: dict[str, IdempotencyRecord] = field(default_factory=dict) ingestion_jobs: dict[str, IngestionJob] = field(default_factory=dict) + transformation_runs: dict[str, TransformationRun] = field(default_factory=dict) + derived_lineage: dict[str, DerivedArtifactLineage] = field(default_factory=dict) def save_actor(self, actor: Actor) -> Actor: self.actors[actor.id] = actor @@ -300,6 +305,58 @@ class InMemoryAssetRegistryRepository: jobs = [job for job in jobs if job.status == status] return sorted(jobs, key=lambda job: (job.created_at, job.job_id)) + def save_transformation_run(self, run: TransformationRun) -> TransformationRun: + self.get_actor(run.actor_id) + self.transformation_runs[run.run_id] = run + return run + + def get_transformation_run(self, run_id: str) -> TransformationRun: + try: + return self.transformation_runs[run_id] + except KeyError as exc: + raise NotFoundError("Transformation run not found", details={"run_id": run_id}) from exc + + def list_transformation_runs( + self, + *, + status: TransformationRunStatus | None = None, + operation_id: str | None = None, + ) -> list[TransformationRun]: + runs: Iterable[TransformationRun] = self.transformation_runs.values() + if status is not None: + runs = [run for run in runs if run.status == status] + if operation_id is not None: + runs = [run for run in runs if run.operation_id == operation_id] + return sorted(runs, key=lambda run: (run.queued_at, run.run_id)) + + def save_derived_lineage(self, lineage: DerivedArtifactLineage) -> DerivedArtifactLineage: + self.get_asset(lineage.output_asset_id) + self.get_transformation_run(lineage.transformation_run_id) + self.derived_lineage[lineage.lineage_id] = lineage + return lineage + + def get_derived_lineage(self, lineage_id: str) -> DerivedArtifactLineage: + try: + return self.derived_lineage[lineage_id] + except KeyError as exc: + raise NotFoundError("Derived lineage not found", details={"lineage_id": lineage_id}) from exc + + def list_derived_lineage( + self, + *, + output_asset_id: str | None = None, + source_asset_id: str | None = None, + transformation_run_id: str | None = None, + ) -> list[DerivedArtifactLineage]: + records: Iterable[DerivedArtifactLineage] = self.derived_lineage.values() + if output_asset_id is not None: + records = [record for record in records if record.output_asset_id == output_asset_id] + if source_asset_id is not None: + records = [record for record in records if source_asset_id in record.source_asset_ids] + if transformation_run_id is not None: + records = [record for record in records if record.transformation_run_id == transformation_run_id] + return sorted(records, key=lambda record: (record.transformation_run_id, record.lineage_id)) + def _metadata_matches( records: list[MetadataRecord], diff --git a/src/kontextual_engine/adapters/sqlite/asset_registry.py b/src/kontextual_engine/adapters/sqlite/asset_registry.py index e951d31..5d14d66 100644 --- a/src/kontextual_engine/adapters/sqlite/asset_registry.py +++ b/src/kontextual_engine/adapters/sqlite/asset_registry.py @@ -14,6 +14,7 @@ from kontextual_engine.core import ( AuditEvent, ContextEntity, CoreRelationship, + DerivedArtifactLineage, IdempotencyRecord, IngestionJob, IngestionJobStatus, @@ -26,6 +27,8 @@ from kontextual_engine.core import ( RelationshipTargetKind, RetrievalFeedbackRecord, Sensitivity, + TransformationRun, + TransformationRunStatus, ) from kontextual_engine.errors import NotFoundError, ValidationError @@ -598,6 +601,128 @@ class SQLiteAssetRegistryRepository: ) return [IngestionJob.from_dict(_loads(row["payload"])) for row in rows] + def save_transformation_run(self, run: TransformationRun) -> TransformationRun: + try: + with self._connect() as conn: + conn.execute( + """ + insert into transformation_runs + (id, operation_id, status, actor_id, correlation_id, queued_at, updated_at, payload) + values (?, ?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + operation_id=excluded.operation_id, + status=excluded.status, + actor_id=excluded.actor_id, + correlation_id=excluded.correlation_id, + queued_at=excluded.queued_at, + updated_at=excluded.updated_at, + payload=excluded.payload + """, + ( + run.run_id, + run.operation_id, + run.status.value, + run.actor_id, + run.correlation_id, + run.queued_at, + run.updated_at, + _json(run.to_dict()), + ), + ) + except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Transformation run references an unknown actor", + details={"actor_id": run.actor_id, "run_id": run.run_id}, + ) from exc + raise + return run + + def get_transformation_run(self, run_id: str) -> TransformationRun: + row = self._one("select payload from transformation_runs where id = ?", (run_id,)) + if row is None: + raise NotFoundError("Transformation run not found", details={"run_id": run_id}) + return TransformationRun.from_dict(_loads(row["payload"])) + + def list_transformation_runs( + self, + *, + status: TransformationRunStatus | None = None, + operation_id: str | None = None, + ) -> list[TransformationRun]: + clauses = [] + params: list[Any] = [] + if status is not None: + clauses.append("status = ?") + params.append(status.value) + if operation_id is not None: + clauses.append("operation_id = ?") + params.append(operation_id) + where = f" where {' and '.join(clauses)}" if clauses else "" + rows = self._all(f"select payload from transformation_runs{where} order by queued_at, id", tuple(params)) + return [TransformationRun.from_dict(_loads(row["payload"])) for row in rows] + + def save_derived_lineage(self, lineage: DerivedArtifactLineage) -> DerivedArtifactLineage: + try: + with self._connect() as conn: + conn.execute( + """ + insert into derived_lineage + (id, output_asset_id, transformation_run_id, payload) + values (?, ?, ?, ?) + on conflict(id) do update set + output_asset_id=excluded.output_asset_id, + transformation_run_id=excluded.transformation_run_id, + payload=excluded.payload + """, + ( + lineage.lineage_id, + lineage.output_asset_id, + lineage.transformation_run_id, + _json(lineage.to_dict()), + ), + ) + except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Derived lineage references an unknown output asset or transformation run", + details={ + "output_asset_id": lineage.output_asset_id, + "transformation_run_id": lineage.transformation_run_id, + "lineage_id": lineage.lineage_id, + }, + ) from exc + raise + return lineage + + def get_derived_lineage(self, lineage_id: str) -> DerivedArtifactLineage: + row = self._one("select payload from derived_lineage where id = ?", (lineage_id,)) + if row is None: + raise NotFoundError("Derived lineage not found", details={"lineage_id": lineage_id}) + return DerivedArtifactLineage.from_dict(_loads(row["payload"])) + + def list_derived_lineage( + self, + *, + output_asset_id: str | None = None, + source_asset_id: str | None = None, + transformation_run_id: str | None = None, + ) -> list[DerivedArtifactLineage]: + clauses = [] + params: list[Any] = [] + if output_asset_id is not None: + clauses.append("output_asset_id = ?") + params.append(output_asset_id) + if transformation_run_id is not None: + clauses.append("transformation_run_id = ?") + params.append(transformation_run_id) + where = f" where {' and '.join(clauses)}" if clauses else "" + rows = self._all(f"select payload from derived_lineage{where} order by transformation_run_id, id", tuple(params)) + records = [DerivedArtifactLineage.from_dict(_loads(row["payload"])) for row in rows] + if source_asset_id is not None: + records = [record for record in records if source_asset_id in record.source_asset_ids] + return records + def _initialize(self) -> None: with self._connect() as conn: conn.executescript( @@ -697,6 +822,23 @@ class SQLiteAssetRegistryRepository: payload text not null, foreign key(actor_id) references actors(id) ); + create table if not exists transformation_runs ( + id text primary key, + operation_id text not null, + status text not null, + actor_id text not null, + correlation_id text not null, + queued_at text not null, + updated_at text not null, + payload text not null, + foreign key(actor_id) references actors(id) + ); + create table if not exists derived_lineage ( + id text primary key, + output_asset_id text not null references assets(id) on delete cascade, + transformation_run_id text not null references transformation_runs(id) on delete cascade, + payload text not null + ); create index if not exists idx_assets_lifecycle on assets(lifecycle); create index if not exists idx_representations_asset on representations(asset_id); create index if not exists idx_metadata_asset on metadata_records(asset_id); @@ -711,6 +853,11 @@ class SQLiteAssetRegistryRepository: create index if not exists idx_retrieval_feedback_correlation on retrieval_feedback(correlation_id); create index if not exists idx_ingestion_jobs_status on ingestion_jobs(status); create index if not exists idx_ingestion_jobs_correlation on ingestion_jobs(correlation_id); + create index if not exists idx_transformation_runs_status on transformation_runs(status); + create index if not exists idx_transformation_runs_operation on transformation_runs(operation_id); + create index if not exists idx_transformation_runs_correlation on transformation_runs(correlation_id); + create index if not exists idx_derived_lineage_output on derived_lineage(output_asset_id); + create index if not exists idx_derived_lineage_run on derived_lineage(transformation_run_id); """ ) diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index b8f3631..722919f 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -41,6 +41,7 @@ from .relationships import ( RelationshipTargetKind, ) from .retrieval_feedback import RetrievalFeedbackLabel, RetrievalFeedbackRecord +from .transformations import TransformationOperation, TransformationRun, TransformationRunStatus __all__ = [ "Actor", @@ -82,6 +83,9 @@ __all__ = [ "Sensitivity", "SourceReference", "SourcePayload", + "TransformationOperation", + "TransformationRun", + "TransformationRunStatus", "VersionChangeType", "content_digest", "mapping_digest", diff --git a/src/kontextual_engine/core/provenance.py b/src/kontextual_engine/core/provenance.py index 1fb215a..8f7714f 100644 --- a/src/kontextual_engine/core/provenance.py +++ b/src/kontextual_engine/core/provenance.py @@ -159,3 +159,18 @@ class DerivedArtifactLineage: if include_hash: data["lineage_hash"] = self.lineage_hash return data + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "DerivedArtifactLineage": + return cls( + lineage_id=data["lineage_id"], + source_asset_ids=tuple(data.get("source_asset_ids", ())), + source_version_ids=tuple(data.get("source_version_ids", ())), + transformation_run_id=data["transformation_run_id"], + output_asset_id=data["output_asset_id"], + output_representation_id=data["output_representation_id"], + actor_id=data["actor_id"], + parameters=dict(data.get("parameters", {})), + policy_context=dict(data.get("policy_context", {})), + adapter_provenance=dict(data.get("adapter_provenance", {})), + ) diff --git a/src/kontextual_engine/core/transformations.py b/src/kontextual_engine/core/transformations.py new file mode 100644 index 0000000..b9f38f8 --- /dev/null +++ b/src/kontextual_engine/core/transformations.py @@ -0,0 +1,228 @@ +"""Transformation operation and run primitives.""" + +from __future__ import annotations + +from dataclasses import dataclass, field, replace +from enum import Enum +from typing import Any + +from .primitives import compact_dict, new_id, utc_now + + +class TransformationRunStatus(str, Enum): + QUEUED = "queued" + RUNNING = "running" + WAITING = "waiting" + COMPLETED = "completed" + PARTIALLY_COMPLETED = "partially_completed" + FAILED = "failed" + RETRIED = "retried" + CANCELED = "canceled" + + +@dataclass(frozen=True) +class TransformationOperation: + operation_id: str + name: str + description: str = "" + input_spec: tuple[str, ...] = () + output_spec: tuple[str, ...] = () + parameter_schema: dict[str, Any] = field(default_factory=dict) + required_permissions: tuple[str, ...] = () + supported_asset_types: tuple[str, ...] = () + adapter_ref: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "input_spec", tuple(self.input_spec)) + object.__setattr__(self, "output_spec", tuple(self.output_spec)) + object.__setattr__(self, "required_permissions", tuple(self.required_permissions)) + object.__setattr__(self, "supported_asset_types", tuple(self.supported_asset_types)) + + def supports_asset_type(self, asset_type: str) -> bool: + return not self.supported_asset_types or asset_type in self.supported_asset_types + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "operation_id": self.operation_id, + "name": self.name, + "description": self.description, + "input_spec": list(self.input_spec), + "output_spec": list(self.output_spec), + "parameter_schema": dict(self.parameter_schema), + "required_permissions": list(self.required_permissions), + "supported_asset_types": list(self.supported_asset_types), + "adapter_ref": self.adapter_ref, + "metadata": dict(self.metadata), + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "TransformationOperation": + return cls( + operation_id=data["operation_id"], + name=data["name"], + description=data.get("description", ""), + input_spec=tuple(data.get("input_spec", ())), + output_spec=tuple(data.get("output_spec", ())), + parameter_schema=dict(data.get("parameter_schema", {})), + required_permissions=tuple(data.get("required_permissions", ())), + supported_asset_types=tuple(data.get("supported_asset_types", ())), + adapter_ref=data.get("adapter_ref"), + metadata=dict(data.get("metadata", {})), + ) + + +@dataclass(frozen=True) +class TransformationRun: + operation_id: str + source_asset_ids: tuple[str, ...] + source_version_ids: tuple[str, ...] + parameters: dict[str, Any] + actor_id: str + correlation_id: str + policy_context: dict[str, Any] = field(default_factory=dict) + status: TransformationRunStatus = TransformationRunStatus.QUEUED + output_asset_ids: tuple[str, ...] = () + diagnostics: tuple[dict[str, Any], ...] = () + retry_of_run_id: str | None = None + attempt: int = 1 + run_id: str = field(default_factory=lambda: new_id("run")) + queued_at: str = field(default_factory=lambda: utc_now().isoformat()) + started_at: str | None = None + completed_at: str | None = None + updated_at: str = field(default_factory=lambda: utc_now().isoformat()) + + def __post_init__(self) -> None: + object.__setattr__(self, "source_asset_ids", tuple(self.source_asset_ids)) + object.__setattr__(self, "source_version_ids", tuple(self.source_version_ids)) + object.__setattr__(self, "output_asset_ids", tuple(self.output_asset_ids)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + def running(self) -> "TransformationRun": + now = utc_now().isoformat() + return replace( + self, + status=TransformationRunStatus.RUNNING, + started_at=self.started_at or now, + updated_at=now, + ) + + def waiting(self) -> "TransformationRun": + return replace( + self, + status=TransformationRunStatus.WAITING, + updated_at=utc_now().isoformat(), + ) + + def completed(self, *, output_asset_ids: tuple[str, ...] = ()) -> "TransformationRun": + now = utc_now().isoformat() + return replace( + self, + status=TransformationRunStatus.COMPLETED, + output_asset_ids=tuple(output_asset_ids), + completed_at=now, + updated_at=now, + ) + + def partially_completed( + self, + *, + output_asset_ids: tuple[str, ...] = (), + diagnostics: tuple[dict[str, Any], ...] = (), + ) -> "TransformationRun": + now = utc_now().isoformat() + return replace( + self, + status=TransformationRunStatus.PARTIALLY_COMPLETED, + output_asset_ids=tuple(output_asset_ids), + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def failed(self, diagnostics: tuple[dict[str, Any], ...]) -> "TransformationRun": + now = utc_now().isoformat() + return replace( + self, + status=TransformationRunStatus.FAILED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def canceled(self, diagnostics: tuple[dict[str, Any], ...] = ()) -> "TransformationRun": + now = utc_now().isoformat() + return replace( + self, + status=TransformationRunStatus.CANCELED, + diagnostics=self.diagnostics + tuple(diagnostics), + completed_at=now, + updated_at=now, + ) + + def retried(self) -> "TransformationRun": + return replace( + self, + status=TransformationRunStatus.RETRIED, + updated_at=utc_now().isoformat(), + ) + + def retry(self, *, actor_id: str, correlation_id: str) -> "TransformationRun": + return TransformationRun( + operation_id=self.operation_id, + source_asset_ids=self.source_asset_ids, + source_version_ids=self.source_version_ids, + parameters=dict(self.parameters), + actor_id=actor_id, + correlation_id=correlation_id, + policy_context=dict(self.policy_context), + retry_of_run_id=self.run_id, + attempt=self.attempt + 1, + ) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "run_id": self.run_id, + "operation_id": self.operation_id, + "source_asset_ids": list(self.source_asset_ids), + "source_version_ids": list(self.source_version_ids), + "parameters": dict(self.parameters), + "actor_id": self.actor_id, + "correlation_id": self.correlation_id, + "policy_context": dict(self.policy_context), + "status": self.status.value, + "output_asset_ids": list(self.output_asset_ids), + "diagnostics": list(self.diagnostics), + "retry_of_run_id": self.retry_of_run_id, + "attempt": self.attempt, + "queued_at": self.queued_at, + "started_at": self.started_at, + "completed_at": self.completed_at, + "updated_at": self.updated_at, + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "TransformationRun": + return cls( + run_id=data["run_id"], + operation_id=data["operation_id"], + source_asset_ids=tuple(data.get("source_asset_ids", ())), + source_version_ids=tuple(data.get("source_version_ids", ())), + parameters=dict(data.get("parameters", {})), + actor_id=data["actor_id"], + correlation_id=data["correlation_id"], + policy_context=dict(data.get("policy_context", {})), + status=TransformationRunStatus(data.get("status", TransformationRunStatus.QUEUED.value)), + output_asset_ids=tuple(data.get("output_asset_ids", ())), + diagnostics=tuple(data.get("diagnostics", ())), + retry_of_run_id=data.get("retry_of_run_id"), + attempt=int(data.get("attempt", 1)), + queued_at=data["queued_at"], + started_at=data.get("started_at"), + completed_at=data.get("completed_at"), + updated_at=data["updated_at"], + ) diff --git a/src/kontextual_engine/ports/repositories.py b/src/kontextual_engine/ports/repositories.py index cde7558..e987fcc 100644 --- a/src/kontextual_engine/ports/repositories.py +++ b/src/kontextual_engine/ports/repositories.py @@ -11,6 +11,7 @@ from kontextual_engine.core import ( AuditEvent, ContextEntity, CoreRelationship, + DerivedArtifactLineage, IdempotencyRecord, IngestionJob, IngestionJobStatus, @@ -22,6 +23,8 @@ from kontextual_engine.core import ( RepresentationKind, RetrievalFeedbackRecord, Sensitivity, + TransformationRun, + TransformationRunStatus, ) @@ -109,3 +112,22 @@ class AssetRegistryRepository(Protocol): *, status: IngestionJobStatus | None = None, ) -> list[IngestionJob]: ... + + def save_transformation_run(self, run: TransformationRun) -> TransformationRun: ... + def get_transformation_run(self, run_id: str) -> TransformationRun: ... + def list_transformation_runs( + self, + *, + status: TransformationRunStatus | None = None, + operation_id: str | None = None, + ) -> list[TransformationRun]: ... + + def save_derived_lineage(self, lineage: DerivedArtifactLineage) -> DerivedArtifactLineage: ... + def get_derived_lineage(self, lineage_id: str) -> DerivedArtifactLineage: ... + def list_derived_lineage( + self, + *, + output_asset_id: str | None = None, + source_asset_id: str | None = None, + transformation_run_id: str | None = None, + ) -> list[DerivedArtifactLineage]: ... diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index a7eafae..018d772 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -23,6 +23,15 @@ from .retrieval_service import ( RetrievalQualityMetrics, RetrievalSnippet, ) +from .transformation_service import ( + TransformationExecutionContext, + TransformationOperationRegistry, + TransformationOutput, + TransformationRequest, + TransformationRunResult, + TransformationService, + default_transformation_registry, +) __all__ = [ "AssetChangeResult", @@ -45,4 +54,11 @@ __all__ = [ "RetrievalFeedbackResult", "RetrievalQualityMetrics", "RetrievalSnippet", + "TransformationExecutionContext", + "TransformationOperationRegistry", + "TransformationOutput", + "TransformationRequest", + "TransformationRunResult", + "TransformationService", + "default_transformation_registry", ] diff --git a/src/kontextual_engine/services/asset_service.py b/src/kontextual_engine/services/asset_service.py index d05cd53..886e47f 100644 --- a/src/kontextual_engine/services/asset_service.py +++ b/src/kontextual_engine/services/asset_service.py @@ -76,6 +76,10 @@ class AssetRegistryService: metadata_records: list[MetadataRecord] | None = None, asset_id: str | None = None, idempotency_key: str | None = None, + version_change_type: VersionChangeType = VersionChangeType.CREATED, + operation_id: str | None = None, + parent_version_id: str | None = None, + metadata_delta: dict[str, Any] | None = None, ) -> AssetChangeResult: request_hash = mapping_digest( { @@ -85,6 +89,10 @@ class AssetRegistryService: "representations": [representation.to_dict() for representation in representations or []], "metadata_records": [record.to_dict() for record in metadata_records or []], "asset_id": asset_id, + "version_change_type": version_change_type.value, + "operation_id": operation_id, + "parent_version_id": parent_version_id, + "metadata_delta": dict(metadata_delta or {}), } ) if idempotency_key: @@ -111,9 +119,12 @@ class AssetRegistryService: version = AssetVersion( asset_id=asset.id, sequence=1, - change_type=VersionChangeType.CREATED, + change_type=version_change_type, representation_ids=tuple(item.representation_id for item in representations or []), actor_id=context.actor.id, + operation_id=operation_id, + parent_version_id=parent_version_id, + metadata_delta=dict(metadata_delta or {}), lifecycle=classification.lifecycle.value, ) asset = asset.with_current_version(version.version_id) diff --git a/src/kontextual_engine/services/transformation_service.py b/src/kontextual_engine/services/transformation_service.py new file mode 100644 index 0000000..60c5e9d --- /dev/null +++ b/src/kontextual_engine/services/transformation_service.py @@ -0,0 +1,728 @@ +"""Traceable transformation operations over governed assets.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import Any, Callable + +from kontextual_engine.core import ( + AssetRepresentation, + AuditEvent, + AuditOutcome, + Classification, + DerivedArtifactLineage, + KnowledgeAsset, + MetadataRecord, + OperationContext, + PolicyDecision, + RepresentationKind, + Sensitivity, + TransformationOperation, + TransformationRun, + TransformationRunStatus, + VersionChangeType, + new_id, +) +from kontextual_engine.errors import Diagnostic +from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway + +from .asset_service import AssetChangeResult, AssetRegistryService + + +OperationHandler = Callable[["TransformationExecutionContext"], "TransformationOutput"] + + +@dataclass(frozen=True) +class TransformationRequest: + operation_id: str + source_asset_ids: tuple[str, ...] = () + parameters: dict[str, Any] = field(default_factory=dict) + output_title: str | None = None + output_asset_id: str | None = None + output_asset_type: str = "derived_artifact" + output_media_type: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "source_asset_ids", tuple(self.source_asset_ids)) + + def to_dict(self) -> dict[str, Any]: + return { + "operation_id": self.operation_id, + "source_asset_ids": list(self.source_asset_ids), + "parameters": dict(self.parameters), + "output_title": self.output_title, + "output_asset_id": self.output_asset_id, + "output_asset_type": self.output_asset_type, + "output_media_type": self.output_media_type, + "metadata": dict(self.metadata), + } + + +@dataclass(frozen=True) +class TransformationOutput: + content: str | bytes + media_type: str + title: str + metadata: dict[str, Any] = field(default_factory=dict) + adapter_provenance: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TransformationExecutionContext: + operation: TransformationOperation + request: TransformationRequest + run: TransformationRun + source_assets: tuple[KnowledgeAsset, ...] + source_representations: dict[str, tuple[AssetRepresentation, ...]] + + +@dataclass(frozen=True) +class TransformationRunResult: + run: TransformationRun | None + success: bool + diagnostics: tuple[Diagnostic, ...] = () + output_asset: KnowledgeAsset | None = None + output_representation: AssetRepresentation | None = None + lineage: DerivedArtifactLineage | None = None + audit_event: AuditEvent | None = None + policy_decision: PolicyDecision | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + def to_dict(self) -> dict[str, Any]: + return { + "success": self.success, + "run": self.run.to_dict() if self.run else None, + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + "output_asset": self.output_asset.to_dict() if self.output_asset else None, + "output_representation": self.output_representation.to_dict() if self.output_representation else None, + "lineage": self.lineage.to_dict() if self.lineage else None, + "audit_event": self.audit_event.to_dict() if self.audit_event else None, + "policy_decision": self.policy_decision.to_dict() if self.policy_decision else None, + } + + +class TransformationOperationRegistry: + def __init__(self) -> None: + self._operations: dict[str, TransformationOperation] = {} + self._handlers: dict[str, OperationHandler] = {} + + def register( + self, + operation: TransformationOperation, + *, + handler: OperationHandler | None = None, + ) -> TransformationOperation: + self._operations[operation.operation_id] = operation + if handler is not None: + self._handlers[operation.operation_id] = handler + return operation + + def get(self, operation_id: str) -> TransformationOperation | None: + return self._operations.get(operation_id) + + def handler_for(self, operation_id: str) -> OperationHandler | None: + return self._handlers.get(operation_id) + + def list_operations(self) -> tuple[TransformationOperation, ...]: + return tuple(sorted(self._operations.values(), key=lambda item: item.operation_id)) + + def supported_operation_ids(self) -> tuple[str, ...]: + return tuple(operation.operation_id for operation in self.list_operations()) + + +class TransformationService: + def __init__( + self, + repository: AssetRegistryRepository, + *, + registry: TransformationOperationRegistry | None = None, + policy_gateway: PolicyGateway | None = None, + asset_service: AssetRegistryService | None = None, + ) -> None: + self.repository = repository + self.registry = registry or default_transformation_registry() + self.policy_gateway = policy_gateway or AllowAllPolicyGateway() + self.asset_service = asset_service or AssetRegistryService( + repository, + policy_gateway=self.policy_gateway, + ) + + def list_operations(self) -> tuple[TransformationOperation, ...]: + return self.registry.list_operations() + + def execute_transformation( + self, + request: TransformationRequest, + context: OperationContext, + ) -> TransformationRunResult: + operation = self.registry.get(request.operation_id) + if operation is None: + return TransformationRunResult( + run=None, + success=False, + diagnostics=( + Diagnostic( + severity="error", + code="transformation.operation_unsupported", + message="Transformation operation is not registered", + details={ + "operation_id": request.operation_id, + "supported": list(self.registry.supported_operation_ids()), + }, + ), + ), + ) + + self.repository.save_actor(context.actor) + source_assets = tuple(self.repository.get_asset(asset_id) for asset_id in request.source_asset_ids) + source_version_ids = tuple(asset.current_version_id for asset in source_assets if asset.current_version_id) + decision = self._authorize( + context, + "transformation.run.execute", + f"transformation:{operation.operation_id}", + resource_metadata={ + "operation": operation.to_dict(), + "request": request.to_dict(), + "source_version_ids": list(source_version_ids), + }, + ) + source_decisions: tuple[PolicyDecision, ...] = () + policy_context = {"run_execute": decision.to_dict()} + if decision.allowed: + source_decisions = tuple( + self._authorize( + context, + "asset.retrieve", + f"asset:{asset.id}", + resource_metadata={ + "asset_id": asset.id, + "title": asset.title, + "classification": asset.classification.to_dict(), + "current_version_id": asset.current_version_id, + }, + ) + for asset in source_assets + ) + policy_context["source_reads"] = [item.to_dict() for item in source_decisions] + capability_diagnostics = _capability_diagnostics(operation, source_assets, request) + run = TransformationRun( + operation_id=operation.operation_id, + source_asset_ids=request.source_asset_ids, + source_version_ids=source_version_ids, + parameters=dict(request.parameters), + actor_id=context.actor.id, + correlation_id=context.correlation_id, + policy_context=policy_context, + ) + self.repository.save_transformation_run(run) + self._audit( + "transformation.run.queued", + f"transformation_run:{run.run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"operation_id": operation.operation_id}, + ) + + if not decision.allowed: + failed = run.failed((_diagnostic_dict(_permission_diagnostic(decision)),)) + self.repository.save_transformation_run(failed) + event = self._audit( + "transformation.run.execute", + f"transformation_run:{run.run_id}", + AuditOutcome.DENIED, + context, + decision, + details={"operation_id": operation.operation_id}, + ) + return TransformationRunResult( + run=failed, + success=False, + diagnostics=(_permission_diagnostic(decision),), + audit_event=event, + policy_decision=decision, + ) + + denied_source_decisions = tuple(item for item in source_decisions if not item.allowed) + if denied_source_decisions: + diagnostics = tuple(_permission_diagnostic(item) for item in denied_source_decisions) + failed = run.failed(tuple(_diagnostic_dict(item) for item in diagnostics)) + self.repository.save_transformation_run(failed) + event = self._audit( + "transformation.run.execute", + f"transformation_run:{run.run_id}", + AuditOutcome.DENIED, + context, + denied_source_decisions[0], + details={ + "operation_id": operation.operation_id, + "denied_source_reads": [item.to_dict() for item in denied_source_decisions], + }, + ) + return TransformationRunResult( + run=failed, + success=False, + diagnostics=diagnostics, + audit_event=event, + policy_decision=denied_source_decisions[0], + ) + + if capability_diagnostics: + failed = run.failed(tuple(_diagnostic_dict(item) for item in capability_diagnostics)) + self.repository.save_transformation_run(failed) + event = self._audit( + "transformation.run.failed", + f"transformation_run:{run.run_id}", + AuditOutcome.FAILED, + context, + decision, + details={"diagnostics": [item.to_dict() for item in capability_diagnostics]}, + ) + return TransformationRunResult( + run=failed, + success=False, + diagnostics=tuple(capability_diagnostics), + audit_event=event, + policy_decision=decision, + ) + + handler = self.registry.handler_for(operation.operation_id) + if handler is None: + diagnostic = Diagnostic( + severity="error", + code="transformation.operation_not_executable", + message="Transformation operation is registered but has no executable adapter", + details={ + "operation_id": operation.operation_id, + "adapter_ref": operation.adapter_ref, + }, + ) + failed = run.failed((_diagnostic_dict(diagnostic),)) + self.repository.save_transformation_run(failed) + event = self._audit( + "transformation.run.failed", + f"transformation_run:{run.run_id}", + AuditOutcome.FAILED, + context, + decision, + details={"diagnostics": [diagnostic.to_dict()]}, + ) + return TransformationRunResult( + run=failed, + success=False, + diagnostics=(diagnostic,), + audit_event=event, + policy_decision=decision, + ) + + running = run.running() + self.repository.save_transformation_run(running) + self._audit( + "transformation.run.started", + f"transformation_run:{run.run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"operation_id": operation.operation_id}, + ) + source_representations = { + asset.id: tuple(self.repository.list_representations(asset_id=asset.id)) + for asset in source_assets + } + try: + output = handler( + TransformationExecutionContext( + operation=operation, + request=request, + run=running, + source_assets=source_assets, + source_representations=source_representations, + ) + ) + asset_change, output_representation, lineage = self._persist_output( + request, + output, + running, + source_assets, + context, + decision, + ) + completed = running.completed(output_asset_ids=(asset_change.asset.id,)) + self.repository.save_transformation_run(completed) + event = self._audit( + "transformation.run.completed", + f"transformation_run:{completed.run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={ + "operation_id": operation.operation_id, + "output_asset_id": asset_change.asset.id, + "lineage_id": lineage.lineage_id, + "version_id": asset_change.version.version_id, + }, + ) + return TransformationRunResult( + run=completed, + success=True, + output_asset=asset_change.asset, + output_representation=output_representation, + lineage=lineage, + audit_event=event, + policy_decision=decision, + ) + except Exception as exc: + diagnostic = Diagnostic( + severity="error", + code="transformation.execution_failed", + message="Transformation execution failed", + details={"error_type": type(exc).__name__, "error": str(exc)}, + ) + failed = running.failed((_diagnostic_dict(diagnostic),)) + self.repository.save_transformation_run(failed) + event = self._audit( + "transformation.run.failed", + f"transformation_run:{failed.run_id}", + AuditOutcome.FAILED, + context, + decision, + details={"diagnostics": [diagnostic.to_dict()]}, + ) + return TransformationRunResult( + run=failed, + success=False, + diagnostics=(diagnostic,), + audit_event=event, + policy_decision=decision, + ) + + def get_run(self, run_id: str) -> TransformationRun: + return self.repository.get_transformation_run(run_id) + + def list_runs( + self, + *, + status: TransformationRunStatus | None = None, + operation_id: str | None = None, + ) -> tuple[TransformationRun, ...]: + return tuple(self.repository.list_transformation_runs(status=status, operation_id=operation_id)) + + def retry_run(self, run_id: str, context: OperationContext) -> TransformationRunResult: + previous = self.repository.get_transformation_run(run_id) + marked = previous.retried() + self.repository.save_transformation_run(marked) + retry_request = TransformationRequest( + operation_id=previous.operation_id, + source_asset_ids=previous.source_asset_ids, + parameters=dict(previous.parameters), + ) + return self.execute_transformation(retry_request, context) + + def cancel_run(self, run_id: str, context: OperationContext, *, reason: str | None = None) -> TransformationRun: + run = self.repository.get_transformation_run(run_id) + diagnostic = Diagnostic( + severity="warning", + code="transformation.run_canceled", + message="Transformation run was canceled", + details={"reason": reason} if reason else {}, + ) + canceled = run.canceled((_diagnostic_dict(diagnostic),)) + self.repository.save_actor(context.actor) + decision = PolicyDecision.allow(context.actor.id, "transformation.run.cancel", f"transformation_run:{run_id}") + self.repository.save_transformation_run(canceled) + self._audit( + "transformation.run.canceled", + f"transformation_run:{run_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={"reason": reason} if reason else {}, + ) + return canceled + + def _persist_output( + self, + request: TransformationRequest, + output: TransformationOutput, + run: TransformationRun, + source_assets: tuple[KnowledgeAsset, ...], + context: OperationContext, + decision: PolicyDecision, + ) -> tuple[AssetChangeResult, AssetRepresentation, DerivedArtifactLineage]: + output_asset_id = request.output_asset_id or new_id("asset") + output_representation = AssetRepresentation.from_content( + output_asset_id, + RepresentationKind.DERIVED, + request.output_media_type or output.media_type, + output.content, + producer=request.operation_id, + metadata={ + "transformation_run_id": run.run_id, + "operation_id": run.operation_id, + "adapter_provenance": dict(output.adapter_provenance), + **output.metadata, + }, + ) + lineage = DerivedArtifactLineage( + source_asset_ids=run.source_asset_ids, + source_version_ids=run.source_version_ids, + transformation_run_id=run.run_id, + output_asset_id=output_asset_id, + output_representation_id=output_representation.representation_id, + actor_id=context.actor.id, + parameters=dict(run.parameters), + policy_context=dict(run.policy_context), + adapter_provenance=dict(output.adapter_provenance), + ) + classification = Classification( + asset_type=request.output_asset_type, + sensitivity=_highest_sensitivity(source_assets), + owner=context.actor.id, + topics=("derived", run.operation_id), + metadata={ + "transformation_run_id": run.run_id, + "operation_id": run.operation_id, + **request.metadata, + }, + ) + asset_change = self.asset_service.create_asset( + request.output_title or output.title, + classification, + context, + asset_id=output_asset_id, + representations=[output_representation], + metadata_records=[ + MetadataRecord( + "lineage", + lineage.to_dict(), + provenance={"producer": "kontextual-engine"}, + confirmed=True, + ), + MetadataRecord( + "transformation_run_id", + run.run_id, + provenance={"producer": "kontextual-engine"}, + confirmed=True, + ), + ], + version_change_type=VersionChangeType.DERIVED_OUTPUT, + operation_id=run.run_id, + parent_version_id=run.source_version_ids[0] if run.source_version_ids else None, + metadata_delta={"lineage_id": lineage.lineage_id, "operation_id": run.operation_id}, + ) + self.repository.save_derived_lineage(lineage) + return asset_change, output_representation, lineage + + def _authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, Any] | None = None, + ) -> PolicyDecision: + self.repository.save_actor(context.actor) + try: + return self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata=resource_metadata, + ) + except Exception as exc: + return PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason=str(exc) or "Transformation policy gateway failed", + context={"resource_metadata": resource_metadata or {}, "gateway_error": type(exc).__name__}, + ) + + def _audit( + self, + operation: str, + target: str, + outcome: AuditOutcome, + context: OperationContext, + policy_decision: PolicyDecision, + *, + details: dict[str, Any] | None = None, + ) -> AuditEvent: + event = AuditEvent.from_context( + operation, + target, + outcome, + context, + policy_decision=policy_decision, + details=details, + ) + return self.repository.save_audit_event(event) + + +def default_transformation_registry() -> TransformationOperationRegistry: + registry = TransformationOperationRegistry() + registry.register( + TransformationOperation( + operation_id="structured_view", + name="Produce Structured View", + description="Create a JSON structured view of source asset metadata and representation references.", + input_spec=("knowledge_asset",), + output_spec=("derived_asset:application/json",), + parameter_schema={"required": []}, + required_permissions=("asset.retrieve", "asset.create"), + ), + handler=_structured_view_handler, + ) + registry.register( + TransformationOperation( + operation_id="summarize", + name="Summarize", + description="Summarize source assets through a provider adapter.", + input_spec=("knowledge_asset",), + output_spec=("derived_asset:text/markdown",), + parameter_schema={"required": ["style"]}, + required_permissions=("asset.retrieve", "asset.create"), + adapter_ref="llm-connect", + ) + ) + registry.register( + TransformationOperation( + operation_id="classify", + name="Classify", + description="Classify assets through a deterministic or provider-backed classifier adapter.", + input_spec=("knowledge_asset",), + output_spec=("metadata_record",), + required_permissions=("asset.retrieve", "asset.metadata.add"), + adapter_ref="classifier-adapter", + ) + ) + for operation_id, name in ( + ("markdown_compose", "Markdown Compose"), + ("markdown_include", "Markdown Include Resolution"), + ("markdown_transform", "Markdown Transform"), + ("markdown_validate", "Markdown Validate"), + ): + registry.register( + TransformationOperation( + operation_id=operation_id, + name=name, + description="Markdown-specific operation delegated to markitect-tool.", + input_spec=("markdown_asset",), + output_spec=("derived_asset:text/markdown",), + required_permissions=("asset.retrieve", "asset.create"), + supported_asset_types=("document", "markdown", "markdown_proxy"), + adapter_ref="markitect-tool", + metadata={"boundary": "adapter-backed; do not reimplement markdown syntax in engine"}, + ) + ) + registry.register( + TransformationOperation( + operation_id="generate_report", + name="Generate Report", + description="Generate a report from source assets through a report adapter.", + input_spec=("knowledge_asset",), + output_spec=("derived_asset",), + required_permissions=("asset.retrieve", "asset.create"), + adapter_ref="report-adapter", + ) + ) + return registry + + +def _structured_view_handler(context: TransformationExecutionContext) -> TransformationOutput: + source_payload = [] + for asset in context.source_assets: + source_payload.append( + { + "asset_id": asset.id, + "title": asset.title, + "classification": asset.classification.to_dict(), + "lifecycle": asset.lifecycle.value, + "current_version_id": asset.current_version_id, + "representations": [ + representation.to_dict() + for representation in context.source_representations.get(asset.id, ()) + ], + "source_refs": [source_ref.to_dict() for source_ref in asset.source_refs], + } + ) + content = json.dumps( + { + "operation_id": context.operation.operation_id, + "parameters": dict(context.request.parameters), + "source_assets": source_payload, + }, + sort_keys=True, + ensure_ascii=False, + ) + return TransformationOutput( + content=content, + media_type="application/json", + title=context.request.output_title or "Structured View", + metadata={"source_count": len(context.source_assets)}, + adapter_provenance={"operation": "structured_view", "adapter": "kontextual-engine"}, + ) + + +def _capability_diagnostics( + operation: TransformationOperation, + source_assets: tuple[KnowledgeAsset, ...], + request: TransformationRequest, +) -> list[Diagnostic]: + diagnostics: list[Diagnostic] = [] + for asset in source_assets: + if not operation.supports_asset_type(asset.classification.asset_type): + diagnostics.append( + Diagnostic( + severity="error", + code="transformation.asset_type_unsupported", + message="Transformation operation does not support source asset type", + details={ + "operation_id": operation.operation_id, + "asset_id": asset.id, + "asset_type": asset.classification.asset_type, + "supported_asset_types": list(operation.supported_asset_types), + }, + ) + ) + required_parameters = operation.parameter_schema.get("required", ()) + for key in required_parameters: + if key not in request.parameters: + diagnostics.append( + Diagnostic( + severity="error", + code="transformation.parameter_missing", + message="Transformation operation requires a missing parameter", + details={"operation_id": operation.operation_id, "parameter": key}, + ) + ) + return diagnostics + + +def _highest_sensitivity(source_assets: tuple[KnowledgeAsset, ...]) -> Sensitivity: + if not source_assets: + return Sensitivity.INTERNAL + order = { + Sensitivity.PUBLIC: 0, + Sensitivity.INTERNAL: 1, + Sensitivity.CONFIDENTIAL: 2, + Sensitivity.RESTRICTED: 3, + } + return max((asset.classification.sensitivity for asset in source_assets), key=lambda item: order[item]) + + +def _permission_diagnostic(decision: PolicyDecision) -> Diagnostic: + return Diagnostic( + severity="error", + code="transformation.permission_denied", + message="Transformation operation denied by policy", + details={"policy_decision": decision.to_dict()}, + ) + + +def _diagnostic_dict(diagnostic: Diagnostic) -> dict[str, Any]: + return diagnostic.to_dict() diff --git a/tests/test_transformation_service.py b/tests/test_transformation_service.py new file mode 100644 index 0000000..2d763bf --- /dev/null +++ b/tests/test_transformation_service.py @@ -0,0 +1,289 @@ +from pathlib import Path +from typing import Any + +from kontextual_engine import ( + Actor, + ActorType, + AssetRegistryService, + AssetRepresentation, + AuditOutcome, + Classification, + InMemoryAssetRegistryRepository, + OperationContext, + PolicyDecision, + RepresentationKind, + Sensitivity, + SQLiteAssetRegistryRepository, + TransformationRequest, + TransformationRunStatus, + TransformationService, + VersionChangeType, + default_transformation_registry, +) + + +def test_default_transformation_registry_declares_engine_and_adapter_boundaries() -> None: + registry = default_transformation_registry() + + operations = {operation.operation_id: operation for operation in registry.list_operations()} + + assert "structured_view" in operations + assert operations["structured_view"].adapter_ref is None + assert registry.handler_for("structured_view") is not None + assert operations["structured_view"].required_permissions == ("asset.retrieve", "asset.create") + for operation_id in ( + "markdown_compose", + "markdown_include", + "markdown_transform", + "markdown_validate", + ): + operation = operations[operation_id] + + assert operation.adapter_ref == "markitect-tool" + assert registry.handler_for(operation_id) is None + assert operation.metadata["boundary"] == "adapter-backed; do not reimplement markdown syntax in engine" + + +def test_structured_view_transformation_persists_run_output_lineage_and_audit() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + source = registry.create_asset( + "Intent", + Classification( + asset_type="document", + sensitivity=Sensitivity.CONFIDENTIAL, + owner="Platform Knowledge", + ), + context, + asset_id="asset-intent", + representations=[ + AssetRepresentation.from_content( + "asset-intent", + RepresentationKind.SOURCE, + "text/markdown", + "# Intent\n\nBuild a context engine.\n", + storage_ref="object://intent-source", + ) + ], + ) + service = TransformationService(repo, asset_service=registry) + + result = service.execute_transformation( + TransformationRequest( + operation_id="structured_view", + source_asset_ids=("asset-intent",), + parameters={"shape": "asset-summary"}, + output_asset_id="asset-intent-structured", + output_title="Intent Structured View", + ), + context, + ) + + assert result.success is True + assert result.run is not None + assert result.run.status == TransformationRunStatus.COMPLETED + assert result.run.output_asset_ids == ("asset-intent-structured",) + assert repo.get_transformation_run(result.run.run_id).status == TransformationRunStatus.COMPLETED + assert result.output_asset is not None + assert result.output_asset.classification.sensitivity == Sensitivity.CONFIDENTIAL + assert result.output_representation is not None + assert result.output_representation.kind == RepresentationKind.DERIVED + assert result.output_representation.media_type == "application/json" + assert result.output_representation.metadata["transformation_run_id"] == result.run.run_id + assert result.lineage is not None + assert result.lineage.source_asset_ids == ("asset-intent",) + assert result.lineage.source_version_ids == (source.version.version_id,) + assert result.lineage.output_asset_id == "asset-intent-structured" + assert result.lineage.policy_context["run_execute"]["effect"] == "allow" + assert result.lineage.policy_context["source_reads"][0]["action"] == "asset.retrieve" + assert repo.get_derived_lineage(result.lineage.lineage_id) == result.lineage + assert repo.list_derived_lineage(source_asset_id="asset-intent") == [result.lineage] + version = repo.list_versions("asset-intent-structured")[0] + assert version.change_type == VersionChangeType.DERIVED_OUTPUT + assert version.operation_id == result.run.run_id + assert version.parent_version_id == source.version.version_id + assert [record.key for record in repo.list_metadata_records("asset-intent-structured")] == [ + "lineage", + "transformation_run_id", + ] + assert [ + event.operation + for event in repo.list_audit_events(target=f"transformation_run:{result.run.run_id}") + ] == [ + "transformation.run.queued", + "transformation.run.started", + "transformation.run.completed", + ] + + +def test_sqlite_transformation_runs_and_lineage_survive_reinstantiation(tmp_path: Path) -> None: + db_path = tmp_path / "registry.sqlite" + repo = SQLiteAssetRegistryRepository(db_path) + registry = AssetRegistryService(repo) + context = operation_context() + registry.create_asset( + "Architecture", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-architecture", + representations=[ + AssetRepresentation.from_content( + "asset-architecture", + RepresentationKind.SOURCE, + "text/markdown", + "# Architecture\n", + ) + ], + ) + service = TransformationService(repo, asset_service=registry) + + result = service.execute_transformation( + TransformationRequest( + operation_id="structured_view", + source_asset_ids=("asset-architecture",), + output_asset_id="asset-architecture-structured", + ), + context, + ) + assert result.run is not None + assert result.lineage is not None + + reloaded = SQLiteAssetRegistryRepository(db_path) + + assert reloaded.get_transformation_run(result.run.run_id).status == TransformationRunStatus.COMPLETED + assert reloaded.list_transformation_runs(operation_id="structured_view")[0].run_id == result.run.run_id + assert reloaded.get_derived_lineage(result.lineage.lineage_id).output_asset_id == ( + "asset-architecture-structured" + ) + assert reloaded.list_derived_lineage(output_asset_id="asset-architecture-structured")[0].lineage_id == ( + result.lineage.lineage_id + ) + assert reloaded.list_representations(asset_id="asset-architecture-structured")[0].kind == ( + RepresentationKind.DERIVED + ) + + +def test_adapter_backed_operation_returns_capability_diagnostic_without_reimplementation() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + registry.create_asset( + "Markdown Source", + Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-markdown", + ) + service = TransformationService(repo, asset_service=registry) + + result = service.execute_transformation( + TransformationRequest( + operation_id="markdown_transform", + source_asset_ids=("asset-markdown",), + output_asset_id="asset-markdown-output", + ), + context, + ) + + assert result.success is False + assert result.run is not None + assert result.run.status == TransformationRunStatus.FAILED + assert result.diagnostics[0].code == "transformation.operation_not_executable" + assert result.diagnostics[0].details["adapter_ref"] == "markitect-tool" + assert repo.list_transformation_runs(status=TransformationRunStatus.FAILED)[0].run_id == result.run.run_id + + +def test_unknown_operation_reports_supported_operations_without_creating_run() -> None: + repo = InMemoryAssetRegistryRepository() + service = TransformationService(repo) + + result = service.execute_transformation( + TransformationRequest(operation_id="unknown_operation"), + operation_context(), + ) + + assert result.success is False + assert result.run is None + assert result.diagnostics[0].code == "transformation.operation_unsupported" + assert "structured_view" in result.diagnostics[0].details["supported"] + assert repo.list_transformation_runs() == [] + + +def test_source_read_policy_denial_fails_run_before_handler_execution() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + context = operation_context() + registry.create_asset( + "Restricted Source", + Classification(asset_type="document", sensitivity=Sensitivity.RESTRICTED), + context, + asset_id="asset-restricted", + representations=[ + AssetRepresentation.from_content( + "asset-restricted", + RepresentationKind.SOURCE, + "text/markdown", + "# Restricted\n", + ) + ], + ) + service = TransformationService( + repo, + asset_service=AssetRegistryService(repo, policy_gateway=DenySourceReadPolicy()), + policy_gateway=DenySourceReadPolicy(), + ) + + result = service.execute_transformation( + TransformationRequest( + operation_id="structured_view", + source_asset_ids=("asset-restricted",), + output_asset_id="asset-denied-output", + ), + context, + ) + + assert result.success is False + assert result.run is not None + assert result.run.status == TransformationRunStatus.FAILED + assert result.diagnostics[0].code == "transformation.permission_denied" + assert result.policy_decision is not None + assert result.policy_decision.resource == "asset:asset-restricted" + assert repo.list_assets(asset_type="derived_artifact") == [] + denied_event = repo.list_audit_events(target=f"transformation_run:{result.run.run_id}")[-1] + assert denied_event.outcome == AuditOutcome.DENIED + assert denied_event.operation == "transformation.run.execute" + + +def operation_context() -> OperationContext: + actor = Actor.create( + ActorType.HUMAN, + actor_id="user-test", + display_name="Test User", + groups=["engineering"], + ) + return OperationContext.create(actor, correlation_id="corr-test") + + +class DenySourceReadPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, Any] | None = None, + ) -> PolicyDecision: + if action == "asset.retrieve": + return PolicyDecision.deny( + context.actor.id, + action, + resource, + reason="source reads require review", + context={"resource_metadata": resource_metadata or {}}, + ) + return PolicyDecision.allow( + context.actor.id, + action, + resource, + context={"resource_metadata": resource_metadata or {}}, + ) diff --git a/workplans/KONT-WP-0008-transformations-workflow-jobs.md b/workplans/KONT-WP-0008-transformations-workflow-jobs.md index f046ace..00e18aa 100644 --- a/workplans/KONT-WP-0008-transformations-workflow-jobs.md +++ b/workplans/KONT-WP-0008-transformations-workflow-jobs.md @@ -4,13 +4,13 @@ type: workplan title: "Traceable Transformations And Workflow Jobs" domain: markitect repo: kontextual-engine -status: todo +status: active owner: codex topic_slug: markitect planning_priority: high planning_order: 8 created: "2026-05-05" -updated: "2026-05-05" +updated: "2026-05-06" state_hub_workstream_id: "1b7a6b04-7879-4862-bb3e-817f7f20fc59" --- @@ -45,11 +45,20 @@ workflow helpers. The engine owns the operation registry, run state, actors, policy checks, derived artifact identity, lineage, retries, review gates, and audit events. +## Implementation Status + +The first foundation slice is implemented for transformation operations, +transformation run persistence, and derived artifact lineage. See +`docs/transformation-implementation.md`. + +Workflow templates, job runner orchestration, review gates, exception queues, +and richer workflow audit reconstruction remain open in this workplan. + ## O8.1 - Implement transformation operation registry ```task id: KONT-WP-0008-T001 -status: todo +status: done priority: high state_hub_task_id: "ee2471b1-fab3-48f5-8b2d-d8f624abfc35" ``` @@ -66,11 +75,20 @@ Acceptance: - Markdown compose, include, transform, and validate operations are registered as adapter-backed operations rather than reimplemented. +Implemented: + +- `TransformationOperationRegistry` and default operation descriptors are in + `src/kontextual_engine/services/transformation_service.py`. +- `structured_view` is executable inside the engine as a generic derived + structured representation. +- Markdown operations are registered with `adapter_ref="markitect-tool"` and + no local handler, preserving the boundary against reimplementing Markitect. + ## O8.2 - Implement transformation runs with parameters actors and policy context ```task id: KONT-WP-0008-T002 -status: todo +status: done priority: high state_hub_task_id: "1eac7b47-8cff-4736-9f7d-599123218bad" ``` @@ -85,11 +103,20 @@ Acceptance: outputs. - Parameters needed to interpret or reproduce the run are preserved. +Implemented: + +- `TransformationRun` records include operation, source assets, source + versions, parameters, actor, correlation ID, policy context, diagnostics, + timestamps, status, retry metadata, and output asset IDs. +- Source-read and run-execution policy decisions are captured before handler + execution. +- Run state is persisted in memory and SQLite repositories. + ## O8.3 - Persist derived artifacts and source lineage ```task id: KONT-WP-0008-T003 -status: todo +status: done priority: high state_hub_task_id: "837ad793-2e9a-41f0-bce6-0a75815b5c15" ``` @@ -104,6 +131,15 @@ Acceptance: policy context, and output representation. - Re-runs create new traceable records rather than silently overwriting outputs. +Implemented: + +- Executed transformations can persist governed derived assets through + `AssetRegistryService`. +- `DerivedArtifactLineage` records are saved and queryable by output asset, + source asset, or transformation run. +- Derived asset versions use `VersionChangeType.DERIVED_OUTPUT` and point back + to the transformation run and source parent version. + ## O8.4 - Implement workflow templates steps dependencies and preconditions ```task