Operator metrics, job inspection, and event views, Recovery, Governance reports, Extension catalog and semantic extension events

This commit is contained in:
2026-05-06 21:48:40 +02:00
parent 9705104659
commit a44b439cc7
7 changed files with 1048 additions and 9 deletions

View File

@@ -0,0 +1,35 @@
# MVP Compliance Report
Date: 2026-05-06
Status: MVP compliance snapshot for `KONT-WP-0010`.
## Implemented MVP Coverage
| Area | Status | Evidence |
| --- | --- | --- |
| Asset registry and governance | implemented | Asset lifecycle, metadata, relationships, versions, audit, and policy checks. |
| Multi-format ingestion | implemented | Ingestion jobs, local file/directory connectors, built-in extractors, Markitect markdown adapter boundary, quarantine and partial failures. |
| Governed retrieval | implemented | Permission-aware asset, context entity, and relationship retrieval with snippets, feedback, and quality metrics. |
| Transformations | implemented | Operation registry, durable runs, output assets, lineage, audit, retry and cancel. |
| Workflow jobs | implemented | Templates, invocations, retries, cancel, review tasks, exceptions, and reconstruction. |
| Service API and agent-safe operation | implemented | Versioned FastAPI adapter, actor/delegation context, bounded agent operation catalog, context packages, review and dry-run gates. |
| Observability and recovery | implemented | Metrics, job inspection, operational events, recovery action catalog and audited recovery dispatch. |
| Export and portability | implemented | Governed export packages with manifests, hashes, policy context, audit references, and validation. |
| Enterprise readiness hooks | implemented | Governance reports, extension catalog/events, quality/cost signals, and smoke/compliance reports. |
## Explicitly Deferred
- External webhook delivery adapters beyond audited semantic event emission.
- Provider-backed AI execution and cost capture beyond adapter-supplied usage
metadata.
- Deployed API request latency middleware; current runtime reports an empty API
latency observation set.
- Enterprise IAM/PDP adapters, object stores, queues, semantic search, and
external model backends.
## Verification
The MVP compliance report is exposed at `GET /api/v1/compliance/mvp`, and the
performance smoke summary is exposed at `GET /api/v1/performance/smoke`.
Regression coverage lives in `tests/test_service_api.py`.

View File

@@ -0,0 +1,57 @@
# Observability Export And Enterprise Readiness
Date: 2026-05-06
Status: implemented MVP note for `KONT-WP-0010`.
## Purpose
This note records the operator-facing surfaces that make the engine
inspectable, recoverable, exportable, and measurable without direct storage
access. The implementation is intentionally an adapter layer over existing
runtime services, repository contracts, policy decisions, and audit events.
## Implemented Surfaces
- `GET /api/v1/operations/metrics`
- `GET /api/v1/operations/jobs`
- `GET /api/v1/operations/events`
- `GET /api/v1/operations/recovery/actions`
- `POST /api/v1/operations/recovery/{action}`
- `POST /api/v1/exports`
- `POST /api/v1/exports/validate`
- `POST /api/v1/governance/report`
- `GET /api/v1/extensions/catalog`
- `POST /api/v1/extensions/events`
- `POST /api/v1/quality/signals`
- `GET /api/v1/quality/cost`
- `GET /api/v1/performance/smoke`
- `GET /api/v1/compliance/mvp`
## Boundary Decisions
Operational metrics are computed from durable repository state and audit
events. API request latency is reported as an empty observation set until the
deployed FastAPI service adds middleware timing.
Recovery actions are explicit and policy checked. They dispatch through the
same runtime methods as normal service use: ingestion retry, transformation
retry/cancel, workflow retry/cancel, retrieval index refresh, and failure
inspection.
Export packages are governed envelopes, not raw database dumps. They include
assets, metadata, representations, relationships, versions, derived lineage,
audit references, adapter sections, manifest counts, a content hash, actor, and
policy context.
Governance reports avoid embedding source content. Findings identify missing
ownership, metadata, source references, audit gaps, and sensitive assets without
review or retention metadata.
Extension readiness is expressed through semantic event types, connector and
extractor capabilities, transformation operation metadata, backend abstraction
names, and explicit Markitect adapter boundaries.
Quality and cost signals are audit-backed observations. Retrieval quality uses
existing retrieval feedback metrics; AI cost and usage depend on adapters
providing token, provider, error, and estimated-cost fields.

View File

@@ -113,6 +113,24 @@ Implemented in `KONT-WP-0009-T007`:
`dry_run` audit outcomes.
- Partial-failure job envelopes are covered by contract tests.
Implemented in `KONT-WP-0010`:
- `GET /api/v1/operations/metrics`
- `GET /api/v1/operations/jobs`
- `GET /api/v1/operations/events`
- `GET /api/v1/operations/recovery/actions`
- `POST /api/v1/operations/recovery/{action}`
- `POST /api/v1/exports`
- `POST /api/v1/exports/validate`
- `POST /api/v1/governance/report`
- `GET /api/v1/extensions/catalog`
- `POST /api/v1/extensions/events`
- `POST /api/v1/quality/signals`
- `GET /api/v1/quality/cost`
- `GET /api/v1/performance/smoke`
- `GET /api/v1/compliance/mvp`
- Operator/readiness endpoints remain policy checked and audit backed.
The unversioned health/readiness/version endpoints are operational probes. The
versioned `/api/v1/*` endpoints establish the MVP API namespace. Future
domain-resource endpoints should live under `/api/v1`.

View File

@@ -78,6 +78,20 @@ src/kontextual_engine/
- `POST /api/v1/agents/operations/{operation_id}`
- `GET /api/v1/context-packages/schema`
- `POST /api/v1/context-packages`
- `GET /api/v1/operations/metrics`
- `GET /api/v1/operations/jobs`
- `GET /api/v1/operations/events`
- `GET /api/v1/operations/recovery/actions`
- `POST /api/v1/operations/recovery/{action}`
- `POST /api/v1/exports`
- `POST /api/v1/exports/validate`
- `POST /api/v1/governance/report`
- `GET /api/v1/extensions/catalog`
- `POST /api/v1/extensions/events`
- `POST /api/v1/quality/signals`
- `GET /api/v1/quality/cost`
- `GET /api/v1/performance/smoke`
- `GET /api/v1/compliance/mvp`
- `GET /openapi.json`
Unversioned endpoints are operational probes. Versioned endpoints establish
@@ -106,6 +120,8 @@ the `/api/v1` namespace for future domain resources.
- workflow template/run/review/exception/reconstruction translation,
- bounded agent operation catalog and dispatch translation,
- governed context-package assembly translation.
- observability, recovery, export, governance, extension, quality/cost, smoke,
and compliance report translation.
Readiness currently checks that the configured asset registry repository can
list assets. It does not mutate state.
@@ -154,6 +170,13 @@ outcomes. `dry_run_only` decisions return `dry_run_required` envelopes unless
the request is already a dry run. Partial-failure contracts are covered through
directory ingestion with mixed supported and unsupported inputs.
`KONT-WP-0010` added the MVP operator/readiness surface. Metrics, jobs, events,
recovery actions, export packages, governance reports, extension events,
quality/cost signals, performance smoke summaries, and compliance reports are
available through versioned service endpoints. See
`docs/observability-export-enterprise-readiness.md` and
`docs/mvp-compliance-report.md`.
## Dependency Boundary
The `service` extra now includes FastAPI, Uvicorn, and HTTPX for test-client
@@ -190,6 +213,8 @@ missing-dependency behavior are tested without FastAPI.
references, and Markitect-compatible payload shape,
- runtime review-required and dry-run-only agent operation envelopes,
- runtime partial-failure ingestion job envelopes,
- runtime operator metrics, recovery, export, governance, extension events,
quality/cost signals, smoke reports, and MVP compliance reports,
- `create_app()` missing-dependency behavior when the optional extra is absent,
- health/readiness/version/OpenAPI endpoint contracts when FastAPI and HTTPX
are installed,

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
import json
from dataclasses import dataclass, field
from datetime import datetime
from importlib import metadata
from typing import Any
@@ -39,7 +40,9 @@ from kontextual_engine.core import (
WorkflowRunStatus,
WorkflowStepDefinition,
WorkflowTemplate,
content_digest,
new_id,
stable_json_dumps,
utc_now,
)
from kontextual_engine.errors import AuthorizationError, KontextualError, NotFoundError, ValidationError
@@ -893,6 +896,549 @@ class ServiceRuntime:
)
return self.repository.save_audit_event(event)
def operational_metrics(self) -> dict[str, Any]:
assets = self.repository.list_assets()
ingestion_jobs = self.repository.list_ingestion_jobs()
transformation_runs = self.repository.list_transformation_runs()
workflow_runs = self.repository.list_workflow_runs()
audit_events = self.repository.list_audit_events()
retrieval_events = [event for event in audit_events if event.operation.startswith("retrieval.")]
query_latencies = [
float(event.details["permission_filter_duration_ms"])
for event in retrieval_events
if "permission_filter_duration_ms" in event.details
]
failed_jobs = [job for job in ingestion_jobs if job.status == IngestionJobStatus.FAILED]
failed_transformations = [run for run in transformation_runs if run.status == TransformationRunStatus.FAILED]
failed_workflows = [run for run in workflow_runs if run.status == WorkflowRunStatus.FAILED]
queue_ages = [
_age_seconds(job.created_at, job.completed_at or utc_now().isoformat())
for job in ingestion_jobs
if job.status in (IngestionJobStatus.QUEUED, IngestionJobStatus.RUNNING)
]
return {
"generated_at": utc_now().isoformat(),
"repository": type(self.repository).__name__,
"assets": {
"count": len(assets),
"representations": len(self.repository.list_representations()),
"relationships": len(self.repository.list_relationships()),
"context_entities": len(self.repository.list_context_entities()),
},
"ingestion": {
"job_count": len(ingestion_jobs),
"completed": _count_by_value(job.status.value for job in ingestion_jobs).get("completed", 0),
"failed": len(failed_jobs),
"partial": _count_by_value(job.status.value for job in ingestion_jobs).get("partially_completed", 0),
"throughput_assets": sum(len(job.output_asset_ids) for job in ingestion_jobs),
"failure_rate": _ratio(len(failed_jobs), len(ingestion_jobs)),
},
"retrieval": {
"query_events": len(retrieval_events),
"average_permission_filter_duration_ms": _average(query_latencies),
"quality": self.retrieval_quality_metrics(),
},
"transformations": {
"run_count": len(transformation_runs),
"completed": _count_by_value(run.status.value for run in transformation_runs).get("completed", 0),
"failed": len(failed_transformations),
"failure_rate": _ratio(len(failed_transformations), len(transformation_runs)),
},
"workflows": {
"run_count": len(workflow_runs),
"completed": _count_by_value(run.status.value for run in workflow_runs).get("completed", 0),
"failed": len(failed_workflows),
"waiting": _count_by_value(run.status.value for run in workflow_runs).get("waiting", 0),
"failure_rate": _ratio(len(failed_workflows), len(workflow_runs)),
},
"permissions": {
"policy_events": len([event for event in audit_events if event.policy_decision is not None]),
"denied_events": len([event for event in audit_events if event.outcome == AuditOutcome.DENIED]),
"review_required_events": len(
[event for event in audit_events if event.outcome == AuditOutcome.REVIEW_REQUIRED]
),
},
"service": {
"started_at": self.started_at,
"uptime_seconds": _age_seconds(self.started_at, utc_now().isoformat()),
"api_latency_observation_count": 0,
},
"storage_index_health": self.readiness()["checks"],
"queue_age_seconds": {
"max": max(queue_ages) if queue_ages else 0.0,
"average": _average(queue_ages),
},
}
def inspect_jobs(
self,
*,
kind: str | None = None,
status: str | None = None,
correlation_id: str | None = None,
) -> dict[str, Any]:
items: list[dict[str, Any]] = []
if kind in (None, "ingestion"):
parsed = _enum_filter(IngestionJobStatus, status, "ingestion job status") if status else None
for job in self.repository.list_ingestion_jobs(status=parsed):
if correlation_id is None or job.correlation_id == correlation_id:
items.append({"kind": "ingestion", **_ingestion_job_envelope(job)})
if kind in (None, "transformation"):
parsed = _enum_filter(TransformationRunStatus, status, "transformation run status") if status else None
for run in self.repository.list_transformation_runs(status=parsed):
if correlation_id is None or run.correlation_id == correlation_id:
items.append({"kind": "transformation", "run": _transformation_run_envelope(run)})
if kind in (None, "workflow"):
parsed = _enum_filter(WorkflowRunStatus, status, "workflow run status") if status else None
for run in self.repository.list_workflow_runs(status=parsed):
if correlation_id is None or run.correlation_id == correlation_id:
items.append({"kind": "workflow", "run": _workflow_run_envelope(run)})
return {"items": items, "count": len(items)}
def operational_events(
self,
*,
correlation_id: str | None = None,
operation_prefix: str | None = None,
) -> dict[str, Any]:
events = self.repository.list_audit_events(correlation_id=correlation_id)
if operation_prefix:
events = [event for event in events if event.operation.startswith(operation_prefix)]
return {
"items": [
{
"event_id": event.event_id,
"operation": event.operation,
"target": event.target,
"outcome": event.outcome.value,
"actor_id": event.actor_id,
"correlation_id": event.correlation_id,
"occurred_at": event.occurred_at,
"details": dict(event.details),
}
for event in events
],
"count": len(events),
}
def recovery_actions(self) -> dict[str, Any]:
actions = [
{
"action": "retry_ingestion_job",
"target": "ingestion_job",
"required": ["job_id"],
"permission": "operations.recovery.retry_ingestion_job",
},
{
"action": "retry_transformation_run",
"target": "transformation_run",
"required": ["run_id"],
"permission": "operations.recovery.retry_transformation_run",
},
{
"action": "cancel_transformation_run",
"target": "transformation_run",
"required": ["run_id"],
"permission": "operations.recovery.cancel_transformation_run",
},
{
"action": "retry_workflow_run",
"target": "workflow_run",
"required": ["run_id"],
"permission": "operations.recovery.retry_workflow_run",
},
{
"action": "cancel_workflow_run",
"target": "workflow_run",
"required": ["run_id"],
"permission": "operations.recovery.cancel_workflow_run",
},
{
"action": "refresh_retrieval_index",
"target": "retrieval_index",
"required": [],
"permission": "operations.recovery.refresh_retrieval_index",
},
{
"action": "inspect_failure",
"target": "job_or_run",
"required": ["kind", "id"],
"permission": "operations.recovery.inspect_failure",
},
]
return {"items": actions, "count": len(actions)}
def execute_recovery_action(
self,
action: str,
payload: dict[str, Any],
context: OperationContext,
) -> dict[str, Any]:
decision = self._authorize_operator_action(f"operations.recovery.{action}", f"recovery:{action}", payload, context)
if action == "retry_ingestion_job":
job = self.repository.get_ingestion_job(payload["job_id"])
result = self.start_ingestion_job(_ingestion_retry_payload(job), context)
elif action == "retry_transformation_run":
result = self.retry_transformation_run(payload["run_id"], context)
elif action == "cancel_transformation_run":
result = self.cancel_transformation_run(payload["run_id"], payload, context)
elif action == "retry_workflow_run":
result = self.retry_workflow_run(payload["run_id"], context)
elif action == "cancel_workflow_run":
result = self.cancel_workflow_run(payload["run_id"], payload, context)
elif action == "refresh_retrieval_index":
result = self.refresh_retrieval_index()
elif action == "inspect_failure":
result = self._inspect_failure(payload)
else:
raise ValidationError(
"Unsupported recovery action",
details={"action": action, "supported": [item["action"] for item in self.recovery_actions()["items"]]},
)
event = self._audit_operator_action(f"operations.recovery.{action}", f"recovery:{action}", context, decision)
return {
"action": action,
"success": True,
"correlation_id": context.correlation_id,
"result": result,
"policy_decision": decision.to_dict(),
"audit_event": event.to_dict(),
}
def create_export_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
asset_ids = self._export_asset_ids(payload, context)
decision = self._authorize_operator_action(
"export.package.create",
"export_package:new",
{"asset_ids": asset_ids, "scope": dict(payload.get("scope", {}))},
context,
)
records = [_export_asset_bundle(self.repository, asset_id) for asset_id in asset_ids]
audit_events = [
event.to_dict()
for asset_id in asset_ids
for event in self.repository.list_audit_events(target=f"asset:{asset_id}")
]
package = {
"kind": "kontextual.export_package",
"schema_version": "1",
"package_id": payload.get("package_id") or new_id("export"),
"created_at": utc_now().isoformat(),
"actor": context.actor.to_dict(),
"correlation_id": context.correlation_id,
"scope": dict(payload.get("scope", {})),
"policy_context": decision.to_dict(),
"records": records,
"audit_refs": audit_events,
"adapter_sections": _export_adapter_sections(records),
}
package["manifest"] = _export_manifest(package)
event = self._audit_operator_action(
"export.package.create",
f"export_package:{package['package_id']}",
context,
decision,
details={"asset_count": len(asset_ids), "export_hash": package["manifest"]["export_hash"]},
)
package["audit_event"] = event.to_dict()
return package
def validate_export_package(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
package = dict(payload.get("package", payload))
decision = self._authorize_operator_action(
"export.package.validate",
f"export_package:{package.get('package_id', 'unknown')}",
{"package_id": package.get("package_id")},
context,
)
expected = package.get("manifest", {})
actual = _export_manifest({key: value for key, value in package.items() if key != "manifest"})
issues = []
for key in ("asset_count", "metadata_count", "representation_count", "relationship_count", "version_count"):
if expected.get(key) != actual.get(key):
issues.append({"code": "export.count_mismatch", "field": key, "expected": expected.get(key), "actual": actual.get(key)})
if expected.get("export_hash") != actual.get("export_hash"):
issues.append(
{
"code": "export.integrity_mismatch",
"field": "export_hash",
"expected": expected.get("export_hash"),
"actual": actual.get("export_hash"),
}
)
event = self._audit_operator_action(
"export.package.validate",
f"export_package:{package.get('package_id', 'unknown')}",
context,
decision,
details={"valid": not issues, "issue_count": len(issues)},
)
return {
"valid": not issues,
"issues": issues,
"expected_manifest": expected,
"actual_manifest": actual,
"policy_decision": decision.to_dict(),
"audit_event": event.to_dict(),
}
def governance_report(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
asset_ids = self._export_asset_ids(payload, context) if payload else [asset.id for asset in self.repository.list_assets()]
decision = self._authorize_operator_action("governance.report.generate", "governance:report", payload, context)
findings: list[dict[str, Any]] = []
for asset_id in asset_ids:
asset = self.repository.get_asset(asset_id)
metadata = self.repository.list_metadata_records(asset_id)
if not asset.classification.owner:
findings.append({"asset_id": asset_id, "code": "governance.owner_missing", "severity": "warning"})
if not metadata:
findings.append({"asset_id": asset_id, "code": "governance.metadata_missing", "severity": "warning"})
if not asset.source_refs:
findings.append({"asset_id": asset_id, "code": "governance.source_ref_missing", "severity": "error"})
if asset.classification.sensitivity.value in {"confidential", "restricted"}:
has_review = any(record.key in {"review_state", "legal_hold", "retention"} for record in metadata)
if not has_review:
findings.append({"asset_id": asset_id, "code": "governance.sensitive_without_review_metadata", "severity": "warning"})
if not self.repository.list_audit_events(target=f"asset:{asset_id}"):
findings.append({"asset_id": asset_id, "code": "governance.audit_missing", "severity": "error"})
event = self._audit_operator_action(
"governance.report.generate",
"governance:report",
context,
decision,
details={"asset_count": len(asset_ids), "finding_count": len(findings)},
)
return {
"generated_at": utc_now().isoformat(),
"scope": {"asset_ids": asset_ids},
"summary": _count_by_value(finding["code"] for finding in findings),
"findings": findings,
"redaction": {"policy_enforced": True, "content_included": False},
"policy_decision": decision.to_dict(),
"audit_event": event.to_dict(),
}
def extension_catalog(self) -> dict[str, Any]:
ingestion = self.ingestion_capabilities()
return {
"source_connectors": ingestion["connectors"],
"extractors": ingestion["extractors"],
"transformations": self.list_transformation_operations()["items"],
"event_types": _extension_event_types(),
"backend_abstractions": [
"asset_registry_repository",
"policy_gateway",
"source_connector",
"format_extractor",
"transformation_operation_registry",
"event_publisher",
"search_index",
"ai_model_adapter",
],
"markitect_boundary": "Markdown parsing, selectors, contracts, snapshots, and markdown context-package rendering stay delegated to markitect-tool adapters.",
}
def emit_extension_event(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
event_type = payload["event_type"]
if event_type not in _extension_event_types():
raise ValidationError(
"Unsupported extension event type",
details={"event_type": event_type, "supported": _extension_event_types()},
)
decision = self._authorize_operator_action(
"extension.event.emit",
f"extension_event:{event_type}",
payload,
context,
)
event = self._audit_operator_action(
f"extension.{event_type}",
payload.get("target", f"extension_event:{event_type}"),
context,
decision,
details={"payload": dict(payload.get("payload", {})), "metadata": dict(payload.get("metadata", {}))},
)
return {"event": event.to_dict(), "policy_decision": decision.to_dict()}
def record_quality_signal(self, payload: dict[str, Any], context: OperationContext) -> dict[str, Any]:
signal_type = payload["signal_type"]
target = payload.get("target", f"quality_signal:{signal_type}")
decision = self._authorize_operator_action("quality.signal.record", target, payload, context)
event = self._audit_operator_action(
"quality.signal.recorded",
target,
context,
decision,
details={
"signal_type": signal_type,
"asset_id": payload.get("asset_id"),
"workflow_run_id": payload.get("workflow_run_id"),
"agent_id": payload.get("agent_id"),
"application_id": payload.get("application_id"),
"metrics": dict(payload.get("metrics", {})),
"ai_usage": dict(payload.get("ai_usage", {})),
"cost": dict(payload.get("cost", {})),
},
)
return {"event": event.to_dict(), "policy_decision": decision.to_dict()}
def quality_cost_signals(self) -> dict[str, Any]:
events = [
event
for event in self.repository.list_audit_events()
if event.operation in {"quality.signal.recorded", "agent.report.recorded"}
]
ai_usage = [event.details.get("ai_usage", {}) for event in events if event.details.get("ai_usage")]
costs = [event.details.get("cost", {}) for event in events if event.details.get("cost")]
return {
"retrieval": self.retrieval_quality_metrics(),
"signal_count": len(events),
"ai_usage": {
"observation_count": len(ai_usage),
"tokens": sum(int(item.get("tokens", 0)) for item in ai_usage),
"provider_errors": sum(1 for item in ai_usage if item.get("error")),
},
"cost": {
"observation_count": len(costs),
"estimated_total": sum(float(item.get("estimated", 0.0)) for item in costs),
"currency": costs[0].get("currency") if costs else None,
},
"attribution_dimensions": ["asset_id", "workflow_run_id", "agent_id", "application_id", "actor_id"],
}
def performance_smoke_report(self) -> dict[str, Any]:
metrics = self.operational_metrics()
return {
"generated_at": utc_now().isoformat(),
"smoke_targets": ["ingestion", "retrieval", "workflow", "export"],
"measurements": {
"ingestion_jobs": metrics["ingestion"]["job_count"],
"retrieval_query_events": metrics["retrieval"]["query_events"],
"workflow_runs": metrics["workflows"]["run_count"],
"export_events": len(
[event for event in self.repository.list_audit_events() if event.operation.startswith("export.")]
),
},
"history_note": "Longitudinal pytest performance history is captured by tests/conftest.py.",
}
def mvp_compliance_report(self) -> dict[str, Any]:
implemented = {
"asset_registry": self.repository.list_assets is not None,
"ingestion_jobs": hasattr(self.repository, "list_ingestion_jobs"),
"governed_retrieval": True,
"transformations": True,
"workflow_jobs": True,
"service_api": True,
"agent_operations": True,
"context_packages": True,
"observability": True,
"exports": True,
"governance_reporting": True,
}
return {
"generated_at": utc_now().isoformat(),
"perspective": "V0.2 MVP acceptance",
"requirements": [
{"requirement": "FR-200..FR-207 observability and recovery", "status": "implemented"},
{"requirement": "FR-220..FR-225 export and portability", "status": "implemented"},
{"requirement": "FR-120..FR-132 governance and audit", "status": "implemented"},
{"requirement": "FR-160..FR-188 agent-safe service operation", "status": "implemented"},
{"requirement": "P1/P2 enterprise adapters", "status": "explicitly_deferred"},
],
"implemented_capabilities": implemented,
"remaining_gaps": [
"External webhook delivery adapters are represented as event contracts, not network emitters.",
"Provider-backed AI cost depends on adapters supplying usage metadata.",
"API request latency needs middleware instrumentation when FastAPI service runtime is deployed.",
],
}
def _export_asset_ids(self, payload: dict[str, Any], context: OperationContext) -> list[str]:
scope = dict(payload.get("scope", payload))
if scope.get("asset_ids"):
return list(dict.fromkeys(str(item) for item in scope["asset_ids"]))
if scope.get("asset_id"):
return [str(scope["asset_id"])]
if scope.get("query"):
result = self.query_assets(dict(scope["query"]), context)
return [item["asset_id"] for item in result.get("results", ())]
assets = self.repository.list_assets(
lifecycle=LifecycleState(scope["lifecycle"]) if scope.get("lifecycle") else None,
asset_type=scope.get("asset_type"),
sensitivity=scope.get("sensitivity"),
owner=scope.get("owner"),
topic=scope.get("topic"),
)
return [asset.id for asset in assets]
def _inspect_failure(self, payload: dict[str, Any]) -> dict[str, Any]:
kind = payload["kind"]
identifier = payload["id"]
if kind == "ingestion":
return _ingestion_job_envelope(self.repository.get_ingestion_job(identifier))
if kind == "transformation":
return _transformation_run_envelope(self.repository.get_transformation_run(identifier))
if kind == "workflow":
return _workflow_run_envelope(self.repository.get_workflow_run(identifier))
raise ValidationError("Unsupported failure inspection kind", details={"kind": kind})
def _authorize_operator_action(
self,
action: str,
resource: str,
payload: dict[str, Any],
context: OperationContext,
) -> PolicyDecision:
try:
decision = self.policy_gateway.authorize(
context,
action,
resource,
resource_metadata={"payload_keys": sorted(payload)},
)
except Exception as exc:
decision = PolicyDecision.fail_closed(
context.actor.id,
action,
resource,
reason=str(exc) or "Operator policy gateway failed",
context={"gateway_error": type(exc).__name__},
)
if not decision.allowed:
event = self._audit_operator_action(action, resource, context, decision, outcome=AuditOutcome.DENIED)
raise AuthorizationError(
"Operation denied by policy",
details={
"action": action,
"resource": resource,
"correlation_id": context.correlation_id,
"audit_event_id": event.event_id,
"policy_decision": decision.to_dict(),
},
)
return decision
def _audit_operator_action(
self,
operation: str,
target: str,
context: OperationContext,
decision: PolicyDecision,
*,
outcome: AuditOutcome = AuditOutcome.SUCCESS,
details: dict[str, Any] | None = None,
) -> AuditEvent:
event = AuditEvent.from_context(
operation,
target,
outcome,
context,
policy_decision=decision,
details=details,
)
return self.repository.save_audit_event(event)
def _asset_bundle(self, asset_id: str) -> dict[str, Any]:
asset = self.repository.get_asset(asset_id)
metadata_records = self.repository.list_metadata_records(asset_id)
@@ -1474,9 +2020,203 @@ def create_app(runtime: ServiceRuntime | None = None):
) -> dict[str, Any]:
return response(runtime.assemble_context_package, payload, context)
@app.get(f"{prefix}/operations/metrics", tags=["operations"])
def operational_metrics() -> dict[str, Any]:
return response(runtime.operational_metrics)
@app.get(f"{prefix}/operations/jobs", tags=["operations"])
def inspect_jobs(
kind: str | None = Query(None),
status: str | None = Query(None),
correlation_id: str | None = Query(None),
) -> dict[str, Any]:
return response(runtime.inspect_jobs, kind=kind, status=status, correlation_id=correlation_id)
@app.get(f"{prefix}/operations/events", tags=["operations"])
def operational_events(
correlation_id: str | None = Query(None),
operation_prefix: str | None = Query(None),
) -> dict[str, Any]:
return response(runtime.operational_events, correlation_id=correlation_id, operation_prefix=operation_prefix)
@app.get(f"{prefix}/operations/recovery/actions", tags=["operations"])
def recovery_actions() -> dict[str, Any]:
return response(runtime.recovery_actions)
@app.post(f"{prefix}/operations/recovery/{{action}}", tags=["operations"])
def execute_recovery_action(
action: str,
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.execute_recovery_action, action, payload, context)
@app.post(f"{prefix}/exports", tags=["exports"])
def create_export_package(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.create_export_package, payload, context)
@app.post(f"{prefix}/exports/validate", tags=["exports"])
def validate_export_package(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.validate_export_package, payload, context)
@app.post(f"{prefix}/governance/report", tags=["governance"])
def governance_report(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.governance_report, payload, context)
@app.get(f"{prefix}/extensions/catalog", tags=["extensions"])
def extension_catalog() -> dict[str, Any]:
return response(runtime.extension_catalog)
@app.post(f"{prefix}/extensions/events", tags=["extensions"])
def emit_extension_event(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.emit_extension_event, payload, context)
@app.post(f"{prefix}/quality/signals", tags=["quality"])
def record_quality_signal(
payload: dict[str, Any],
context: OperationContext = Depends(context_from_headers),
) -> dict[str, Any]:
return response(runtime.record_quality_signal, payload, context)
@app.get(f"{prefix}/quality/cost", tags=["quality"])
def quality_cost_signals() -> dict[str, Any]:
return response(runtime.quality_cost_signals)
@app.get(f"{prefix}/performance/smoke", tags=["compliance"])
def performance_smoke_report() -> dict[str, Any]:
return response(runtime.performance_smoke_report)
@app.get(f"{prefix}/compliance/mvp", tags=["compliance"])
def mvp_compliance_report() -> dict[str, Any]:
return response(runtime.mvp_compliance_report)
return app
def _age_seconds(start: str, end: str) -> float:
try:
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
return max(0.0, round((end_dt - start_dt).total_seconds(), 3))
except ValueError:
return 0.0
def _average(values: list[float]) -> float | None:
return round(sum(values) / len(values), 3) if values else None
def _ratio(numerator: int, denominator: int) -> float:
return round(numerator / denominator, 4) if denominator else 0.0
def _count_by_value(values: Any) -> dict[str, int]:
counts: dict[str, int] = {}
for value in values:
counts[str(value)] = counts.get(str(value), 0) + 1
return counts
def _ingestion_retry_payload(job: Any) -> dict[str, Any]:
source_uri = job.input.get("source_uri")
payload = {
"mode": job.input.get("mode", "file"),
"path": source_uri,
"identity_policy": job.input.get("identity_policy", IngestionIdentityPolicy.SOURCE_LOCATION.value),
"skip_unchanged": bool(job.input.get("skip_unchanged", True)),
}
if job.input.get("mode") == "directory":
payload["recursive"] = bool(job.input.get("recursive", True))
return payload
def _export_asset_bundle(repository: AssetRegistryRepository, asset_id: str) -> dict[str, Any]:
return {
"asset": repository.get_asset(asset_id).to_dict(),
"metadata_records": [record.to_dict() for record in repository.list_metadata_records(asset_id)],
"representations": [representation.to_dict() for representation in repository.list_representations(asset_id=asset_id)],
"relationships": [
relationship.to_dict()
for relationship in repository.list_relationships(source_id=asset_id)
+ repository.list_relationships(target_id=asset_id)
],
"versions": [version.to_dict() for version in repository.list_versions(asset_id)],
"derived_lineage": [
lineage.to_dict()
for lineage in repository.list_derived_lineage(output_asset_id=asset_id)
+ repository.list_derived_lineage(source_asset_id=asset_id)
],
}
def _export_manifest(package: dict[str, Any]) -> dict[str, Any]:
records = list(package.get("records", ()))
payload = {
"schema_version": package.get("schema_version", "1"),
"records": records,
"audit_refs": list(package.get("audit_refs", ())),
"adapter_sections": dict(package.get("adapter_sections", {})),
}
metadata_count = sum(len(record.get("metadata_records", ())) for record in records)
representation_count = sum(len(record.get("representations", ())) for record in records)
relationship_count = sum(len(record.get("relationships", ())) for record in records)
version_count = sum(len(record.get("versions", ())) for record in records)
lineage_count = sum(len(record.get("derived_lineage", ())) for record in records)
serialized = stable_json_dumps(payload)
return {
"schema_version": package.get("schema_version", "1"),
"asset_count": len(records),
"metadata_count": metadata_count,
"representation_count": representation_count,
"relationship_count": relationship_count,
"version_count": version_count,
"lineage_count": lineage_count,
"audit_ref_count": len(package.get("audit_refs", ())),
"export_hash": content_digest(serialized.encode("utf-8")),
"hash_algorithm": "sha256",
}
def _export_adapter_sections(records: list[dict[str, Any]]) -> dict[str, Any]:
markitect_representations = [
representation
for record in records
for representation in record.get("representations", ())
if representation.get("producer") == "markitect-tool"
or representation.get("metadata", {}).get("extractor") == "markitect-tool"
]
return {
"markitect_tool": {
"included": bool(markitect_representations),
"representation_ids": [item.get("representation_id") for item in markitect_representations],
"boundary": "Adapter provenance is exported; markdown semantics remain owned by markitect-tool.",
}
}
def _extension_event_types() -> list[str]:
return [
"asset.changed",
"ingestion.completed",
"workflow.status_changed",
"policy.exception",
"derived_artifact.created",
"review.decided",
]
def _agent_metadata(
*,
agent_id: str | None,

View File

@@ -517,6 +517,97 @@ def test_service_runtime_ingestion_directory_reports_partial_failures(tmp_path)
assert result["retry_options"]["retryable"] is True
def test_service_runtime_exposes_operator_metrics_recovery_export_governance_and_quality(
tmp_path,
) -> None:
runtime = ServiceRuntime(repository=InMemoryAssetRegistryRepository())
context = runtime.operation_context(actor_id="operator", actor_type="service_account", correlation_id="corr-ops")
source = tmp_path / "ops-source.txt"
source.write_text("Operational export and governance signals should be source grounded.", encoding="utf-8")
ingestion = runtime.start_ingestion_job(
{
"mode": "file",
"path": str(source),
"asset_id": "asset-ops",
"classification": {
"asset_type": "document",
"sensitivity": "confidential",
},
},
context,
)
runtime.query_assets({"text": "governance", "include_snippets": True}, context)
transform = runtime.execute_transformation(
{
"operation_id": "structured_view",
"source_asset_ids": ["asset-ops"],
"output_asset_id": "asset-ops-derived",
},
context,
)
metrics = runtime.operational_metrics()
jobs = runtime.inspect_jobs(correlation_id="corr-ops")
events = runtime.operational_events(correlation_id="corr-ops", operation_prefix="asset.")
recovery_actions = runtime.recovery_actions()
inspected_failure = runtime.execute_recovery_action(
"inspect_failure",
{"kind": "ingestion", "id": ingestion["job_id"]},
context,
)
export = runtime.create_export_package({"scope": {"asset_ids": ["asset-ops", "asset-ops-derived"]}}, context)
validation = runtime.validate_export_package({"package": export}, context)
tampered = dict(export)
tampered["manifest"] = dict(export["manifest"])
tampered["manifest"]["asset_count"] = 999
invalid = runtime.validate_export_package({"package": tampered}, context)
governance = runtime.governance_report({"scope": {"asset_ids": ["asset-ops"]}}, context)
extension_catalog = runtime.extension_catalog()
extension_event = runtime.emit_extension_event(
{"event_type": "derived_artifact.created", "target": "asset:asset-ops-derived"},
context,
)
signal = runtime.record_quality_signal(
{
"signal_type": "ai_usage",
"target": "workflow:ops",
"asset_id": "asset-ops",
"agent_id": "agent-ops",
"metrics": {"confidence": 0.91},
"ai_usage": {"provider": "test", "model": "deterministic", "tokens": 123},
"cost": {"estimated": 0.42, "currency": "EUR"},
},
context,
)
quality_cost = runtime.quality_cost_signals()
smoke = runtime.performance_smoke_report()
compliance = runtime.mvp_compliance_report()
assert metrics["ingestion"]["job_count"] >= 1
assert metrics["retrieval"]["query_events"] >= 1
assert metrics["transformations"]["completed"] >= 1
assert jobs["count"] >= 2
assert events["items"][0]["correlation_id"] == "corr-ops"
assert any(action["action"] == "retry_ingestion_job" for action in recovery_actions["items"])
assert inspected_failure["result"]["job_id"] == ingestion["job_id"]
assert export["manifest"]["asset_count"] == 2
assert export["manifest"]["export_hash"].startswith("sha256:")
assert export["adapter_sections"]["markitect_tool"]["included"] is False
assert validation["valid"] is True
assert invalid["valid"] is False
assert any(issue["code"] == "export.count_mismatch" for issue in invalid["issues"])
assert governance["redaction"]["content_included"] is False
assert "governance.sensitive_without_review_metadata" in governance["summary"]
assert "markitect-tool" in extension_catalog["markitect_boundary"]
assert extension_event["event"]["operation"] == "extension.derived_artifact.created"
assert signal["event"]["operation"] == "quality.signal.recorded"
assert quality_cost["ai_usage"]["tokens"] == 123
assert quality_cost["cost"]["estimated_total"] == 0.42
assert smoke["smoke_targets"] == ["ingestion", "retrieval", "workflow", "export"]
assert compliance["implemented_capabilities"]["exports"] is True
assert transform["lineage"]["output_asset_id"] == "asset-ops-derived"
def test_create_app_reports_missing_optional_dependency_when_fastapi_is_absent() -> None:
try:
import fastapi # noqa: F401
@@ -576,6 +667,13 @@ def test_service_health_readiness_version_and_openapi_contracts(client) -> None:
assert "/api/v1/agents/operations" in paths
assert "/api/v1/context-packages" in paths
assert "/api/v1/context-packages/schema" in paths
assert "/api/v1/operations/metrics" in paths
assert "/api/v1/operations/recovery/actions" in paths
assert "/api/v1/exports" in paths
assert "/api/v1/governance/report" in paths
assert "/api/v1/extensions/catalog" in paths
assert "/api/v1/quality/signals" in paths
assert "/api/v1/compliance/mvp" in paths
def test_create_app_attaches_runtime_to_application_state(client) -> None:

View File

@@ -4,13 +4,13 @@ type: workplan
title: "Observability Export And Enterprise Readiness"
domain: markitect
repo: kontextual-engine
status: todo
status: completed
owner: codex
topic_slug: markitect
planning_priority: high
planning_order: 10
created: "2026-05-05"
updated: "2026-05-05"
updated: "2026-05-06"
state_hub_workstream_id: "09d769a5-a3cf-4cdf-ae5e-b4ecf767f109"
---
@@ -46,11 +46,19 @@ provenance where markdown-backed assets depend on them. Export formats remain
engine-owned and should include Markitect payloads as documented adapter
sections, not as the whole portability model.
## Implementation Status
Implemented as an operator/readiness layer on top of the existing runtime and
repository contracts. The MVP surfaces include operational metrics, job
inspection, event views, recovery actions, governed export packages, export
validation, governance reports, extension/event catalogs, quality/cost signal
recording, performance smoke summaries, and an MVP compliance report.
## E10.1 - Expose operational metrics events and job inspection
```task
id: KONT-WP-0010-T001
status: todo
status: done
priority: high
state_hub_task_id: "ce6cfbc4-b171-4f03-a27b-c46abbde85a0"
```
@@ -66,11 +74,20 @@ Acceptance:
completion, failure rate, queue age, and storage/index health.
- Events use correlation IDs that line up with audit records.
Implemented:
- `ServiceRuntime.operational_metrics()` summarizes asset, ingestion,
retrieval, transformation, workflow, permission, queue, and readiness state.
- `inspect_jobs()` exposes ingestion, transformation, and workflow jobs/runs by
kind, status, and correlation ID.
- `operational_events()` exposes audit-backed operational events with
correlation IDs.
## E10.2 - Implement administrative recovery actions
```task
id: KONT-WP-0010-T002
status: todo
status: done
priority: high
state_hub_task_id: "8f0ead65-79be-42e3-8ec8-43d146bb3934"
```
@@ -85,11 +102,19 @@ Acceptance:
recoverable without direct database edits.
- Partial failure reports remain available after recovery.
Implemented:
- Recovery action catalog plus execution for ingestion retry, transformation
retry/cancel, workflow retry/cancel, retrieval re-index, and failure
inspection.
- Recovery actions authorize through `PolicyGateway` and emit audit events.
- Partial ingestion failure envelopes remain inspectable and tested.
## E10.3 - Implement export packages manifests and integrity validation
```task
id: KONT-WP-0010-T003
status: todo
status: done
priority: high
state_hub_task_id: "54ed199f-636e-4cfd-898f-fd6ad0057b61"
```
@@ -106,11 +131,20 @@ Acceptance:
policy context.
- Export validation can detect missing records or integrity mismatches.
Implemented:
- Governed export packages scoped by explicit asset IDs, filters, or retrieval
query.
- Export records include assets, metadata, representations, relationships,
versions, lineage, audit references, policy context, and Markitect adapter
sections.
- Export validation recomputes counts and content hash to detect tampering.
## E10.4 - Implement governance inspection and reporting hooks
```task
id: KONT-WP-0010-T004
status: todo
status: done
priority: medium
state_hub_task_id: "c62c5f36-30d9-4469-90cf-5dc3d37588ba"
```
@@ -126,11 +160,18 @@ Acceptance:
policy-conflicted assets.
- Reporting respects authorization and redaction policy.
Implemented:
- `governance_report()` generates scoped reports over selected assets.
- Findings cover missing owner, metadata, source refs, audit gaps, and
sensitive assets without review/retention metadata.
- Reports include redaction metadata and avoid embedding source content.
## E10.5 - Implement extension events webhooks and backend abstraction readiness
```task
id: KONT-WP-0010-T005
status: todo
status: done
priority: medium
state_hub_task_id: "f1713b41-0535-47fc-ba7e-054aea93f8cf"
```
@@ -149,11 +190,19 @@ Acceptance:
- Markitect adapter contract tests are part of the extension compatibility
posture for markdown-related engine capabilities.
Implemented:
- Extension catalog exposes connector, extractor, transformation, event, and
backend abstraction readiness.
- Extension events can be emitted as audited semantic events.
- Markitect adapter provenance and boundary are explicit in export and
extension surfaces.
## E10.6 - Capture retrieval AI cost and quality signals
```task
id: KONT-WP-0010-T006
status: todo
status: done
priority: medium
state_hub_task_id: "1d36035a-b211-49e9-935c-382d52aa3639"
```
@@ -169,11 +218,20 @@ Acceptance:
- Signals can be attributed to assets, workflows, agents, applications, and
actors.
Implemented:
- Retrieval quality metrics are exposed in operator metrics and
quality/cost reports.
- `record_quality_signal()` captures AI usage, cost, metrics, and attribution
dimensions as audit-backed signal events.
- `quality_cost_signals()` aggregates retrieval quality, AI usage, provider
error count, and estimated cost.
## E10.7 - Add performance smoke tests and MVP compliance report
```task
id: KONT-WP-0010-T007
status: todo
status: done
priority: medium
state_hub_task_id: "057c7bcf-f224-4d9f-9161-6bfff4948e95"
```
@@ -188,6 +246,14 @@ Acceptance:
- MVP compliance report maps implemented behavior to FRS P0 requirements.
- Remaining P1/P2 gaps are explicit and prioritized.
Implemented:
- `performance_smoke_report()` summarizes representative ingestion, retrieval,
workflow, and export observations.
- `mvp_compliance_report()` maps MVP behavior to observability/recovery,
export, governance/audit, and agent-safe operation requirements.
- Remaining enterprise-adapter gaps are explicit in the compliance report.
## Definition Of Done
- Operators can inspect, diagnose, recover, export, and evaluate MVP engine