Files
kontextual-engine/tests/test_ingestion_workflows_context.py

101 lines
3.1 KiB
Python

from kontextual_engine import (
ArtifactType,
Collection,
ContextAssembler,
Diagnostic,
InMemoryKnowledgeRepository,
IngestionRequest,
IngestionService,
InputBundle,
OperationRun,
OperationStage,
Relationship,
RunManifest,
RunStatus,
WorkflowStep,
)
def test_plain_text_ingestion_creates_normalized_artifact() -> None:
    """Ingest a ``text/plain`` request and inspect the reported result.

    Checks the adapter name, the first artifact's name and media type, and
    the ``"text"`` entry of the normalized payload.
    """
    ingestion = IngestionService()
    request = IngestionRequest(
        collection_id="collection-1",
        name="note",
        content="hello",
        media_type="text/plain",
        artifact_type=ArtifactType.CONTENT,
    )
    outcome = ingestion.ingest(request)

    assert outcome.adapter == "plain-text"

    # Both artifact-level checks target the first artifact of the result.
    first_artifact = outcome.artifacts[0]
    assert first_artifact.name == "note"
    assert first_artifact.metadata.media_type == "text/plain"

    assert outcome.normalized["text"] == "hello"
def test_input_bundle_hash_is_deterministic() -> None:
    """Two bundles differing only in ``inputs`` key order must hash equally."""
    # Same key/value pairs, inserted in opposite orders.
    inputs_b_then_a = {"b": "sha256:2", "a": "sha256:1"}
    inputs_a_then_b = {"a": "sha256:1", "b": "sha256:2"}

    bundle_b_first = InputBundle(
        operation="compose",
        inputs=inputs_b_then_a,
        options={"heading_delta": 1},
    )
    bundle_a_first = InputBundle(
        operation="compose",
        inputs=inputs_a_then_b,
        options={"heading_delta": 1},
    )

    hash_b_first = bundle_b_first.calculate_hash()
    hash_a_first = bundle_a_first.calculate_hash()
    assert hash_b_first == hash_a_first
def test_operation_run_lifecycle_and_manifest() -> None:
    """Advance a run to RUNNING, complete it, and check its manifest dict.

    The serialized manifest is expected to expose the run status, the input
    bundle hash, and the kind of the single recorded workflow step.
    """
    input_bundle = InputBundle(operation="ingest", inputs={"source": "sha256:abc"})
    operation_run = OperationRun.create(
        "ingest",
        input_bundle.calculate_hash(),
        depth=1,
    )

    operation_run.advance(OperationStage.RUNNING)
    assert operation_run.status == RunStatus.RUNNING

    operation_run.mark_complete()

    ingest_step = WorkflowStep(id="step-1", kind="ingest", status=RunStatus.SUCCESS)
    serialized = RunManifest(
        run=operation_run,
        input_bundle=input_bundle,
        steps=[ingest_step],
        produced_artifact_ids=["artifact-1"],
    ).to_dict()

    assert serialized["run"]["status"] == "success"
    # The hash is recomputed here and compared against the serialized value.
    assert (
        serialized["input_bundle"]["input_bundle_hash"]
        == input_bundle.calculate_hash()
    )
    assert serialized["steps"][0]["kind"] == "ingest"
def test_operation_run_failure_records_diagnostic() -> None:
    """Marking a run failed sets FAILED and serializes the given diagnostic."""
    empty_bundle = InputBundle(operation="query", inputs={})
    operation_run = OperationRun.create("query", empty_bundle.calculate_hash())

    failure = Diagnostic(severity="error", code="test", message="failed")
    operation_run.mark_failed(failure)

    assert operation_run.status == RunStatus.FAILED

    recorded_diagnostics = operation_run.to_dict()["diagnostics"]
    assert recorded_diagnostics[0]["code"] == "test"
def test_context_assembler_includes_related_artifacts() -> None:
    """Context assembled for an artifact should also carry its related one.

    Two artifacts are saved in one collection and linked by a relationship;
    the package built for the root is expected to hold two items and to
    render both summaries.
    """
    repository = InMemoryKnowledgeRepository()
    runtime = repository.save_collection(Collection.create("runtime"))

    root_artifact = repository.save_artifact(
        repo_artifact(runtime.id, "root", "Root summary\nDetails")
    )
    related_artifact = repository.save_artifact(
        repo_artifact(runtime.id, "related", "Related summary")
    )
    link = Relationship.create(root_artifact.id, related_artifact.id, "relates to")
    repository.save_relationship(link)

    assembler = ContextAssembler(repository)
    context = assembler.for_artifact(root_artifact.id)
    markdown = context.render_markdown()

    assert len(context.items) == 2
    for expected_summary in ("Root summary", "Related summary"):
        assert expected_summary in markdown
def repo_artifact(collection_id: str, name: str, content: str):
    """Build an artifact with ``Artifact.create`` from the given fields.

    ``Artifact`` is imported inside the function body rather than at module
    level.
    """
    from kontextual_engine import Artifact

    artifact = Artifact.create(collection_id, name, content)
    return artifact