Asset create/list/get, Metadata add/list, Lifecycle transition, Relationship create/list, Audit event query, Policy evaluation

This commit is contained in:
2026-05-06 20:39:29 +02:00
parent e53bc4144d
commit dee0ce8a12
5 changed files with 517 additions and 13 deletions

View File

@@ -22,6 +22,19 @@ Implemented in `KONT-WP-0009-T001`:
- `GET /api/v1/version`
- `GET /openapi.json`
Implemented in `KONT-WP-0009-T002`:
- `POST /api/v1/assets`
- `GET /api/v1/assets`
- `GET /api/v1/assets/{asset_id}`
- `POST /api/v1/assets/{asset_id}/metadata`
- `GET /api/v1/assets/{asset_id}/metadata`
- `POST /api/v1/assets/{asset_id}/lifecycle`
- `POST /api/v1/relationships`
- `GET /api/v1/relationships`
- `GET /api/v1/audit/events`
- `POST /api/v1/policy/evaluate`
The unversioned health/readiness/version endpoints are operational probes. The
versioned `/api/v1/*` endpoints establish the MVP API namespace. Future
domain-resource endpoints should live under `/api/v1`.
@@ -72,7 +85,6 @@ resources.
## Deferred
- Asset, metadata, relationship, audit, and policy endpoints.
- Ingestion, retrieval, transformation, and workflow endpoints.
- Actor context, delegation, and authorization middleware.
- Agent-safe operation catalog.

View File

@@ -7,8 +7,9 @@ Status: active implementation note for `KONT-WP-0009`.
## Purpose
This note records the first optional FastAPI service adapter. The service layer
is intentionally thin: it exposes operational probes and API version metadata
while leaving domain behavior in the application services.
is intentionally thin: it exposes operational probes, API version metadata, and
the first governed asset-resource surface while leaving domain behavior in the
application services.
## Implemented Package Shape
@@ -30,6 +31,16 @@ src/kontextual_engine/
- `GET /api/v1/health`
- `GET /api/v1/ready`
- `GET /api/v1/version`
- `POST /api/v1/assets`
- `GET /api/v1/assets`
- `GET /api/v1/assets/{asset_id}`
- `POST /api/v1/assets/{asset_id}/metadata`
- `GET /api/v1/assets/{asset_id}/metadata`
- `POST /api/v1/assets/{asset_id}/lifecycle`
- `POST /api/v1/relationships`
- `GET /api/v1/relationships`
- `GET /api/v1/audit/events`
- `POST /api/v1/policy/evaluate`
- `GET /openapi.json`
Unversioned endpoints are operational probes. Versioned endpoints establish
@@ -46,11 +57,21 @@ the `/api/v1` namespace for future domain resources.
- package version discovery,
- health payload,
- readiness checks,
- version payload.
- version payload,
- asset service construction,
- basic actor/correlation context construction,
- asset, metadata, lifecycle, relationship, audit, and policy operation
translation.
Readiness currently checks that the configured asset registry repository can
list assets. It does not mutate state.
Asset, metadata, relationship, lifecycle, audit, and policy operations delegate
to `AssetRegistryService`, the configured repository, and the configured
`PolicyGateway`. Protected mutations therefore keep the existing policy and
audit semantics. The current header-to-actor translation is deliberately simple
and will be expanded in `KONT-WP-0009-T004`.
## Dependency Boundary
The `service` extra now includes FastAPI, Uvicorn, and HTTPX for test-client
@@ -69,6 +90,12 @@ missing-dependency behavior are tested without FastAPI.
`tests/test_service_api.py` covers:
- service runtime health/readiness/version without FastAPI installed,
- runtime asset create/list/get operations,
- runtime metadata add/list operations,
- runtime lifecycle transition operations,
- runtime relationship create/list operations,
- runtime audit query and policy evaluation,
- runtime policy denial blocking a protected operation,
- `create_app()` missing-dependency behavior when the optional extra is absent,
- health/readiness/version/OpenAPI endpoint contracts when FastAPI and HTTPX
are installed,
@@ -77,7 +104,6 @@ missing-dependency behavior are tested without FastAPI.
## Not Yet Implemented
- Asset, metadata, relationship, audit, and policy endpoints.
- Ingestion, retrieval, transformation, workflow, review, and reconstruction
endpoints.
- Request actor context and delegation middleware.

View File

@@ -11,8 +11,23 @@ from importlib import metadata
from typing import Any
from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository
from kontextual_engine.core import utc_now
from kontextual_engine.ports import AssetRegistryRepository
from kontextual_engine.core import (
Actor,
ActorType,
Classification,
ContextEntity,
ContextEntityType,
LifecycleState,
MetadataRecord,
OperationContext,
PolicyDecision,
RelationshipTargetKind,
SourceReference,
utc_now,
)
from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError
from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway
from kontextual_engine.services import AssetRegistryService
API_VERSION = "v1"
@@ -22,10 +37,33 @@ OPENAPI_VERSION = "1.0.0"
@dataclass
class ServiceRuntime:
repository: AssetRegistryRepository = field(default_factory=InMemoryAssetRegistryRepository)
policy_gateway: PolicyGateway = field(default_factory=AllowAllPolicyGateway)
api_version: str = API_VERSION
service_name: str = "kontextual-engine"
started_at: str = field(default_factory=lambda: utc_now().isoformat())
def asset_service(self) -> AssetRegistryService:
return AssetRegistryService(self.repository, policy_gateway=self.policy_gateway)
def operation_context(
self,
*,
actor_id: str = "api-user",
actor_type: str = "human",
display_name: str | None = None,
correlation_id: str | None = None,
groups: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> OperationContext:
actor = Actor.create(
ActorType(actor_type),
actor_id=actor_id,
display_name=display_name,
groups=groups,
metadata=metadata,
)
return OperationContext.create(actor, correlation_id=correlation_id)
@property
def package_version(self) -> str:
try:
@@ -75,10 +113,155 @@ class ServiceRuntime:
"openapi_version": OPENAPI_VERSION,
}
def create_asset(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
classification = Classification.from_dict(payload["classification"])
result = self.asset_service().create_asset(
payload["title"],
classification,
context,
asset_id=payload.get("asset_id"),
source_refs=[_source_reference(item) for item in payload.get("source_refs", ())],
metadata_records=[_metadata_record(item) for item in payload.get("metadata_records", ())],
idempotency_key=payload.get("idempotency_key"),
)
return _asset_change_result(result)
def get_asset(self, asset_id: str) -> dict[str, Any]:
return self.asset_service().get_asset(asset_id).to_dict()
def list_assets(
self,
*,
lifecycle: str | None = None,
asset_type: str | None = None,
sensitivity: str | None = None,
owner: str | None = None,
topic: str | None = None,
review_state: str | None = None,
) -> dict[str, Any]:
assets = self.asset_service().list_assets(
lifecycle=LifecycleState(lifecycle) if lifecycle else None,
asset_type=asset_type,
sensitivity=sensitivity,
owner=owner,
topic=topic,
review_state=review_state,
)
return {"items": [asset.to_dict() for asset in assets], "count": len(assets)}
def add_metadata_record(
self,
asset_id: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
result = self.asset_service().add_metadata_record(
asset_id,
_metadata_record(payload),
context,
expected_current_version_id=payload.get("expected_current_version_id"),
)
return _asset_change_result(result)
def list_metadata_records(self, asset_id: str) -> dict[str, Any]:
records = self.repository.list_metadata_records(asset_id)
return {"items": [record.to_dict() for record in records], "count": len(records)}
def transition_lifecycle(
self,
asset_id: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
result = self.asset_service().transition_lifecycle(
asset_id,
LifecycleState(payload["lifecycle"]),
context,
expected_current_version_id=payload.get("expected_current_version_id"),
)
return _asset_change_result(result)
def create_relationship(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
target_kind = RelationshipTargetKind(payload.get("target_kind", RelationshipTargetKind.ASSET.value))
service = self.asset_service()
if target_kind == RelationshipTargetKind.CONTEXT_ENTITY:
entity_payload = payload.get("context_entity") or {}
entity = ContextEntity(
entity_id=payload["target_id"],
entity_type=ContextEntityType(entity_payload.get("entity_type", ContextEntityType.TOPIC.value)),
name=entity_payload.get("name", payload["target_id"]),
external_ref=entity_payload.get("external_ref"),
metadata=dict(entity_payload.get("metadata", {})),
)
result = service.link_asset_to_context_entity(
payload["source_asset_id"],
entity,
payload["predicate"],
context,
confidence=payload.get("confidence"),
provenance=dict(payload.get("provenance", {})),
expected_current_version_id=payload.get("expected_current_version_id"),
)
else:
result = service.link_asset_to_asset(
payload["source_asset_id"],
payload["target_id"],
payload["predicate"],
context,
confidence=payload.get("confidence"),
provenance=dict(payload.get("provenance", {})),
expected_current_version_id=payload.get("expected_current_version_id"),
)
return {
"relationship": result.relationship.to_dict(),
"version": result.version.to_dict(),
"audit_event": result.audit_event.to_dict(),
"policy_decision": result.policy_decision.to_dict(),
}
def list_relationships(
self,
*,
source_id: str | None = None,
target_id: str | None = None,
) -> dict[str, Any]:
relationships = self.repository.list_relationships(source_id=source_id, target_id=target_id)
return {
"items": [relationship.to_dict() for relationship in relationships],
"count": len(relationships),
}
def list_audit_events(
self,
*,
target: str | None = None,
correlation_id: str | None = None,
) -> dict[str, Any]:
events = self.repository.list_audit_events(target=target, correlation_id=correlation_id)
return {"items": [event.to_dict() for event in events], "count": len(events)}
def evaluate_policy(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
try:
decision = self.policy_gateway.authorize(
context,
payload["action"],
payload["resource"],
resource_metadata=dict(payload.get("resource_metadata", {})),
)
except Exception as exc:
decision = PolicyDecision.fail_closed(
context.actor.id,
payload.get("action", "unknown"),
payload.get("resource", "unknown"),
reason=str(exc),
context={"gateway_error": type(exc).__name__},
)
return decision.to_dict()
def create_app(runtime: ServiceRuntime | None = None):
try:
from fastapi import FastAPI
from fastapi import Depends, FastAPI, Header, HTTPException, Query
except ImportError as exc: # pragma: no cover - exercised when optional extra is absent
raise RuntimeError(
"FastAPI service dependencies are not installed. Install kontextual-engine[service]."
@@ -120,4 +303,151 @@ def create_app(runtime: ServiceRuntime | None = None):
def versioned_version() -> dict[str, Any]:
return runtime.version()
def context_from_headers(
x_actor_id: str = Header("api-user"),
x_actor_type: str = Header("human"),
x_actor_display_name: str | None = Header(None),
x_correlation_id: str | None = Header(None),
) -> OperationContext:
return runtime.operation_context(
actor_id=x_actor_id,
actor_type=x_actor_type,
display_name=x_actor_display_name,
correlation_id=x_correlation_id,
)
def response(callable_obj, *args: Any, **kwargs: Any) -> Any:
try:
return callable_obj(*args, **kwargs)
except NotFoundError as exc:
raise HTTPException(status_code=404, detail=_error_payload(exc)) from exc
except AuthorizationError as exc:
raise HTTPException(status_code=403, detail=_error_payload(exc)) from exc
except ValidationError as exc:
raise HTTPException(status_code=422, detail=_error_payload(exc)) from exc
except KontextualError as exc:
raise HTTPException(status_code=400, detail=_error_payload(exc)) from exc
@app.post(f"{prefix}/assets", tags=["assets"])
def create_asset(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.create_asset, payload, context)
@app.get(f"{prefix}/assets", tags=["assets"])
def list_assets(
lifecycle: str | None = Query(None),
asset_type: str | None = Query(None),
sensitivity: str | None = Query(None),
owner: str | None = Query(None),
topic: str | None = Query(None),
review_state: str | None = Query(None),
) -> dict[str, Any]:
return response(
runtime.list_assets,
lifecycle=lifecycle,
asset_type=asset_type,
sensitivity=sensitivity,
owner=owner,
topic=topic,
review_state=review_state,
)
@app.get(f"{prefix}/assets/{{asset_id}}", tags=["assets"])
def get_asset(asset_id: str) -> dict[str, Any]:
return response(runtime.get_asset, asset_id)
@app.post(f"{prefix}/assets/{{asset_id}}/metadata", tags=["metadata"])
def add_metadata(
asset_id: str,
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.add_metadata_record, asset_id, payload, context)
@app.get(f"{prefix}/assets/{{asset_id}}/metadata", tags=["metadata"])
def list_metadata(asset_id: str) -> dict[str, Any]:
return response(runtime.list_metadata_records, asset_id)
@app.post(f"{prefix}/assets/{{asset_id}}/lifecycle", tags=["assets"])
def transition_lifecycle(
asset_id: str,
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.transition_lifecycle, asset_id, payload, context)
@app.post(f"{prefix}/relationships", tags=["relationships"])
def create_relationship(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.create_relationship, payload, context)
@app.get(f"{prefix}/relationships", tags=["relationships"])
def list_relationships(
source_id: str | None = Query(None),
target_id: str | None = Query(None),
) -> dict[str, Any]:
return response(runtime.list_relationships, source_id=source_id, target_id=target_id)
@app.get(f"{prefix}/audit/events", tags=["audit"])
def list_audit_events(
target: str | None = Query(None),
correlation_id: str | None = Query(None),
) -> dict[str, Any]:
return response(runtime.list_audit_events, target=target, correlation_id=correlation_id)
@app.post(f"{prefix}/policy/evaluate", tags=["policy"])
def evaluate_policy(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.evaluate_policy, payload, context)
return app
def _metadata_record(data: dict[str, Any]) -> MetadataRecord:
if "record_id" in data and "created_at" in data:
return MetadataRecord.from_dict(data)
return MetadataRecord(
key=data["key"],
value=data.get("value"),
provenance=dict(data.get("provenance", {})),
confidence=data.get("confidence"),
confirmed=bool(data.get("confirmed", False)),
record_id=data.get("record_id") or MetadataRecord(data["key"], data.get("value")).record_id,
)
def _source_reference(data: dict[str, Any]) -> SourceReference:
if "id" in data:
return SourceReference.from_dict(data)
return SourceReference(
source_system=data["source_system"],
path=data.get("path"),
uri=data.get("uri"),
external_id=data.get("external_id"),
checksum=data.get("checksum"),
connector_ref=data.get("connector_ref"),
metadata=dict(data.get("metadata", {})),
)
def _asset_change_result(result: Any) -> dict[str, Any]:
return {
"asset": result.asset.to_dict(),
"version": result.version.to_dict(),
"audit_event": result.audit_event.to_dict(),
"policy_decision": result.policy_decision.to_dict(),
}
def _error_payload(error: KontextualError) -> dict[str, Any]:
return {
"code": error.code,
"message": str(error),
"details": dict(error.details),
}

View File

@@ -1,6 +1,6 @@
import pytest
from kontextual_engine import ServiceRuntime, create_app
from kontextual_engine import AuthorizationError, OperationContext, PolicyDecision, ServiceRuntime, create_app
from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository
@@ -15,6 +15,106 @@ def test_service_runtime_health_readiness_and_version_are_importable_without_fas
assert runtime.version()["api_version"] == "v1"
def test_service_runtime_exposes_asset_metadata_relationship_audit_and_policy_operations() -> None:
runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
context = runtime.operation_context(actor_id="user-api", correlation_id="corr-api")
created = runtime.create_asset(
{
"asset_id": "asset-api",
"title": "API Asset",
"classification": {
"asset_type": "document",
"sensitivity": "internal",
"owner": "Platform Knowledge",
},
"metadata_records": [
{
"key": "status",
"value": "draft",
"confirmed": True,
"provenance": {"producer": "api-test"},
}
],
},
context,
)
successor = runtime.create_asset(
{
"asset_id": "asset-successor",
"title": "Successor",
"classification": {"asset_type": "document", "sensitivity": "public"},
},
context,
)
metadata = runtime.add_metadata_record(
"asset-api",
{"key": "reviewer", "value": "codex", "confirmed": True},
context,
)
lifecycle = runtime.transition_lifecycle(
"asset-api",
{"lifecycle": "retired"},
context,
)
relationship = runtime.create_relationship(
{
"source_asset_id": "asset-api",
"target_id": "asset-successor",
"predicate": "superseded_by",
"target_kind": "asset",
"confidence": 1.0,
},
context,
)
audit = runtime.list_audit_events(target="asset:asset-api")
policy = runtime.evaluate_policy(
{"action": "asset.retrieve", "resource": "asset:asset-api"},
context,
)
assert created["asset"]["id"] == "asset-api"
assert successor["asset"]["id"] == "asset-successor"
assert runtime.get_asset("asset-api")["lifecycle"] == "retired"
assert runtime.list_assets(asset_type="document")["count"] == 2
assert metadata["version"]["change_type"] == "metadata_changed"
assert lifecycle["asset"]["lifecycle"] == "retired"
assert [record["key"] for record in runtime.list_metadata_records("asset-api")["items"]] == [
"status",
"reviewer",
]
assert relationship["relationship"]["predicate"] == "superseded_by"
assert runtime.list_relationships(source_id="asset-api")["count"] == 1
assert [event["operation"] for event in audit["items"]] == [
"asset.create",
"asset.metadata.add",
"asset.lifecycle.transition",
"asset.relationship.add",
]
assert policy["effect"] == "allow"
def test_service_runtime_policy_denial_blocks_protected_asset_operation() -> None:
runtime = ServiceRuntime(
repository=InMemoryAssetRegistryRepository(),
policy_gateway=DenyCreatePolicy(),
)
with pytest.raises(AuthorizationError) as exc_info:
runtime.create_asset(
{
"asset_id": "asset-denied",
"title": "Denied",
"classification": {"asset_type": "document"},
},
runtime.operation_context(actor_id="user-denied"),
)
assert exc_info.value.details["policy_decision"]["effect"] == "deny"
assert runtime.list_assets()["count"] == 0
assert runtime.list_audit_events(target="asset:asset-denied")["items"][0]["outcome"] == "denied"
def test_create_app_reports_missing_optional_dependency_when_fastapi_is_absent() -> None:
try:
import fastapi # noqa: F401
@@ -60,7 +160,31 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None:
assert "/api/v1/health" in paths
assert "/api/v1/ready" in paths
assert "/api/v1/version" in paths
assert "/api/v1/assets" in paths
assert "/api/v1/relationships" in paths
assert "/api/v1/audit/events" in paths
assert "/api/v1/policy/evaluate" in paths
def test_create_app_attaches_runtime_to_application_state(client) -> None:
assert client.app.state.kontextual_runtime.api_version == "v1"
class DenyCreatePolicy:
def authorize(
self,
context: OperationContext,
action: str,
resource: str,
*,
resource_metadata: dict[str, str] | None = None,
) -> PolicyDecision:
if action == "asset.create":
return PolicyDecision.deny(
context.actor.id,
action,
resource,
reason="asset creation disabled",
context={"resource_metadata": resource_metadata or {}},
)
return PolicyDecision.allow(context.actor.id, action, resource)

View File

@@ -49,11 +49,13 @@ review gates through Markitect APIs.
## Implementation Status
The first optional FastAPI service skeleton is implemented for health,
readiness, version, and OpenAPI contracts. See
readiness, version, OpenAPI contracts, and the initial asset/metadata/
relationship/audit/policy resource surface. See
`docs/service-api-implementation.md`.
Domain-resource endpoints, actor context, agent operations, context packages,
and dry-run/review-gate response contracts remain open in this workplan.
Ingestion, retrieval, transformation, workflow, actor context, agent
operations, context packages, and dry-run/review-gate response contracts remain
open in this workplan.
## S9.1 - Implement versioned FastAPI service skeleton and health contracts
@@ -86,7 +88,7 @@ Implemented:
```task
id: KONT-WP-0009-T002
status: todo
status: done
priority: high
state_hub_task_id: "a37e5ba3-e128-4100-b22c-c85cca3f8db3"
```
@@ -100,6 +102,16 @@ Acceptance:
- Permission and policy checks run before protected operations.
- Audit history can be queried by authorized callers.
Implemented:
- `ServiceRuntime` exposes asset create/list/get, metadata add/list,
lifecycle transition, relationship create/list, audit query, and policy
evaluation operations over existing service contracts.
- FastAPI routes under `/api/v1` wrap those runtime operations without owning
domain behavior.
- Policy denial blocks protected asset mutations and denied attempts remain
auditable.
## S9.3 - Expose ingestion retrieval transformation and workflow APIs
```task