Files
kontextual-engine/src/kontextual_engine/services/ingestion_service.py

555 lines
22 KiB
Python

"""Application service for governed asset ingestion."""
from __future__ import annotations
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Iterable
from kontextual_engine.adapters.builtin_extractors import (
CsvDatasetExtractor,
DocumentPlaceholderExtractor,
PlainTextExtractor,
)
from kontextual_engine.adapters.local_files import LocalFileConnector
from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor
from kontextual_engine.core import (
AssetRepresentation,
Classification,
ExtractionResult,
IngestionFailure,
IngestionIdentityPolicy,
IngestionJob,
IngestionJobStatus,
KnowledgeAsset,
MetadataRecord,
OperationContext,
RepresentationKind,
Sensitivity,
SourcePayload,
mapping_digest,
)
from kontextual_engine.errors import AdapterUnavailableError, KontextualError
from kontextual_engine.ports import AssetRegistryRepository, DirectorySourceConnector, FormatExtractor, SourceConnector
from .asset_service import AssetChangeResult, AssetRegistryService
@dataclass(frozen=True)
class AssetIngestionResult:
    """Immutable outcome of ingesting a single source.

    The defaults describe a failed ingestion: no asset, no change record, and
    ``action`` set to ``"failed"``.
    """

    # Ingestion job in the state it had when this result was produced.
    job: IngestionJob
    # Asset the ingestion resolved to; None when no asset was produced.
    asset: KnowledgeAsset | None = None
    # Registry change record for created/updated outcomes; None otherwise.
    asset_change: AssetChangeResult | None = None
    # Outcome label. This module emits "created", "updated", "skipped",
    # "quarantined", and the default "failed".
    action: str = "failed"
class AssetIngestionService:
    """Ingest source files into the asset registry while tracking ingestion jobs.

    The service fetches payloads through source connectors, normalizes them
    with the first format extractor that supports the payload's media type,
    validates the result, and then creates or updates the asset through the
    asset registry service. Every ingestion is recorded as an
    ``IngestionJob`` that is persisted through the repository as it
    progresses.
    """

    def __init__(
        self,
        repository: AssetRegistryRepository,
        *,
        asset_service: AssetRegistryService | None = None,
        connectors: Iterable[SourceConnector] | None = None,
        extractors: Iterable[FormatExtractor] | None = None,
    ) -> None:
        """Wire the service to its repository and adapters.

        Args:
            repository: Store used for actors, ingestion jobs, and asset lookups.
            asset_service: Registry service used to create and update assets.
                A default one bound to ``repository`` is built when omitted.
            connectors: Source connectors, indexed by their ``name``. A single
                ``LocalFileConnector`` is used when omitted or falsy.
            extractors: Format extractors, tried in the given order. The
                built-in extractor set is used when omitted or falsy.
        """
        self.repository = repository
        self.asset_service = asset_service or AssetRegistryService(repository)
        self.connectors = {connector.name: connector for connector in (connectors or [LocalFileConnector()])}
        self.extractors = list(
            extractors
            or [
                PlainTextExtractor(),
                CsvDatasetExtractor(),
                DocumentPlaceholderExtractor(),
                MarkitectMarkdownExtractor(),
            ]
        )

    def connector_capabilities(self) -> list[dict]:
        """Return the capability description of every registered connector as a dict."""
        return [connector.capabilities().to_dict() for connector in self.connectors.values()]

    def extractor_capabilities(self) -> list[dict]:
        """Return the capability description of every registered extractor as a dict."""
        return [extractor.capabilities().to_dict() for extractor in self.extractors]

    def ingest_file(
        self,
        path: str | Path,
        context: OperationContext,
        *,
        asset_id: str | None = None,
        title: str | None = None,
        classification: Classification | None = None,
        idempotency_key: str | None = None,
        identity_policy: IngestionIdentityPolicy | str = IngestionIdentityPolicy.SOURCE_LOCATION,
        skip_unchanged: bool = True,
    ) -> AssetIngestionResult:
        """Ingest one file through the ``local_file`` connector.

        Args:
            path: Location of the file; passed to the connector as a string.
            context: Operation context supplying the actor and correlation id.
            asset_id: Explicit asset id; derived from the payload and
                ``identity_policy`` when omitted.
            title: Title for a newly created asset; the payload title is used
                when omitted.
            classification: Classification for a newly created asset; an
                internal "document" classification is used when omitted.
            idempotency_key: Forwarded to asset creation.
            identity_policy: Policy (or its string value) used to derive the
                asset id when ``asset_id`` is not given.
            skip_unchanged: When true, an existing asset that already carries
                the payload's source reference is left untouched.

        Returns:
            The ingestion result. Any exception raised while fetching or
            ingesting the payload is captured as a failed job and returned
            with the default ``"failed"`` action instead of propagating.

        Raises:
            AdapterUnavailableError: If no ``local_file`` connector is registered.
        """
        identity_policy = IngestionIdentityPolicy(identity_policy)
        self.repository.save_actor(context.actor)
        connector = self._connector("local_file")
        job = IngestionJob.create(
            input={
                "connector": connector.name,
                "source_uri": str(path),
                "mode": "file",
                "identity_policy": identity_policy.value,
                "skip_unchanged": skip_unchanged,
            },
            actor_id=context.actor.id,
            correlation_id=context.correlation_id,
        )
        self.repository.save_ingestion_job(job)
        try:
            payload = connector.fetch(str(path))
            return self._ingest_payload(
                job,
                payload,
                context,
                asset_id=asset_id,
                title=title,
                classification=classification,
                idempotency_key=idempotency_key,
                identity_policy=identity_policy,
                skip_unchanged=skip_unchanged,
            )
        except Exception as exc:
            # Per-file boundary: record the error on the job so directory
            # ingestion can continue with the remaining files.
            failed = job.failed(_failure_from_exception(exc))
            self.repository.save_ingestion_job(failed)
            return AssetIngestionResult(failed)

    def ingest_directory(
        self,
        path: str | Path,
        context: OperationContext,
        *,
        recursive: bool = True,
        classification: Classification | None = None,
        identity_policy: IngestionIdentityPolicy | str = IngestionIdentityPolicy.SOURCE_LOCATION,
        skip_unchanged: bool = True,
    ) -> IngestionJob:
        """Ingest every file the ``local_file`` connector lists under ``path``.

        Each file is ingested through :meth:`ingest_file` (and so gets its own
        job); the returned directory job aggregates the per-file outcomes in
        its ``partial_results``.

        The final state of the directory job is:

        * partially completed when there are failures and at least one asset;
        * quarantined when items were quarantined, none failed, and no asset
          was produced;
        * failed when there are failures and no asset;
        * completed otherwise.

        Args:
            path: Directory to enumerate; passed to the connector as a string.
            context: Operation context supplying the actor and correlation id.
            recursive: Forwarded to the connector's ``iter_files``.
            classification: Forwarded to every per-file ingestion.
            identity_policy: Policy (or its string value) forwarded to every
                per-file ingestion.
            skip_unchanged: Forwarded to every per-file ingestion.

        Returns:
            The persisted directory-level job.

        Raises:
            AdapterUnavailableError: If the ``local_file`` connector is missing
                or cannot iterate directories.
            Exception: Whatever the connector raises while enumerating files;
                the job is persisted as failed before the error is re-raised.
        """
        identity_policy = IngestionIdentityPolicy(identity_policy)
        self.repository.save_actor(context.actor)
        connector = self._directory_connector("local_file")
        job = IngestionJob.create(
            input={
                "connector": connector.name,
                "source_uri": str(path),
                "mode": "directory",
                "recursive": recursive,
                "identity_policy": identity_policy.value,
                "skip_unchanged": skip_unchanged,
            },
            actor_id=context.actor.id,
            correlation_id=context.correlation_id,
        )
        job = job.running()
        self.repository.save_ingestion_job(job)
        output_asset_ids: list[str] = []
        failures: list[IngestionFailure] = []
        item_results: list[dict] = []
        try:
            # Materialize the listing once: it is iterated below and then
            # counted, and a lazy iterator supports neither len() nor a
            # second pass.
            files = list(connector.iter_files(str(path), recursive=recursive))
        except Exception as exc:
            # Do not leave the already-persisted job stuck in the running
            # state; record the failure, then let the caller see the error.
            self.repository.save_ingestion_job(job.failed(_failure_from_exception(exc)))
            raise
        for source_uri in files:
            result = self.ingest_file(
                source_uri,
                context,
                classification=classification,
                identity_policy=identity_policy,
                skip_unchanged=skip_unchanged,
            )
            item: dict = {
                "source_uri": source_uri,
                "job_id": result.job.job_id,
                # A skipped file has a completed job; report it as "skipped"
                # so it is not counted as a fresh success.
                "status": result.action if result.action == "skipped" else result.job.status.value,
                "action": result.action,
            }
            if result.asset is not None:
                output_asset_ids.append(result.asset.id)
                item["asset_id"] = result.asset.id
            if result.job.failures:
                failures.extend(result.job.failures)
                item["failures"] = [failure.to_dict() for failure in result.job.failures]
                item["retry_state"] = (
                    "retriable" if any(failure.retriable for failure in result.job.failures) else "not_retriable"
                )
            item_results.append(item)

        def count_with_status(status: str) -> int:
            return sum(1 for entry in item_results if entry["status"] == status)

        partial_results = {
            "files_total": len(files),
            "succeeded": count_with_status(IngestionJobStatus.COMPLETED.value),
            "failed": count_with_status(IngestionJobStatus.FAILED.value),
            "quarantined": count_with_status(IngestionJobStatus.QUARANTINED.value),
            "skipped": count_with_status("skipped"),
            "retriable": sum(1 for entry in item_results if entry.get("retry_state") == "retriable"),
            "items": item_results,
        }
        failed_count = partial_results["failed"]
        quarantined_count = partial_results["quarantined"]
        if failures and output_asset_ids:
            job = job.partially_completed(
                output_asset_ids=tuple(output_asset_ids),
                failures=tuple(failures),
                partial_results=partial_results,
            )
        elif quarantined_count and not failed_count:
            job = job.failed(
                IngestionFailure(
                    code="ingestion.directory_quarantined",
                    message="Directory ingestion quarantined all non-skipped files",
                    retriable=False,
                    details=partial_results,
                ),
                status=IngestionJobStatus.QUARANTINED,
                partial_results=partial_results,
            )
        elif failures:
            job = job.failed(
                IngestionFailure(
                    code="ingestion.directory_failed",
                    message="Directory ingestion failed for all files",
                    retriable=True,
                    details=partial_results,
                ),
                partial_results=partial_results,
            )
        else:
            job = job.completed(output_asset_ids=tuple(output_asset_ids), partial_results=partial_results)
        self.repository.save_ingestion_job(job)
        return job

    def get_job(self, job_id: str) -> IngestionJob:
        """Return the ingestion job stored under ``job_id``."""
        return self.repository.get_ingestion_job(job_id)

    def list_jobs(self, *, status: IngestionJobStatus | None = None) -> list[IngestionJob]:
        """Return stored ingestion jobs, passing ``status`` through to the repository as a filter."""
        return self.repository.list_ingestion_jobs(status=status)

    def _ingest_payload(
        self,
        job: IngestionJob,
        payload: SourcePayload,
        context: OperationContext,
        *,
        asset_id: str | None,
        title: str | None,
        classification: Classification | None,
        idempotency_key: str | None,
        identity_policy: IngestionIdentityPolicy,
        skip_unchanged: bool,
    ) -> AssetIngestionResult:
        """Extract, validate, and register one fetched payload.

        The job is marked running, the payload is extracted and validated, and
        then exactly one of these outcomes is persisted and returned:

        * ``"quarantined"`` - validation produced failures; no asset is touched;
        * ``"skipped"`` - the asset exists, ``skip_unchanged`` is set, and the
          asset already carries the payload's source reference;
        * ``"updated"`` - the asset exists and receives new representations;
        * ``"created"`` - a new asset is registered.

        Raises:
            AdapterUnavailableError: If no extractor supports the payload's
                media type. Other errors from the extractor, repository, or
                asset service propagate unchanged.
        """
        job = job.running(source_ref=payload.source_ref)
        self.repository.save_ingestion_job(job)
        extractor = self._extractor(payload.media_type)
        extraction = extractor.extract(payload)
        validation_failures = _validate_extraction(payload, extraction)
        if validation_failures:
            quarantined = _quarantined_job(
                job,
                validation_failures,
                partial_results={
                    "action": "quarantined",
                    "connector": payload.connector_name,
                    "extractor": extraction.metadata.get("extractor"),
                    "source_digest": payload.content_digest,
                    "diagnostics": [failure.to_dict() for failure in validation_failures],
                },
            )
            self.repository.save_ingestion_job(quarantined)
            return AssetIngestionResult(quarantined, action="quarantined")
        resolved_asset_id = asset_id or _stable_asset_id(payload, identity_policy)
        existing_asset = _get_asset_or_none(self.repository, resolved_asset_id)
        if existing_asset and skip_unchanged and _asset_has_source_reference(existing_asset, payload.source_ref):
            completed = job.completed(
                output_asset_ids=(existing_asset.id,),
                partial_results={
                    "action": "skipped",
                    "reason": "unchanged_source",
                    "asset_id": existing_asset.id,
                    "identity_policy": identity_policy.value,
                    "connector": payload.connector_name,
                    "extractor": extractor.name,
                    "source_digest": payload.content_digest,
                    "diagnostics": [diagnostic.to_dict() for diagnostic in extraction.diagnostics],
                },
            )
            self.repository.save_ingestion_job(completed)
            return AssetIngestionResult(completed, existing_asset, action="skipped")
        # Raw source content, attributed to the connector that fetched it.
        source_representation = AssetRepresentation.from_content(
            resolved_asset_id,
            RepresentationKind.SOURCE,
            payload.media_type,
            payload.content,
            storage_ref=payload.source_uri,
            producer=payload.connector_name,
            source_ref_id=payload.source_ref.id,
            metadata={
                "connector": payload.connector_name,
                "source_digest": payload.content_digest,
                "source_size_bytes": payload.size_bytes,
                "permission_context": dict(payload.permission_context),
                # Spread last, so payload metadata overrides the keys above on collision.
                **payload.metadata,
            },
        )
        # Extractor output serialized as JSON, attributed to the extractor.
        normalized_representation = AssetRepresentation.from_content(
            resolved_asset_id,
            RepresentationKind.NORMALIZED,
            extraction.normalized.media_type,
            extraction.normalized.to_json(),
            producer=extractor.name,
            source_ref_id=payload.source_ref.id,
            metadata={
                "extractor": extractor.name,
                "normalized_hash": extraction.normalized.normalized_hash,
                "search_text": extraction.normalized.text,
                "search_text_length": len(extraction.normalized.text),
                "permission_context": dict(payload.permission_context),
                # Spread last, so extraction metadata overrides the keys above on collision.
                **extraction.metadata,
            },
        )
        metadata_records = _metadata_records(payload, extractor.name, extraction.metadata)
        if existing_asset:
            asset_change = self.asset_service.record_ingestion_update(
                resolved_asset_id,
                payload.source_ref,
                (source_representation, normalized_representation),
                metadata_records,
                context,
            )
            action = "updated"
        else:
            asset_change = self.asset_service.create_asset(
                title or payload.title,
                classification or Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL),
                context,
                asset_id=resolved_asset_id,
                source_refs=[payload.source_ref],
                representations=[source_representation, normalized_representation],
                metadata_records=metadata_records,
                idempotency_key=idempotency_key,
            )
            action = "created"
        completed = job.completed(
            output_asset_ids=(asset_change.asset.id,),
            partial_results={
                "action": action,
                "identity_policy": identity_policy.value,
                "connector": payload.connector_name,
                "extractor": extractor.name,
                "source_digest": payload.content_digest,
                "representations": [
                    source_representation.representation_id,
                    normalized_representation.representation_id,
                ],
                "diagnostics": [diagnostic.to_dict() for diagnostic in extraction.diagnostics],
            },
        )
        self.repository.save_ingestion_job(completed)
        return AssetIngestionResult(completed, asset_change.asset, asset_change, action=action)

    def _connector(self, name: str) -> SourceConnector:
        """Return the connector registered under ``name``.

        Raises:
            AdapterUnavailableError: If no connector has that name.
        """
        try:
            return self.connectors[name]
        except KeyError as exc:
            raise AdapterUnavailableError("Source connector is not registered", details={"connector": name}) from exc

    def _directory_connector(self, name: str) -> DirectorySourceConnector:
        """Return the connector registered under ``name`` if it can list directories.

        Directory support is detected by the presence of an ``iter_files``
        attribute.

        Raises:
            AdapterUnavailableError: If the connector is not registered or has
                no ``iter_files`` attribute.
        """
        connector = self._connector(name)
        if not hasattr(connector, "iter_files"):
            raise AdapterUnavailableError(
                "Source connector does not support directory iteration",
                details={"connector": name},
            )
        return connector  # type: ignore[return-value]

    def _extractor(self, media_type: str) -> FormatExtractor:
        """Return the first registered extractor that supports ``media_type``.

        Raises:
            AdapterUnavailableError: If no registered extractor supports it.
        """
        for extractor in self.extractors:
            if extractor.supports(media_type):
                return extractor
        raise AdapterUnavailableError(
            "No extractor registered for media type",
            details={"media_type": media_type},
        )
def _stable_asset_id(payload: SourcePayload, identity_policy: IngestionIdentityPolicy) -> str:
    """Derive a deterministic asset id for ``payload`` under ``identity_policy``.

    The id always depends on the source system. Under the ``CONTENT_DIGEST``
    policy it additionally depends on the payload's content digest; under any
    other policy it depends on the source reference's path, URI, external id,
    and connector reference instead.

    Returns:
        ``"asset-"`` followed by the first 20 characters of the mapping
        digest, with any ``"sha256:"`` prefix removed first.
    """
    source_ref = payload.source_ref
    fingerprint_input = {"source_system": source_ref.source_system}
    if identity_policy == IngestionIdentityPolicy.CONTENT_DIGEST:
        discriminator = {"checksum": payload.content_digest}
    else:
        discriminator = {
            "path": source_ref.path,
            "uri": source_ref.uri,
            "external_id": source_ref.external_id,
            "connector_ref": source_ref.connector_ref,
        }
    fingerprint_input.update(discriminator)
    hex_part = mapping_digest(fingerprint_input).removeprefix("sha256:")
    return "asset-" + hex_part[:20]
def _get_asset_or_none(repository: AssetRegistryRepository, asset_id: str) -> KnowledgeAsset | None:
    """Look up ``asset_id`` and translate a not-found error into ``None``.

    Only a ``KontextualError`` whose ``code`` is ``"kontextual.not_found"`` is
    swallowed; every other error is re-raised unchanged.
    """
    try:
        found = repository.get_asset(asset_id)
    except KontextualError as exc:
        if exc.code != "kontextual.not_found":
            raise
        return None
    else:
        return found
def _asset_has_source_reference(asset: KnowledgeAsset, source_ref) -> bool:
    """Report whether ``asset`` already carries a reference equivalent to ``source_ref``.

    A stored reference matches when its identity key equals that of
    ``source_ref``, or when it has a non-None connector reference and both
    that connector reference and the checksum equal those of ``source_ref``.
    """
    for known in asset.source_refs:
        if known.identity_key == source_ref.identity_key:
            return True
        if known.connector_ref is None:
            # Without a connector reference only the identity key can match.
            continue
        if known.connector_ref == source_ref.connector_ref and known.checksum == source_ref.checksum:
            return True
    return False
def _metadata_records(
    payload: SourcePayload,
    extractor_name: str,
    extraction_metadata: dict,
) -> list[MetadataRecord]:
    """Build the metadata records attached to an ingested asset.

    Records describing the source (media type, digest, size, connector, and
    the optional permission context) name the connector as their producer;
    the extractor name and a copy of the extraction metadata name the
    extractor as their producer. The permission-context record is added only
    when the payload's permission context is non-empty.
    """

    def from_connector(key: str, value, **flags) -> MetadataRecord:
        return MetadataRecord(key, value, provenance={"producer": payload.connector_name}, **flags)

    def from_extractor(key: str, value, **flags) -> MetadataRecord:
        return MetadataRecord(key, value, provenance={"producer": extractor_name}, **flags)

    collected = [
        from_connector("source_media_type", payload.media_type),
        from_connector("source_digest", payload.content_digest),
        from_connector("source_size_bytes", payload.size_bytes),
        from_connector("connector", payload.connector_name, confirmed=True),
        from_extractor("extractor", extractor_name, confirmed=True),
        from_extractor("extraction", dict(extraction_metadata)),
    ]
    if payload.permission_context:
        collected.append(
            from_connector("source_permission_context", dict(payload.permission_context), confirmed=True)
        )
    return collected
def _validate_extraction(
    payload: SourcePayload,
    extraction: ExtractionResult,
) -> list[IngestionFailure]:
    """Collect every trust check that ``payload`` and ``extraction`` fail.

    Checks, in order: ingestion explicitly disallowed by the permission
    context; missing source system; missing checksum, or a checksum that
    differs from the payload digest; missing extractor provenance in the
    extraction metadata; a normalized result with no text, structure, tables,
    links, fields, or unsupported elements; and a confidence below 0.2 with
    no unsupported elements reported.

    Returns:
        The failures in check order; an empty list when everything passes.
        Only the checksum mismatch is marked retriable.
    """
    problems: list[IngestionFailure] = []

    def report(code: str, message: str, details: dict, *, retriable: bool = False) -> None:
        problems.append(IngestionFailure(code=code, message=message, retriable=retriable, details=details))

    # Only an explicit False blocks ingestion; a missing key does not.
    if payload.permission_context.get("ingest_allowed") is False:
        report(
            "ingestion.permission_denied",
            "Source permission context does not allow ingestion",
            {"permission_context": dict(payload.permission_context)},
        )

    source_ref = payload.source_ref
    if not source_ref.source_system:
        report(
            "ingestion.source_system_missing",
            "Source reference is missing source system",
            {"source_uri": payload.source_uri},
        )

    if not source_ref.checksum:
        report(
            "ingestion.source_checksum_missing",
            "Source reference is missing checksum provenance",
            {"source_uri": payload.source_uri},
        )
    elif source_ref.checksum != payload.content_digest:
        report(
            "ingestion.source_checksum_mismatch",
            "Source reference checksum does not match payload content",
            {
                "source_uri": payload.source_uri,
                "source_checksum": source_ref.checksum,
                "payload_digest": payload.content_digest,
            },
            retriable=True,
        )

    if not extraction.metadata.get("extractor"):
        report(
            "ingestion.extractor_metadata_missing",
            "Extraction result is missing extractor provenance",
            {"source_uri": payload.source_uri},
        )

    normalized = extraction.normalized
    produced_nothing = not (
        normalized.text.strip()
        or normalized.structure
        or normalized.tables
        or normalized.links
        or normalized.fields
        or normalized.unsupported_elements
    )
    if produced_nothing:
        report(
            "ingestion.normalized_empty",
            "Extraction produced no normalized content, structure, fields, or diagnostics",
            {"source_uri": payload.source_uri, "media_type": payload.media_type},
        )

    trust_floor = 0.2
    confidence = normalized.confidence
    # Low confidence is tolerated when the extractor reported unsupported elements.
    if confidence is not None and confidence < trust_floor and not normalized.unsupported_elements:
        report(
            "ingestion.extraction_low_confidence",
            "Extraction confidence is below trusted ingestion threshold",
            {"confidence": normalized.confidence, "threshold": 0.2},
        )

    return problems
def _quarantined_job(
    job: IngestionJob,
    failures: list[IngestionFailure],
    *,
    partial_results: dict,
) -> IngestionJob:
    """Return ``job`` moved to the QUARANTINED status with ``failures`` attached.

    The job is failed with the first failure; when there is more than one,
    the job's ``failures`` field is then replaced with the full tuple.
    ``failures`` must be non-empty.
    """
    primary = failures[0]
    held = job.failed(
        primary,
        status=IngestionJobStatus.QUARANTINED,
        partial_results=partial_results,
    )
    if len(failures) <= 1:
        return held
    return replace(held, failures=tuple(failures))
def _failure_from_exception(exc: Exception) -> IngestionFailure:
    """Translate an exception into an ``IngestionFailure``.

    A ``KontextualError`` keeps its own code and a copy of its details, and is
    retriable only when it is an ``AdapterUnavailableError``. Any other
    exception becomes a non-retriable ``"ingestion.unexpected"`` failure that
    records the exception's type name.
    """
    text = str(exc)
    if not isinstance(exc, KontextualError):
        return IngestionFailure(
            code="ingestion.unexpected",
            message=text,
            retriable=False,
            details={"exception_type": type(exc).__name__},
        )
    can_retry = isinstance(exc, AdapterUnavailableError)
    return IngestionFailure(
        code=exc.code,
        message=text,
        retriable=can_retry,
        details=dict(exc.details),
    )