From 24cb3c5b6ac91617b89c1a4d11ab365abbb8d3f7 Mon Sep 17 00:00:00 2001 From: tegwick Date: Wed, 6 May 2026 13:43:16 +0200 Subject: [PATCH] richer normalized structure, permission context preservation --- docs/ingestion-implementation.md | 25 ++- .../adapters/builtin_extractors/datasets.py | 31 +++- .../adapters/builtin_extractors/documents.py | 7 + .../adapters/builtin_extractors/text.py | 84 +++++++++- .../adapters/markitect_tool/markdown.py | 62 ++++++++ .../services/ingestion_service.py | 150 +++++++++++++++++- tests/test_asset_ingestion_service.py | 138 ++++++++++++++++ tests/test_markitect_ingestion_adapter.py | 24 +++ tests/test_normalized_structure.py | 91 +++++++++++ ...06-multi-format-ingestion-normalization.md | 39 ++++- 10 files changed, 636 insertions(+), 15 deletions(-) create mode 100644 tests/test_normalized_structure.py diff --git a/docs/ingestion-implementation.md b/docs/ingestion-implementation.md index 4bbf40e..e265dda 100644 --- a/docs/ingestion-implementation.md +++ b/docs/ingestion-implementation.md @@ -38,13 +38,21 @@ The new `AssetIngestionService` is separate from the older artifact-era by default and opt-in content-digest identity for governed file move/rename reconciliation. - Plain text extractor producing a normalized engine representation. +- Plain text structural output includes lines, paragraphs, link extraction, + confidence, and extractor metadata. - CSV/TSV dataset extractor producing structured normalized table output with columns, row counts, and table metadata. +- CSV/TSV structural output includes table schemas, sample rows, link + extraction from cell values, confidence, and extractor metadata. - PDF and office document placeholder extractor that represents binary documents as governed assets while reporting metadata-only extraction depth. +- PDF and office placeholder output exposes unsupported elements, unsupported + counts, confidence, and extractor metadata. - Markitect markdown extractor adapter boundary that delegates markdown parsing, headings, sections, frontmatter, and snapshot identity to `markitect-tool` when available. +- Markdown normalized structure preserves Markitect-provided blocks, headings, + sections, snapshot metadata, table blocks, and link tokens where available. - Missing `markitect-tool` dependency fails through structured `AdapterUnavailableError` diagnostics instead of falling back to local Markdown parsing. @@ -62,6 +70,13 @@ The new `AssetIngestionService` is separate from the older artifact-era source/normalized representations instead of creating a second asset. - Unchanged source re-ingestion can be skipped without creating a new asset version. +- Ingestion validation runs after extraction and before registry writes. +- Invalid normalized output, missing source provenance, checksum mismatch, low + confidence without explicit unsupported elements, missing extractor + provenance, or denied source permission context quarantine the job without + creating a trusted asset. +- Source permission context is preserved on source/normalized representation + metadata and as a metadata record when present. - In-memory and SQLite job persistence. ## Current SQLite Additions @@ -82,9 +97,9 @@ document classes part of the engine domain model. - Asynchronous job runner and queue dispatch. - Deep normalized structure for tables, links, embedded references, and fields - beyond extractor-provided metadata and the CSV/TSV baseline. + beyond extractor-provided metadata and current text/Markdown/CSV baselines. - Optional deep PDF and office document extraction adapters. -- Quarantine policy checks beyond unsupported/failed extraction paths. +- Enterprise policy adapter integration for ingestion-time policy decisions. ## Test Coverage @@ -100,8 +115,14 @@ document classes part of the engine domain model. - unchanged source re-ingestion skip behavior, - Markitect markdown adapter delegation and missing-dependency behavior, - CSV dataset structured normalization, +- normalized structure coverage for text, Markdown, CSV, and document + placeholders, - PDF and office placeholder ingestion with explicit unsupported-depth diagnostics, +- ingestion validation/quarantine before registry writes, +- permission-context preservation on trusted ingested assets, +- directory reporting for succeeded, skipped, failed, quarantined, and + retriable items, - optional Markitect integration contract tests for parser, selector, operation, snapshot, context package, contract, and schema behavior, - SQLite reload preserving ingestion jobs and ingested asset state. diff --git a/src/kontextual_engine/adapters/builtin_extractors/datasets.py b/src/kontextual_engine/adapters/builtin_extractors/datasets.py index 29210a1..90f0d78 100644 --- a/src/kontextual_engine/adapters/builtin_extractors/datasets.py +++ b/src/kontextual_engine/adapters/builtin_extractors/datasets.py @@ -4,6 +4,7 @@ from __future__ import annotations import csv import io +import re from typing import Any from kontextual_engine.core import ExtractionResult, ExtractorCapability, NormalizedDocument, SourcePayload @@ -31,9 +32,10 @@ class CsvDatasetExtractor: reader = csv.DictReader(io.StringIO(text), delimiter=delimiter) columns = list(reader.fieldnames or []) rows = [dict(row) for row in reader] + links = _links_from_rows(rows) table = { "name": payload.title, - "columns": columns, + "columns": [{"name": column, "index": index} for index, column in enumerate(columns)], "rows": rows, "row_count": len(rows), } @@ -44,6 +46,9 @@ class CsvDatasetExtractor: "column_count": len(columns), "row_count": len(rows), "table_count": 1, + "link_count": len(links), + "links": links, + "sample_rows": rows[:5], "source_digest": payload.content_digest, "source_size_bytes": payload.size_bytes, } @@ -53,15 +58,19 @@ class CsvDatasetExtractor: structure={ "kind": "dataset", "format": metadata["dataset_format"], - "columns": columns, + "columns": [{"name": column, "index": index} for index, column in enumerate(columns)], "row_count": len(rows), + "table_count": 1, + "sample_rows": rows[:5], }, tables=[table], + links=links, fields={ "columns": columns, "column_count": len(columns), "row_count": len(rows), "dataset_format": metadata["dataset_format"], + "link_count": len(links), }, confidence=0.95, extractor_metadata={ @@ -77,3 +86,21 @@ def _delimiter_for(payload: SourcePayload) -> str: if payload.media_type == "text/tab-separated-values" or filename.endswith(".tsv"): return "\t" return "," + + +def _links_from_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + links: list[dict[str, Any]] = [] + for row_index, row in enumerate(rows): + for column, value in row.items(): + if not isinstance(value, str): + continue + for match in re.finditer(r"https?://[^\s<>)]+", value): + links.append( + { + "url": match.group(0), + "table": 0, + "row": row_index, + "column": column, + } + ) + return links diff --git a/src/kontextual_engine/adapters/builtin_extractors/documents.py b/src/kontextual_engine/adapters/builtin_extractors/documents.py index a260cb3..0616159 100644 --- a/src/kontextual_engine/adapters/builtin_extractors/documents.py +++ b/src/kontextual_engine/adapters/builtin_extractors/documents.py @@ -63,6 +63,9 @@ class DocumentPlaceholderExtractor: "document_kind": document_kind, "extraction_depth": "metadata_only", "unsupported_elements": [unsupported], + "unsupported_count": 1, + "link_count": 0, + "table_count": 0, "source_digest": payload.content_digest, "source_size_bytes": payload.size_bytes, } @@ -72,11 +75,15 @@ class DocumentPlaceholderExtractor: structure={ "kind": document_kind, "extraction_depth": "metadata_only", + "unsupported_elements": [unsupported], }, fields={ "document_kind": document_kind, "source_media_type": payload.media_type, "source_size_bytes": payload.size_bytes, + "unsupported_count": 1, + "link_count": 0, + "table_count": 0, }, confidence=0.0, unsupported_elements=[unsupported], diff --git a/src/kontextual_engine/adapters/builtin_extractors/text.py b/src/kontextual_engine/adapters/builtin_extractors/text.py index 63ed20c..ef56802 100644 --- a/src/kontextual_engine/adapters/builtin_extractors/text.py +++ b/src/kontextual_engine/adapters/builtin_extractors/text.py @@ -2,6 +2,8 @@ from __future__ import annotations +import re + from kontextual_engine.core import ExtractionResult, ExtractorCapability, NormalizedDocument, SourcePayload @@ -13,8 +15,8 @@ class PlainTextExtractor: return ExtractorCapability( extractor_name=self.name, media_types=self.media_types, - extraction_depth="text", - produces_structure=False, + extraction_depth="text_structure", + produces_structure=True, ) def supports(self, media_type: str) -> bool: @@ -22,10 +24,23 @@ class PlainTextExtractor: def extract(self, payload: SourcePayload) -> ExtractionResult: text = payload.read_text() + lines = _lines(text) + paragraphs = _paragraphs(text) + links = _links(text) normalized = NormalizedDocument( title=payload.title, text=text, - fields={"line_count": len(text.splitlines())}, + structure={ + "kind": "plain_text", + "lines": lines, + "paragraphs": paragraphs, + }, + links=links, + fields={ + "line_count": len(lines), + "paragraph_count": len(paragraphs), + "link_count": len(links), + }, confidence=1.0, extractor_metadata={ "extractor": self.name, @@ -36,7 +51,70 @@ class PlainTextExtractor: normalized=normalized, metadata={ "extractor": self.name, + "line_count": len(lines), + "paragraph_count": len(paragraphs), + "link_count": len(links), + "links": links, "source_digest": payload.content_digest, "source_size_bytes": payload.size_bytes, }, ) + + +def _lines(text: str) -> list[dict[str, int | str]]: + return [ + { + "index": index, + "line_number": index + 1, + "text": line, + } + for index, line in enumerate(text.splitlines()) + ] + + +def _paragraphs(text: str) -> list[dict[str, int | str]]: + paragraphs: list[dict[str, int | str]] = [] + current: list[str] = [] + start_line: int | None = None + for index, line in enumerate(text.splitlines(), start=1): + if line.strip(): + if start_line is None: + start_line = index + current.append(line) + continue + if current and start_line is not None: + paragraphs.append( + { + "index": len(paragraphs), + "line_start": start_line, + "line_end": index - 1, + "text": "\n".join(current), + } + ) + current = [] + start_line = None + if current and start_line is not None: + paragraphs.append( + { + "index": len(paragraphs), + "line_start": start_line, + "line_end": start_line + len(current) - 1, + "text": "\n".join(current), + } + ) + return paragraphs + + +def _links(text: str) -> list[dict[str, int | str]]: + links: list[dict[str, int | str]] = [] + for line_index, line in enumerate(text.splitlines(), start=1): + for match in re.finditer(r"https?://[^\s<>)]+", line): + links.append( + { + "url": match.group(0), + "line": line_index, + "start": match.start(), + "end": match.end(), + } + ) + return links diff --git a/src/kontextual_engine/adapters/markitect_tool/markdown.py b/src/kontextual_engine/adapters/markitect_tool/markdown.py index 0c17ce9..5d5102e 100644 --- a/src/kontextual_engine/adapters/markitect_tool/markdown.py +++ b/src/kontextual_engine/adapters/markitect_tool/markdown.py @@ -42,8 +42,11 @@ class MarkitectMarkdownExtractor: document = self._parse_document(mkt, text, source_path) serialized = document.to_dict() if hasattr(document, "to_dict") else {} snapshot = self._snapshot(mkt, source_path) + links = _links_from_tokens(list(serialized.get("tokens", []))) + tables = _tables_from_blocks(list(serialized.get("blocks", []))) structure = { "frontmatter": dict(serialized.get("frontmatter", {})), + "blocks": list(serialized.get("blocks", [])), "headings": list(serialized.get("headings", [])), "sections": list(serialized.get("sections", [])), } @@ -51,10 +54,15 @@ class MarkitectMarkdownExtractor: title=payload.title, text=text, structure=structure, + tables=tables, + links=links, fields={ "frontmatter": dict(serialized.get("frontmatter", {})), + "block_count": len(structure["blocks"]), "heading_count": len(structure["headings"]), "section_count": len(structure["sections"]), + "table_count": len(tables), + "link_count": len(links), }, confidence=1.0, extractor_metadata={ @@ -68,7 +76,12 @@ class MarkitectMarkdownExtractor: metadata={ "extractor": self.name, "frontmatter": structure["frontmatter"], + "blocks": structure["blocks"], "headings": structure["headings"], + "sections": structure["sections"], + "links": links, + "link_count": len(links), + "table_count": len(tables), "snapshot": snapshot, "source_digest": payload.content_digest, "source_size_bytes": payload.size_bytes, @@ -84,3 +97,52 @@ class MarkitectMarkdownExtractor: if not source_path or not Path(source_path).exists() or not hasattr(mkt, "snapshot_identity_for_file"): return {} return mkt.snapshot_identity_for_file(Path(source_path), parse_options={"profile": "default"}).to_dict() + + +def _links_from_tokens(tokens: list[dict[str, Any]]) -> list[dict[str, Any]]: + links: list[dict[str, Any]] = [] + for token in _walk_tokens(tokens): + if token.get("type") != "link_open": + continue + href = _attr_value(token.get("attrs"), "href") + if href: + links.append({"url": href, "kind": "markdown_link"}) + return links + + +def _walk_tokens(tokens: list[dict[str, Any]]) -> list[dict[str, Any]]: + walked: list[dict[str, Any]] = [] + for token in tokens: + walked.append(token) + children = token.get("children") + if isinstance(children, list): + walked.extend(_walk_tokens([child for child in children if isinstance(child, dict)])) + return walked + + +def _attr_value(attrs: Any, name: str) -> str | None: + if isinstance(attrs, dict): + value = attrs.get(name) + return str(value) if value is not None else None + if isinstance(attrs, list): + for item in attrs: + if isinstance(item, (list, tuple)) and len(item) == 2 and item[0] == name: + return str(item[1]) + return None + + +def _tables_from_blocks(blocks: list[dict[str, Any]]) -> list[dict[str, Any]]: + tables: list[dict[str, Any]] = [] + for index, block in enumerate(blocks): + if block.get("type") != "table": + continue + tables.append( + { + "index": len(tables), + "source_block_index": index, + "text": block.get("text", ""), + "line_start": block.get("line_start"), + "line_end": block.get("line_end"), + } + ) + return tables diff --git a/src/kontextual_engine/services/ingestion_service.py b/src/kontextual_engine/services/ingestion_service.py index ad3d3b2..3ca2721 100644 --- a/src/kontextual_engine/services/ingestion_service.py +++ b/src/kontextual_engine/services/ingestion_service.py @@ -2,7 +2,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, replace from pathlib import Path from typing import Iterable @@ -16,6 +16,7 @@ from kontextual_engine.adapters.markitect_tool import MarkitectMarkdownExtractor from kontextual_engine.core import ( AssetRepresentation, Classification, + ExtractionResult, IngestionFailure, IngestionIdentityPolicy, IngestionJob, @@ -178,14 +179,28 @@ class AssetIngestionService: "failed": sum(1 for item in item_results if item["status"] == IngestionJobStatus.FAILED.value), "quarantined": sum(1 for item in item_results if item["status"] == IngestionJobStatus.QUARANTINED.value), "skipped": sum(1 for item in item_results if item["status"] == "skipped"), + "retriable": sum(1 for item in item_results if item.get("retry_state") == "retriable"), "items": item_results, } + failed_count = partial_results["failed"] + quarantined_count = partial_results["quarantined"] if failures and output_asset_ids: job = job.partially_completed( output_asset_ids=tuple(output_asset_ids), failures=tuple(failures), partial_results=partial_results, ) + elif quarantined_count and not failed_count: + job = job.failed( + IngestionFailure( + code="ingestion.directory_quarantined", + message="Directory ingestion quarantined all non-skipped files", + retriable=False, + details=partial_results, + ), + status=IngestionJobStatus.QUARANTINED, + partial_results=partial_results, + ) elif failures: job = job.failed( IngestionFailure( @@ -224,6 +239,21 @@ class AssetIngestionService: self.repository.save_ingestion_job(job) extractor = self._extractor(payload.media_type) extraction = extractor.extract(payload) + validation_failures = _validate_extraction(payload, extraction) + if validation_failures: + quarantined = _quarantined_job( + job, + validation_failures, + partial_results={ + "action": "quarantined", + "connector": payload.connector_name, + "extractor": extraction.metadata.get("extractor"), + "source_digest": payload.content_digest, + "diagnostics": [failure.to_dict() for failure in validation_failures], + }, + ) + self.repository.save_ingestion_job(quarantined) + return AssetIngestionResult(quarantined, action="quarantined") resolved_asset_id = asset_id or _stable_asset_id(payload, identity_policy) existing_asset = _get_asset_or_none(self.repository, resolved_asset_id) if existing_asset and skip_unchanged and _asset_has_source_reference(existing_asset, payload.source_ref): @@ -254,6 +284,7 @@ class AssetIngestionService: "connector": payload.connector_name, "source_digest": payload.content_digest, "source_size_bytes": payload.size_bytes, + "permission_context": dict(payload.permission_context), **payload.metadata, }, ) @@ -267,6 +298,7 @@ class AssetIngestionService: metadata={ "extractor": extractor.name, "normalized_hash": extraction.normalized.normalized_hash, + "permission_context": dict(payload.permission_context), **extraction.metadata, }, ) @@ -380,7 +412,7 @@ def _metadata_records( extractor_name: str, extraction_metadata: dict, ) -> list[MetadataRecord]: - return [ + records = [ MetadataRecord("source_media_type", payload.media_type, provenance={"producer": payload.connector_name}), MetadataRecord("source_digest", payload.content_digest, provenance={"producer": payload.connector_name}), MetadataRecord("source_size_bytes", payload.size_bytes, provenance={"producer": payload.connector_name}), @@ -388,6 +420,120 @@ def _metadata_records( MetadataRecord("extractor", extractor_name, provenance={"producer": extractor_name}, confirmed=True), MetadataRecord("extraction", dict(extraction_metadata), provenance={"producer": extractor_name}), ] + if payload.permission_context: + records.append( + MetadataRecord( + "source_permission_context", + dict(payload.permission_context), + provenance={"producer": payload.connector_name}, + confirmed=True, + ) + ) + return records + + +def _validate_extraction( + payload: SourcePayload, + extraction: ExtractionResult, +) -> list[IngestionFailure]: + failures: list[IngestionFailure] = [] + if payload.permission_context.get("ingest_allowed") is False: + failures.append( + IngestionFailure( + code="ingestion.permission_denied", + message="Source permission context does not allow ingestion", + retriable=False, + details={"permission_context": dict(payload.permission_context)}, + ) + ) + if not payload.source_ref.source_system: + failures.append( + IngestionFailure( + code="ingestion.source_system_missing", + message="Source reference is missing source system", + retriable=False, + details={"source_uri": payload.source_uri}, + ) + ) + if not payload.source_ref.checksum: + failures.append( + IngestionFailure( + code="ingestion.source_checksum_missing", + message="Source reference is missing checksum provenance", + retriable=False, + details={"source_uri": payload.source_uri}, + ) + ) + elif payload.source_ref.checksum != payload.content_digest: + failures.append( + IngestionFailure( + code="ingestion.source_checksum_mismatch", + message="Source reference checksum does not match payload content", + retriable=True, + details={ + "source_uri": payload.source_uri, + "source_checksum": payload.source_ref.checksum, + "payload_digest": payload.content_digest, + }, + ) + ) + if not extraction.metadata.get("extractor"): + failures.append( + IngestionFailure( + code="ingestion.extractor_metadata_missing", + message="Extraction result is missing extractor provenance", + retriable=False, + details={"source_uri": payload.source_uri}, + ) + ) + normalized = extraction.normalized + has_normalized_output = bool( + normalized.text.strip() + or normalized.structure + or normalized.tables + or normalized.links + or normalized.fields + or normalized.unsupported_elements + ) + if not has_normalized_output: + failures.append( + IngestionFailure( + code="ingestion.normalized_empty", + message="Extraction produced no normalized content, structure, fields, or diagnostics", + retriable=False, + details={"source_uri": payload.source_uri, "media_type": payload.media_type}, + ) + ) + if ( + normalized.confidence is not None + and normalized.confidence < 0.2 + and not normalized.unsupported_elements + ): + failures.append( + IngestionFailure( + code="ingestion.extraction_low_confidence", + message="Extraction confidence is below trusted ingestion threshold", + retriable=False, + details={"confidence": normalized.confidence, "threshold": 0.2}, + ) + ) + return failures + + +def _quarantined_job( + job: IngestionJob, + failures: list[IngestionFailure], + *, + partial_results: dict, +) -> IngestionJob: + quarantined = job.failed( + failures[0], + status=IngestionJobStatus.QUARANTINED, + partial_results=partial_results, + ) + if len(failures) > 1: + quarantined = replace(quarantined, failures=tuple(failures)) + return quarantined def _failure_from_exception(exc: Exception) -> IngestionFailure: diff --git a/tests/test_asset_ingestion_service.py b/tests/test_asset_ingestion_service.py index 1d1d9a4..c601d52 100644 --- a/tests/test_asset_ingestion_service.py +++ b/tests/test_asset_ingestion_service.py @@ -7,14 +7,21 @@ from kontextual_engine import ( ActorType, AssetIngestionService, Classification, + ConnectorCapability, + ExtractionResult, + ExtractorCapability, IngestionIdentityPolicy, IngestionJobStatus, InMemoryAssetRegistryRepository, LifecycleState, + NormalizedDocument, OperationContext, RepresentationKind, Sensitivity, + SourcePayload, + SourceReference, SQLiteAssetRegistryRepository, + content_digest, ) @@ -45,6 +52,9 @@ def test_asset_ingestion_service_ingests_plain_text_file_as_governed_asset(tmp_p normalized = repo.list_representations(asset_id="asset-note", kind=RepresentationKind.NORMALIZED)[0] assert normalized.media_type == "application/vnd.kontextual.normalized+json" assert normalized.metadata["extractor"] == "plain-text" + assert normalized.metadata["line_count"] == 2 + assert normalized.metadata["paragraph_count"] == 1 + assert normalized.metadata["link_count"] == 0 assert repo.list_audit_events(target="asset:asset-note")[0].operation == "asset.create" @@ -219,6 +229,75 @@ def test_document_placeholder_formats_create_asset_with_unsupported_depth_diagno assert normalized.metadata["unsupported_elements"][0]["reason"] == "deep_extraction_not_available" +def test_ingestion_quarantines_empty_normalized_output_without_asset(tmp_path: Path) -> None: + source = tmp_path / "emptyish.txt" + source.write_text("content that the bad extractor drops", encoding="utf-8") + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo, extractors=[EmptyTextExtractor()]) + + result = service.ingest_file(source, operation_context(), asset_id="asset-emptyish") + + assert result.asset is None + assert result.action == "quarantined" + assert result.job.status == IngestionJobStatus.QUARANTINED + assert result.job.failures[0].code == "ingestion.normalized_empty" + assert result.job.partial_results["action"] == "quarantined" + assert repo.list_assets() == [] + + +def test_ingestion_preserves_source_permission_context_on_representations() -> None: + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo, connectors=[PermissionedConnector()]) + + result = service.ingest_file("permissioned.txt", operation_context(), asset_id="asset-permissioned") + + source = repo.list_representations(asset_id="asset-permissioned", kind=RepresentationKind.SOURCE)[0] + normalized = repo.list_representations(asset_id="asset-permissioned", kind=RepresentationKind.NORMALIZED)[0] + permission_records = [ + record for record in repo.list_metadata_records("asset-permissioned") + if record.key == "source_permission_context" + ] + + assert result.job.status == IngestionJobStatus.COMPLETED + assert source.metadata["permission_context"] == { + "ingest_allowed": True, + "labels": ["engineering"], + } + assert normalized.metadata["permission_context"]["labels"] == ["engineering"] + assert permission_records[0].value["ingest_allowed"] is True + + +def test_ingestion_quarantines_permission_denied_source_without_asset() -> None: + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo, connectors=[PermissionedConnector(ingest_allowed=False)]) + + result = service.ingest_file("denied.txt", operation_context(), asset_id="asset-denied") + + assert result.asset is None + assert result.job.status == IngestionJobStatus.QUARANTINED + assert result.job.failures[0].code == "ingestion.permission_denied" + assert repo.list_assets() == [] + + +def test_directory_ingestion_reports_quarantined_failed_and_retriable_counts(tmp_path: Path) -> None: + (tmp_path / "bad-normalized.txt").write_text("dropped", encoding="utf-8") + (tmp_path / "unsupported.bin").write_bytes(b"\x00\x01") + repo = InMemoryAssetRegistryRepository() + service = AssetIngestionService(repo, extractors=[EmptyTextExtractor()]) + + job = service.ingest_directory(tmp_path, operation_context(), recursive=False) + items = {Path(item["source_uri"]).name: item for item in job.partial_results["items"]} + + assert job.status == IngestionJobStatus.FAILED + assert job.partial_results["succeeded"] == 0 + assert job.partial_results["failed"] == 1 + assert job.partial_results["quarantined"] == 1 + assert job.partial_results["retriable"] == 1 + assert items["bad-normalized.txt"]["status"] == IngestionJobStatus.QUARANTINED.value + assert items["bad-normalized.txt"]["failures"][0]["code"] == "ingestion.normalized_empty" + assert items["unsupported.bin"]["retry_state"] == "retriable" + + def test_sqlite_ingestion_jobs_survive_reinstantiation(tmp_path: Path) -> None: source = tmp_path / "policy.txt" source.write_text("governed ingestion", encoding="utf-8") @@ -249,3 +328,62 @@ def operation_context() -> OperationContext: groups=["engineering"], ) return OperationContext.create(actor, correlation_id="corr-ingest") + + +class EmptyTextExtractor: + name = "empty-text" + + def capabilities(self) -> ExtractorCapability: + return ExtractorCapability( + extractor_name=self.name, + media_types=("text/plain",), + extraction_depth="text", + ) + + def supports(self, media_type: str) -> bool: + return media_type == "text/plain" + + def extract(self, payload: SourcePayload) -> ExtractionResult: + return ExtractionResult( + normalized=NormalizedDocument(text="", confidence=1.0), + metadata={ + "extractor": self.name, + "source_digest": payload.content_digest, + "source_size_bytes": payload.size_bytes, + }, + ) + + +class PermissionedConnector: + name = "local_file" + + def __init__(self, *, ingest_allowed: bool = True) -> None: + self.ingest_allowed = ingest_allowed + + def capabilities(self) -> ConnectorCapability: + return ConnectorCapability( + connector_name=self.name, + source_types=("file",), + supports_directories=False, + ) + + def fetch(self, source_uri: str) -> SourcePayload: + content = b"permissioned content" + return SourcePayload( + connector_name=self.name, + source_uri=source_uri, + source_ref=SourceReference( + source_system=self.name, + path=source_uri, + checksum=content_digest(content), + connector_ref=f"{self.name}:{source_uri}", + ), + media_type="text/plain", + content=content, + title=Path(source_uri).stem, + metadata={"filename": Path(source_uri).name}, + permission_context={ + "ingest_allowed": self.ingest_allowed, + "labels": ["engineering"], + }, + ) diff --git a/tests/test_markitect_ingestion_adapter.py b/tests/test_markitect_ingestion_adapter.py index a54a89d..8eda763 100644 --- a/tests/test_markitect_ingestion_adapter.py +++ b/tests/test_markitect_ingestion_adapter.py @@ -36,6 +36,11 @@ def test_markitect_markdown_extractor_delegates_to_markitect_tool( return SimpleNamespace( to_dict=lambda: { "frontmatter": {"status": "accepted"}, + "blocks": [ + {"type": "heading", "text": "Decision", "line_start": 1, "heading_level": 1}, + {"type": "paragraph", "text": "Use Markitect.", "line_start": 3}, + {"type": "table", "text": "| A |\n| - |", "line_start": 5, "line_end": 6}, + ], "headings": [{"level": 1, "text": "Decision", "line": 1}], "sections": [ { @@ -43,6 +48,17 @@ def test_markitect_markdown_extractor_delegates_to_markitect_tool( "blocks": [{"type": "paragraph", "text": "Use Markitect.", "line_start": 3}], } ], + "tokens": [ + { + "type": "inline", + "children": [ + { + "type": "link_open", + "attrs": {"href": "https://example.test/decision"}, + } + ], + } + ], } ) @@ -73,8 +89,16 @@ def test_markitect_markdown_extractor_delegates_to_markitect_tool( ("snapshot_identity_for_file", f"{source}:default"), ] assert result.normalized.structure["frontmatter"] == {"status": "accepted"} + assert result.normalized.structure["blocks"][1]["type"] == "paragraph" + assert result.normalized.links == [ + {"url": "https://example.test/decision", "kind": "markdown_link"} + ] + assert result.normalized.tables[0]["text"] == "| A |\n| - |" + assert result.normalized.fields["block_count"] == 3 assert result.normalized.fields["heading_count"] == 1 assert result.normalized.fields["section_count"] == 1 + assert result.normalized.fields["link_count"] == 1 + assert result.normalized.fields["table_count"] == 1 assert result.metadata["snapshot"]["snapshot_id"] == "snapshot:decision" assert result.normalized.extractor_metadata["snapshot"]["parser"] == "markdown-it-py/commonmark" diff --git a/tests/test_normalized_structure.py b/tests/test_normalized_structure.py new file mode 100644 index 0000000..0887edb --- /dev/null +++ b/tests/test_normalized_structure.py @@ -0,0 +1,91 @@ +from kontextual_engine import SourcePayload, SourceReference, content_digest +from kontextual_engine.adapters.builtin_extractors import ( + CsvDatasetExtractor, + DocumentPlaceholderExtractor, + PlainTextExtractor, +) + + +def test_plain_text_extractor_emits_structural_units_and_links() -> None: + payload = source_payload( + "Intro line\nwith https://example.test/ref\n\nSecond paragraph\n", + media_type="text/plain", + ) + + result = PlainTextExtractor().extract(payload) + + assert result.normalized.structure["kind"] == "plain_text" + assert result.normalized.fields["line_count"] == 4 + assert result.normalized.fields["paragraph_count"] == 2 + assert result.normalized.links == [ + { + "url": "https://example.test/ref", + "line": 2, + "start": 5, + "end": 29, + } + ] + assert result.metadata["link_count"] == 1 + + +def test_csv_dataset_extractor_emits_table_schema_samples_and_links() -> None: + payload = source_payload( + "name,source\nalpha,https://example.test/a\nbeta,\n", + media_type="text/csv", + filename="metrics.csv", + ) + + result = CsvDatasetExtractor().extract(payload) + + assert result.normalized.structure["kind"] == "dataset" + assert result.normalized.structure["columns"] == [ + {"name": "name", "index": 0}, + {"name": "source", "index": 1}, + ] + assert result.normalized.tables[0]["row_count"] == 2 + assert result.normalized.tables[0]["rows"][0]["name"] == "alpha" + assert result.normalized.links == [ + { + "url": "https://example.test/a", + "table": 0, + "row": 0, + "column": "source", + } + ] + assert result.metadata["sample_rows"][0]["source"] == "https://example.test/a" + + +def test_document_placeholder_exposes_unsupported_structure() -> None: + payload = source_payload(b"%PDF-1.7\n", media_type="application/pdf", filename="brief.pdf") + + result = DocumentPlaceholderExtractor().extract(payload) + + assert result.normalized.structure["kind"] == "pdf" + assert result.normalized.fields["unsupported_count"] == 1 + assert result.normalized.fields["link_count"] == 0 + assert result.normalized.unsupported_elements[0]["reason"] == "deep_extraction_not_available" + assert result.diagnostics[0].code == "extraction.depth_unsupported" + + +def source_payload( + content: str | bytes, + *, + media_type: str, + filename: str = "source.txt", +) -> SourcePayload: + data = content.encode("utf-8") if isinstance(content, str) else content + source_ref = SourceReference( + source_system="test", + path=filename, + checksum=content_digest(data), + connector_ref=f"test:{filename}", + ) + return SourcePayload( + connector_name="test", + source_uri=filename, + source_ref=source_ref, + media_type=media_type, + content=data, + title=filename.rsplit(".", maxsplit=1)[0], + metadata={"filename": filename}, + ) diff --git a/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md b/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md index 2458ece..1e184f7 100644 --- a/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md +++ b/workplans/KONT-WP-0006-multi-format-ingestion-normalization.md @@ -4,7 +4,7 @@ type: workplan title: "Multi-Format Ingestion And Normalization" domain: markitect repo: kontextual-engine -status: active +status: done owner: codex topic_slug: markitect planning_priority: high @@ -56,9 +56,12 @@ identity policy, content-digest identity for governed file move/rename reconciliation, unchanged-source skip behavior, and directory item retry/skipped reporting. CSV/TSV datasets now produce structured normalized table output, and PDF/office-like files can enter the governed asset set through metadata-only -placeholder extraction with explicit unsupported-depth diagnostics. Remaining -work is focused on async execution, richer structural extraction, quarantine -policy checks, and optional deep non-text extraction adapters. +placeholder extraction with explicit unsupported-depth diagnostics. Normalized +structure now covers text lines/paragraphs/links, CSV/TSV schemas/tables/sample +rows/links, Markitect-provided Markdown blocks/headings/sections/link tokens, +and document placeholder unsupported elements. Remaining work is focused on +async execution and optional deep non-text extraction adapters, which are +deferred to adjacent workplans. This WP-0006 foundation slice is complete. ## I6.1 - Implement ingestion job model status and retry surface @@ -190,7 +193,7 @@ Implemented: ```task id: KONT-WP-0006-T006 -status: todo +status: done priority: medium state_hub_task_id: "7421bc87-d962-4938-9aa3-591f8489e542" ``` @@ -206,11 +209,23 @@ Acceptance: packages. - Extractor confidence and unsupported elements are visible. +Implemented: + +- The shared `NormalizedDocument` contract already carries text, structure, + tables, links, fields, confidence, unsupported elements, and extractor + metadata. +- Plain text emits line and paragraph units plus URL links. +- CSV/TSV emits table schema, rows, sample rows, and URL links from cells. +- Markdown maps Markitect-provided blocks, headings, sections, table blocks, + link tokens, and snapshot metadata without reimplementing Markdown parsing. +- PDF/office placeholders emit unsupported element structure and low-confidence + metadata-only normalized output. + ## I6.7 - Validate ingestion output quarantine failures and preserve provenance ```task id: KONT-WP-0006-T007 -status: todo +status: done priority: medium state_hub_task_id: "07b32021-3701-437a-ae87-030bed56a25c" ``` @@ -226,6 +241,18 @@ Acceptance: - Batch ingestion reports succeeded, failed, skipped, quarantined, and retriable items separately. +Implemented: + +- Ingestion validates extraction output before registry writes. +- Empty normalized output, missing/mismatched source checksum provenance, + missing extractor metadata, denied source permission context, and low + confidence without explicit unsupported elements quarantine the file job + without adding a trusted asset. +- Source permission context is preserved on representations and metadata + records for accepted assets. +- Directory ingestion reports succeeded, failed, skipped, quarantined, and + retriable item counts separately. + ## Definition Of Done - Local file, text, markdown, PDF/document placeholder, and dataset ingestion