asset listing filters

This commit is contained in:
2026-05-06 08:11:54 +02:00
parent dbe93be1a9
commit f5a52e780d
7 changed files with 282 additions and 11 deletions

View File

@@ -37,6 +37,8 @@ and SQLite repositories are adapters behind those ports.
- Metadata schema validation before asset create and metadata update writes.
- Durable metadata schema registry and assignment rules for policy-selected
validation.
- Asset listing filters for lifecycle, asset type, sensitivity, owner, topic,
review state, metadata record values, and confirmed-only metadata.
- Actor and `OperationContext` required for material mutations.
- Policy gateway authorization before asset mutations.
- Fail-closed policy denial through `AuthorizationError`.
@@ -76,7 +78,6 @@ idempotency key.
## Not Yet Implemented
- Standard metadata filtering beyond lifecycle and asset type.
- Policy assignment storage and enterprise policy adapters.
- Conflict detection beyond version-sequence uniqueness.
- Restore and supersession service operations.
@@ -99,4 +100,6 @@ These remain in scope for later `KONT-WP-0005` tasks or adjacent workplans.
- SQLite reload preserving context entities, relationships, and idempotency
records,
- custom metadata schema validation before registry writes,
- persistent metadata schema registry and assignment reload behavior.
- persistent metadata schema registry and assignment reload behavior,
- classification and metadata-record asset filtering across memory and SQLite
repositories.

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable
from typing import Any, Iterable
from kontextual_engine.core import (
Actor,
@@ -21,6 +21,7 @@ from kontextual_engine.core import (
MetadataSchema,
MetadataSchemaAssignment,
RepresentationKind,
Sensitivity,
)
from kontextual_engine.errors import NotFoundError, ValidationError
@@ -65,12 +66,37 @@ class InMemoryAssetRegistryRepository:
*,
lifecycle: LifecycleState | None = None,
asset_type: str | None = None,
sensitivity: Sensitivity | str | None = None,
owner: str | None = None,
topic: str | None = None,
review_state: str | None = None,
metadata_filters: dict[str, Any] | None = None,
confirmed_metadata_only: bool = False,
) -> list[KnowledgeAsset]:
assets: Iterable[KnowledgeAsset] = self.assets.values()
if lifecycle is not None:
assets = [asset for asset in assets if asset.lifecycle == lifecycle]
if asset_type is not None:
assets = [asset for asset in assets if asset.classification.asset_type == asset_type]
if sensitivity is not None:
sensitivity = Sensitivity(sensitivity)
assets = [asset for asset in assets if asset.classification.sensitivity == sensitivity]
if owner is not None:
assets = [asset for asset in assets if asset.classification.owner == owner]
if topic is not None:
assets = [asset for asset in assets if topic in asset.classification.topics]
if review_state is not None:
assets = [asset for asset in assets if asset.classification.review_state == review_state]
if metadata_filters:
assets = [
asset
for asset in assets
if _metadata_matches(
self.metadata_records.get(asset.id, []),
metadata_filters,
confirmed_metadata_only=confirmed_metadata_only,
)
]
return sorted(assets, key=lambda asset: (asset.title, asset.id))
def save_representation(self, representation: AssetRepresentation) -> AssetRepresentation:
@@ -225,7 +251,7 @@ class InMemoryAssetRegistryRepository:
events = [event for event in events if event.target == target]
if correlation_id is not None:
events = [event for event in events if event.correlation_id == correlation_id]
return sorted(events, key=lambda event: (event.occurred_at, event.event_id))
return sorted(events, key=lambda event: event.occurred_at)
def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord:
self.idempotency_records[record.key] = record
@@ -253,3 +279,24 @@ class InMemoryAssetRegistryRepository:
if status is not None:
jobs = [job for job in jobs if job.status == status]
return sorted(jobs, key=lambda job: (job.created_at, job.job_id))
def _metadata_matches(
records: list[MetadataRecord],
metadata_filters: dict[str, Any],
*,
confirmed_metadata_only: bool,
) -> bool:
for key, expected in metadata_filters.items():
candidates = [record for record in records if record.key == key]
if confirmed_metadata_only:
candidates = [record for record in candidates if record.confirmed]
if not any(_metadata_value_matches(record.value, expected) for record in candidates):
return False
return True
def _metadata_value_matches(value: Any, expected: Any) -> bool:
if isinstance(value, list) and not isinstance(expected, list):
return expected in value
return value == expected

View File

@@ -24,6 +24,7 @@ from kontextual_engine.core import (
MetadataSchemaAssignment,
RepresentationKind,
RelationshipTargetKind,
Sensitivity,
)
from kontextual_engine.errors import NotFoundError, ValidationError
@@ -87,6 +88,12 @@ class SQLiteAssetRegistryRepository:
*,
lifecycle: LifecycleState | None = None,
asset_type: str | None = None,
sensitivity: Sensitivity | str | None = None,
owner: str | None = None,
topic: str | None = None,
review_state: str | None = None,
metadata_filters: dict[str, Any] | None = None,
confirmed_metadata_only: bool = False,
) -> list[KnowledgeAsset]:
clauses = []
params: list[Any] = []
@@ -98,7 +105,28 @@ class SQLiteAssetRegistryRepository:
params.append(asset_type)
where = f" where {' and '.join(clauses)}" if clauses else ""
rows = self._all(f"select payload from assets{where} order by title, id", tuple(params))
return [KnowledgeAsset.from_dict(_loads(row["payload"])) for row in rows]
assets = [KnowledgeAsset.from_dict(_loads(row["payload"])) for row in rows]
if sensitivity is not None:
sensitivity = Sensitivity(sensitivity)
assets = [asset for asset in assets if asset.classification.sensitivity == sensitivity]
if owner is not None:
assets = [asset for asset in assets if asset.classification.owner == owner]
if topic is not None:
assets = [asset for asset in assets if topic in asset.classification.topics]
if review_state is not None:
assets = [asset for asset in assets if asset.classification.review_state == review_state]
if metadata_filters:
records_by_asset = self._metadata_records_for_assets([asset.id for asset in assets])
assets = [
asset
for asset in assets
if _metadata_matches(
records_by_asset.get(asset.id, []),
metadata_filters,
confirmed_metadata_only=confirmed_metadata_only,
)
]
return assets
def save_representation(self, representation: AssetRepresentation) -> AssetRepresentation:
try:
@@ -422,7 +450,7 @@ class SQLiteAssetRegistryRepository:
clauses.append("correlation_id = ?")
params.append(correlation_id)
where = f" where {' and '.join(clauses)}" if clauses else ""
rows = self._all(f"select payload from audit_events{where} order by occurred_at, id", tuple(params))
rows = self._all(f"select payload from audit_events{where} order by occurred_at, rowid", tuple(params))
return [AuditEvent.from_dict(_loads(row["payload"])) for row in rows]
def save_idempotency_record(self, record: IdempotencyRecord) -> IdempotencyRecord:
@@ -616,6 +644,23 @@ class SQLiteAssetRegistryRepository:
with self._connect() as conn:
return list(conn.execute(query, params).fetchall())
def _metadata_records_for_assets(self, asset_ids: list[str]) -> dict[str, list[MetadataRecord]]:
if not asset_ids:
return {}
placeholders = ",".join("?" for _ in asset_ids)
rows = self._all(
f"""
select asset_id, payload from metadata_records
where asset_id in ({placeholders})
order by asset_id, key, id
""",
tuple(asset_ids),
)
records: dict[str, list[MetadataRecord]] = {}
for row in rows:
records.setdefault(row["asset_id"], []).append(MetadataRecord.from_dict(_loads(row["payload"])))
return records
def _json(value: dict[str, Any]) -> str:
return json.dumps(value, sort_keys=True, separators=(",", ":"))
@@ -623,3 +668,24 @@ def _json(value: dict[str, Any]) -> str:
def _loads(value: str) -> dict[str, Any]:
return json.loads(value)
def _metadata_matches(
records: list[MetadataRecord],
metadata_filters: dict[str, Any],
*,
confirmed_metadata_only: bool,
) -> bool:
for key, expected in metadata_filters.items():
candidates = [record for record in records if record.key == key]
if confirmed_metadata_only:
candidates = [record for record in candidates if record.confirmed]
if not any(_metadata_value_matches(record.value, expected) for record in candidates):
return False
return True
def _metadata_value_matches(value: Any, expected: Any) -> bool:
if isinstance(value, list) and not isinstance(expected, list):
return expected in value
return value == expected

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Protocol
from typing import Any, Protocol
from kontextual_engine.core import (
Actor,
@@ -20,6 +20,7 @@ from kontextual_engine.core import (
MetadataSchema,
MetadataSchemaAssignment,
RepresentationKind,
Sensitivity,
)
@@ -34,6 +35,12 @@ class AssetRegistryRepository(Protocol):
*,
lifecycle: LifecycleState | None = None,
asset_type: str | None = None,
sensitivity: Sensitivity | str | None = None,
owner: str | None = None,
topic: str | None = None,
review_state: str | None = None,
metadata_filters: dict[str, Any] | None = None,
confirmed_metadata_only: bool = False,
) -> list[KnowledgeAsset]: ...
def save_representation(self, representation: AssetRepresentation) -> AssetRepresentation: ...

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from dataclasses import dataclass, replace
from typing import Any
from kontextual_engine.core import (
AssetRepresentation,
@@ -22,6 +23,7 @@ from kontextual_engine.core import (
OperationContext,
PolicyDecision,
RelationshipTargetKind,
Sensitivity,
SourceReference,
VersionChangeType,
)
@@ -308,6 +310,29 @@ class AssetRegistryService:
def get_asset(self, asset_id: str) -> KnowledgeAsset:
return self.repository.get_asset(asset_id)
def list_assets(
self,
*,
lifecycle: LifecycleState | None = None,
asset_type: str | None = None,
sensitivity: Sensitivity | str | None = None,
owner: str | None = None,
topic: str | None = None,
review_state: str | None = None,
metadata_filters: dict[str, Any] | None = None,
confirmed_metadata_only: bool = False,
) -> list[KnowledgeAsset]:
return self.repository.list_assets(
lifecycle=lifecycle,
asset_type=asset_type,
sensitivity=sensitivity,
owner=owner,
topic=topic,
review_state=review_state,
metadata_filters=metadata_filters,
confirmed_metadata_only=confirmed_metadata_only,
)
def register_context_entity(self, entity: ContextEntity, context: OperationContext) -> ContextEntity:
decision = self._authorize(
context,

View File

@@ -279,6 +279,80 @@ def test_asset_registry_applies_persisted_metadata_schema_assignments() -> None:
assert service.list_metadata_schema_assignments()[0].policy_ref == "local://metadata-policy/policy-note"
def test_asset_registry_filters_assets_by_standard_metadata_and_records() -> None:
repo = InMemoryAssetRegistryRepository()
service = AssetRegistryService(repo)
context = operation_context()
service.create_asset(
"Architecture ADR",
Classification(
asset_type="document",
sensitivity=Sensitivity.PUBLIC,
topics=("architecture", "adr"),
owner="Platform Knowledge",
review_state="approved",
),
context,
asset_id="asset-adr",
metadata_records=[
MetadataRecord("status", "accepted", confirmed=True),
MetadataRecord("tags", ["governance", "markdown"], confirmed=True),
],
)
service.create_asset(
"Internal Risk Note",
Classification(
asset_type="note",
sensitivity=Sensitivity.CONFIDENTIAL,
topics=("risk",),
owner="Security",
review_state="draft",
),
context,
asset_id="asset-risk",
metadata_records=[MetadataRecord("status", "accepted", confirmed=False)],
)
service.create_asset(
"Architecture Brief",
Classification(
asset_type="document",
sensitivity=Sensitivity.INTERNAL,
topics=("architecture",),
owner="Platform Knowledge",
review_state="draft",
),
context,
asset_id="asset-brief",
metadata_records=[MetadataRecord("status", "draft", confirmed=True)],
)
assert [asset.id for asset in service.list_assets(sensitivity=Sensitivity.PUBLIC)] == ["asset-adr"]
assert [asset.id for asset in service.list_assets(owner="Security")] == ["asset-risk"]
assert [asset.id for asset in service.list_assets(topic="architecture")] == [
"asset-adr",
"asset-brief",
]
assert [asset.id for asset in service.list_assets(review_state="draft")] == [
"asset-brief",
"asset-risk",
]
assert [asset.id for asset in service.list_assets(metadata_filters={"status": "accepted"})] == [
"asset-adr",
"asset-risk",
]
assert [
asset.id
for asset in service.list_assets(
metadata_filters={"status": "accepted"},
confirmed_metadata_only=True,
)
] == ["asset-adr"]
assert [asset.id for asset in service.list_assets(metadata_filters={"tags": "markdown"})] == [
"asset-adr"
]
def test_sqlite_asset_registry_survives_reinstantiation(tmp_path: Path) -> None:
db_path = tmp_path / "registry.sqlite"
repo = SQLiteAssetRegistryRepository(db_path)
@@ -421,6 +495,55 @@ def test_sqlite_registry_persists_metadata_schemas_and_assignments(tmp_path: Pat
assert reloaded_repo.get_metadata_schema_assignment("assignment-review-documents").schema_id == "schema-review-v1"
def test_sqlite_registry_filters_assets_after_reload(tmp_path: Path) -> None:
db_path = tmp_path / "registry.sqlite"
repo = SQLiteAssetRegistryRepository(db_path)
service = AssetRegistryService(repo)
context = operation_context()
service.create_asset(
"Public Guide",
Classification(
asset_type="guide",
sensitivity=Sensitivity.PUBLIC,
topics=("markdown", "proxy"),
owner="Docs",
review_state="approved",
),
context,
asset_id="asset-guide",
metadata_records=[MetadataRecord("channel", "public", confirmed=True)],
)
service.create_asset(
"Internal Guide",
Classification(
asset_type="guide",
sensitivity=Sensitivity.INTERNAL,
topics=("markdown",),
owner="Docs",
review_state="draft",
),
context,
asset_id="asset-internal-guide",
metadata_records=[MetadataRecord("channel", "public", confirmed=False)],
)
reloaded = AssetRegistryService(SQLiteAssetRegistryRepository(db_path))
assert [asset.id for asset in reloaded.list_assets(asset_type="guide", owner="Docs")] == [
"asset-internal-guide",
"asset-guide",
]
assert [asset.id for asset in reloaded.list_assets(topic="proxy")] == ["asset-guide"]
assert [
asset.id
for asset in reloaded.list_assets(
metadata_filters={"channel": "public"},
confirmed_metadata_only=True,
)
] == ["asset-guide"]
def test_sqlite_registry_enforces_representation_asset_reference(tmp_path: Path) -> None:
repo = SQLiteAssetRegistryRepository(tmp_path / "registry.sqlite")
representation = AssetRepresentation.from_content(

View File

@@ -67,9 +67,9 @@ representations, metadata records, context entities, asset/context
relationships, idempotent asset creation, and custom metadata schema
validation before registry writes. It now also includes a durable metadata
schema registry and assignment rules for policy-selected validation. Remaining
work in this workplan is concentrated on standard metadata filtering beyond
lifecycle and asset type, restore/supersession operations, conflict semantics
beyond sequence/idempotency checks, and batch partial-failure envelopes.
work in this workplan is concentrated on restore/supersession operations,
conflict semantics beyond sequence/idempotency checks, and batch
partial-failure envelopes.
## G5.1 - Implement stable asset identity and source references
@@ -117,7 +117,7 @@ Acceptance:
```task
id: KONT-WP-0005-T003
status: in_progress
status: done
priority: high
state_hub_task_id: "b06c5124-ce54-4241-b712-2fbab856877b"
```