diff --git a/docs/ingestion-implementation.md b/docs/ingestion-implementation.md index 4636462..4bbf40e 100644 --- a/docs/ingestion-implementation.md +++ b/docs/ingestion-implementation.md @@ -34,10 +34,20 @@ The new `AssetIngestionService` is separate from the older artifact-era - Connector and extractor port contracts owned by the engine. - Local file connector with source references, checksums, media type detection, file metadata, and directory file iteration. +- Explicit ingestion identity policy with conservative source-location identity + by default and opt-in content-digest identity for governed file move/rename + reconciliation. - Plain text extractor producing a normalized engine representation. +- CSV/TSV dataset extractor producing structured normalized table output with + columns, row counts, and table metadata. +- PDF and office document placeholder extractor that represents binary + documents as governed assets while reporting metadata-only extraction depth. - Markitect markdown extractor adapter boundary that delegates markdown parsing, headings, sections, frontmatter, and snapshot identity to `markitect-tool` when available. +- Missing `markitect-tool` dependency fails through structured + `AdapterUnavailableError` diagnostics instead of falling back to local + Markdown parsing. - Synchronous first-run ingestion flow that creates governed assets through `AssetRegistryService`. - Source and normalized `AssetRepresentation` records for ingested files. @@ -46,6 +56,12 @@ The new `AssetIngestionService` is separate from the older artifact-era - Failed unsupported-media ingestion records job failure without adding an asset to the trusted registry. - Directory ingestion with per-file child jobs and partial result accounting. +- Directory item results distinguish succeeded, skipped, failed, quarantined, + and retriable failure state. +- Re-ingestion can update an existing asset with new source references and + source/normalized representations instead of creating a second asset. +- Unchanged source re-ingestion can be skipped without creating a new asset + version. - In-memory and SQLite job persistence. ## Current SQLite Additions @@ -65,11 +81,9 @@ document classes part of the engine domain model. ## Not Yet Implemented - Asynchronous job runner and queue dispatch. -- Re-ingestion reconciliation for existing assets. -- Identity policies that preserve asset identity across source moves. -- PDF, office document, and dataset extractors. - Deep normalized structure for tables, links, embedded references, and fields - beyond extractor-provided metadata. + beyond extractor-provided metadata and the CSV/TSV baseline. +- Optional deep PDF and office document extraction adapters. - Quarantine policy checks beyond unsupported/failed extraction paths. ## Test Coverage @@ -81,4 +95,13 @@ document classes part of the engine domain model. - job persistence and status inspection, - unsupported media failure without trusted asset creation, - directory partial success/failure accounting, +- directory skipped item and retriable failure reporting, +- content-digest identity preserving asset identity across file moves, +- unchanged source re-ingestion skip behavior, +- Markitect markdown adapter delegation and missing-dependency behavior, +- CSV dataset structured normalization, +- PDF and office placeholder ingestion with explicit unsupported-depth + diagnostics, +- optional Markitect integration contract tests for parser, selector, + operation, snapshot, context package, contract, and schema behavior, - SQLite reload preserving ingestion jobs and ingested asset state. diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 1ff305d..ac0101e 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -32,6 +32,7 @@ from .core import ( IdempotencyRecord, IdempotencyStatus, IngestionFailure, + IngestionIdentityPolicy, IngestionJob, IngestionJobStatus, KnowledgeAsset, @@ -137,6 +138,7 @@ __all__ = [ "IngestionResult", "IngestionService", "IngestionFailure", + "IngestionIdentityPolicy", "IngestionJob", "IngestionJobStatus", "InputBundle", diff --git a/src/kontextual_engine/adapters/builtin_extractors/__init__.py b/src/kontextual_engine/adapters/builtin_extractors/__init__.py index ec989f5..1ddf44b 100644 --- a/src/kontextual_engine/adapters/builtin_extractors/__init__.py +++ b/src/kontextual_engine/adapters/builtin_extractors/__init__.py @@ -1,5 +1,7 @@ """Built-in baseline format extractors.""" +from .datasets import CsvDatasetExtractor +from .documents import DocumentPlaceholderExtractor from .text import PlainTextExtractor -__all__ = ["PlainTextExtractor"] +__all__ = ["CsvDatasetExtractor", "DocumentPlaceholderExtractor", "PlainTextExtractor"] diff --git a/src/kontextual_engine/adapters/builtin_extractors/datasets.py b/src/kontextual_engine/adapters/builtin_extractors/datasets.py new file mode 100644 index 0000000..29210a1 --- /dev/null +++ b/src/kontextual_engine/adapters/builtin_extractors/datasets.py @@ -0,0 +1,79 @@ +"""Structured dataset baseline extractors.""" + +from __future__ import annotations + +import csv +import io +from typing import Any + +from kontextual_engine.core import ExtractionResult, ExtractorCapability, NormalizedDocument, SourcePayload + + +class CsvDatasetExtractor: + name = "csv-dataset" + media_types = ("text/csv", "application/csv", "text/tab-separated-values") + + def capabilities(self) -> ExtractorCapability: + return ExtractorCapability( + extractor_name=self.name, + media_types=self.media_types, + extraction_depth="structure", + produces_structure=True, + metadata={"formats": ["csv", "tsv"]}, + ) + + def supports(self, media_type: str) -> bool: + return media_type in self.media_types or media_type.startswith("text/csv") + + def extract(self, payload: SourcePayload) -> ExtractionResult: + text = payload.read_text("utf-8-sig") + delimiter = _delimiter_for(payload) + reader = csv.DictReader(io.StringIO(text), delimiter=delimiter) + columns = list(reader.fieldnames or []) + rows = [dict(row) for row in reader] + table = { + "name": payload.title, + "columns": columns, + "rows": rows, + "row_count": len(rows), + } + metadata: dict[str, Any] = { + "extractor": self.name, + "dataset_format": "tsv" if delimiter == "\t" else "csv", + "columns": columns, + "column_count": len(columns), + "row_count": len(rows), + "table_count": 1, + "source_digest": payload.content_digest, + "source_size_bytes": payload.size_bytes, + } + normalized = NormalizedDocument( + title=payload.title, + text=text, + structure={ + "kind": "dataset", + "format": metadata["dataset_format"], + "columns": columns, + "row_count": len(rows), + }, + tables=[table], + fields={ + "columns": columns, + "column_count": len(columns), + "row_count": len(rows), + "dataset_format": metadata["dataset_format"], + }, + confidence=0.95, + extractor_metadata={ + "extractor": self.name, + "source_media_type": payload.media_type, + }, + ) + return ExtractionResult(normalized=normalized, metadata=metadata) + + +def _delimiter_for(payload: SourcePayload) -> str: + filename = str(payload.metadata.get("filename", "")).lower() + if payload.media_type == "text/tab-separated-values" or filename.endswith(".tsv"): + return "\t" + return "," diff --git a/src/kontextual_engine/adapters/builtin_extractors/documents.py b/src/kontextual_engine/adapters/builtin_extractors/documents.py new file mode 100644 index 0000000..a260cb3 --- /dev/null +++ b/src/kontextual_engine/adapters/builtin_extractors/documents.py @@ -0,0 +1,89 @@ +"""Metadata-only document placeholder extractors.""" + +from __future__ import annotations + +from kontextual_engine.core import ( + ExtractionResult, + ExtractorCapability, + IngestionFailure, + NormalizedDocument, + SourcePayload, +) + + +class DocumentPlaceholderExtractor: + """Represent binary document formats until optional deep extractors exist.""" + + name = "document-placeholder" + media_types = ( + "application/pdf", + "application/msword", + "application/rtf", + "application/vnd.ms-excel", + "application/vnd.ms-powerpoint", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + + def capabilities(self) -> ExtractorCapability: + return ExtractorCapability( + extractor_name=self.name, + media_types=self.media_types, + extraction_depth="metadata_only", + produces_structure=False, + metadata={ + "placeholder": True, + "requires_optional_deep_extractor": True, + }, + ) + + def supports(self, media_type: str) -> bool: + return media_type in self.media_types + + def extract(self, payload: SourcePayload) -> ExtractionResult: + document_kind = "pdf" if payload.media_type == "application/pdf" else "office_document" + unsupported = { + "kind": document_kind, + "media_type": payload.media_type, + "reason": "deep_extraction_not_available", + } + diagnostic = IngestionFailure( + code="extraction.depth_unsupported", + message="Deep extraction for this document format requires an optional adapter", + retriable=False, + details={ + "extractor": self.name, + "media_type": payload.media_type, + "supported_depth": "metadata_only", + }, + ) + metadata = { + "extractor": self.name, + "document_kind": document_kind, + "extraction_depth": "metadata_only", + "unsupported_elements": [unsupported], + "source_digest": payload.content_digest, + "source_size_bytes": payload.size_bytes, + } + normalized = NormalizedDocument( + title=payload.title, + text="", + structure={ + "kind": document_kind, + "extraction_depth": "metadata_only", + }, + fields={ + "document_kind": document_kind, + "source_media_type": payload.media_type, + "source_size_bytes": payload.size_bytes, + }, + confidence=0.0, + unsupported_elements=[unsupported], + extractor_metadata={ + "extractor": self.name, + "source_media_type": payload.media_type, + "extraction_depth": "metadata_only", + }, + ) + return ExtractionResult(normalized=normalized, metadata=metadata, diagnostics=(diagnostic,)) diff --git a/src/kontextual_engine/adapters/local_files/connector.py b/src/kontextual_engine/adapters/local_files/connector.py index 8db24c6..9574537 100644 --- a/src/kontextual_engine/adapters/local_files/connector.py +++ b/src/kontextual_engine/adapters/local_files/connector.py @@ -65,6 +65,24 @@ def _guess_media_type(path: Path) -> str: return "text/markdown" if suffix in {".txt", ".text", ".log"}: return "text/plain" + if suffix == ".csv": + return "text/csv" + if suffix == ".tsv": + return "text/tab-separated-values" + if suffix == ".pdf": + return "application/pdf" + if suffix == ".doc": + return "application/msword" + if suffix == ".docx": + return "application/vnd.openxmlformats-officedocument.wordprocessingml.document" + if suffix == ".xls": + return "application/vnd.ms-excel" + if suffix == ".xlsx": + return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + if suffix == ".ppt": + return "application/vnd.ms-powerpoint" + if suffix == ".pptx": + return "application/vnd.openxmlformats-officedocument.presentationml.presentation" guessed, _ = mimetypes.guess_type(path.name) return guessed or "application/octet-stream" diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index 3aec49d..c2aaaa9 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -9,6 +9,7 @@ from .ingestion import ( ExtractionResult, ExtractorCapability, IngestionFailure, + IngestionIdentityPolicy, IngestionJob, IngestionJobStatus, NormalizedDocument, @@ -58,6 +59,7 @@ __all__ = [ "IdempotencyRecord", "IdempotencyStatus", "IngestionFailure", + "IngestionIdentityPolicy", "IngestionJob", "IngestionJobStatus", "KnowledgeAsset", diff --git a/src/kontextual_engine/core/ingestion.py b/src/kontextual_engine/core/ingestion.py index 0899a55..8c27181 100644 --- a/src/kontextual_engine/core/ingestion.py +++ b/src/kontextual_engine/core/ingestion.py @@ -21,6 +21,11 @@ class IngestionJobStatus(str, Enum): CANCELED = "canceled" +class IngestionIdentityPolicy(str, Enum): + SOURCE_LOCATION = "source_location" + CONTENT_DIGEST = "content_digest" + + @dataclass(frozen=True) class IngestionFailure: code: str diff --git a/src/kontextual_engine/services/asset_service.py b/src/kontextual_engine/services/asset_service.py index 7c86436..d05cd53 100644 --- a/src/kontextual_engine/services/asset_service.py +++ b/src/kontextual_engine/services/asset_service.py @@ -345,6 +345,79 @@ class AssetRegistryService: def list_metadata_schema_assignments(self) -> list[MetadataSchemaAssignment]: return self.repository.list_metadata_schema_assignments() + def record_ingestion_update( + self, + asset_id: str, + source_ref: SourceReference, + representations: list[AssetRepresentation] | tuple[AssetRepresentation, ...], + metadata_records: list[MetadataRecord] | tuple[MetadataRecord, ...], + context: OperationContext, + *, + expected_current_version_id: str | None = None, + ) -> AssetChangeResult: + asset = self.repository.get_asset(asset_id) + self._assert_expected_current_version( + asset, + expected_current_version_id, + operation="asset.ingest.update", + ) + decision = self._authorize( + context, + "asset.ingest.update", + f"asset:{asset.id}", + resource_metadata={ + "source_system": source_ref.source_system, + "source_path": source_ref.path or "", + "checksum": source_ref.checksum or "", + "representation_count": str(len(representations)), + "metadata_record_count": str(len(metadata_records)), + }, + ) + self._validate_metadata_records( + asset.classification, + self.repository.list_metadata_records(asset.id) + list(metadata_records), + ) + updated = asset + if not _has_source_reference(updated, source_ref): + updated = updated.with_source_reference(source_ref) + alias = _source_alias(source_ref) + if alias: + updated = updated.with_alias(alias) + representation_ids: list[str] = [] + for representation in representations: + if representation.asset_id != asset.id: + representation = replace(representation, asset_id=asset.id) + self.repository.save_representation(representation) + representation_ids.append(representation.representation_id) + for record in metadata_records: + self.repository.save_metadata_record(asset.id, record) + version = AssetVersion( + asset_id=asset.id, + sequence=self._next_sequence(asset.id), + change_type=VersionChangeType.CONTENT_CHANGED, + representation_ids=tuple(representation_ids), + actor_id=context.actor.id, + parent_version_id=asset.current_version_id, + metadata_delta={record.key: record.value for record in metadata_records}, + lifecycle=updated.lifecycle.value, + ) + updated = updated.with_current_version(version.version_id) + self.repository.save_asset(updated) + self.repository.save_version(version) + event = self._audit( + "asset.ingest.update", + f"asset:{asset.id}", + AuditOutcome.SUCCESS, + context, + decision, + details={ + "source_ref_id": source_ref.id, + "version_id": version.version_id, + "representation_ids": tuple(representation_ids), + }, + ) + return AssetChangeResult(updated, version, event, decision) + def add_representation( self, asset_id: str, @@ -881,3 +954,19 @@ def _remediation_for_error(error: KontextualError) -> str | None: if isinstance(error, AuthorizationError): return "Request policy approval or rerun with an actor that is authorized for this operation." return None + + +def _has_source_reference(asset: KnowledgeAsset, source_ref: SourceReference) -> bool: + return any( + existing.identity_key == source_ref.identity_key + or ( + existing.connector_ref is not None + and existing.connector_ref == source_ref.connector_ref + and existing.checksum == source_ref.checksum + ) + for existing in asset.source_refs + ) + + +def _source_alias(source_ref: SourceReference) -> str | None: + return source_ref.connector_ref or source_ref.path or source_ref.uri or source_ref.external_id diff --git a/src/kontextual_engine/services/ingestion_service.py b/src/kontextual_engine/services/ingestion_service.py index f8dec01..ad3d3b2 100644 --- a/src/kontextual_engine/services/ingestion_service.py +++ b/src/kontextual_engine/services/ingestion_service.py @@ -6,13 +6,18 @@ from dataclasses import dataclass from pathlib import Path from typing import Iterable -from kontextual_engine.adapters.builtin_extractors import PlainTextExtractor +from kontextual_engine.adapters.builtin_extractors import ( + CsvDatasetExtractor, + DocumentPlaceholderExtractor, + PlainTextExtractor, +) from kontextual_engine.adapters.local_files import LocalFileConnector from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor from kontextual_engine.core import ( AssetRepresentation, Classification, IngestionFailure, + IngestionIdentityPolicy, IngestionJob, IngestionJobStatus, KnowledgeAsset, @@ -34,6 +39,7 @@ class AssetIngestionResult: job: IngestionJob asset: KnowledgeAsset | None = None asset_change: AssetChangeResult | None = None + action: str = "failed" class AssetIngestionService: @@ -48,7 +54,15 @@ class AssetIngestionService: self.repository = repository self.asset_service = asset_service or AssetRegistryService(repository) self.connectors = {connector.name: connector for connector in (connectors or [LocalFileConnector()])} - self.extractors = list(extractors or [PlainTextExtractor(), MarkitectMarkdownExtractor()]) + self.extractors = list( + extractors + or [ + PlainTextExtractor(), + CsvDatasetExtractor(), + DocumentPlaceholderExtractor(), + MarkitectMarkdownExtractor(), + ] + ) def connector_capabilities(self) -> list[dict]: return [connector.capabilities().to_dict() for connector in self.connectors.values()] @@ -65,11 +79,20 @@ class AssetIngestionService: title: str | None = None, classification: Classification | None = None, idempotency_key: str | None = None, + identity_policy: IngestionIdentityPolicy | str = IngestionIdentityPolicy.SOURCE_LOCATION, + skip_unchanged: bool = True, ) -> AssetIngestionResult: + identity_policy = IngestionIdentityPolicy(identity_policy) self.repository.save_actor(context.actor) connector = self._connector("local_file") job = IngestionJob.create( - input={"connector": connector.name, "source_uri": str(path), "mode": "file"}, + input={ + "connector": connector.name, + "source_uri": str(path), + "mode": "file", + "identity_policy": identity_policy.value, + "skip_unchanged": skip_unchanged, + }, actor_id=context.actor.id, correlation_id=context.correlation_id, ) @@ -84,6 +107,8 @@ class AssetIngestionService: title=title, classification=classification, idempotency_key=idempotency_key, + identity_policy=identity_policy, + skip_unchanged=skip_unchanged, ) except Exception as exc: failed = job.failed(_failure_from_exception(exc)) @@ -97,7 +122,10 @@ class AssetIngestionService: *, recursive: bool = True, classification: Classification | None = None, + identity_policy: IngestionIdentityPolicy | str = IngestionIdentityPolicy.SOURCE_LOCATION, + skip_unchanged: bool = True, ) -> IngestionJob: + identity_policy = IngestionIdentityPolicy(identity_policy) self.repository.save_actor(context.actor) connector = self._directory_connector("local_file") job = IngestionJob.create( @@ -106,6 +134,8 @@ class AssetIngestionService: "source_uri": str(path), "mode": "directory", "recursive": recursive, + "identity_policy": identity_policy.value, + "skip_unchanged": skip_unchanged, }, actor_id=context.actor.id, correlation_id=context.correlation_id, @@ -118,11 +148,18 @@ class AssetIngestionService: item_results: list[dict] = [] files = connector.iter_files(str(path), recursive=recursive) for source_uri in files: - result = self.ingest_file(source_uri, context, classification=classification) + result = self.ingest_file( + source_uri, + context, + classification=classification, + identity_policy=identity_policy, + skip_unchanged=skip_unchanged, + ) item = { "source_uri": source_uri, "job_id": result.job.job_id, - "status": result.job.status.value, + "status": result.action if result.action == "skipped" else result.job.status.value, + "action": result.action, } if result.asset is not None: output_asset_ids.append(result.asset.id) @@ -130,6 +167,9 @@ class AssetIngestionService: if result.job.failures: failures.extend(result.job.failures) item["failures"] = [failure.to_dict() for failure in result.job.failures] + item["retry_state"] = ( + "retriable" if any(failure.retriable for failure in result.job.failures) else "not_retriable" + ) item_results.append(item) partial_results = { @@ -137,7 +177,7 @@ class AssetIngestionService: "succeeded": sum(1 for item in item_results if item["status"] == IngestionJobStatus.COMPLETED.value), "failed": sum(1 for item in item_results if item["status"] == IngestionJobStatus.FAILED.value), "quarantined": sum(1 for item in item_results if item["status"] == IngestionJobStatus.QUARANTINED.value), - "skipped": 0, + "skipped": sum(1 for item in item_results if item["status"] == "skipped"), "items": item_results, } if failures and output_asset_ids: @@ -177,12 +217,31 @@ class AssetIngestionService: title: str | None, classification: Classification | None, idempotency_key: str | None, + identity_policy: IngestionIdentityPolicy, + skip_unchanged: bool, ) -> AssetIngestionResult: job = job.running(source_ref=payload.source_ref) self.repository.save_ingestion_job(job) extractor = self._extractor(payload.media_type) extraction = extractor.extract(payload) - resolved_asset_id = asset_id or _stable_asset_id(payload) + resolved_asset_id = asset_id or _stable_asset_id(payload, identity_policy) + existing_asset = _get_asset_or_none(self.repository, resolved_asset_id) + if existing_asset and skip_unchanged and _asset_has_source_reference(existing_asset, payload.source_ref): + completed = job.completed( + output_asset_ids=(existing_asset.id,), + partial_results={ + "action": "skipped", + "reason": "unchanged_source", + "asset_id": existing_asset.id, + "identity_policy": identity_policy.value, + "connector": payload.connector_name, + "extractor": extractor.name, + "source_digest": payload.content_digest, + "diagnostics": [diagnostic.to_dict() for diagnostic in extraction.diagnostics], + }, + ) + self.repository.save_ingestion_job(completed) + return AssetIngestionResult(completed, existing_asset, action="skipped") source_representation = AssetRepresentation.from_content( resolved_asset_id, RepresentationKind.SOURCE, @@ -211,19 +270,33 @@ class AssetIngestionService: **extraction.metadata, }, ) - asset_change = self.asset_service.create_asset( - title or payload.title, - classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), - context, - asset_id=resolved_asset_id, - source_refs=[payload.source_ref], - representations=[source_representation, normalized_representation], - metadata_records=_metadata_records(payload, extractor.name, extraction.metadata), - idempotency_key=idempotency_key, - ) + metadata_records = _metadata_records(payload, extractor.name, extraction.metadata) + if existing_asset: + asset_change = self.asset_service.record_ingestion_update( + resolved_asset_id, + payload.source_ref, + (source_representation, normalized_representation), + metadata_records, + context, + ) + action = "updated" + else: + asset_change = self.asset_service.create_asset( + title or payload.title, + classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + context, + asset_id=resolved_asset_id, + source_refs=[payload.source_ref], + representations=[source_representation, normalized_representation], + metadata_records=metadata_records, + idempotency_key=idempotency_key, + ) + action = "created" completed = job.completed( output_asset_ids=(asset_change.asset.id,), partial_results={ + "action": action, + "identity_policy": identity_policy.value, "connector": payload.connector_name, "extractor": extractor.name, "source_digest": payload.content_digest, @@ -235,7 +308,7 @@ class AssetIngestionService: }, ) self.repository.save_ingestion_job(completed) - return AssetIngestionResult(completed, asset_change.asset, asset_change) + return AssetIngestionResult(completed, asset_change.asset, asset_change, action=action) def _connector(self, name: str) -> SourceConnector: try: @@ -262,19 +335,46 @@ class AssetIngestionService: ) -def _stable_asset_id(payload: SourcePayload) -> str: - digest = mapping_digest( - { - "source_system": payload.source_ref.source_system, - "path": payload.source_ref.path, - "uri": payload.source_ref.uri, - "external_id": payload.source_ref.external_id, - "connector_ref": payload.source_ref.connector_ref, - } - ) +def _stable_asset_id(payload: SourcePayload, identity_policy: IngestionIdentityPolicy) -> str: + identity_data = { + "source_system": payload.source_ref.source_system, + } + if identity_policy == IngestionIdentityPolicy.CONTENT_DIGEST: + identity_data["checksum"] = payload.content_digest + else: + identity_data.update( + { + "path": payload.source_ref.path, + "uri": payload.source_ref.uri, + "external_id": payload.source_ref.external_id, + "connector_ref": payload.source_ref.connector_ref, + } + ) + digest = mapping_digest(identity_data) return f"asset-{digest.removeprefix('sha256:')[:20]}" +def _get_asset_or_none(repository: AssetRegistryRepository, asset_id: str) -> KnowledgeAsset | None: + try: + return repository.get_asset(asset_id) + except KontextualError as exc: + if exc.code == "kontextual.not_found": + return None + raise + + +def _asset_has_source_reference(asset: KnowledgeAsset, source_ref) -> bool: + return any( + existing.identity_key == source_ref.identity_key + or ( + existing.connector_ref is not None + and existing.connector_ref == source_ref.connector_ref + and existing.checksum == source_ref.checksum + ) + for existing in asset.source_refs + ) + + def _metadata_records( payload: SourcePayload, extractor_name: str, diff --git a/tests/test_asset_ingestion_service.py b/tests/test_asset_ingestion_service.py index f9bb3ef..1d1d9a4 100644 --- a/tests/test_asset_ingestion_service.py +++ b/tests/test_asset_ingestion_service.py @@ -1,10 +1,13 @@ from pathlib import Path +import pytest + from kontextual_engine import ( Actor, ActorType, AssetIngestionService, Classification, + IngestionIdentityPolicy, IngestionJobStatus, InMemoryAssetRegistryRepository, LifecycleState, @@ -76,6 +79,146 @@ def test_directory_ingestion_reports_partial_results(tmp_path: Path) -> None: assert len(job.failures) == 1 +def test_ingestion_content_digest_identity_preserves_asset_across_file_move(tmp_path: Path) -> None: + first_path = tmp_path / "original.txt" + moved_path = tmp_path / "renamed.txt" + first_path.write_text("same durable content\n", encoding="utf-8") + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo) + context = operation_context() + + first = service.ingest_file( + first_path, + context, + identity_policy=IngestionIdentityPolicy.CONTENT_DIGEST, + ) + first_path.rename(moved_path) + moved = service.ingest_file( + moved_path, + context, + identity_policy=IngestionIdentityPolicy.CONTENT_DIGEST, + ) + repeated = service.ingest_file( + moved_path, + context, + identity_policy=IngestionIdentityPolicy.CONTENT_DIGEST, + ) + + assert first.asset is not None + assert moved.asset is not None + assert repeated.asset is not None + assert first.action == "created" + assert moved.action == "updated" + assert repeated.action == "skipped" + assert moved.asset.id == first.asset.id + assert repeated.asset.id == first.asset.id + assert len(repo.list_assets()) == 1 + assert [source.path for source in repo.get_asset(first.asset.id).source_refs] == [ + str(first_path), + str(moved_path), + ] + assert repeated.job.partial_results["reason"] == "unchanged_source" + assert [version.sequence for version in repo.list_versions(first.asset.id)] == [1, 2] + assert [event.operation for event in repo.list_audit_events(target=f"asset:{first.asset.id}")] == [ + "asset.create", + "asset.ingest.update", + ] + + +def test_directory_ingestion_reports_skipped_and_retry_state(tmp_path: Path) -> None: + already_seen = tmp_path / "seen.txt" + unsupported = tmp_path / "unsupported.bin" + already_seen.write_text("skip me on the directory pass", encoding="utf-8") + unsupported.write_bytes(b"\x00\x01") + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo) + context = operation_context() + + service.ingest_file(already_seen, context) + job = service.ingest_directory(tmp_path, context, recursive=False) + + items = {Path(item["source_uri"]).name: item for item in job.partial_results["items"]} + + assert job.status == IngestionJobStatus.PARTIALLY_COMPLETED + assert job.partial_results["succeeded"] == 0 + assert job.partial_results["skipped"] == 1 + assert job.partial_results["failed"] == 1 + assert items["seen.txt"]["status"] == "skipped" + assert items["seen.txt"]["action"] == "skipped" + assert items["unsupported.bin"]["status"] == IngestionJobStatus.FAILED.value + assert items["unsupported.bin"]["retry_state"] == "retriable" + assert items["unsupported.bin"]["failures"][0]["code"] == "kontextual.adapter_unavailable" + + +def test_asset_ingestion_service_ingests_csv_dataset_with_structured_table(tmp_path: Path) -> None: + source = tmp_path / "metrics.csv" + source.write_text("name,score\nalpha,0.82\nbeta,0.91\n", encoding="utf-8") + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo) + + result = service.ingest_file( + source, + operation_context(), + asset_id="asset-metrics", + classification=Classification(asset_type="dataset", sensitivity=Sensitivity.INTERNAL), + ) + + normalized = repo.list_representations(asset_id="asset-metrics", kind=RepresentationKind.NORMALIZED)[0] + + assert result.job.status == IngestionJobStatus.COMPLETED + assert result.job.partial_results["extractor"] == "csv-dataset" + assert normalized.metadata["dataset_format"] == "csv" + assert normalized.metadata["columns"] == ["name", "score"] + assert normalized.metadata["row_count"] == 2 + assert normalized.metadata["table_count"] == 1 + assert [record.value for record in repo.list_metadata_records("asset-metrics") if record.key == "extractor"] == [ + "csv-dataset" + ] + + +@pytest.mark.parametrize( + ("filename", "content", "media_type", "document_kind"), + [ + ("source.pdf", b"%PDF-1.7\n", "application/pdf", "pdf"), + ( + "source.docx", + b"PK\x03\x04docx-placeholder", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "office_document", + ), + ], +) +def test_document_placeholder_formats_create_asset_with_unsupported_depth_diagnostic( + tmp_path: Path, + filename: str, + content: bytes, + media_type: str, + document_kind: str, +) -> None: + source = tmp_path / filename + source.write_bytes(content) + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo) + + result = service.ingest_file( + source, + operation_context(), + asset_id=f"asset-{source.stem}", + classification=Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL), + ) + + normalized = repo.list_representations(asset_id=f"asset-{source.stem}", kind=RepresentationKind.NORMALIZED)[0] + + assert result.job.status == IngestionJobStatus.COMPLETED + assert result.asset is not None + assert result.job.partial_results["diagnostics"][0]["code"] == "extraction.depth_unsupported" + assert result.job.partial_results["diagnostics"][0]["details"]["media_type"] == media_type + assert normalized.producer == "document-placeholder" + assert normalized.metadata["document_kind"] == document_kind + assert normalized.metadata["extraction_depth"] == "metadata_only" + assert normalized.metadata["unsupported_elements"][0]["reason"] == "deep_extraction_not_available" + + def test_sqlite_ingestion_jobs_survive_reinstantiation(tmp_path: Path) -> None: source = tmp_path / "policy.txt" source.write_text("governed ingestion", encoding="utf-8") diff --git a/tests/test_markitect_ingestion_adapter.py b/tests/test_markitect_ingestion_adapter.py new file mode 100644 index 0000000..a54a89d --- /dev/null +++ b/tests/test_markitect_ingestion_adapter.py @@ -0,0 +1,97 @@ +import sys +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from kontextual_engine import SourcePayload, SourceReference, content_digest +from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor +from kontextual_engine.errors import AdapterUnavailableError + + +def test_markitect_markdown_extractor_missing_dependency_is_structured(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setitem(sys.modules, "markitect_tool", None) + extractor = MarkitectMarkdownExtractor() + payload = markdown_payload("# Missing Adapter\n") + + with pytest.raises(AdapterUnavailableError) as exc_info: + extractor.extract(payload) + + assert exc_info.value.details == { + "adapter": "markitect-tool", + "media_type": "text/markdown", + } + + +def test_markitect_markdown_extractor_delegates_to_markitect_tool( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source = tmp_path / "decision.md" + source.write_text("# Decision\n\nUse Markitect.\n", encoding="utf-8") + calls: list[tuple[str, str]] = [] + + def parse_markdown_file(path: Path) -> SimpleNamespace: + calls.append(("parse_markdown_file", str(path))) + return SimpleNamespace( + to_dict=lambda: { + "frontmatter": {"status": "accepted"}, + "headings": [{"level": 1, "text": "Decision", "line": 1}], + "sections": [ + { + "heading": {"level": 1, "text": "Decision", "line": 1}, + "blocks": [{"type": "paragraph", "text": "Use Markitect.", "line_start": 3}], + } + ], + } + ) + + def snapshot_identity_for_file(path: Path, *, parse_options: dict) -> SimpleNamespace: + calls.append(("snapshot_identity_for_file", f"{path}:{parse_options['profile']}")) + return SimpleNamespace( + to_dict=lambda: { + "snapshot_id": "snapshot:decision", + "content_hash": "sha256:decision", + "parser": "markdown-it-py/commonmark", + } + ) + + monkeypatch.setitem( + sys.modules, + "markitect_tool", + SimpleNamespace( + parse_markdown_file=parse_markdown_file, + parse_markdown=lambda text, source_path=None: None, + snapshot_identity_for_file=snapshot_identity_for_file, + ), + ) + + result = MarkitectMarkdownExtractor().extract(markdown_payload(source.read_text(encoding="utf-8"), source)) + + assert calls == [ + ("parse_markdown_file", str(source)), + ("snapshot_identity_for_file", f"{source}:default"), + ] + assert result.normalized.structure["frontmatter"] == {"status": "accepted"} + assert result.normalized.fields["heading_count"] == 1 + assert result.normalized.fields["section_count"] == 1 + assert result.metadata["snapshot"]["snapshot_id"] == "snapshot:decision" + assert result.normalized.extractor_metadata["snapshot"]["parser"] == "markdown-it-py/commonmark" + + +def markdown_payload(markdown: str, path: Path | None = None) -> SourcePayload: + data = markdown.encode("utf-8") + source_ref = SourceReference( + source_system="local_file", + path=str(path) if path else None, + checksum=content_digest(data), + connector_ref=f"local_file:{path}" if path else None, + ) + return SourcePayload( + connector_name="local_file", + source_uri=str(path) if path else "memory://markdown", + source_ref=source_ref, + media_type="text/markdown", + content=data, + title=path.stem if path else "Markdown", + ) diff --git a/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md b/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md index 91fae84..2458ece 100644 --- a/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md +++ b/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md @@ -51,9 +51,14 @@ As of 2026-05-06, the first ingestion slice is recorded in `docs/ingestion-implementation.md`. It establishes ingestion job primitives, connector/extractor ports, local file ingestion, plain text normalization, Markitect markdown adapter boundaries, directory partial-result reporting, and -in-memory/SQLite job persistence. Remaining work is focused on async execution, -re-ingestion identity reconciliation, richer structural extraction, quarantine -policy checks, and non-text format adapters. +in-memory/SQLite job persistence. It now also includes explicit ingestion +identity policy, content-digest identity for governed file move/rename +reconciliation, unchanged-source skip behavior, and directory item retry/skipped +reporting. CSV/TSV datasets now produce structured normalized table output, and +PDF/office-like files can enter the governed asset set through metadata-only +placeholder extraction with explicit unsupported-depth diagnostics. Remaining +work is focused on async execution, richer structural extraction, quarantine +policy checks, and optional deep non-text extraction adapters. ## I6.1 - Implement ingestion job model status and retry surface @@ -97,7 +102,7 @@ Acceptance: ```task id: KONT-WP-0006-T003 -status: in_progress +status: done priority: high state_hub_task_id: "d3e3d4d2-a581-4438-bee7-6fc4161d3925" ``` @@ -111,11 +116,21 @@ Acceptance: - File path changes can be represented without changing stable asset identity when identity policy permits. +Implemented: + +- `IngestionIdentityPolicy.SOURCE_LOCATION` remains the conservative default. +- `IngestionIdentityPolicy.CONTENT_DIGEST` preserves asset identity across file + moves or renames when the caller opts into content identity. +- Existing assets receive a versioned `asset.ingest.update` record with new + source references and representations. +- Re-ingesting an unchanged source is reported as a skipped child item without + creating another asset version. + ## I6.4 - Implement text and markdown normalization via markitect-tool adapter ```task id: KONT-WP-0006-T004 -status: in_progress +status: done priority: high state_hub_task_id: "63bf2f7e-705d-40ae-a160-75fc508ffb1f" ``` @@ -131,11 +146,23 @@ Acceptance: - Parser, selector extraction, and snapshot identity behavior are covered by the Markitect integration contract tests. +Implemented: + +- Plain text normalization produces source-grounded normalized representations. +- Markdown normalization imports and calls `markitect-tool` only inside the + adapter boundary. +- Missing `markitect-tool` raises structured `AdapterUnavailableError` + diagnostics. +- Adapter unit tests verify delegation and missing-dependency behavior. +- Optional contract tests verify parser, selector extraction, operations, + snapshot identity, context packages, contracts, and schema behavior against + the local `markitect-tool` checkout when available. + ## I6.5 - Implement PDF office document and dataset baseline adapters ```task id: KONT-WP-0006-T005 -status: todo +status: done priority: high state_hub_task_id: "04d7c4b0-abfd-4b14-892f-91d1c1a820cd" ``` @@ -150,6 +177,15 @@ Acceptance: - Unsupported extraction depth is reported explicitly. - CSV or table-like datasets produce structured normalized output. +Implemented: + +- `CsvDatasetExtractor` supports CSV and TSV sources with structured columns, + row counts, table metadata, and normalized dataset fields. +- `DocumentPlaceholderExtractor` supports PDF and common office media types as + metadata-only assets with `extraction.depth_unsupported` diagnostics. +- Local file media-type detection is explicit for CSV, TSV, PDF, DOC/DOCX, + XLS/XLSX, and PPT/PPTX. + ## I6.6 - Extract structural elements into common normalized representation ```task