From 565a5643a37f6fb62e6ca16e93452e4563691b7a Mon Sep 17 00:00:00 2001 From: tegwick Date: Wed, 6 May 2026 02:35:40 +0200 Subject: [PATCH] first ingestion/normalization slice --- docs/ingestion-implementation.md | 84 +++++ src/kontextual_engine/__init__.py | 38 ++- .../adapters/builtin_extractors/__init__.py | 5 + .../adapters/builtin_extractors/text.py | 42 +++ .../adapters/local_files/__init__.py | 5 + .../adapters/local_files/connector.py | 77 +++++ .../adapters/markitect_tool/__init__.py | 5 + .../adapters/markitect_tool/markdown.py | 86 +++++ .../adapters/memory/asset_registry.py | 23 ++ .../adapters/sqlite/asset_registry.py | 58 ++++ src/kontextual_engine/core/__init__.py | 18 + src/kontextual_engine/core/ingestion.py | 308 ++++++++++++++++++ src/kontextual_engine/ports/__init__.py | 5 +- src/kontextual_engine/ports/ingestion.py | 34 ++ src/kontextual_engine/ports/repositories.py | 10 + src/kontextual_engine/services/__init__.py | 9 +- .../services/ingestion_service.py | 304 +++++++++++++++++ tests/test_asset_ingestion_service.py | 108 ++++++ ...06-multi-format-ingestion-normalization.md | 22 +- 19 files changed, 1231 insertions(+), 10 deletions(-) create mode 100644 docs/ingestion-implementation.md create mode 100644 src/kontextual_engine/adapters/builtin_extractors/__init__.py create mode 100644 src/kontextual_engine/adapters/builtin_extractors/text.py create mode 100644 src/kontextual_engine/adapters/local_files/__init__.py create mode 100644 src/kontextual_engine/adapters/local_files/connector.py create mode 100644 src/kontextual_engine/adapters/markitect_tool/__init__.py create mode 100644 src/kontextual_engine/adapters/markitect_tool/markdown.py create mode 100644 src/kontextual_engine/core/ingestion.py create mode 100644 src/kontextual_engine/ports/ingestion.py create mode 100644 src/kontextual_engine/services/ingestion_service.py create mode 100644 tests/test_asset_ingestion_service.py diff --git a/docs/ingestion-implementation.md 
b/docs/ingestion-implementation.md new file mode 100644 index 0000000..4636462 --- /dev/null +++ b/docs/ingestion-implementation.md @@ -0,0 +1,84 @@ +# Ingestion Implementation Note + +Date: 2026-05-06 + +Status: first implementation slice for `KONT-WP-0006`. + +## Purpose + +This note records the first governed ingestion implementation built on top of +the asset registry service. It turns local source files into stable knowledge +assets with source and normalized representations while preserving source +provenance, actor context, auditability, and inspectable ingestion job state. + +## Implemented Package Shape + +```text +src/kontextual_engine/ + core/ingestion.py + ports/ingestion.py + services/ingestion_service.py + adapters/local_files/ + adapters/builtin_extractors/ + adapters/markitect_tool/ +``` + +The new `AssetIngestionService` is separate from the older artifact-era +`IngestionService` compatibility facade in `src/kontextual_engine/ingestion.py`. + +## Implemented Capabilities + +- Durable `IngestionJob` state with queued, running, completed, failed, + partially completed, retried, quarantined, and canceled statuses. +- Structured ingestion failures with retriable flags and diagnostic details. +- Connector and extractor port contracts owned by the engine. +- Local file connector with source references, checksums, media type detection, + file metadata, and directory file iteration. +- Plain text extractor producing a normalized engine representation. +- Markitect markdown extractor adapter boundary that delegates markdown parsing, + headings, sections, frontmatter, and snapshot identity to `markitect-tool` + when available. +- Synchronous first-run ingestion flow that creates governed assets through + `AssetRegistryService`. +- Source and normalized `AssetRepresentation` records for ingested files. +- Metadata records for connector, extractor, source digest, source media type, + size, and extraction metadata. 
+- Failed unsupported-media ingestion records job failure without adding an asset + to the trusted registry. +- Directory ingestion with per-file child jobs and partial result accounting. +- In-memory and SQLite job persistence. + +## Current SQLite Additions + +- `ingestion_jobs` + +The table stores indexed status, actor, correlation ID, timestamps, and a JSON +payload for the full job contract. + +## markitect-tool Boundary + +Markdown ingestion uses `MarkitectMarkdownExtractor`, which imports +`markitect-tool` only inside the adapter. The engine preserves Markitect output +as normalized structure and adapter metadata; it does not make Markitect +document classes part of the engine domain model. + +## Not Yet Implemented + +- Asynchronous job runner and queue dispatch. +- Re-ingestion reconciliation for existing assets. +- Identity policies that preserve asset identity across source moves. +- PDF, office document, and dataset extractors. +- Deep normalized structure for tables, links, embedded references, and fields + beyond extractor-provided metadata. +- Quarantine policy checks beyond unsupported/failed extraction paths. + +## Test Coverage + +`tests/test_asset_ingestion_service.py` covers: + +- plain text local-file ingestion into governed assets, +- source and normalized representation creation, +- job persistence and status inspection, +- unsupported media failure without trusted asset creation, +- directory partial success/failure accounting, +- SQLite reload preserving ingestion jobs and ingested asset state. 
diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 363ca99..ed8e648 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -22,15 +22,22 @@ from .core import ( AuditEvent, AuditOutcome, Classification, + ConnectorCapability, ContextEntity, ContextEntityType, CoreRelationship, DerivedArtifactLineage, + ExtractionResult, + ExtractorCapability, IdempotencyRecord, IdempotencyStatus, + IngestionFailure, + IngestionJob, + IngestionJobStatus, KnowledgeAsset, LifecycleState, MetadataRecord, + NormalizedDocument, OperationContext, PolicyDecision, PolicyEffect, @@ -38,6 +45,7 @@ from .core import ( RepresentationKind, Sensitivity, SourceReference, + SourcePayload, VersionChangeType, ) from .errors import ( @@ -50,10 +58,23 @@ from .errors import ( ValidationError, ) from .ingestion import IngestionRequest, IngestionResult, IngestionService -from .ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway +from .ports import ( + AllowAllPolicyGateway, + AssetRegistryRepository, + DirectorySourceConnector, + FormatExtractor, + PolicyGateway, + SourceConnector, +) from .query import QueryEngine, QueryResult from .relationships import RelationshipGraph -from .services import AssetChangeResult, AssetRegistryService, RelationshipChangeResult +from .services import ( + AssetChangeResult, + AssetIngestionResult, + AssetIngestionService, + AssetRegistryService, + RelationshipChangeResult, +) from .storage import InMemoryKnowledgeRepository from .workflows import ( InputBundle, @@ -76,6 +97,8 @@ __all__ = [ "ActorType", "AssetRepresentation", "AssetChangeResult", + "AssetIngestionResult", + "AssetIngestionService", "AssetRegistryRepository", "AssetRegistryService", "AssetVersion", @@ -83,6 +106,7 @@ __all__ = [ "AuditOutcome", "AuthorizationError", "Classification", + "ConnectorCapability", "Collection", "ContextAssembler", "ContextEntity", @@ -92,12 +116,19 @@ __all__ = [ "CoreRelationship", 
"DerivedArtifactLineage", "Diagnostic", + "DirectorySourceConnector", "DuplicateResourceError", + "ExtractionResult", + "ExtractorCapability", + "FormatExtractor", "InMemoryAssetRegistryRepository", "InMemoryKnowledgeRepository", "IngestionRequest", "IngestionResult", "IngestionService", + "IngestionFailure", + "IngestionJob", + "IngestionJobStatus", "InputBundle", "IdempotencyRecord", "IdempotencyStatus", @@ -105,6 +136,7 @@ __all__ = [ "KontextualError", "LifecycleState", "MetadataRecord", + "NormalizedDocument", "NotFoundError", "OperationRun", "OperationStage", @@ -124,6 +156,8 @@ __all__ = [ "RunStatus", "Sensitivity", "SourceReference", + "SourceConnector", + "SourcePayload", "SQLiteAssetRegistryRepository", "ValidationError", "VersionChangeType", diff --git a/src/kontextual_engine/adapters/builtin_extractors/__init__.py b/src/kontextual_engine/adapters/builtin_extractors/__init__.py new file mode 100644 index 0000000..ec989f5 --- /dev/null +++ b/src/kontextual_engine/adapters/builtin_extractors/__init__.py @@ -0,0 +1,5 @@ +"""Built-in baseline format extractors.""" + +from .text import PlainTextExtractor + +__all__ = ["PlainTextExtractor"] diff --git a/src/kontextual_engine/adapters/builtin_extractors/text.py b/src/kontextual_engine/adapters/builtin_extractors/text.py new file mode 100644 index 0000000..63ed20c --- /dev/null +++ b/src/kontextual_engine/adapters/builtin_extractors/text.py @@ -0,0 +1,42 @@ +"""Plain text normalization extractor.""" + +from __future__ import annotations + +from kontextual_engine.core import ExtractionResult, ExtractorCapability, NormalizedDocument, SourcePayload + + +class PlainTextExtractor: + name = "plain-text" + media_types = ("text/plain",) + + def capabilities(self) -> ExtractorCapability: + return ExtractorCapability( + extractor_name=self.name, + media_types=self.media_types, + extraction_depth="text", + produces_structure=False, + ) + + def supports(self, media_type: str) -> bool: + return media_type in 
class PlainTextExtractor:
    """Baseline extractor that turns plain-text payloads into a normalized document."""

    name = "plain-text"
    media_types = ("text/plain",)

    def capabilities(self) -> ExtractorCapability:
        """Report this extractor as text-depth only, with no structural output."""
        capability = ExtractorCapability(
            extractor_name=self.name,
            media_types=self.media_types,
            extraction_depth="text",
            produces_structure=False,
        )
        return capability

    def supports(self, media_type: str) -> bool:
        """Return True for a listed media type or any value starting with ``text/plain``."""
        if media_type in self.media_types:
            return True
        # Prefix match also accepts parameterised values such as "text/plain; charset=utf-8".
        return media_type.startswith("text/plain")

    def extract(self, payload: SourcePayload) -> ExtractionResult:
        """Decode the payload and wrap the text, unchanged, in an ExtractionResult."""
        body = payload.read_text()
        line_total = len(body.splitlines())
        document = NormalizedDocument(
            title=payload.title,
            text=body,
            fields={"line_count": line_total},
            confidence=1.0,
            extractor_metadata={
                "extractor": self.name,
                "source_media_type": payload.media_type,
            },
        )
        extraction_metadata = {
            "extractor": self.name,
            "source_digest": payload.content_digest,
            "source_size_bytes": payload.size_bytes,
        }
        return ExtractionResult(normalized=document, metadata=extraction_metadata)
class LocalFileConnector:
    """Source connector that reads single files and enumerates directories on local disk."""

    name = "local_file"

    def capabilities(self) -> ConnectorCapability:
        """Describe the source types and URI schemes this connector advertises."""
        return ConnectorCapability(
            connector_name=self.name,
            source_types=("file", "directory"),
            supports_directories=True,
            metadata={"uri_schemes": ["file", "path"]},
        )

    def fetch(self, source_uri: str) -> SourcePayload:
        """Read one local file and return it as a SourcePayload.

        Args:
            source_uri: Filesystem path; a leading ``~`` is expanded.

        Raises:
            NotFoundError: The path does not exist.
            ValidationError: The path exists but is not a regular file.
        """
        # NOTE(review): capabilities() advertises a "file" URI scheme, but the value is
        # handed to Path() verbatim -- confirm callers strip any "file://" prefix first.
        path = Path(source_uri).expanduser()
        if not path.exists():
            raise NotFoundError("Local source file not found", details={"path": str(path)})
        if not path.is_file():
            raise ValidationError("Local source is not a file", details={"path": str(path)})

        content = path.read_bytes()
        media_type = _guess_media_type(path)
        # Stat once so the source reference and the payload describe the same snapshot
        # of the file; two separate stat() calls could disagree if the file changes.
        file_metadata = _file_metadata(path)
        source_ref = SourceReference(
            source_system=self.name,
            path=str(path),
            checksum=content_digest(content),
            connector_ref=f"{self.name}:{path.resolve()}",
            metadata=dict(file_metadata),
        )
        return SourcePayload(
            connector_name=self.name,
            source_uri=str(path),
            source_ref=source_ref,
            media_type=media_type,
            content=content,
            title=path.stem,
            metadata={"filename": path.name, **file_metadata},
        )

    def iter_files(self, source_uri: str, *, recursive: bool = True) -> list[str]:
        """Return the sorted file paths found under *source_uri*.

        A path that is itself a file yields a one-element list. Directories are
        globbed with ``**/*`` when *recursive* is true, otherwise with ``*``.

        Raises:
            NotFoundError: The path does not exist.
            ValidationError: The path is neither a file nor a directory.
        """
        root = Path(source_uri).expanduser()
        if not root.exists():
            raise NotFoundError("Local source directory not found", details={"path": str(root)})
        if root.is_file():
            return [str(root)]
        if not root.is_dir():
            raise ValidationError("Local source is not a directory", details={"path": str(root)})
        pattern = "**/*" if recursive else "*"
        return sorted(str(path) for path in root.glob(pattern) if path.is_file())


def _guess_media_type(path: Path) -> str:
    """Map a file suffix to a media type, falling back to ``mimetypes`` and then octet-stream."""
    suffix = path.suffix.lower()
    # Markdown and text suffixes are pinned here instead of being left to mimetypes.
    if suffix in {".md", ".markdown", ".mkd"}:
        return "text/markdown"
    if suffix in {".txt", ".text", ".log"}:
        return "text/plain"
    guessed, _ = mimetypes.guess_type(path.name)
    return guessed or "application/octet-stream"


def _file_metadata(path: Path) -> dict[str, Any]:
    """Return the size and modification time (in nanoseconds) from a single ``stat()`` call."""
    stat = path.stat()
    return {
        "size_bytes": stat.st_size,
        "mtime_ns": stat.st_mtime_ns,
    }
class MarkitectMarkdownExtractor:
    """Adapter boundary to markitect-tool; Markdown syntax logic stays external."""

    name = "markitect-tool"
    media_types = ("text/markdown", "text/x-markdown")

    def capabilities(self) -> ExtractorCapability:
        """Describe this extractor as structure-producing and dependent on markitect-tool."""
        return ExtractorCapability(
            extractor_name=self.name,
            media_types=self.media_types,
            extraction_depth="structure",
            produces_structure=True,
            optional_dependency="markitect-tool",
            metadata={"delegates_markdown_syntax": True},
        )

    def supports(self, media_type: str) -> bool:
        """Return True only for an exact match against ``media_types``."""
        return media_type in self.media_types

    def extract(self, payload: SourcePayload) -> ExtractionResult:
        """Parse a markdown payload through markitect-tool and normalize the result.

        Raises:
            AdapterUnavailableError: ``markitect_tool`` could not be imported.
        """
        try:
            import markitect_tool as mkt
        # Deliberately broad: any failure while importing the optional package is
        # reported as "adapter unavailable" rather than leaking an import-time error.
        except Exception as exc:  # pragma: no cover - depends on optional environment
            raise AdapterUnavailableError(
                "markitect-tool is required for markdown normalization",
                details={"adapter": self.name, "media_type": payload.media_type},
            ) from exc

        source_path = payload.source_ref.path
        text = payload.read_text()
        document = self._parse_document(mkt, text, source_path)
        # Documents without to_dict() contribute no structure at all.
        serialized = document.to_dict() if hasattr(document, "to_dict") else {}
        snapshot = self._snapshot(mkt, source_path)
        structure = self._structure_from(serialized)
        normalized = NormalizedDocument(
            title=payload.title,
            text=text,
            structure=structure,
            fields={
                # Separate copy so the fields and structure mappings do not alias.
                "frontmatter": dict(structure["frontmatter"]),
                "heading_count": len(structure["headings"]),
                "section_count": len(structure["sections"]),
            },
            confidence=1.0,
            extractor_metadata={
                "extractor": self.name,
                "source_media_type": payload.media_type,
                "snapshot": snapshot,
            },
        )
        return ExtractionResult(
            normalized=normalized,
            metadata={
                "extractor": self.name,
                "frontmatter": structure["frontmatter"],
                "headings": structure["headings"],
                "snapshot": snapshot,
                "source_digest": payload.content_digest,
                "source_size_bytes": payload.size_bytes,
            },
        )

    @staticmethod
    def _structure_from(serialized: dict[str, Any]) -> dict[str, Any]:
        """Copy frontmatter, headings and sections out of a serialized document.

        A key that is missing or explicitly ``None`` yields an empty container, so
        a document serialized with ``"frontmatter": None`` does not raise TypeError.
        """
        return {
            "frontmatter": dict(serialized.get("frontmatter") or {}),
            "headings": list(serialized.get("headings") or []),
            "sections": list(serialized.get("sections") or []),
        }

    def _parse_document(self, mkt: Any, text: str, source_path: str | None) -> Any:
        """Parse from the file on disk when possible, otherwise from the in-memory text."""
        if source_path and Path(source_path).exists() and hasattr(mkt, "parse_markdown_file"):
            return mkt.parse_markdown_file(Path(source_path))
        return mkt.parse_markdown(text, source_path=source_path)

    def _snapshot(self, mkt: Any, source_path: str | None) -> dict[str, Any]:
        """Return markitect's snapshot identity for the file, or ``{}`` when unavailable."""
        if not source_path or not Path(source_path).exists() or not hasattr(mkt, "snapshot_identity_for_file"):
            return {}
        return mkt.snapshot_identity_for_file(Path(source_path), parse_options={"profile": "default"}).to_dict()
# Ingestion-job methods of InMemoryAssetRegistryRepository; jobs live in the
# ``ingestion_jobs`` dict keyed by job id.
def save_ingestion_job(self, job: IngestionJob) -> IngestionJob:
    """Store *job* under its ``job_id``, replacing any previous entry, and return it."""
    registry = self.ingestion_jobs
    registry[job.job_id] = job
    return job


def get_ingestion_job(self, job_id: str) -> IngestionJob:
    """Return the stored job, raising NotFoundError when the id is unknown."""
    try:
        found = self.ingestion_jobs[job_id]
    except KeyError as exc:
        raise NotFoundError("Ingestion job not found", details={"job_id": job_id}) from exc
    else:
        return found


def list_ingestion_jobs(
    self,
    *,
    status: IngestionJobStatus | None = None,
) -> list[IngestionJob]:
    """Return jobs ordered by ``(created_at, job_id)``, optionally filtered by status."""

    def ordering(entry: IngestionJob) -> tuple[str, str]:
        return (entry.created_at, entry.job_id)

    selected = [
        entry
        for entry in self.ingestion_jobs.values()
        if status is None or entry.status == status
    ]
    selected.sort(key=ordering)
    return selected
# Ingestion-job methods of SQLiteAssetRegistryRepository; rows live in the
# ``ingestion_jobs`` table with the full job serialized into ``payload``.
def save_ingestion_job(self, job: IngestionJob) -> IngestionJob:
    """Upsert the job row and return *job*.

    On an id conflict every column except ``id`` and ``created_at`` is overwritten.
    """
    row = (
        job.job_id,
        job.status.value,
        job.actor_id,
        job.correlation_id,
        job.created_at,
        job.updated_at,
        _json(job.to_dict()),
    )
    with self._connect() as conn:
        conn.execute(
            """
            insert into ingestion_jobs (id, status, actor_id, correlation_id, created_at, updated_at, payload)
            values (?, ?, ?, ?, ?, ?, ?)
            on conflict(id) do update set
                status=excluded.status,
                actor_id=excluded.actor_id,
                correlation_id=excluded.correlation_id,
                updated_at=excluded.updated_at,
                payload=excluded.payload
            """,
            row,
        )
    return job


def get_ingestion_job(self, job_id: str) -> IngestionJob:
    """Load one job from its stored payload, raising NotFoundError when no row matches."""
    record = self._one("select payload from ingestion_jobs where id = ?", (job_id,))
    if record is None:
        raise NotFoundError("Ingestion job not found", details={"job_id": job_id})
    payload = _loads(record["payload"])
    return IngestionJob.from_dict(payload)


def list_ingestion_jobs(
    self,
    *,
    status: IngestionJobStatus | None = None,
) -> list[IngestionJob]:
    """Return jobs ordered by ``created_at`` then ``id``, optionally filtered by status."""
    if status is not None:
        rows = self._all(
            "select payload from ingestion_jobs where status = ? order by created_at, id",
            (status.value,),
        )
    else:
        rows = self._all("select payload from ingestion_jobs order by created_at, id", ())
    jobs = []
    for record in rows:
        jobs.append(IngestionJob.from_dict(_loads(record["payload"])))
    return jobs
audit_events(correlation_id); + create index if not exists idx_ingestion_jobs_status on ingestion_jobs(status); + create index if not exists idx_ingestion_jobs_correlation on ingestion_jobs(correlation_id); """ ) diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index a8098ff..a2d7e62 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -4,6 +4,16 @@ from .actors import Actor, ActorType, OperationContext from .assets import AssetRepresentation, KnowledgeAsset, RepresentationKind from .audit import AuditEvent, AuditOutcome from .idempotency import IdempotencyRecord, IdempotencyStatus +from .ingestion import ( + ConnectorCapability, + ExtractionResult, + ExtractorCapability, + IngestionFailure, + IngestionJob, + IngestionJobStatus, + NormalizedDocument, + SourcePayload, +) from .metadata import Classification, LifecycleState, MetadataRecord, Sensitivity from .policy import PolicyDecision, PolicyEffect from .primitives import content_digest, mapping_digest, new_id, stable_json_dumps, utc_now @@ -28,15 +38,22 @@ __all__ = [ "AuditEvent", "AuditOutcome", "Classification", + "ConnectorCapability", "ContextEntity", "ContextEntityType", "CoreRelationship", "DerivedArtifactLineage", + "ExtractionResult", + "ExtractorCapability", "IdempotencyRecord", "IdempotencyStatus", + "IngestionFailure", + "IngestionJob", + "IngestionJobStatus", "KnowledgeAsset", "LifecycleState", "MetadataRecord", + "NormalizedDocument", "OperationContext", "PolicyDecision", "PolicyEffect", @@ -44,6 +61,7 @@ __all__ = [ "RepresentationKind", "Sensitivity", "SourceReference", + "SourcePayload", "VersionChangeType", "content_digest", "mapping_digest", diff --git a/src/kontextual_engine/core/ingestion.py b/src/kontextual_engine/core/ingestion.py new file mode 100644 index 0000000..0899a55 --- /dev/null +++ b/src/kontextual_engine/core/ingestion.py @@ -0,0 +1,308 @@ +"""Ingestion job and normalized content 
class IngestionJobStatus(str, Enum):
    """Lifecycle states an ingestion job can be stored in.

    The ``str`` mixin makes each member compare equal to its raw string value.
    """

    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PARTIALLY_COMPLETED = "partially_completed"
    RETRIED = "retried"
    QUARANTINED = "quarantined"
    CANCELED = "canceled"


@dataclass(frozen=True)
class IngestionFailure:
    """Structured description of one ingestion failure."""

    code: str
    message: str
    retriable: bool = False
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict, passed through ``compact_dict``."""
        return compact_dict(
            {
                "code": self.code,
                "message": self.message,
                "retriable": self.retriable,
                "details": dict(self.details),
            }
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "IngestionFailure":
        """Rebuild a failure; ``code`` and ``message`` are required keys."""
        return cls(
            code=data["code"],
            message=data["message"],
            retriable=bool(data.get("retriable", False)),
            details=dict(data.get("details", {})),
        )


@dataclass(frozen=True)
class ConnectorCapability:
    """Self-description published by a source connector."""

    connector_name: str
    source_types: tuple[str, ...]
    supports_directories: bool = False
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict; ``source_types`` becomes a list."""
        return compact_dict(
            {
                "connector_name": self.connector_name,
                "source_types": list(self.source_types),
                "supports_directories": self.supports_directories,
                "metadata": dict(self.metadata),
            }
        )


@dataclass(frozen=True)
class ExtractorCapability:
    """Self-description published by a format extractor."""

    extractor_name: str
    media_types: tuple[str, ...]
    extraction_depth: str = "text"
    produces_structure: bool = False
    optional_dependency: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict; ``media_types`` becomes a list."""
        return compact_dict(
            {
                "extractor_name": self.extractor_name,
                "media_types": list(self.media_types),
                "extraction_depth": self.extraction_depth,
                "produces_structure": self.produces_structure,
                "optional_dependency": self.optional_dependency,
                "metadata": dict(self.metadata),
            }
        )


@dataclass(frozen=True)
class SourcePayload:
    """Raw bytes fetched by a connector, together with their source description."""

    connector_name: str
    source_uri: str
    source_ref: SourceReference
    media_type: str
    content: bytes
    title: str
    metadata: dict[str, Any] = field(default_factory=dict)
    permission_context: dict[str, Any] = field(default_factory=dict)

    @property
    def content_digest(self) -> str:
        """Digest of ``content``, recomputed on every access."""
        # Inside the method body the name resolves to the module-level helper,
        # not to this property (class scope is skipped during name lookup).
        return content_digest(self.content)

    @property
    def size_bytes(self) -> int:
        """Length of ``content`` in bytes."""
        return len(self.content)

    def read_text(self, encoding: str = "utf-8") -> str:
        """Decode ``content`` strictly; raises UnicodeDecodeError on invalid bytes."""
        return self.content.decode(encoding)


@dataclass(frozen=True)
class NormalizedDocument:
    """Engine-owned normalized representation produced by an extractor."""

    text: str
    media_type: str = "application/vnd.kontextual.normalized+json"
    title: str | None = None
    structure: dict[str, Any] = field(default_factory=dict)
    tables: list[dict[str, Any]] = field(default_factory=list)
    links: list[dict[str, Any]] = field(default_factory=list)
    fields: dict[str, Any] = field(default_factory=dict)
    confidence: float | None = None
    unsupported_elements: list[dict[str, Any]] = field(default_factory=list)
    extractor_metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def normalized_hash(self) -> str:
        """Digest of the serialized document, computed without the hash field itself."""
        return mapping_digest(self.to_dict(include_hash=False))

    def to_dict(self, *, include_hash: bool = True) -> dict[str, Any]:
        """Serialize to a plain dict, adding ``normalized_hash`` unless told not to."""
        data = compact_dict(
            {
                "title": self.title,
                "text": self.text,
                "media_type": self.media_type,
                "structure": dict(self.structure),
                "tables": list(self.tables),
                "links": list(self.links),
                "fields": dict(self.fields),
                "confidence": self.confidence,
                "unsupported_elements": list(self.unsupported_elements),
                "extractor_metadata": dict(self.extractor_metadata),
            }
        )
        if include_hash:
            data["normalized_hash"] = self.normalized_hash
        return data

    def to_json(self) -> str:
        """Serialize ``to_dict()`` with ``stable_json_dumps``."""
        return stable_json_dumps(self.to_dict())


@dataclass(frozen=True)
class ExtractionResult:
    """Extractor output: the normalized document plus metadata and diagnostics."""

    normalized: NormalizedDocument
    metadata: dict[str, Any] = field(default_factory=dict)
    diagnostics: tuple[IngestionFailure, ...] = ()

    def to_dict(self) -> dict[str, Any]:
        """Serialize the document, metadata and each diagnostic to plain data."""
        return compact_dict(
            {
                "normalized": self.normalized.to_dict(),
                "metadata": dict(self.metadata),
                "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
            }
        )


@dataclass(frozen=True)
class IngestionJob:
    """Immutable snapshot of an ingestion job; each transition returns a new instance."""

    input: dict[str, Any]
    actor_id: str
    correlation_id: str
    status: IngestionJobStatus = IngestionJobStatus.QUEUED
    source_ref: SourceReference | None = None
    output_asset_ids: tuple[str, ...] = ()
    failures: tuple[IngestionFailure, ...] = ()
    partial_results: dict[str, Any] = field(default_factory=dict)
    retry_options: dict[str, Any] = field(default_factory=dict)
    retry_of_job_id: str | None = None
    attempts: int = 1
    metadata: dict[str, Any] = field(default_factory=dict)
    job_id: str = field(default_factory=lambda: new_id("ingest"))
    created_at: str = field(default_factory=lambda: utc_now().isoformat())
    updated_at: str = field(default_factory=lambda: utc_now().isoformat())
    completed_at: str | None = None

    @classmethod
    def create(
        cls,
        *,
        input: dict[str, Any],
        actor_id: str,
        correlation_id: str,
        retry_of_job_id: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> "IngestionJob":
        """Build a new queued job with copies of *input* and *metadata*."""
        return cls(
            input=dict(input),
            actor_id=actor_id,
            correlation_id=correlation_id,
            retry_of_job_id=retry_of_job_id,
            metadata=dict(metadata or {}),
        )

    @staticmethod
    def _select_partial_results(
        override: dict[str, Any] | None,
        current: dict[str, Any],
    ) -> dict[str, Any]:
        """Return a copy of *override*, or of *current* when no override was passed.

        Only ``None`` means "keep the current results"; an explicit empty dict
        is honoured and clears them.
        """
        return dict(current if override is None else override)

    def running(self, *, source_ref: SourceReference | None = None) -> "IngestionJob":
        """Return a RUNNING copy, keeping the existing source reference if none is given."""
        return replace(
            self,
            status=IngestionJobStatus.RUNNING,
            source_ref=source_ref or self.source_ref,
            updated_at=utc_now().isoformat(),
        )

    def completed(
        self,
        *,
        output_asset_ids: tuple[str, ...],
        partial_results: dict[str, Any] | None = None,
    ) -> "IngestionJob":
        """Return a COMPLETED copy; ``updated_at`` and ``completed_at`` share one timestamp."""
        now = utc_now().isoformat()
        return replace(
            self,
            status=IngestionJobStatus.COMPLETED,
            output_asset_ids=tuple(output_asset_ids),
            partial_results=self._select_partial_results(partial_results, self.partial_results),
            updated_at=now,
            completed_at=now,
        )

    def failed(
        self,
        failure: IngestionFailure,
        *,
        status: IngestionJobStatus = IngestionJobStatus.FAILED,
        partial_results: dict[str, Any] | None = None,
    ) -> "IngestionJob":
        """Return a terminal copy with *failure* appended to the existing failures.

        *status* defaults to FAILED; the caller may pass another status instead.
        """
        now = utc_now().isoformat()
        return replace(
            self,
            status=status,
            failures=self.failures + (failure,),
            partial_results=self._select_partial_results(partial_results, self.partial_results),
            updated_at=now,
            completed_at=now,
        )

    def partially_completed(
        self,
        *,
        output_asset_ids: tuple[str, ...],
        failures: tuple[IngestionFailure, ...],
        partial_results: dict[str, Any],
    ) -> "IngestionJob":
        """Return a PARTIALLY_COMPLETED copy; *failures* replaces the existing tuple."""
        now = utc_now().isoformat()
        return replace(
            self,
            status=IngestionJobStatus.PARTIALLY_COMPLETED,
            output_asset_ids=tuple(output_asset_ids),
            failures=tuple(failures),
            partial_results=dict(partial_results),
            updated_at=now,
            completed_at=now,
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize the full job contract to plain data."""
        return compact_dict(
            {
                "job_id": self.job_id,
                "status": self.status.value,
                "input": dict(self.input),
                "actor_id": self.actor_id,
                "correlation_id": self.correlation_id,
                "source_ref": self.source_ref.to_dict() if self.source_ref else None,
                "output_asset_ids": list(self.output_asset_ids),
                "failures": [failure.to_dict() for failure in self.failures],
                "partial_results": dict(self.partial_results),
                "retry_options": dict(self.retry_options),
                "retry_of_job_id": self.retry_of_job_id,
                "attempts": self.attempts,
                "metadata": dict(self.metadata),
                "created_at": self.created_at,
                "updated_at": self.updated_at,
                "completed_at": self.completed_at,
            }
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "IngestionJob":
        """Rebuild a job from ``to_dict()`` output.

        ``job_id``, ``status``, ``actor_id``, ``correlation_id``, ``created_at`` and
        ``updated_at`` are required; every other key falls back to an empty default.
        """
        source_ref = data.get("source_ref")
        return cls(
            job_id=data["job_id"],
            status=IngestionJobStatus(data["status"]),
            input=dict(data.get("input", {})),
            actor_id=data["actor_id"],
            correlation_id=data["correlation_id"],
            source_ref=SourceReference.from_dict(source_ref) if source_ref else None,
            output_asset_ids=tuple(data.get("output_asset_ids", [])),
            failures=tuple(IngestionFailure.from_dict(item) for item in data.get("failures", [])),
            partial_results=dict(data.get("partial_results", {})),
            retry_options=dict(data.get("retry_options", {})),
            retry_of_job_id=data.get("retry_of_job_id"),
            attempts=int(data.get("attempts", 1)),
            metadata=dict(data.get("metadata", {})),
            created_at=data["created_at"],
            updated_at=data["updated_at"],
            completed_at=data.get("completed_at"),
        )
a/src/kontextual_engine/ports/__init__.py b/src/kontextual_engine/ports/__init__.py index b3d3e16..b9f21d8 100644 --- a/src/kontextual_engine/ports/__init__.py +++ b/src/kontextual_engine/ports/__init__.py @@ -1,11 +1,14 @@ """Stable ports owned by the engine.""" +from .ingestion import DirectorySourceConnector, FormatExtractor, SourceConnector from .policy import AllowAllPolicyGateway, PolicyGateway from .repositories import AssetRegistryRepository __all__ = [ "AllowAllPolicyGateway", "AssetRegistryRepository", + "DirectorySourceConnector", + "FormatExtractor", "PolicyGateway", + "SourceConnector", ] -
diff --git a/src/kontextual_engine/ports/ingestion.py b/src/kontextual_engine/ports/ingestion.py
new file mode 100644
index 0000000..5988c52
--- /dev/null
+++ b/src/kontextual_engine/ports/ingestion.py
@@ -0,0 +1,48 @@
+"""Connector and extractor ports for ingestion."""
+
+from __future__ import annotations
+
+from typing import Protocol
+
+from kontextual_engine.core import (
+    ConnectorCapability,
+    ExtractionResult,
+    ExtractorCapability,
+    SourcePayload,
+)
+
+
+class SourceConnector(Protocol):
+    """Structural contract for a named source that yields payloads by URI."""
+
+    # Identifier the ingestion service uses as the connector's lookup key.
+    name: str
+
+    def capabilities(self) -> ConnectorCapability:
+        """Describe what this connector supports."""
+
+    def fetch(self, source_uri: str) -> SourcePayload:
+        """Load the source at ``source_uri`` and return it as a payload."""
+
+
+class DirectorySourceConnector(SourceConnector, Protocol):
+    """A source connector that can also enumerate the files under a URI."""
+
+    def iter_files(self, source_uri: str, *, recursive: bool = True) -> list[str]:
+        """List the source URIs found under ``source_uri``."""
+
+
+class FormatExtractor(Protocol):
+    """Structural contract for turning a payload into an extraction result."""
+
+    # Identifier recorded as the producer of extracted representations.
+    name: str
+
+    def capabilities(self) -> ExtractorCapability:
+        """Describe what this extractor supports."""
+
+    def supports(self, media_type: str) -> bool:
+        """Report whether this extractor accepts ``media_type``."""
+
+    def extract(self, payload: SourcePayload) -> ExtractionResult:
+        """Extract normalized content and metadata from ``payload``."""
diff --git a/src/kontextual_engine/ports/repositories.py b/src/kontextual_engine/ports/repositories.py index 1a75442..a64cf4c 100644 --- a/src/kontextual_engine/ports/repositories.py +++ b/src/kontextual_engine/ports/repositories.py @@ -12,6 +12,8 @@ from kontextual_engine.core import ( ContextEntity, CoreRelationship, IdempotencyRecord, + IngestionJob, + IngestionJobStatus, KnowledgeAsset, LifecycleState, MetadataRecord, @@ -71,3 +73,11 @@ class AssetRegistryRepository(Protocol): def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord: ... def get_idempotency_record(self, key: str) -> IdempotencyRecord | None: ... + + def save_ingestion_job(self, job: IngestionJob) -> IngestionJob: ... + def get_ingestion_job(self, job_id: str) -> IngestionJob: ... + def list_ingestion_jobs( + self, + *, + status: IngestionJobStatus | None = None, + ) -> list[IngestionJob]: ... diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index 5faca01..477e779 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -1,5 +1,12 @@ """Application services for the engine.""" from .asset_service import AssetChangeResult, AssetRegistryService, RelationshipChangeResult +from .ingestion_service import AssetIngestionResult, AssetIngestionService -__all__ = ["AssetChangeResult", "AssetRegistryService", "RelationshipChangeResult"] +__all__ = [ + "AssetChangeResult", + "AssetIngestionResult", + "AssetIngestionService", + "AssetRegistryService", + "RelationshipChangeResult", +]
diff --git a/src/kontextual_engine/services/ingestion_service.py b/src/kontextual_engine/services/ingestion_service.py
new file mode 100644
index 0000000..18ac917
--- /dev/null
+++ b/src/kontextual_engine/services/ingestion_service.py
@@ -0,0 +1,366 @@
+"""Application service for governed asset ingestion."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Iterable
+
+from kontextual_engine.adapters.builtin_extractors import PlainTextExtractor
+from kontextual_engine.adapters.local_files import LocalFileConnector
+from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor
+from kontextual_engine.core import (
+    AssetRepresentation,
+    Classification,
+    IngestionFailure,
+    IngestionJob,
+    IngestionJobStatus,
+    KnowledgeAsset,
+    MetadataRecord,
+    OperationContext,
+    RepresentationKind,
+    Sensitivity,
+    SourcePayload,
+    mapping_digest,
+)
+from kontextual_engine.errors import AdapterUnavailableError, KontextualError
+from kontextual_engine.ports import AssetRegistryRepository, DirectorySourceConnector, FormatExtractor, SourceConnector
+
+from .asset_service import AssetChangeResult, AssetRegistryService
+
+
+@dataclass(frozen=True)
+class AssetIngestionResult:
+    """Outcome of one ingestion: the job plus the asset it created, if any."""
+
+    job: IngestionJob
+    # Both stay None when the job failed before an asset was created.
+    asset: KnowledgeAsset | None = None
+    asset_change: AssetChangeResult | None = None
+
+
+class AssetIngestionService:
+    """Ingest sources into governed assets and persist each run as a job."""
+
+    def __init__(
+        self,
+        repository: AssetRegistryRepository,
+        *,
+        asset_service: AssetRegistryService | None = None,
+        connectors: Iterable[SourceConnector] | None = None,
+        extractors: Iterable[FormatExtractor] | None = None,
+    ) -> None:
+        """Wire the service to its repository, connectors, and extractors.
+
+        When omitted or an empty collection, ``connectors`` defaults to the
+        local file connector and ``extractors`` to the plain text and Markitect
+        markdown extractors. Extractors are tried in the order given.
+        """
+        self.repository = repository
+        self.asset_service = asset_service or AssetRegistryService(repository)
+        self.connectors = {connector.name: connector for connector in (connectors or [LocalFileConnector()])}
+        self.extractors = list(extractors or [PlainTextExtractor(), MarkitectMarkdownExtractor()])
+
+    def connector_capabilities(self) -> list[dict]:
+        """Return the capability description of every registered connector."""
+        return [connector.capabilities().to_dict() for connector in self.connectors.values()]
+
+    def extractor_capabilities(self) -> list[dict]:
+        """Return the capability description of every registered extractor."""
+        return [extractor.capabilities().to_dict() for extractor in self.extractors]
+
+    def ingest_file(
+        self,
+        path: str | Path,
+        context: OperationContext,
+        *,
+        asset_id: str | None = None,
+        title: str | None = None,
+        classification: Classification | None = None,
+        idempotency_key: str | None = None,
+    ) -> AssetIngestionResult:
+        """Ingest one local file and return the recorded outcome.
+
+        Any exception raised while fetching, extracting, or registering the
+        asset is recorded as a failure on the persisted job; the result then
+        carries that failed job and no asset.
+
+        Raises:
+            AdapterUnavailableError: if no ``local_file`` connector is registered.
+        """
+        connector = self._connector("local_file")
+        job = IngestionJob.create(
+            input={"connector": connector.name, "source_uri": str(path), "mode": "file"},
+            actor_id=context.actor.id,
+            correlation_id=context.correlation_id,
+        )
+        self.repository.save_ingestion_job(job)
+        try:
+            payload = connector.fetch(str(path))
+            # Rebind the running snapshot here, not inside the helper, so a
+            # later failure is derived from it and keeps the source reference.
+            job = job.running(source_ref=payload.source_ref)
+            self.repository.save_ingestion_job(job)
+            return self._ingest_payload(
+                job,
+                payload,
+                context,
+                asset_id=asset_id,
+                title=title,
+                classification=classification,
+                idempotency_key=idempotency_key,
+            )
+        except Exception as exc:
+            # Boundary catch: every failure must end up on the job record.
+            failed = job.failed(_failure_from_exception(exc))
+            self.repository.save_ingestion_job(failed)
+            return AssetIngestionResult(failed)
+
+    def ingest_directory(
+        self,
+        path: str | Path,
+        context: OperationContext,
+        *,
+        recursive: bool = True,
+        classification: Classification | None = None,
+    ) -> IngestionJob:
+        """Ingest every file the connector lists under ``path``.
+
+        Each file runs through ``ingest_file`` as its own job; the returned
+        aggregate job summarises them in ``partial_results``. It ends as
+        completed when no file reported a failure, partially completed when
+        assets were created alongside failures, and failed otherwise.
+
+        Raises:
+            AdapterUnavailableError: if the ``local_file`` connector is missing
+                or cannot iterate directories.
+            Exception: whatever the connector raises while listing files; the
+                aggregate job is marked failed before the error propagates.
+        """
+        connector = self._directory_connector("local_file")
+        job = IngestionJob.create(
+            input={
+                "connector": connector.name,
+                "source_uri": str(path),
+                "mode": "directory",
+                "recursive": recursive,
+            },
+            actor_id=context.actor.id,
+            correlation_id=context.correlation_id,
+        )
+        job = job.running()
+        self.repository.save_ingestion_job(job)
+
+        try:
+            files = connector.iter_files(str(path), recursive=recursive)
+        except Exception as exc:
+            # Close the job out before re-raising; otherwise a listing error
+            # would leave it persisted as running with no terminal state.
+            self.repository.save_ingestion_job(job.failed(_failure_from_exception(exc)))
+            raise
+
+        output_asset_ids: list[str] = []
+        failures: list[IngestionFailure] = []
+        item_results: list[dict] = []
+        for source_uri in files:
+            result = self.ingest_file(source_uri, context, classification=classification)
+            item = {
+                "source_uri": source_uri,
+                "job_id": result.job.job_id,
+                "status": result.job.status.value,
+            }
+            if result.asset is not None:
+                output_asset_ids.append(result.asset.id)
+                item["asset_id"] = result.asset.id
+            if result.job.failures:
+                failures.extend(result.job.failures)
+                item["failures"] = [failure.to_dict() for failure in result.job.failures]
+            item_results.append(item)
+
+        partial_results = {
+            "files_total": len(files),
+            "succeeded": sum(1 for item in item_results if item["status"] == IngestionJobStatus.COMPLETED.value),
+            "failed": sum(1 for item in item_results if item["status"] == IngestionJobStatus.FAILED.value),
+            "quarantined": sum(1 for item in item_results if item["status"] == IngestionJobStatus.QUARANTINED.value),
+            "skipped": 0,
+            "items": item_results,
+        }
+        if failures and output_asset_ids:
+            job = job.partially_completed(
+                output_asset_ids=tuple(output_asset_ids),
+                failures=tuple(failures),
+                partial_results=partial_results,
+            )
+        elif failures:
+            job = job.failed(
+                IngestionFailure(
+                    code="ingestion.directory_failed",
+                    message="Directory ingestion failed for all files",
+                    retriable=True,
+                    details=partial_results,
+                ),
+                partial_results=partial_results,
+            )
+        else:
+            job = job.completed(output_asset_ids=tuple(output_asset_ids), partial_results=partial_results)
+        self.repository.save_ingestion_job(job)
+        return job
+
+    def get_job(self, job_id: str) -> IngestionJob:
+        """Return the persisted job with the given id."""
+        return self.repository.get_ingestion_job(job_id)
+
+    def list_jobs(self, *, status: IngestionJobStatus | None = None) -> list[IngestionJob]:
+        """List persisted jobs, optionally restricted to one status."""
+        return self.repository.list_ingestion_jobs(status=status)
+
+    def _ingest_payload(
+        self,
+        job: IngestionJob,
+        payload: SourcePayload,
+        context: OperationContext,
+        *,
+        asset_id: str | None,
+        title: str | None,
+        classification: Classification | None,
+        idempotency_key: str | None,
+    ) -> AssetIngestionResult:
+        """Extract ``payload``, register the asset, and complete ``job``.
+
+        Expects ``job`` to already be the persisted running snapshot for this
+        payload; errors propagate to the caller, which records them.
+        """
+        extractor = self._extractor(payload.media_type)
+        extraction = extractor.extract(payload)
+        resolved_asset_id = asset_id or _stable_asset_id(payload)
+        source_representation = AssetRepresentation.from_content(
+            resolved_asset_id,
+            RepresentationKind.SOURCE,
+            payload.media_type,
+            payload.content,
+            storage_ref=payload.source_uri,
+            producer=payload.connector_name,
+            source_ref_id=payload.source_ref.id,
+            metadata={
+                "connector": payload.connector_name,
+                "source_digest": payload.content_digest,
+                "source_size_bytes": payload.size_bytes,
+                **payload.metadata,
+            },
+        )
+        normalized_representation = AssetRepresentation.from_content(
+            resolved_asset_id,
+            RepresentationKind.NORMALIZED,
+            extraction.normalized.media_type,
+            extraction.normalized.to_json(),
+            producer=extractor.name,
+            source_ref_id=payload.source_ref.id,
+            metadata={
+                "extractor": extractor.name,
+                "normalized_hash": extraction.normalized.normalized_hash,
+                **extraction.metadata,
+            },
+        )
+        asset_change = self.asset_service.create_asset(
+            title or payload.title,
+            classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
+            context,
+            asset_id=resolved_asset_id,
+            source_refs=[payload.source_ref],
+            representations=[source_representation, normalized_representation],
+            metadata_records=_metadata_records(payload, extractor.name, extraction.metadata),
+            idempotency_key=idempotency_key,
+        )
+        completed = job.completed(
+            output_asset_ids=(asset_change.asset.id,),
+            partial_results={
+                "connector": payload.connector_name,
+                "extractor": extractor.name,
+                "source_digest": payload.content_digest,
+                "representations": [
+                    source_representation.representation_id,
+                    normalized_representation.representation_id,
+                ],
+                "diagnostics": [diagnostic.to_dict() for diagnostic in extraction.diagnostics],
+            },
+        )
+        self.repository.save_ingestion_job(completed)
+        return AssetIngestionResult(completed, asset_change.asset, asset_change)
+
+    def _connector(self, name: str) -> SourceConnector:
+        """Look up a registered connector, failing loudly when it is absent."""
+        try:
+            return self.connectors[name]
+        except KeyError as exc:
+            raise AdapterUnavailableError("Source connector is not registered", details={"connector": name}) from exc
+
+    def _directory_connector(self, name: str) -> DirectorySourceConnector:
+        """Look up a connector that also exposes ``iter_files``."""
+        connector = self._connector(name)
+        if not hasattr(connector, "iter_files"):
+            raise AdapterUnavailableError(
+                "Source connector does not support directory iteration",
+                details={"connector": name},
+            )
+        return connector  # type: ignore[return-value]
+
+    def _extractor(self, media_type: str) -> FormatExtractor:
+        """Return the first registered extractor that supports ``media_type``."""
+        for extractor in self.extractors:
+            if extractor.supports(media_type):
+                return extractor
+        raise AdapterUnavailableError(
+            "No extractor registered for media type",
+            details={"media_type": media_type},
+        )
+
+
+def _stable_asset_id(payload: SourcePayload) -> str:
+    """Derive an asset id from the source reference, independent of content."""
+    digest = mapping_digest(
+        {
+            "source_system": payload.source_ref.source_system,
+            "path": payload.source_ref.path,
+            "uri": payload.source_ref.uri,
+            "external_id": payload.source_ref.external_id,
+            "connector_ref": payload.source_ref.connector_ref,
+        }
+    )
+    return f"asset-{digest.removeprefix('sha256:')[:20]}"
+
+
+def _metadata_records(
+    payload: SourcePayload,
+    extractor_name: str,
+    extraction_metadata: dict,
+) -> list[MetadataRecord]:
+    """Build the metadata records attached to an ingested asset."""
+    return [
+        MetadataRecord("source_media_type", payload.media_type, provenance={"producer": payload.connector_name}),
+        MetadataRecord("source_digest", payload.content_digest, provenance={"producer": payload.connector_name}),
+        MetadataRecord("source_size_bytes", payload.size_bytes, provenance={"producer": payload.connector_name}),
+        MetadataRecord("connector", payload.connector_name, provenance={"producer": payload.connector_name}, confirmed=True),
+        MetadataRecord("extractor", extractor_name, provenance={"producer": extractor_name}, confirmed=True),
+        MetadataRecord("extraction", dict(extraction_metadata), provenance={"producer": extractor_name}),
+    ]
+
+
+def _failure_from_exception(exc: Exception) -> IngestionFailure:
+    """Translate an exception into a structured ingestion failure.
+
+    Engine errors keep their code and details, and only unavailable adapters
+    are flagged retriable; anything else is reported as unexpected.
+    """
+    if isinstance(exc, KontextualError):
+        return IngestionFailure(
+            code=exc.code,
+            message=str(exc),
+            retriable=isinstance(exc, AdapterUnavailableError),
+            details=dict(exc.details),
+        )
+    return IngestionFailure(
+        code="ingestion.unexpected",
+        message=str(exc),
+        retriable=False,
+        details={"exception_type": type(exc).__name__},
+    )
diff --git a/tests/test_asset_ingestion_service.py b/tests/test_asset_ingestion_service.py
new file mode 100644
index 0000000..f9bb3ef
--- /dev/null
+++ b/tests/test_asset_ingestion_service.py
@@ -0,0 +1,114 @@
+"""Service-level tests for governed asset ingestion."""
+
+from pathlib import Path
+
+from kontextual_engine import (
+    Actor,
+    ActorType,
+    AssetIngestionService,
+    Classification,
+    IngestionJobStatus,
+    InMemoryAssetRegistryRepository,
+    LifecycleState,
+    OperationContext,
+    RepresentationKind,
+    Sensitivity,
+    SQLiteAssetRegistryRepository,
+)
+
+
+def test_asset_ingestion_service_ingests_plain_text_file_as_governed_asset(tmp_path: Path) -> None:
+    """A plain text file ends up as an asset with both representation kinds."""
+    note_path = tmp_path / "note.txt"
+    note_path.write_text("hello\nworld\n", encoding="utf-8")
+    registry = InMemoryAssetRegistryRepository()
+    ingestion = AssetIngestionService(registry)
+
+    outcome = ingestion.ingest_file(
+        note_path,
+        operation_context(),
+        asset_id="asset-note",
+        classification=Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
+    )
+
+    finished_job = outcome.job
+    assert finished_job.status == IngestionJobStatus.COMPLETED
+    assert finished_job.correlation_id == "corr-ingest"
+    assert finished_job.output_asset_ids == ("asset-note",)
+    assert outcome.asset is not None
+    primary_ref = outcome.asset.source_refs[0]
+    assert primary_ref.source_system == "local_file"
+    assert primary_ref.path == str(note_path)
+    assert registry.get_ingestion_job(finished_job.job_id).status == IngestionJobStatus.COMPLETED
+    stored_kinds = {item.kind for item in registry.list_representations(asset_id="asset-note")}
+    assert stored_kinds == {RepresentationKind.SOURCE, RepresentationKind.NORMALIZED}
+    normalized = registry.list_representations(asset_id="asset-note", kind=RepresentationKind.NORMALIZED)[0]
+    assert normalized.media_type == "application/vnd.kontextual.normalized+json"
+    assert normalized.metadata["extractor"] == "plain-text"
+    first_audit_event = registry.list_audit_events(target="asset:asset-note")[0]
+    assert first_audit_event.operation == "asset.create"
+
+
+def test_ingestion_failure_records_job_without_trusting_unsupported_asset(tmp_path: Path) -> None:
+    """An unsupported binary file yields a failed job and no stored asset."""
+    blob_path = tmp_path / "blob.bin"
+    blob_path.write_bytes(b"\x00\x01\x02")
+    registry = InMemoryAssetRegistryRepository()
+    ingestion = AssetIngestionService(registry)
+
+    outcome = ingestion.ingest_file(blob_path, operation_context(), asset_id="asset-blob")
+
+    assert outcome.asset is None
+    assert outcome.job.status == IngestionJobStatus.FAILED
+    first_failure = outcome.job.failures[0]
+    assert first_failure.code == "kontextual.adapter_unavailable"
+    assert first_failure.details["media_type"] == "application/octet-stream"
+    assert registry.list_assets() == []
+
+
+def test_directory_ingestion_reports_partial_results(tmp_path: Path) -> None:
+    """One ingestible and one unsupported file give a partially completed job."""
+    (tmp_path / "one.txt").write_text("one", encoding="utf-8")
+    (tmp_path / "two.bin").write_bytes(b"\x00\x01")
+    ingestion = AssetIngestionService(InMemoryAssetRegistryRepository())
+
+    directory_job = ingestion.ingest_directory(tmp_path, operation_context(), recursive=False)
+
+    assert directory_job.status == IngestionJobStatus.PARTIALLY_COMPLETED
+    summary = directory_job.partial_results
+    assert summary["files_total"] == 2
+    assert summary["succeeded"] == 1
+    assert summary["failed"] == 1
+    assert len(directory_job.output_asset_ids) == 1
+    assert len(directory_job.failures) == 1
+
+
+def test_sqlite_ingestion_jobs_survive_reinstantiation(tmp_path: Path) -> None:
+    """Jobs and assets written via one SQLite repository load from a fresh one."""
+    policy_path = tmp_path / "policy.txt"
+    policy_path.write_text("governed ingestion", encoding="utf-8")
+    database = tmp_path / "registry.sqlite"
+    ingestion = AssetIngestionService(SQLiteAssetRegistryRepository(database))
+
+    outcome = ingestion.ingest_file(policy_path, operation_context(), asset_id="asset-policy")
+
+    reopened = SQLiteAssetRegistryRepository(database)
+    persisted_job = reopened.get_ingestion_job(outcome.job.job_id)
+
+    assert persisted_job.status == IngestionJobStatus.COMPLETED
+    assert persisted_job.output_asset_ids == ("asset-policy",)
+    assert reopened.get_asset("asset-policy").lifecycle == LifecycleState.ACTIVE
+    assert len(reopened.list_representations(asset_id="asset-policy")) == 2
+
+
+def operation_context() -> OperationContext:
+    """Build the operation context every test in this module runs under."""
+    return OperationContext.create(
+        Actor.create(
+            ActorType.HUMAN,
+            actor_id="user-ingest",
+            display_name="Ingestion Tester",
+            groups=["engineering"],
+        ),
+        correlation_id="corr-ingest",
+    )
diff --git a/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md b/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md index afd245c..91fae84 100644 --- a/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md +++ b/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md @@ -4,13 +4,13 @@ type: workplan title: "Multi-Format Ingestion And Normalization" domain: markitect repo: kontextual-engine -status: todo +status: active owner: codex topic_slug: markitect planning_priority: high planning_order: 6 created: "2026-05-05" -updated: "2026-05-05" +updated: "2026-05-06" state_hub_workstream_id: "270c83c0-eaed-4143-99d0-bb3fcfd23758" --- @@ -45,11 +45,21 @@ needed, and snapshot identity. The engine should normalize Markitect results into its common representation and preserve source/adapter provenance rather than rebuilding Markdown syntax behavior. +## Implementation Status + +As of 2026-05-06, the first ingestion slice is recorded in +`docs/ingestion-implementation.md`. It establishes ingestion job primitives, +connector/extractor ports, local file ingestion, plain text normalization, +Markitect markdown adapter boundaries, directory partial-result reporting, and +in-memory/SQLite job persistence. Remaining work is focused on async execution, +re-ingestion identity reconciliation, richer structural extraction, quarantine +policy checks, and non-text format adapters.
+ ## I6.1 - Implement ingestion job model status and retry surface ```task id: KONT-WP-0006-T001 -status: todo +status: done priority: high state_hub_task_id: "8e5e514a-6eef-42d9-a93c-2458b4c82753" ``` @@ -68,7 +78,7 @@ Acceptance: ```task id: KONT-WP-0006-T002 -status: todo +status: done priority: high state_hub_task_id: "3eafdab5-478d-49d9-a17f-3cd7c8847cb1" ``` @@ -87,7 +97,7 @@ Acceptance: ```task id: KONT-WP-0006-T003 -status: todo +status: in_progress priority: high state_hub_task_id: "d3e3d4d2-a581-4438-bee7-6fc4161d3925" ``` @@ -105,7 +115,7 @@ Acceptance: ```task id: KONT-WP-0006-T004 -status: todo +status: in_progress priority: high state_hub_task_id: "63bf2f7e-705d-40ae-a160-75fc508ffb1f" ```