engine and lifecycle

This commit is contained in:
2026-05-14 16:26:42 +02:00
parent c0a535f1d1
commit fc70acb257
11 changed files with 702 additions and 8 deletions

View File

@@ -1,5 +1,15 @@
from .errors import InfospaceError
from .evaluation import EntityEvaluation, EvaluationSnapshot, MetricValue, ScoreEntry
from .engine import (
AssetRecord,
AssetSyncAction,
AssetSyncPlan,
EngineCapabilityContract,
LocalAssetRepository,
engine_capability_contract,
plan_asset_sync,
sync_assets,
)
from .evaluation_io import (
append_to_history,
read_entity_evaluation,
@@ -39,13 +49,19 @@ __all__ = [
"KnowledgeArtifact",
"MetricValue",
"EntityRecord",
"EngineCapabilityContract",
"RelationRecord",
"AssetRecord",
"AssetSyncAction",
"AssetSyncPlan",
"LocalAssetRepository",
"ScoreEntry",
"TopicConfig",
"ViabilityThreshold",
"add_artifact",
"append_to_history",
"create_infospace",
"engine_capability_contract",
"find_snapshot",
"get_history",
"get_latest_snapshot",
@@ -62,6 +78,8 @@ __all__ = [
"load_workflows",
"plan_workflow",
"run_workflow",
"plan_asset_sync",
"sync_assets",
"write_entity_evaluation",
"write_metrics_file",
"write_snapshot",

View File

@@ -6,6 +6,7 @@ import sys
from pathlib import Path
from .checks import run_collection_checks
from .engine import engine_capability_contract, plan_asset_sync, sync_assets
from .errors import InfospaceError
from .history import find_snapshot, get_history, metric_trend, record_check_results
from .lifecycle import add_artifact, create_infospace, load_infospace
@@ -86,6 +87,28 @@ def build_parser() -> argparse.ArgumentParser:
workflow_run.add_argument("root")
workflow_run.add_argument("workflow_id")
engine = sub.add_parser("engine", help="Inspect and sync engine boundary state")
engine_sub = engine.add_subparsers(dest="engine_command", required=True)
engine_inspect = engine_sub.add_parser(
"inspect",
help="Inspect the optional engine capability contract",
)
engine_inspect.add_argument("root")
engine_plan = engine_sub.add_parser(
"plan-sync",
help="Plan artifact-to-asset sync without mutation",
)
engine_plan.add_argument("root")
engine_sync = engine_sub.add_parser(
"sync",
help="Dry-run artifact-to-asset sync unless --apply is passed",
)
engine_sync.add_argument("root")
engine_sync.add_argument("--apply", action="store_true")
return parser
@@ -194,6 +217,22 @@ def main(argv: list[str] | None = None) -> int:
)
else:
parser.error(f"Unhandled workflow command: {args.workflow_command}")
elif args.command == "engine":
if args.engine_command == "inspect":
_write_json(
{
"root": str(Path(args.root)),
"contract": engine_capability_contract().to_dict(),
}
)
elif args.engine_command == "plan-sync":
_write_json(plan_asset_sync(Path(args.root)).to_dict())
elif args.engine_command == "sync":
_write_json(
sync_assets(Path(args.root), dry_run=not args.apply).to_dict()
)
else:
parser.error(f"Unhandled engine command: {args.engine_command}")
else:
parser.error(f"Unhandled command: {args.command}")
except InfospaceError as exc:

View File

@@ -0,0 +1,365 @@
from __future__ import annotations
import hashlib
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Protocol
import yaml
from .errors import InfospaceError
from .lifecycle import load_infospace
from .models import Infospace, KnowledgeArtifact
ENGINE_DIR = Path("output/engine")
ASSET_STORE = ENGINE_DIR / "assets.yaml"
SYNC_RUN_DIR = ENGINE_DIR / "sync-runs"
@dataclass(frozen=True)
class EngineCapabilityContract:
integration_posture: str
file_backed: list[str]
engine_backed: list[str]
explicit_non_goals: list[str]
review_required: list[str]
assumptions: dict[str, str]
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass(frozen=True)
class AssetRecord:
asset_id: str
artifact_id: str
infospace_slug: str
kind: str
title: str
path: str
digest: str
size_bytes: int
version: str
provenance: dict[str, Any] = field(default_factory=dict)
relationships: list[dict[str, Any]] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AssetRecord":
return cls(
asset_id=str(data["asset_id"]),
artifact_id=str(data["artifact_id"]),
infospace_slug=str(data["infospace_slug"]),
kind=str(data["kind"]),
title=str(data.get("title") or ""),
path=str(data["path"]),
digest=str(data["digest"]),
size_bytes=int(data.get("size_bytes") or 0),
version=str(data.get("version") or data["digest"][:12]),
provenance=dict(data.get("provenance") or {}),
relationships=list(data.get("relationships") or []),
metadata=dict(data.get("metadata") or {}),
)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass(frozen=True)
class AssetSyncAction:
action: str
asset_id: str
artifact_id: str
path: str
digest: str
previous_digest: str = ""
workflow_id: str = ""
reason: str = ""
def to_dict(self) -> dict[str, Any]:
return {
key: value
for key, value in asdict(self).items()
if value not in ("", None)
}
@dataclass(frozen=True)
class AssetSyncPlan:
infospace_slug: str
repository: str
dry_run: bool
status: str
actions: list[AssetSyncAction]
capabilities: EngineCapabilityContract
run_record_path: str = ""
def to_dict(self) -> dict[str, Any]:
data = {
"infospace_slug": self.infospace_slug,
"repository": self.repository,
"dry_run": self.dry_run,
"status": self.status,
"actions": [action.to_dict() for action in self.actions],
"contract": self.capabilities.to_dict(),
"run_record_path": self.run_record_path,
}
return {key: value for key, value in data.items() if value not in ("", [])}
class AssetRepository(Protocol):
repository_id: str
def get_asset(self, asset_id: str) -> AssetRecord | None:
"""Return an asset by durable ID if present."""
def list_assets(self) -> list[AssetRecord]:
"""Return all assets known to the repository."""
def upsert_asset(self, asset: AssetRecord) -> None:
"""Create or update a durable asset record."""
class LocalAssetRepository:
"""File-backed adapter used until a kontextual-engine client exists."""
def __init__(self, path: str | Path) -> None:
self.path = Path(path)
self.repository_id = f"local-file:{self.path}"
def get_asset(self, asset_id: str) -> AssetRecord | None:
return self._asset_map().get(asset_id)
def list_assets(self) -> list[AssetRecord]:
return list(self._asset_map().values())
def upsert_asset(self, asset: AssetRecord) -> None:
assets = self._asset_map()
assets[asset.asset_id] = asset
self.path.parent.mkdir(parents=True, exist_ok=True)
self.path.write_text(
yaml.safe_dump(
{
"assets": [
item.to_dict()
for item in sorted(
assets.values(),
key=lambda record: record.asset_id,
)
]
},
sort_keys=False,
),
encoding="utf-8",
)
def _asset_map(self) -> dict[str, AssetRecord]:
if not self.path.is_file():
return {}
data = yaml.safe_load(self.path.read_text(encoding="utf-8")) or {}
if not isinstance(data, dict):
return {}
return {
item.asset_id: item
for item in [
AssetRecord.from_dict(raw)
for raw in data.get("assets", [])
if isinstance(raw, dict)
]
}
def engine_capability_contract() -> EngineCapabilityContract:
return EngineCapabilityContract(
integration_posture="optional",
file_backed=[
"artifact_manifest",
"infospace_config",
"markdown_artifacts",
"evaluation_history",
"workflow_run_records",
"dry_run_sync_plans",
],
engine_backed=[
"asset_identity",
"asset_persistence",
"provenance_graph",
"relationship_index",
"retrieval",
"workflow_runs",
"permissions",
"audit",
],
explicit_non_goals=[
"markdown_syntax_rules",
"infospace_domain_methodology",
"repository_identity",
"provider_specific_llm_calls",
],
review_required=[
"apply_sync",
"engine_client_configuration",
"permission_changes",
],
assumptions={
"source": "kontextual-engine/INTENT.md",
"engine_role": "durable knowledge operations engine",
"bench_role": "application workspace with file-backed default behavior",
},
)
def plan_asset_sync(
root: str | Path,
repository: AssetRepository | None = None,
) -> AssetSyncPlan:
infospace = load_infospace(root)
repo = repository or LocalAssetRepository(infospace.root / ASSET_STORE)
actions = [
_sync_action(record, repo.get_asset(record.asset_id))
for record in _asset_records(infospace)
]
return AssetSyncPlan(
infospace_slug=infospace.config.slug,
repository=repo.repository_id,
dry_run=True,
status="planned",
actions=actions,
capabilities=engine_capability_contract(),
)
def sync_assets(
root: str | Path,
*,
repository: AssetRepository | None = None,
dry_run: bool = True,
) -> AssetSyncPlan:
infospace = load_infospace(root)
repo = repository or LocalAssetRepository(infospace.root / ASSET_STORE)
plan = plan_asset_sync(infospace.root, repo)
if dry_run:
return plan
records = {record.asset_id: record for record in _asset_records(infospace)}
for action in plan.actions:
if action.action in {"create", "update"}:
repo.upsert_asset(records[action.asset_id])
applied = AssetSyncPlan(
infospace_slug=plan.infospace_slug,
repository=plan.repository,
dry_run=False,
status="completed",
actions=plan.actions,
capabilities=plan.capabilities,
)
run_record_path = _write_sync_run(infospace.root, applied)
return AssetSyncPlan(
infospace_slug=applied.infospace_slug,
repository=applied.repository,
dry_run=applied.dry_run,
status=applied.status,
actions=applied.actions,
capabilities=applied.capabilities,
run_record_path=run_record_path,
)
def asset_id_for(infospace_slug: str, artifact_id: str) -> str:
return f"infospace:{infospace_slug}:artifact:{artifact_id}"
def _asset_records(infospace: Infospace) -> list[AssetRecord]:
return [
_asset_record_for(infospace, artifact)
for artifact in infospace.artifacts
]
def _asset_record_for(
infospace: Infospace,
artifact: KnowledgeArtifact,
) -> AssetRecord:
artifact_path = infospace.root / artifact.path
if not artifact_path.is_file():
raise InfospaceError(
"missing_artifact_file",
f"Manifest artifact file does not exist: {artifact.path}",
{"artifact_id": artifact.id, "path": artifact.path},
)
data = artifact_path.read_bytes()
digest = hashlib.sha256(data).hexdigest()
workflow_id = str(artifact.provenance.get("workflow_id") or "")
return AssetRecord(
asset_id=asset_id_for(infospace.config.slug, artifact.id),
artifact_id=artifact.id,
infospace_slug=infospace.config.slug,
kind=artifact.kind,
title=artifact.title,
path=artifact.path,
digest=digest,
size_bytes=len(data),
version=digest[:12],
provenance={
**artifact.provenance,
"artifact_id": artifact.id,
"artifact_path": artifact.path,
"infospace_slug": infospace.config.slug,
},
relationships=artifact.relationships,
metadata={
"workflow_id": workflow_id,
"source_path": artifact.provenance.get("source_path", ""),
},
)
def _sync_action(
desired: AssetRecord,
existing: AssetRecord | None,
) -> AssetSyncAction:
workflow_id = str(desired.provenance.get("workflow_id") or "")
if existing is None:
return AssetSyncAction(
action="create",
asset_id=desired.asset_id,
artifact_id=desired.artifact_id,
path=desired.path,
digest=desired.digest,
workflow_id=workflow_id,
reason="asset_not_present",
)
if existing.digest != desired.digest:
return AssetSyncAction(
action="update",
asset_id=desired.asset_id,
artifact_id=desired.artifact_id,
path=desired.path,
digest=desired.digest,
previous_digest=existing.digest,
workflow_id=workflow_id,
reason="digest_changed",
)
return AssetSyncAction(
action="unchanged",
asset_id=desired.asset_id,
artifact_id=desired.artifact_id,
path=desired.path,
digest=desired.digest,
previous_digest=existing.digest,
workflow_id=workflow_id,
reason="digest_matches",
)
def _write_sync_run(root: Path, plan: AssetSyncPlan) -> str:
run_id = uuid.uuid4().hex[:12]
path = root / SYNC_RUN_DIR / f"{run_id}.yaml"
payload = plan.to_dict()
payload["recorded_at"] = datetime.now(timezone.utc).isoformat()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")
return str(path)

View File

@@ -21,6 +21,7 @@ LAYOUT_DIRS = (
"workflows/templates",
"output/evaluations",
"output/metrics",
"output/engine/sync-runs",
"output/workflows/runs",
"reports",
"exports",