stable asset queries, lexical search, filters, contextual entity and relationship retrieval, permission-aware fail-closed behavior, source-grounded snippets, feedback capture, and KPI hooks

2026-05-06 16:27:03 +02:00
parent 80a3e59701
commit 1e3c6fe34a
13 changed files with 3173 additions and 9 deletions


@@ -0,0 +1,110 @@
# Retrieval Implementation Note
Date: 2026-05-06
Status: first implementation slice for `KONT-WP-0007`.
## Purpose
This note records the first governed retrieval implementation over the asset
registry. It introduces stable query request/result contracts before search
ranking, policy filtering, snippets, relationship traversal, and feedback grow
more complex.
## Implemented Package Shape
```text
src/kontextual_engine/
services/retrieval_service.py
```
The retrieval service depends on the asset registry repository port and domain
core contracts. It does not depend on HTTP, search backends, Markitect internals,
or AI providers.
## Implemented Capabilities
- `AssetQueryRequest` for stable asset queries with pagination, sorting, and
common asset filters.
- `AssetQueryResult` envelope with correlation ID, total count, returned count,
limit, offset, next offset, sort metadata, results, diagnostics, and success
flag.
- `AssetQueryItem` result entries carrying asset identity, classification,
lifecycle, source references, representations, metadata records, relevance
metadata, and diagnostics.
- Deterministic sorting by title, asset ID, asset type, lifecycle, creation
time, or update time with asset ID as a tie-breaker.
- Pagination by limit and offset.
- Structured validation diagnostics for invalid lifecycle, representation
kind, limit, offset, sort key, and sort order.
- Standard filters for asset type, lifecycle, sensitivity, owner, topic, review
state, metadata records, source system/path, and representation kind.
- Lexical search over normalized representation search text produced during
ingestion or supplied as representation metadata.
- Refreshable in-memory lexical index with indexed asset/representation counts.
- Relevance metadata for lexical matches, including strategy, query, match
count, and representation IDs.
- Zero-result measurement metadata for query smoke tests.
- Additional source-context filters for collection, tags, and created/updated
timestamp bounds.
- `ContextEntityQueryRequest`/`ContextEntityQueryResult` for querying
contextual entities by type, name, external reference, and metadata.
- `RelationshipQueryRequest`/`RelationshipQueryResult` for stable relationship
retrieval by source, target, asset, contextual entity, predicate, target
kind, direction, and workflow run.
- Asset queries can be constrained by contextual entity, workflow run, related
asset, and relationship predicate.
- Asset result entries can carry relationship context and linked contextual
entities when graph filters or relationship inclusion are requested.
- Relationship payloads expose direction, predicate, validity windows,
confidence, actor, provenance, creation time, and resolved source/target
context where available.
- Retrieval uses the engine policy gateway for query-scope authorization and
per-resource checks before assets, relationships, or contextual entities are
returned.
- Policy failures or unavailable policy context fail closed with empty denied
envelopes and structured diagnostics.
- Retrieval audit events record actor, correlation ID, query scope, policy
decision, outcome, result count, total count, and internal
permission-filter counts.
- `RetrievalSnippet` packets can be requested for lexical queries. Snippets are
built from normalized representation search text and carry asset ID,
representation ID, source reference ID, storage reference, media type, match
offsets, match text, and adapter provenance such as Markitect selectors or
source spans when present.
- Snippets are attached only to policy-authorized asset results, so denied
matching content is not exposed through snippet packets.
- `RetrievalFeedbackRecord` persists useful, irrelevant, missing, unsafe, and
low-confidence feedback with query context, result references, actor,
correlation ID, notes, and metadata.
- Retrieval quality metrics summarize zero-result rate, precision@k,
citation precision, feedback counts, unsafe/low-confidence counts, and
permission-filter timing observations from retrieval audit events.
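
The pagination and deterministic-sort behavior listed above can be sketched as follows. This is a minimal illustration and not the code in `retrieval_service.py`: the `Page` class and `paginate` function are hypothetical stand-ins for the result envelope, and assets are reduced to plain dictionaries.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Page:
    """Hypothetical, reduced stand-in for the query result envelope."""

    total: int
    result_count: int
    limit: int
    offset: int
    next_offset: Optional[int]
    items: tuple


def paginate(assets, *, sort_by="title", limit=50, offset=0):
    """Sort deterministically, then cut one limit/offset window."""
    if limit < 1 or offset < 0:
        raise ValueError("limit must be >= 1 and offset must be >= 0")
    # Asset ID is the tie-breaker, so equal sort keys always come back in the same order.
    ordered = sorted(assets, key=lambda asset: (asset[sort_by], asset["asset_id"]))
    window = ordered[offset : offset + limit]
    end = offset + len(window)
    # next_offset is only set while more results remain after this window.
    next_offset = end if end < len(ordered) else None
    return Page(len(ordered), len(window), limit, offset, next_offset, tuple(window))


documents = [
    {"asset_id": "asset-bravo", "title": "Bravo"},
    {"asset_id": "asset-alpha", "title": "Alpha"},
]
first_page = paginate(documents, limit=1)
second_page = paginate(documents, limit=1, offset=1)
```

With two matching assets and `limit=1`, the first page reports `next_offset == 1` and the second reports `next_offset is None`, which is the shape the paginated-envelope test asserts.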
## Not Yet Implemented
- Multi-hop relationship traversal and graph ranking.
- Full grounded answer package assembly.
## Test Coverage
`tests/test_asset_retrieval_service.py` covers:
- stable paginated query envelopes,
- result payloads with source references, representations, and metadata
records,
- combined standard, metadata, source, lifecycle, and representation filters,
- lexical search over normalized content with index refresh and zero-result
metadata,
- combined text, metadata, source, tag, collection, and sensitivity filters
over SQLite,
- contextual entity, workflow run, related asset, and relationship predicate
filters,
- direct contextual entity and relationship queries over SQLite,
- permission-filtered asset, relationship, and context entity envelopes,
- fail-closed query scope behavior when policy context is unavailable,
- source-grounded lexical snippets with representation/source references and
adapter provenance,
- persisted retrieval feedback and KPI measurement hooks over feedback,
query results, and retrieval audit timing,
- structured diagnostics for invalid query requests.
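
The lexical matching and snippet behavior covered by these tests can be approximated with a short sketch. It assumes case-insensitive, non-overlapping substring matching and a character radius around each match; the `Snippet` class and both helper functions are hypothetical and carry far fewer fields than `RetrievalSnippet`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snippet:
    """Hypothetical, reduced stand-in for a snippet packet."""

    match_text: str
    match_start: int
    match_end: int
    text: str


def find_matches(search_text, query):
    """Return start offsets of case-insensitive, non-overlapping matches."""
    # Assumes lowercasing keeps string length unchanged (true for ASCII text).
    haystack, needle = search_text.lower(), query.lower()
    offsets = []
    if not needle:
        return offsets
    start = haystack.find(needle)
    while start != -1:
        offsets.append(start)
        start = haystack.find(needle, start + len(needle))
    return offsets


def build_snippets(search_text, query, *, radius=40, max_snippets=3):
    """Cut a window of `radius` characters around each of the first matches."""
    snippets = []
    for start in find_matches(search_text, query)[:max_snippets]:
        end = start + len(query)
        window = search_text[max(0, start - radius) : end + radius]
        snippets.append(Snippet(search_text[start:end], start, end, window))
    return snippets


match_offsets = find_matches("alpha retrieval signal\nalpha again\n", "alpha")
snippets = build_snippets(
    "Public alpha signal that should be citeable from the normalized document.",
    "alpha",
    radius=12,
    max_snippets=1,
)
```

The match count is the length of the offset list, and each snippet keeps its match offsets so a caller can point back into the normalized text.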


@@ -49,6 +49,8 @@ from .core import (
PolicyEffect,
RelationshipTargetKind,
RepresentationKind,
RetrievalFeedbackLabel,
RetrievalFeedbackRecord,
Sensitivity,
SourceReference,
SourcePayload,
@@ -81,8 +83,23 @@ from .services import (
AssetChangeResult,
AssetIngestionResult,
AssetIngestionService,
AssetQueryItem,
AssetQueryRequest,
AssetQueryResult,
AssetRegistryService,
AssetRetrievalService,
ContextEntityQueryItem,
ContextEntityQueryRequest,
ContextEntityQueryResult,
LexicalIndexRefreshResult,
RelationshipChangeResult,
RelationshipQueryItem,
RelationshipQueryRequest,
RelationshipQueryResult,
RetrievalFeedbackRequest,
RetrievalFeedbackResult,
RetrievalQualityMetrics,
RetrievalSnippet,
)
from .storage import InMemoryKnowledgeRepository
from .workflows import (
@@ -108,8 +125,12 @@ __all__ = [
"AssetChangeResult",
"AssetIngestionResult",
"AssetIngestionService",
"AssetQueryItem",
"AssetQueryRequest",
"AssetQueryResult",
"AssetRegistryRepository",
"AssetRegistryService",
"AssetRetrievalService",
"AssetVersion",
"AuditEvent",
"AuditOutcome",
@@ -121,6 +142,9 @@ __all__ = [
"Collection",
"ContextAssembler",
"ContextEntity",
"ContextEntityQueryItem",
"ContextEntityQueryRequest",
"ContextEntityQueryResult",
"ContextEntityType",
"ContextItem",
"ContextPackage",
@@ -146,6 +170,7 @@ __all__ = [
"IdempotencyStatus",
"KnowledgeAsset",
"KontextualError",
"LexicalIndexRefreshResult",
"LifecycleState",
"MetadataFieldDefinition",
"MetadataRecord",
@@ -167,9 +192,18 @@ __all__ = [
"Relationship",
"RelationshipChangeResult",
"RelationshipGraph",
"RelationshipQueryItem",
"RelationshipQueryRequest",
"RelationshipQueryResult",
"RelationshipTargetKind",
"RelationshipType",
"RepresentationKind",
"RetrievalFeedbackLabel",
"RetrievalFeedbackRecord",
"RetrievalFeedbackRequest",
"RetrievalFeedbackResult",
"RetrievalQualityMetrics",
"RetrievalSnippet",
"RunManifest",
"RunStatus",
"Sensitivity",


@@ -21,6 +21,7 @@ from kontextual_engine.core import (
MetadataSchema,
MetadataSchemaAssignment,
RepresentationKind,
RetrievalFeedbackRecord,
Sensitivity,
)
from kontextual_engine.errors import NotFoundError, ValidationError
@@ -38,6 +39,7 @@ class InMemoryAssetRegistryRepository:
    relationships: dict[str, CoreRelationship] = field(default_factory=dict)
    versions: dict[str, list[AssetVersion]] = field(default_factory=dict)
    audit_events: dict[str, AuditEvent] = field(default_factory=dict)
    retrieval_feedback: dict[str, RetrievalFeedbackRecord] = field(default_factory=dict)
    idempotency_records: dict[str, IdempotencyRecord] = field(default_factory=dict)
    ingestion_jobs: dict[str, IngestionJob] = field(default_factory=dict)
@@ -253,6 +255,24 @@ class InMemoryAssetRegistryRepository:
            events = [event for event in events if event.correlation_id == correlation_id]
        return sorted(events, key=lambda event: event.occurred_at)

    def save_retrieval_feedback(self, record: RetrievalFeedbackRecord) -> RetrievalFeedbackRecord:
        self.get_actor(record.actor_id)
        self.retrieval_feedback[record.feedback_id] = record
        return record

    def list_retrieval_feedback(
        self,
        *,
        correlation_id: str | None = None,
        label: str | None = None,
    ) -> list[RetrievalFeedbackRecord]:
        records: Iterable[RetrievalFeedbackRecord] = self.retrieval_feedback.values()
        if correlation_id is not None:
            records = [record for record in records if record.correlation_id == correlation_id]
        if label is not None:
            records = [record for record in records if record.label.value == label]
        return sorted(records, key=lambda record: (record.created_at, record.feedback_id))

    def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord:
        self.idempotency_records[record.key] = record
        return record


@@ -24,6 +24,7 @@ from kontextual_engine.core import (
MetadataSchemaAssignment,
RepresentationKind,
RelationshipTargetKind,
RetrievalFeedbackRecord,
Sensitivity,
)
from kontextual_engine.errors import NotFoundError, ValidationError
@@ -466,6 +467,56 @@ class SQLiteAssetRegistryRepository:
        rows = self._all(f"select payload from audit_events{where} order by occurred_at, rowid", tuple(params))
        return [AuditEvent.from_dict(_loads(row["payload"])) for row in rows]

    def save_retrieval_feedback(self, record: RetrievalFeedbackRecord) -> RetrievalFeedbackRecord:
        try:
            with self._connect() as conn:
                conn.execute(
                    """
                    insert into retrieval_feedback (id, label, actor_id, correlation_id, created_at, payload)
                    values (?, ?, ?, ?, ?, ?)
                    on conflict(id) do update set
                        label=excluded.label,
                        actor_id=excluded.actor_id,
                        correlation_id=excluded.correlation_id,
                        created_at=excluded.created_at,
                        payload=excluded.payload
                    """,
                    (
                        record.feedback_id,
                        record.label.value,
                        record.actor_id,
                        record.correlation_id,
                        record.created_at,
                        _json(record.to_dict()),
                    ),
                )
        except sqlite3.IntegrityError as exc:
            if _is_foreign_key_error(exc):
                raise ValidationError(
                    "Retrieval feedback references an unknown actor",
                    details={"actor_id": record.actor_id, "feedback_id": record.feedback_id},
                ) from exc
            raise
        return record

    def list_retrieval_feedback(
        self,
        *,
        correlation_id: str | None = None,
        label: str | None = None,
    ) -> list[RetrievalFeedbackRecord]:
        clauses = []
        params: list[Any] = []
        if correlation_id is not None:
            clauses.append("correlation_id = ?")
            params.append(correlation_id)
        if label is not None:
            clauses.append("label = ?")
            params.append(label)
        where = f" where {' and '.join(clauses)}" if clauses else ""
        rows = self._all(f"select payload from retrieval_feedback{where} order by created_at, id", tuple(params))
        return [RetrievalFeedbackRecord.from_dict(_loads(row["payload"])) for row in rows]

    def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord:
        with self._connect() as conn:
            conn.execute(
@@ -620,6 +671,15 @@ class SQLiteAssetRegistryRepository:
payload text not null,
foreign key(actor_id) references actors(id)
);
create table if not exists retrieval_feedback (
id text primary key,
label text not null,
actor_id text not null,
correlation_id text not null,
created_at text not null,
payload text not null,
foreign key(actor_id) references actors(id)
);
create table if not exists idempotency_records (
key text primary key,
operation text not null,
@@ -647,6 +707,8 @@ class SQLiteAssetRegistryRepository:
create index if not exists idx_versions_asset on asset_versions(asset_id);
create index if not exists idx_audit_target on audit_events(target);
create index if not exists idx_audit_correlation on audit_events(correlation_id);
create index if not exists idx_retrieval_feedback_label on retrieval_feedback(label);
create index if not exists idx_retrieval_feedback_correlation on retrieval_feedback(correlation_id);
create index if not exists idx_ingestion_jobs_status on ingestion_jobs(status);
create index if not exists idx_ingestion_jobs_correlation on ingestion_jobs(correlation_id);
"""

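The upsert and foreign-key behavior of the SQLite feedback table can be tried in isolation with the standard `sqlite3` module. This is a reduced sketch under stated assumptions: the `actors` table is cut down to a single `id` column, the feedback table omits `correlation_id` and `created_at`, and the `save` helper is hypothetical rather than the repository method.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite only enforces foreign keys when the pragma is enabled on the connection.
conn.execute("pragma foreign_keys = on")
conn.executescript(
    """
    create table actors (id text primary key);
    create table retrieval_feedback (
        id text primary key,
        label text not null,
        actor_id text not null,
        payload text not null,
        foreign key(actor_id) references actors(id)
    );
    insert into actors (id) values ('actor-1');
    """
)

UPSERT = """
insert into retrieval_feedback (id, label, actor_id, payload)
values (?, ?, ?, ?)
on conflict(id) do update set
    label=excluded.label,
    actor_id=excluded.actor_id,
    payload=excluded.payload
"""


def save(feedback_id, label, actor_id, payload="{}"):
    """Insert a feedback row, or overwrite the row that already has this id."""
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(UPSERT, (feedback_id, label, actor_id, payload))


save("feedback-1", "useful", "actor-1")
save("feedback-1", "irrelevant", "actor-1")  # same id: updates the existing row
rows = conn.execute("select id, label from retrieval_feedback").fetchall()

try:
    save("feedback-2", "useful", "actor-missing")
    unknown_actor_rejected = False
except sqlite3.IntegrityError:
    # The repository in the diff translates this case into a ValidationError.
    unknown_actor_rejected = True
```

Saving the same `feedback_id` twice leaves a single row carrying the latest label, and a row that references an unknown actor is rejected by the foreign-key constraint.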

@@ -40,6 +40,7 @@ from .relationships import (
CoreRelationship,
RelationshipTargetKind,
)
from .retrieval_feedback import RetrievalFeedbackLabel, RetrievalFeedbackRecord
__all__ = [
"Actor",
@@ -76,6 +77,8 @@ __all__ = [
"PolicyEffect",
"RelationshipTargetKind",
"RepresentationKind",
"RetrievalFeedbackLabel",
"RetrievalFeedbackRecord",
"Sensitivity",
"SourceReference",
"SourcePayload",


@@ -19,6 +19,8 @@ class ContextEntityType(str, Enum):
    PROCESS = "process"
    SOURCE_SYSTEM = "source_system"
    TOPIC = "topic"
    WORKFLOW_RUN = "workflow_run"
    GENERATED_ARTIFACT = "generated_artifact"
    BUSINESS_OBJECT = "business_object"


@@ -0,0 +1,59 @@
"""Retrieval feedback and quality signal primitives."""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from .primitives import compact_dict, new_id, utc_now


class RetrievalFeedbackLabel(str, Enum):
    USEFUL = "useful"
    IRRELEVANT = "irrelevant"
    MISSING = "missing"
    UNSAFE = "unsafe"
    LOW_CONFIDENCE = "low_confidence"


@dataclass(frozen=True)
class RetrievalFeedbackRecord:
    label: RetrievalFeedbackLabel
    query: dict[str, Any]
    result_ref: dict[str, Any]
    actor_id: str
    correlation_id: str
    notes: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    feedback_id: str = field(default_factory=lambda: new_id("feedback"))
    created_at: str = field(default_factory=lambda: utc_now().isoformat())

    def to_dict(self) -> dict[str, Any]:
        return compact_dict(
            {
                "feedback_id": self.feedback_id,
                "label": self.label.value,
                "query": dict(self.query),
                "result_ref": dict(self.result_ref),
                "actor_id": self.actor_id,
                "correlation_id": self.correlation_id,
                "notes": self.notes,
                "metadata": dict(self.metadata),
                "created_at": self.created_at,
            }
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "RetrievalFeedbackRecord":
        return cls(
            feedback_id=data["feedback_id"],
            label=RetrievalFeedbackLabel(data["label"]),
            query=dict(data.get("query", {})),
            result_ref=dict(data.get("result_ref", {})),
            actor_id=data["actor_id"],
            correlation_id=data["correlation_id"],
            notes=data.get("notes"),
            metadata=dict(data.get("metadata", {})),
            created_at=data["created_at"],
        )
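
The implementation note says the quality metrics summarize feedback counts and unsafe/low-confidence counts. A minimal sketch of that kind of aggregation over serialized feedback records might look like the following. The `Label` enum only mirrors the values of `RetrievalFeedbackLabel`; `summarize_feedback` and its output keys are hypothetical and are not taken from `RetrievalQualityMetrics`.

```python
from collections import Counter
from enum import Enum


class Label(str, Enum):
    """Mirrors the values of RetrievalFeedbackLabel; redefined to stay self-contained."""

    USEFUL = "useful"
    IRRELEVANT = "irrelevant"
    MISSING = "missing"
    UNSAFE = "unsafe"
    LOW_CONFIDENCE = "low_confidence"


def summarize_feedback(records):
    """Count feedback per label and report the share flagged unsafe or low-confidence."""
    # Label(...) raises ValueError for unknown labels, so bad input fails loudly.
    counts = Counter(Label(record["label"]).value for record in records)
    total = sum(counts.values())
    flagged = counts[Label.UNSAFE.value] + counts[Label.LOW_CONFIDENCE.value]
    return {
        "total": total,
        "by_label": {label.value: counts.get(label.value, 0) for label in Label},
        "flagged": flagged,
        "flagged_rate": flagged / total if total else 0.0,
    }


summary = summarize_feedback(
    [
        {"label": "useful"},
        {"label": "useful"},
        {"label": "unsafe"},
        {"label": "low_confidence"},
    ]
)
```

Every label appears in `by_label` even when its count is zero, which keeps the output shape stable across reporting periods.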


@@ -20,6 +20,7 @@ from kontextual_engine.core import (
MetadataSchema,
MetadataSchemaAssignment,
RepresentationKind,
RetrievalFeedbackRecord,
Sensitivity,
)
@@ -90,6 +91,14 @@ class AssetRegistryRepository(Protocol):
        correlation_id: str | None = None,
    ) -> list[AuditEvent]: ...

    def save_retrieval_feedback(self, record: RetrievalFeedbackRecord) -> RetrievalFeedbackRecord: ...

    def list_retrieval_feedback(
        self,
        *,
        correlation_id: str | None = None,
        label: str | None = None,
    ) -> list[RetrievalFeedbackRecord]: ...

    def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord: ...

    def get_idempotency_record(self, key: str) -> IdempotencyRecord | None: ...


@@ -6,11 +6,43 @@ from .asset_service import (
RelationshipChangeResult,
)
from .ingestion_service import AssetIngestionResult, AssetIngestionService
from .retrieval_service import (
AssetQueryItem,
AssetQueryRequest,
AssetQueryResult,
AssetRetrievalService,
ContextEntityQueryItem,
ContextEntityQueryRequest,
ContextEntityQueryResult,
LexicalIndexRefreshResult,
RelationshipQueryItem,
RelationshipQueryRequest,
RelationshipQueryResult,
RetrievalFeedbackRequest,
RetrievalFeedbackResult,
RetrievalQualityMetrics,
RetrievalSnippet,
)
__all__ = [
"AssetChangeResult",
"AssetIngestionResult",
"AssetIngestionService",
"AssetQueryItem",
"AssetQueryRequest",
"AssetQueryResult",
"AssetRegistryService",
"AssetRetrievalService",
"ContextEntityQueryItem",
"ContextEntityQueryRequest",
"ContextEntityQueryResult",
"LexicalIndexRefreshResult",
"RelationshipChangeResult",
"RelationshipQueryItem",
"RelationshipQueryRequest",
"RelationshipQueryResult",
"RetrievalFeedbackRequest",
"RetrievalFeedbackResult",
"RetrievalQualityMetrics",
"RetrievalSnippet",
]


@@ -298,6 +298,8 @@ class AssetIngestionService:
metadata={
"extractor": extractor.name,
"normalized_hash": extraction.normalized.normalized_hash,
"search_text": extraction.normalized.text,
"search_text_length": len(extraction.normalized.text),
"permission_context": dict(payload.permission_context),
**extraction.metadata,
},

File diff suppressed because it is too large.


@@ -0,0 +1,722 @@
from pathlib import Path
from kontextual_engine import (
Actor,
ActorType,
AssetIngestionService,
AssetQueryRequest,
AssetRegistryService,
AssetRepresentation,
AssetRetrievalService,
Classification,
ContextEntity,
ContextEntityQueryRequest,
ContextEntityType,
InMemoryAssetRegistryRepository,
LifecycleState,
MetadataRecord,
OperationContext,
PolicyDecision,
RelationshipQueryRequest,
RepresentationKind,
RetrievalFeedbackLabel,
RetrievalFeedbackRequest,
Sensitivity,
SQLiteAssetRegistryRepository,
SourceReference,
)


def test_asset_retrieval_returns_stable_paginated_envelope() -> None:
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    source_ref = SourceReference(source_system="repo", path="docs/b.md", checksum="sha256:b")
    representation = AssetRepresentation.from_content(
        "asset-bravo",
        RepresentationKind.NORMALIZED,
        "text/plain",
        "bravo content",
        storage_ref="object://bravo-normalized",
        source_ref_id=source_ref.id,
    )
    registry.create_asset(
        "Charlie",
        Classification(asset_type="note", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-charlie",
    )
    registry.create_asset(
        "Bravo",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC, owner="Docs", topics=("retrieval",)),
        context,
        asset_id="asset-bravo",
        source_refs=[source_ref],
        representations=[representation],
        metadata_records=[MetadataRecord("status", "approved", confirmed=True)],
    )
    registry.create_asset(
        "Alpha",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC, owner="Docs"),
        context,
        asset_id="asset-alpha",
    )
    first_page = retrieval.query_assets(
        AssetQueryRequest(asset_type="document", sort_by="title", limit=1),
        context,
    )
    second_page = retrieval.query_assets(
        AssetQueryRequest(asset_type="document", sort_by="title", limit=1, offset=1),
        context,
    )
    assert first_page.success is True
    assert first_page.total == 2
    assert first_page.result_count == 1
    assert first_page.next_offset == 1
    assert first_page.items[0].asset.id == "asset-alpha"
    assert second_page.next_offset is None
    assert second_page.items[0].asset.id == "asset-bravo"
    assert second_page.to_dict()["results"][0]["source_refs"][0]["path"] == "docs/b.md"
    assert second_page.to_dict()["results"][0]["representations"][0]["storage_ref"] == "object://bravo-normalized"
    assert second_page.to_dict()["results"][0]["metadata_records"][0]["key"] == "status"
    assert second_page.to_dict()["correlation_id"] == "corr-retrieval"


def test_asset_retrieval_filters_source_metadata_lifecycle_and_representation_kind() -> None:
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    source_ref = SourceReference(source_system="repo", path="docs/guide.md", checksum="sha256:guide")
    registry.create_asset(
        "Guide",
        Classification(
            asset_type="guide",
            sensitivity=Sensitivity.INTERNAL,
            owner="Docs",
            topics=("retrieval",),
            review_state="approved",
        ),
        context,
        asset_id="asset-guide",
        source_refs=[source_ref],
        representations=[
            AssetRepresentation.from_content(
                "asset-guide",
                RepresentationKind.NORMALIZED,
                "text/plain",
                "guide normalized",
            )
        ],
        metadata_records=[MetadataRecord("channel", "internal", confirmed=True)],
    )
    draft = registry.create_asset(
        "Draft",
        Classification(asset_type="guide", sensitivity=Sensitivity.INTERNAL, owner="Docs"),
        context,
        asset_id="asset-draft",
        metadata_records=[MetadataRecord("channel", "internal", confirmed=False)],
    )
    registry.transition_lifecycle(draft.asset.id, LifecycleState.RETIRED, context)
    result = retrieval.query_assets(
        AssetQueryRequest(
            asset_type="guide",
            lifecycle=LifecycleState.ACTIVE,
            owner="Docs",
            topic="retrieval",
            review_state="approved",
            metadata_filters={"channel": "internal"},
            confirmed_metadata_only=True,
            source_system="repo",
            source_path="docs/guide.md",
            representation_kind=RepresentationKind.NORMALIZED,
        ),
        context,
    )
    assert [item.asset.id for item in result.items] == ["asset-guide"]
    assert result.items[0].representations[0].kind == RepresentationKind.NORMALIZED
    assert result.items[0].metadata_records[0].confirmed is True


def test_asset_retrieval_invalid_query_returns_structured_diagnostics() -> None:
    retrieval = AssetRetrievalService(InMemoryAssetRegistryRepository())
    result = retrieval.query_assets(
        AssetQueryRequest(
            lifecycle="missing",
            representation_kind="summary",
            sort_by="rank",
            sort_order="sideways",
            limit=0,
            offset=-1,
        ),
        operation_context(),
    )
    assert result.success is False
    assert result.total == 0
    assert result.items == ()
    assert {diagnostic.code for diagnostic in result.diagnostics} == {
        "retrieval.lifecycle_invalid",
        "retrieval.representation_kind_invalid",
        "retrieval.sort_invalid",
        "retrieval.sort_order_invalid",
        "retrieval.limit_invalid",
        "retrieval.offset_invalid",
    }
    payload = result.to_dict()
    assert payload["success"] is False
    assert payload["diagnostics"][0]["severity"] == "error"


def test_asset_retrieval_lexical_search_over_normalized_content(tmp_path: Path) -> None:
    alpha = tmp_path / "alpha.txt"
    beta = tmp_path / "beta.txt"
    alpha.write_text("alpha retrieval signal\nalpha again\n", encoding="utf-8")
    beta.write_text("beta only\n", encoding="utf-8")
    repo = InMemoryAssetRegistryRepository()
    ingestion = AssetIngestionService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    ingestion.ingest_file(alpha, context, asset_id="asset-alpha")
    ingestion.ingest_file(beta, context, asset_id="asset-beta")
    refresh = retrieval.refresh_index()
    result = retrieval.query_assets(AssetQueryRequest(text="alpha"), context)
    zero = retrieval.query_assets(AssetQueryRequest(text="gamma"), context)
    assert refresh.indexed_assets == 2
    assert refresh.indexed_representations == 2
    assert [item.asset.id for item in result.items] == ["asset-alpha"]
    assert result.items[0].relevance["strategy"] == "lexical_substring"
    assert result.items[0].relevance["match_count"] == 2
    assert result.items[0].relevance["representation_ids"]
    assert result.metadata["zero_result"] is False
    assert result.metadata["lexical_index"] == {
        "indexed_assets": 2,
        "indexed_representations": 2,
    }
    assert zero.total == 0
    assert zero.metadata["zero_result"] is True


def test_asset_retrieval_returns_permission_filtered_source_grounded_snippets() -> None:
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo, policy_gateway=DenyConfidentialRetrievalPolicy())
    context = operation_context()
    public_source = SourceReference(source_system="repo", path="docs/public.md", checksum="sha256:public")
    confidential_source = SourceReference(
        source_system="repo",
        path="docs/confidential.md",
        checksum="sha256:confidential",
    )
    registry.create_asset(
        "Public Snippet",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        context,
        asset_id="asset-public-snippet",
        source_refs=[public_source],
        representations=[
            AssetRepresentation.from_content(
                "asset-public-snippet",
                RepresentationKind.NORMALIZED,
                "text/markdown",
                "normalized public",
                source_ref_id=public_source.id,
                metadata={
                    "search_text": "Public alpha signal that should be citeable from the normalized document.",
                    "extractor": "markitect-tool",
                    "markitect_selector": "section:intro",
                },
            )
        ],
    )
    registry.create_asset(
        "Confidential Snippet",
        Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL),
        context,
        asset_id="asset-confidential-snippet",
        source_refs=[confidential_source],
        representations=[
            AssetRepresentation.from_content(
                "asset-confidential-snippet",
                RepresentationKind.NORMALIZED,
                "text/markdown",
                "normalized confidential",
                source_ref_id=confidential_source.id,
                metadata={
                    "search_text": "Confidential alpha signal must not leak as a snippet.",
                    "extractor": "markitect-tool",
                    "markitect_selector": "section:private",
                },
            )
        ],
    )
    retrieval.refresh_index()
    result = retrieval.query_assets(
        AssetQueryRequest(text="alpha", include_snippets=True, max_snippets=1, snippet_radius=12),
        context,
    )
    assert [item.asset.id for item in result.items] == ["asset-public-snippet"]
    assert result.items[0].relevance["snippet_count"] == 1
    snippet = result.items[0].snippets[0]
    assert snippet.asset_id == "asset-public-snippet"
    assert snippet.source_ref_id == public_source.id
    assert snippet.match_text == "alpha"
    assert "alpha" in snippet.text
    assert snippet.provenance["extractor"] == "markitect-tool"
    assert snippet.provenance["markitect_selector"] == "section:intro"
    assert result.to_dict()["results"][0]["snippets"][0]["source_ref_id"] == public_source.id
    retrieval_audit = [
        event for event in repo.list_audit_events(correlation_id="corr-retrieval")
        if event.operation == "retrieval.assets.query"
    ][-1]
    assert retrieval_audit.details["permission_filtered_count"] == 1


def test_asset_retrieval_filters_are_backend_portable_with_sqlite(tmp_path: Path) -> None:
    repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    source_ref = SourceReference(source_system="local_file", path="docs/report.txt", checksum="sha256:report")
    registry.create_asset(
        "Report",
        Classification(
            asset_type="report",
            sensitivity=Sensitivity.INTERNAL,
            owner="Analytics",
            topics=("retrieval", "quarterly"),
            review_state="approved",
            metadata={"collection": "reports"},
        ),
        context,
        asset_id="asset-report",
        source_refs=[source_ref],
        representations=[
            AssetRepresentation.from_content(
                "asset-report",
                RepresentationKind.NORMALIZED,
                "text/plain",
                "quarterly retrieval report",
                metadata={"search_text": "quarterly retrieval report"},
            )
        ],
        metadata_records=[
            MetadataRecord("collection", "reports", confirmed=True),
            MetadataRecord("tags", ["finance", "retrieval"], confirmed=True),
        ],
    )
    registry.create_asset(
        "Other",
        Classification(asset_type="report", sensitivity=Sensitivity.INTERNAL, owner="Analytics"),
        context,
        asset_id="asset-other",
        metadata_records=[MetadataRecord("collection", "misc", confirmed=True)],
    )
    retrieval.refresh_index()
    result = retrieval.query_assets(
        AssetQueryRequest(
            text="retrieval",
            asset_type="report",
            sensitivity=Sensitivity.INTERNAL,
            owner="Analytics",
            tags=("finance", "retrieval"),
            collection="reports",
            source_system="local_file",
            source_path="docs/report.txt",
            metadata_filters={"collection": "reports"},
            confirmed_metadata_only=True,
        ),
        context,
    )
    assert [item.asset.id for item in result.items] == ["asset-report"]
    assert result.items[0].relevance["match_count"] == 1
    assert result.items[0].metadata_records[0].confirmed is True


def test_asset_retrieval_filters_by_context_entity_workflow_run_and_related_asset() -> None:
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    classification = Classification(asset_type="document", sensitivity=Sensitivity.INTERNAL)
    policy = registry.create_asset("Policy", classification, context, asset_id="asset-policy")
    implementation = registry.create_asset(
        "Implementation",
        classification,
        context,
        asset_id="asset-implementation",
    )
    registry.create_asset("Unrelated", classification, context, asset_id="asset-unrelated")
    project = ContextEntity(
        entity_type=ContextEntityType.PROJECT,
        name="Kontextual Engine",
        external_ref="project:kontextual",
        metadata={"phase": "mvp"},
        entity_id="entity-project-kontextual",
    )
    workflow_run = ContextEntity(
        entity_type=ContextEntityType.WORKFLOW_RUN,
        name="Initial ingestion",
        external_ref="workflow-run-42",
        entity_id="entity-workflow-run-42",
    )
    registry.link_asset_to_context_entity(
        policy.asset.id,
        project,
        "about_project",
        context,
        confidence=0.96,
        provenance={"producer": "test-fixture"},
    )
    registry.link_asset_to_context_entity(
        implementation.asset.id,
        workflow_run,
        "produced_by_run",
        context,
    )
    registry.link_asset_to_asset(
        implementation.asset.id,
        policy.asset.id,
        "implements",
        context,
        confidence=0.88,
        provenance={"basis": "workplan"},
    )
    project_result = retrieval.query_assets(
        AssetQueryRequest(
            context_entity_type=ContextEntityType.PROJECT,
            context_entity_name="Kontextual Engine",
            relationship_predicate="about_project",
        ),
        context,
    )
    related_result = retrieval.query_assets(
        AssetQueryRequest(
            related_asset_id=policy.asset.id,
            relationship_predicate="implements",
        ),
        context,
    )
    workflow_result = retrieval.query_assets(
        AssetQueryRequest(workflow_run_id="workflow-run-42"),
        context,
    )
    assert [item.asset.id for item in project_result.items] == ["asset-policy"]
    assert project_result.items[0].relationships[0].predicate == "about_project"
    assert project_result.items[0].relationships[0].confidence == 0.96
    assert project_result.items[0].context_entities[0].entity_id == "entity-project-kontextual"
    assert project_result.to_dict()["results"][0]["relationships"][0]["provenance"]["producer"] == "test-fixture"
    assert [item.asset.id for item in related_result.items] == ["asset-implementation"]
    assert related_result.items[0].relationships[0].target_id == "asset-policy"
    assert [item.asset.id for item in workflow_result.items] == ["asset-implementation"]
    assert workflow_result.items[0].context_entities[0].entity_type == ContextEntityType.WORKFLOW_RUN


def test_context_entity_and_relationship_queries_are_backend_portable_with_sqlite(tmp_path: Path) -> None:
    repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    policy = registry.create_asset(
        "Policy",
        Classification(asset_type="policy", sensitivity=Sensitivity.INTERNAL),
        context,
        asset_id="asset-policy",
    )
    case = ContextEntity(
        entity_type=ContextEntityType.CASE,
        name="Migration Case",
        external_ref="case:migration",
        metadata={"priority": "high"},
        entity_id="entity-case-migration",
    )
    linked = registry.link_asset_to_context_entity(
        policy.asset.id,
        case,
        "about_case",
        context,
        confidence=0.73,
        provenance={"source": "test"},
    )
    entities = retrieval.query_context_entities(
        ContextEntityQueryRequest(
            entity_type="case",
            external_ref="case:migration",
            metadata_filters={"priority": "high"},
        ),
        context,
    )
    relationships = retrieval.query_relationships(
        RelationshipQueryRequest(
            context_entity_id="entity-case-migration",
            predicate="about_case",
            target_kind="context_entity",
        ),
        context,
    )
    assert entities.total == 1
    assert entities.items[0].asset_ids == ("asset-policy",)
    assert entities.items[0].relationship_count == 1
    assert relationships.total == 1
    assert relationships.items[0].relationship.relationship_id == linked.relationship.relationship_id
    assert relationships.items[0].source_asset.id == "asset-policy"
    assert relationships.items[0].target_entity.entity_id == "entity-case-migration"
    assert relationships.to_dict()["results"][0]["confidence"] == 0.73


def test_graph_retrieval_invalid_queries_return_structured_diagnostics() -> None:
    retrieval = AssetRetrievalService(InMemoryAssetRegistryRepository())
    context = operation_context()
    asset_result = retrieval.query_assets(
        AssetQueryRequest(context_entity_type="planet", relationship_direction="sideways"),
        context,
    )
    entity_result = retrieval.query_context_entities(
        ContextEntityQueryRequest(entity_type="planet", sort_by="rank", limit=0),
        context,
    )
    relationship_result = retrieval.query_relationships(
        RelationshipQueryRequest(
            target_kind="memory_phase",
            direction="sideways",
            sort_order="diagonal",
            offset=-1,
        ),
        context,
    )
    assert asset_result.success is False
    assert {diagnostic.code for diagnostic in asset_result.diagnostics} == {
        "retrieval.context_entity_type_invalid",
        "retrieval.relationship_direction_invalid",
    }
    assert entity_result.success is False
    assert {diagnostic.code for diagnostic in entity_result.diagnostics} == {
        "retrieval.context_entity_type_invalid",
        "retrieval.sort_invalid",
        "retrieval.limit_invalid",
    }
    assert relationship_result.success is False
    assert {diagnostic.code for diagnostic in relationship_result.diagnostics} == {
        "retrieval.relationship_target_kind_invalid",
        "retrieval.relationship_direction_invalid",
        "retrieval.sort_order_invalid",
        "retrieval.offset_invalid",
    }


def test_retrieval_policy_filters_assets_relationships_and_context_payloads() -> None:
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo, policy_gateway=DenyConfidentialRetrievalPolicy())
    context = operation_context()
    public = registry.create_asset(
        "Public",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        context,
        asset_id="asset-public",
    )
    confidential = registry.create_asset(
        "Confidential",
        Classification(asset_type="document", sensitivity=Sensitivity.CONFIDENTIAL),
        context,
        asset_id="asset-confidential",
    )
    case = ContextEntity(
        entity_type=ContextEntityType.CASE,
        name="Visible Case",
        entity_id="entity-visible-case",
    )
    registry.link_asset_to_context_entity(public.asset.id, case, "about_case", context)
    registry.link_asset_to_context_entity(confidential.asset.id, case, "about_case", context)
    registry.link_asset_to_asset(public.asset.id, confidential.asset.id, "references", context)
    assets = retrieval.query_assets(AssetQueryRequest(sort_by="asset_id"), context)
    relationships = retrieval.query_relationships(
        RelationshipQueryRequest(predicate="references"),
        context,
    )
    entities = retrieval.query_context_entities(
        ContextEntityQueryRequest(entity_id="entity-visible-case"),
        context,
    )
    assert [item.asset.id for item in assets.items] == ["asset-public"]
    assert assets.total == 1
    assert assets.metadata["policy_enforced"] is True
    assert relationships.total == 0
    assert entities.items[0].asset_ids == ("asset-public",)
    assert entities.items[0].relationship_count == 1
    audit_events = repo.list_audit_events(correlation_id="corr-retrieval")
    retrieval_audit = [event for event in audit_events if event.operation == "retrieval.assets.query"][-1]
    assert retrieval_audit.outcome.value == "partial"
    assert retrieval_audit.details["permission_filtered_count"] == 1
    assert retrieval_audit.policy_decision.action == "retrieval.assets.query"


def test_retrieval_scope_policy_fail_closed_returns_empty_denied_envelope() -> None:
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo, policy_gateway=BrokenRetrievalPolicy())
    context = operation_context()
    registry.create_asset(
        "Public",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        context,
        asset_id="asset-public",
    )
    result = retrieval.query_assets(AssetQueryRequest(), context)
    assert result.success is False
    assert result.total == 0
    assert result.items == ()
    assert [diagnostic.code for diagnostic in result.diagnostics] == ["retrieval.permission_denied"]
    decision = result.diagnostics[0].details["policy_decision"]
    assert decision["effect"] == "fail_closed"
    audit_event = repo.list_audit_events(correlation_id="corr-retrieval")[-1]
    assert audit_event.operation == "retrieval.assets.query"
    assert audit_event.outcome.value == "denied"
    assert audit_event.policy_decision.effect.value == "fail_closed"


def test_retrieval_feedback_persists_and_quality_metrics_use_feedback_and_audit(tmp_path: Path) -> None:
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    registry = AssetRegistryService(repo)
    retrieval = AssetRetrievalService(repo)
    context = operation_context()
    source = SourceReference(source_system="repo", path="docs/feedback.md", checksum="sha256:feedback")
    registry.create_asset(
        "Feedback Source",
        Classification(asset_type="document", sensitivity=Sensitivity.PUBLIC),
        context,
        asset_id="asset-feedback",
        source_refs=[source],
        representations=[
            AssetRepresentation.from_content(
                "asset-feedback",
                RepresentationKind.NORMALIZED,
                "text/markdown",
                "normalized feedback",
                source_ref_id=source.id,
                metadata={"search_text": "alpha feedback citation target", "extractor": "plain-text"},
            )
        ],
    )
    retrieval.refresh_index()
    query_result = retrieval.query_assets(
        AssetQueryRequest(text="alpha", include_snippets=True),
        context,
    )
    feedback = retrieval.record_feedback(
        RetrievalFeedbackRequest(
            label=RetrievalFeedbackLabel.USEFUL,
            query=query_result.request.to_dict(),
            result_ref={
                "asset_id": "asset-feedback",
                "rank": 1,
                "representation_id": query_result.items[0].representations[0].representation_id,
                "source_ref_id": source.id,
            },
            metadata={"citation": True},
        ),
        context,
    )
    reloaded = AssetRetrievalService(SQLiteAssetRegistryRepository(db_path))
    records = reloaded.list_feedback(correlation_id="corr-retrieval")
    metrics = reloaded.quality_metrics(query_results=(query_result,), precision_at_k=1)
    assert feedback.success is True
    assert records[0].feedback_id == feedback.record.feedback_id
    assert records[0].query["text"] == "alpha"
    assert records[0].result_ref["asset_id"] == "asset-feedback"
    assert metrics.feedback_count == 1
    assert metrics.useful_count == 1
    assert metrics.zero_result_rate == 0.0
    assert metrics.precision_at_k == 1.0
    assert metrics.citation_precision == 1.0
    assert metrics.permission_filter_observation_count >= 1
    assert metrics.average_permission_filter_duration_ms is not None


def test_retrieval_feedback_invalid_label_returns_structured_diagnostic() -> None:
    retrieval = AssetRetrievalService(InMemoryAssetRegistryRepository())
    result = retrieval.record_feedback(
        RetrievalFeedbackRequest(label="maybe", query={}, result_ref={}),
        operation_context(),
    )
    assert result.success is False
    assert result.record is None
    assert [diagnostic.code for diagnostic in result.diagnostics] == ["retrieval.feedback_label_invalid"]


def operation_context() -> OperationContext:
    actor = Actor.create(
        ActorType.HUMAN,
        actor_id="user-retrieval",
        display_name="Retrieval Tester",
        groups=["engineering"],
    )
    return OperationContext.create(actor, correlation_id="corr-retrieval")


class DenyConfidentialRetrievalPolicy:
    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict | None = None,
    ) -> PolicyDecision:
        resource_metadata = resource_metadata or {}
        if action == "asset.retrieve" and resource_metadata.get("sensitivity") == Sensitivity.CONFIDENTIAL.value:
            return PolicyDecision.fail_closed(
                context.actor.id,
                action,
                resource,
                reason="confidential retrieval denied in test policy",
                context={"resource_metadata": resource_metadata},
            )
        return PolicyDecision.allow(
            context.actor.id,
            action,
            resource,
            context={"resource_metadata": resource_metadata},
        )


class BrokenRetrievalPolicy:
    def authorize(
        self,
        context: OperationContext,
        action: str,
        resource: str,
        *,
        resource_metadata: dict | None = None,
    ) -> PolicyDecision:
        if action == "retrieval.assets.query":
            raise RuntimeError("policy context unavailable")
        return PolicyDecision.allow(context.actor.id, action, resource)


@@ -4,13 +4,13 @@ type: workplan
title: "Governed Retrieval And Context Graph"
domain: markitect
repo: kontextual-engine
status: todo
status: done
owner: codex
topic_slug: markitect
planning_priority: high
planning_order: 7
created: "2026-05-05"
updated: "2026-05-05"
updated: "2026-05-06"
state_hub_workstream_id: "64352515-9677-46bb-909a-9e2db4915dc7"
---
@@ -44,11 +44,32 @@ produce grounded units and snippets. Engine retrieval contracts, result
envelopes, policy filtering, pagination, feedback, and cross-format search
remain engine-owned.
## Implementation Status
As of 2026-05-06, the first retrieval slice is recorded in
`docs/retrieval-implementation.md`. It establishes asset query request/result
contracts, stable sorting and pagination, result envelopes with source
references, representations, metadata records, refreshable lexical search,
relevance metadata, zero-result smoke metadata, and structured validation
diagnostics. It also supports combined metadata, lifecycle, source-context,
tag, collection, timestamp, and representation filters across in-memory and
SQLite-backed repositories. The contextual graph slice adds direct contextual
entity and relationship query envelopes plus asset filters by contextual
entity, workflow run, related asset, and relationship predicate.
Permission-aware retrieval now uses the engine policy gateway for query-scope
and per-resource checks, with fail-closed denied envelopes and retrieval audit
events. Lexical queries can also return source-grounded snippet packets with
representation/source references and adapter provenance. Feedback and KPI hooks
persist retrieval feedback and derive zero-result, precision, citation
precision, safety, confidence, and permission-filter timing signals. Remaining
work is focused on multi-hop graph traversal and ranking.
## R7.1 - Implement query contracts pagination sorting and result envelopes
```task
id: KONT-WP-0007-T001
status: todo
status: done
priority: high
state_hub_task_id: "5a1b0661-ce22-4ee6-a9e7-0aedce9d4356"
```
@@ -63,11 +84,22 @@ Acceptance:
references, and diagnostics.
- Invalid queries return structured validation errors.
Implemented:
- `AssetQueryRequest`, `AssetQueryItem`, `AssetQueryResult`, and
`AssetRetrievalService` provide the stable asset query contract.
- Queries return deterministic ordering with pagination metadata and
correlation IDs.
- Result entries expose asset identity, classification, source references,
representations, and metadata records.
- Invalid lifecycle, representation kind, sort key, sort order, limit, and
offset return structured diagnostics without raising raw exceptions.
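
The ordering and envelope behavior listed above can be illustrated with a
minimal sketch. It is not the engine's `AssetRetrievalService`: the `Asset`
dataclass, `query_page`, `SORT_KEYS`, and the dictionary envelope are
hypothetical stand-ins, and only the diagnostic codes are borrowed from the
retrieval tests.

```python
from dataclasses import dataclass

SORT_KEYS = {"title", "asset_id"}  # hypothetical subset of the real sort keys


@dataclass(frozen=True)
class Asset:
    asset_id: str
    title: str


def query_page(assets, *, sort_by="title", descending=False, limit=10, offset=0):
    """Return a result envelope; invalid input yields diagnostics, not exceptions."""
    diagnostics = []
    if sort_by not in SORT_KEYS:
        diagnostics.append("retrieval.sort_invalid")
    if limit < 1:
        diagnostics.append("retrieval.limit_invalid")
    if offset < 0:
        diagnostics.append("retrieval.offset_invalid")
    if diagnostics:
        return {"success": False, "total": 0, "items": [], "next_offset": None,
                "diagnostics": diagnostics}
    # Sort by the tie-breaker first. The second sort is stable, so assets with
    # equal primary keys keep ascending asset_id order, even with reverse=True.
    ordered = sorted(assets, key=lambda a: a.asset_id)
    ordered = sorted(ordered, key=lambda a: getattr(a, sort_by), reverse=descending)
    page = ordered[offset:offset + limit]
    end = offset + len(page)
    return {
        "success": True,
        "total": len(ordered),
        "returned": len(page),
        "limit": limit,
        "offset": offset,
        "next_offset": end if end < len(ordered) else None,
        "items": page,
        "diagnostics": [],
    }
```

Because the asset ID breaks ties, repeated calls with the same limit and offset
return the same page even when many assets share a title.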
## R7.2 - Implement lexical search over normalized content
```task
id: KONT-WP-0007-T002
status: todo
status: done
priority: high
state_hub_task_id: "5ec90dcb-473c-4d01-85f2-8db18de0b7d1"
```
@@ -81,11 +113,23 @@ Acceptance:
- Search indexes can be refreshed after ingestion or update.
- p95 latency and zero-result rate can be measured in smoke tests.
Implemented:
- Normalized ingestion now stores representation search text and length
metadata for retrieval indexing.
- `AssetRetrievalService.refresh_index()` builds a refreshable lexical index
with indexed asset and representation counts.
- Text queries perform lexical substring matching over normalized
representations and return relevance metadata including strategy, query,
match count, and matching representation IDs.
- Query result metadata includes zero-result and lexical index statistics for
later smoke/performance measurement.
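
As a rough illustration of refreshable lexical substring matching, the sketch
below builds an in-memory index and reports relevance metadata. The function
names, the tuple input shape, the `lexical_substring` strategy label, and the
ranking by match count are assumptions made for the example, not the engine's
actual index format.

```python
def refresh_index(representations):
    """Build {asset_id: [(representation_id, casefolded_text), ...]}.

    `representations` is an iterable of (asset_id, representation_id, text).
    """
    index = {}
    for asset_id, representation_id, text in representations:
        index.setdefault(asset_id, []).append((representation_id, text.casefold()))
    return index


def search(index, query):
    """Case-insensitive substring search with per-asset relevance metadata."""
    needle = query.casefold().strip()
    hits = []
    if needle:
        for asset_id, reps in index.items():
            matched = [(rep_id, text.count(needle)) for rep_id, text in reps if needle in text]
            if matched:
                hits.append({
                    "asset_id": asset_id,
                    "strategy": "lexical_substring",  # assumed label
                    "query": needle,
                    "match_count": sum(count for _, count in matched),
                    "representation_ids": [rep_id for rep_id, _ in matched],
                })
    # Most matches first; asset ID keeps the order deterministic.
    hits.sort(key=lambda hit: (-hit["match_count"], hit["asset_id"]))
    return {"results": hits, "zero_result": not hits, "indexed_assets": len(index)}
```

Recording `zero_result` on every response is what lets a smoke test compute a
zero-result rate later without re-running queries.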
## R7.3 - Implement metadata lifecycle and source-context filters
```task
id: KONT-WP-0007-T003
status: todo
status: done
priority: high
state_hub_task_id: "9e7d0a5c-71d4-44ca-9b71-70f2206e4a02"
```
@@ -100,11 +144,24 @@ Acceptance:
- Filter behavior is covered across in-memory and durable backends where
supported.
Implemented:
- Asset queries support filters for asset type, lifecycle, sensitivity, owner,
topic, review state, source system/path, representation kind, collection,
tags, created/updated timestamp bounds, and custom metadata records.
- Text search can be combined with standard, source, tag, collection,
sensitivity, and metadata filters.
- Combined filter behavior is covered over in-memory and SQLite-backed asset
repositories.
- Permission enforcement is intentionally deferred to R7.5; current lifecycle
and sensitivity filters establish the policy inputs without claiming
authorization semantics.
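
A sketch of how combined filters might compose is shown here, assuming AND
semantics between filters (the workplan does not state this explicitly). The
dictionary-shaped assets and the `matches` helper are hypothetical.

```python
from datetime import datetime


def matches(asset, *, asset_type=None, tags=(), created_after=None, metadata=None):
    """Return True only if the asset satisfies every supplied filter."""
    if asset_type is not None and asset["asset_type"] != asset_type:
        return False
    # Every requested tag must be present on the asset.
    if not set(tags) <= set(asset.get("tags", ())):
        return False
    if created_after is not None and asset["created_at"] < created_after:
        return False
    # Custom metadata filters compare by exact value.
    for key, value in (metadata or {}).items():
        if asset.get("metadata", {}).get(key) != value:
            return False
    return True


def filter_assets(assets, **filters):
    return [asset for asset in assets if matches(asset, **filters)]
```

Keeping filters as plain predicates over asset fields means the same checks can
run against in-memory and durable backends, which is the coverage the bullets
above describe.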
## R7.4 - Implement contextual entity model and relationship retrieval
```task
id: KONT-WP-0007-T004
status: todo
status: done
priority: high
state_hub_task_id: "b3358059-ac58-4e37-985c-6e8c1cc6df30"
```
@@ -120,11 +177,27 @@ Acceptance:
- Callers can retrieve assets by project, case, topic, source, workflow run, or
related asset.
Implemented:
- Existing `ContextEntity`/`CoreRelationship` primitives are reused as the
canonical model; entity types now include workflow runs and generated
artifacts for operational graph use cases.
- `ContextEntityQueryRequest`/`ContextEntityQueryResult` provide stable
contextual entity lookup by type, name, external reference, and metadata.
- `RelationshipQueryRequest`/`RelationshipQueryResult` provide stable
relationship retrieval by source, target, asset, contextual entity,
workflow run, predicate, target kind, and direction.
- Asset queries can filter by contextual entity, workflow run, related asset,
and relationship predicate while returning relationship and contextual
entity context for matched assets.
- Graph retrieval behavior is covered across in-memory and SQLite-backed
repositories.
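
The relationship retrieval described above can be approximated by the following
sketch. The direction vocabulary (`outgoing`, `incoming`, `both`), the
dictionary-shaped relationships, and the function name are assumptions; only the
diagnostic code appears in the retrieval tests.

```python
DIRECTIONS = {"outgoing", "incoming", "both"}  # assumed vocabulary


def query_relationships(relationships, *, node_id=None, predicate=None, direction="both"):
    """Filter relationships by predicate and by direction relative to node_id."""
    if direction not in DIRECTIONS:
        return {"success": False, "total": 0, "items": [],
                "diagnostics": ["retrieval.relationship_direction_invalid"]}
    items = []
    for rel in relationships:
        if predicate is not None and rel["predicate"] != predicate:
            continue
        if node_id is not None:
            outgoing = rel["source"] == node_id
            incoming = rel["target"] == node_id
            if direction == "outgoing" and not outgoing:
                continue
            if direction == "incoming" and not incoming:
                continue
            if direction == "both" and not (outgoing or incoming):
                continue
        items.append(rel)
    return {"success": True, "total": len(items), "items": items, "diagnostics": []}
```

An invalid direction produces a diagnostic envelope instead of an exception,
mirroring the structured-diagnostics behavior exercised by the tests.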
## R7.5 - Enforce permission-aware retrieval and fail-closed semantics
```task
id: KONT-WP-0007-T005
status: todo
status: done
priority: high
state_hub_task_id: "c6c93713-3ab1-41fb-bf35-15dd860b66fa"
```
@@ -140,11 +213,25 @@ Acceptance:
- Retrieval audit events capture actor, query scope, outcome, and policy
context.
Implemented:
- Retrieval services accept the engine `PolicyGateway`, defaulting to the
allow-all local adapter used elsewhere in the system.
- Asset, contextual entity, and relationship queries authorize the query scope
before loading result envelopes.
- Assets, contextual entities, and relationships are policy-filtered before
they are returned; relationships additionally require source and target
resource visibility so traversal cannot reveal denied assets or entities.
- Policy gateway failures produce empty denied envelopes with structured
diagnostics and fail-closed policy decisions.
- Retrieval audit events capture actor, correlation ID, query scope, policy
decision, outcome, result counts, and internal permission-filter counts.
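
The fail-closed flow can be sketched as follows. This is an illustration under
stated assumptions, not the engine's `PolicyGateway` contract: `authorize` is a
hypothetical callable returning a boolean, and the `success` outcome label is
assumed, while the action names, the `partial` and `denied` outcomes, and the
diagnostic code are taken from the retrieval tests.

```python
def governed_query(assets, authorize):
    """Authorize the query scope, then filter each asset; any error means deny."""
    try:
        scope_allowed = authorize("retrieval.assets.query", None)
    except Exception:
        scope_allowed = False  # fail closed: a broken policy never widens access
    if not scope_allowed:
        return {"success": False, "items": [], "total": 0, "outcome": "denied",
                "diagnostics": ["retrieval.permission_denied"],
                "permission_filtered_count": 0}
    visible = []
    for asset in assets:
        try:
            allowed = authorize("asset.retrieve", asset)
        except Exception:
            allowed = False  # per-resource failures also deny
        if allowed:
            visible.append(asset)
    filtered = len(assets) - len(visible)
    return {
        "success": True,
        "items": visible,
        "total": len(visible),  # totals never count denied assets
        "outcome": "partial" if filtered else "success",
        "diagnostics": [],
        "permission_filtered_count": filtered,
    }
```

Reporting `total` after filtering matters: a pre-filter count would reveal how
many denied assets matched the query.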
## R7.6 - Return source-grounded snippets citations and explanation data
```task
id: KONT-WP-0007-T006
status: todo
status: done
priority: medium
state_hub_task_id: "1a6d5a95-d87a-447a-a186-cb73162cd9a1"
```
@@ -160,11 +247,26 @@ Acceptance:
- Markdown snippets can reference Markitect selector matches or context-package
spans as adapter provenance.
Implemented:
- `RetrievalSnippet` packets expose asset, representation, source reference,
storage reference, media type, match offsets, match text, snippet text, and
adapter provenance.
- Lexical asset queries can request snippets through `include_snippets`,
`max_snippets`, and `snippet_radius`.
- Snippets are generated from normalized representation search text and are
attached only to policy-authorized asset results.
- Markitect selectors, source spans, context spans, adapter provenance,
snapshots, and extractor identity are preserved when supplied as
representation metadata.
- Snippet behavior is covered with permission filtering so denied matching
content does not leak through snippet packets.
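
A minimal sketch of snippet extraction with match offsets is given below. The
`radius` and `max_snippets` parameters loosely mirror `snippet_radius` and
`max_snippets`, but the function itself and the packet field names are
hypothetical, and the real `RetrievalSnippet` carries more fields (source and
storage references, media type, adapter provenance).

```python
import re


def build_snippets(text, query, *, radius=20, max_snippets=3):
    """Return snippet packets with offsets into the original search text."""
    snippets = []
    if not query:
        return snippets
    # re.escape treats the query as a literal; IGNORECASE keeps offsets aligned
    # with the original text instead of a case-folded copy.
    for match in re.finditer(re.escape(query), text, flags=re.IGNORECASE):
        if len(snippets) >= max_snippets:
            break
        lo = max(0, match.start() - radius)
        hi = min(len(text), match.end() + radius)
        snippets.append({
            "match_start": match.start(),
            "match_end": match.end(),
            "match_text": match.group(0),
            "snippet_text": text[lo:hi],
        })
    return snippets
```

To preserve the no-leak property listed above, a helper like this would be
called only for assets that already passed policy filtering.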
## R7.7 - Capture retrieval feedback and KPI measurement hooks
```task
id: KONT-WP-0007-T007
status: todo
status: done
priority: medium
state_hub_task_id: "e17e2839-400f-4348-98e3-f77acc0b2fde"
```
@@ -179,6 +281,20 @@ Acceptance:
- Precision@k, zero-result rate, permission-filter latency, and citation
precision have measurement hooks.
Implemented:
- `RetrievalFeedbackRecord` persists feedback labels for useful, irrelevant,
missing, unsafe, and low-confidence outcomes with actor, correlation ID,
query context, result references, notes, and metadata.
- Asset registry repository ports and memory/SQLite adapters persist and list
retrieval feedback.
- `AssetRetrievalService.record_feedback()` records authorized feedback with
structured diagnostics for invalid labels or denied feedback operations.
- `AssetRetrievalService.quality_metrics()` derives zero-result rate,
precision@k, citation precision, feedback totals, unsafe/low-confidence
counts, and permission-filter timing observations from query results,
feedback records, and retrieval audit events.
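
The metric derivations can be sketched as below. The exact definitions used by
`AssetRetrievalService.quality_metrics()` are not spelled out in this workplan,
so the formulas here are assumptions: precision@k is taken as the share of
feedback on results ranked within k that is labeled useful, and citation
precision as the share of citation-flagged feedback that is labeled useful.

```python
def quality_metrics(result_counts, feedback, k):
    """Derive simple KPI values from per-query result counts and feedback rows.

    `result_counts` lists the number of results per query; each feedback row is
    a dict with `label`, `rank`, and an optional `citation` flag.
    """
    def ratio(numerator, denominator):
        # None signals "no observations" rather than a misleading 0.0.
        return numerator / denominator if denominator else None

    def is_useful(row):
        return row["label"] == "useful"

    top_k = [row for row in feedback if row["rank"] <= k]
    cited = [row for row in feedback if row.get("citation")]
    return {
        "feedback_count": len(feedback),
        "useful_count": sum(1 for row in feedback if is_useful(row)),
        "zero_result_rate": ratio(sum(1 for c in result_counts if c == 0), len(result_counts)),
        "precision_at_k": ratio(sum(1 for row in top_k if is_useful(row)), len(top_k)),
        "citation_precision": ratio(sum(1 for row in cited if is_useful(row)), len(cited)),
    }
```

Returning `None` when there are no observations keeps an empty feedback store
from being read as a precision of zero.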
## Definition Of Done
- Retrieval tests cover text, metadata, lifecycle, relationship, contextual