default source-location identity and opt-in content-digest identity for file move/rename reconciliation, PDF/DOCX-style placeholder ingestion

2026-05-06 13:04:36 +02:00
parent 48dffedc09
commit a4a4759ac4
13 changed files with 724 additions and 39 deletions


@@ -34,10 +34,20 @@ The new `AssetIngestionService` is separate from the older artifact-era
 - Connector and extractor port contracts owned by the engine.
 - Local file connector with source references, checksums, media type detection,
   file metadata, and directory file iteration.
+- Explicit ingestion identity policy with conservative source-location identity
+  by default and opt-in content-digest identity for governed file move/rename
+  reconciliation.
 - Plain text extractor producing a normalized engine representation.
+- CSV/TSV dataset extractor producing structured normalized table output with
+  columns, row counts, and table metadata.
+- PDF and office document placeholder extractor that represents binary
+  documents as governed assets while reporting metadata-only extraction depth.
 - Markitect markdown extractor adapter boundary that delegates markdown parsing,
   headings, sections, frontmatter, and snapshot identity to `markitect-tool`
   when available.
+- Missing `markitect-tool` dependency fails through structured
+  `AdapterUnavailableError` diagnostics instead of falling back to local
+  Markdown parsing.
 - Synchronous first-run ingestion flow that creates governed assets through
   `AssetRegistryService`.
 - Source and normalized `AssetRepresentation` records for ingested files.
@@ -46,6 +56,12 @@ The new `AssetIngestionService` is separate from the older artifact-era
 - Failed unsupported-media ingestion records job failure without adding an asset
   to the trusted registry.
 - Directory ingestion with per-file child jobs and partial result accounting.
+- Directory item results distinguish succeeded, skipped, failed, quarantined,
+  and retriable failure state.
+- Re-ingestion can update an existing asset with new source references and
+  source/normalized representations instead of creating a second asset.
+- Unchanged source re-ingestion can be skipped without creating a new asset
+  version.
 - In-memory and SQLite job persistence.

 ## Current SQLite Additions
@@ -65,11 +81,9 @@ document classes part of the engine domain model.
 ## Not Yet Implemented

 - Asynchronous job runner and queue dispatch.
-- Re-ingestion reconciliation for existing assets.
-- Identity policies that preserve asset identity across source moves.
-- PDF, office document, and dataset extractors.
 - Deep normalized structure for tables, links, embedded references, and fields
-  beyond extractor-provided metadata.
+  beyond extractor-provided metadata and the CSV/TSV baseline.
+- Optional deep PDF and office document extraction adapters.
 - Quarantine policy checks beyond unsupported/failed extraction paths.

 ## Test Coverage
@@ -81,4 +95,13 @@ document classes part of the engine domain model.
 - job persistence and status inspection,
 - unsupported media failure without trusted asset creation,
 - directory partial success/failure accounting,
+- directory skipped item and retriable failure reporting,
+- content-digest identity preserving asset identity across file moves,
+- unchanged source re-ingestion skip behavior,
+- Markitect markdown adapter delegation and missing-dependency behavior,
+- CSV dataset structured normalization,
+- PDF and office placeholder ingestion with explicit unsupported-depth
+  diagnostics,
+- optional Markitect integration contract tests for parser, selector,
+  operation, snapshot, context package, contract, and schema behavior,
 - SQLite reload preserving ingestion jobs and ingested asset state.
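
The difference between the two identity policies listed above can be illustrated with a small standalone sketch. It is not the engine's code: `stable_asset_id` and `IdentityPolicy` are hypothetical stand-ins for `_stable_asset_id` and `IngestionIdentityPolicy`, and the JSON plus SHA-256 digest is only an assumption about how a helper like `mapping_digest` might behave.

```python
import hashlib
import json
from enum import Enum


class IdentityPolicy(str, Enum):
    # Mirrors the two policy values introduced by this commit.
    SOURCE_LOCATION = "source_location"
    CONTENT_DIGEST = "content_digest"


def stable_asset_id(source_system: str, path: str, content: bytes, policy: IdentityPolicy) -> str:
    """Derive a deterministic asset ID (hypothetical helper, digest scheme assumed)."""
    identity = {"source_system": source_system}
    if policy == IdentityPolicy.CONTENT_DIGEST:
        # Identity follows the bytes, so a rename keeps the same ID.
        identity["checksum"] = "sha256:" + hashlib.sha256(content).hexdigest()
    else:
        # Identity follows the location, so a rename yields a new ID.
        identity["path"] = path
    digest = hashlib.sha256(json.dumps(identity, sort_keys=True).encode("utf-8")).hexdigest()
    return f"asset-{digest[:20]}"


content = b"same durable content\n"
digest_before = stable_asset_id("local_file", "/data/original.txt", content, IdentityPolicy.CONTENT_DIGEST)
digest_after = stable_asset_id("local_file", "/data/renamed.txt", content, IdentityPolicy.CONTENT_DIGEST)
location_before = stable_asset_id("local_file", "/data/original.txt", content, IdentityPolicy.SOURCE_LOCATION)
location_after = stable_asset_id("local_file", "/data/renamed.txt", content, IdentityPolicy.SOURCE_LOCATION)
```

Under these assumptions the content-digest IDs match across the rename while the source-location IDs differ. That is also why content-digest identity is opt-in: two unrelated files with identical bytes would resolve to the same asset.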


@@ -32,6 +32,7 @@ from .core import (
     IdempotencyRecord,
     IdempotencyStatus,
     IngestionFailure,
+    IngestionIdentityPolicy,
     IngestionJob,
     IngestionJobStatus,
     KnowledgeAsset,
@@ -137,6 +138,7 @@ __all__ = [
     "IngestionResult",
     "IngestionService",
     "IngestionFailure",
+    "IngestionIdentityPolicy",
     "IngestionJob",
     "IngestionJobStatus",
     "InputBundle",


@@ -1,5 +1,7 @@
 """Built-in baseline format extractors."""

+from .datasets import CsvDatasetExtractor
+from .documents import DocumentPlaceholderExtractor
 from .text import PlainTextExtractor

-__all__ = ["PlainTextExtractor"]
+__all__ = ["CsvDatasetExtractor", "DocumentPlaceholderExtractor", "PlainTextExtractor"]


@@ -0,0 +1,79 @@
+"""Structured dataset baseline extractors."""
+
+from __future__ import annotations
+
+import csv
+import io
+from typing import Any
+
+from kontextual_engine.core import ExtractionResult, ExtractorCapability, NormalizedDocument, SourcePayload
+
+
+class CsvDatasetExtractor:
+    name = "csv-dataset"
+    media_types = ("text/csv", "application/csv", "text/tab-separated-values")
+
+    def capabilities(self) -> ExtractorCapability:
+        return ExtractorCapability(
+            extractor_name=self.name,
+            media_types=self.media_types,
+            extraction_depth="structure",
+            produces_structure=True,
+            metadata={"formats": ["csv", "tsv"]},
+        )
+
+    def supports(self, media_type: str) -> bool:
+        return media_type in self.media_types or media_type.startswith("text/csv")
+
+    def extract(self, payload: SourcePayload) -> ExtractionResult:
+        text = payload.read_text("utf-8-sig")
+        delimiter = _delimiter_for(payload)
+        reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
+        columns = list(reader.fieldnames or [])
+        rows = [dict(row) for row in reader]
+        table = {
+            "name": payload.title,
+            "columns": columns,
+            "rows": rows,
+            "row_count": len(rows),
+        }
+        metadata: dict[str, Any] = {
+            "extractor": self.name,
+            "dataset_format": "tsv" if delimiter == "\t" else "csv",
+            "columns": columns,
+            "column_count": len(columns),
+            "row_count": len(rows),
+            "table_count": 1,
+            "source_digest": payload.content_digest,
+            "source_size_bytes": payload.size_bytes,
+        }
+        normalized = NormalizedDocument(
+            title=payload.title,
+            text=text,
+            structure={
+                "kind": "dataset",
+                "format": metadata["dataset_format"],
+                "columns": columns,
+                "row_count": len(rows),
+            },
+            tables=[table],
+            fields={
+                "columns": columns,
+                "column_count": len(columns),
+                "row_count": len(rows),
+                "dataset_format": metadata["dataset_format"],
+            },
+            confidence=0.95,
+            extractor_metadata={
+                "extractor": self.name,
+                "source_media_type": payload.media_type,
+            },
+        )
+        return ExtractionResult(normalized=normalized, metadata=metadata)
+
+
+def _delimiter_for(payload: SourcePayload) -> str:
+    filename = str(payload.metadata.get("filename", "")).lower()
+    if payload.media_type == "text/tab-separated-values" or filename.endswith(".tsv"):
+        return "\t"
+    return ","
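
The parsing core of `CsvDatasetExtractor.extract` can be tried in isolation with only the standard library. The sketch below is a simplified illustration, not the extractor itself: `parse_table` is a hypothetical helper that takes raw bytes instead of a `SourcePayload` and returns only the table dictionary.

```python
import csv
import io


def parse_table(raw: bytes, delimiter: str = ",") -> dict:
    """Parse delimited bytes into a table dict (hypothetical helper)."""
    # "utf-8-sig" strips a leading byte-order mark if one is present,
    # so the first column name is not polluted by it.
    text = raw.decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    # fieldnames is None for empty input, hence the fallback to [].
    columns = list(reader.fieldnames or [])
    rows = [dict(row) for row in reader]
    return {"columns": columns, "rows": rows, "row_count": len(rows)}


# CSV with a UTF-8 byte-order mark, as some spreadsheet exports produce.
table = parse_table(b"\xef\xbb\xbfname,score\nalpha,0.82\nbeta,0.91\n")
# The same shape with a tab delimiter.
tsv_table = parse_table(b"name\tscore\nalpha\t0.82\n", delimiter="\t")
```

All cell values stay strings (`"0.82"`, not `0.82`); `csv.DictReader` does no type inference, so any numeric interpretation would be left to downstream consumers.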


@@ -0,0 +1,89 @@
+"""Metadata-only document placeholder extractors."""
+
+from __future__ import annotations
+
+from kontextual_engine.core import (
+    ExtractionResult,
+    ExtractorCapability,
+    IngestionFailure,
+    NormalizedDocument,
+    SourcePayload,
+)
+
+
+class DocumentPlaceholderExtractor:
+    """Represent binary document formats until optional deep extractors exist."""
+
+    name = "document-placeholder"
+    media_types = (
+        "application/pdf",
+        "application/msword",
+        "application/rtf",
+        "application/vnd.ms-excel",
+        "application/vnd.ms-powerpoint",
+        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
+        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+    )
+
+    def capabilities(self) -> ExtractorCapability:
+        return ExtractorCapability(
+            extractor_name=self.name,
+            media_types=self.media_types,
+            extraction_depth="metadata_only",
+            produces_structure=False,
+            metadata={
+                "placeholder": True,
+                "requires_optional_deep_extractor": True,
+            },
+        )
+
+    def supports(self, media_type: str) -> bool:
+        return media_type in self.media_types
+
+    def extract(self, payload: SourcePayload) -> ExtractionResult:
+        document_kind = "pdf" if payload.media_type == "application/pdf" else "office_document"
+        unsupported = {
+            "kind": document_kind,
+            "media_type": payload.media_type,
+            "reason": "deep_extraction_not_available",
+        }
+        diagnostic = IngestionFailure(
+            code="extraction.depth_unsupported",
+            message="Deep extraction for this document format requires an optional adapter",
+            retriable=False,
+            details={
+                "extractor": self.name,
+                "media_type": payload.media_type,
+                "supported_depth": "metadata_only",
+            },
+        )
+        metadata = {
+            "extractor": self.name,
+            "document_kind": document_kind,
+            "extraction_depth": "metadata_only",
+            "unsupported_elements": [unsupported],
+            "source_digest": payload.content_digest,
+            "source_size_bytes": payload.size_bytes,
+        }
+        normalized = NormalizedDocument(
+            title=payload.title,
+            text="",
+            structure={
+                "kind": document_kind,
+                "extraction_depth": "metadata_only",
+            },
+            fields={
+                "document_kind": document_kind,
+                "source_media_type": payload.media_type,
+                "source_size_bytes": payload.size_bytes,
+            },
+            confidence=0.0,
+            unsupported_elements=[unsupported],
+            extractor_metadata={
+                "extractor": self.name,
+                "source_media_type": payload.media_type,
+                "extraction_depth": "metadata_only",
+            },
+        )
+        return ExtractionResult(normalized=normalized, metadata=metadata, diagnostics=(diagnostic,))
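
Because the placeholder extractor completes the job while attaching an `extraction.depth_unsupported` diagnostic, a caller may want to tell metadata-only assets apart from fully extracted ones. The sketch below shows one way to check a job's `partial_results` for that. `is_metadata_only` is a hypothetical helper, and the dictionary layout is an assumption based on the `code` and `details` keys asserted in this commit's tests; the `retriable` key in particular is assumed.

```python
from typing import Any


def is_metadata_only(partial_results: dict[str, Any]) -> bool:
    """Return True when a job reports the placeholder depth diagnostic."""
    diagnostics = partial_results.get("diagnostics", [])
    return any(d.get("code") == "extraction.depth_unsupported" for d in diagnostics)


# Example payload shaped like the diagnostics asserted in the tests (assumed layout).
sample = {
    "extractor": "document-placeholder",
    "diagnostics": [
        {
            "code": "extraction.depth_unsupported",
            "retriable": False,
            "details": {"media_type": "application/pdf", "supported_depth": "metadata_only"},
        }
    ],
}
```

A check like this could be used to queue such assets for re-ingestion once an optional deep extraction adapter becomes available.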


@@ -65,6 +65,24 @@ def _guess_media_type(path: Path) -> str:
         return "text/markdown"
     if suffix in {".txt", ".text", ".log"}:
         return "text/plain"
+    if suffix == ".csv":
+        return "text/csv"
+    if suffix == ".tsv":
+        return "text/tab-separated-values"
+    if suffix == ".pdf":
+        return "application/pdf"
+    if suffix == ".doc":
+        return "application/msword"
+    if suffix == ".docx":
+        return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
+    if suffix == ".xls":
+        return "application/vnd.ms-excel"
+    if suffix == ".xlsx":
+        return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
+    if suffix == ".ppt":
+        return "application/vnd.ms-powerpoint"
+    if suffix == ".pptx":
+        return "application/vnd.openxmlformats-officedocument.presentationml.presentation"
     guessed, _ = mimetypes.guess_type(path.name)
     return guessed or "application/octet-stream"
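
The suffix checks added here follow an "explicit overrides first, `mimetypes` second" pattern. A table-driven variant of the same idea is sketched below for illustration only. `guess_media_type` and `_SUFFIX_MEDIA_TYPES` are hypothetical names, only a subset of the suffixes is listed, and lowercasing the suffix is an assumption, since the code that computes `suffix` is outside this hunk.

```python
import mimetypes
from pathlib import Path

# Explicit suffix overrides, consulted before the platform MIME database.
_SUFFIX_MEDIA_TYPES = {
    ".csv": "text/csv",
    ".tsv": "text/tab-separated-values",
    ".pdf": "application/pdf",
    ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}


def guess_media_type(path: Path) -> str:
    """Resolve a media type from the file suffix (illustrative sketch)."""
    suffix = path.suffix.lower()  # assumption: matching is case-insensitive
    if suffix in _SUFFIX_MEDIA_TYPES:
        return _SUFFIX_MEDIA_TYPES[suffix]
    guessed, _ = mimetypes.guess_type(path.name)
    # Unknown suffixes fall back to a generic binary type.
    return guessed or "application/octet-stream"
```

Pinning these suffixes explicitly keeps extractor selection stable, since `mimetypes.guess_type` consults system MIME tables that can differ between platforms.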


@@ -9,6 +9,7 @@ from .ingestion import (
     ExtractionResult,
     ExtractorCapability,
     IngestionFailure,
+    IngestionIdentityPolicy,
     IngestionJob,
     IngestionJobStatus,
     NormalizedDocument,
@@ -58,6 +59,7 @@ __all__ = [
     "IdempotencyRecord",
     "IdempotencyStatus",
     "IngestionFailure",
+    "IngestionIdentityPolicy",
     "IngestionJob",
     "IngestionJobStatus",
     "KnowledgeAsset",


@@ -21,6 +21,11 @@ class IngestionJobStatus(str, Enum):
     CANCELED = "canceled"


+class IngestionIdentityPolicy(str, Enum):
+    SOURCE_LOCATION = "source_location"
+    CONTENT_DIGEST = "content_digest"
+
+
 @dataclass(frozen=True)
 class IngestionFailure:
     code: str


@@ -345,6 +345,79 @@ class AssetRegistryService:
     def list_metadata_schema_assignments(self) -> list[MetadataSchemaAssignment]:
         return self.repository.list_metadata_schema_assignments()

+    def record_ingestion_update(
+        self,
+        asset_id: str,
+        source_ref: SourceReference,
+        representations: list[AssetRepresentation] | tuple[AssetRepresentation, ...],
+        metadata_records: list[MetadataRecord] | tuple[MetadataRecord, ...],
+        context: OperationContext,
+        *,
+        expected_current_version_id: str | None = None,
+    ) -> AssetChangeResult:
+        asset = self.repository.get_asset(asset_id)
+        self._assert_expected_current_version(
+            asset,
+            expected_current_version_id,
+            operation="asset.ingest.update",
+        )
+        decision = self._authorize(
+            context,
+            "asset.ingest.update",
+            f"asset:{asset.id}",
+            resource_metadata={
+                "source_system": source_ref.source_system,
+                "source_path": source_ref.path or "",
+                "checksum": source_ref.checksum or "",
+                "representation_count": str(len(representations)),
+                "metadata_record_count": str(len(metadata_records)),
+            },
+        )
+        self._validate_metadata_records(
+            asset.classification,
+            self.repository.list_metadata_records(asset.id) + list(metadata_records),
+        )
+        updated = asset
+        if not _has_source_reference(updated, source_ref):
+            updated = updated.with_source_reference(source_ref)
+            alias = _source_alias(source_ref)
+            if alias:
+                updated = updated.with_alias(alias)
+        representation_ids: list[str] = []
+        for representation in representations:
+            if representation.asset_id != asset.id:
+                representation = replace(representation, asset_id=asset.id)
+            self.repository.save_representation(representation)
+            representation_ids.append(representation.representation_id)
+        for record in metadata_records:
+            self.repository.save_metadata_record(asset.id, record)
+        version = AssetVersion(
+            asset_id=asset.id,
+            sequence=self._next_sequence(asset.id),
+            change_type=VersionChangeType.CONTENT_CHANGED,
+            representation_ids=tuple(representation_ids),
+            actor_id=context.actor.id,
+            parent_version_id=asset.current_version_id,
+            metadata_delta={record.key: record.value for record in metadata_records},
+            lifecycle=updated.lifecycle.value,
+        )
+        updated = updated.with_current_version(version.version_id)
+        self.repository.save_asset(updated)
+        self.repository.save_version(version)
+        event = self._audit(
+            "asset.ingest.update",
+            f"asset:{asset.id}",
+            AuditOutcome.SUCCESS,
+            context,
+            decision,
+            details={
+                "source_ref_id": source_ref.id,
+                "version_id": version.version_id,
+                "representation_ids": tuple(representation_ids),
+            },
+        )
+        return AssetChangeResult(updated, version, event, decision)
+
     def add_representation(
         self,
         asset_id: str,
@@ -881,3 +954,19 @@ def _remediation_for_error(error: KontextualError) -> str | None:
     if isinstance(error, AuthorizationError):
         return "Request policy approval or rerun with an actor that is authorized for this operation."
     return None
+
+
+def _has_source_reference(asset: KnowledgeAsset, source_ref: SourceReference) -> bool:
+    return any(
+        existing.identity_key == source_ref.identity_key
+        or (
+            existing.connector_ref is not None
+            and existing.connector_ref == source_ref.connector_ref
+            and existing.checksum == source_ref.checksum
+        )
+        for existing in asset.source_refs
+    )
+
+
+def _source_alias(source_ref: SourceReference) -> str | None:
+    return source_ref.connector_ref or source_ref.path or source_ref.uri or source_ref.external_id
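
The matching rule in `_has_source_reference` has two branches: an exact `identity_key` match, or the same connector reference with the same checksum. The sketch below reproduces that rule on a minimal stand-in type so the effect of the `is not None` guard is visible. `SourceRef` is a hypothetical simplification of `SourceReference`; how the real class derives `identity_key` is not shown in this commit.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class SourceRef:
    # Hypothetical stand-in for the engine's SourceReference.
    identity_key: str
    connector_ref: Optional[str] = None
    checksum: Optional[str] = None


def has_source_reference(existing_refs: Sequence[SourceRef], candidate: SourceRef) -> bool:
    """Same rule as the diff: identity-key match, or connector_ref + checksum match."""
    return any(
        existing.identity_key == candidate.identity_key
        or (
            # Without this guard, two refs that both lack a connector_ref
            # and a checksum would compare equal as None == None.
            existing.connector_ref is not None
            and existing.connector_ref == candidate.connector_ref
            and existing.checksum == candidate.checksum
        )
        for existing in existing_refs
    )


known = [SourceRef("key-a", connector_ref="local_file:/data/a.txt", checksum="sha256:111")]
```

With this rule, a changed file at the same connector location does not count as already recorded, because the checksum differs.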


@@ -6,13 +6,18 @@ from dataclasses import dataclass
 from pathlib import Path
 from typing import Iterable

-from kontextual_engine.adapters.builtin_extractors import PlainTextExtractor
+from kontextual_engine.adapters.builtin_extractors import (
+    CsvDatasetExtractor,
+    DocumentPlaceholderExtractor,
+    PlainTextExtractor,
+)
 from kontextual_engine.adapters.local_files import LocalFileConnector
 from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor
 from kontextual_engine.core import (
     AssetRepresentation,
     Classification,
     IngestionFailure,
+    IngestionIdentityPolicy,
     IngestionJob,
     IngestionJobStatus,
     KnowledgeAsset,
@@ -34,6 +39,7 @@ class AssetIngestionResult:
     job: IngestionJob
     asset: KnowledgeAsset | None = None
     asset_change: AssetChangeResult | None = None
+    action: str = "failed"


 class AssetIngestionService:
@@ -48,7 +54,15 @@ class AssetIngestionService:
         self.repository = repository
         self.asset_service = asset_service or AssetRegistryService(repository)
         self.connectors = {connector.name: connector for connector in (connectors or [LocalFileConnector()])}
-        self.extractors = list(extractors or [PlainTextExtractor(), MarkitectMarkdownExtractor()])
+        self.extractors = list(
+            extractors
+            or [
+                PlainTextExtractor(),
+                CsvDatasetExtractor(),
+                DocumentPlaceholderExtractor(),
+                MarkitectMarkdownExtractor(),
+            ]
+        )

     def connector_capabilities(self) -> list[dict]:
         return [connector.capabilities().to_dict() for connector in self.connectors.values()]
@@ -65,11 +79,20 @@ class AssetIngestionService:
         title: str | None = None,
         classification: Classification | None = None,
         idempotency_key: str | None = None,
+        identity_policy: IngestionIdentityPolicy | str = IngestionIdentityPolicy.SOURCE_LOCATION,
+        skip_unchanged: bool = True,
     ) -> AssetIngestionResult:
+        identity_policy = IngestionIdentityPolicy(identity_policy)
         self.repository.save_actor(context.actor)
         connector = self._connector("local_file")
         job = IngestionJob.create(
-            input={"connector": connector.name, "source_uri": str(path), "mode": "file"},
+            input={
+                "connector": connector.name,
+                "source_uri": str(path),
+                "mode": "file",
+                "identity_policy": identity_policy.value,
+                "skip_unchanged": skip_unchanged,
+            },
             actor_id=context.actor.id,
             correlation_id=context.correlation_id,
         )
@@ -84,6 +107,8 @@ class AssetIngestionService:
                 title=title,
                 classification=classification,
                 idempotency_key=idempotency_key,
+                identity_policy=identity_policy,
+                skip_unchanged=skip_unchanged,
             )
         except Exception as exc:
             failed = job.failed(_failure_from_exception(exc))
@@ -97,7 +122,10 @@ class AssetIngestionService:
         *,
         recursive: bool = True,
         classification: Classification | None = None,
+        identity_policy: IngestionIdentityPolicy | str = IngestionIdentityPolicy.SOURCE_LOCATION,
+        skip_unchanged: bool = True,
     ) -> IngestionJob:
+        identity_policy = IngestionIdentityPolicy(identity_policy)
         self.repository.save_actor(context.actor)
         connector = self._directory_connector("local_file")
         job = IngestionJob.create(
@@ -106,6 +134,8 @@ class AssetIngestionService:
                 "source_uri": str(path),
                 "mode": "directory",
                 "recursive": recursive,
+                "identity_policy": identity_policy.value,
+                "skip_unchanged": skip_unchanged,
             },
             actor_id=context.actor.id,
             correlation_id=context.correlation_id,
@@ -118,11 +148,18 @@ class AssetIngestionService:
         item_results: list[dict] = []
         files = connector.iter_files(str(path), recursive=recursive)
         for source_uri in files:
-            result = self.ingest_file(source_uri, context, classification=classification)
+            result = self.ingest_file(
+                source_uri,
+                context,
+                classification=classification,
+                identity_policy=identity_policy,
+                skip_unchanged=skip_unchanged,
+            )
             item = {
                 "source_uri": source_uri,
                 "job_id": result.job.job_id,
-                "status": result.job.status.value,
+                "status": result.action if result.action == "skipped" else result.job.status.value,
+                "action": result.action,
             }
             if result.asset is not None:
                 output_asset_ids.append(result.asset.id)
@@ -130,6 +167,9 @@ class AssetIngestionService:
             if result.job.failures:
                 failures.extend(result.job.failures)
                 item["failures"] = [failure.to_dict() for failure in result.job.failures]
+                item["retry_state"] = (
+                    "retriable" if any(failure.retriable for failure in result.job.failures) else "not_retriable"
+                )
             item_results.append(item)

         partial_results = {
@@ -137,7 +177,7 @@ class AssetIngestionService:
             "succeeded": sum(1 for item in item_results if item["status"] == IngestionJobStatus.COMPLETED.value),
             "failed": sum(1 for item in item_results if item["status"] == IngestionJobStatus.FAILED.value),
             "quarantined": sum(1 for item in item_results if item["status"] == IngestionJobStatus.QUARANTINED.value),
-            "skipped": 0,
+            "skipped": sum(1 for item in item_results if item["status"] == "skipped"),
             "items": item_results,
         }
         if failures and output_asset_ids:
@@ -177,12 +217,31 @@ class AssetIngestionService:
         title: str | None,
         classification: Classification | None,
         idempotency_key: str | None,
+        identity_policy: IngestionIdentityPolicy,
+        skip_unchanged: bool,
     ) -> AssetIngestionResult:
         job = job.running(source_ref=payload.source_ref)
         self.repository.save_ingestion_job(job)
         extractor = self._extractor(payload.media_type)
         extraction = extractor.extract(payload)
-        resolved_asset_id = asset_id or _stable_asset_id(payload)
+        resolved_asset_id = asset_id or _stable_asset_id(payload, identity_policy)
+        existing_asset = _get_asset_or_none(self.repository, resolved_asset_id)
+        if existing_asset and skip_unchanged and _asset_has_source_reference(existing_asset, payload.source_ref):
+            completed = job.completed(
+                output_asset_ids=(existing_asset.id,),
+                partial_results={
+                    "action": "skipped",
+                    "reason": "unchanged_source",
+                    "asset_id": existing_asset.id,
+                    "identity_policy": identity_policy.value,
+                    "connector": payload.connector_name,
+                    "extractor": extractor.name,
+                    "source_digest": payload.content_digest,
+                    "diagnostics": [diagnostic.to_dict() for diagnostic in extraction.diagnostics],
+                },
+            )
+            self.repository.save_ingestion_job(completed)
+            return AssetIngestionResult(completed, existing_asset, action="skipped")
         source_representation = AssetRepresentation.from_content(
             resolved_asset_id,
             RepresentationKind.SOURCE,
@@ -211,19 +270,33 @@ class AssetIngestionService:
                 **extraction.metadata,
             },
         )
-        asset_change = self.asset_service.create_asset(
-            title or payload.title,
-            classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
-            context,
-            asset_id=resolved_asset_id,
-            source_refs=[payload.source_ref],
-            representations=[source_representation, normalized_representation],
-            metadata_records=_metadata_records(payload, extractor.name, extraction.metadata),
-            idempotency_key=idempotency_key,
-        )
+        metadata_records = _metadata_records(payload, extractor.name, extraction.metadata)
+        if existing_asset:
+            asset_change = self.asset_service.record_ingestion_update(
+                resolved_asset_id,
+                payload.source_ref,
+                (source_representation, normalized_representation),
+                metadata_records,
+                context,
+            )
+            action = "updated"
+        else:
+            asset_change = self.asset_service.create_asset(
+                title or payload.title,
+                classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
+                context,
+                asset_id=resolved_asset_id,
+                source_refs=[payload.source_ref],
+                representations=[source_representation, normalized_representation],
+                metadata_records=metadata_records,
+                idempotency_key=idempotency_key,
+            )
+            action = "created"
         completed = job.completed(
             output_asset_ids=(asset_change.asset.id,),
             partial_results={
+                "action": action,
+                "identity_policy": identity_policy.value,
                 "connector": payload.connector_name,
                 "extractor": extractor.name,
                 "source_digest": payload.content_digest,
@@ -235,7 +308,7 @@ class AssetIngestionService:
             },
         )
         self.repository.save_ingestion_job(completed)
-        return AssetIngestionResult(completed, asset_change.asset, asset_change)
+        return AssetIngestionResult(completed, asset_change.asset, asset_change, action=action)

     def _connector(self, name: str) -> SourceConnector:
         try:
@@ -262,19 +335,46 @@ class AssetIngestionService:
         )


-def _stable_asset_id(payload: SourcePayload) -> str:
-    digest = mapping_digest(
-        {
-            "source_system": payload.source_ref.source_system,
-            "path": payload.source_ref.path,
-            "uri": payload.source_ref.uri,
-            "external_id": payload.source_ref.external_id,
-            "connector_ref": payload.source_ref.connector_ref,
-        }
-    )
+def _stable_asset_id(payload: SourcePayload, identity_policy: IngestionIdentityPolicy) -> str:
+    identity_data = {
+        "source_system": payload.source_ref.source_system,
+    }
+    if identity_policy == IngestionIdentityPolicy.CONTENT_DIGEST:
+        identity_data["checksum"] = payload.content_digest
+    else:
+        identity_data.update(
+            {
+                "path": payload.source_ref.path,
+                "uri": payload.source_ref.uri,
+                "external_id": payload.source_ref.external_id,
+                "connector_ref": payload.source_ref.connector_ref,
+            }
+        )
+    digest = mapping_digest(identity_data)
     return f"asset-{digest.removeprefix('sha256:')[:20]}"


+def _get_asset_or_none(repository: AssetRegistryRepository, asset_id: str) -> KnowledgeAsset | None:
+    try:
+        return repository.get_asset(asset_id)
+    except KontextualError as exc:
+        if exc.code == "kontextual.not_found":
+            return None
+        raise
+
+
+def _asset_has_source_reference(asset: KnowledgeAsset, source_ref) -> bool:
+    return any(
+        existing.identity_key == source_ref.identity_key
+        or (
+            existing.connector_ref is not None
+            and existing.connector_ref == source_ref.connector_ref
+            and existing.checksum == source_ref.checksum
+        )
+        for existing in asset.source_refs
+    )
+
+
 def _metadata_records(
     payload: SourcePayload,
     extractor_name: str,


@@ -1,10 +1,13 @@
 from pathlib import Path

+import pytest
+
 from kontextual_engine import (
     Actor,
     ActorType,
     AssetIngestionService,
     Classification,
+    IngestionIdentityPolicy,
     IngestionJobStatus,
     InMemoryAssetRegistryRepository,
     LifecycleState,
@@ -76,6 +79,146 @@ def test_directory_ingestion_reports_partial_results(tmp_path: Path) -> None:
assert len(job.failures) == 1 assert len(job.failures) == 1
def test_ingestion_content_digest_identity_preserves_asset_across_file_move(tmp_path: Path) -> None:
first_path = tmp_path / "original.txt"
moved_path = tmp_path / "renamed.txt"
first_path.write_text("same durable content\n", encoding="utf-8")
repo = InMemoryAssetRegistryRepository()
service = AssetIngestionService(repo)
context = operation_context()
first = service.ingest_file(
first_path,
context,
identity_policy=IngestionIdentityPolicy.CONTENT_DIGEST,
)
first_path.rename(moved_path)
moved = service.ingest_file(
moved_path,
context,
identity_policy=IngestionIdentityPolicy.CONTENT_DIGEST,
)
repeated = service.ingest_file(
moved_path,
context,
identity_policy=IngestionIdentityPolicy.CONTENT_DIGEST,
)
assert first.asset is not None
assert moved.asset is not None
assert repeated.asset is not None
assert first.action == "created"
assert moved.action == "updated"
assert repeated.action == "skipped"
assert moved.asset.id == first.asset.id
assert repeated.asset.id == first.asset.id
assert len(repo.list_assets()) == 1
assert [source.path for source in repo.get_asset(first.asset.id).source_refs] == [
str(first_path),
str(moved_path),
]
assert repeated.job.partial_results["reason"] == "unchanged_source"
assert [version.sequence for version in repo.list_versions(first.asset.id)] == [1, 2]
assert [event.operation for event in repo.list_audit_events(target=f"asset:{first.asset.id}")] == [
"asset.create",
"asset.ingest.update",
]
def test_directory_ingestion_reports_skipped_and_retry_state(tmp_path: Path) -> None:
already_seen = tmp_path / "seen.txt"
unsupported = tmp_path / "unsupported.bin"
already_seen.write_text("skip me on the directory pass", encoding="utf-8")
unsupported.write_bytes(b"\x00\x01")
repo = InMemoryAssetRegistryRepository()
service = AssetIngestionService(repo)
context = operation_context()
service.ingest_file(already_seen, context)
job = service.ingest_directory(tmp_path, context, recursive=False)
items = {Path(item["source_uri"]).name: item for item in job.partial_results["items"]}
assert job.status == IngestionJobStatus.PARTIALLY_COMPLETED
assert job.partial_results["succeeded"] == 0
assert job.partial_results["skipped"] == 1
assert job.partial_results["failed"] == 1
assert items["seen.txt"]["status"] == "skipped"
assert items["seen.txt"]["action"] == "skipped"
assert items["unsupported.bin"]["status"] == IngestionJobStatus.FAILED.value
assert items["unsupported.bin"]["retry_state"] == "retriable"
assert items["unsupported.bin"]["failures"][0]["code"] == "kontextual.adapter_unavailable"


def test_asset_ingestion_service_ingests_csv_dataset_with_structured_table(tmp_path: Path) -> None:
    source = tmp_path / "metrics.csv"
    source.write_text("name,score\nalpha,0.82\nbeta,0.91\n", encoding="utf-8")
    repo = InMemoryAssetRegistryRepository()
    service = AssetIngestionService(repo)
    result = service.ingest_file(
        source,
        operation_context(),
        asset_id="asset-metrics",
        classification=Classification(asset_type="dataset", sensitivity=Sensitivity.INTERNAL),
    )
    normalized = repo.list_representations(asset_id="asset-metrics", kind=RepresentationKind.NORMALIZED)[0]
    assert result.job.status == IngestionJobStatus.COMPLETED
    assert result.job.partial_results["extractor"] == "csv-dataset"
    assert normalized.metadata["dataset_format"] == "csv"
    assert normalized.metadata["columns"] == ["name", "score"]
    assert normalized.metadata["row_count"] == 2
    assert normalized.metadata["table_count"] == 1
    assert [record.value for record in repo.list_metadata_records("asset-metrics") if record.key == "extractor"] == [
        "csv-dataset"
    ]


@pytest.mark.parametrize(
    ("filename", "content", "media_type", "document_kind"),
    [
        ("source.pdf", b"%PDF-1.7\n", "application/pdf", "pdf"),
        (
            "source.docx",
            b"PK\x03\x04docx-placeholder",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "office_document",
        ),
    ],
)
def test_document_placeholder_formats_create_asset_with_unsupported_depth_diagnostic(
    tmp_path: Path,
    filename: str,
    content: bytes,
    media_type: str,
    document_kind: str,
) -> None:
    source = tmp_path / filename
    source.write_bytes(content)
    repo = InMemoryAssetRegistryRepository()
    service = AssetIngestionService(repo)
    result = service.ingest_file(
        source,
        operation_context(),
        asset_id=f"asset-{source.stem}",
        classification=Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
    )
    normalized = repo.list_representations(asset_id=f"asset-{source.stem}", kind=RepresentationKind.NORMALIZED)[0]
    assert result.job.status == IngestionJobStatus.COMPLETED
    assert result.asset is not None
    assert result.job.partial_results["diagnostics"][0]["code"] == "extraction.depth_unsupported"
    assert result.job.partial_results["diagnostics"][0]["details"]["media_type"] == media_type
    assert normalized.producer == "document-placeholder"
    assert normalized.metadata["document_kind"] == document_kind
    assert normalized.metadata["extraction_depth"] == "metadata_only"
    assert normalized.metadata["unsupported_elements"][0]["reason"] == "deep_extraction_not_available"


def test_sqlite_ingestion_jobs_survive_reinstantiation(tmp_path: Path) -> None:
    source = tmp_path / "policy.txt"
    source.write_text("governed ingestion", encoding="utf-8")

@@ -0,0 +1,97 @@
import sys
from pathlib import Path
from types import SimpleNamespace

import pytest

from kontextual_engine import SourcePayload, SourceReference, content_digest
from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor
from kontextual_engine.errors import AdapterUnavailableError


def test_markitect_markdown_extractor_missing_dependency_is_structured(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setitem(sys.modules, "markitect_tool", None)
    extractor = MarkitectMarkdownExtractor()
    payload = markdown_payload("# Missing Adapter\n")
    with pytest.raises(AdapterUnavailableError) as exc_info:
        extractor.extract(payload)
    assert exc_info.value.details == {
        "adapter": "markitect-tool",
        "media_type": "text/markdown",
    }


def test_markitect_markdown_extractor_delegates_to_markitect_tool(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    source = tmp_path / "decision.md"
    source.write_text("# Decision\n\nUse Markitect.\n", encoding="utf-8")
    calls: list[tuple[str, str]] = []

    def parse_markdown_file(path: Path) -> SimpleNamespace:
        calls.append(("parse_markdown_file", str(path)))
        return SimpleNamespace(
            to_dict=lambda: {
                "frontmatter": {"status": "accepted"},
                "headings": [{"level": 1, "text": "Decision", "line": 1}],
                "sections": [
                    {
                        "heading": {"level": 1, "text": "Decision", "line": 1},
                        "blocks": [{"type": "paragraph", "text": "Use Markitect.", "line_start": 3}],
                    }
                ],
            }
        )

    def snapshot_identity_for_file(path: Path, *, parse_options: dict) -> SimpleNamespace:
        calls.append(("snapshot_identity_for_file", f"{path}:{parse_options['profile']}"))
        return SimpleNamespace(
            to_dict=lambda: {
                "snapshot_id": "snapshot:decision",
                "content_hash": "sha256:decision",
                "parser": "markdown-it-py/commonmark",
            }
        )

    monkeypatch.setitem(
        sys.modules,
        "markitect_tool",
        SimpleNamespace(
            parse_markdown_file=parse_markdown_file,
            parse_markdown=lambda text, source_path=None: None,
            snapshot_identity_for_file=snapshot_identity_for_file,
        ),
    )
    result = MarkitectMarkdownExtractor().extract(markdown_payload(source.read_text(encoding="utf-8"), source))
    assert calls == [
        ("parse_markdown_file", str(source)),
        ("snapshot_identity_for_file", f"{source}:default"),
    ]
    assert result.normalized.structure["frontmatter"] == {"status": "accepted"}
    assert result.normalized.fields["heading_count"] == 1
    assert result.normalized.fields["section_count"] == 1
    assert result.metadata["snapshot"]["snapshot_id"] == "snapshot:decision"
    assert result.normalized.extractor_metadata["snapshot"]["parser"] == "markdown-it-py/commonmark"


def markdown_payload(markdown: str, path: Path | None = None) -> SourcePayload:
    data = markdown.encode("utf-8")
    source_ref = SourceReference(
        source_system="local_file",
        path=str(path) if path else None,
        checksum=content_digest(data),
        connector_ref=f"local_file:{path}" if path else None,
    )
    return SourcePayload(
        connector_name="local_file",
        source_uri=str(path) if path else "memory://markdown",
        source_ref=source_ref,
        media_type="text/markdown",
        content=data,
        title=path.stem if path else "Markdown",
    )

@@ -51,9 +51,14 @@ As of 2026-05-06, the first ingestion slice is recorded in
`docs/ingestion-implementation.md`. It establishes ingestion job primitives,
connector/extractor ports, local file ingestion, plain text normalization,
Markitect markdown adapter boundaries, directory partial-result reporting, and
-in-memory/SQLite job persistence. Remaining work is focused on async execution,
-re-ingestion identity reconciliation, richer structural extraction, quarantine
-policy checks, and non-text format adapters.
+in-memory/SQLite job persistence. It now also includes explicit ingestion
+identity policy, content-digest identity for governed file move/rename
+reconciliation, unchanged-source skip behavior, and directory item retry/skipped
+reporting. CSV/TSV datasets now produce structured normalized table output, and
+PDF/office-like files can enter the governed asset set through metadata-only
+placeholder extraction with explicit unsupported-depth diagnostics. Remaining
+work is focused on async execution, richer structural extraction, quarantine
+policy checks, and optional deep non-text extraction adapters.
## I6.1 - Implement ingestion job model status and retry surface
@@ -97,7 +102,7 @@ Acceptance:
```task
id: KONT-WP-0006-T003
-status: in_progress
+status: done
priority: high
state_hub_task_id: "d3e3d4d2-a581-4438-bee7-6fc4161d3925"
```
@@ -111,11 +116,21 @@ Acceptance:
- File path changes can be represented without changing stable asset identity
when identity policy permits.
Implemented:
- `IngestionIdentityPolicy.SOURCE_LOCATION` remains the conservative default.
- `IngestionIdentityPolicy.CONTENT_DIGEST` preserves asset identity across file
moves or renames when the caller opts into content identity.
- Existing assets receive a versioned `asset.ingest.update` record with new
source references and representations.
- Re-ingesting an unchanged source is reported as a skipped child item without
creating another asset version.
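
A minimal sketch of how the two identity policies could produce the created, updated, and skipped outcomes listed above. `IdentityPolicy`, `identity_key`, and `reconcile` are hypothetical names used only for illustration, not the engine's API, and the in-memory `known` mapping is an assumed stand-in for the asset registry.

```python
import hashlib
from enum import Enum
from pathlib import Path


class IdentityPolicy(Enum):
    # Hypothetical stand-in for IngestionIdentityPolicy; the values are assumptions.
    SOURCE_LOCATION = "source_location"
    CONTENT_DIGEST = "content_digest"


def digest(content: bytes) -> str:
    return hashlib.sha256(content).hexdigest()


def identity_key(path: Path, content: bytes, policy: IdentityPolicy) -> str:
    """Return the key used to look up an existing asset (illustrative only)."""
    if policy is IdentityPolicy.CONTENT_DIGEST:
        # Identical bytes give the same key, so a move or rename maps to the same asset.
        return "digest:" + digest(content)
    # Conservative default: a different path is treated as a different source.
    return "path:" + path.as_posix()


def reconcile(
    known: dict[str, set[tuple[str, str]]],
    path: Path,
    content: bytes,
    policy: IdentityPolicy,
) -> str:
    """Classify one ingestion as "created", "updated", or "skipped"."""
    key = identity_key(path, content, policy)
    seen = (path.as_posix(), digest(content))
    if key not in known:
        known[key] = {seen}
        return "created"
    if seen in known[key]:
        # Same location and same bytes: nothing new to version.
        return "skipped"
    known[key].add(seen)
    return "updated"
```

Under this sketch, only the opt-in digest policy lets a renamed file resolve to the asset it was first ingested as; the default policy would register the new path as a separate source.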
## I6.4 - Implement text and markdown normalization via markitect-tool adapter
```task
id: KONT-WP-0006-T004
-status: in_progress
+status: done
priority: high
state_hub_task_id: "63bf2f7e-705d-40ae-a160-75fc508ffb1f"
```
@@ -131,11 +146,23 @@ Acceptance:
- Parser, selector extraction, and snapshot identity behavior are covered by
the Markitect integration contract tests.
Implemented:
- Plain text normalization produces source-grounded normalized representations.
- Markdown normalization imports and calls `markitect-tool` only inside the
adapter boundary.
- Missing `markitect-tool` raises structured `AdapterUnavailableError`
diagnostics.
- Adapter unit tests verify delegation and missing-dependency behavior.
- Optional contract tests verify parser, selector extraction, operations,
snapshot identity, context packages, contracts, and schema behavior against
the local `markitect-tool` checkout when available.
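
The missing-dependency behavior above can be sketched as a lazy import that converts `ImportError` into a structured error. `AdapterUnavailable` and `load_adapter` are hypothetical names for illustration; only the shape of the `details` mapping follows the adapter unit test, and the real `AdapterUnavailableError` may differ.

```python
import importlib
from types import ModuleType


class AdapterUnavailable(RuntimeError):
    # Hypothetical stand-in for AdapterUnavailableError; not the engine's class.
    def __init__(self, message: str, details: dict[str, str]) -> None:
        super().__init__(message)
        self.details = details


def load_adapter(module_name: str, adapter: str, media_type: str) -> ModuleType:
    """Import the optional dependency only when an extraction actually needs it."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # No local fallback parser: report which adapter is missing and for what input.
        raise AdapterUnavailable(
            f"adapter {adapter!r} is not installed",
            {"adapter": adapter, "media_type": media_type},
        ) from exc
```

Importing inside the adapter call, rather than at module import time, keeps the rest of the engine importable when the optional package is absent.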
## I6.5 - Implement PDF office document and dataset baseline adapters
```task
id: KONT-WP-0006-T005
-status: todo
+status: done
priority: high
state_hub_task_id: "04d7c4b0-abfd-4b14-892f-91d1c1a820cd"
```
@@ -150,6 +177,15 @@ Acceptance:
- Unsupported extraction depth is reported explicitly.
- CSV or table-like datasets produce structured normalized output.
Implemented:
- `CsvDatasetExtractor` supports CSV and TSV sources with structured columns,
row counts, table metadata, and normalized dataset fields.
- `DocumentPlaceholderExtractor` supports PDF and common office media types as
metadata-only assets with `extraction.depth_unsupported` diagnostics.
- Local file media-type detection is explicit for CSV, TSV, PDF, DOC/DOCX,
XLS/XLSX, and PPT/PPTX.
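
As a rough illustration of the structured table output described above, the following sketch derives columns and a row count from CSV or TSV bytes with the standard library. `summarize_table` is a hypothetical helper, not `CsvDatasetExtractor` itself; the metadata keys mirror those asserted in the ingestion test, and treating the first row as the header is an assumption.

```python
import csv
import io


def summarize_table(data: bytes, delimiter: str = ",") -> dict[str, object]:
    """Summarize delimited text as table metadata (illustrative only)."""
    text = data.decode("utf-8")
    rows = list(csv.reader(io.StringIO(text, newline=""), delimiter=delimiter))
    # Assumption: the first row holds the column names.
    columns = rows[0] if rows else []
    return {
        "dataset_format": "tsv" if delimiter == "\t" else "csv",
        "columns": columns,
        "row_count": max(len(rows) - 1, 0),  # data rows, excluding the header
        "table_count": 1,
    }
```

For example, `summarize_table(b"name,score\nalpha,0.82\nbeta,0.91\n")` reports two columns and two data rows.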
## I6.6 - Extract structural elements into common normalized representation
```task