generated from coulomb/repo-seed
109 lines
4.0 KiB
Python
109 lines
4.0 KiB
Python
from pathlib import Path
|
|
|
|
from kontextual_engine import (
|
|
Actor,
|
|
ActorType,
|
|
AssetIngestionService,
|
|
Classification,
|
|
IngestionJobStatus,
|
|
InMemoryAssetRegistryRepository,
|
|
LifecycleState,
|
|
OperationContext,
|
|
RepresentationKind,
|
|
Sensitivity,
|
|
SQLiteAssetRegistryRepository,
|
|
)
|
|
|
|
|
|
def test_asset_ingestion_service_ingests_plain_text_file_as_governed_asset(tmp_path: Path) -> None:
    """Ingest a UTF-8 text file and check the resulting governed asset.

    Verifies the returned job, the asset's source provenance, the persisted job,
    the stored representation kinds, the normalized representation's metadata,
    and the first audit event recorded for the asset.
    """
    note_path = tmp_path / "note.txt"
    note_path.write_text("hello\nworld\n", encoding="utf-8")
    registry = InMemoryAssetRegistryRepository()
    ingestion = AssetIngestionService(registry)

    context = operation_context()
    note_classification = Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL)
    outcome = ingestion.ingest_file(
        note_path,
        context,
        asset_id="asset-note",
        classification=note_classification,
    )

    # The returned job reports success and carries the caller's correlation id.
    job = outcome.job
    assert job.status == IngestionJobStatus.COMPLETED
    assert job.correlation_id == "corr-ingest"
    assert job.output_asset_ids == ("asset-note",)

    # The asset keeps provenance pointing back at the local file it came from.
    asset = outcome.asset
    assert asset is not None
    first_ref = asset.source_refs[0]
    assert first_ref.source_system == "local_file"
    assert first_ref.path == str(note_path)

    # The job is persisted in the registry, not just returned.
    assert registry.get_ingestion_job(job.job_id).status == IngestionJobStatus.COMPLETED

    stored_kinds = {rep.kind for rep in registry.list_representations(asset_id="asset-note")}
    assert stored_kinds == {RepresentationKind.SOURCE, RepresentationKind.NORMALIZED}

    normalized_reps = registry.list_representations(
        asset_id="asset-note", kind=RepresentationKind.NORMALIZED
    )
    normalized = normalized_reps[0]
    assert normalized.media_type == "application/vnd.kontextual.normalized+json"
    assert normalized.metadata["extractor"] == "plain-text"

    audit_trail = registry.list_audit_events(target="asset:asset-note")
    assert audit_trail[0].operation == "asset.create"


def test_ingestion_failure_records_job_without_trusting_unsupported_asset(tmp_path: Path) -> None:
    """Ingesting an unsupported binary file fails without registering an asset.

    The service must return a FAILED job describing why (no adapter for the
    detected media type) and persist that failed job in the repository. No
    asset may be created from the untrusted input.
    """
    source = tmp_path / "blob.bin"
    # Raw bytes with no text-extraction adapter; the service is expected to
    # classify them as application/octet-stream (asserted below).
    source.write_bytes(b"\x00\x01\x02")
    repo = InMemoryAssetRegistryRepository()
    service = AssetIngestionService(repo)

    result = service.ingest_file(source, operation_context(), asset_id="asset-blob")

    assert result.asset is None
    assert result.job.status == IngestionJobStatus.FAILED
    assert result.job.failures[0].code == "kontextual.adapter_unavailable"
    assert result.job.failures[0].details["media_type"] == "application/octet-stream"
    # The test name promises the failure is *recorded*: the failed job must be
    # retrievable from the repository, not merely returned to the caller.
    assert repo.get_ingestion_job(result.job.job_id).status == IngestionJobStatus.FAILED
    assert repo.list_assets() == []


def test_directory_ingestion_reports_partial_results(tmp_path: Path) -> None:
    """Ingest a directory holding one supported and one unsupported file.

    The directory job should be PARTIALLY_COMPLETED, with counters and outputs
    showing exactly one success and one failure.
    """
    (tmp_path / "one.txt").write_text("one", encoding="utf-8")
    (tmp_path / "two.bin").write_bytes(b"\x00\x01")
    ingestion = AssetIngestionService(InMemoryAssetRegistryRepository())

    directory_job = ingestion.ingest_directory(tmp_path, operation_context(), recursive=False)

    assert directory_job.status == IngestionJobStatus.PARTIALLY_COMPLETED

    counters = directory_job.partial_results
    expected_counters = {"files_total": 2, "succeeded": 1, "failed": 1}
    for counter_name, expected_value in expected_counters.items():
        assert counters[counter_name] == expected_value

    assert len(directory_job.output_asset_ids) == 1
    assert len(directory_job.failures) == 1


def test_sqlite_ingestion_jobs_survive_reinstantiation(tmp_path: Path) -> None:
    """Check that SQLite-backed ingestion results are readable from a new repository.

    A file is ingested through one repository instance. A second instance is
    then opened on the same database file to read the job, asset and
    representations back.
    """
    policy_file = tmp_path / "policy.txt"
    policy_file.write_text("governed ingestion", encoding="utf-8")
    database = tmp_path / "registry.sqlite"

    writer = SQLiteAssetRegistryRepository(database)
    ingestion = AssetIngestionService(writer)
    outcome = ingestion.ingest_file(policy_file, operation_context(), asset_id="asset-policy")

    # Opening a fresh repository on the same file shows the data reached the
    # database rather than living only in the first instance.
    reader = SQLiteAssetRegistryRepository(database)
    stored_job = reader.get_ingestion_job(outcome.job.job_id)

    assert stored_job.status == IngestionJobStatus.COMPLETED
    assert stored_job.output_asset_ids == ("asset-policy",)
    assert reader.get_asset("asset-policy").lifecycle == LifecycleState.ACTIVE
    assert len(reader.list_representations(asset_id="asset-policy")) == 2


def operation_context() -> OperationContext:
    """Build the operation context shared by the tests in this module.

    The context has a human actor (``user-ingest``, display name
    "Ingestion Tester", group ``engineering``) and the fixed correlation id
    ``corr-ingest``.
    """
    tester = Actor.create(
        ActorType.HUMAN,
        actor_id="user-ingest",
        display_name="Ingestion Tester",
        groups=["engineering"],
    )
    context = OperationContext.create(tester, correlation_id="corr-ingest")
    return context