transformation registry, transformation runs, and derived artifact lineage

This commit is contained in:
2026-05-06 18:05:44 +02:00
parent 27c068f9ac
commit 43c06d6024
14 changed files with 1695 additions and 8 deletions

View File

@@ -12,6 +12,7 @@ from kontextual_engine.core import (
AuditEvent,
ContextEntity,
CoreRelationship,
DerivedArtifactLineage,
IdempotencyRecord,
IngestionJob,
IngestionJobStatus,
@@ -23,6 +24,8 @@ from kontextual_engine.core import (
RepresentationKind,
RetrievalFeedbackRecord,
Sensitivity,
TransformationRun,
TransformationRunStatus,
)
from kontextual_engine.errors import NotFoundError, ValidationError
@@ -42,6 +45,8 @@ class InMemoryAssetRegistryRepository:
retrieval_feedback: dict[str, RetrievalFeedbackRecord] = field(default_factory=dict)
idempotency_records: dict[str, IdempotencyRecord] = field(default_factory=dict)
ingestion_jobs: dict[str, IngestionJob] = field(default_factory=dict)
transformation_runs: dict[str, TransformationRun] = field(default_factory=dict)
derived_lineage: dict[str, DerivedArtifactLineage] = field(default_factory=dict)
def save_actor(self, actor: Actor) -> Actor:
self.actors[actor.id] = actor
@@ -300,6 +305,58 @@ class InMemoryAssetRegistryRepository:
jobs = [job for job in jobs if job.status == status]
return sorted(jobs, key=lambda job: (job.created_at, job.job_id))
def save_transformation_run(self, run: TransformationRun) -> TransformationRun:
self.get_actor(run.actor_id)
self.transformation_runs[run.run_id] = run
return run
def get_transformation_run(self, run_id: str) -> TransformationRun:
try:
return self.transformation_runs[run_id]
except KeyError as exc:
raise NotFoundError("Transformation run not found", details={"run_id": run_id}) from exc
def list_transformation_runs(
self,
*,
status: TransformationRunStatus | None = None,
operation_id: str | None = None,
) -> list[TransformationRun]:
runs: Iterable[TransformationRun] = self.transformation_runs.values()
if status is not None:
runs = [run for run in runs if run.status == status]
if operation_id is not None:
runs = [run for run in runs if run.operation_id == operation_id]
return sorted(runs, key=lambda run: (run.queued_at, run.run_id))
def save_derived_lineage(self, lineage: DerivedArtifactLineage) -> DerivedArtifactLineage:
self.get_asset(lineage.output_asset_id)
self.get_transformation_run(lineage.transformation_run_id)
self.derived_lineage[lineage.lineage_id] = lineage
return lineage
def get_derived_lineage(self, lineage_id: str) -> DerivedArtifactLineage:
try:
return self.derived_lineage[lineage_id]
except KeyError as exc:
raise NotFoundError("Derived lineage not found", details={"lineage_id": lineage_id}) from exc
def list_derived_lineage(
self,
*,
output_asset_id: str | None = None,
source_asset_id: str | None = None,
transformation_run_id: str | None = None,
) -> list[DerivedArtifactLineage]:
records: Iterable[DerivedArtifactLineage] = self.derived_lineage.values()
if output_asset_id is not None:
records = [record for record in records if record.output_asset_id == output_asset_id]
if source_asset_id is not None:
records = [record for record in records if source_asset_id in record.source_asset_ids]
if transformation_run_id is not None:
records = [record for record in records if record.transformation_run_id == transformation_run_id]
return sorted(records, key=lambda record: (record.transformation_run_id, record.lineage_id))
def _metadata_matches(
records: list[MetadataRecord],

View File

@@ -14,6 +14,7 @@ from kontextual_engine.core import (
AuditEvent,
ContextEntity,
CoreRelationship,
DerivedArtifactLineage,
IdempotencyRecord,
IngestionJob,
IngestionJobStatus,
@@ -26,6 +27,8 @@ from kontextual_engine.core import (
RelationshipTargetKind,
RetrievalFeedbackRecord,
Sensitivity,
TransformationRun,
TransformationRunStatus,
)
from kontextual_engine.errors import NotFoundError, ValidationError
@@ -598,6 +601,128 @@ class SQLiteAssetRegistryRepository:
)
return [IngestionJob.from_dict(_loads(row["payload"])) for row in rows]
def save_transformation_run(self, run: TransformationRun) -> TransformationRun:
try:
with self._connect() as conn:
conn.execute(
"""
insert into transformation_runs
(id, operation_id, status, actor_id, correlation_id, queued_at, updated_at, payload)
values (?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
operation_id=excluded.operation_id,
status=excluded.status,
actor_id=excluded.actor_id,
correlation_id=excluded.correlation_id,
queued_at=excluded.queued_at,
updated_at=excluded.updated_at,
payload=excluded.payload
""",
(
run.run_id,
run.operation_id,
run.status.value,
run.actor_id,
run.correlation_id,
run.queued_at,
run.updated_at,
_json(run.to_dict()),
),
)
except sqlite3.IntegrityError as exc:
if _is_foreign_key_error(exc):
raise ValidationError(
"Transformation run references an unknown actor",
details={"actor_id": run.actor_id, "run_id": run.run_id},
) from exc
raise
return run
def get_transformation_run(self, run_id: str) -> TransformationRun:
row = self._one("select payload from transformation_runs where id = ?", (run_id,))
if row is None:
raise NotFoundError("Transformation run not found", details={"run_id": run_id})
return TransformationRun.from_dict(_loads(row["payload"]))
def list_transformation_runs(
self,
*,
status: TransformationRunStatus | None = None,
operation_id: str | None = None,
) -> list[TransformationRun]:
clauses = []
params: list[Any] = []
if status is not None:
clauses.append("status = ?")
params.append(status.value)
if operation_id is not None:
clauses.append("operation_id = ?")
params.append(operation_id)
where = f" where {' and '.join(clauses)}" if clauses else ""
rows = self._all(f"select payload from transformation_runs{where} order by queued_at, id", tuple(params))
return [TransformationRun.from_dict(_loads(row["payload"])) for row in rows]
def save_derived_lineage(self, lineage: DerivedArtifactLineage) -> DerivedArtifactLineage:
try:
with self._connect() as conn:
conn.execute(
"""
insert into derived_lineage
(id, output_asset_id, transformation_run_id, payload)
values (?, ?, ?, ?)
on conflict(id) do update set
output_asset_id=excluded.output_asset_id,
transformation_run_id=excluded.transformation_run_id,
payload=excluded.payload
""",
(
lineage.lineage_id,
lineage.output_asset_id,
lineage.transformation_run_id,
_json(lineage.to_dict()),
),
)
except sqlite3.IntegrityError as exc:
if _is_foreign_key_error(exc):
raise ValidationError(
"Derived lineage references an unknown output asset or transformation run",
details={
"output_asset_id": lineage.output_asset_id,
"transformation_run_id": lineage.transformation_run_id,
"lineage_id": lineage.lineage_id,
},
) from exc
raise
return lineage
def get_derived_lineage(self, lineage_id: str) -> DerivedArtifactLineage:
row = self._one("select payload from derived_lineage where id = ?", (lineage_id,))
if row is None:
raise NotFoundError("Derived lineage not found", details={"lineage_id": lineage_id})
return DerivedArtifactLineage.from_dict(_loads(row["payload"]))
def list_derived_lineage(
self,
*,
output_asset_id: str | None = None,
source_asset_id: str | None = None,
transformation_run_id: str | None = None,
) -> list[DerivedArtifactLineage]:
clauses = []
params: list[Any] = []
if output_asset_id is not None:
clauses.append("output_asset_id = ?")
params.append(output_asset_id)
if transformation_run_id is not None:
clauses.append("transformation_run_id = ?")
params.append(transformation_run_id)
where = f" where {' and '.join(clauses)}" if clauses else ""
rows = self._all(f"select payload from derived_lineage{where} order by transformation_run_id, id", tuple(params))
records = [DerivedArtifactLineage.from_dict(_loads(row["payload"])) for row in rows]
if source_asset_id is not None:
records = [record for record in records if source_asset_id in record.source_asset_ids]
return records
def _initialize(self) -> None:
with self._connect() as conn:
conn.executescript(
@@ -697,6 +822,23 @@ class SQLiteAssetRegistryRepository:
payload text not null,
foreign key(actor_id) references actors(id)
);
create table if not exists transformation_runs (
id text primary key,
operation_id text not null,
status text not null,
actor_id text not null,
correlation_id text not null,
queued_at text not null,
updated_at text not null,
payload text not null,
foreign key(actor_id) references actors(id)
);
create table if not exists derived_lineage (
id text primary key,
output_asset_id text not null references assets(id) on delete cascade,
transformation_run_id text not null references transformation_runs(id) on delete cascade,
payload text not null
);
create index if not exists idx_assets_lifecycle on assets(lifecycle);
create index if not exists idx_representations_asset on representations(asset_id);
create index if not exists idx_metadata_asset on metadata_records(asset_id);
@@ -711,6 +853,11 @@ class SQLiteAssetRegistryRepository:
create index if not exists idx_retrieval_feedback_correlation on retrieval_feedback(correlation_id);
create index if not exists idx_ingestion_jobs_status on ingestion_jobs(status);
create index if not exists idx_ingestion_jobs_correlation on ingestion_jobs(correlation_id);
create index if not exists idx_transformation_runs_status on transformation_runs(status);
create index if not exists idx_transformation_runs_operation on transformation_runs(operation_id);
create index if not exists idx_transformation_runs_correlation on transformation_runs(correlation_id);
create index if not exists idx_derived_lineage_output on derived_lineage(output_asset_id);
create index if not exists idx_derived_lineage_run on derived_lineage(transformation_run_id);
"""
)