From a44b439cc7062529d773841da9c1581101747939 Mon Sep 17 00:00:00 2001 From: tegwick Date: Wed, 6 May 2026 21:48:40 +0200 Subject: [PATCH] Operator metrics, job inspection, and event views, Recovery, Governance reports, Extension catalog and semantic extension events --- docs/mvp-compliance-report.md | 35 + ...servability-export-enterprise-readiness.md | 57 ++ docs/service-api-boundary.md | 18 + docs/service-api-implementation.md | 25 + src/kontextual_engine/api/app.py | 740 ++++++++++++++++++ tests/test_service_api.py | 98 +++ ...servability-export-enterprise-readiness.md | 84 +- 7 files changed, 1048 insertions(+), 9 deletions(-) create mode 100644 docs/mvp-compliance-report.md create mode 100644 docs/observability-export-enterprise-readiness.md diff --git a/docs/mvp-compliance-report.md b/docs/mvp-compliance-report.md new file mode 100644 index 0000000..ef69796 --- /dev/null +++ b/docs/mvp-compliance-report.md @@ -0,0 +1,35 @@ +# MVP Compliance Report + +Date: 2026-05-06 + +Status: MVP compliance snapshot for `KONT-WP-0010`. + +## Implemented MVP Coverage + +| Area | Status | Evidence | +| --- | --- | --- | +| Asset registry and governance | implemented | Asset lifecycle, metadata, relationships, versions, audit, and policy checks. | +| Multi-format ingestion | implemented | Ingestion jobs, local file/directory connectors, built-in extractors, Markitect markdown adapter boundary, quarantine and partial failures. | +| Governed retrieval | implemented | Permission-aware asset, context entity, and relationship retrieval with snippets, feedback, and quality metrics. | +| Transformations | implemented | Operation registry, durable runs, output assets, lineage, audit, retry and cancel. | +| Workflow jobs | implemented | Templates, invocations, retries, cancel, review tasks, exceptions, and reconstruction. | +| Service API and agent-safe operation | implemented | Versioned FastAPI adapter, actor/delegation context, bounded agent operation catalog, context packages, review and dry-run gates. | +| Observability and recovery | implemented | Metrics, job inspection, operational events, recovery action catalog and audited recovery dispatch. | +| Export and portability | implemented | Governed export packages with manifests, hashes, policy context, audit references, and validation. | +| Enterprise readiness hooks | implemented | Governance reports, extension catalog/events, quality/cost signals, and smoke/compliance reports. | + +## Explicitly Deferred + +- External webhook delivery adapters beyond audited semantic event emission. +- Provider-backed AI execution and cost capture beyond adapter-supplied usage + metadata. +- Deployed API request latency middleware; current runtime reports an empty API + latency observation set. +- Enterprise IAM/PDP adapters, object stores, queues, semantic search, and + external model backends. + +## Verification + +The MVP compliance report is exposed at `GET /api/v1/compliance/mvp`, and the +performance smoke summary is exposed at `GET /api/v1/performance/smoke`. +Regression coverage lives in `tests/test_service_api.py`. diff --git a/docs/observability-export-enterprise-readiness.md b/docs/observability-export-enterprise-readiness.md new file mode 100644 index 0000000..0165697 --- /dev/null +++ b/docs/observability-export-enterprise-readiness.md @@ -0,0 +1,57 @@ +# Observability Export And Enterprise Readiness + +Date: 2026-05-06 + +Status: implemented MVP note for `KONT-WP-0010`. + +## Purpose + +This note records the operator-facing surfaces that make the engine +inspectable, recoverable, exportable, and measurable without direct storage +access. The implementation is intentionally an adapter layer over existing +runtime services, repository contracts, policy decisions, and audit events. + +## Implemented Surfaces + +- `GET /api/v1/operations/metrics` +- `GET /api/v1/operations/jobs` +- `GET /api/v1/operations/events` +- `GET /api/v1/operations/recovery/actions` +- `POST /api/v1/operations/recovery/{action}` +- `POST /api/v1/exports` +- `POST /api/v1/exports/validate` +- `POST /api/v1/governance/report` +- `GET /api/v1/extensions/catalog` +- `POST /api/v1/extensions/events` +- `POST /api/v1/quality/signals` +- `GET /api/v1/quality/cost` +- `GET /api/v1/performance/smoke` +- `GET /api/v1/compliance/mvp` + +## Boundary Decisions + +Operational metrics are computed from durable repository state and audit +events. API request latency is reported as an empty observation set until the +deployed FastAPI service adds middleware timing. + +Recovery actions are explicit and policy checked. They dispatch through the +same runtime methods as normal service use: ingestion retry, transformation +retry/cancel, workflow retry/cancel, retrieval index refresh, and failure +inspection. + +Export packages are governed envelopes, not raw database dumps. They include +assets, metadata, representations, relationships, versions, derived lineage, +audit references, adapter sections, manifest counts, a content hash, actor, and +policy context. + +Governance reports avoid embedding source content. Findings identify missing +ownership, metadata, source references, audit gaps, and sensitive assets without +review or retention metadata. + +Extension readiness is expressed through semantic event types, connector and +extractor capabilities, transformation operation metadata, backend abstraction +names, and explicit Markitect adapter boundaries. + +Quality and cost signals are audit-backed observations. Retrieval quality uses +existing retrieval feedback metrics; AI cost and usage depend on adapters +providing token, provider, error, and estimated-cost fields. diff --git a/docs/service-api-boundary.md b/docs/service-api-boundary.md index 9c6adb9..a970c22 100644 --- a/docs/service-api-boundary.md +++ b/docs/service-api-boundary.md @@ -113,6 +113,24 @@ Implemented in `KONT-WP-0009-T007`: `dry_run` audit outcomes. - Partial-failure job envelopes are covered by contract tests. +Implemented in `KONT-WP-0010`: + +- `GET /api/v1/operations/metrics` +- `GET /api/v1/operations/jobs` +- `GET /api/v1/operations/events` +- `GET /api/v1/operations/recovery/actions` +- `POST /api/v1/operations/recovery/{action}` +- `POST /api/v1/exports` +- `POST /api/v1/exports/validate` +- `POST /api/v1/governance/report` +- `GET /api/v1/extensions/catalog` +- `POST /api/v1/extensions/events` +- `POST /api/v1/quality/signals` +- `GET /api/v1/quality/cost` +- `GET /api/v1/performance/smoke` +- `GET /api/v1/compliance/mvp` +- Operator/readiness endpoints remain policy checked and audit backed. + The unversioned health/readiness/version endpoints are operational probes. The versioned `/api/v1/*` endpoints establish the MVP API namespace. Future domain-resource endpoints should live under `/api/v1`. diff --git a/docs/service-api-implementation.md b/docs/service-api-implementation.md index 594bd4e..5d44b51 100644 --- a/docs/service-api-implementation.md +++ b/docs/service-api-implementation.md @@ -78,6 +78,20 @@ src/kontextual_engine/ - `POST /api/v1/agents/operations/{operation_id}` - `GET /api/v1/context-packages/schema` - `POST /api/v1/context-packages` +- `GET /api/v1/operations/metrics` +- `GET /api/v1/operations/jobs` +- `GET /api/v1/operations/events` +- `GET /api/v1/operations/recovery/actions` +- `POST /api/v1/operations/recovery/{action}` +- `POST /api/v1/exports` +- `POST /api/v1/exports/validate` +- `POST /api/v1/governance/report` +- `GET /api/v1/extensions/catalog` +- `POST /api/v1/extensions/events` +- `POST /api/v1/quality/signals` +- `GET /api/v1/quality/cost` +- `GET /api/v1/performance/smoke` +- `GET /api/v1/compliance/mvp` - `GET /openapi.json` Unversioned endpoints are operational probes. Versioned endpoints establish @@ -106,6 +120,8 @@ the `/api/v1` namespace for future domain resources. - workflow template/run/review/exception/reconstruction translation, - bounded agent operation catalog and dispatch translation, - governed context-package assembly translation. +- observability, recovery, export, governance, extension, quality/cost, smoke, + and compliance report translation. Readiness currently checks that the configured asset registry repository can list assets. It does not mutate state. @@ -154,6 +170,13 @@ outcomes. `dry_run_only` decisions return `dry_run_required` envelopes unless the request is already a dry run. Partial-failure contracts are covered through directory ingestion with mixed supported and unsupported inputs. +`KONT-WP-0010` added the MVP operator/readiness surface. Metrics, jobs, events, +recovery actions, export packages, governance reports, extension events, +quality/cost signals, performance smoke summaries, and compliance reports are +available through versioned service endpoints. See +`docs/observability-export-enterprise-readiness.md` and +`docs/mvp-compliance-report.md`. + ## Dependency Boundary The `service` extra now includes FastAPI, Uvicorn, and HTTPX for test-client @@ -190,6 +213,8 @@ missing-dependency behavior are tested without FastAPI. references, and Markitect-compatible payload shape, - runtime review-required and dry-run-only agent operation envelopes, - runtime partial-failure ingestion job envelopes, +- runtime operator metrics, recovery, export, governance, extension events, + quality/cost signals, smoke reports, and MVP compliance reports, - `create_app()` missing-dependency behavior when the optional extra is absent, - health/readiness/version/OpenAPI endpoint contracts when FastAPI and HTTPX are installed, diff --git a/src/kontextual_engine/api/app.py b/src/kontextual_engine/api/app.py index afa16e5..781ef75 100644 --- a/src/kontextual_engine/api/app.py +++ b/src/kontextual_engine/api/app.py @@ -8,6 +8,7 @@ from __future__ import annotations import json from dataclasses import dataclass, field +from datetime import datetime from importlib import metadata from typing import Any @@ -39,7 +40,9 @@ from kontextual_engine.core import ( WorkflowRunStatus, WorkflowStepDefinition, WorkflowTemplate, + content_digest, new_id, + stable_json_dumps, utc_now, ) from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError @@ -893,6 +896,549 @@ class ServiceRuntime: ) return self.repository.save_audit_event(event) + def operational_metrics(self) -> dict[str, Any]: + assets = self.repository.list_assets() + ingestion_jobs = self.repository.list_ingestion_jobs() + transformation_runs = self.repository.list_transformation_runs() + workflow_runs = self.repository.list_workflow_runs() + audit_events = self.repository.list_audit_events() + retrieval_events = [event for event in audit_events if event.operation.startswith("retrieval.")] + query_latencies = [ + float(event.details["permission_filter_duration_ms"]) + for event in retrieval_events + if "permission_filter_duration_ms" in event.details + ] + failed_jobs = [job for job in ingestion_jobs if job.status == IngestionJobStatus.FAILED] + failed_transformations = [run for run in transformation_runs if run.status == TransformationRunStatus.FAILED] + failed_workflows = [run for run in workflow_runs if run.status == WorkflowRunStatus.FAILED] + queue_ages = [ + _age_seconds(job.created_at, job.completed_at or utc_now().isoformat()) + for job in ingestion_jobs + if job.status in (IngestionJobStatus.QUEUED, IngestionJobStatus.RUNNING) + ] + return { + "generated_at": utc_now().isoformat(), + "repository": type(self.repository).__name__, + "assets": { + "count": len(assets), + "representations": len(self.repository.list_representations()), + "relationships": len(self.repository.list_relationships()), + "context_entities": len(self.repository.list_context_entities()), + }, + "ingestion": { + "job_count": len(ingestion_jobs), + "completed": _count_by_value(job.status.value for job in ingestion_jobs).get("completed", 0), + "failed": len(failed_jobs), + "partial": _count_by_value(job.status.value for job in ingestion_jobs).get("partially_completed", 0), + "throughput_assets": sum(len(job.output_asset_ids) for job in ingestion_jobs), + "failure_rate": _ratio(len(failed_jobs), len(ingestion_jobs)), + }, + "retrieval": { + "query_events": len(retrieval_events), + "average_permission_filter_duration_ms": _average(query_latencies), + "quality": self.retrieval_quality_metrics(), + }, + "transformations": { + "run_count": len(transformation_runs), + "completed": _count_by_value(run.status.value for run in transformation_runs).get("completed", 0), + "failed": len(failed_transformations), + "failure_rate": _ratio(len(failed_transformations), len(transformation_runs)), + }, + "workflows": { + "run_count": len(workflow_runs), + "completed": _count_by_value(run.status.value for run in workflow_runs).get("completed", 0), + "failed": len(failed_workflows), + "waiting": _count_by_value(run.status.value for run in workflow_runs).get("waiting", 0), + "failure_rate": _ratio(len(failed_workflows), len(workflow_runs)), + }, + "permissions": { + "policy_events": len([event for event in audit_events if event.policy_decision is not None]), + "denied_events": len([event for event in audit_events if event.outcome == AuditOutcome.DENIED]), + "review_required_events": len( + [event for event in audit_events if event.outcome == AuditOutcome.REVIEW_REQUIRED] + ), + }, + "service": { + "started_at": self.started_at, + "uptime_seconds": _age_seconds(self.started_at, utc_now().isoformat()), + "api_latency_observation_count": 0, + }, + "storage_index_health": self.readiness()["checks"], + "queue_age_seconds": { + "max": max(queue_ages) if queue_ages else 0.0, + "average": _average(queue_ages), + }, + } + + def inspect_jobs( + self, + *, + kind: str | None = None, + status: str | None = None, + correlation_id: str | None = None, + ) -> dict[str, Any]: + items: list[dict[str, Any]] = [] + if kind in (None, "ingestion"): + parsed = _enum_filter(IngestionJobStatus, status, "ingestion job status") if status else None + for job in self.repository.list_ingestion_jobs(status=parsed): + if correlation_id is None or job.correlation_id == correlation_id: + items.append({"kind": "ingestion", **_ingestion_job_envelope(job)}) + if kind in (None, "transformation"): + parsed = _enum_filter(TransformationRunStatus, status, "transformation run status") if status else None + for run in self.repository.list_transformation_runs(status=parsed): + if correlation_id is None or run.correlation_id == correlation_id: + items.append({"kind": "transformation", "run": _transformation_run_envelope(run)}) + if kind in (None, "workflow"): + parsed = _enum_filter(WorkflowRunStatus, status, "workflow run status") if status else None + for run in self.repository.list_workflow_runs(status=parsed): + if correlation_id is None or run.correlation_id == correlation_id: + items.append({"kind": "workflow", "run": _workflow_run_envelope(run)}) + return {"items": items, "count": len(items)} + + def operational_events( + self, + *, + correlation_id: str | None = None, + operation_prefix: str | None = None, + ) -> dict[str, Any]: + events = self.repository.list_audit_events(correlation_id=correlation_id) + if operation_prefix: + events = [event for event in events if event.operation.startswith(operation_prefix)] + return { + "items": [ + { + "event_id": event.event_id, + "operation": event.operation, + "target": event.target, + "outcome": event.outcome.value, + "actor_id": event.actor_id, + "correlation_id": event.correlation_id, + "occurred_at": event.occurred_at, + "details": dict(event.details), + } + for event in events + ], + "count": len(events), + } + + def recovery_actions(self) -> dict[str, Any]: + actions = [ + { + "action": "retry_ingestion_job", + "target": "ingestion_job", + "required": ["job_id"], + "permission": "operations.recovery.retry_ingestion_job", + }, + { + "action": "retry_transformation_run", + "target": "transformation_run", + "required": ["run_id"], + "permission": "operations.recovery.retry_transformation_run", + }, + { + "action": "cancel_transformation_run", + "target": "transformation_run", + "required": ["run_id"], + "permission": "operations.recovery.cancel_transformation_run", + }, + { + "action": "retry_workflow_run", + "target": "workflow_run", + "required": ["run_id"], + "permission": "operations.recovery.retry_workflow_run", + }, + { + "action": "cancel_workflow_run", + "target": "workflow_run", + "required": ["run_id"], + "permission": "operations.recovery.cancel_workflow_run", + }, + { + "action": "refresh_retrieval_index", + "target": "retrieval_index", + "required": [], + "permission": "operations.recovery.refresh_retrieval_index", + }, + { + "action": "inspect_failure", + "target": "job_or_run", + "required": ["kind", "id"], + "permission": "operations.recovery.inspect_failure", + }, + ] + return {"items": actions, "count": len(actions)} + + def execute_recovery_action( + self, + action: str, + payload: dict[str, Any], + context: OperationContext, + ) -> dict[str, Any]: + decision = self._authorize_operator_action(f"operations.recovery.{action}", f"recovery:{action}", payload, context) + if action == "retry_ingestion_job": + job = self.repository.get_ingestion_job(payload["job_id"]) + result = self.start_ingestion_job(_ingestion_retry_payload(job), context) + elif action == "retry_transformation_run": + result = self.retry_transformation_run(payload["run_id"], context) + elif action == "cancel_transformation_run": + result = self.cancel_transformation_run(payload["run_id"], payload, context) + elif action == "retry_workflow_run": + result = self.retry_workflow_run(payload["run_id"], context) + elif action == "cancel_workflow_run": + result = self.cancel_workflow_run(payload["run_id"], payload, context) + elif action == "refresh_retrieval_index": + result = self.refresh_retrieval_index() + elif action == "inspect_failure": + result = self._inspect_failure(payload) + else: + raise ValidationError( + "Unsupported recovery action", + details={"action": action, "supported": [item["action"] for item in self.recovery_actions()["items"]]}, + ) + event = self._audit_operator_action(f"operations.recovery.{action}", f"recovery:{action}", context, decision) + return { + "action": action, + "success": True, + "correlation_id": context.correlation_id, + "result": result, + "policy_decision": decision.to_dict(), + "audit_event": event.to_dict(), + } + + def create_export_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + asset_ids = self._export_asset_ids(payload, context) + decision = self._authorize_operator_action( + "export.package.create", + "export_package:new", + {"asset_ids": asset_ids, "scope": dict(payload.get("scope", {}))}, + context, + ) + records = [_export_asset_bundle(self.repository, asset_id) for asset_id in asset_ids] + audit_events = [ + event.to_dict() + for asset_id in asset_ids + for event in self.repository.list_audit_events(target=f"asset:{asset_id}") + ] + package = { + "kind": "kontextual.export_package", + "schema_version": "1", + "package_id": payload.get("package_id") or new_id("export"), + "created_at": utc_now().isoformat(), + "actor": context.actor.to_dict(), + "correlation_id": context.correlation_id, + "scope": dict(payload.get("scope", {})), + "policy_context": decision.to_dict(), + "records": records, + "audit_refs": audit_events, + "adapter_sections": _export_adapter_sections(records), + } + package["manifest"] = _export_manifest(package) + event = self._audit_operator_action( + "export.package.create", + f"export_package:{package['package_id']}", + context, + decision, + details={"asset_count": len(asset_ids), "export_hash": package["manifest"]["export_hash"]}, + ) + package["audit_event"] = event.to_dict() + return package + + def validate_export_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + package = dict(payload.get("package", payload)) + decision = self._authorize_operator_action( + "export.package.validate", + f"export_package:{package.get('package_id', 'unknown')}", + {"package_id": package.get("package_id")}, + context, + ) + expected = package.get("manifest", {}) + actual = _export_manifest({key: value for key, value in package.items() if key != "manifest"}) + issues = [] + for key in ("asset_count", "metadata_count", "representation_count", "relationship_count", "version_count"): + if expected.get(key) != actual.get(key): + issues.append({"code": "export.count_mismatch", "field": key, "expected": expected.get(key), "actual": actual.get(key)}) + if expected.get("export_hash") != actual.get("export_hash"): + issues.append( + { + "code": "export.integrity_mismatch", + "field": "export_hash", + "expected": expected.get("export_hash"), + "actual": actual.get("export_hash"), + } + ) + event = self._audit_operator_action( + "export.package.validate", + f"export_package:{package.get('package_id', 'unknown')}", + context, + decision, + details={"valid": not issues, "issue_count": len(issues)}, + ) + return { + "valid": not issues, + "issues": issues, + "expected_manifest": expected, + "actual_manifest": actual, + "policy_decision": decision.to_dict(), + "audit_event": event.to_dict(), + } + + def governance_report(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + asset_ids = self._export_asset_ids(payload, context) if payload else [asset.id for asset in self.repository.list_assets()] + decision = self._authorize_operator_action("governance.report.generate", "governance:report", payload, context) + findings: list[dict[str, Any]] = [] + for asset_id in asset_ids: + asset = self.repository.get_asset(asset_id) + metadata = self.repository.list_metadata_records(asset_id) + if not asset.classification.owner: + findings.append({"asset_id": asset_id, "code": "governance.owner_missing", "severity": "warning"}) + if not metadata: + findings.append({"asset_id": asset_id, "code": "governance.metadata_missing", "severity": "warning"}) + if not asset.source_refs: + findings.append({"asset_id": asset_id, "code": "governance.source_ref_missing", "severity": "error"}) + if asset.classification.sensitivity.value in {"confidential", "restricted"}: + has_review = any(record.key in {"review_state", "legal_hold", "retention"} for record in metadata) + if not has_review: + findings.append({"asset_id": asset_id, "code": "governance.sensitive_without_review_metadata", "severity": "warning"}) + if not self.repository.list_audit_events(target=f"asset:{asset_id}"): + findings.append({"asset_id": asset_id, "code": "governance.audit_missing", "severity": "error"}) + event = self._audit_operator_action( + "governance.report.generate", + "governance:report", + context, + decision, + details={"asset_count": len(asset_ids), "finding_count": len(findings)}, + ) + return { + "generated_at": utc_now().isoformat(), + "scope": {"asset_ids": asset_ids}, + "summary": _count_by_value(finding["code"] for finding in findings), + "findings": findings, + "redaction": {"policy_enforced": True, "content_included": False}, + "policy_decision": decision.to_dict(), + "audit_event": event.to_dict(), + } + + def extension_catalog(self) -> dict[str, Any]: + ingestion = self.ingestion_capabilities() + return { + "source_connectors": ingestion["connectors"], + "extractors": ingestion["extractors"], + "transformations": self.list_transformation_operations()["items"], + "event_types": _extension_event_types(), + "backend_abstractions": [ + "asset_registry_repository", + "policy_gateway", + "source_connector", + "format_extractor", + "transformation_operation_registry", + "event_publisher", + "search_index", + "ai_model_adapter", + ], + "markitect_boundary": "Markdown parsing, selectors, contracts, snapshots, and markdown context-package rendering stay delegated to markitect-tool adapters.", + } + + def emit_extension_event(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + event_type = payload["event_type"] + if event_type not in _extension_event_types(): + raise ValidationError( + "Unsupported extension event type", + details={"event_type": event_type, "supported": _extension_event_types()}, + ) + decision = self._authorize_operator_action( + "extension.event.emit", + f"extension_event:{event_type}", + payload, + context, + ) + event = self._audit_operator_action( + f"extension.{event_type}", + payload.get("target", f"extension_event:{event_type}"), + context, + decision, + details={"payload": dict(payload.get("payload", {})), "metadata": dict(payload.get("metadata", {}))}, + ) + return {"event": event.to_dict(), "policy_decision": decision.to_dict()} + + def record_quality_signal(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]: + signal_type = payload["signal_type"] + target = payload.get("target", f"quality_signal:{signal_type}") + decision = self._authorize_operator_action("quality.signal.record", target, payload, context) + event = self._audit_operator_action( + "quality.signal.recorded", + target, + context, + decision, + details={ + "signal_type": signal_type, + "asset_id": payload.get("asset_id"), + "workflow_run_id": payload.get("workflow_run_id"), + "agent_id": payload.get("agent_id"), + "application_id": payload.get("application_id"), + "metrics": dict(payload.get("metrics", {})), + "ai_usage": dict(payload.get("ai_usage", {})), + "cost": dict(payload.get("cost", {})), + }, + ) + return {"event": event.to_dict(), "policy_decision": decision.to_dict()} + + def quality_cost_signals(self) -> dict[str, Any]: + events = [ + event + for event in self.repository.list_audit_events() + if event.operation in {"quality.signal.recorded", "agent.report.recorded"} + ] + ai_usage = [event.details.get("ai_usage", {}) for event in events if event.details.get("ai_usage")] + costs = [event.details.get("cost", {}) for event in events if event.details.get("cost")] + return { + "retrieval": self.retrieval_quality_metrics(), + "signal_count": len(events), + "ai_usage": { + "observation_count": len(ai_usage), + "tokens": sum(int(item.get("tokens", 0)) for item in ai_usage), + "provider_errors": sum(1 for item in ai_usage if item.get("error")), + }, + "cost": { + "observation_count": len(costs), + "estimated_total": sum(float(item.get("estimated", 0.0)) for item in costs), + "currency": costs[0].get("currency") if costs else None, + }, + "attribution_dimensions": ["asset_id", "workflow_run_id", "agent_id", "application_id", "actor_id"], + } + + def performance_smoke_report(self) -> dict[str, Any]: + metrics = self.operational_metrics() + return { + "generated_at": utc_now().isoformat(), + "smoke_targets": ["ingestion", "retrieval", "workflow", "export"], + "measurements": { + "ingestion_jobs": metrics["ingestion"]["job_count"], + "retrieval_query_events": metrics["retrieval"]["query_events"], + "workflow_runs": metrics["workflows"]["run_count"], + "export_events": len( + [event for event in self.repository.list_audit_events() if event.operation.startswith("export.")] + ), + }, + "history_note": "Longitudinal pytest performance history is captured by tests/conftest.py.", + } + + def mvp_compliance_report(self) -> dict[str, Any]: + implemented = { + "asset_registry": self.repository.list_assets is not None, + "ingestion_jobs": hasattr(self.repository, "list_ingestion_jobs"), + "governed_retrieval": True, + "transformations": True, + "workflow_jobs": True, + "service_api": True, + "agent_operations": True, + "context_packages": True, + "observability": True, + "exports": True, + "governance_reporting": True, + } + return { + "generated_at": utc_now().isoformat(), + "perspective": "V0.2 MVP acceptance", + "requirements": [ + {"requirement": "FR-200..FR-207 observability and recovery", "status": "implemented"}, + {"requirement": "FR-220..FR-225 export and portability", "status": "implemented"}, + {"requirement": "FR-120..FR-132 governance and audit", "status": "implemented"}, + {"requirement": "FR-160..FR-188 agent-safe service operation", "status": "implemented"}, + {"requirement": "P1/P2 enterprise adapters", "status": "explicitly_deferred"}, + ], + "implemented_capabilities": implemented, + "remaining_gaps": [ + "External webhook delivery adapters are represented as event contracts, not network emitters.", + "Provider-backed AI cost depends on adapters supplying usage metadata.", + "API request latency needs middleware instrumentation when FastAPI service runtime is deployed.", + ], + } + + def _export_asset_ids(self, payload: dict[str, Any], context: OperationContext) -> list[str]: + scope = dict(payload.get("scope", payload)) + if scope.get("asset_ids"): + return list(dict.fromkeys(str(item) for item in scope["asset_ids"])) + if scope.get("asset_id"): + return [str(scope["asset_id"])] + if scope.get("query"): + result = self.query_assets(dict(scope["query"]), context) + return [item["asset_id"] for item in result.get("results", ())] + assets = self.repository.list_assets( + lifecycle=LifecycleState(scope["lifecycle"]) if scope.get("lifecycle") else None, + asset_type=scope.get("asset_type"), + sensitivity=scope.get("sensitivity"), + owner=scope.get("owner"), + topic=scope.get("topic"), + ) + return [asset.id for asset in assets] + + def _inspect_failure(self, payload: dict[str, Any]) -> dict[str, Any]: + kind = payload["kind"] + identifier = payload["id"] + if kind == "ingestion": + return _ingestion_job_envelope(self.repository.get_ingestion_job(identifier)) + if kind == "transformation": + return _transformation_run_envelope(self.repository.get_transformation_run(identifier)) + if kind == "workflow": + return _workflow_run_envelope(self.repository.get_workflow_run(identifier)) + raise ValidationError("Unsupported failure inspection kind", details={"kind": kind}) + + def _authorize_operator_action( + self, + action: str, + resource: str, + payload: dict[str, Any], + context: OperationContext, + ) -> PolicyDecision: + try: + decision = self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata={"payload_keys": sorted(payload)}, + ) + except Exception as exc: + decision = PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason=str(exc) or "Operator policy gateway failed", + context={"gateway_error": type(exc).__name__}, + ) + if not decision.allowed: + event = self._audit_operator_action(action, resource, context, decision, outcome=AuditOutcome.DENIED) + raise AuthorizationError( + "Operation denied by policy", + details={ + "action": action, + "resource": resource, + "correlation_id": context.correlation_id, + "audit_event_id": event.event_id, + "policy_decision": decision.to_dict(), + }, + ) + return decision + + def _audit_operator_action( + self, + operation: str, + target: str, + context: OperationContext, + decision: PolicyDecision, + *, + outcome: AuditOutcome = AuditOutcome.SUCCESS, + details: dict[str, Any] | None = None, + ) -> AuditEvent: + event = AuditEvent.from_context( + operation, + target, + outcome, + context, + policy_decision=decision, + details=details, + ) + return self.repository.save_audit_event(event) + def _asset_bundle(self, asset_id: str) -> dict[str, Any]: asset = self.repository.get_asset(asset_id) metadata_records = self.repository.list_metadata_records(asset_id) @@ -1474,9 +2020,203 @@ def create_app(runtime: ServiceRuntime | None = None): ) -> dict[str, Any]: return response(runtime.assemble_context_package, payload, context) + @app.get(f"{prefix}/operations/metrics", tags=["operations"]) + def operational_metrics() -> dict[str, Any]: + return response(runtime.operational_metrics) + + @app.get(f"{prefix}/operations/jobs", tags=["operations"]) + def inspect_jobs( + kind: str | None = Query(None), + status: str | None = Query(None), + correlation_id: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.inspect_jobs, kind=kind, status=status, correlation_id=correlation_id) + + @app.get(f"{prefix}/operations/events", tags=["operations"]) + def operational_events( + correlation_id: str | None = Query(None), + operation_prefix: str | None = Query(None), + ) -> dict[str, Any]: + return response(runtime.operational_events, correlation_id=correlation_id, operation_prefix=operation_prefix) + + @app.get(f"{prefix}/operations/recovery/actions", tags=["operations"]) + def recovery_actions() -> dict[str, Any]: + return response(runtime.recovery_actions) + + @app.post(f"{prefix}/operations/recovery/{{action}}", tags=["operations"]) + def execute_recovery_action( + action: str, + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.execute_recovery_action, action, payload, context) + + @app.post(f"{prefix}/exports", tags=["exports"]) + def create_export_package( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.create_export_package, payload, context) + + @app.post(f"{prefix}/exports/validate", tags=["exports"]) + def validate_export_package( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.validate_export_package, payload, context) + + @app.post(f"{prefix}/governance/report", tags=["governance"]) + def governance_report( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.governance_report, payload, context) + + @app.get(f"{prefix}/extensions/catalog", tags=["extensions"]) + def extension_catalog() -> dict[str, Any]: + return response(runtime.extension_catalog) + + @app.post(f"{prefix}/extensions/events", tags=["extensions"]) + def emit_extension_event( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.emit_extension_event, payload, context) + + @app.post(f"{prefix}/quality/signals", tags=["quality"]) + def record_quality_signal( + payload: dict[str, Any], + context: OperationContext = Depends(context_from_headers), + ) -> dict[str, Any]: + return response(runtime.record_quality_signal, payload, context) + + @app.get(f"{prefix}/quality/cost", tags=["quality"]) + def quality_cost_signals() -> dict[str, Any]: + return response(runtime.quality_cost_signals) + + @app.get(f"{prefix}/performance/smoke", tags=["compliance"]) + def performance_smoke_report() -> dict[str, Any]: + return response(runtime.performance_smoke_report) + + @app.get(f"{prefix}/compliance/mvp", tags=["compliance"]) + def mvp_compliance_report() -> dict[str, Any]: + return response(runtime.mvp_compliance_report) + return app +def _age_seconds(start: str, end: str) -> float: + try: + start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) + return max(0.0, round((end_dt - start_dt).total_seconds(), 3)) + except ValueError: + return 0.0 + + +def _average(values: list[float]) -> float | None: + return round(sum(values) / len(values), 3) if values else None + + +def _ratio(numerator: int, denominator: int) -> float: + return round(numerator / denominator, 4) if denominator else 0.0 + + +def _count_by_value(values: Any) -> dict[str, int]: + counts: dict[str, int] = {} + for value in values: + counts[str(value)] = counts.get(str(value), 0) + 1 + return counts + + +def _ingestion_retry_payload(job: Any) -> dict[str, Any]: + source_uri = job.input.get("source_uri") + payload = { + "mode": job.input.get("mode", "file"), + "path": source_uri, + "identity_policy": job.input.get("identity_policy", IngestionIdentityPolicy.SOURCE_LOCATION.value), + "skip_unchanged": bool(job.input.get("skip_unchanged", True)), + } + if job.input.get("mode") == "directory": + payload["recursive"] = bool(job.input.get("recursive", True)) + return payload + + +def _export_asset_bundle(repository: AssetRegistryRepository, asset_id: str) -> dict[str, Any]: + return { + "asset": repository.get_asset(asset_id).to_dict(), + "metadata_records": [record.to_dict() for record in repository.list_metadata_records(asset_id)], + "representations": [representation.to_dict() for representation in repository.list_representations(asset_id=asset_id)], + "relationships": [ + relationship.to_dict() + for relationship in repository.list_relationships(source_id=asset_id) + + repository.list_relationships(target_id=asset_id) + ], + "versions": [version.to_dict() for version in repository.list_versions(asset_id)], + "derived_lineage": [ + lineage.to_dict() + for lineage in repository.list_derived_lineage(output_asset_id=asset_id) + + repository.list_derived_lineage(source_asset_id=asset_id) + ], + } + + +def _export_manifest(package: dict[str, Any]) -> dict[str, Any]: + records = list(package.get("records", ())) + payload = { + "schema_version": package.get("schema_version", "1"), + "records": records, + "audit_refs": list(package.get("audit_refs", ())), + "adapter_sections": dict(package.get("adapter_sections", {})), + } + metadata_count = sum(len(record.get("metadata_records", ())) for record in records) + representation_count = sum(len(record.get("representations", ())) for record in records) + relationship_count = sum(len(record.get("relationships", ())) for record in records) + version_count = sum(len(record.get("versions", ())) for record in records) + lineage_count = sum(len(record.get("derived_lineage", ())) for record in records) + serialized = stable_json_dumps(payload) + return { + "schema_version": package.get("schema_version", "1"), + "asset_count": len(records), + "metadata_count": metadata_count, + "representation_count": representation_count, + "relationship_count": relationship_count, + "version_count": version_count, + "lineage_count": lineage_count, + "audit_ref_count": len(package.get("audit_refs", ())), + "export_hash": content_digest(serialized.encode("utf-8")), + "hash_algorithm": "sha256", + } + + +def _export_adapter_sections(records: list[dict[str, Any]]) -> dict[str, Any]: + markitect_representations = [ + representation + for record in records + for representation in record.get("representations", ()) + if representation.get("producer") == "markitect-tool" + or representation.get("metadata", {}).get("extractor") == "markitect-tool" + ] + return { + "markitect_tool": { + "included": bool(markitect_representations), + "representation_ids": [item.get("representation_id") for item in markitect_representations], + "boundary": "Adapter provenance is exported; markdown semantics remain owned by markitect-tool.", + } + } + + +def _extension_event_types() -> list[str]: + return [ + "asset.changed", + "ingestion.completed", + "workflow.status_changed", + "policy.exception", + "derived_artifact.created", + "review.decided", + ] + + def _agent_metadata( *, agent_id: str | None, diff --git a/tests/test_service_api.py b/tests/test_service_api.py index 7c15fd5..5ebb92d 100644 --- a/tests/test_service_api.py +++ b/tests/test_service_api.py @@ -517,6 +517,97 @@ def test_service_runtime_ingestion_directory_reports_partial_failures(tmp_path) assert result["retry_options"]["retryable"] is True +def test_service_runtime_exposes_operator_metrics_recovery_export_governance_and_quality( + tmp_path, +) -> None: + runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository()) + context = runtime.operation_context(actor_id="operator", actor_type="service_account", correlation_id="corr-ops") + source = tmp_path / "ops-source.txt" + source.write_text("Operational export and governance signals should be source grounded.", encoding="utf-8") + ingestion = runtime.start_ingestion_job( + { + "mode": "file", + "path": str(source), + "asset_id": "asset-ops", + "classification": { + "asset_type": "document", + "sensitivity": "confidential", + }, + }, + context, + ) + runtime.query_assets({"text": "governance", "include_snippets": True}, context) + transform = runtime.execute_transformation( + { + "operation_id": "structured_view", + "source_asset_ids": ["asset-ops"], + "output_asset_id": "asset-ops-derived", + }, + context, + ) + + metrics = runtime.operational_metrics() + jobs = runtime.inspect_jobs(correlation_id="corr-ops") + events = runtime.operational_events(correlation_id="corr-ops", operation_prefix="asset.") + recovery_actions = runtime.recovery_actions() + inspected_failure = runtime.execute_recovery_action( + "inspect_failure", + {"kind": "ingestion", "id": ingestion["job_id"]}, + context, + ) + export = runtime.create_export_package({"scope": {"asset_ids": ["asset-ops", "asset-ops-derived"]}}, context) + validation = runtime.validate_export_package({"package": export}, context) + tampered = dict(export) + tampered["manifest"] = dict(export["manifest"]) + tampered["manifest"]["asset_count"] = 999 + invalid = runtime.validate_export_package({"package": tampered}, context) + governance = runtime.governance_report({"scope": {"asset_ids": ["asset-ops"]}}, context) + extension_catalog = runtime.extension_catalog() + extension_event = runtime.emit_extension_event( + {"event_type": "derived_artifact.created", "target": "asset:asset-ops-derived"}, + context, + ) + signal = runtime.record_quality_signal( + { + "signal_type": "ai_usage", + "target": "workflow:ops", + "asset_id": "asset-ops", + "agent_id": "agent-ops", + "metrics": {"confidence": 0.91}, + "ai_usage": {"provider": "test", "model": "deterministic", "tokens": 123}, + "cost": {"estimated": 0.42, "currency": "EUR"}, + }, + context, + ) + quality_cost = runtime.quality_cost_signals() + smoke = runtime.performance_smoke_report() + compliance = runtime.mvp_compliance_report() + + assert metrics["ingestion"]["job_count"] >= 1 + assert metrics["retrieval"]["query_events"] >= 1 + assert metrics["transformations"]["completed"] >= 1 + assert jobs["count"] >= 2 + assert events["items"][0]["correlation_id"] == "corr-ops" + assert any(action["action"] == "retry_ingestion_job" for action in recovery_actions["items"]) + assert inspected_failure["result"]["job_id"] == ingestion["job_id"] + assert export["manifest"]["asset_count"] == 2 + assert export["manifest"]["export_hash"].startswith("sha256:") + assert export["adapter_sections"]["markitect_tool"]["included"] is False + assert validation["valid"] is True + assert invalid["valid"] is False + assert any(issue["code"] == "export.count_mismatch" for issue in invalid["issues"]) + assert governance["redaction"]["content_included"] is False + assert "governance.sensitive_without_review_metadata" in governance["summary"] + assert "markitect-tool" in extension_catalog["markitect_boundary"] + assert extension_event["event"]["operation"] == "extension.derived_artifact.created" + assert signal["event"]["operation"] == "quality.signal.recorded" + assert quality_cost["ai_usage"]["tokens"] == 123 + assert quality_cost["cost"]["estimated_total"] == 0.42 + assert smoke["smoke_targets"] == ["ingestion", "retrieval", "workflow", "export"] + assert compliance["implemented_capabilities"]["exports"] is True + assert transform["lineage"]["output_asset_id"] == "asset-ops-derived" + + def test_create_app_reports_missing_optional_dependency_when_fastapi_is_absent() -> None: try: import fastapi # noqa: F401 @@ -576,6 +667,13 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None: assert "/api/v1/agents/operations" in paths assert "/api/v1/context-packages" in paths assert "/api/v1/context-packages/schema" in paths + assert "/api/v1/operations/metrics" in paths + assert "/api/v1/operations/recovery/actions" in paths + assert "/api/v1/exports" in paths + assert "/api/v1/governance/report" in paths + assert "/api/v1/extensions/catalog" in paths + assert "/api/v1/quality/signals" in paths + assert "/api/v1/compliance/mvp" in paths def test_create_app_attaches_runtime_to_application_state(client) -> None: diff --git a/workplans/KONT-WP-0010-observability-export-enterprise-readiness.md b/workplans/KONT-WP-0010-observability-export-enterprise-readiness.md index 6ce6d16..b8d3894 100644 --- a/workplans/KONT-WP-0010-observability-export-enterprise-readiness.md +++ b/workplans/KONT-WP-0010-observability-export-enterprise-readiness.md @@ -4,13 +4,13 @@ type: workplan title: "Observability Export And Enterprise Readiness" domain: markitect repo: kontextual-engine -status: todo +status: completed owner: codex topic_slug: markitect planning_priority: high planning_order: 10 created: "2026-05-05" -updated: "2026-05-05" +updated: "2026-05-06" state_hub_workstream_id: "09d769a5-a3cf-4cdf-ae5e-b4ecf767f109" --- @@ -46,11 +46,19 @@ provenance where markdown-backed assets depend on them. Export formats remain engine-owned and should include Markitect payloads as documented adapter sections, not as the whole portability model. +## Implementation Status + +Implemented as an operator/readiness layer on top of the existing runtime and +repository contracts. The MVP surfaces include operational metrics, job +inspection, event views, recovery actions, governed export packages, export +validation, governance reports, extension/event catalogs, quality/cost signal +recording, performance smoke summaries, and an MVP compliance report. + ## E10.1 - Expose operational metrics events and job inspection ```task id: KONT-WP-0010-T001 -status: todo +status: done priority: high state_hub_task_id: "ce6cfbc4-b171-4f03-a27b-c46abbde85a0" ``` @@ -66,11 +74,20 @@ Acceptance: completion, failure rate, queue age, and storage/index health. - Events use correlation IDs that line up with audit records. +Implemented: + +- `ServiceRuntime.operational_metrics()` summarizes asset, ingestion, + retrieval, transformation, workflow, permission, queue, and readiness state. +- `inspect_jobs()` exposes ingestion, transformation, and workflow jobs/runs by + kind, status, and correlation ID. +- `operational_events()` exposes audit-backed operational events with + correlation IDs. + ## E10.2 - Implement administrative recovery actions ```task id: KONT-WP-0010-T002 -status: todo +status: done priority: high state_hub_task_id: "8f0ead65-79be-42e3-8ec8-43d146bb3934" ``` @@ -85,11 +102,19 @@ Acceptance: recoverable without direct database edits. - Partial failure reports remain available after recovery. +Implemented: + +- Recovery action catalog plus execution for ingestion retry, transformation + retry/cancel, workflow retry/cancel, retrieval re-index, and failure + inspection. +- Recovery actions authorize through `PolicyGateway` and emit audit events. +- Partial ingestion failure envelopes remain inspectable and tested. + ## E10.3 - Implement export packages manifests and integrity validation ```task id: KONT-WP-0010-T003 -status: todo +status: done priority: high state_hub_task_id: "54ed199f-636e-4cfd-898f-fd6ad0057b61" ``` @@ -106,11 +131,20 @@ Acceptance: policy context. - Export validation can detect missing records or integrity mismatches. +Implemented: + +- Governed export packages scoped by explicit asset IDs, filters, or retrieval + query. +- Export records include assets, metadata, representations, relationships, + versions, lineage, audit references, policy context, and Markitect adapter + sections. +- Export validation recomputes counts and content hash to detect tampering. + ## E10.4 - Implement governance inspection and reporting hooks ```task id: KONT-WP-0010-T004 -status: todo +status: done priority: medium state_hub_task_id: "c62c5f36-30d9-4469-90cf-5dc3d37588ba" ``` @@ -126,11 +160,18 @@ Acceptance: policy-conflicted assets. - Reporting respects authorization and redaction policy. +Implemented: + +- `governance_report()` generates scoped reports over selected assets. +- Findings cover missing owner, metadata, source refs, audit gaps, and + sensitive assets without review/retention metadata. +- Reports include redaction metadata and avoid embedding source content. + ## E10.5 - Implement extension events webhooks and backend abstraction readiness ```task id: KONT-WP-0010-T005 -status: todo +status: done priority: medium state_hub_task_id: "f1713b41-0535-47fc-ba7e-054aea93f8cf" ``` @@ -149,11 +190,19 @@ Acceptance: - Markitect adapter contract tests are part of the extension compatibility posture for markdown-related engine capabilities. +Implemented: + +- Extension catalog exposes connector, extractor, transformation, event, and + backend abstraction readiness. +- Extension events can be emitted as audited semantic events. +- Markitect adapter provenance and boundary are explicit in export and + extension surfaces. + ## E10.6 - Capture retrieval AI cost and quality signals ```task id: KONT-WP-0010-T006 -status: todo +status: done priority: medium state_hub_task_id: "1d36035a-b211-49e9-935c-382d52aa3639" ``` @@ -169,11 +218,20 @@ Acceptance: - Signals can be attributed to assets, workflows, agents, applications, and actors. +Implemented: + +- Retrieval quality metrics are exposed in operator metrics and + quality/cost reports. +- `record_quality_signal()` captures AI usage, cost, metrics, and attribution + dimensions as audit-backed signal events. +- `quality_cost_signals()` aggregates retrieval quality, AI usage, provider + error count, and estimated cost. + ## E10.7 - Add performance smoke tests and MVP compliance report ```task id: KONT-WP-0010-T007 -status: todo +status: done priority: medium state_hub_task_id: "057c7bcf-f224-4d9f-9161-6bfff4948e95" ``` @@ -188,6 +246,14 @@ Acceptance: - MVP compliance report maps implemented behavior to FRS P0 requirements. - Remaining P1/P2 gaps are explicit and prioritized. +Implemented: + +- `performance_smoke_report()` summarizes representative ingestion, retrieval, + workflow, and export observations. +- `mvp_compliance_report()` maps MVP behavior to observability/recovery, + export, governance/audit, and agent-safe operation requirements. +- Remaining enterprise-adapter gaps are explicit in the compliance report. + ## Definition Of Done - Operators can inspect, diagnose, recover, export, and evaluate MVP engine