From 1e3c6fe34a65eabef11f1b18c23e9bcae2fe9182 Mon Sep 17 00:00:00 2001
From: tegwick
Date: Wed, 6 May 2026 16:27:03 +0200
Subject: [PATCH] stable asset queries, lexical search, filters, contextual entity and relationship retrieval, permission-aware fail-closed behavior, source-grounded snippets, feedback capture, and KPI hooks

---
 docs/retrieval-implementation.md | 110 +
 src/kontextual_engine/__init__.py | 34 +
 .../adapters/memory/asset_registry.py | 20 +
 .../adapters/sqlite/asset_registry.py | 62 +
 src/kontextual_engine/core/__init__.py | 3 +
 src/kontextual_engine/core/relationships.py | 2 +
 .../core/retrieval_feedback.py | 59 +
 src/kontextual_engine/ports/repositories.py | 9 +
 src/kontextual_engine/services/__init__.py | 32 +
 .../services/ingestion_service.py | 2 +
 .../services/retrieval_service.py | 1993 +++++++++++++++++
 tests/test_asset_retrieval_service.py | 722 ++++++
 ...P-0007-governed-retrieval-context-graph.md | 134 +-
 13 files changed, 3173 insertions(+), 9 deletions(-)
 create mode 100644 docs/retrieval-implementation.md
 create mode 100644 src/kontextual_engine/core/retrieval_feedback.py
 create mode 100644 src/kontextual_engine/services/retrieval_service.py
 create mode 100644 tests/test_asset_retrieval_service.py

diff --git a/docs/retrieval-implementation.md b/docs/retrieval-implementation.md
new file mode 100644
index 0000000..a9c50e9
--- /dev/null
+++ b/docs/retrieval-implementation.md
@@ -0,0 +1,110 @@
+# Retrieval Implementation Note
+
+Date: 2026-05-06
+
+Status: first implementation slice for `KONT-WP-0007`.
+
+## Purpose
+
+This note records the first governed retrieval implementation over the asset
+registry. It introduces stable query request/result contracts before search
+ranking, policy filtering, snippets, relationship traversal, and feedback grow
+more complex.
+
+## Implemented Package Shape
+
+```text
+src/kontextual_engine/
+  services/retrieval_service.py
+```
+
+The retrieval service depends on the asset registry repository port and domain
+core contracts. It does not depend on HTTP, search backends, Markitect internals,
+or AI providers.
+
+## Implemented Capabilities
+
+- `AssetQueryRequest` for stable asset queries with pagination, sorting, and
+  common asset filters.
+- `AssetQueryResult` envelope with correlation ID, total count, returned count,
+  limit, offset, next offset, sort metadata, results, diagnostics, and success
+  flag.
+- `AssetQueryItem` result entries carrying asset identity, classification,
+  lifecycle, source references, representations, metadata records, relevance
+  metadata, and diagnostics.
+- Deterministic sorting by title, asset ID, asset type, lifecycle, creation
+  time, or update time with asset ID as a tie-breaker.
+- Pagination by limit and offset.
+- Structured validation diagnostics for invalid lifecycle, representation
+  kind, limit, offset, sort key, and sort order.
+- Standard filters for asset type, lifecycle, sensitivity, owner, topic, review
+  state, metadata records, source system/path, and representation kind.
+- Lexical search over normalized representation search text produced during
+  ingestion or supplied as representation metadata.
+- Refreshable in-memory lexical index with indexed asset/representation counts.
+- Relevance metadata for lexical matches, including strategy, query, match
+  count, and representation IDs.
+- Zero-result measurement metadata for query smoke tests.
+- Additional source-context filters for collection, tags, and created/updated
+  timestamp bounds.
+- `ContextEntityQueryRequest`/`ContextEntityQueryResult` for querying
+  contextual entities by type, name, external reference, and metadata.
+- `RelationshipQueryRequest`/`RelationshipQueryResult` for stable relationship
+  retrieval by source, target, asset, contextual entity, predicate, target
+  kind, direction, and workflow run.
+- Asset queries can be constrained by contextual entity, workflow run, related
+  asset, and relationship predicate.
+- Asset result entries can carry relationship context and linked contextual
+  entities when graph filters or relationship inclusion are requested.
+- Relationship payloads expose direction, predicate, validity windows,
+  confidence, actor, provenance, creation time, and resolved source/target
+  context where available.
+- Retrieval uses the engine policy gateway for query-scope authorization and
+  per-resource checks before assets, relationships, or contextual entities are
+  returned.
+- Policy failures or unavailable policy context fail closed with empty denied
+  envelopes and structured diagnostics.
+- Retrieval audit events record actor, correlation ID, query scope, policy
+  decision, outcome, result count, total count, and internal
+  permission-filter counts.
+- `RetrievalSnippet` packets can be requested for lexical queries. Snippets are
+  built from normalized representation search text and carry asset ID,
+  representation ID, source reference ID, storage reference, media type, match
+  offsets, match text, and adapter provenance such as Markitect selectors or
+  source spans when present.
+- Snippets are attached only to policy-authorized asset results, so denied
+  matching content is not exposed through snippet packets.
+- `RetrievalFeedbackRecord` persists useful, irrelevant, missing, unsafe, and
+  low-confidence feedback with query context, result references, actor,
+  correlation ID, notes, and metadata.
+- Retrieval quality metrics summarize zero-result rate, precision@k,
+  citation precision, feedback counts, unsafe/low-confidence counts, and
+  permission-filter timing observations from retrieval audit events.
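The sorting and pagination bullets above can be illustrated with a small standalone sketch. It is not the code in `retrieval_service.py`: `Page` and `paginate` are hypothetical names, and assets are plain dicts here rather than `KnowledgeAsset` objects. Only the `next_offset` rule (offset plus returned count, or `None` once the total is reached) mirrors what the patch's result envelopes expose.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Page:
    """Hypothetical stand-in for a paginated query envelope."""

    items: tuple[dict[str, Any], ...]
    total: int
    limit: int
    offset: int

    @property
    def next_offset(self) -> Optional[int]:
        # Same rule as the envelopes in the patch: advance by the number of
        # returned items and stop once the total has been reached.
        nxt = self.offset + len(self.items)
        return nxt if nxt < self.total else None


def paginate(
    assets: list[dict[str, Any]],
    *,
    sort_by: str = "title",
    sort_order: str = "asc",
    limit: int = 50,
    offset: int = 0,
) -> Page:
    """Sort by the requested key with asset_id as tie-breaker, then slice."""
    ordered = sorted(
        assets,
        # The (key, asset_id) tuple keeps the order stable across calls even
        # when several assets share the same title. Note that reverse=True
        # also reverses the tie-breaker; that is an assumption of this sketch.
        key=lambda asset: (asset[sort_by], asset["asset_id"]),
        reverse=(sort_order == "desc"),
    )
    window = tuple(ordered[offset : offset + limit])
    return Page(items=window, total=len(ordered), limit=limit, offset=offset)


if __name__ == "__main__":
    sample = [
        {"asset_id": "a2", "title": "Beta"},
        {"asset_id": "a1", "title": "Alpha"},
        {"asset_id": "a3", "title": "Alpha"},
    ]
    first = paginate(sample, limit=2)
    print([a["asset_id"] for a in first.items], first.next_offset)
```

A caller would keep requesting pages with `offset=page.next_offset` until `next_offset` is `None`. Because the sort key always ends in the asset ID, repeated calls over unchanged data return the same windows.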
+
+## Not Yet Implemented
+
+- Multi-hop relationship traversal and graph ranking.
+- Full grounded answer package assembly.
+
+## Test Coverage
+
+`tests/test_asset_retrieval_service.py` covers:
+
+- stable paginated query envelopes,
+- result payloads with source references, representations, and metadata
+  records,
+- combined standard, metadata, source, lifecycle, and representation filters,
+- lexical search over normalized content with index refresh and zero-result
+  metadata,
+- combined text, metadata, source, tag, collection, and sensitivity filters
+  over SQLite,
+- contextual entity, workflow run, related asset, and relationship predicate
+  filters,
+- direct contextual entity and relationship queries over SQLite,
+- permission-filtered asset, relationship, and context entity envelopes,
+- fail-closed query scope behavior when policy context is unavailable,
+- source-grounded lexical snippets with representation/source references and
+  adapter provenance,
+- persisted retrieval feedback and KPI measurement hooks over feedback,
+  query results, and retrieval audit timing,
+- structured diagnostics for invalid query requests.
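The permission-filtered and fail-closed cases listed above can be pictured with the following sketch. It is an illustration under stated assumptions, not the service's implementation: `Envelope`, `governed_query`, `build_snippet`, the diagnostic strings, and the dict-shaped assets are all hypothetical, and the policy check is a plain callable rather than the engine's `PolicyGateway`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Optional

# Hypothetical policy check: (actor_id, asset) -> allowed?
PolicyCheck = Callable[[str, dict], bool]


@dataclass(frozen=True)
class Envelope:
    """Hypothetical result envelope for this sketch."""

    success: bool
    items: tuple[dict[str, Any], ...] = ()
    diagnostics: tuple[str, ...] = ()
    filtered_count: int = 0


def build_snippet(search_text: str, query: str, radius: int = 80) -> Optional[dict]:
    """Return match offsets plus surrounding text, or None when there is no match."""
    if not query:
        return None
    # Case-insensitive search; assumes lower() preserves string length (true for ASCII).
    start = search_text.lower().find(query.lower())
    if start < 0:
        return None
    end = start + len(query)
    return {
        "start_offset": start,
        "end_offset": end,
        "match_text": search_text[start:end],
        "text": search_text[max(0, start - radius) : end + radius],
    }


def governed_query(
    actor_id: Optional[str],
    candidates: list[dict],
    is_allowed: Optional[PolicyCheck],
    text: str = "",
) -> Envelope:
    # Fail closed: without an actor or a policy check, return an empty denied envelope.
    if actor_id is None or is_allowed is None:
        return Envelope(success=False, diagnostics=("policy_context_unavailable",))
    items: list[dict] = []
    filtered = 0
    for asset in candidates:
        try:
            permitted = is_allowed(actor_id, asset)
        except Exception:
            # A failing policy check denies the whole query instead of leaking partial results.
            return Envelope(success=False, diagnostics=("policy_check_failed",))
        if not permitted:
            filtered += 1  # counted for audit purposes, never returned
            continue
        # Snippets are built only after the per-resource check has passed.
        snippet = build_snippet(asset.get("search_text", ""), text)
        items.append({"asset_id": asset["asset_id"], "snippets": [snippet] if snippet else []})
    return Envelope(success=True, items=tuple(items), filtered_count=filtered)
```

The ordering is the point of the sketch: authorization is decided before any snippet text is extracted, so a denied asset contributes only to the internal filter count and never to the payload.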
diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index ac0101e..9548541 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -49,6 +49,8 @@ from .core import ( PolicyEffect, RelationshipTargetKind, RepresentationKind, + RetrievalFeedbackLabel, + RetrievalFeedbackRecord, Sensitivity, SourceReference, SourcePayload, @@ -81,8 +83,23 @@ from .services import ( AssetChangeResult, AssetIngestionResult, AssetIngestionService, + AssetQueryItem, + AssetQueryRequest, + AssetQueryResult, AssetRegistryService, + AssetRetrievalService, + ContextEntityQueryItem, + ContextEntityQueryRequest, + ContextEntityQueryResult, + LexicalIndexRefreshResult, RelationshipChangeResult, + RelationshipQueryItem, + RelationshipQueryRequest, + RelationshipQueryResult, + RetrievalFeedbackRequest, + RetrievalFeedbackResult, + RetrievalQualityMetrics, + RetrievalSnippet, ) from .storage import InMemoryKnowledgeRepository from .workflows import ( @@ -108,8 +125,12 @@ __all__ = [ "AssetChangeResult", "AssetIngestionResult", "AssetIngestionService", + "AssetQueryItem", + "AssetQueryRequest", + "AssetQueryResult", "AssetRegistryRepository", "AssetRegistryService", + "AssetRetrievalService", "AssetVersion", "AuditEvent", "AuditOutcome", @@ -121,6 +142,9 @@ __all__ = [ "Collection", "ContextAssembler", "ContextEntity", + "ContextEntityQueryItem", + "ContextEntityQueryRequest", + "ContextEntityQueryResult", "ContextEntityType", "ContextItem", "ContextPackage", @@ -146,6 +170,7 @@ __all__ = [ "IdempotencyStatus", "KnowledgeAsset", "KontextualError", + "LexicalIndexRefreshResult", "LifecycleState", "MetadataFieldDefinition", "MetadataRecord", @@ -167,9 +192,18 @@ __all__ = [ "Relationship", "RelationshipChangeResult", "RelationshipGraph", + "RelationshipQueryItem", + "RelationshipQueryRequest", + "RelationshipQueryResult", "RelationshipTargetKind", "RelationshipType", "RepresentationKind", + "RetrievalFeedbackLabel", + 
"RetrievalFeedbackRecord", + "RetrievalFeedbackRequest", + "RetrievalFeedbackResult", + "RetrievalQualityMetrics", + "RetrievalSnippet", "RunManifest", "RunStatus", "Sensitivity", diff --git a/src/kontextual_engine/adapters/memory/asset_registry.py b/src/kontextual_engine/adapters/memory/asset_registry.py index 2689a0f..bad31dd 100644 --- a/src/kontextual_engine/adapters/memory/asset_registry.py +++ b/src/kontextual_engine/adapters/memory/asset_registry.py @@ -21,6 +21,7 @@ from kontextual_engine.core import ( MetadataSchema, MetadataSchemaAssignment, RepresentationKind, + RetrievalFeedbackRecord, Sensitivity, ) from kontextual_engine.errors import NotFoundError, ValidationError @@ -38,6 +39,7 @@ class InMemoryAssetRegistryRepository: relationships: dict[str, CoreRelationship] = field(default_factory=dict) versions: dict[str, list[AssetVersion]] = field(default_factory=dict) audit_events: dict[str, AuditEvent] = field(default_factory=dict) + retrieval_feedback: dict[str, RetrievalFeedbackRecord] = field(default_factory=dict) idempotency_records: dict[str, IdempotencyRecord] = field(default_factory=dict) ingestion_jobs: dict[str, IngestionJob] = field(default_factory=dict) @@ -253,6 +255,24 @@ class InMemoryAssetRegistryRepository: events = [event for event in events if event.correlation_id == correlation_id] return sorted(events, key=lambda event: event.occurred_at) + def save_retrieval_feedback(self, record: RetrievalFeedbackRecord) -> RetrievalFeedbackRecord: + self.get_actor(record.actor_id) + self.retrieval_feedback[record.feedback_id] = record + return record + + def list_retrieval_feedback( + self, + *, + correlation_id: str | None = None, + label: str | None = None, + ) -> list[RetrievalFeedbackRecord]: + records: Iterable[RetrievalFeedbackRecord] = self.retrieval_feedback.values() + if correlation_id is not None: + records = [record for record in records if record.correlation_id == correlation_id] + if label is not None: + records = [record for record in 
records if record.label.value == label] + return sorted(records, key=lambda record: (record.created_at, record.feedback_id)) + def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord: self.idempotency_records[record.key] = record return record diff --git a/src/kontextual_engine/adapters/sqlite/asset_registry.py b/src/kontextual_engine/adapters/sqlite/asset_registry.py index 1c07115..e951d31 100644 --- a/src/kontextual_engine/adapters/sqlite/asset_registry.py +++ b/src/kontextual_engine/adapters/sqlite/asset_registry.py @@ -24,6 +24,7 @@ from kontextual_engine.core import ( MetadataSchemaAssignment, RepresentationKind, RelationshipTargetKind, + RetrievalFeedbackRecord, Sensitivity, ) from kontextual_engine.errors import NotFoundError, ValidationError @@ -466,6 +467,56 @@ class SQLiteAssetRegistryRepository: rows = self._all(f"select payload from audit_events{where} order by occurred_at, rowid", tuple(params)) return [AuditEvent.from_dict(_loads(row["payload"])) for row in rows] + def save_retrieval_feedback(self, record: RetrievalFeedbackRecord) -> RetrievalFeedbackRecord: + try: + with self._connect() as conn: + conn.execute( + """ + insert into retrieval_feedback (id, label, actor_id, correlation_id, created_at, payload) + values (?, ?, ?, ?, ?, ?) 
+ on conflict(id) do update set + label=excluded.label, + actor_id=excluded.actor_id, + correlation_id=excluded.correlation_id, + created_at=excluded.created_at, + payload=excluded.payload + """, + ( + record.feedback_id, + record.label.value, + record.actor_id, + record.correlation_id, + record.created_at, + _json(record.to_dict()), + ), + ) + except sqlite3.IntegrityError as exc: + if _is_foreign_key_error(exc): + raise ValidationError( + "Retrieval feedback references an unknown actor", + details={"actor_id": record.actor_id, "feedback_id": record.feedback_id}, + ) from exc + raise + return record + + def list_retrieval_feedback( + self, + *, + correlation_id: str | None = None, + label: str | None = None, + ) -> list[RetrievalFeedbackRecord]: + clauses = [] + params: list[Any] = [] + if correlation_id is not None: + clauses.append("correlation_id = ?") + params.append(correlation_id) + if label is not None: + clauses.append("label = ?") + params.append(label) + where = f" where {' and '.join(clauses)}" if clauses else "" + rows = self._all(f"select payload from retrieval_feedback{where} order by created_at, id", tuple(params)) + return [RetrievalFeedbackRecord.from_dict(_loads(row["payload"])) for row in rows] + def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord: with self._connect() as conn: conn.execute( @@ -620,6 +671,15 @@ class SQLiteAssetRegistryRepository: payload text not null, foreign key(actor_id) references actors(id) ); + create table if not exists retrieval_feedback ( + id text primary key, + label text not null, + actor_id text not null, + correlation_id text not null, + created_at text not null, + payload text not null, + foreign key(actor_id) references actors(id) + ); create table if not exists idempotency_records ( key text primary key, operation text not null, @@ -647,6 +707,8 @@ class SQLiteAssetRegistryRepository: create index if not exists idx_versions_asset on asset_versions(asset_id); create index if not 
exists idx_audit_target on audit_events(target); create index if not exists idx_audit_correlation on audit_events(correlation_id); + create index if not exists idx_retrieval_feedback_label on retrieval_feedback(label); + create index if not exists idx_retrieval_feedback_correlation on retrieval_feedback(correlation_id); create index if not exists idx_ingestion_jobs_status on ingestion_jobs(status); create index if not exists idx_ingestion_jobs_correlation on ingestion_jobs(correlation_id); """ diff --git a/src/kontextual_engine/core/__init__.py b/src/kontextual_engine/core/__init__.py index c2aaaa9..b8f3631 100644 --- a/src/kontextual_engine/core/__init__.py +++ b/src/kontextual_engine/core/__init__.py @@ -40,6 +40,7 @@ from .relationships import ( CoreRelationship, RelationshipTargetKind, ) +from .retrieval_feedback import RetrievalFeedbackLabel, RetrievalFeedbackRecord __all__ = [ "Actor", @@ -76,6 +77,8 @@ __all__ = [ "PolicyEffect", "RelationshipTargetKind", "RepresentationKind", + "RetrievalFeedbackLabel", + "RetrievalFeedbackRecord", "Sensitivity", "SourceReference", "SourcePayload", diff --git a/src/kontextual_engine/core/relationships.py b/src/kontextual_engine/core/relationships.py index 732da67..e26f8e7 100644 --- a/src/kontextual_engine/core/relationships.py +++ b/src/kontextual_engine/core/relationships.py @@ -19,6 +19,8 @@ class ContextEntityType(str, Enum): PROCESS = "process" SOURCE_SYSTEM = "source_system" TOPIC = "topic" + WORKFLOW_RUN = "workflow_run" + GENERATED_ARTIFACT = "generated_artifact" BUSINESS_OBJECT = "business_object" diff --git a/src/kontextual_engine/core/retrieval_feedback.py b/src/kontextual_engine/core/retrieval_feedback.py new file mode 100644 index 0000000..a717a60 --- /dev/null +++ b/src/kontextual_engine/core/retrieval_feedback.py @@ -0,0 +1,59 @@ +"""Retrieval feedback and quality signal primitives.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import 
Any + +from .primitives import compact_dict, new_id, utc_now + + +class RetrievalFeedbackLabel(str, Enum): + USEFUL = "useful" + IRRELEVANT = "irrelevant" + MISSING = "missing" + UNSAFE = "unsafe" + LOW_CONFIDENCE = "low_confidence" + + +@dataclass(frozen=True) +class RetrievalFeedbackRecord: + label: RetrievalFeedbackLabel + query: dict[str, Any] + result_ref: dict[str, Any] + actor_id: str + correlation_id: str + notes: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + feedback_id: str = field(default_factory=lambda: new_id("feedback")) + created_at: str = field(default_factory=lambda: utc_now().isoformat()) + + def to_dict(self) -> dict[str, Any]: + return compact_dict( + { + "feedback_id": self.feedback_id, + "label": self.label.value, + "query": dict(self.query), + "result_ref": dict(self.result_ref), + "actor_id": self.actor_id, + "correlation_id": self.correlation_id, + "notes": self.notes, + "metadata": dict(self.metadata), + "created_at": self.created_at, + } + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "RetrievalFeedbackRecord": + return cls( + feedback_id=data["feedback_id"], + label=RetrievalFeedbackLabel(data["label"]), + query=dict(data.get("query", {})), + result_ref=dict(data.get("result_ref", {})), + actor_id=data["actor_id"], + correlation_id=data["correlation_id"], + notes=data.get("notes"), + metadata=dict(data.get("metadata", {})), + created_at=data["created_at"], + ) diff --git a/src/kontextual_engine/ports/repositories.py b/src/kontextual_engine/ports/repositories.py index a2721ca..cde7558 100644 --- a/src/kontextual_engine/ports/repositories.py +++ b/src/kontextual_engine/ports/repositories.py @@ -20,6 +20,7 @@ from kontextual_engine.core import ( MetadataSchema, MetadataSchemaAssignment, RepresentationKind, + RetrievalFeedbackRecord, Sensitivity, ) @@ -90,6 +91,14 @@ class AssetRegistryRepository(Protocol): correlation_id: str | None = None, ) -> list[AuditEvent]: ... 
+ def save_retrieval_feedback(self, record: RetrievalFeedbackRecord) -> RetrievalFeedbackRecord: ... + def list_retrieval_feedback( + self, + *, + correlation_id: str | None = None, + label: str | None = None, + ) -> list[RetrievalFeedbackRecord]: ... + def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord: ... def get_idempotency_record(self, key: str) -> IdempotencyRecord | None: ... diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index 75eb8df..a7eafae 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -6,11 +6,43 @@ from .asset_service import ( RelationshipChangeResult, ) from .ingestion_service import AssetIngestionResult, AssetIngestionService +from .retrieval_service import ( + AssetQueryItem, + AssetQueryRequest, + AssetQueryResult, + AssetRetrievalService, + ContextEntityQueryItem, + ContextEntityQueryRequest, + ContextEntityQueryResult, + LexicalIndexRefreshResult, + RelationshipQueryItem, + RelationshipQueryRequest, + RelationshipQueryResult, + RetrievalFeedbackRequest, + RetrievalFeedbackResult, + RetrievalQualityMetrics, + RetrievalSnippet, +) __all__ = [ "AssetChangeResult", "AssetIngestionResult", "AssetIngestionService", + "AssetQueryItem", + "AssetQueryRequest", + "AssetQueryResult", "AssetRegistryService", + "AssetRetrievalService", + "ContextEntityQueryItem", + "ContextEntityQueryRequest", + "ContextEntityQueryResult", + "LexicalIndexRefreshResult", "RelationshipChangeResult", + "RelationshipQueryItem", + "RelationshipQueryRequest", + "RelationshipQueryResult", + "RetrievalFeedbackRequest", + "RetrievalFeedbackResult", + "RetrievalQualityMetrics", + "RetrievalSnippet", ] diff --git a/src/kontextual_engine/services/ingestion_service.py b/src/kontextual_engine/services/ingestion_service.py index 3ca2721..6b6b4f6 100644 --- a/src/kontextual_engine/services/ingestion_service.py +++ 
b/src/kontextual_engine/services/ingestion_service.py @@ -298,6 +298,8 @@ class AssetIngestionService: metadata={ "extractor": extractor.name, "normalized_hash": extraction.normalized.normalized_hash, + "search_text": extraction.normalized.text, + "search_text_length": len(extraction.normalized.text), "permission_context": dict(payload.permission_context), **extraction.metadata, }, diff --git a/src/kontextual_engine/services/retrieval_service.py b/src/kontextual_engine/services/retrieval_service.py new file mode 100644 index 0000000..93a1917 --- /dev/null +++ b/src/kontextual_engine/services/retrieval_service.py @@ -0,0 +1,1993 @@ +"""Governed retrieval contracts over the asset registry.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from time import perf_counter +from typing import Any + +from kontextual_engine.core import ( + AuditEvent, + AuditOutcome, + AssetRepresentation, + ContextEntity, + ContextEntityType, + CoreRelationship, + KnowledgeAsset, + LifecycleState, + MetadataRecord, + OperationContext, + PolicyDecision, + RelationshipTargetKind, + RepresentationKind, + RetrievalFeedbackLabel, + RetrievalFeedbackRecord, + Sensitivity, +) +from kontextual_engine.errors import Diagnostic +from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway + + +SUPPORTED_SORT_KEYS = { + "asset_id", + "asset_type", + "created_at", + "lifecycle", + "title", + "updated_at", +} +SUPPORTED_CONTEXT_ENTITY_SORT_KEYS = { + "entity_id", + "entity_type", + "external_ref", + "name", +} +SUPPORTED_RELATIONSHIP_SORT_KEYS = { + "created_at", + "predicate", + "relationship_id", + "source_id", + "target_id", + "target_kind", +} + + +@dataclass(frozen=True) +class AssetQueryRequest: + text: str | None = None + asset_type: str | None = None + lifecycle: LifecycleState | str | None = None + sensitivity: Sensitivity | str | None = None + owner: str | None = None + topic: str | None = None + tags: tuple[str, ...] 
= () + collection: str | None = None + review_state: str | None = None + metadata_filters: dict[str, Any] = field(default_factory=dict) + confirmed_metadata_only: bool = False + source_system: str | None = None + source_path: str | None = None + context_entity_id: str | None = None + context_entity_type: ContextEntityType | str | None = None + context_entity_name: str | None = None + context_entity_external_ref: str | None = None + workflow_run_id: str | None = None + related_asset_id: str | None = None + relationship_predicate: str | None = None + relationship_direction: str = "both" + include_relationships: bool = False + include_snippets: bool = False + max_snippets: int = 3 + snippet_radius: int = 80 + created_after: str | None = None + created_before: str | None = None + updated_after: str | None = None + updated_before: str | None = None + representation_kind: RepresentationKind | str | None = None + sort_by: str = "title" + sort_order: str = "asc" + limit: int = 50 + offset: int = 0 + + def to_dict(self) -> dict[str, Any]: + return { + "text": self.text, + "asset_type": self.asset_type, + "lifecycle": self.lifecycle.value if isinstance(self.lifecycle, LifecycleState) else self.lifecycle, + "sensitivity": self.sensitivity.value if isinstance(self.sensitivity, Sensitivity) else self.sensitivity, + "owner": self.owner, + "topic": self.topic, + "tags": list(self.tags), + "collection": self.collection, + "review_state": self.review_state, + "metadata_filters": dict(self.metadata_filters), + "confirmed_metadata_only": self.confirmed_metadata_only, + "source_system": self.source_system, + "source_path": self.source_path, + "context_entity_id": self.context_entity_id, + "context_entity_type": self.context_entity_type.value + if isinstance(self.context_entity_type, ContextEntityType) + else self.context_entity_type, + "context_entity_name": self.context_entity_name, + "context_entity_external_ref": self.context_entity_external_ref, + "workflow_run_id": 
self.workflow_run_id, + "related_asset_id": self.related_asset_id, + "relationship_predicate": self.relationship_predicate, + "relationship_direction": self.relationship_direction, + "include_relationships": self.include_relationships, + "include_snippets": self.include_snippets, + "max_snippets": self.max_snippets, + "snippet_radius": self.snippet_radius, + "created_after": self.created_after, + "created_before": self.created_before, + "updated_after": self.updated_after, + "updated_before": self.updated_before, + "representation_kind": self.representation_kind.value + if isinstance(self.representation_kind, RepresentationKind) + else self.representation_kind, + "sort_by": self.sort_by, + "sort_order": self.sort_order, + "limit": self.limit, + "offset": self.offset, + } + + +@dataclass(frozen=True) +class AssetQueryItem: + asset: KnowledgeAsset + representations: tuple[AssetRepresentation, ...] = () + metadata_records: tuple[MetadataRecord, ...] = () + relationships: tuple[CoreRelationship, ...] = () + context_entities: tuple[ContextEntity, ...] = () + snippets: tuple["RetrievalSnippet", ...] = () + diagnostics: tuple[Diagnostic, ...] 
= () + relevance: dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "representations", tuple(self.representations)) + object.__setattr__(self, "metadata_records", tuple(self.metadata_records)) + object.__setattr__(self, "relationships", tuple(self.relationships)) + object.__setattr__(self, "context_entities", tuple(self.context_entities)) + object.__setattr__(self, "snippets", tuple(self.snippets)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + def to_dict(self) -> dict[str, Any]: + return { + "asset_id": self.asset.id, + "title": self.asset.title, + "lifecycle": self.asset.lifecycle.value, + "classification": self.asset.classification.to_dict(), + "current_version_id": self.asset.current_version_id, + "aliases": list(self.asset.aliases), + "asset_metadata": dict(self.asset.metadata), + "source_refs": [source_ref.to_dict() for source_ref in self.asset.source_refs], + "representations": [representation.to_dict() for representation in self.representations], + "metadata_records": [record.to_dict() for record in self.metadata_records], + "relationships": [relationship.to_dict() for relationship in self.relationships], + "context_entities": [entity.to_dict() for entity in self.context_entities], + "snippets": [snippet.to_dict() for snippet in self.snippets], + "relevance": dict(self.relevance), + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class AssetQueryResult: + request: AssetQueryRequest + correlation_id: str + total: int + items: tuple[AssetQueryItem, ...] = () + diagnostics: tuple[Diagnostic, ...] 
= () + metadata: dict[str, Any] = field(default_factory=dict) + success: bool = True + + def __post_init__(self) -> None: + object.__setattr__(self, "items", tuple(self.items)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + @property + def result_count(self) -> int: + return len(self.items) + + @property + def next_offset(self) -> int | None: + next_offset = self.request.offset + self.result_count + return next_offset if next_offset < self.total else None + + def to_dict(self) -> dict[str, Any]: + return { + "query": self.request.to_dict(), + "correlation_id": self.correlation_id, + "success": self.success, + "total": self.total, + "result_count": self.result_count, + "limit": self.request.limit, + "offset": self.request.offset, + "next_offset": self.next_offset, + "sort": { + "by": self.request.sort_by, + "order": self.request.sort_order, + }, + "metadata": dict(self.metadata), + "results": [item.to_dict() for item in self.items], + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class LexicalIndexRefreshResult: + indexed_assets: int + indexed_representations: int + + def to_dict(self) -> dict[str, int]: + return { + "indexed_assets": self.indexed_assets, + "indexed_representations": self.indexed_representations, + } + + +@dataclass(frozen=True) +class RetrievalSnippet: + asset_id: str + representation_id: str + text: str + start_offset: int + end_offset: int + match_text: str + media_type: str + source_ref_id: str | None = None + storage_ref: str | None = None + provenance: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "asset_id": self.asset_id, + "representation_id": self.representation_id, + "source_ref_id": self.source_ref_id, + "storage_ref": self.storage_ref, + "media_type": self.media_type, + "text": self.text, + "start_offset": self.start_offset, + "end_offset": self.end_offset, + "match_text": self.match_text, + 
"provenance": dict(self.provenance), + } + + +@dataclass(frozen=True) +class ContextEntityQueryRequest: + entity_id: str | None = None + entity_type: ContextEntityType | str | None = None + name: str | None = None + external_ref: str | None = None + metadata_filters: dict[str, Any] = field(default_factory=dict) + sort_by: str = "name" + sort_order: str = "asc" + limit: int = 50 + offset: int = 0 + + def to_dict(self) -> dict[str, Any]: + return { + "entity_id": self.entity_id, + "entity_type": self.entity_type.value if isinstance(self.entity_type, ContextEntityType) else self.entity_type, + "name": self.name, + "external_ref": self.external_ref, + "metadata_filters": dict(self.metadata_filters), + "sort_by": self.sort_by, + "sort_order": self.sort_order, + "limit": self.limit, + "offset": self.offset, + } + + +@dataclass(frozen=True) +class ContextEntityQueryItem: + entity: ContextEntity + asset_ids: tuple[str, ...] = () + relationship_count: int = 0 + + def __post_init__(self) -> None: + object.__setattr__(self, "asset_ids", tuple(self.asset_ids)) + + def to_dict(self) -> dict[str, Any]: + return { + **self.entity.to_dict(), + "asset_ids": list(self.asset_ids), + "relationship_count": self.relationship_count, + } + + +@dataclass(frozen=True) +class ContextEntityQueryResult: + request: ContextEntityQueryRequest + correlation_id: str + total: int + items: tuple[ContextEntityQueryItem, ...] = () + diagnostics: tuple[Diagnostic, ...] 
= () + success: bool = True + + def __post_init__(self) -> None: + object.__setattr__(self, "items", tuple(self.items)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + @property + def result_count(self) -> int: + return len(self.items) + + @property + def next_offset(self) -> int | None: + next_offset = self.request.offset + self.result_count + return next_offset if next_offset < self.total else None + + def to_dict(self) -> dict[str, Any]: + return { + "query": self.request.to_dict(), + "correlation_id": self.correlation_id, + "success": self.success, + "total": self.total, + "result_count": self.result_count, + "limit": self.request.limit, + "offset": self.request.offset, + "next_offset": self.next_offset, + "sort": { + "by": self.request.sort_by, + "order": self.request.sort_order, + }, + "results": [item.to_dict() for item in self.items], + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class RelationshipQueryRequest: + source_id: str | None = None + target_id: str | None = None + asset_id: str | None = None + context_entity_id: str | None = None + context_entity_type: ContextEntityType | str | None = None + context_entity_name: str | None = None + context_entity_external_ref: str | None = None + workflow_run_id: str | None = None + target_kind: RelationshipTargetKind | str | None = None + predicate: str | None = None + direction: str = "both" + sort_by: str = "source_id" + sort_order: str = "asc" + limit: int = 50 + offset: int = 0 + + def to_dict(self) -> dict[str, Any]: + return { + "source_id": self.source_id, + "target_id": self.target_id, + "asset_id": self.asset_id, + "context_entity_id": self.context_entity_id, + "context_entity_type": self.context_entity_type.value + if isinstance(self.context_entity_type, ContextEntityType) + else self.context_entity_type, + "context_entity_name": self.context_entity_name, + "context_entity_external_ref": 
self.context_entity_external_ref, + "workflow_run_id": self.workflow_run_id, + "target_kind": self.target_kind.value if isinstance(self.target_kind, RelationshipTargetKind) else self.target_kind, + "predicate": self.predicate, + "direction": self.direction, + "sort_by": self.sort_by, + "sort_order": self.sort_order, + "limit": self.limit, + "offset": self.offset, + } + + +@dataclass(frozen=True) +class RelationshipQueryItem: + relationship: CoreRelationship + source_asset: KnowledgeAsset | None = None + target_asset: KnowledgeAsset | None = None + target_entity: ContextEntity | None = None + + def to_dict(self) -> dict[str, Any]: + payload = self.relationship.to_dict() + if self.source_asset is not None: + payload["source_asset"] = self.source_asset.to_dict() + if self.target_asset is not None: + payload["target_asset"] = self.target_asset.to_dict() + if self.target_entity is not None: + payload["target_entity"] = self.target_entity.to_dict() + return payload + + +@dataclass(frozen=True) +class RelationshipQueryResult: + request: RelationshipQueryRequest + correlation_id: str + total: int + items: tuple[RelationshipQueryItem, ...] = () + diagnostics: tuple[Diagnostic, ...] 
= () + success: bool = True + + def __post_init__(self) -> None: + object.__setattr__(self, "items", tuple(self.items)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + @property + def result_count(self) -> int: + return len(self.items) + + @property + def next_offset(self) -> int | None: + next_offset = self.request.offset + self.result_count + return next_offset if next_offset < self.total else None + + def to_dict(self) -> dict[str, Any]: + return { + "query": self.request.to_dict(), + "correlation_id": self.correlation_id, + "success": self.success, + "total": self.total, + "result_count": self.result_count, + "limit": self.request.limit, + "offset": self.request.offset, + "next_offset": self.next_offset, + "sort": { + "by": self.request.sort_by, + "order": self.request.sort_order, + }, + "results": [item.to_dict() for item in self.items], + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class RetrievalFeedbackRequest: + label: RetrievalFeedbackLabel | str + query: dict[str, Any] + result_ref: dict[str, Any] = field(default_factory=dict) + notes: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "label": self.label.value if isinstance(self.label, RetrievalFeedbackLabel) else self.label, + "query": dict(self.query), + "result_ref": dict(self.result_ref), + "notes": self.notes, + "metadata": dict(self.metadata), + } + + +@dataclass(frozen=True) +class RetrievalFeedbackResult: + record: RetrievalFeedbackRecord | None + correlation_id: str + diagnostics: tuple[Diagnostic, ...] 
= () + success: bool = True + + def __post_init__(self) -> None: + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + def to_dict(self) -> dict[str, Any]: + return { + "success": self.success, + "correlation_id": self.correlation_id, + "record": self.record.to_dict() if self.record else None, + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + } + + +@dataclass(frozen=True) +class RetrievalQualityMetrics: + query_count: int + zero_result_count: int + zero_result_rate: float + feedback_count: int + useful_count: int + unsafe_count: int + low_confidence_count: int + precision_at_k: float | None + citation_precision: float | None + permission_filter_observation_count: int + average_permission_filter_duration_ms: float | None + + def to_dict(self) -> dict[str, Any]: + return { + "query_count": self.query_count, + "zero_result_count": self.zero_result_count, + "zero_result_rate": self.zero_result_rate, + "feedback_count": self.feedback_count, + "useful_count": self.useful_count, + "unsafe_count": self.unsafe_count, + "low_confidence_count": self.low_confidence_count, + "precision_at_k": self.precision_at_k, + "citation_precision": self.citation_precision, + "permission_filter_observation_count": self.permission_filter_observation_count, + "average_permission_filter_duration_ms": self.average_permission_filter_duration_ms, + } + + +@dataclass(frozen=True) +class _LexicalDocument: + asset_id: str + representation_id: str + text: str + media_type: str + source_ref_id: str | None = None + storage_ref: str | None = None + provenance: dict[str, Any] = field(default_factory=dict) + + +class AssetRetrievalService: + def __init__( + self, + repository: AssetRegistryRepository, + *, + policy_gateway: PolicyGateway | None = None, + ) -> None: + self.repository = repository + self.policy_gateway = policy_gateway or AllowAllPolicyGateway() + self._lexical_index: tuple[_LexicalDocument, ...] 
= () + self._last_refresh = LexicalIndexRefreshResult(indexed_assets=0, indexed_representations=0) + + def refresh_index(self) -> LexicalIndexRefreshResult: + documents: list[_LexicalDocument] = [] + for asset in self.repository.list_assets(): + for representation in self.repository.list_representations( + asset_id=asset.id, + kind=RepresentationKind.NORMALIZED, + ): + text = representation.metadata.get("search_text") + if not isinstance(text, str) or not text: + continue + documents.append( + _LexicalDocument( + asset_id=asset.id, + representation_id=representation.representation_id, + text=text, + media_type=representation.media_type, + source_ref_id=representation.source_ref_id, + storage_ref=representation.storage_ref, + provenance=_snippet_provenance(representation), + ) + ) + self._lexical_index = tuple(documents) + self._last_refresh = LexicalIndexRefreshResult( + indexed_assets=len({document.asset_id for document in documents}), + indexed_representations=len(documents), + ) + return self._last_refresh + + def query_assets( + self, + request: AssetQueryRequest, + context: OperationContext, + ) -> AssetQueryResult: + diagnostics, normalized = _validate_request(request) + if diagnostics: + return AssetQueryResult( + request=request, + correlation_id=context.correlation_id, + total=0, + diagnostics=tuple(diagnostics), + success=False, + ) + scope_decision = self._authorize_for_retrieval( + context, + "retrieval.assets.query", + "retrieval:assets", + resource_metadata={"query": normalized.to_dict()}, + ) + if not scope_decision.allowed: + self._audit_retrieval( + "retrieval.assets.query", + "retrieval:assets", + AuditOutcome.DENIED, + context, + scope_decision, + details={"query": normalized.to_dict()}, + ) + return AssetQueryResult( + request=normalized, + correlation_id=context.correlation_id, + total=0, + diagnostics=( + _permission_denied_diagnostic(scope_decision), + ), + success=False, + ) + + assets = self.repository.list_assets( + 
lifecycle=normalized.lifecycle, + asset_type=normalized.asset_type, + sensitivity=normalized.sensitivity, + owner=normalized.owner, + topic=normalized.topic, + review_state=normalized.review_state, + metadata_filters=normalized.metadata_filters or None, + confirmed_metadata_only=normalized.confirmed_metadata_only, + ) + assets = [ + asset + for asset in assets + if _source_matches( + asset, + source_system=normalized.source_system, + source_path=normalized.source_path, + ) + ] + assets = [ + asset + for asset in assets + if _collection_matches( + asset, + self.repository.list_metadata_records(asset.id), + normalized.collection, + ) + and _tags_match( + asset, + self.repository.list_metadata_records(asset.id), + normalized.tags, + ) + and _timestamp_matches(asset, normalized) + ] + if normalized.representation_kind is not None: + assets = [ + asset + for asset in assets + if self.repository.list_representations( + asset_id=asset.id, + kind=normalized.representation_kind, + ) + ] + + relationship_context_by_asset: dict[str, tuple[CoreRelationship, ...]] = {} + context_entities_by_asset: dict[str, tuple[ContextEntity, ...]] = {} + if _asset_query_has_graph_filter(normalized): + graph_matches: list[KnowledgeAsset] = [] + for asset in assets: + relationships, entities = self._relationship_context_for_asset(asset.id, normalized, context) + if relationships: + graph_matches.append(asset) + relationship_context_by_asset[asset.id] = relationships + context_entities_by_asset[asset.id] = entities + assets = graph_matches + + relevance_by_asset: dict[str, dict[str, Any]] = {} + snippets_by_asset: dict[str, tuple[RetrievalSnippet, ...]] = {} + if normalized.text: + relevance_by_asset, snippets_by_asset = self._lexical_matches( + normalized.text, + include_snippets=normalized.include_snippets, + max_snippets=normalized.max_snippets, + snippet_radius=normalized.snippet_radius, + ) + assets = [asset for asset in assets if asset.id in relevance_by_asset] + + 
permission_filter_started = perf_counter() + asset_count_before_policy = len(assets) + assets = [asset for asset in assets if self._asset_allowed(asset, context)] + permission_filtered_count = asset_count_before_policy - len(assets) + permission_filter_duration_ms = _elapsed_ms(permission_filter_started) + + ordered = _sort_assets(assets, normalized.sort_by, normalized.sort_order) + total = len(ordered) + page = ordered[normalized.offset : normalized.offset + normalized.limit] + items = tuple( + self._item_for_asset( + asset, + normalized.representation_kind, + relevance=relevance_by_asset.get(asset.id, {}), + include_relationships=normalized.include_relationships, + relationships=relationship_context_by_asset.get(asset.id), + context_entities=context_entities_by_asset.get(asset.id), + snippets=snippets_by_asset.get(asset.id, ()), + context=context, + ) + for asset in page + ) + result = AssetQueryResult( + request=normalized, + correlation_id=context.correlation_id, + total=total, + items=items, + metadata={ + "zero_result": total == 0, + "graph_filter": _asset_query_has_graph_filter(normalized), + "policy_enforced": True, + "lexical_index": self._last_refresh.to_dict(), + }, + ) + self._audit_retrieval( + "retrieval.assets.query", + "retrieval:assets", + AuditOutcome.PARTIAL if permission_filtered_count else AuditOutcome.SUCCESS, + context, + scope_decision, + details={ + "query": normalized.to_dict(), + "result_count": result.result_count, + "total": result.total, + "permission_filtered_count": permission_filtered_count, + "permission_filter_duration_ms": permission_filter_duration_ms, + }, + ) + return result + + def query_context_entities( + self, + request: ContextEntityQueryRequest, + context: OperationContext, + ) -> ContextEntityQueryResult: + diagnostics, normalized = _validate_context_entity_request(request) + if diagnostics: + return ContextEntityQueryResult( + request=request, + correlation_id=context.correlation_id, + total=0, + 
diagnostics=tuple(diagnostics), + success=False, + ) + scope_decision = self._authorize_for_retrieval( + context, + "retrieval.context_entities.query", + "retrieval:context_entities", + resource_metadata={"query": normalized.to_dict()}, + ) + if not scope_decision.allowed: + self._audit_retrieval( + "retrieval.context_entities.query", + "retrieval:context_entities", + AuditOutcome.DENIED, + context, + scope_decision, + details={"query": normalized.to_dict()}, + ) + return ContextEntityQueryResult( + request=normalized, + correlation_id=context.correlation_id, + total=0, + diagnostics=( + _permission_denied_diagnostic(scope_decision), + ), + success=False, + ) + + entities = [ + entity + for entity in self.repository.list_context_entities() + if _context_entity_matches( + entity, + entity_id=normalized.entity_id, + entity_type=normalized.entity_type, + name=normalized.name, + external_ref=normalized.external_ref, + metadata_filters=normalized.metadata_filters, + ) + ] + permission_filter_started = perf_counter() + entity_count_before_policy = len(entities) + entities = [entity for entity in entities if self._context_entity_allowed(entity, context)] + permission_filtered_count = entity_count_before_policy - len(entities) + permission_filter_duration_ms = _elapsed_ms(permission_filter_started) + ordered = _sort_context_entities(entities, normalized.sort_by, normalized.sort_order) + total = len(ordered) + page = ordered[normalized.offset : normalized.offset + normalized.limit] + items = tuple(self._context_entity_item(entity, context) for entity in page) + result = ContextEntityQueryResult( + request=normalized, + correlation_id=context.correlation_id, + total=total, + items=items, + ) + self._audit_retrieval( + "retrieval.context_entities.query", + "retrieval:context_entities", + AuditOutcome.PARTIAL if permission_filtered_count else AuditOutcome.SUCCESS, + context, + scope_decision, + details={ + "query": normalized.to_dict(), + "result_count": result.result_count, + 
"total": result.total, + "permission_filtered_count": permission_filtered_count, + "permission_filter_duration_ms": permission_filter_duration_ms, + }, + ) + return result + + def query_relationships( + self, + request: RelationshipQueryRequest, + context: OperationContext, + ) -> RelationshipQueryResult: + diagnostics, normalized = _validate_relationship_request(request) + if diagnostics: + return RelationshipQueryResult( + request=request, + correlation_id=context.correlation_id, + total=0, + diagnostics=tuple(diagnostics), + success=False, + ) + scope_decision = self._authorize_for_retrieval( + context, + "retrieval.relationships.query", + "retrieval:relationships", + resource_metadata={"query": normalized.to_dict()}, + ) + if not scope_decision.allowed: + self._audit_retrieval( + "retrieval.relationships.query", + "retrieval:relationships", + AuditOutcome.DENIED, + context, + scope_decision, + details={"query": normalized.to_dict()}, + ) + return RelationshipQueryResult( + request=normalized, + correlation_id=context.correlation_id, + total=0, + diagnostics=( + _permission_denied_diagnostic(scope_decision), + ), + success=False, + ) + + relationships = self._relationships_for_request(normalized) + permission_filter_started = perf_counter() + relationship_count_before_policy = len(relationships) + relationships = [ + relationship for relationship in relationships if self._relationship_allowed(relationship, context) + ] + permission_filtered_count = relationship_count_before_policy - len(relationships) + permission_filter_duration_ms = _elapsed_ms(permission_filter_started) + ordered = _sort_relationships(relationships, normalized.sort_by, normalized.sort_order) + total = len(ordered) + page = ordered[normalized.offset : normalized.offset + normalized.limit] + items = tuple(self._relationship_item(relationship) for relationship in page) + result = RelationshipQueryResult( + request=normalized, + correlation_id=context.correlation_id, + total=total, + items=items, 
+ ) + self._audit_retrieval( + "retrieval.relationships.query", + "retrieval:relationships", + AuditOutcome.PARTIAL if permission_filtered_count else AuditOutcome.SUCCESS, + context, + scope_decision, + details={ + "query": normalized.to_dict(), + "result_count": result.result_count, + "total": result.total, + "permission_filtered_count": permission_filtered_count, + "permission_filter_duration_ms": permission_filter_duration_ms, + }, + ) + return result + + def record_feedback( + self, + request: RetrievalFeedbackRequest, + context: OperationContext, + ) -> RetrievalFeedbackResult: + diagnostics: list[Diagnostic] = [] + label = _parse_feedback_label(request.label, diagnostics) + if diagnostics: + return RetrievalFeedbackResult( + record=None, + correlation_id=context.correlation_id, + diagnostics=tuple(diagnostics), + success=False, + ) + decision = self._authorize_for_retrieval( + context, + "retrieval.feedback.record", + "retrieval:feedback", + resource_metadata=request.to_dict(), + ) + if not decision.allowed: + self._audit_retrieval( + "retrieval.feedback.record", + "retrieval:feedback", + AuditOutcome.DENIED, + context, + decision, + details=request.to_dict(), + ) + return RetrievalFeedbackResult( + record=None, + correlation_id=context.correlation_id, + diagnostics=( + _permission_denied_diagnostic(decision), + ), + success=False, + ) + record = RetrievalFeedbackRecord( + label=label or RetrievalFeedbackLabel.LOW_CONFIDENCE, + query=dict(request.query), + result_ref=dict(request.result_ref), + actor_id=context.actor.id, + correlation_id=context.correlation_id, + notes=request.notes, + metadata=dict(request.metadata), + ) + saved = self.repository.save_retrieval_feedback(record) + self._audit_retrieval( + "retrieval.feedback.record", + f"retrieval_feedback:{saved.feedback_id}", + AuditOutcome.SUCCESS, + context, + decision, + details={ + "feedback_id": saved.feedback_id, + "label": saved.label.value, + "result_ref": dict(saved.result_ref), + }, + ) + return 
RetrievalFeedbackResult(record=saved, correlation_id=context.correlation_id) + + def list_feedback( + self, + *, + correlation_id: str | None = None, + label: RetrievalFeedbackLabel | str | None = None, + ) -> tuple[RetrievalFeedbackRecord, ...]: + label_value = label.value if isinstance(label, RetrievalFeedbackLabel) else label + return tuple(self.repository.list_retrieval_feedback(correlation_id=correlation_id, label=label_value)) + + def quality_metrics( + self, + *, + query_results: tuple[AssetQueryResult, ...] = (), + precision_at_k: int = 5, + ) -> RetrievalQualityMetrics: + feedback = self.repository.list_retrieval_feedback() + query_count = len(query_results) + zero_result_count = sum(1 for result in query_results if result.total == 0) + ranked_feedback = [ + record + for record in feedback + if _feedback_rank(record) is not None and (_feedback_rank(record) or 0) <= precision_at_k + ] + useful_ranked = [record for record in ranked_feedback if record.label == RetrievalFeedbackLabel.USEFUL] + citation_feedback = [record for record in feedback if _feedback_has_citation_ref(record)] + useful_citations = [record for record in citation_feedback if record.label == RetrievalFeedbackLabel.USEFUL] + permission_latencies = [ + float(event.details["permission_filter_duration_ms"]) + for event in self.repository.list_audit_events() + if event.operation.startswith("retrieval.") + and "permission_filter_duration_ms" in event.details + ] + return RetrievalQualityMetrics( + query_count=query_count, + zero_result_count=zero_result_count, + zero_result_rate=zero_result_count / query_count if query_count else 0.0, + feedback_count=len(feedback), + useful_count=sum(1 for record in feedback if record.label == RetrievalFeedbackLabel.USEFUL), + unsafe_count=sum(1 for record in feedback if record.label == RetrievalFeedbackLabel.UNSAFE), + low_confidence_count=sum(1 for record in feedback if record.label == RetrievalFeedbackLabel.LOW_CONFIDENCE), + precision_at_k=len(useful_ranked) 
/ len(ranked_feedback) if ranked_feedback else None, + citation_precision=len(useful_citations) / len(citation_feedback) if citation_feedback else None, + permission_filter_observation_count=len(permission_latencies), + average_permission_filter_duration_ms=sum(permission_latencies) / len(permission_latencies) + if permission_latencies + else None, + ) + + def _item_for_asset( + self, + asset: KnowledgeAsset, + representation_kind: RepresentationKind | None, + *, + relevance: dict[str, Any] | None = None, + include_relationships: bool = False, + relationships: tuple[CoreRelationship, ...] | None = None, + context_entities: tuple[ContextEntity, ...] | None = None, + snippets: tuple[RetrievalSnippet, ...] = (), + context: OperationContext | None = None, + ) -> AssetQueryItem: + if relationships is None and include_relationships: + relationships = self._relationships_for_asset(asset.id, "both") + if context is not None: + relationships = tuple( + relationship for relationship in relationships if self._relationship_allowed(relationship, context) + ) + if context_entities is None and relationships: + context_entities = self._context_entities_for_relationships(relationships) + return AssetQueryItem( + asset=asset, + representations=tuple( + self.repository.list_representations( + asset_id=asset.id, + kind=representation_kind, + ) + ), + metadata_records=tuple(self.repository.list_metadata_records(asset.id)), + relationships=tuple(relationships or ()), + context_entities=tuple(context_entities or ()), + snippets=tuple(snippets), + relevance=dict(relevance or {}), + ) + + def _lexical_matches( + self, + text: str, + *, + include_snippets: bool, + max_snippets: int, + snippet_radius: int, + ) -> tuple[dict[str, dict[str, Any]], dict[str, tuple[RetrievalSnippet, ...]]]: + if not self._lexical_index: + self.refresh_index() + needle = text.casefold() + matches: dict[str, dict[str, Any]] = {} + snippets: dict[str, list[RetrievalSnippet]] = {} + for document in 
self._lexical_index: + haystack = document.text.casefold() + count = haystack.count(needle) + if count <= 0: + continue + current = matches.setdefault( + document.asset_id, + { + "strategy": "lexical_substring", + "query": text, + "match_count": 0, + "representation_ids": [], + }, + ) + current["match_count"] += count + current["representation_ids"].append(document.representation_id) + if include_snippets: + asset_snippets = snippets.setdefault(document.asset_id, []) + remaining = max_snippets - len(asset_snippets) + if remaining > 0: + asset_snippets.extend( + _snippets_for_document( + document, + text, + max_snippets=remaining, + snippet_radius=snippet_radius, + ) + ) + current["snippet_count"] = len(asset_snippets) + return matches, {asset_id: tuple(items) for asset_id, items in snippets.items()} + + def _context_entity_item(self, entity: ContextEntity, context: OperationContext) -> ContextEntityQueryItem: + relationships = [ + relationship + for relationship in self.repository.list_relationships(target_id=entity.entity_id) + if relationship.target_kind == RelationshipTargetKind.CONTEXT_ENTITY + and self._relationship_allowed(relationship, context) + ] + return ContextEntityQueryItem( + entity=entity, + asset_ids=tuple(sorted({relationship.source_id for relationship in relationships})), + relationship_count=len(relationships), + ) + + def _relationship_item(self, relationship: CoreRelationship) -> RelationshipQueryItem: + source_asset = self.repository.get_asset(relationship.source_id) + target_asset = None + target_entity = None + if relationship.target_kind == RelationshipTargetKind.ASSET: + target_asset = self.repository.get_asset(relationship.target_id) + else: + target_entity = self.repository.get_context_entity(relationship.target_id) + return RelationshipQueryItem( + relationship=relationship, + source_asset=source_asset, + target_asset=target_asset, + target_entity=target_entity, + ) + + def _relationship_context_for_asset( + self, + asset_id: str, + 
request: AssetQueryRequest, + context: OperationContext, + ) -> tuple[tuple[CoreRelationship, ...], tuple[ContextEntity, ...]]: + relationships = self._relationships_for_asset(asset_id, request.relationship_direction) + entity_ids = self._context_entity_ids_for_asset_query(request) + filtered: list[CoreRelationship] = [] + for relationship in relationships: + if request.relationship_predicate is not None and relationship.predicate != request.relationship_predicate: + continue + if request.related_asset_id is not None and not _relationship_connects_asset( + relationship, + asset_id=asset_id, + related_asset_id=request.related_asset_id, + ): + continue + if entity_ids is not None and not ( + relationship.target_kind == RelationshipTargetKind.CONTEXT_ENTITY + and relationship.target_id in entity_ids + ): + continue + if not self._relationship_allowed(relationship, context): + continue + filtered.append(relationship) + ordered = tuple(_sort_relationships(_unique_relationships(filtered), "source_id", "asc")) + return ordered, self._context_entities_for_relationships(ordered) + + def _relationships_for_request(self, request: RelationshipQueryRequest) -> list[CoreRelationship]: + relationships = self._relationships_for_relationship_query_base(request) + entity_ids = self._context_entity_ids_for_relationship_query(request) + filtered: list[CoreRelationship] = [] + for relationship in relationships: + if request.target_kind is not None and relationship.target_kind != request.target_kind: + continue + if request.predicate is not None and relationship.predicate != request.predicate: + continue + if entity_ids is not None and not ( + relationship.target_kind == RelationshipTargetKind.CONTEXT_ENTITY + and relationship.target_id in entity_ids + ): + continue + filtered.append(relationship) + return _unique_relationships(filtered) + + def _relationships_for_relationship_query_base( + self, + request: RelationshipQueryRequest, + ) -> list[CoreRelationship]: + if request.asset_id 
is not None: + return list(self._relationships_for_asset(request.asset_id, request.direction)) + if request.source_id is not None and request.target_id is not None: + return [ + relationship + for relationship in self.repository.list_relationships(source_id=request.source_id) + if relationship.target_id == request.target_id + ] + if request.source_id is not None: + return list(self.repository.list_relationships(source_id=request.source_id)) + if request.target_id is not None: + return list(self.repository.list_relationships(target_id=request.target_id)) + return _unique_relationships( + [ + relationship + for asset in self.repository.list_assets() + for relationship in self.repository.list_relationships(source_id=asset.id) + ] + ) + + def _relationships_for_asset(self, asset_id: str, direction: str) -> tuple[CoreRelationship, ...]: + relationships: list[CoreRelationship] = [] + if direction in {"outbound", "both"}: + relationships.extend(self.repository.list_relationships(source_id=asset_id)) + if direction in {"inbound", "both"}: + relationships.extend( + relationship + for relationship in self.repository.list_relationships(target_id=asset_id) + if relationship.target_kind == RelationshipTargetKind.ASSET + ) + return tuple(_unique_relationships(relationships)) + + def _context_entity_ids_for_asset_query(self, request: AssetQueryRequest) -> set[str] | None: + if not _asset_query_has_context_entity_filter(request): + return None + return self._resolve_context_entity_ids( + entity_id=request.context_entity_id, + entity_type=request.context_entity_type, + name=request.context_entity_name, + external_ref=request.context_entity_external_ref, + workflow_run_id=request.workflow_run_id, + ) + + def _context_entity_ids_for_relationship_query(self, request: RelationshipQueryRequest) -> set[str] | None: + if not _relationship_query_has_context_entity_filter(request): + return None + return self._resolve_context_entity_ids( + entity_id=request.context_entity_id, + 
entity_type=request.context_entity_type, + name=request.context_entity_name, + external_ref=request.context_entity_external_ref, + workflow_run_id=request.workflow_run_id, + ) + + def _resolve_context_entity_ids( + self, + *, + entity_id: str | None, + entity_type: ContextEntityType | None, + name: str | None, + external_ref: str | None, + workflow_run_id: str | None, + ) -> set[str]: + entities = self.repository.list_context_entities() + if workflow_run_id is not None: + entity_type = ContextEntityType.WORKFLOW_RUN + return { + entity.entity_id + for entity in entities + if _context_entity_matches( + entity, + entity_id=entity_id, + entity_type=entity_type, + name=name, + external_ref=external_ref, + metadata_filters={}, + ) + and _workflow_run_matches(entity, workflow_run_id) + } + + def _context_entities_for_relationships( + self, + relationships: tuple[CoreRelationship, ...], + ) -> tuple[ContextEntity, ...]: + entities: list[ContextEntity] = [] + for relationship in relationships: + if relationship.target_kind != RelationshipTargetKind.CONTEXT_ENTITY: + continue + entities.append(self.repository.get_context_entity(relationship.target_id)) + return tuple(_unique_context_entities(entities)) + + def _asset_allowed(self, asset: KnowledgeAsset, context: OperationContext) -> bool: + decision = self._authorize_for_retrieval( + context, + "asset.retrieve", + f"asset:{asset.id}", + resource_metadata={ + "asset_id": asset.id, + "asset_type": asset.classification.asset_type, + "lifecycle": asset.lifecycle.value, + "sensitivity": asset.classification.sensitivity.value, + "owner": asset.classification.owner, + }, + ) + return decision.allowed + + def _context_entity_allowed(self, entity: ContextEntity, context: OperationContext) -> bool: + decision = self._authorize_for_retrieval( + context, + "context_entity.retrieve", + f"context_entity:{entity.entity_id}", + resource_metadata={ + "entity_id": entity.entity_id, + "entity_type": entity.entity_type.value, + "external_ref": 
entity.external_ref, + }, + ) + return decision.allowed + + def _relationship_allowed(self, relationship: CoreRelationship, context: OperationContext) -> bool: + decision = self._authorize_for_retrieval( + context, + "asset.relationship.retrieve", + f"relationship:{relationship.relationship_id}", + resource_metadata={ + "relationship_id": relationship.relationship_id, + "source_id": relationship.source_id, + "target_id": relationship.target_id, + "target_kind": relationship.target_kind.value, + "predicate": relationship.predicate, + }, + ) + if not decision.allowed: + return False + source_asset = self.repository.get_asset(relationship.source_id) + if not self._asset_allowed(source_asset, context): + return False + if relationship.target_kind == RelationshipTargetKind.ASSET: + target_asset = self.repository.get_asset(relationship.target_id) + return self._asset_allowed(target_asset, context) + target_entity = self.repository.get_context_entity(relationship.target_id) + return self._context_entity_allowed(target_entity, context) + + def _authorize_for_retrieval( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, Any] | None = None, + ) -> PolicyDecision: + self.repository.save_actor(context.actor) + try: + return self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata=resource_metadata, + ) + except Exception as exc: + return PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason=str(exc) or "Retrieval policy gateway failed", + context={ + "gateway_error": type(exc).__name__, + "resource_metadata": resource_metadata or {}, + }, + ) + + def _audit_retrieval( + self, + operation: str, + target: str, + outcome: AuditOutcome, + context: OperationContext, + policy_decision: PolicyDecision, + *, + details: dict[str, Any] | None = None, + ) -> AuditEvent: + event = AuditEvent.from_context( + operation, + target, + outcome, + context, + policy_decision=policy_decision, 
+ details=details, + ) + return self.repository.save_audit_event(event) + + +def _validate_request(request: AssetQueryRequest) -> tuple[list[Diagnostic], AssetQueryRequest]: + diagnostics: list[Diagnostic] = [] + lifecycle = _parse_lifecycle(request.lifecycle, diagnostics) + sensitivity = _parse_sensitivity(request.sensitivity, diagnostics) + representation_kind = _parse_representation_kind(request.representation_kind, diagnostics) + context_entity_type = _parse_context_entity_type(request.context_entity_type, diagnostics) + relationship_direction = _parse_relationship_direction(request.relationship_direction, diagnostics) + + if request.limit < 1 or request.limit > 500: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.limit_invalid", + message="Query limit must be between 1 and 500", + details={"limit": request.limit}, + ) + ) + if request.offset < 0: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.offset_invalid", + message="Query offset must be zero or greater", + details={"offset": request.offset}, + ) + ) + if request.max_snippets < 0 or request.max_snippets > 20: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.max_snippets_invalid", + message="Max snippets must be between 0 and 20", + details={"max_snippets": request.max_snippets}, + ) + ) + if request.snippet_radius < 0 or request.snippet_radius > 1000: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.snippet_radius_invalid", + message="Snippet radius must be between 0 and 1000", + details={"snippet_radius": request.snippet_radius}, + ) + ) + if request.sort_by not in SUPPORTED_SORT_KEYS: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.sort_invalid", + message="Query sort key is not supported", + details={"sort_by": request.sort_by, "supported": sorted(SUPPORTED_SORT_KEYS)}, + ) + ) + if request.sort_order not in {"asc", "desc"}: + diagnostics.append( + Diagnostic( + 
severity="error", + code="retrieval.sort_order_invalid", + message="Query sort order must be asc or desc", + details={"sort_order": request.sort_order}, + ) + ) + + return diagnostics, AssetQueryRequest( + text=request.text, + asset_type=request.asset_type, + lifecycle=lifecycle, + sensitivity=sensitivity, + owner=request.owner, + topic=request.topic, + tags=tuple(request.tags), + collection=request.collection, + review_state=request.review_state, + metadata_filters=dict(request.metadata_filters), + confirmed_metadata_only=request.confirmed_metadata_only, + source_system=request.source_system, + source_path=request.source_path, + context_entity_id=request.context_entity_id, + context_entity_type=context_entity_type, + context_entity_name=request.context_entity_name, + context_entity_external_ref=request.context_entity_external_ref, + workflow_run_id=request.workflow_run_id, + related_asset_id=request.related_asset_id, + relationship_predicate=request.relationship_predicate, + relationship_direction=relationship_direction, + include_relationships=request.include_relationships, + include_snippets=request.include_snippets, + max_snippets=request.max_snippets, + snippet_radius=request.snippet_radius, + created_after=request.created_after, + created_before=request.created_before, + updated_after=request.updated_after, + updated_before=request.updated_before, + representation_kind=representation_kind, + sort_by=request.sort_by, + sort_order=request.sort_order, + limit=request.limit, + offset=request.offset, + ) + + +def _permission_denied_diagnostic(decision: PolicyDecision) -> Diagnostic: + return Diagnostic( + severity="error", + code="retrieval.permission_denied", + message="Retrieval query denied by policy", + details={"policy_decision": decision.to_dict()}, + ) + + +def _parse_feedback_label( + value: RetrievalFeedbackLabel | str, + diagnostics: list[Diagnostic], +) -> RetrievalFeedbackLabel | None: + if isinstance(value, RetrievalFeedbackLabel): + return value + 
try: + return RetrievalFeedbackLabel(value) + except ValueError: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.feedback_label_invalid", + message="Retrieval feedback label is not supported", + details={"label": value, "supported": [item.value for item in RetrievalFeedbackLabel]}, + ) + ) + return None + + +def _feedback_rank(record: RetrievalFeedbackRecord) -> int | None: + rank = record.result_ref.get("rank") + if isinstance(rank, int): + return rank + if isinstance(rank, str) and rank.isdigit(): + return int(rank) + return None + + +def _feedback_has_citation_ref(record: RetrievalFeedbackRecord) -> bool: + return any( + record.result_ref.get(key) + for key in ( + "snippet_id", + "representation_id", + "source_ref_id", + ) + ) or bool(record.metadata.get("citation")) + + +def _elapsed_ms(started_at: float) -> float: + return round((perf_counter() - started_at) * 1000, 3) + + +def _parse_lifecycle(value: LifecycleState | str | None, diagnostics: list[Diagnostic]) -> LifecycleState | None: + if value is None or isinstance(value, LifecycleState): + return value + try: + return LifecycleState(value) + except ValueError: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.lifecycle_invalid", + message="Lifecycle filter is not supported", + details={"lifecycle": value, "supported": [item.value for item in LifecycleState]}, + ) + ) + return None + + +def _parse_sensitivity(value: Sensitivity | str | None, diagnostics: list[Diagnostic]) -> Sensitivity | None: + if value is None or isinstance(value, Sensitivity): + return value + try: + return Sensitivity(value) + except ValueError: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.sensitivity_invalid", + message="Sensitivity filter is not supported", + details={"sensitivity": value, "supported": [item.value for item in Sensitivity]}, + ) + ) + return None + + +def _parse_representation_kind( + value: RepresentationKind | str | None, + diagnostics: 
list[Diagnostic], +) -> RepresentationKind | None: + if value is None or isinstance(value, RepresentationKind): + return value + try: + return RepresentationKind(value) + except ValueError: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.representation_kind_invalid", + message="Representation kind filter is not supported", + details={"representation_kind": value, "supported": [item.value for item in RepresentationKind]}, + ) + ) + return None + + +def _parse_context_entity_type( + value: ContextEntityType | str | None, + diagnostics: list[Diagnostic], +) -> ContextEntityType | None: + if value is None or isinstance(value, ContextEntityType): + return value + try: + return ContextEntityType(value) + except ValueError: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.context_entity_type_invalid", + message="Context entity type filter is not supported", + details={"context_entity_type": value, "supported": [item.value for item in ContextEntityType]}, + ) + ) + return None + + +def _parse_relationship_target_kind( + value: RelationshipTargetKind | str | None, + diagnostics: list[Diagnostic], +) -> RelationshipTargetKind | None: + if value is None or isinstance(value, RelationshipTargetKind): + return value + try: + return RelationshipTargetKind(value) + except ValueError: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.relationship_target_kind_invalid", + message="Relationship target kind is not supported", + details={"target_kind": value, "supported": [item.value for item in RelationshipTargetKind]}, + ) + ) + return None + + +def _parse_relationship_direction(value: str, diagnostics: list[Diagnostic]) -> str: + if value in {"outbound", "inbound", "both"}: + return value + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.relationship_direction_invalid", + message="Relationship direction must be outbound, inbound, or both", + details={"direction": value}, + ) + ) + 
return "both" + + +def _validate_context_entity_request( + request: ContextEntityQueryRequest, +) -> tuple[list[Diagnostic], ContextEntityQueryRequest]: + diagnostics: list[Diagnostic] = [] + entity_type = _parse_context_entity_type(request.entity_type, diagnostics) + _validate_window_and_sort( + diagnostics, + limit=request.limit, + offset=request.offset, + sort_by=request.sort_by, + sort_order=request.sort_order, + supported_sort_keys=SUPPORTED_CONTEXT_ENTITY_SORT_KEYS, + ) + return diagnostics, ContextEntityQueryRequest( + entity_id=request.entity_id, + entity_type=entity_type, + name=request.name, + external_ref=request.external_ref, + metadata_filters=dict(request.metadata_filters), + sort_by=request.sort_by, + sort_order=request.sort_order, + limit=request.limit, + offset=request.offset, + ) + + +def _validate_relationship_request( + request: RelationshipQueryRequest, +) -> tuple[list[Diagnostic], RelationshipQueryRequest]: + diagnostics: list[Diagnostic] = [] + context_entity_type = _parse_context_entity_type(request.context_entity_type, diagnostics) + target_kind = _parse_relationship_target_kind(request.target_kind, diagnostics) + direction = _parse_relationship_direction(request.direction, diagnostics) + _validate_window_and_sort( + diagnostics, + limit=request.limit, + offset=request.offset, + sort_by=request.sort_by, + sort_order=request.sort_order, + supported_sort_keys=SUPPORTED_RELATIONSHIP_SORT_KEYS, + ) + return diagnostics, RelationshipQueryRequest( + source_id=request.source_id, + target_id=request.target_id, + asset_id=request.asset_id, + context_entity_id=request.context_entity_id, + context_entity_type=context_entity_type, + context_entity_name=request.context_entity_name, + context_entity_external_ref=request.context_entity_external_ref, + workflow_run_id=request.workflow_run_id, + target_kind=target_kind, + predicate=request.predicate, + direction=direction, + sort_by=request.sort_by, + sort_order=request.sort_order, + limit=request.limit, + 
offset=request.offset, + ) + + +def _validate_window_and_sort( + diagnostics: list[Diagnostic], + *, + limit: int, + offset: int, + sort_by: str, + sort_order: str, + supported_sort_keys: set[str], +) -> None: + if limit < 1 or limit > 500: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.limit_invalid", + message="Query limit must be between 1 and 500", + details={"limit": limit}, + ) + ) + if offset < 0: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.offset_invalid", + message="Query offset must be zero or greater", + details={"offset": offset}, + ) + ) + if sort_by not in supported_sort_keys: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.sort_invalid", + message="Query sort key is not supported", + details={"sort_by": sort_by, "supported": sorted(supported_sort_keys)}, + ) + ) + if sort_order not in {"asc", "desc"}: + diagnostics.append( + Diagnostic( + severity="error", + code="retrieval.sort_order_invalid", + message="Query sort order must be asc or desc", + details={"sort_order": sort_order}, + ) + ) + + +def _source_matches( + asset: KnowledgeAsset, + *, + source_system: str | None, + source_path: str | None, +) -> bool: + if source_system is None and source_path is None: + return True + for source_ref in asset.source_refs: + if source_system is not None and source_ref.source_system != source_system: + continue + if source_path is not None and source_ref.path != source_path: + continue + return True + return False + + +def _collection_matches( + asset: KnowledgeAsset, + metadata_records: list[MetadataRecord], + collection: str | None, +) -> bool: + if collection is None: + return True + if asset.metadata.get("collection") == collection: + return True + if asset.classification.metadata.get("collection") == collection: + return True + return any(record.key == "collection" and record.value == collection for record in metadata_records) + + +def _tags_match( + asset: KnowledgeAsset, + 
metadata_records: list[MetadataRecord], + tags: tuple[str, ...], +) -> bool: + if not tags: + return True + values = set(asset.classification.topics) + for record in metadata_records: + if record.key not in {"tag", "tags"}: + continue + if isinstance(record.value, list): + values.update(str(item) for item in record.value) + elif record.value is not None: + values.add(str(record.value)) + return all(tag in values for tag in tags) + + +def _timestamp_matches(asset: KnowledgeAsset, request: AssetQueryRequest) -> bool: + if request.created_after and asset.created_at < request.created_after: + return False + if request.created_before and asset.created_at > request.created_before: + return False + if request.updated_after and asset.updated_at < request.updated_after: + return False + if request.updated_before and asset.updated_at > request.updated_before: + return False + return True + + +def _asset_query_has_graph_filter(request: AssetQueryRequest) -> bool: + return ( + _asset_query_has_context_entity_filter(request) + or request.related_asset_id is not None + or request.relationship_predicate is not None + ) + + +def _asset_query_has_context_entity_filter(request: AssetQueryRequest) -> bool: + return any( + value is not None + for value in ( + request.context_entity_id, + request.context_entity_type, + request.context_entity_name, + request.context_entity_external_ref, + request.workflow_run_id, + ) + ) + + +def _relationship_query_has_context_entity_filter(request: RelationshipQueryRequest) -> bool: + return any( + value is not None + for value in ( + request.context_entity_id, + request.context_entity_type, + request.context_entity_name, + request.context_entity_external_ref, + request.workflow_run_id, + ) + ) + + +def _context_entity_matches( + entity: ContextEntity, + *, + entity_id: str | None, + entity_type: ContextEntityType | None, + name: str | None, + external_ref: str | None, + metadata_filters: dict[str, Any], +) -> bool: + if entity_id is not None and 
entity.entity_id != entity_id: + return False + if entity_type is not None and entity.entity_type != entity_type: + return False + if name is not None and entity.name != name: + return False + if external_ref is not None and entity.external_ref != external_ref: + return False + for key, expected in metadata_filters.items(): + if not _metadata_value_matches(entity.metadata.get(key), expected): + return False + return True + + +def _workflow_run_matches(entity: ContextEntity, workflow_run_id: str | None) -> bool: + if workflow_run_id is None: + return True + if entity.entity_id == workflow_run_id or entity.external_ref == workflow_run_id: + return True + return entity.metadata.get("workflow_run_id") == workflow_run_id + + +def _snippet_provenance(representation: AssetRepresentation) -> dict[str, Any]: + provenance: dict[str, Any] = {} + for key in ( + "adapter_provenance", + "context_span", + "extractor", + "markitect_selector", + "snapshot", + "source_span", + ): + value = representation.metadata.get(key) + if value: + provenance[key] = value + return provenance + + +def _snippets_for_document( + document: _LexicalDocument, + query_text: str, + *, + max_snippets: int, + snippet_radius: int, +) -> list[RetrievalSnippet]: + if max_snippets <= 0: + return [] + needle = query_text.casefold() + haystack = document.text.casefold() + snippets: list[RetrievalSnippet] = [] + cursor = 0 + while len(snippets) < max_snippets: + match_start = haystack.find(needle, cursor) + if match_start < 0: + break + match_end = match_start + len(query_text) + snippet_start = max(0, match_start - snippet_radius) + snippet_end = min(len(document.text), match_end + snippet_radius) + snippet_text = document.text[snippet_start:snippet_end].strip() + if snippet_start > 0: + snippet_text = "..." + snippet_text + if snippet_end < len(document.text): + snippet_text = snippet_text + "..." 
+ snippets.append( + RetrievalSnippet( + asset_id=document.asset_id, + representation_id=document.representation_id, + source_ref_id=document.source_ref_id, + storage_ref=document.storage_ref, + media_type=document.media_type, + text=snippet_text, + start_offset=snippet_start, + end_offset=snippet_end, + match_text=document.text[match_start:match_end], + provenance=dict(document.provenance), + ) + ) + cursor = match_end + return snippets + + +def _relationship_connects_asset( + relationship: CoreRelationship, + *, + asset_id: str, + related_asset_id: str, +) -> bool: + if relationship.target_kind != RelationshipTargetKind.ASSET: + return False + return ( + relationship.source_id == asset_id + and relationship.target_id == related_asset_id + or relationship.source_id == related_asset_id + and relationship.target_id == asset_id + ) + + +def _metadata_value_matches(value: Any, expected: Any) -> bool: + if isinstance(value, list): + return expected in value + return value == expected + + +def _unique_relationships(relationships: list[CoreRelationship]) -> list[CoreRelationship]: + seen: set[str] = set() + unique: list[CoreRelationship] = [] + for relationship in relationships: + if relationship.relationship_id in seen: + continue + seen.add(relationship.relationship_id) + unique.append(relationship) + return unique + + +def _unique_context_entities(entities: list[ContextEntity]) -> list[ContextEntity]: + seen: set[str] = set() + unique: list[ContextEntity] = [] + for entity in entities: + if entity.entity_id in seen: + continue + seen.add(entity.entity_id) + unique.append(entity) + return sorted(unique, key=lambda item: (item.entity_type.value, item.name, item.entity_id)) + + +def _sort_assets( + assets: list[KnowledgeAsset], + sort_by: str, + sort_order: str, +) -> list[KnowledgeAsset]: + reverse = sort_order == "desc" + + def sort_key(asset: KnowledgeAsset) -> tuple[Any, str]: + if sort_by == "asset_id": + primary = asset.id + elif sort_by == "asset_type": + primary 
= asset.classification.asset_type + elif sort_by == "created_at": + primary = asset.created_at + elif sort_by == "lifecycle": + primary = asset.lifecycle.value + elif sort_by == "updated_at": + primary = asset.updated_at + else: + primary = asset.title.casefold() + return primary, asset.id + + return sorted(assets, key=sort_key, reverse=reverse) + + +def _sort_context_entities( + entities: list[ContextEntity], + sort_by: str, + sort_order: str, +) -> list[ContextEntity]: + reverse = sort_order == "desc" + + def sort_key(entity: ContextEntity) -> tuple[Any, str]: + if sort_by == "entity_id": + primary = entity.entity_id + elif sort_by == "entity_type": + primary = entity.entity_type.value + elif sort_by == "external_ref": + primary = entity.external_ref or "" + else: + primary = entity.name.casefold() + return primary, entity.entity_id + + return sorted(entities, key=sort_key, reverse=reverse) + + +def _sort_relationships( + relationships: list[CoreRelationship], + sort_by: str, + sort_order: str, +) -> list[CoreRelationship]: + reverse = sort_order == "desc" + + def sort_key(relationship: CoreRelationship) -> tuple[Any, str]: + if sort_by == "created_at": + primary = relationship.created_at + elif sort_by == "predicate": + primary = relationship.predicate + elif sort_by == "relationship_id": + primary = relationship.relationship_id + elif sort_by == "target_id": + primary = relationship.target_id + elif sort_by == "target_kind": + primary = relationship.target_kind.value + else: + primary = relationship.source_id + return primary, relationship.relationship_id + + return sorted(relationships, key=sort_key, reverse=reverse) diff --git a/tests/test_asset_retrieval_service.py b/tests/test_asset_retrieval_service.py new file mode 100644 index 0000000..e79b8de --- /dev/null +++ b/tests/test_asset_retrieval_service.py @@ -0,0 +1,722 @@ +from pathlib import Path + +from kontextual_engine import ( + Actor, + ActorType, + AssetIngestionService, + AssetQueryRequest, + 
AssetRegistryService, + AssetRepresentation, + AssetRetrievalService, + Classification, + ContextEntity, + ContextEntityQueryRequest, + ContextEntityType, + InMemoryAssetRegistryRepository, + LifecycleState, + MetadataRecord, + OperationContext, + PolicyDecision, + RelationshipQueryRequest, + RepresentationKind, + RetrievalFeedbackLabel, + RetrievalFeedbackRequest, + Sensitivity, + SQLiteAssetRegistryRepository, + SourceReference, +) + + +def test_asset_retrieval_returns_stable_paginated_envelope() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + source_ref = SourceReference(source_system="repo", path="docs/b.md", checksum="sha256:b") + representation = AssetRepresentation.from_content( + "asset-bravo", + RepresentationKind.NORMALIZED, + "text/plain", + "bravo content", + storage_ref="object://bravo-normalized", + source_ref_id=source_ref.id, + ) + + registry.create_asset( + "Charlie", + Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-charlie", + ) + registry.create_asset( + "Bravo", + Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC, owner="Docs", topics=("retrieval",)), + context, + asset_id="asset-bravo", + source_refs=[source_ref], + representations=[representation], + metadata_records=[MetadataRecord("status", "approved", confirmed=True)], + ) + registry.create_asset( + "Alpha", + Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC, owner="Docs"), + context, + asset_id="asset-alpha", + ) + + first_page = retrieval.query_assets( + AssetQueryRequest(asset_type="document", sort_by="title", limit=1), + context, + ) + second_page = retrieval.query_assets( + AssetQueryRequest(asset_type="document", sort_by="title", limit=1, offset=1), + context, + ) + + assert first_page.success is True + assert first_page.total == 2 + assert first_page.result_count == 1 + assert 
first_page.next_offset == 1 + assert first_page.items[0].asset.id == "asset-alpha" + assert second_page.next_offset is None + assert second_page.items[0].asset.id == "asset-bravo" + assert second_page.to_dict()["results"][0]["source_refs"][0]["path"] == "docs/b.md" + assert second_page.to_dict()["results"][0]["representations"][0]["storage_ref"] == "object://bravo-normalized" + assert second_page.to_dict()["results"][0]["metadata_records"][0]["key"] == "status" + assert second_page.to_dict()["correlation_id"] == "corr-retrieval" + + +def test_asset_retrieval_filters_source_metadata_lifecycle_and_representation_kind() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + source_ref = SourceReference(source_system="repo", path="docs/guide.md", checksum="sha256:guide") + registry.create_asset( + "Guide", + Classification( + asset_type="guide", + sensitivity=Sensitivity.INTERNAL, + owner="Docs", + topics=("retrieval",), + review_state="approved", + ), + context, + asset_id="asset-guide", + source_refs=[source_ref], + representations=[ + AssetRepresentation.from_content( + "asset-guide", + RepresentationKind.NORMALIZED, + "text/plain", + "guide normalized", + ) + ], + metadata_records=[MetadataRecord("channel", "internal", confirmed=True)], + ) + draft = registry.create_asset( + "Draft", + Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL, owner="Docs"), + context, + asset_id="asset-draft", + metadata_records=[MetadataRecord("channel", "internal", confirmed=False)], + ) + registry.transition_lifecycle(draft.asset.id, LifecycleState.RETIRED, context) + + result = retrieval.query_assets( + AssetQueryRequest( + asset_type="guide", + lifecycle=LifecycleState.ACTIVE, + owner="Docs", + topic="retrieval", + review_state="approved", + metadata_filters={"channel": "internal"}, + confirmed_metadata_only=True, + source_system="repo", + 
source_path="docs/guide.md", + representation_kind=RepresentationKind.NORMALIZED, + ), + context, + ) + + assert [item.asset.id for item in result.items] == ["asset-guide"] + assert result.items[0].representations[0].kind == RepresentationKind.NORMALIZED + assert result.items[0].metadata_records[0].confirmed is True + + +def test_asset_retrieval_invalid_query_returns_structured_diagnostics() -> None: + retrieval = AssetRetrievalService(InMemoryAssetRegistryRepository()) + result = retrieval.query_assets( + AssetQueryRequest( + lifecycle="missing", + representation_kind="summary", + sort_by="rank", + sort_order="sideways", + limit=0, + offset=-1, + ), + operation_context(), + ) + + assert result.success is False + assert result.total == 0 + assert result.items == () + assert {diagnostic.code for diagnostic in result.diagnostics} == { + "retrieval.lifecycle_invalid", + "retrieval.representation_kind_invalid", + "retrieval.sort_invalid", + "retrieval.sort_order_invalid", + "retrieval.limit_invalid", + "retrieval.offset_invalid", + } + payload = result.to_dict() + assert payload["success"] is False + assert payload["diagnostics"][0]["severity"] == "error" + + +def test_asset_retrieval_lexical_search_over_normalized_content(tmp_path: Path) -> None: + alpha = tmp_path / "alpha.txt" + beta = tmp_path / "beta.txt" + alpha.write_text("alpha retrieval signal\nalpha again\n", encoding="utf-8") + beta.write_text("beta only\n", encoding="utf-8") + repo = InMemoryAssetRegistryRepository() + ingestion = AssetIngestionService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + + ingestion.ingest_file(alpha, context, asset_id="asset-alpha") + ingestion.ingest_file(beta, context, asset_id="asset-beta") + refresh = retrieval.refresh_index() + result = retrieval.query_assets(AssetQueryRequest(text="alpha"), context) + zero = retrieval.query_assets(AssetQueryRequest(text="gamma"), context) + + assert refresh.indexed_assets == 2 + assert 
refresh.indexed_representations == 2 + assert [item.asset.id for item in result.items] == ["asset-alpha"] + assert result.items[0].relevance["strategy"] == "lexical_substring" + assert result.items[0].relevance["match_count"] == 2 + assert result.items[0].relevance["representation_ids"] + assert result.metadata["zero_result"] is False + assert result.metadata["lexical_index"] == { + "indexed_assets": 2, + "indexed_representations": 2, + } + assert zero.total == 0 + assert zero.metadata["zero_result"] is True + + +def test_asset_retrieval_returns_permission_filtered_source_grounded_snippets() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo, policy_gateway=DenyConfidentialRetrievalPolicy()) + context = operation_context() + public_source = SourceReference(source_system="repo", path="docs/public.md", checksum="sha256:public") + confidential_source = SourceReference( + source_system="repo", + path="docs/confidential.md", + checksum="sha256:confidential", + ) + registry.create_asset( + "Public Snippet", + Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC), + context, + asset_id="asset-public-snippet", + source_refs=[public_source], + representations=[ + AssetRepresentation.from_content( + "asset-public-snippet", + RepresentationKind.NORMALIZED, + "text/markdown", + "normalized public", + source_ref_id=public_source.id, + metadata={ + "search_text": "Public alpha signal that should be citeable from the normalized document.", + "extractor": "markitect-tool", + "markitect_selector": "section:intro", + }, + ) + ], + ) + registry.create_asset( + "Confidential Snippet", + Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL), + context, + asset_id="asset-confidential-snippet", + source_refs=[confidential_source], + representations=[ + AssetRepresentation.from_content( + "asset-confidential-snippet", + RepresentationKind.NORMALIZED, + "text/markdown", + 
"normalized confidential", + source_ref_id=confidential_source.id, + metadata={ + "search_text": "Confidential alpha signal must not leak as a snippet.", + "extractor": "markitect-tool", + "markitect_selector": "section:private", + }, + ) + ], + ) + + retrieval.refresh_index() + result = retrieval.query_assets( + AssetQueryRequest(text="alpha", include_snippets=True, max_snippets=1, snippet_radius=12), + context, + ) + + assert [item.asset.id for item in result.items] == ["asset-public-snippet"] + assert result.items[0].relevance["snippet_count"] == 1 + snippet = result.items[0].snippets[0] + assert snippet.asset_id == "asset-public-snippet" + assert snippet.source_ref_id == public_source.id + assert snippet.match_text == "alpha" + assert "alpha" in snippet.text + assert snippet.provenance["extractor"] == "markitect-tool" + assert snippet.provenance["markitect_selector"] == "section:intro" + assert result.to_dict()["results"][0]["snippets"][0]["source_ref_id"] == public_source.id + retrieval_audit = [ + event for event in repo.list_audit_events(correlation_id="corr-retrieval") + if event.operation == "retrieval.assets.query" + ][-1] + assert retrieval_audit.details["permission_filtered_count"] == 1 + + +def test_asset_retrieval_filters_are_backend_portable_with_sqlite(tmp_path: Path) -> None: + repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite") + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + source_ref = SourceReference(source_system="local_file", path="docs/report.txt", checksum="sha256:report") + registry.create_asset( + "Report", + Classification( + asset_type="report", + sensitivity=Sensitivity.INTERNAL, + owner="Analytics", + topics=("retrieval", "quarterly"), + review_state="approved", + metadata={"collection": "reports"}, + ), + context, + asset_id="asset-report", + source_refs=[source_ref], + representations=[ + AssetRepresentation.from_content( + "asset-report", + 
RepresentationKind.NORMALIZED, + "text/plain", + "quarterly retrieval report", + metadata={"search_text": "quarterly retrieval report"}, + ) + ], + metadata_records=[ + MetadataRecord("collection", "reports", confirmed=True), + MetadataRecord("tags", ["finance", "retrieval"], confirmed=True), + ], + ) + registry.create_asset( + "Other", + Classification(asset_type="report", sensitivity=Sensitivity.INTERNAL, owner="Analytics"), + context, + asset_id="asset-other", + metadata_records=[MetadataRecord("collection", "misc", confirmed=True)], + ) + + retrieval.refresh_index() + result = retrieval.query_assets( + AssetQueryRequest( + text="retrieval", + asset_type="report", + sensitivity=Sensitivity.INTERNAL, + owner="Analytics", + tags=("finance", "retrieval"), + collection="reports", + source_system="local_file", + source_path="docs/report.txt", + metadata_filters={"collection": "reports"}, + confirmed_metadata_only=True, + ), + context, + ) + + assert [item.asset.id for item in result.items] == ["asset-report"] + assert result.items[0].relevance["match_count"] == 1 + assert result.items[0].metadata_records[0].confirmed is True + + +def test_asset_retrieval_filters_by_context_entity_workflow_run_and_related_asset() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + classification = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL) + policy = registry.create_asset("Policy", classification, context, asset_id="asset-policy") + implementation = registry.create_asset( + "Implementation", + classification, + context, + asset_id="asset-implementation", + ) + registry.create_asset("Unrelated", classification, context, asset_id="asset-unrelated") + project = ContextEntity( + entity_type=ContextEntityType.PROJECT, + name="Kontextual Engine", + external_ref="project:kontextual", + metadata={"phase": "mvp"}, + 
entity_id="entity-project-kontextual", + ) + workflow_run = ContextEntity( + entity_type=ContextEntityType.WORKFLOW_RUN, + name="Initial ingestion", + external_ref="workflow-run-42", + entity_id="entity-workflow-run-42", + ) + + registry.link_asset_to_context_entity( + policy.asset.id, + project, + "about_project", + context, + confidence=0.96, + provenance={"producer": "test-fixture"}, + ) + registry.link_asset_to_context_entity( + implementation.asset.id, + workflow_run, + "produced_by_run", + context, + ) + registry.link_asset_to_asset( + implementation.asset.id, + policy.asset.id, + "implements", + context, + confidence=0.88, + provenance={"basis": "workplan"}, + ) + + project_result = retrieval.query_assets( + AssetQueryRequest( + context_entity_type=ContextEntityType.PROJECT, + context_entity_name="Kontextual Engine", + relationship_predicate="about_project", + ), + context, + ) + related_result = retrieval.query_assets( + AssetQueryRequest( + related_asset_id=policy.asset.id, + relationship_predicate="implements", + ), + context, + ) + workflow_result = retrieval.query_assets( + AssetQueryRequest(workflow_run_id="workflow-run-42"), + context, + ) + + assert [item.asset.id for item in project_result.items] == ["asset-policy"] + assert project_result.items[0].relationships[0].predicate == "about_project" + assert project_result.items[0].relationships[0].confidence == 0.96 + assert project_result.items[0].context_entities[0].entity_id == "entity-project-kontextual" + assert project_result.to_dict()["results"][0]["relationships"][0]["provenance"]["producer"] == "test-fixture" + assert [item.asset.id for item in related_result.items] == ["asset-implementation"] + assert related_result.items[0].relationships[0].target_id == "asset-policy" + assert [item.asset.id for item in workflow_result.items] == ["asset-implementation"] + assert workflow_result.items[0].context_entities[0].entity_type == ContextEntityType.WORKFLOW_RUN + + +def 
test_context_entity_and_relationship_queries_are_backend_portable_with_sqlite(tmp_path: Path) -> None: + repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite") + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + policy = registry.create_asset( + "Policy", + Classification(asset_type="policy", sensitivity=Sensitivity.INTERNAL), + context, + asset_id="asset-policy", + ) + case = ContextEntity( + entity_type=ContextEntityType.CASE, + name="Migration Case", + external_ref="case:migration", + metadata={"priority": "high"}, + entity_id="entity-case-migration", + ) + linked = registry.link_asset_to_context_entity( + policy.asset.id, + case, + "about_case", + context, + confidence=0.73, + provenance={"source": "test"}, + ) + + entities = retrieval.query_context_entities( + ContextEntityQueryRequest( + entity_type="case", + external_ref="case:migration", + metadata_filters={"priority": "high"}, + ), + context, + ) + relationships = retrieval.query_relationships( + RelationshipQueryRequest( + context_entity_id="entity-case-migration", + predicate="about_case", + target_kind="context_entity", + ), + context, + ) + + assert entities.total == 1 + assert entities.items[0].asset_ids == ("asset-policy",) + assert entities.items[0].relationship_count == 1 + assert relationships.total == 1 + assert relationships.items[0].relationship.relationship_id == linked.relationship.relationship_id + assert relationships.items[0].source_asset.id == "asset-policy" + assert relationships.items[0].target_entity.entity_id == "entity-case-migration" + assert relationships.to_dict()["results"][0]["confidence"] == 0.73 + + +def test_graph_retrieval_invalid_queries_return_structured_diagnostics() -> None: + retrieval = AssetRetrievalService(InMemoryAssetRegistryRepository()) + context = operation_context() + + asset_result = retrieval.query_assets( + AssetQueryRequest(context_entity_type="planet", 
relationship_direction="sideways"), + context, + ) + entity_result = retrieval.query_context_entities( + ContextEntityQueryRequest(entity_type="planet", sort_by="rank", limit=0), + context, + ) + relationship_result = retrieval.query_relationships( + RelationshipQueryRequest( + target_kind="memory_phase", + direction="sideways", + sort_order="diagonal", + offset=-1, + ), + context, + ) + + assert asset_result.success is False + assert {diagnostic.code for diagnostic in asset_result.diagnostics} == { + "retrieval.context_entity_type_invalid", + "retrieval.relationship_direction_invalid", + } + assert entity_result.success is False + assert {diagnostic.code for diagnostic in entity_result.diagnostics} == { + "retrieval.context_entity_type_invalid", + "retrieval.sort_invalid", + "retrieval.limit_invalid", + } + assert relationship_result.success is False + assert {diagnostic.code for diagnostic in relationship_result.diagnostics} == { + "retrieval.relationship_target_kind_invalid", + "retrieval.relationship_direction_invalid", + "retrieval.sort_order_invalid", + "retrieval.offset_invalid", + } + + +def test_retrieval_policy_filters_assets_relationships_and_context_payloads() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo, policy_gateway=DenyConfidentialRetrievalPolicy()) + context = operation_context() + public = registry.create_asset( + "Public", + Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC), + context, + asset_id="asset-public", + ) + confidential = registry.create_asset( + "Confidential", + Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL), + context, + asset_id="asset-confidential", + ) + case = ContextEntity( + entity_type=ContextEntityType.CASE, + name="Visible Case", + entity_id="entity-visible-case", + ) + registry.link_asset_to_context_entity(public.asset.id, case, "about_case", context) + 
registry.link_asset_to_context_entity(confidential.asset.id, case, "about_case", context) + registry.link_asset_to_asset(public.asset.id, confidential.asset.id, "references", context) + + assets = retrieval.query_assets(AssetQueryRequest(sort_by="asset_id"), context) + relationships = retrieval.query_relationships( + RelationshipQueryRequest(predicate="references"), + context, + ) + entities = retrieval.query_context_entities( + ContextEntityQueryRequest(entity_id="entity-visible-case"), + context, + ) + + assert [item.asset.id for item in assets.items] == ["asset-public"] + assert assets.total == 1 + assert assets.metadata["policy_enforced"] is True + assert relationships.total == 0 + assert entities.items[0].asset_ids == ("asset-public",) + assert entities.items[0].relationship_count == 1 + audit_events = repo.list_audit_events(correlation_id="corr-retrieval") + retrieval_audit = [event for event in audit_events if event.operation == "retrieval.assets.query"][-1] + assert retrieval_audit.outcome.value == "partial" + assert retrieval_audit.details["permission_filtered_count"] == 1 + assert retrieval_audit.policy_decision.action == "retrieval.assets.query" + + +def test_retrieval_scope_policy_fail_closed_returns_empty_denied_envelope() -> None: + repo = InMemoryAssetRegistryRepository() + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo, policy_gateway=BrokenRetrievalPolicy()) + context = operation_context() + registry.create_asset( + "Public", + Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC), + context, + asset_id="asset-public", + ) + + result = retrieval.query_assets(AssetQueryRequest(), context) + + assert result.success is False + assert result.total == 0 + assert result.items == () + assert [diagnostic.code for diagnostic in result.diagnostics] == ["retrieval.permission_denied"] + decision = result.diagnostics[0].details["policy_decision"] + assert decision["effect"] == "fail_closed" + audit_event = 
repo.list_audit_events(correlation_id="corr-retrieval")[-1] + assert audit_event.operation == "retrieval.assets.query" + assert audit_event.outcome.value == "denied" + assert audit_event.policy_decision.effect.value == "fail_closed" + + +def test_retrieval_feedback_persists_and_quality_metrics_use_feedback_and_audit(tmp_path: Path) -> None: + db_path = tmp_path / "registry.sqlite" + repo = SQLiteAssetRegistryRepository(db_path) + registry = AssetRegistryService(repo) + retrieval = AssetRetrievalService(repo) + context = operation_context() + source = SourceReference(source_system="repo", path="docs/feedback.md", checksum="sha256:feedback") + registry.create_asset( + "Feedback Source", + Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC), + context, + asset_id="asset-feedback", + source_refs=[source], + representations=[ + AssetRepresentation.from_content( + "asset-feedback", + RepresentationKind.NORMALIZED, + "text/markdown", + "normalized feedback", + source_ref_id=source.id, + metadata={"search_text": "alpha feedback citation target", "extractor": "plain-text"}, + ) + ], + ) + + retrieval.refresh_index() + query_result = retrieval.query_assets( + AssetQueryRequest(text="alpha", include_snippets=True), + context, + ) + feedback = retrieval.record_feedback( + RetrievalFeedbackRequest( + label=RetrievalFeedbackLabel.USEFUL, + query=query_result.request.to_dict(), + result_ref={ + "asset_id": "asset-feedback", + "rank": 1, + "representation_id": query_result.items[0].representations[0].representation_id, + "source_ref_id": source.id, + }, + metadata={"citation": True}, + ), + context, + ) + + reloaded = AssetRetrievalService(SQLiteAssetRegistryRepository(db_path)) + records = reloaded.list_feedback(correlation_id="corr-retrieval") + metrics = reloaded.quality_metrics(query_results=(query_result,), precision_at_k=1) + + assert feedback.success is True + assert records[0].feedback_id == feedback.record.feedback_id + assert records[0].query["text"] == 
"alpha" + assert records[0].result_ref["asset_id"] == "asset-feedback" + assert metrics.feedback_count == 1 + assert metrics.useful_count == 1 + assert metrics.zero_result_rate == 0.0 + assert metrics.precision_at_k == 1.0 + assert metrics.citation_precision == 1.0 + assert metrics.permission_filter_observation_count >= 1 + assert metrics.average_permission_filter_duration_ms is not None + + +def test_retrieval_feedback_invalid_label_returns_structured_diagnostic() -> None: + retrieval = AssetRetrievalService(InMemoryAssetRegistryRepository()) + result = retrieval.record_feedback( + RetrievalFeedbackRequest(label="maybe", query={}, result_ref={}), + operation_context(), + ) + + assert result.success is False + assert result.record is None + assert [diagnostic.code for diagnostic in result.diagnostics] == ["retrieval.feedback_label_invalid"] + + +def operation_context() -> OperationContext: + actor = Actor.create( + ActorType.HUMAN, + actor_id="user-retrieval", + display_name="Retrieval Tester", + groups=["engineering"], + ) + return OperationContext.create(actor, correlation_id="corr-retrieval") + + +class DenyConfidentialRetrievalPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict | None = None, + ) -> PolicyDecision: + resource_metadata = resource_metadata or {} + if action == "asset.retrieve" and resource_metadata.get("sensitivity") == Sensitivity.CONFIDENTIAL.value: + return PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason="confidential retrieval denied in test policy", + context={"resource_metadata": resource_metadata}, + ) + return PolicyDecision.allow( + context.actor.id, + action, + resource, + context={"resource_metadata": resource_metadata}, + ) + + +class BrokenRetrievalPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict | None = None, + ) -> PolicyDecision: + if action == 
"retrieval.assets.query": + raise RuntimeError("policy context unavailable") + return PolicyDecision.allow(context.actor.id, action, resource) diff --git a/workplans/KONT-WP-0007-governed-retrieval-context-graph.md b/workplans/KONT-WP-0007-governed-retrieval-context-graph.md index aa7fa04..84a2d78 100644 --- a/workplans/KONT-WP-0007-governed-retrieval-context-graph.md +++ b/workplans/KONT-WP-0007-governed-retrieval-context-graph.md @@ -4,13 +4,13 @@ type: workplan title: "Governed Retrieval And Context Graph" domain: markitect repo: kontextual-engine -status: todo +status: done owner: codex topic_slug: markitect planning_priority: high planning_order: 7 created: "2026-05-05" -updated: "2026-05-05" +updated: "2026-05-06" state_hub_workstream_id: "64352515-9677-46bb-909a-9e2db4915dc7" --- @@ -44,11 +44,32 @@ produce grounded units and snippets. Engine retrieval contracts, result envelopes, policy filtering, pagination, feedback, and cross-format search remain engine-owned. +## Implementation Status + +As of 2026-05-06, the first retrieval slice is recorded in +`docs/retrieval-implementation.md`. It establishes asset query request/result +contracts, stable sorting and pagination, result envelopes with source +references, representations, metadata records, refreshable lexical search, +relevance metadata, zero-result smoke metadata, and structured validation +diagnostics. It also supports combined metadata, lifecycle, source-context, +tag, collection, timestamp, and representation filters across in-memory and +SQLite-backed repositories. The contextual graph slice adds direct contextual +entity and relationship query envelopes plus asset filters by contextual +entity, workflow run, related asset, and relationship predicate. Remaining work +is focused on multi-hop graph traversal/ranking, source-grounded snippets, and +feedback/KPI hooks. 
Permission-aware retrieval now uses the engine policy +gateway for query-scope and per-resource checks, with fail-closed denied +envelopes and retrieval audit events. Lexical queries can also return +source-grounded snippet packets with representation/source references and +adapter provenance. Feedback and KPI hooks persist retrieval feedback and +derive zero-result, precision, citation precision, safety, confidence, and +permission-filter timing signals. + ## R7.1 - Implement query contracts pagination sorting and result envelopes ```task id: KONT-WP-0007-T001 -status: todo +status: done priority: high state_hub_task_id: "5a1b0661-ce22-4ee6-a9e7-0aedce9d4356" ``` @@ -63,11 +84,22 @@ Acceptance: references, and diagnostics. - Invalid queries return structured validation errors. +Implemented: + +- `AssetQueryRequest`, `AssetQueryItem`, `AssetQueryResult`, and + `AssetRetrievalService` provide the stable asset query contract. +- Queries return deterministic ordering with pagination metadata and + correlation IDs. +- Result entries expose asset identity, classification, source references, + representations, and metadata records. +- Invalid lifecycle, representation kind, sort key, sort order, limit, and + offset return structured diagnostics without raising raw exceptions. + ## R7.2 - Implement lexical search over normalized content ```task id: KONT-WP-0007-T002 -status: todo +status: done priority: high state_hub_task_id: "5ec90dcb-473c-4d01-85f2-8db18de0b7d1" ``` @@ -81,11 +113,23 @@ Acceptance: - Search indexes can be refreshed after ingestion or update. - p95 latency and zero-result rate can be measured in smoke tests. +Implemented: + +- Normalized ingestion now stores representation search text and length + metadata for retrieval indexing. +- `AssetRetrievalService.refresh_index()` builds a refreshable lexical index + with indexed asset and representation counts. 
+- Text queries perform lexical substring matching over normalized + representations and return relevance metadata including strategy, query, + match count, and matching representation IDs. +- Query result metadata includes zero-result and lexical index statistics for + later smoke/performance measurement. + ## R7.3 - Implement metadata lifecycle and source-context filters ```task id: KONT-WP-0007-T003 -status: todo +status: done priority: high state_hub_task_id: "9e7d0a5c-71d4-44ca-9b71-70f2206e4a02" ``` @@ -100,11 +144,24 @@ Acceptance: - Filter behavior is covered across in-memory and durable backends where supported. +Implemented: + +- Asset queries support filters for asset type, lifecycle, sensitivity, owner, + topic, review state, source system/path, representation kind, collection, + tags, created/updated timestamp bounds, and custom metadata records. +- Text search can be combined with standard, source, tag, collection, + sensitivity, and metadata filters. +- Combined filter behavior is covered over in-memory and SQLite-backed asset + repositories. +- Permission enforcement is intentionally deferred to R7.5; current lifecycle + and sensitivity filters establish the policy inputs without claiming + authorization semantics. + ## R7.4 - Implement contextual entity model and relationship retrieval ```task id: KONT-WP-0007-T004 -status: todo +status: done priority: high state_hub_task_id: "b3358059-ac58-4e37-985c-6e8c1cc6df30" ``` @@ -120,11 +177,27 @@ Acceptance: - Callers can retrieve assets by project, case, topic, source, workflow run, or related asset. +Implemented: + +- Existing `ContextEntity`/`CoreRelationship` primitives are reused as the + canonical model; entity types now include workflow runs and generated + artifacts for operational graph use cases. +- `ContextEntityQueryRequest`/`ContextEntityQueryResult` provide stable + contextual entity lookup by type, name, external reference, and metadata. 
+- `RelationshipQueryRequest`/`RelationshipQueryResult` provide stable + relationship retrieval by source, target, asset, contextual entity, + workflow run, predicate, target kind, and direction. +- Asset queries can filter by contextual entity, workflow run, related asset, + and relationship predicate while returning relationship and contextual + entity context for matched assets. +- Graph retrieval behavior is covered across in-memory and SQLite-backed + repositories. + ## R7.5 - Enforce permission-aware retrieval and fail-closed semantics ```task id: KONT-WP-0007-T005 -status: todo +status: done priority: high state_hub_task_id: "c6c93713-3ab1-41fb-bf35-15dd860b66fa" ``` @@ -140,11 +213,25 @@ Acceptance: - Retrieval audit events capture actor, query scope, outcome, and policy context. +Implemented: + +- Retrieval services accept the engine `PolicyGateway`, defaulting to the + allow-all local adapter used elsewhere in the system. +- Asset, contextual entity, and relationship queries authorize the query scope + before loading result envelopes. +- Assets, contextual entities, and relationships are policy-filtered before + they are returned; relationships additionally require source and target + resource visibility so traversal cannot reveal denied assets or entities. +- Policy gateway failures produce empty denied envelopes with structured + diagnostics and fail-closed policy decisions. +- Retrieval audit events capture actor, correlation ID, query scope, policy + decision, outcome, result counts, and internal permission-filter counts. + ## R7.6 - Return source-grounded snippets citations and explanation data ```task id: KONT-WP-0007-T006 -status: todo +status: done priority: medium state_hub_task_id: "1a6d5a95-d87a-447a-a186-cb73162cd9a1" ``` @@ -160,11 +247,26 @@ Acceptance: - Markdown snippets can reference Markitect selector matches or context-package spans as adapter provenance. 
+Implemented: + +- `RetrievalSnippet` packets expose asset, representation, source reference, + storage reference, media type, match offsets, match text, snippet text, and + adapter provenance. +- Lexical asset queries can request snippets through `include_snippets`, + `max_snippets`, and `snippet_radius`. +- Snippets are generated from normalized representation search text and are + attached only to policy-authorized asset results. +- Markitect selectors, source spans, context spans, adapter provenance, + snapshots, and extractor identity are preserved when supplied as + representation metadata. +- Snippet behavior is covered with permission filtering so denied matching + content does not leak through snippet packets. + ## R7.7 - Capture retrieval feedback and KPI measurement hooks ```task id: KONT-WP-0007-T007 -status: todo +status: done priority: medium state_hub_task_id: "e17e2839-400f-4348-98e3-f77acc0b2fde" ``` @@ -179,6 +281,20 @@ Acceptance: - Precision@k, zero-result rate, permission-filter latency, and citation precision have measurement hooks. +Implemented: + +- `RetrievalFeedbackRecord` persists feedback labels for useful, irrelevant, + missing, unsafe, and low-confidence outcomes with actor, correlation ID, + query context, result references, notes, and metadata. +- Asset registry repository ports and memory/SQLite adapters persist and list + retrieval feedback. +- `AssetRetrievalService.record_feedback()` records authorized feedback with + structured diagnostics for invalid labels or denied feedback operations. +- `AssetRetrievalService.quality_metrics()` derives zero-result rate, + precision@k, citation precision, feedback totals, unsafe/low-confidence + counts, and permission-filter timing observations from query results, + feedback records, and retrieval audit events. + ## Definition Of Done - Retrieval tests cover text, metadata, lifecycle, relationship, contextual