diff --git a/docs/service-api-boundary.md b/docs/service-api-boundary.md index 29f3714..9c6adb9 100644 --- a/docs/service-api-boundary.md +++ b/docs/service-api-boundary.md @@ -35,6 +35,84 @@ Implemented in `KONT-WP-0009-T002`: - `GET /api/v1/audit/events` - `POST /api/v1/policy/evaluate` +Implemented in `KONT-WP-0009-T003`: + +- `GET /api/v1/ingestion/capabilities` +- `POST /api/v1/ingestion/jobs` +- `GET /api/v1/ingestion/jobs` +- `GET /api/v1/ingestion/jobs/{job_id}` +- `POST /api/v1/retrieval/index/refresh` +- `POST /api/v1/retrieval/assets` +- `POST /api/v1/retrieval/context-entities` +- `POST /api/v1/retrieval/relationships` +- `POST /api/v1/retrieval/feedback` +- `GET /api/v1/retrieval/feedback` +- `GET /api/v1/retrieval/quality` +- `GET /api/v1/transformations/operations` +- `POST /api/v1/transformations/runs` +- `GET /api/v1/transformations/runs` +- `GET /api/v1/transformations/runs/{run_id}` +- `POST /api/v1/transformations/runs/{run_id}/retry` +- `POST /api/v1/transformations/runs/{run_id}/cancel` +- `POST /api/v1/workflows/templates` +- `GET /api/v1/workflows/templates` +- `GET /api/v1/workflows/templates/{template_id}` +- `POST /api/v1/workflows/runs` +- `POST /api/v1/workflows/runs/queue` +- `GET /api/v1/workflows/runs` +- `GET /api/v1/workflows/runs/{run_id}` +- `POST /api/v1/workflows/runs/{run_id}/resume` +- `POST /api/v1/workflows/runs/{run_id}/retry` +- `POST /api/v1/workflows/runs/{run_id}/cancel` +- `GET /api/v1/workflows/runs/{run_id}/reconstruction` +- `GET /api/v1/workflows/reviews` +- `GET /api/v1/workflows/exceptions` +- `POST /api/v1/workflows/runs/{run_id}/reviews/{review_id}/decision` + +Implemented in `KONT-WP-0009-T004`: + +- `GET /api/v1/context` +- Actor headers: `X-Actor-Id`, `X-Actor-Type`, `X-Actor-Display-Name`, + `X-Actor-External-Ref`, `X-Actor-Groups`. +- Delegation headers: `X-Delegated-Actor-Id`, `X-Delegated-Actor-Type`, + `X-Delegated-Actor-Display-Name`, `X-Delegated-Actor-External-Ref`, + `X-Delegated-Actor-Groups`. +- Agent headers: `X-Agent-Id`, `X-Agent-Name`, `X-Agent-Run-Id`, + `X-Agent-Tool`. +- Scope headers: `X-Request-Scope` and `X-Policy-Scope` as JSON objects. +- Redacted HTTP authorization error payloads. + +Implemented in `KONT-WP-0009-T005`: + +- `GET /api/v1/agents/operations` +- `GET /api/v1/agents/operations/{operation_id}` +- `POST /api/v1/agents/operations/{operation_id}` +- Catalog entries declare input/output shape notes, permissions, audit + operation, failure modes, and dry-run support. +- Execution is limited to documented operation IDs and emits separate + `agent.operation.*` audit events before dispatching through existing service + contracts. + +Implemented in `KONT-WP-0009-T006`: + +- `GET /api/v1/context-packages/schema` +- `POST /api/v1/context-packages` +- Context packages are assembled from governed retrieval results. +- Payloads carry source refs, snippets, metadata, relationships, policy + constraints, opaque external memory refs, and audit/policy references. +- The `markitect` format emits a Markitect-compatible envelope while keeping + markdown rendering and selector semantics delegated to `markitect-tool`. + +Implemented in `KONT-WP-0009-T007`: + +- Agent operation policies can return `require_review` and receive structured + `review_required` envelopes with review obligations. +- Agent operation policies can return `dry_run_only` and receive + `dry_run_required` envelopes unless the request is already a dry run. +- Review and dry-run outcomes are audited with explicit `review_required` and + `dry_run` audit outcomes. +- Partial-failure job envelopes are covered by contract tests. + The unversioned health/readiness/version endpoints are operational probes. The versioned `/api/v1/*` endpoints establish the MVP API namespace. Future domain-resource endpoints should live under `/api/v1`. @@ -49,19 +127,14 @@ python3 -m pip install -e '.[service]' ## Planned Resource Shape -Planned endpoint groups: +Remaining planned endpoint groups: - `POST /collections`, `GET /collections`, `GET /collections/{id}` - `POST /artifacts`, `GET /artifacts/{id}`, `GET /artifacts` -- `POST /relationships`, `GET /relationships` -- `POST /ingest` -- `POST /query/artifacts`, `POST /query/relationships` -- `POST /runs`, `GET /runs/{id}`, `GET /runs/{id}/manifest` - `POST /context/artifact/{id}` For the governed asset registry architecture, these planned groups should be -translated to assets, metadata, relationships, ingestion jobs, retrieval, -transformations, workflow templates/runs, review queues, and reconstruction +translated to assets, metadata, context packages, and bounded agent operation resources. ## MVP API Versioning Policy @@ -85,10 +158,6 @@ resources. ## Deferred -- Ingestion, retrieval, transformation, and workflow endpoints. -- Actor context, delegation, and authorization middleware. -- Agent-safe operation catalog. -- Context package API. -- Dry-run and review-gate response envelopes for high-impact operations. +No MVP service API task remains deferred in this workplan. - Streaming run execution. - Provider-backed assisted steps. diff --git a/docs/service-api-implementation.md b/docs/service-api-implementation.md index 4a96868..594bd4e 100644 --- a/docs/service-api-implementation.md +++ b/docs/service-api-implementation.md @@ -8,8 +8,8 @@ Status: active implementation note for `KONT-WP-0009`. This note records the first optional FastAPI service adapter. The service layer is intentionally thin: it exposes operational probes, API version metadata, and -the first governed asset-resource surface while leaving domain behavior in the -application services. +the governed asset-resource, job, retrieval, transformation, and workflow +surface while leaving domain behavior in the application services. ## Implemented Package Shape @@ -31,6 +31,7 @@ src/kontextual_engine/ - `GET /api/v1/health` - `GET /api/v1/ready` - `GET /api/v1/version` +- `GET /api/v1/context` - `POST /api/v1/assets` - `GET /api/v1/assets` - `GET /api/v1/assets/{asset_id}` @@ -41,6 +42,42 @@ src/kontextual_engine/ - `GET /api/v1/relationships` - `GET /api/v1/audit/events` - `POST /api/v1/policy/evaluate` +- `GET /api/v1/ingestion/capabilities` +- `POST /api/v1/ingestion/jobs` +- `GET /api/v1/ingestion/jobs` +- `GET /api/v1/ingestion/jobs/{job_id}` +- `POST /api/v1/retrieval/index/refresh` +- `POST /api/v1/retrieval/assets` +- `POST /api/v1/retrieval/context-entities` +- `POST /api/v1/retrieval/relationships` +- `POST /api/v1/retrieval/feedback` +- `GET /api/v1/retrieval/feedback` +- `GET /api/v1/retrieval/quality` +- `GET /api/v1/transformations/operations` +- `POST /api/v1/transformations/runs` +- `GET /api/v1/transformations/runs` +- `GET /api/v1/transformations/runs/{run_id}` +- `POST /api/v1/transformations/runs/{run_id}/retry` +- `POST /api/v1/transformations/runs/{run_id}/cancel` +- `POST /api/v1/workflows/templates` +- `GET /api/v1/workflows/templates` +- `GET /api/v1/workflows/templates/{template_id}` +- `POST /api/v1/workflows/runs` +- `POST /api/v1/workflows/runs/queue` +- `GET /api/v1/workflows/runs` +- `GET /api/v1/workflows/runs/{run_id}` +- `POST /api/v1/workflows/runs/{run_id}/resume` +- `POST /api/v1/workflows/runs/{run_id}/retry` +- `POST /api/v1/workflows/runs/{run_id}/cancel` +- `GET /api/v1/workflows/runs/{run_id}/reconstruction` +- `GET /api/v1/workflows/reviews` +- `GET /api/v1/workflows/exceptions` +- `POST /api/v1/workflows/runs/{run_id}/reviews/{review_id}/decision` +- `GET /api/v1/agents/operations` +- `GET /api/v1/agents/operations/{operation_id}` +- `POST /api/v1/agents/operations/{operation_id}` +- `GET /api/v1/context-packages/schema` +- `POST /api/v1/context-packages` - `GET /openapi.json` Unversioned endpoints are operational probes. Versioned endpoints establish @@ -59,18 +96,63 @@ the `/api/v1` namespace for future domain resources. - readiness checks, - version payload, - asset service construction, -- basic actor/correlation context construction, +- actor/correlation/delegation context construction, - asset, metadata, lifecycle, relationship, audit, and policy operation - translation. + translation, +- ingestion job start/list/get translation, +- governed retrieval query translation for assets, context entities, and + relationships, +- transformation operation/run/retry/cancel translation, +- workflow template/run/review/exception/reconstruction translation, +- bounded agent operation catalog and dispatch translation, +- governed context-package assembly translation. Readiness currently checks that the configured asset registry repository can list assets. It does not mutate state. -Asset, metadata, relationship, lifecycle, audit, and policy operations delegate -to `AssetRegistryService`, the configured repository, and the configured -`PolicyGateway`. Protected mutations therefore keep the existing policy and -audit semantics. The current header-to-actor translation is deliberately simple -and will be expanded in `KONT-WP-0009-T004`. +Asset, metadata, relationship, lifecycle, audit, policy, ingestion, retrieval, +transformation, and workflow operations delegate to existing application +services, the configured repository, and the configured `PolicyGateway`. +Protected mutations and retrieval operations therefore keep the existing policy +and audit semantics. + +`KONT-WP-0009-T004` expanded request context parsing. The FastAPI adapter now +accepts explicit actor, delegated actor, group, agent, request-scope, and +policy-scope headers, and exposes the resulting operation context at +`GET /api/v1/context`. Authorization errors are redacted at the HTTP boundary +so denied responses keep action, resource, correlation, and public policy +decision fields while omitting protected resource metadata from policy context. + +Job and run responses include correlation IDs, state, output references, +failures or diagnostics, and compact retry/cancel hints where the runtime can +derive them. Retrieval responses remain permission-filtered and source-grounded +through source references, representations, snippets, relationships, and +retrieval audit events. Transformation and workflow responses expose lineage, +audit events, run reconstruction, review tasks, and exception queues from the +underlying core services. + +`KONT-WP-0009-T005` added a bounded agent operation catalog. Agents can list +documented operations and execute only those operation IDs. Each catalog entry +declares required permissions, input/output shape notes, audit operation, +failure modes, and dry-run support. Execution first authorizes and audits +`agent.operation.*`, then dispatches through existing runtime operations such +as retrieval, metadata enrichment, transformation, workflow invocation, review +submission, or result reporting. + +`KONT-WP-0009-T006` added a governed context-package assembly API. Packages are +assembled from permission-filtered retrieval results rather than unrestricted +repository reads. The payload carries selected assets, snippets, metadata, +relationships, representations, source references, policy constraints, opaque +external memory refs, and an audit event. The `markitect` format adds a +Markitect-compatible payload envelope while keeping markdown rendering, +selector semantics, and contract checks delegated to `markitect-tool`. + +`KONT-WP-0009-T007` added explicit review/dry-run response envelopes for agent +operation policy decisions. `require_review` decisions now return +`review_required` envelopes with review obligations and `review_required` audit +outcomes. `dry_run_only` decisions return `dry_run_required` envelopes unless +the request is already a dry run. Partial-failure contracts are covered through +directory ingestion with mixed supported and unsupported inputs. ## Dependency Boundary @@ -96,6 +178,18 @@ missing-dependency behavior are tested without FastAPI. - runtime relationship create/list operations, - runtime audit query and policy evaluation, - runtime policy denial blocking a protected operation, +- runtime actor context with delegated actors and AI-agent identity, +- redacted API authorization error payloads, +- runtime ingestion jobs with completed and failed job envelopes, +- runtime source-grounded retrieval with snippets, +- runtime transformation runs with lineage and audit references, +- runtime workflow template/run/reconstruction contracts, +- runtime bounded agent operation catalog, dry-run behavior, dispatch, and + separate agent audit events, +- runtime context-package assembly with source-grounded results, opaque memory + references, and Markitect-compatible payload shape, +- runtime review-required and dry-run-only agent operation envelopes, +- runtime partial-failure ingestion job envelopes, - `create_app()` missing-dependency behavior when the optional extra is absent, - health/readiness/version/OpenAPI endpoint contracts when FastAPI and HTTPX are installed, @@ -104,9 +198,4 @@ missing-dependency behavior are tested without FastAPI. ## Not Yet Implemented -- Ingestion, retrieval, transformation, workflow, review, and reconstruction - endpoints. -- Request actor context and delegation middleware. -- Bounded agent operation catalog. -- Context package API. -- Dry-run and review-gate response envelopes. +No MVP service API tasks remain open in `KONT-WP-0009`. diff --git a/src/kontextual_engine/api/app.py b/src/kontextual_engine/api/app.py index ccc3505..afa16e5 100644 --- a/src/kontextual_engine/api/app.py +++ b/src/kontextual_engine/api/app.py @@ -6,6 +6,7 @@ requests into service/runtime contracts and must not own domain behavior. from __future__ import annotations +import json from dataclasses import dataclass, field from importlib import metadata from typing import Any @@ -14,26 +15,158 @@ from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository from kontextual_engine.core import ( Actor, ActorType, + AuditEvent, + AuditOutcome, Classification, ContextEntity, ContextEntityType, + IngestionIdentityPolicy, + IngestionJobStatus, LifecycleState, MetadataRecord, OperationContext, PolicyDecision, + PolicyEffect, RelationshipTargetKind, + RetrievalFeedbackLabel, SourceReference, + TransformationRunStatus, + WorkflowExceptionKind, + WorkflowExceptionStatus, + WorkflowInputDefinition, + WorkflowReviewDecisionType, + WorkflowReviewStatus, + WorkflowRunStatus, + WorkflowStepDefinition, + WorkflowTemplate, + new_id, utc_now, ) from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError from kontextual_engine.ports import AllowAllPolicyGateway, AssetRegistryRepository, PolicyGateway -from kontextual_engine.services import AssetRegistryService +from kontextual_engine.services import ( + AssetIngestionService, + AssetQueryRequest, + AssetRegistryService, + AssetRetrievalService, + ContextEntityQueryRequest, + RelationshipQueryRequest, + RetrievalFeedbackRequest, + TransformationRequest, + TransformationService, + WorkflowInvocation, + WorkflowService, +) API_VERSION = "v1" OPENAPI_VERSION = "1.0.0" +AGENT_OPERATION_CATALOG: tuple[dict[str, Any], ...] = ( + { + "operation_id": "inspect_asset", + "description": "Read one asset envelope by ID.", + "input_schema": {"required": ["asset_id"]}, + "output_schema": {"type": "asset"}, + "required_permissions": ["agent.operation.inspect_asset", "asset.retrieve"], + "audit_operation": "agent.operation.inspect_asset", + "failure_modes": ["not_found", "permission_denied"], + "dry_run_supported": True, + }, + { + "operation_id": "retrieve_asset", + "description": "Read one source-grounded asset bundle with metadata, representations, and relationships.", + "input_schema": {"required": ["asset_id"]}, + "output_schema": {"type": "asset_bundle"}, + "required_permissions": ["agent.operation.retrieve_asset", "asset.retrieve"], + "audit_operation": "agent.operation.retrieve_asset", + "failure_modes": ["not_found", "permission_denied"], + "dry_run_supported": True, + }, + { + "operation_id": "search_assets", + "description": "Run a governed retrieval query over assets.", + "input_schema": {"required": ["query"]}, + "output_schema": {"type": "asset_query_result"}, + "required_permissions": ["agent.operation.search_assets", "retrieval.assets.query"], + "audit_operation": "agent.operation.search_assets", + "failure_modes": ["permission_denied", "validation_error", "zero_results"], + "dry_run_supported": True, + }, + { + "operation_id": "assemble_context", + "description": "Assemble a non-durable source-grounded context preview from a bounded asset query.", + "input_schema": {"required": ["query"], "optional": ["intent", "instructions", "constraints"]}, + "output_schema": {"type": "context_preview"}, + "required_permissions": ["agent.operation.assemble_context", "retrieval.assets.query"], + "audit_operation": "agent.operation.assemble_context", + "failure_modes": ["permission_denied", "validation_error", "zero_results"], + "dry_run_supported": True, + }, + { + "operation_id": "enrich_metadata", + "description": "Add one metadata record to an asset.", + "input_schema": {"required": ["asset_id", "metadata"]}, + "output_schema": {"type": "asset_change"}, + "required_permissions": ["agent.operation.enrich_metadata", "asset.metadata.add"], + "audit_operation": "agent.operation.enrich_metadata", + "failure_modes": ["not_found", "permission_denied", "validation_error", "version_conflict"], + "dry_run_supported": True, + }, + { + "operation_id": "classify_asset", + "description": "Request the classify transformation operation for an asset.", + "input_schema": {"required": ["asset_id"]}, + "output_schema": {"type": "transformation_run_result"}, + "required_permissions": ["agent.operation.classify_asset", "transformation.run.execute"], + "audit_operation": "agent.operation.classify_asset", + "failure_modes": ["permission_denied", "adapter_unavailable", "operation_failed"], + "dry_run_supported": True, + }, + { + "operation_id": "transform_asset", + "description": "Execute a registered transformation operation.", + "input_schema": {"required": ["transformation"]}, + "output_schema": {"type": "transformation_run_result"}, + "required_permissions": ["agent.operation.transform_asset", "transformation.run.execute"], + "audit_operation": "agent.operation.transform_asset", + "failure_modes": ["permission_denied", "adapter_unavailable", "operation_failed"], + "dry_run_supported": True, + }, + { + "operation_id": "invoke_workflow", + "description": "Invoke a registered workflow template.", + "input_schema": {"required": ["workflow"]}, + "output_schema": {"type": "workflow_run_result"}, + "required_permissions": ["agent.operation.invoke_workflow", "workflow.run.execute"], + "audit_operation": "agent.operation.invoke_workflow", + "failure_modes": ["not_found", "permission_denied", "operation_failed", "review_required"], + "dry_run_supported": True, + }, + { + "operation_id": "submit_review", + "description": "Submit a decision for one open workflow review task.", + "input_schema": {"required": ["run_id", "review_id", "decision"]}, + "output_schema": {"type": "workflow_run_result"}, + "required_permissions": ["agent.operation.submit_review", "workflow.review.decide"], + "audit_operation": "agent.operation.submit_review", + "failure_modes": ["not_found", "permission_denied", "review_not_open", "operation_failed"], + "dry_run_supported": True, + }, + { + "operation_id": "report_result", + "description": "Record an agent result report without mutating domain assets.", + "input_schema": {"required": ["summary"], "optional": ["result_ref", "metadata"]}, + "output_schema": {"type": "agent_report"}, + "required_permissions": ["agent.operation.report_result"], + "audit_operation": "agent.operation.report_result", + "failure_modes": ["permission_denied", "validation_error"], + "dry_run_supported": True, + }, +) + + @dataclass class ServiceRuntime: repository: AssetRegistryRepository = field(default_factory=InMemoryAssetRegistryRepository) @@ -45,24 +178,91 @@ class ServiceRuntime: def asset_service(self) -> AssetRegistryService: return AssetRegistryService(self.repository, policy_gateway=self.policy_gateway) + def ingestion_service(self) -> AssetIngestionService: + return AssetIngestionService(self.repository, asset_service=self.asset_service()) + + def retrieval_service(self) -> AssetRetrievalService: + return AssetRetrievalService(self.repository, policy_gateway=self.policy_gateway) + + def transformation_service(self) -> TransformationService: + return TransformationService( + self.repository, + policy_gateway=self.policy_gateway, + asset_service=self.asset_service(), + ) + + def workflow_service(self) -> WorkflowService: + return WorkflowService( + self.repository, + transformation_service=self.transformation_service(), + policy_gateway=self.policy_gateway, + ) + def operation_context( self, *, actor_id: str = "api-user", actor_type: str = "human", display_name: str | None = None, + external_ref: str | None = None, correlation_id: str | None = None, groups: list[str] | None = None, + delegated_actor_id: str | None = None, + delegated_actor_type: str = "human", + delegated_actor_display_name: str | None = None, + delegated_actor_external_ref: str | None = None, + delegated_actor_groups: list[str] | None = None, + request_scope: dict[str, Any] | None = None, + policy_scope: dict[str, Any] | None = None, + agent_id: str | None = None, + agent_name: str | None = None, + agent_run_id: str | None = None, + agent_tool: str | None = None, metadata: dict[str, Any] | None = None, ) -> OperationContext: + actor_metadata = dict(metadata or {}) + agent_metadata = _agent_metadata( + agent_id=agent_id, + agent_name=agent_name, + agent_run_id=agent_run_id, + agent_tool=agent_tool, + ) + if agent_metadata: + actor_metadata["agent"] = agent_metadata actor = Actor.create( ActorType(actor_type), actor_id=actor_id, display_name=display_name, + external_ref=external_ref, groups=groups, - metadata=metadata, + metadata=actor_metadata, + ) + delegated_actor = None + if delegated_actor_id: + delegated_actor = Actor.create( + ActorType(delegated_actor_type), + actor_id=delegated_actor_id, + display_name=delegated_actor_display_name, + external_ref=delegated_actor_external_ref, + groups=delegated_actor_groups, + ) + context_metadata: dict[str, Any] = {} + if agent_metadata: + context_metadata["agent"] = agent_metadata + if delegated_actor is not None: + context_metadata["delegation"] = { + "mode": "on_behalf_of", + "actor_id": actor.id, + "delegated_actor_id": delegated_actor.id, + } + return OperationContext.create( + actor, + correlation_id=correlation_id, + delegated_actor=delegated_actor, + request_scope=request_scope, + policy_scope=policy_scope, + metadata=context_metadata, ) - return OperationContext.create(actor, correlation_id=correlation_id) @property def package_version(self) -> str: @@ -258,10 +458,595 @@ class ServiceRuntime: ) return decision.to_dict() + def ingestion_capabilities(self) -> dict[str, Any]: + service = self.ingestion_service() + return { + "connectors": service.connector_capabilities(), + "extractors": service.extractor_capabilities(), + } + + def start_ingestion_job(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + service = self.ingestion_service() + mode = payload.get("mode", "file") + classification = _optional_classification(payload.get("classification")) + identity_policy = payload.get("identity_policy", IngestionIdentityPolicy.SOURCE_LOCATION.value) + if mode == "directory": + job = service.ingest_directory( + payload["path"], + context, + recursive=bool(payload.get("recursive", True)), + classification=classification, + identity_policy=identity_policy, + skip_unchanged=bool(payload.get("skip_unchanged", True)), + ) + return _ingestion_job_envelope(job) + if mode != "file": + raise ValidationError( + "Unsupported ingestion mode", + details={"mode": mode, "supported": ["file", "directory"]}, + ) + result = service.ingest_file( + payload["path"], + context, + asset_id=payload.get("asset_id"), + title=payload.get("title"), + classification=classification, + idempotency_key=payload.get("idempotency_key"), + identity_policy=identity_policy, + skip_unchanged=bool(payload.get("skip_unchanged", True)), + ) + return _ingestion_result_envelope(result) + + def get_ingestion_job(self, job_id: str) -> dict[str, Any]: + return _ingestion_job_envelope(self.ingestion_service().get_job(job_id)) + + def list_ingestion_jobs(self, *, status: str | None = None) -> dict[str, Any]: + parsed_status = _enum_filter(IngestionJobStatus, status, "ingestion job status") + jobs = self.ingestion_service().list_jobs(status=parsed_status) + return {"items": [_ingestion_job_envelope(job) for job in jobs], "count": len(jobs)} + + def refresh_retrieval_index(self) -> dict[str, Any]: + return self.retrieval_service().refresh_index().to_dict() + + def query_assets(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + service = self.retrieval_service() + service.refresh_index() + return service.query_assets(_asset_query_request(payload), context).to_dict() + + def query_context_entities(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + return self.retrieval_service().query_context_entities( + _context_entity_query_request(payload), + context, + ).to_dict() + + def query_relationships(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + return self.retrieval_service().query_relationships( + _relationship_query_request(payload), + context, + ).to_dict() + + def record_retrieval_feedback(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + return self.retrieval_service().record_feedback( + RetrievalFeedbackRequest( + label=payload["label"], + query=dict(payload.get("query", {})), + result_ref=dict(payload.get("result_ref", {})), + notes=payload.get("notes"), + metadata=dict(payload.get("metadata", {})), + ), + context, + ).to_dict() + + def list_retrieval_feedback( + self, + *, + correlation_id: str | None = None, + label: str | None = None, + ) -> dict[str, Any]: + parsed_label = _enum_filter(RetrievalFeedbackLabel, label, "retrieval feedback label") + records = self.retrieval_service().list_feedback(correlation_id=correlation_id, label=parsed_label) + return {"items": [record.to_dict() for record in records], "count": len(records)} + + def retrieval_quality_metrics(self) -> dict[str, Any]: + return self.retrieval_service().quality_metrics().to_dict() + + def list_transformation_operations(self) -> dict[str, Any]: + operations = self.transformation_service().list_operations() + return {"items": [operation.to_dict() for operation in operations], "count": len(operations)} + + def execute_transformation(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + result = self.transformation_service().execute_transformation( + _transformation_request(payload), + context, + ) + return _transformation_result_envelope(result) + + def get_transformation_run(self, run_id: str) -> dict[str, Any]: + return _transformation_run_envelope(self.transformation_service().get_run(run_id)) + + def list_transformation_runs( + self, + *, + status: str | None = None, + operation_id: str | None = None, + ) -> dict[str, Any]: + parsed_status = _enum_filter(TransformationRunStatus, status, "transformation run status") + runs = self.transformation_service().list_runs(status=parsed_status, operation_id=operation_id) + return {"items": [_transformation_run_envelope(run) for run in runs], "count": len(runs)} + + def retry_transformation_run(self, run_id: str, context: OperationContext) -> dict[str, Any]: + return _transformation_result_envelope(self.transformation_service().retry_run(run_id, context)) + + def cancel_transformation_run( + self, + run_id: str, + payload: dict[str, Any], + context: OperationContext, + ) -> dict[str, Any]: + run = self.transformation_service().cancel_run(run_id, context, reason=payload.get("reason")) + return _transformation_run_envelope(run) + + def register_workflow_template(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + return self.workflow_service().register_template(_workflow_template(payload), context).to_dict() + + def get_workflow_template(self, template_id: str, *, version: str | None = None) -> dict[str, Any]: + return self.workflow_service().get_template(template_id, version=version).to_dict() + + def list_workflow_templates(self, *, template_id: str | None = None) -> dict[str, Any]: + templates = self.workflow_service().list_templates(template_id=template_id) + return {"items": [template.to_dict() for template in templates], "count": len(templates)} + + def queue_workflow_run(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + return _workflow_result_envelope( + self.workflow_service().queue_template(_workflow_invocation(payload), context) + ) + + def invoke_workflow_run(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + return _workflow_result_envelope( + self.workflow_service().invoke_template(_workflow_invocation(payload), context) + ) + + def get_workflow_run(self, run_id: str) -> dict[str, Any]: + return _workflow_run_envelope(self.repository.get_workflow_run(run_id)) + + def list_workflow_runs( + self, + *, + status: str | None = None, + template_id: str | None = None, + ) -> dict[str, Any]: + parsed_status = _enum_filter(WorkflowRunStatus, status, "workflow run status") + runs = self.repository.list_workflow_runs(status=parsed_status, template_id=template_id) + return {"items": [_workflow_run_envelope(run) for run in runs], "count": len(runs)} + + def resume_workflow_run(self, run_id: str, context: OperationContext) -> dict[str, Any]: + return _workflow_result_envelope(self.workflow_service().resume_run(run_id, context)) + + def retry_workflow_run(self, run_id: str, context: OperationContext) -> dict[str, Any]: + return _workflow_result_envelope(self.workflow_service().retry_run(run_id, context)) + + def cancel_workflow_run( + self, + run_id: str, + payload: dict[str, Any], + context: OperationContext, + ) -> dict[str, Any]: + run = self.workflow_service().cancel_run(run_id, context, reason=payload.get("reason")) + return _workflow_run_envelope(run) + + def reconstruct_workflow_run(self, run_id: str) -> dict[str, Any]: + return self.workflow_service().reconstruct_run(run_id).to_dict() + + def list_workflow_review_tasks( + self, + *, + status: str | None = WorkflowReviewStatus.OPEN.value, + workflow_run_id: str | None = None, + ) -> dict[str, Any]: + parsed_status = _enum_filter(WorkflowReviewStatus, status, "workflow review status") + reviews = self.workflow_service().list_review_tasks( + status=parsed_status, + workflow_run_id=workflow_run_id, + ) + return {"items": [review.to_dict() for review in reviews], "count": len(reviews)} + + def list_workflow_exceptions( + self, + *, + status: str | None = WorkflowExceptionStatus.OPEN.value, + kind: str | None = None, + workflow_run_id: str | None = None, + ) -> dict[str, Any]: + parsed_status = _enum_filter(WorkflowExceptionStatus, status, "workflow exception status") + parsed_kind = _enum_filter(WorkflowExceptionKind, kind, "workflow exception kind") + exceptions = self.workflow_service().list_exception_queue( + status=parsed_status, + kind=parsed_kind, + workflow_run_id=workflow_run_id, + ) + return {"items": [exception.to_dict() for exception in exceptions], "count": len(exceptions)} + + def record_workflow_review_decision( + self, + run_id: str, + review_id: str, + payload: dict[str, Any], + context: OperationContext, + ) -> dict[str, Any]: + decision = payload.get("decision", WorkflowReviewDecisionType.CONTINUE.value) + return _workflow_result_envelope( + self.workflow_service().record_review_decision( + run_id, + review_id, + decision, + context, + note=payload.get("note", ""), + correction=dict(payload.get("correction", {})), + ) + ) + + def list_agent_operations(self) -> dict[str, Any]: + return {"items": [dict(item) for item in AGENT_OPERATION_CATALOG], "count": len(AGENT_OPERATION_CATALOG)} + + def get_agent_operation(self, operation_id: str) -> dict[str, Any]: + return dict(_agent_operation(operation_id)) + + def execute_agent_operation( + self, + operation_id: str, + payload: dict[str, Any], + context: OperationContext, + ) -> dict[str, Any]: + operation = _agent_operation(operation_id) + dry_run = bool(payload.get("dry_run", False)) + operation_payload = dict(payload.get("payload", payload)) + operation_payload.pop("dry_run", None) + decision = self._authorize_agent_operation(operation, operation_payload, context) + effect = decision.effect + if effect == PolicyEffect.REQUIRE_REVIEW: + event = self._audit_agent_operation( + operation, + AuditOutcome.REVIEW_REQUIRED, + context, + decision, + details={"phase": "review_required", "payload_keys": sorted(operation_payload)}, + ) + return _agent_review_required_envelope(operation, decision, event, context) + if effect == PolicyEffect.DRY_RUN_ONLY and not dry_run: + event = self._audit_agent_operation( + operation, + AuditOutcome.DRY_RUN, + context, + decision, + details={"phase": "dry_run_required", "payload_keys": sorted(operation_payload)}, + ) + return _agent_dry_run_required_envelope(operation, decision, event, context) + queued_event = self._audit_agent_operation( + operation, + AuditOutcome.DRY_RUN if dry_run else AuditOutcome.SUCCESS, + context, + decision, + details={ + "phase": "accepted", + "dry_run": dry_run, + "payload_keys": sorted(operation_payload), + }, + ) + if dry_run: + return { + "operation_id": operation_id, + "dry_run": True, + "success": True, + "would_execute": operation, + "correlation_id": context.correlation_id, + "policy_decision": decision.to_dict(), + "audit_event": queued_event.to_dict(), + } + try: + result = self._dispatch_agent_operation(operation_id, operation_payload, context) + except Exception as exc: + failed_event = self._audit_agent_operation( + operation, + AuditOutcome.FAILED, + context, + decision, + details={ + "phase": "failed", + "error_type": type(exc).__name__, + "payload_keys": sorted(operation_payload), + }, + ) + if isinstance(exc, KontextualError): + exc.details.setdefault("agent_audit_event_id", failed_event.event_id) + raise + completed_event = self._audit_agent_operation( + operation, + AuditOutcome.SUCCESS, + context, + decision, + details={ + "phase": "completed", + "result_keys": sorted(result) if isinstance(result, dict) else [], + }, + ) + return { + "operation_id": operation_id, + "dry_run": False, + "success": True, + "correlation_id": context.correlation_id, + "result": result, + "policy_decision": decision.to_dict(), + "audit_event": completed_event.to_dict(), + } + + def _dispatch_agent_operation( + self, + operation_id: str, + payload: dict[str, Any], + context: OperationContext, + ) -> dict[str, Any]: + if operation_id == "inspect_asset": + return self.get_asset(payload["asset_id"]) + if operation_id == "retrieve_asset": + return self._asset_bundle(payload["asset_id"]) + if operation_id == "search_assets": + return self.query_assets(dict(payload.get("query", {})), context) + if operation_id == "assemble_context": + query_result = self.query_assets(dict(payload.get("query", {})), context) + return { + "context_preview": { + "intent": payload.get("intent", "Support a bounded agent task with source-grounded context."), + "instructions": payload.get("instructions", ""), + "constraints": dict(payload.get("constraints", {})), + "correlation_id": query_result["correlation_id"], + "source_grounded": True, + "items": query_result["results"], + "result_count": query_result["result_count"], + "total": query_result["total"], + } + } + if operation_id == "enrich_metadata": + return self.add_metadata_record(payload["asset_id"], dict(payload["metadata"]), context) + if operation_id == "classify_asset": + request = { + "operation_id": "classify", + "source_asset_ids": [payload["asset_id"]], + "parameters": dict(payload.get("parameters", {})), + "metadata": dict(payload.get("metadata", {})), + } + return self.execute_transformation(request, context) + if operation_id == "transform_asset": + return self.execute_transformation(dict(payload["transformation"]), context) + if operation_id == "invoke_workflow": + return self.invoke_workflow_run(dict(payload["workflow"]), context) + if operation_id == "submit_review": + return self.record_workflow_review_decision( + payload["run_id"], + payload["review_id"], + payload, + context, + ) + if operation_id == "report_result": + return self._agent_report(payload, context) + raise ValidationError("Unsupported agent operation", details={"operation_id": operation_id}) + + def _authorize_agent_operation( + self, + operation: dict[str, Any], + payload: dict[str, Any], + context: OperationContext, + ) -> PolicyDecision: + action = f"agent.operation.{operation['operation_id']}" + resource = f"agent_operation:{operation['operation_id']}" + try: + decision = self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata={ + "operation": operation, + "payload_keys": sorted(payload), + "dry_run_supported": operation["dry_run_supported"], + }, + ) + except Exception as exc: + decision = PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason=str(exc) or "Agent operation policy gateway failed", + context={"gateway_error": type(exc).__name__}, + ) + if not decision.allowed and decision.effect not in ( + PolicyEffect.REQUIRE_REVIEW, + PolicyEffect.DRY_RUN_ONLY, + ): + event = self._audit_agent_operation(operation, AuditOutcome.DENIED, context, decision) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": action, + "resource": resource, + "correlation_id": context.correlation_id, + "agent_audit_event_id": event.event_id, + "policy_decision": decision.to_dict(), + }, + ) + return decision + + def _audit_agent_operation( + self, + operation: dict[str, Any], + outcome: AuditOutcome, + context: OperationContext, + policy_decision: PolicyDecision, + *, + details: dict[str, Any] | None = None, + ) -> AuditEvent: + event = AuditEvent.from_context( + operation["audit_operation"], + f"agent_operation:{operation['operation_id']}", + outcome, + context, + policy_decision=policy_decision, + details=details, + ) + return self.repository.save_audit_event(event) + + def _asset_bundle(self, asset_id: str) -> dict[str, Any]: + asset = self.repository.get_asset(asset_id) + metadata_records = self.repository.list_metadata_records(asset_id) + representations = self.repository.list_representations(asset_id=asset_id) + relationships = self.repository.list_relationships(source_id=asset_id) + return { + "asset": asset.to_dict(), + "metadata_records": [record.to_dict() for record in metadata_records], + "representations": [representation.to_dict() for representation in representations], + "relationships": [relationship.to_dict() for relationship in relationships], + "source_grounded": bool(asset.source_refs or representations), + } + + def _agent_report(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + if not payload.get("summary"): + raise ValidationError("Agent result report requires a summary", details={"required": ["summary"]}) + decision = PolicyDecision.allow( + context.actor.id, + "agent.operation.report_result.record", + f"agent:{context.actor.id}", + context={"correlation_id": context.correlation_id}, + ) + event = AuditEvent.from_context( + "agent.report.recorded", + f"agent:{context.actor.id}", + AuditOutcome.SUCCESS, + context, + policy_decision=decision, + details={ + "summary": payload["summary"], + "result_ref": dict(payload.get("result_ref", {})), + "metadata": dict(payload.get("metadata", {})), + }, + ) + saved = self.repository.save_audit_event(event) + return { + "summary": payload["summary"], + "result_ref": dict(payload.get("result_ref", {})), + "metadata": dict(payload.get("metadata", {})), + "audit_event": saved.to_dict(), + } + + def context_package_schema(self) -> dict[str, Any]: + return { + "kind": "kontextual.context_package", + "version": "1", + "required": ["query"], + "optional": [ + "package_id", + "title", + "intent", + "instructions", + "constraints", + "external_memory_refs", + "metadata", + "format", + ], + "formats": ["kontextual", "markitect"], + "source_grounding": ["source_refs", "representations", "snippets", "relationships", "metadata_records"], + "memory_boundary": "external_memory_refs are opaque pointers; memory graph contents are not embedded", + } + + def assemble_context_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + query = dict(payload.get("query", {})) + query.setdefault("include_snippets", True) + query.setdefault("include_relationships", True) + query.setdefault("max_snippets", 5) + constraints = dict(payload.get("constraints", {})) + decision = self._authorize_context_package(query, constraints, context) + query_result = self.query_assets(query, context) + package = _context_package_payload( + payload, + query_result, + context, + constraints=constraints, + ) + event = self._audit_context_package(package, context, decision) + package["audit_event"] = event.to_dict() + package["policy_decision"] = decision.to_dict() + return package + + def _authorize_context_package( + self, + query: dict[str, Any], + constraints: dict[str, Any], + context: OperationContext, + ) -> PolicyDecision: + try: + decision = self.policy_gateway.authorize( + context, + "context_package.assemble", + "context_package:new", + resource_metadata={ + "query": query, + "constraints": constraints, + "external_memory_refs": "opaque", + }, + ) + except Exception as exc: + decision = PolicyDecision.fail_closed( + context.actor.id, + "context_package.assemble", + "context_package:new", + reason=str(exc) or "Context package policy gateway failed", + context={"gateway_error": type(exc).__name__}, + ) + if not decision.allowed: + event = AuditEvent.from_context( + "context_package.assemble", + "context_package:new", + AuditOutcome.DENIED, + context, + policy_decision=decision, + details={"query": query, "constraints": constraints}, + ) + saved = self.repository.save_audit_event(event) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": "context_package.assemble", + "resource": "context_package:new", + "correlation_id": context.correlation_id, + "audit_event_id": saved.event_id, + "policy_decision": decision.to_dict(), + }, + ) + return decision + + def _audit_context_package( + self, + package: dict[str, Any], + context: OperationContext, + decision: PolicyDecision, + ) -> AuditEvent: + event = AuditEvent.from_context( + "context_package.assemble", + f"context_package:{package['package_id']}", + AuditOutcome.SUCCESS, + context, + policy_decision=decision, + details={ + "result_count": package["result_count"], + "source_grounded": package["source_grounded"], + "format": package["format"], + "external_memory_ref_count": len(package["external_memory_refs"]), + }, + ) + return self.repository.save_audit_event(event) + def create_app(runtime: ServiceRuntime | None = None): try: from fastapi import Depends, FastAPI, Header, HTTPException, Query + from fastapi.responses import JSONResponse except ImportError as exc: # pragma: no cover - exercised when optional extra is absent raise RuntimeError( "FastAPI service dependencies are not installed. Install kontextual-engine[service]." @@ -277,6 +1062,22 @@ def create_app(runtime: ServiceRuntime | None = None): ) app.state.kontextual_runtime = runtime + @app.exception_handler(NotFoundError) + async def not_found_error_handler(_request, exc: NotFoundError) -> JSONResponse: + return JSONResponse(status_code=404, content=_error_payload(exc)) + + @app.exception_handler(AuthorizationError) + async def authorization_error_handler(_request, exc: AuthorizationError) -> JSONResponse: + return JSONResponse(status_code=403, content=_authorization_error_payload(exc)) + + @app.exception_handler(ValidationError) + async def validation_error_handler(_request, exc: ValidationError) -> JSONResponse: + return JSONResponse(status_code=422, content=_error_payload(exc)) + + @app.exception_handler(KontextualError) + async def kontextual_error_handler(_request, exc: KontextualError) -> JSONResponse: + return JSONResponse(status_code=400, content=_error_payload(exc)) + @app.get("/health", tags=["system"]) def health() -> dict[str, Any]: return runtime.health() @@ -304,16 +1105,44 @@ def create_app(runtime: ServiceRuntime | None = None): return runtime.version() def context_from_headers( - x_actor_id: str = Header("api-user"), - x_actor_type: str = Header("human"), + x_actor_id: str | None = Header(None), + x_actor_type: str | None = Header(None), x_actor_display_name: str | None = Header(None), + x_actor_external_ref: str | None = Header(None), + x_actor_groups: str | None = Header(None), x_correlation_id: str | None = Header(None), + x_delegated_actor_id: str | None = Header(None), + x_delegated_actor_type: str | None = Header(None), + x_delegated_actor_display_name: str | None = Header(None), + x_delegated_actor_external_ref: str | None = Header(None), + x_delegated_actor_groups: str | None = Header(None), + x_agent_id: str | None = Header(None), + x_agent_name: str | None = Header(None), + x_agent_run_id: str | None = Header(None), + x_agent_tool: str | None = Header(None), + x_request_scope: str | None = Header(None), + x_policy_scope: str | None = Header(None), ) -> OperationContext: + actor_id = x_actor_id or x_agent_id or "api-user" + actor_type = x_actor_type or ("ai_agent" if x_agent_id else "human") return runtime.operation_context( - actor_id=x_actor_id, - actor_type=x_actor_type, + actor_id=actor_id, + actor_type=actor_type, display_name=x_actor_display_name, + external_ref=x_actor_external_ref, correlation_id=x_correlation_id, + groups=_split_header_list(x_actor_groups), + delegated_actor_id=x_delegated_actor_id, + delegated_actor_type=x_delegated_actor_type or "human", + delegated_actor_display_name=x_delegated_actor_display_name, + delegated_actor_external_ref=x_delegated_actor_external_ref, + delegated_actor_groups=_split_header_list(x_delegated_actor_groups), + request_scope=_json_header(x_request_scope, "X-Request-Scope"), + policy_scope=_json_header(x_policy_scope, "X-Policy-Scope"), + agent_id=x_agent_id, + agent_name=x_agent_name, + agent_run_id=x_agent_run_id, + agent_tool=x_agent_tool, ) def response(callable_obj, *args: Any, **kwargs: Any) -> Any: @@ -322,11 +1151,26 @@ def create_app(runtime: ServiceRuntime | None = None): except NotFoundError as exc: raise HTTPException(status_code=404, detail=_error_payload(exc)) from exc except AuthorizationError as exc: - raise HTTPException(status_code=403, detail=_error_payload(exc)) from exc + raise HTTPException(status_code=403, detail=_authorization_error_payload(exc)) from exc except ValidationError as exc: raise HTTPException(status_code=422, detail=_error_payload(exc)) from exc except KontextualError as exc: raise HTTPException(status_code=400, detail=_error_payload(exc)) from exc + except (KeyError, TypeError, ValueError) as exc: + raise HTTPException( + status_code=422, + detail={ + "code": "kontextual.validation", + "message": "Invalid request payload", + "details": {"error_type": type(exc).__name__, "message": str(exc)}, + }, + ) from exc + + @app.get(f"{prefix}/context", tags=["context"]) + def current_context( + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return context.to_dict() @app.post(f"{prefix}/assets", tags=["assets"]) def create_asset( @@ -406,9 +1250,598 @@ def create_app(runtime: ServiceRuntime | None = None): ) -> dict[str, Any]: return response(runtime.evaluate_policy, payload, context) + @app.get(f"{prefix}/ingestion/capabilities", tags=["ingestion"]) + def ingestion_capabilities() -> dict[str, Any]: + return response(runtime.ingestion_capabilities) + + @app.post(f"{prefix}/ingestion/jobs", tags=["ingestion"]) + def start_ingestion_job( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.start_ingestion_job, payload, context) + + @app.get(f"{prefix}/ingestion/jobs", tags=["ingestion"]) + def list_ingestion_jobs(status: str | None = Query(None)) -> dict[str, Any]: + return response(runtime.list_ingestion_jobs, status=status) + + @app.get(f"{prefix}/ingestion/jobs/{{job_id}}", tags=["ingestion"]) + def get_ingestion_job(job_id: str) -> dict[str, Any]: + return response(runtime.get_ingestion_job, job_id) + + @app.post(f"{prefix}/retrieval/index/refresh", tags=["retrieval"]) + def refresh_retrieval_index() -> dict[str, Any]: + return response(runtime.refresh_retrieval_index) + + @app.post(f"{prefix}/retrieval/assets", tags=["retrieval"]) + def query_assets( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.query_assets, payload, context) + + @app.post(f"{prefix}/retrieval/context-entities", tags=["retrieval"]) + def query_context_entities( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.query_context_entities, payload, context) + + @app.post(f"{prefix}/retrieval/relationships", tags=["retrieval"]) + def query_retrieval_relationships( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.query_relationships, payload, context) + + @app.post(f"{prefix}/retrieval/feedback", tags=["retrieval"]) + def record_retrieval_feedback( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.record_retrieval_feedback, payload, context) + + @app.get(f"{prefix}/retrieval/feedback", tags=["retrieval"]) + def list_retrieval_feedback( + correlation_id: str | None = Query(None), + label: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.list_retrieval_feedback, correlation_id=correlation_id, label=label) + + @app.get(f"{prefix}/retrieval/quality", tags=["retrieval"]) + def retrieval_quality_metrics() -> dict[str, Any]: + return response(runtime.retrieval_quality_metrics) + + @app.get(f"{prefix}/transformations/operations", tags=["transformations"]) + def list_transformation_operations() -> dict[str, Any]: + return response(runtime.list_transformation_operations) + + @app.post(f"{prefix}/transformations/runs", tags=["transformations"]) + def execute_transformation( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.execute_transformation, payload, context) + + @app.get(f"{prefix}/transformations/runs", tags=["transformations"]) + def list_transformation_runs( + status: str | None = Query(None), + operation_id: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.list_transformation_runs, status=status, operation_id=operation_id) + + @app.get(f"{prefix}/transformations/runs/{{run_id}}", tags=["transformations"]) + def get_transformation_run(run_id: str) -> dict[str, Any]: + return response(runtime.get_transformation_run, run_id) + + @app.post(f"{prefix}/transformations/runs/{{run_id}}/retry", tags=["transformations"]) + def retry_transformation_run( + run_id: str, + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.retry_transformation_run, run_id, context) + + @app.post(f"{prefix}/transformations/runs/{{run_id}}/cancel", tags=["transformations"]) + def cancel_transformation_run( + run_id: str, + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.cancel_transformation_run, run_id, payload, context) + + @app.post(f"{prefix}/workflows/templates", tags=["workflows"]) + def register_workflow_template( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.register_workflow_template, payload, context) + + @app.get(f"{prefix}/workflows/templates", tags=["workflows"]) + def list_workflow_templates(template_id: str | None = Query(None)) -> dict[str, Any]: + return response(runtime.list_workflow_templates, template_id=template_id) + + @app.get(f"{prefix}/workflows/templates/{{template_id}}", tags=["workflows"]) + def get_workflow_template( + template_id: str, + version: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.get_workflow_template, template_id, version=version) + + @app.post(f"{prefix}/workflows/runs", tags=["workflows"]) + def invoke_workflow_run( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.invoke_workflow_run, payload, context) + + @app.post(f"{prefix}/workflows/runs/queue", tags=["workflows"]) + def queue_workflow_run( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.queue_workflow_run, payload, context) + + @app.get(f"{prefix}/workflows/runs", tags=["workflows"]) + def list_workflow_runs( + status: str | None = Query(None), + template_id: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.list_workflow_runs, status=status, template_id=template_id) + + @app.get(f"{prefix}/workflows/runs/{{run_id}}", tags=["workflows"]) + def get_workflow_run(run_id: str) -> dict[str, Any]: + return response(runtime.get_workflow_run, run_id) + + @app.post(f"{prefix}/workflows/runs/{{run_id}}/resume", tags=["workflows"]) + def resume_workflow_run( + run_id: str, + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.resume_workflow_run, run_id, context) + + @app.post(f"{prefix}/workflows/runs/{{run_id}}/retry", tags=["workflows"]) + def retry_workflow_run( + run_id: str, + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.retry_workflow_run, run_id, context) + + @app.post(f"{prefix}/workflows/runs/{{run_id}}/cancel", tags=["workflows"]) + def cancel_workflow_run( + run_id: str, + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.cancel_workflow_run, run_id, payload, context) + + @app.get(f"{prefix}/workflows/runs/{{run_id}}/reconstruction", tags=["workflows"]) + def reconstruct_workflow_run(run_id: str) -> dict[str, Any]: + return response(runtime.reconstruct_workflow_run, run_id) + + @app.get(f"{prefix}/workflows/reviews", tags=["workflows"]) + def list_workflow_review_tasks( + status: str | None = Query(WorkflowReviewStatus.OPEN.value), + workflow_run_id: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.list_workflow_review_tasks, status=status, workflow_run_id=workflow_run_id) + + @app.get(f"{prefix}/workflows/exceptions", tags=["workflows"]) + def list_workflow_exceptions( + status: str | None = Query(WorkflowExceptionStatus.OPEN.value), + kind: str | None = Query(None), + workflow_run_id: str | None = Query(None), + ) -> dict[str, Any]: + return response( + runtime.list_workflow_exceptions, + status=status, + kind=kind, + workflow_run_id=workflow_run_id, + ) + + @app.post(f"{prefix}/workflows/runs/{{run_id}}/reviews/{{review_id}}/decision", tags=["workflows"]) + def record_workflow_review_decision( + run_id: str, + review_id: str, + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.record_workflow_review_decision, run_id, review_id, payload, context) + + @app.get(f"{prefix}/agents/operations", tags=["agents"]) + def list_agent_operations() -> dict[str, Any]: + return response(runtime.list_agent_operations) + + @app.get(f"{prefix}/agents/operations/{{operation_id}}", tags=["agents"]) + def get_agent_operation(operation_id: str) -> dict[str, Any]: + return response(runtime.get_agent_operation, operation_id) + + @app.post(f"{prefix}/agents/operations/{{operation_id}}", tags=["agents"]) + def execute_agent_operation( + operation_id: str, + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.execute_agent_operation, operation_id, payload, context) + + @app.get(f"{prefix}/context-packages/schema", tags=["context-packages"]) + def context_package_schema() -> dict[str, Any]: + return response(runtime.context_package_schema) + + @app.post(f"{prefix}/context-packages", tags=["context-packages"]) + def assemble_context_package( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.assemble_context_package, payload, context) + return app +def _agent_metadata( + *, + agent_id: str | None, + agent_name: str | None, + agent_run_id: str | None, + agent_tool: str | None, +) -> dict[str, Any]: + return { + key: value + for key, value in { + "agent_id": agent_id, + "agent_name": agent_name, + "agent_run_id": agent_run_id, + "agent_tool": agent_tool, + }.items() + if value + } + + +def _agent_operation(operation_id: str) -> dict[str, Any]: + for operation in AGENT_OPERATION_CATALOG: + if operation["operation_id"] == operation_id: + return operation + raise ValidationError( + "Unsupported agent operation", + details={ + "operation_id": operation_id, + "supported": [operation["operation_id"] for operation in AGENT_OPERATION_CATALOG], + }, + ) + + +def _agent_review_required_envelope( + operation: dict[str, Any], + decision: PolicyDecision, + event: AuditEvent, + context: OperationContext, +) -> dict[str, Any]: + return { + "operation_id": operation["operation_id"], + "dry_run": False, + "success": False, + "review_required": True, + "correlation_id": context.correlation_id, + "review": { + "reason": decision.reason, + "obligations": dict(decision.obligations), + "required_permissions": list(operation["required_permissions"]), + }, + "policy_decision": decision.to_dict(), + "audit_event": event.to_dict(), + } + + +def _agent_dry_run_required_envelope( + operation: dict[str, Any], + decision: PolicyDecision, + event: AuditEvent, + context: OperationContext, +) -> dict[str, Any]: + return { + "operation_id": operation["operation_id"], + "dry_run": False, + "success": False, + "dry_run_required": True, + "correlation_id": context.correlation_id, + "reason": decision.reason, + "policy_decision": decision.to_dict(), + "audit_event": event.to_dict(), + } + + +def _context_package_payload( + request_payload: dict[str, Any], + query_result: dict[str, Any], + context: OperationContext, + *, + constraints: dict[str, Any], +) -> dict[str, Any]: + package_format = request_payload.get("format", "kontextual") + if package_format not in {"kontextual", "markitect"}: + raise ValidationError( + "Unsupported context package format", + details={"format": package_format, "supported": ["kontextual", "markitect"]}, + ) + items = [_context_package_item(item) for item in query_result.get("results", ())] + package = { + "kind": "kontextual.context_package", + "version": "1", + "package_id": request_payload.get("package_id") or new_id("ctxpkg"), + "title": request_payload.get("title", "Kontextual Context Package"), + "intent": request_payload.get("intent", "Provide bounded, source-grounded context."), + "instructions": request_payload.get("instructions", ""), + "format": package_format, + "correlation_id": context.correlation_id, + "query": query_result.get("query", {}), + "result_count": query_result.get("result_count", len(items)), + "total": query_result.get("total", len(items)), + "source_grounded": bool(items) and all(item["source_refs"] or item["snippets"] for item in items), + "policy_constraints": constraints, + "external_memory_refs": [ + _opaque_memory_ref(item) for item in request_payload.get("external_memory_refs", ()) + ], + "items": items, + "metadata": dict(request_payload.get("metadata", {})), + } + if package_format == "markitect": + package["markitect_payload"] = _markitect_context_payload(package) + return package + + +def _context_package_item(result: dict[str, Any]) -> dict[str, Any]: + return { + "asset_id": result["asset_id"], + "title": result.get("title"), + "classification": dict(result.get("classification", {})), + "lifecycle": result.get("lifecycle"), + "source_refs": list(result.get("source_refs", ())), + "snippets": list(result.get("snippets", ())), + "metadata_records": list(result.get("metadata_records", ())), + "relationships": list(result.get("relationships", ())), + "representations": [ + { + "representation_id": item.get("representation_id"), + "kind": item.get("kind"), + "media_type": item.get("media_type"), + "source_ref_id": item.get("source_ref_id"), + "storage_ref": item.get("storage_ref"), + "producer": item.get("producer"), + "metadata": { + key: value + for key, value in dict(item.get("metadata", {})).items() + if key in {"extractor", "producer", "normalized_hash", "search_text_length"} + }, + } + for item in result.get("representations", ()) + ], + "relevance": dict(result.get("relevance", {})), + } + + +def _opaque_memory_ref(data: dict[str, Any]) -> dict[str, Any]: + ref_id = data.get("ref_id") or data.get("id") or data.get("uri") + if not ref_id: + raise ValidationError( + "External memory reference requires ref_id, id, or uri", + details={"required": ["ref_id"]}, + ) + return { + "ref_id": str(ref_id), + "system": data.get("system", "phase-memory"), + "kind": data.get("kind", "memory_ref"), + "opaque": True, + "metadata": dict(data.get("metadata", {})), + } + + +def _markitect_context_payload(package: dict[str, Any]) -> dict[str, Any]: + return { + "kind": "markitect.context_package", + "version": "1", + "id": package["package_id"], + "title": package["title"], + "intent": package["intent"], + "instructions": package["instructions"], + "policy_constraints": dict(package["policy_constraints"]), + "external_memory_refs": list(package["external_memory_refs"]), + "items": [ + { + "asset_id": item["asset_id"], + "title": item["title"], + "source_refs": list(item["source_refs"]), + "snippets": list(item["snippets"]), + "metadata_records": list(item["metadata_records"]), + "relationships": list(item["relationships"]), + } + for item in package["items"] + ], + "adapter_boundary": "markdown rendering and selector semantics are delegated to markitect-tool", + } + + +def _split_header_list(value: str | None) -> list[str] | None: + if value is None: + return None + return [item.strip() for item in value.split(",") if item.strip()] + + +def _json_header(value: str | None, header_name: str) -> dict[str, Any] | None: + if value is None or not value.strip(): + return None + try: + parsed = json.loads(value) + except json.JSONDecodeError as exc: + raise ValidationError( + "Header must contain a JSON object", + details={"header": header_name, "message": str(exc)}, + ) from exc + if not isinstance(parsed, dict): + raise ValidationError( + "Header must contain a JSON object", + details={"header": header_name, "actual_type": type(parsed).__name__}, + ) + return parsed + + +def _optional_classification(data: dict[str, Any] | None) -> Classification | None: + return Classification.from_dict(data) if data else None + + +def _enum_filter(enum_type: Any, value: Any, label: str) -> Any: + if value is None or isinstance(value, enum_type): + return value + try: + return enum_type(value) + except ValueError as exc: + raise ValidationError( + f"Unsupported {label}", + details={"value": value, "supported": [item.value for item in enum_type]}, + ) from exc + + +def _dataclass_request(request_type: Any, data: dict[str, Any]) -> Any: + try: + return request_type(**data) + except TypeError as exc: + raise ValidationError( + "Invalid request payload", + details={"request_type": request_type.__name__, "message": str(exc)}, + ) from exc + + +def _asset_query_request(data: dict[str, Any]) -> AssetQueryRequest: + payload = dict(data) + if "tags" in payload: + payload["tags"] = tuple(payload["tags"]) + return _dataclass_request(AssetQueryRequest, payload) + + +def _context_entity_query_request(data: dict[str, Any]) -> ContextEntityQueryRequest: + return _dataclass_request(ContextEntityQueryRequest, dict(data)) + + +def _relationship_query_request(data: dict[str, Any]) -> RelationshipQueryRequest: + return _dataclass_request(RelationshipQueryRequest, dict(data)) + + +def _transformation_request(data: dict[str, Any]) -> TransformationRequest: + payload = dict(data) + if "source_asset_ids" in payload: + payload["source_asset_ids"] = tuple(payload["source_asset_ids"]) + if "parameters" in payload: + payload["parameters"] = dict(payload["parameters"]) + if "metadata" in payload: + payload["metadata"] = dict(payload["metadata"]) + return _dataclass_request(TransformationRequest, payload) + + +def _workflow_template(data: dict[str, Any]) -> WorkflowTemplate: + if "created_at" in data and "updated_at" in data and "template_id" in data: + return WorkflowTemplate.from_dict(data) + kwargs: dict[str, Any] = { + "name": data["name"], + "version": data.get("version", "1"), + "description": data.get("description", ""), + "inputs": tuple(WorkflowInputDefinition.from_dict(item) for item in data.get("inputs", ())), + "steps": tuple(WorkflowStepDefinition.from_dict(item) for item in data.get("steps", ())), + "policy_checks": tuple(dict(item) for item in data.get("policy_checks", ())), + "failure_behavior": data.get("failure_behavior", "fail_workflow"), + "metadata": dict(data.get("metadata", {})), + } + for key in ("template_id", "created_by"): + if data.get(key) is not None: + kwargs[key] = data[key] + return WorkflowTemplate(**kwargs) + + +def _workflow_invocation(data: dict[str, Any]) -> WorkflowInvocation: + return WorkflowInvocation( + template_id=data["template_id"], + template_version=data.get("template_version"), + inputs=dict(data.get("inputs", {})), + metadata=dict(data.get("metadata", {})), + ) + + +def _ingestion_result_envelope(result: Any) -> dict[str, Any]: + payload = _ingestion_job_envelope(result.job) + payload["action"] = result.action + payload["asset"] = result.asset.to_dict() if result.asset else None + payload["asset_change"] = _asset_change_result(result.asset_change) if result.asset_change else None + return payload + + +def _ingestion_job_envelope(job: Any) -> dict[str, Any]: + data = job.to_dict() + retry_options = _ingestion_retry_options(job) + data["retry_options"] = retry_options + return { + "job_id": job.job_id, + "status": job.status.value, + "correlation_id": job.correlation_id, + "output_asset_ids": list(job.output_asset_ids), + "failures": [failure.to_dict() for failure in job.failures], + "retry_options": retry_options, + "job": data, + } + + +def _ingestion_retry_options(job: Any) -> dict[str, Any]: + if job.retry_options: + return dict(job.retry_options) + return { + "retryable": any(failure.retriable for failure in job.failures), + "retryable_failure_codes": [failure.code for failure in job.failures if failure.retriable], + } + + +def _transformation_result_envelope(result: Any) -> dict[str, Any]: + payload = result.to_dict() + if result.run is not None: + payload["run"] = _transformation_run_envelope(result.run) + payload["correlation_id"] = result.run.correlation_id + payload["retry_options"] = payload["run"]["retry_options"] + return payload + + +def _transformation_run_envelope(run: Any) -> dict[str, Any]: + data = run.to_dict() + retryable = run.status in (TransformationRunStatus.FAILED, TransformationRunStatus.CANCELED) + cancelable = run.status in (TransformationRunStatus.QUEUED, TransformationRunStatus.RUNNING) + data["retry_options"] = { + "retryable": retryable, + "retry_endpoint": f"/api/v1/transformations/runs/{run.run_id}/retry" if retryable else None, + "cancelable": cancelable, + "cancel_endpoint": f"/api/v1/transformations/runs/{run.run_id}/cancel" if cancelable else None, + } + return data + + +def _workflow_result_envelope(result: Any) -> dict[str, Any]: + payload = result.to_dict() + payload["run"] = _workflow_run_envelope(result.run) + payload["correlation_id"] = result.run.correlation_id + payload["retry_options"] = payload["run"]["retry_options"] + return payload + + +def _workflow_run_envelope(run: Any) -> dict[str, Any]: + data = run.to_dict() + retryable = run.status in ( + WorkflowRunStatus.FAILED, + WorkflowRunStatus.CANCELED, + WorkflowRunStatus.PARTIALLY_COMPLETED, + ) + cancelable = run.status in (WorkflowRunStatus.QUEUED, WorkflowRunStatus.RUNNING, WorkflowRunStatus.WAITING) + data["retry_options"] = { + "retryable": retryable, + "retry_endpoint": f"/api/v1/workflows/runs/{run.run_id}/retry" if retryable else None, + "cancelable": cancelable, + "cancel_endpoint": f"/api/v1/workflows/runs/{run.run_id}/cancel" if cancelable else None, + } + return data + + def _metadata_record(data: dict[str, Any]) -> MetadataRecord: if "record_id" in data and "created_at" in data: return MetadataRecord.from_dict(data) @@ -451,3 +1884,37 @@ def _error_payload(error: KontextualError) -> dict[str, Any]: "message": str(error), "details": dict(error.details), } + + +def _authorization_error_payload(error: AuthorizationError) -> dict[str, Any]: + payload = _error_payload(error) + details = dict(payload.get("details", {})) + decision = details.get("policy_decision") + if isinstance(decision, dict): + details["policy_decision"] = _public_policy_decision(decision) + payload["details"] = details + return payload + + +def _public_policy_decision(decision: dict[str, Any]) -> dict[str, Any]: + allowed_fields = { + "decision_id", + "effect", + "subject_id", + "action", + "resource", + "reason", + "obligations", + "decided_at", + } + public = {key: value for key, value in decision.items() if key in allowed_fields} + context = decision.get("context") + if isinstance(context, dict): + public_context = { + key: value + for key, value in context.items() + if key not in {"resource_metadata", "protected_metadata", "source_payload"} + } + if public_context: + public["context"] = public_context + return public diff --git a/tests/test_service_api.py b/tests/test_service_api.py index 45eb2ef..7c15fd5 100644 --- a/tests/test_service_api.py +++ b/tests/test_service_api.py @@ -1,7 +1,16 @@ import pytest -from kontextual_engine import AuthorizationError, OperationContext, PolicyDecision, ServiceRuntime, create_app +from kontextual_engine import ( + AuthorizationError, + OperationContext, + PolicyDecision, + PolicyEffect, + ServiceRuntime, + ValidationError, + create_app, +) from kontextual_engine.adapters.memory import InMemoryAssetRegistryRepository +from kontextual_engine.api.app import _authorization_error_payload def test_service_runtime_health_readiness_and_version_are_importable_without_fastapi() -> None: @@ -115,6 +124,399 @@ def test_service_runtime_policy_denial_blocks_protected_asset_operation() -> Non assert runtime.list_audit_events(target="asset:asset-denied")["items"][0]["outcome"] == "denied" +def test_service_runtime_exposes_ingestion_retrieval_transformation_and_workflow_operations( + tmp_path, +) -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + context = runtime.operation_context(actor_id="user-api", correlation_id="corr-jobs") + source = tmp_path / "kontextual-source.txt" + source.write_text( + "Kontextual engine captures markdown proxy context, system boundaries, and source-grounded retrieval.", + encoding="utf-8", + ) + + ingestion = runtime.start_ingestion_job( + { + "mode": "file", + "path": str(source), + "asset_id": "asset-ingested", + "classification": { + "asset_type": "document", + "sensitivity": "internal", + "owner": "Platform Knowledge", + "topics": ["retrieval", "workflow"], + }, + }, + context, + ) + retrieval = runtime.query_assets( + { + "text": "proxy context", + "include_snippets": True, + "representation_kind": "normalized", + "limit": 5, + }, + context, + ) + transformation = runtime.execute_transformation( + { + "operation_id": "structured_view", + "source_asset_ids": ["asset-ingested"], + "output_asset_id": "asset-derived", + "output_title": "Structured API View", + "metadata": {"request": "service-api-test"}, + }, + context, + ) + template = runtime.register_workflow_template( + { + "template_id": "workflow-structured", + "name": "Structured Workflow", + "inputs": [{"name": "source_asset_ids", "kind": "asset"}], + "steps": [ + { + "step_id": "build", + "kind": "transformation", + "operation_id": "structured_view", + "inputs": {"source_asset_ids": "$inputs.source_asset_ids"}, + "outputs": { + "asset_id": "asset-workflow-structured", + "title": "Workflow Structured View", + "asset_type": "derived_artifact", + "media_type": "application/json", + }, + } + ], + }, + context, + ) + workflow = runtime.invoke_workflow_run( + { + "template_id": template["template_id"], + "inputs": {"source_asset_ids": ["asset-ingested"]}, + }, + context, + ) + reconstruction = runtime.reconstruct_workflow_run(workflow["run"]["run_id"]) + + assert ingestion["status"] == "completed" + assert ingestion["correlation_id"] == "corr-jobs" + assert ingestion["output_asset_ids"] == ["asset-ingested"] + assert ingestion["retry_options"]["retryable"] is False + assert ingestion["job"]["partial_results"]["action"] == "created" + assert runtime.ingestion_capabilities()["connectors"][0]["supports_directories"] is True + + assert retrieval["success"] is True + assert retrieval["correlation_id"] == "corr-jobs" + assert retrieval["results"][0]["asset_id"] == "asset-ingested" + assert retrieval["results"][0]["source_refs"] + assert retrieval["results"][0]["snippets"][0]["source_ref_id"] + assert retrieval["metadata"]["policy_enforced"] is True + + assert transformation["success"] is True + assert transformation["run"]["status"] == "completed" + assert transformation["run"]["output_asset_ids"] == ["asset-derived"] + assert transformation["lineage"]["source_asset_ids"] == ["asset-ingested"] + assert transformation["audit_event"]["operation"] == "transformation.run.completed" + assert transformation["retry_options"]["retryable"] is False + assert runtime.get_transformation_run(transformation["run"]["run_id"])["status"] == "completed" + assert runtime.list_transformation_operations()["count"] >= 1 + assert runtime.list_transformation_runs(status="completed")["count"] >= 2 + + assert workflow["success"] is True + assert workflow["run"]["status"] == "completed" + assert workflow["run"]["output_asset_ids"] == ["asset-workflow-structured"] + assert workflow["retry_options"]["retryable"] is False + assert runtime.get_workflow_run(workflow["run"]["run_id"])["status"] == "completed" + assert runtime.list_workflow_runs(status="completed")["count"] == 1 + assert runtime.list_workflow_review_tasks(workflow_run_id=workflow["run"]["run_id"])["count"] == 0 + assert runtime.list_workflow_exceptions(workflow_run_id=workflow["run"]["run_id"])["count"] == 0 + assert reconstruction["run"]["run_id"] == workflow["run"]["run_id"] + assert reconstruction["template"]["template_id"] == "workflow-structured" + assert reconstruction["transformation_runs"][0]["status"] == "completed" + assert reconstruction["derived_lineage"][0]["output_asset_id"] == "asset-workflow-structured" + + +def test_service_runtime_exposes_failed_ingestion_job_state_and_recovery_envelope(tmp_path) -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + context = runtime.operation_context(actor_id="user-api", correlation_id="corr-failed-job") + + failed = runtime.start_ingestion_job( + {"mode": "file", "path": str(tmp_path / "missing.txt")}, + context, + ) + listed = runtime.list_ingestion_jobs(status="failed") + + assert failed["status"] == "failed" + assert failed["correlation_id"] == "corr-failed-job" + assert failed["failures"][0]["code"] == "kontextual.not_found" + assert failed["retry_options"]["retryable"] is False + assert runtime.get_ingestion_job(failed["job_id"])["job_id"] == failed["job_id"] + assert listed["count"] == 1 + assert listed["items"][0]["job_id"] == failed["job_id"] + + +def test_service_runtime_operation_context_represents_delegation_and_agent_identity() -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + + context = runtime.operation_context( + actor_id="agent-codex", + actor_type="ai_agent", + display_name="Codex", + external_ref="agent://codex", + groups=["automation", "engineering"], + delegated_actor_id="user-owner", + delegated_actor_type="human", + delegated_actor_display_name="Owner", + delegated_actor_groups=["knowledge-owners"], + request_scope={"tenant": "tenant-a", "surface": "service-api"}, + policy_scope={"allowed_sensitivity": "internal"}, + agent_id="agent-codex", + agent_name="Codex", + agent_run_id="run-123", + agent_tool="service-api", + correlation_id="corr-delegated", + ) + payload = context.to_dict() + + assert payload["actor"]["id"] == "agent-codex" + assert payload["actor"]["actor_type"] == "ai_agent" + assert payload["actor"]["groups"] == ["automation", "engineering"] + assert payload["delegated_actor"]["id"] == "user-owner" + assert payload["delegated_actor"]["actor_type"] == "human" + assert payload["request_scope"]["tenant"] == "tenant-a" + assert payload["policy_scope"]["allowed_sensitivity"] == "internal" + assert payload["metadata"]["agent"]["agent_run_id"] == "run-123" + assert payload["metadata"]["delegation"]["delegated_actor_id"] == "user-owner" + + +def test_service_api_authorization_error_payload_redacts_resource_metadata() -> None: + runtime = ServiceRuntime( + repository=InMemoryAssetRegistryRepository(), + policy_gateway=DenyCreatePolicy(), + ) + + with pytest.raises(AuthorizationError) as exc_info: + runtime.create_asset( + { + "asset_id": "asset-secret", + "title": "Protected Title", + "classification": { + "asset_type": "document", + "sensitivity": "restricted", + "metadata": {"secret": "do-not-echo"}, + }, + }, + runtime.operation_context(actor_id="user-denied"), + ) + + payload = _authorization_error_payload(exc_info.value) + + assert payload["details"]["policy_decision"]["effect"] == "deny" + assert "resource_metadata" not in payload["details"]["policy_decision"].get("context", {}) + assert "Protected Title" not in str(payload) + assert "do-not-echo" not in str(payload) + + +def test_service_runtime_exposes_bounded_agent_operation_catalog_and_dispatch() -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + context = runtime.operation_context( + actor_id="agent-codex", + actor_type="ai_agent", + agent_id="agent-codex", + agent_run_id="run-agent-1", + correlation_id="corr-agent", + ) + runtime.create_asset( + { + "asset_id": "asset-agent", + "title": "Agent Visible Asset", + "classification": {"asset_type": "document", "sensitivity": "internal"}, + "source_refs": [{"source_system": "test", "path": "agent.md"}], + }, + context, + ) + + catalog = runtime.list_agent_operations() + dry_run = runtime.execute_agent_operation( + "enrich_metadata", + { + "dry_run": True, + "payload": {"asset_id": "asset-agent", "metadata": {"key": "agent_note", "value": "planned"}}, + }, + context, + ) + search = runtime.execute_agent_operation( + "search_assets", + {"query": {"asset_type": "document", "limit": 5}}, + context, + ) + bundle = runtime.execute_agent_operation( + "retrieve_asset", + {"asset_id": "asset-agent"}, + context, + ) + report = runtime.execute_agent_operation( + "report_result", + {"summary": "Agent operation completed", "result_ref": {"asset_id": "asset-agent"}}, + context, + ) + + assert catalog["count"] >= 10 + assert runtime.get_agent_operation("search_assets")["audit_operation"] == "agent.operation.search_assets" + assert all(item["required_permissions"] for item in catalog["items"]) + assert all(item["failure_modes"] for item in catalog["items"]) + assert dry_run["dry_run"] is True + assert runtime.list_metadata_records("asset-agent")["count"] == 0 + assert search["success"] is True + assert search["result"]["results"][0]["asset_id"] == "asset-agent" + assert search["audit_event"]["operation"] == "agent.operation.search_assets" + assert bundle["result"]["asset"]["id"] == "asset-agent" + assert bundle["result"]["source_grounded"] is True + assert report["result"]["audit_event"]["operation"] == "agent.report.recorded" + assert runtime.list_audit_events(target="agent_operation:search_assets")["count"] == 2 + + with pytest.raises(ValidationError): + runtime.execute_agent_operation("shell", {"command": "echo no"}, context) + + +def test_service_runtime_assembles_source_grounded_context_package_with_opaque_memory_refs( + tmp_path, +) -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + context = runtime.operation_context(actor_id="agent-codex", actor_type="ai_agent", correlation_id="corr-context") + source = tmp_path / "context-source.txt" + source.write_text( + "Markdown-backed context packages should remain source grounded.", + encoding="utf-8", + ) + runtime.start_ingestion_job( + { + "mode": "file", + "path": str(source), + "asset_id": "asset-context", + "classification": {"asset_type": "markdown", "sensitivity": "internal"}, + }, + context, + ) + + package = runtime.assemble_context_package( + { + "package_id": "ctxpkg-test", + "title": "Context Package", + "intent": "Support implementation planning.", + "format": "markitect", + "query": { + "text": "source grounded", + "representation_kind": "normalized", + "include_snippets": True, + }, + "constraints": {"max_sensitivity": "internal", "no_external_publish": True}, + "external_memory_refs": [ + { + "ref_id": "phase-memory:episode-1", + "system": "phase-memory", + "content": "memory graph detail must not be embedded", + } + ], + }, + context, + ) + + assert runtime.context_package_schema()["formats"] == ["kontextual", "markitect"] + assert package["package_id"] == "ctxpkg-test" + assert package["source_grounded"] is True + assert package["result_count"] == 1 + assert package["items"][0]["asset_id"] == "asset-context" + assert package["items"][0]["source_refs"] + assert package["items"][0]["snippets"] + assert package["external_memory_refs"] == [ + { + "ref_id": "phase-memory:episode-1", + "system": "phase-memory", + "kind": "memory_ref", + "opaque": True, + "metadata": {}, + } + ] + assert "memory graph detail" not in str(package["external_memory_refs"]) + assert package["markitect_payload"]["kind"] == "markitect.context_package" + assert package["markitect_payload"]["adapter_boundary"] == ( + "markdown rendering and selector semantics are delegated to markitect-tool" + ) + assert package["audit_event"]["operation"] == "context_package.assemble" + assert package["policy_decision"]["effect"] == "allow" + + +def test_service_runtime_agent_operations_support_review_required_and_dry_run_only_policy() -> None: + review_runtime = ServiceRuntime( + repository=InMemoryAssetRegistryRepository(), + policy_gateway=ReviewRequiredAgentPolicy(), + ) + dry_run_runtime = ServiceRuntime( + repository=InMemoryAssetRegistryRepository(), + policy_gateway=DryRunOnlyAgentPolicy(), + ) + review_context = review_runtime.operation_context( + actor_id="agent-codex", + actor_type="ai_agent", + correlation_id="corr-review", + ) + dry_run_context = dry_run_runtime.operation_context( + actor_id="agent-codex", + actor_type="ai_agent", + correlation_id="corr-dry-run", + ) + + review = review_runtime.execute_agent_operation( + "enrich_metadata", + {"payload": {"asset_id": "asset-review", "metadata": {"key": "risk", "value": "high"}}}, + review_context, + ) + dry_run_required = dry_run_runtime.execute_agent_operation( + "transform_asset", + {"payload": {"transformation": {"operation_id": "structured_view"}}}, + dry_run_context, + ) + dry_run = dry_run_runtime.execute_agent_operation( + "transform_asset", + {"dry_run": True, "payload": {"transformation": {"operation_id": "structured_view"}}}, + dry_run_context, + ) + + assert review["success"] is False + assert review["review_required"] is True + assert review["audit_event"]["outcome"] == "review_required" + assert review["policy_decision"]["effect"] == "require_review" + assert dry_run_required["success"] is False + assert dry_run_required["dry_run_required"] is True + assert dry_run_required["audit_event"]["outcome"] == "dry_run" + assert dry_run_required["policy_decision"]["effect"] == "dry_run_only" + assert dry_run["success"] is True + assert dry_run["dry_run"] is True + assert dry_run["audit_event"]["outcome"] == "dry_run" + + +def test_service_runtime_ingestion_directory_reports_partial_failures(tmp_path) -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + context = runtime.operation_context(actor_id="user-api", correlation_id="corr-partial") + (tmp_path / "good.txt").write_text("source grounded text", encoding="utf-8") + (tmp_path / "unsupported.bin").write_bytes(b"\x00\x01\x02") + + result = runtime.start_ingestion_job( + {"mode": "directory", "path": str(tmp_path), "recursive": False}, + context, + ) + + assert result["status"] == "partially_completed" + assert result["output_asset_ids"] + assert result["failures"][0]["code"] == "kontextual.adapter_unavailable" + assert result["job"]["partial_results"]["files_total"] == 2 + assert result["job"]["partial_results"]["failed"] == 1 + assert result["job"]["partial_results"]["succeeded"] == 1 + assert result["retry_options"]["retryable"] is True + + def test_create_app_reports_missing_optional_dependency_when_fastapi_is_absent() -> None: try: import fastapi # noqa: F401 @@ -160,10 +562,20 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None: assert "/api/v1/health" in paths assert "/api/v1/ready" in paths assert "/api/v1/version" in paths + assert "/api/v1/context" in paths assert "/api/v1/assets" in paths assert "/api/v1/relationships" in paths assert "/api/v1/audit/events" in paths assert "/api/v1/policy/evaluate" in paths + assert "/api/v1/ingestion/jobs" in paths + assert "/api/v1/retrieval/assets" in paths + assert "/api/v1/transformations/runs" in paths + assert "/api/v1/workflows/templates" in paths + assert "/api/v1/workflows/runs" in paths + assert "/api/v1/workflows/reviews" in paths + assert "/api/v1/agents/operations" in paths + assert "/api/v1/context-packages" in paths + assert "/api/v1/context-packages/schema" in paths def test_create_app_attaches_runtime_to_application_state(client) -> None: @@ -188,3 +600,46 @@ class DenyCreatePolicy: context={"resource_metadata": resource_metadata or {}}, ) return PolicyDecision.allow(context.actor.id, action, resource) + + +class ReviewRequiredAgentPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, str] | None = None, + ) -> PolicyDecision: + if action == "agent.operation.enrich_metadata": + return PolicyDecision( + PolicyEffect.REQUIRE_REVIEW, + context.actor.id, + action, + resource, + reason="metadata enrichment requires review", + obligations={"queue": "knowledge-review"}, + context={"resource_metadata": resource_metadata or {}}, + ) + return PolicyDecision.allow(context.actor.id, action, resource) + + +class DryRunOnlyAgentPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, str] | None = None, + ) -> PolicyDecision: + if action == "agent.operation.transform_asset": + return PolicyDecision( + PolicyEffect.DRY_RUN_ONLY, + context.actor.id, + action, + resource, + reason="transformation requires dry-run preview", + context={"resource_metadata": resource_metadata or {}}, + ) + return PolicyDecision.allow(context.actor.id, action, resource) diff --git a/workplans/KONT-WP-0009-service-api-agent-safe-operation.md b/workplans/KONT-WP-0009-service-api-agent-safe-operation.md index 89bd158..ffa79b2 100644 --- a/workplans/KONT-WP-0009-service-api-agent-safe-operation.md +++ b/workplans/KONT-WP-0009-service-api-agent-safe-operation.md @@ -4,7 +4,7 @@ type: workplan title: "Service API And Agent-Safe Operation" domain: markitect repo: kontextual-engine -status: active +status: completed owner: codex topic_slug: markitect planning_priority: high @@ -48,14 +48,13 @@ review gates through Markitect APIs. ## Implementation Status -The first optional FastAPI service skeleton is implemented for health, -readiness, version, OpenAPI contracts, and the initial asset/metadata/ -relationship/audit/policy resource surface. See +The optional FastAPI service skeleton is implemented for health, readiness, +version, OpenAPI contracts, asset/metadata/relationship/audit/policy resources, +ingestion jobs, governed retrieval, transformations, and workflow resources. +See `docs/service-api-implementation.md`. -Ingestion, retrieval, transformation, workflow, actor context, agent -operations, context packages, and dry-run/review-gate response contracts remain -open in this workplan. +All MVP tasks in this workplan are implemented. ## S9.1 - Implement versioned FastAPI service skeleton and health contracts @@ -116,7 +115,7 @@ Implemented: ```task id: KONT-WP-0009-T003 -status: todo +status: done priority: high state_hub_task_id: "7271b26d-0dbb-4eca-9140-a7729ad296e4" ``` @@ -131,11 +130,25 @@ Acceptance: - Retrieval results are permission-aware and source-grounded. - Transformations and workflows expose lineage and audit references. +Implemented: + +- `ServiceRuntime` exposes ingestion capabilities and ingestion job start/list/get + over `AssetIngestionService`. +- Governed retrieval endpoints wrap `AssetRetrievalService` for asset, context + entity, relationship, feedback, index refresh, and quality metric contracts. +- Transformation operations and runs are exposed with retry/cancel hints, + lineage, output assets, audit references, and policy decisions. +- Workflow templates, queued/invoked runs, run recovery, review decisions, + review queues, exception queues, and reconstruction are exposed over + `WorkflowService`. +- Runtime tests cover completed and failed ingestion jobs, source-grounded + retrieval, transformation lineage/audit, and workflow reconstruction. + ## S9.4 - Implement actor context delegation and authorization middleware ```task id: KONT-WP-0009-T004 -status: todo +status: done priority: high state_hub_task_id: "7becdec7-ddbb-497f-b762-77043e16046e" ``` @@ -150,11 +163,23 @@ Acceptance: - Authorization failures do not leak protected content in errors or result shapes. +Implemented: + +- `ServiceRuntime.operation_context()` now supports actor external refs, + groups, delegated actors, request scope, policy scope, and AI-agent metadata. +- FastAPI request context parsing accepts actor, delegated actor, agent, group, + request-scope, and policy-scope headers. +- `GET /api/v1/context` exposes the resolved operation context for integration + checks and client diagnostics. +- FastAPI exception handlers preserve structured errors from dependencies. +- HTTP authorization error payloads redact protected resource metadata from + policy-decision context. + ## S9.5 - Implement bounded agent operation catalog ```task id: KONT-WP-0009-T005 -status: todo +status: done priority: high state_hub_task_id: "fc9e1def-229c-4224-8fd3-6fd4f9785c27" ``` @@ -171,11 +196,24 @@ Acceptance: - Agent operations are auditable separately from human and deterministic automation actions. +Implemented: + +- Added a bounded agent operation catalog for inspect, retrieve, search, + assemble-context preview, metadata enrichment, classification request, + transformation, workflow invocation, review submission, and result reporting. +- Added `GET /api/v1/agents/operations`, `GET /api/v1/agents/operations/{id}`, + and `POST /api/v1/agents/operations/{id}`. +- Agent operation execution authorizes `agent.operation.*`, emits separate + agent-operation audit events, supports generic dry-run envelopes, and + dispatches through existing runtime methods. +- Unsupported operation IDs fail with structured validation errors rather than + opening arbitrary command or internal-service access. + ## S9.6 - Implement context package API with policy constraints ```task id: KONT-WP-0009-T006 -status: todo +status: done priority: medium state_hub_task_id: "9ff1d345-d0a1-46eb-ae9a-f6beba2fa5e9" ``` @@ -192,11 +230,24 @@ Acceptance: - Markdown-backed packages can interoperate with Markitect context-package payloads while remaining wrapped in engine permission and audit contracts. +Implemented: + +- Added `GET /api/v1/context-packages/schema` and + `POST /api/v1/context-packages`. +- Context packages are assembled from permission-filtered retrieval results and + include source refs, snippets, metadata, relationships, representation + provenance, policy constraints, audit references, and policy decisions. +- External memory refs are represented as opaque pointers and do not embed + memory graph content. +- The `markitect` format emits a Markitect-compatible envelope while keeping + markdown rendering, selector semantics, and validation delegated to + `markitect-tool`. + ## S9.7 - Implement dry-run review-gate and contract-test coverage ```task id: KONT-WP-0009-T007 -status: todo +status: done priority: medium state_hub_task_id: "bbbdec75-d3c0-4367-b073-ef9c5dffa2b7" ``` @@ -211,6 +262,18 @@ Acceptance: responses, and partial failures. - OpenAPI output remains stable for implemented endpoints. +Implemented: + +- Agent operation policy decisions with `require_review` now return structured + `review_required` envelopes and audit `review_required` outcomes. +- Agent operation policy decisions with `dry_run_only` now return + `dry_run_required` envelopes unless the request is already a dry run. +- Generic agent dry-run requests audit with `dry_run` outcomes and avoid domain + mutation. +- Contract tests cover redacted authorization failures, review-required + responses, dry-run-required responses, partial ingestion failures, and OpenAPI + stability for implemented endpoint groups. + ## Definition Of Done - The service API exposes the MVP operation surface without requiring UI.