Implement first knowledge engine runtime slice

This commit is contained in:
2026-05-05 01:47:19 +02:00
parent 902ba7352d
commit cca9ebe172
17 changed files with 1445 additions and 26 deletions

View File

@@ -2,23 +2,12 @@
# Custodian Brief — kontextual-engine
**Domain:** markitect
**Last synced:** 2026-05-04 23:16 UTC
**Last synced:** 2026-05-04 23:44 UTC
**State Hub:** http://127.0.0.1:8000 *(adjust if running on a remote machine)*
## Active Workstreams
### Headless Knowledge Engine Implementation
Progress: 0/8 done | workstream_id: `0fd08391-e8c9-4f1b-ace4-06439f958e88`
**Open tasks:**
- · I3.1 - Scaffold package and test harness `3d3400a5`
- · I3.2 - Implement artifact and collection model `638ce4d0`
- · I3.3 - Implement storage abstraction `41c84f25`
- · I3.4 - Implement ingestion and normalization interfaces `0be5fff9`
- · I3.5 - Implement query and retrieval API `c8841fab`
- · I3.6 - Implement workflow run model `0d62f47c`
- · I3.7 - Implement agent context surface `96689817`
- … and 1 more open tasks
No active workstreams for this repo.
---
## MCP Orientation (when available)

View File

@@ -14,6 +14,7 @@ Start here:
- `docs/markitect-tool-reuse-boundary.md`
- `docs/system-layer-extraction-inventory.md`
- `docs/system-layer-migration-backlog.md`
- `docs/service-api-boundary.md`
- `workplans/`
## Development
@@ -23,3 +24,7 @@ This repo uses Python 3.12+, setuptools, a `src/` package layout, and pytest.
```bash
python3 -m pytest
```
The first runtime slice implements artifacts, collections, relationships,
in-memory storage, ingestion adapters, query, workflow run manifests, and
agent-facing context packages.

View File

@@ -0,0 +1,42 @@
# Service API Boundary
Date: 2026-05-05
## Decision
The first service boundary will be an optional FastAPI layer over the
programmatic contracts implemented in `kontextual_engine`. The service must not
define separate business behavior; it should expose the same artifact,
collection, relationship, ingestion, query, workflow, and context operations as
HTTP resources.
## First Resource Shape
Planned endpoint groups:
- `GET /health`
- `POST /collections`, `GET /collections`, `GET /collections/{id}`
- `POST /artifacts`, `GET /artifacts/{id}`, `GET /artifacts`
- `POST /relationships`, `GET /relationships`
- `POST /ingest`
- `POST /query/artifacts`, `POST /query/relationships`
- `POST /runs`, `GET /runs/{id}`, `GET /runs/{id}/manifest`
- `POST /context/artifact/{id}`
## Rules
- The Python API is canonical until contracts stabilize.
- HTTP handlers must translate request/response envelopes only.
- Markdown parsing, document transforms, selectors, contracts, and local
indexes must remain adapter calls to `markitect-tool`.
- Provider calls must remain adapter calls to `llm-connect`.
- Structured diagnostics and errors must be preserved in responses.
## Deferred
- Authentication and authorization.
- Durable database backend.
- OpenAPI model polishing.
- Streaming run execution.
- Provider-backed assisted steps.

View File

@@ -1,6 +1,71 @@
"""Headless knowledge runtime package."""
__all__ = ["__version__"]
from .artifacts import (
Artifact,
ArtifactMetadata,
ArtifactReference,
ArtifactType,
Collection,
Relationship,
RelationshipType,
bundle_digest,
content_digest,
)
from .context import ContextAssembler, ContextItem, ContextPackage
from .errors import (
AdapterUnavailableError,
Diagnostic,
DuplicateResourceError,
KontextualError,
NotFoundError,
ValidationError,
)
from .ingestion import IngestionRequest, IngestionResult, IngestionService
from .query import QueryEngine, QueryResult
from .relationships import RelationshipGraph
from .storage import InMemoryKnowledgeRepository
from .workflows import (
InputBundle,
OperationRun,
OperationStage,
RunManifest,
RunStatus,
WorkflowStep,
)
__all__ = [
"__version__",
"AdapterUnavailableError",
"Artifact",
"ArtifactMetadata",
"ArtifactReference",
"ArtifactType",
"Collection",
"ContextAssembler",
"ContextItem",
"ContextPackage",
"Diagnostic",
"DuplicateResourceError",
"InMemoryKnowledgeRepository",
"IngestionRequest",
"IngestionResult",
"IngestionService",
"InputBundle",
"KontextualError",
"NotFoundError",
"OperationRun",
"OperationStage",
"QueryEngine",
"QueryResult",
"Relationship",
"RelationshipGraph",
"RelationshipType",
"RunManifest",
"RunStatus",
"ValidationError",
"WorkflowStep",
"bundle_digest",
"content_digest",
]
__version__ = "0.1.0"

View File

@@ -0,0 +1,295 @@
"""Core artifact, collection, and relationship models."""
from __future__ import annotations
import hashlib
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
def utc_now() -> datetime:
return datetime.now(timezone.utc)
def content_digest(content: str | bytes) -> str:
"""Return a stable SHA-256 digest prefixed with the algorithm name."""
if isinstance(content, str):
data = content.encode("utf-8")
else:
data = content
return "sha256:" + hashlib.sha256(data).hexdigest()
def stable_json_dumps(value: Any) -> str:
return json.dumps(value, sort_keys=True, separators=(",", ":"), default=str)
def bundle_digest(value: Any) -> str:
"""Digest arbitrary JSON-like data with deterministic key ordering."""
return content_digest(stable_json_dumps(value))
class ArtifactType(str, Enum):
CONTENT = "content"
TEMPLATE = "template"
GENERATED = "generated"
DATASET = "dataset"
DOCUMENT = "document"
CONFIG = "config"
SCHEMA = "schema"
RUN_OUTPUT = "run_output"
class RelationshipType(str, Enum):
RELATES_TO = "relates_to"
DEPENDS_ON = "depends_on"
PRODUCES = "produces"
DERIVED_FROM = "derived_from"
EVALUATES = "evaluates"
CONTAINS = "contains"
CUSTOM = "custom"
@dataclass
class ArtifactMetadata:
description: str | None = None
tags: list[str] = field(default_factory=list)
media_type: str | None = None
source_uri: str | None = None
version: str | None = None
custom: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: dict[str, Any] | None) -> "ArtifactMetadata":
data = data or {}
return cls(
description=data.get("description"),
tags=list(data.get("tags", [])),
media_type=data.get("media_type"),
source_uri=data.get("source_uri"),
version=data.get("version"),
custom=dict(data.get("custom", {})),
)
@dataclass
class Collection:
id: str
name: str
domain: str | None = None
parent_id: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=utc_now)
updated_at: datetime = field(default_factory=utc_now)
@classmethod
def create(
cls,
name: str,
*,
domain: str | None = None,
parent_id: str | None = None,
collection_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> "Collection":
return cls(
id=collection_id or str(uuid.uuid4()),
name=name,
domain=domain,
parent_id=parent_id,
metadata=metadata or {},
)
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"domain": self.domain,
"parent_id": self.parent_id,
"metadata": dict(self.metadata),
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Collection":
return cls(
id=data["id"],
name=data["name"],
domain=data.get("domain"),
parent_id=data.get("parent_id"),
metadata=dict(data.get("metadata", {})),
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
)
@dataclass
class Artifact:
id: str
collection_id: str
name: str
artifact_type: ArtifactType
content_digest: str
content_size: int
content: str = ""
metadata: ArtifactMetadata = field(default_factory=ArtifactMetadata)
created_at: datetime = field(default_factory=utc_now)
updated_at: datetime = field(default_factory=utc_now)
@classmethod
def create(
cls,
collection_id: str,
name: str,
content: str,
*,
artifact_type: ArtifactType = ArtifactType.CONTENT,
metadata: ArtifactMetadata | None = None,
artifact_id: str | None = None,
) -> "Artifact":
return cls(
id=artifact_id or str(uuid.uuid4()),
collection_id=collection_id,
name=name,
artifact_type=artifact_type,
content_digest=content_digest(content),
content_size=len(content.encode("utf-8")),
content=content,
metadata=metadata or ArtifactMetadata(),
)
def update_content(self, content: str) -> None:
self.content = content
self.content_digest = content_digest(content)
self.content_size = len(content.encode("utf-8"))
self.updated_at = utc_now()
def has_changed(self, digest: str) -> bool:
return self.content_digest != digest
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"collection_id": self.collection_id,
"name": self.name,
"artifact_type": self.artifact_type.value,
"content_digest": self.content_digest,
"content_size": self.content_size,
"content": self.content,
"metadata": self.metadata.to_dict(),
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Artifact":
return cls(
id=data["id"],
collection_id=data["collection_id"],
name=data["name"],
artifact_type=ArtifactType(data["artifact_type"]),
content_digest=data["content_digest"],
content_size=int(data["content_size"]),
content=data.get("content", ""),
metadata=ArtifactMetadata.from_dict(data.get("metadata")),
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
)
@dataclass(frozen=True)
class ArtifactReference:
name: str
collection_id: str | None = None
version: str | None = None
@classmethod
def parse(cls, value: str) -> "ArtifactReference":
if not value:
raise ValueError("Artifact reference cannot be empty")
ref, version = (value.rsplit("@", 1) + [None])[:2] if "@" in value else (value, None)
if ":" in ref:
collection_id, name = ref.split(":", 1)
else:
collection_id, name = None, ref
if not name:
raise ValueError(f"Invalid artifact reference: {value}")
return cls(name=name, collection_id=collection_id or None, version=version)
def __str__(self) -> str:
ref = f"{self.collection_id}:{self.name}" if self.collection_id else self.name
return f"{ref}@{self.version}" if self.version else ref
@dataclass
class Relationship:
id: str
source_artifact_id: str
target_artifact_id: str
predicate: str
relationship_type: RelationshipType = RelationshipType.RELATES_TO
evidence: str = ""
provenance: dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=utc_now)
@classmethod
def create(
cls,
source_artifact_id: str,
target_artifact_id: str,
predicate: str,
*,
relationship_type: RelationshipType = RelationshipType.RELATES_TO,
evidence: str = "",
provenance: dict[str, Any] | None = None,
relationship_id: str | None = None,
) -> "Relationship":
return cls(
id=relationship_id or str(uuid.uuid4()),
source_artifact_id=source_artifact_id,
target_artifact_id=target_artifact_id,
predicate=predicate,
relationship_type=relationship_type,
evidence=evidence,
provenance=provenance or {},
)
def edge(self) -> tuple[str, str, str]:
return (self.source_artifact_id, self.target_artifact_id, self.predicate)
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"source_artifact_id": self.source_artifact_id,
"target_artifact_id": self.target_artifact_id,
"predicate": self.predicate,
"relationship_type": self.relationship_type.value,
"evidence": self.evidence,
"provenance": dict(self.provenance),
"created_at": self.created_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Relationship":
return cls(
id=data["id"],
source_artifact_id=data["source_artifact_id"],
target_artifact_id=data["target_artifact_id"],
predicate=data["predicate"],
relationship_type=RelationshipType(data.get("relationship_type", "relates_to")),
evidence=data.get("evidence", ""),
provenance=dict(data.get("provenance", {})),
created_at=datetime.fromisoformat(data["created_at"]),
)

View File

@@ -0,0 +1,102 @@
"""Agent-facing context assembly."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from .artifacts import Artifact
from .query import QueryEngine
from .storage import KnowledgeRepository
@dataclass
class ContextItem:
artifact_id: str
name: str
text: str
summary: str
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"artifact_id": self.artifact_id,
"name": self.name,
"text": self.text,
"summary": self.summary,
"metadata": dict(self.metadata),
}
@dataclass
class ContextPackage:
id: str
title: str
intent: str
items: list[ContextItem]
metadata: dict[str, Any] = field(default_factory=dict)
def render_markdown(self) -> str:
lines = [f"# {self.title}", "", self.intent.strip(), ""]
for item in self.items:
lines.extend([f"## {item.name}", "", item.summary, "", item.text.strip(), ""])
return "\n".join(lines).rstrip() + "\n"
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"title": self.title,
"intent": self.intent,
"items": [item.to_dict() for item in self.items],
"metadata": dict(self.metadata),
}
class ContextAssembler:
"""Build simple, inspectable context bundles from repository state."""
def __init__(self, repository: KnowledgeRepository) -> None:
self.repository = repository
self.query = QueryEngine(repository)
def for_artifact(
self,
artifact_id: str,
*,
title: str | None = None,
intent: str = "Inspect artifact and directly related knowledge.",
include_related: bool = True,
) -> ContextPackage:
root = self.repository.get_artifact(artifact_id)
artifacts = [root]
if include_related:
related = self.query.related_artifacts(artifact_id)
artifacts.extend(
self.repository.get_artifact(row["id"])
for row in related.results
if row["id"] != artifact_id
)
return ContextPackage(
id=f"context:{artifact_id}",
title=title or f"Context for {root.name}",
intent=intent,
items=[_context_item(a) for a in artifacts],
metadata={"root_artifact_id": artifact_id, "include_related": include_related},
)
def _context_item(artifact: Artifact) -> ContextItem:
text = artifact.content
summary = text.strip().splitlines()[0] if text.strip() else "(empty artifact)"
return ContextItem(
artifact_id=artifact.id,
name=artifact.name,
text=text,
summary=summary[:240],
metadata={
"collection_id": artifact.collection_id,
"artifact_type": artifact.artifact_type.value,
"content_digest": artifact.content_digest,
},
)

View File

@@ -0,0 +1,59 @@
"""Shared diagnostics and structured errors."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class Diagnostic:
"""A structured finding emitted by engine operations."""
severity: str
code: str
message: str
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"severity": self.severity,
"code": self.code,
"message": self.message,
"details": dict(self.details),
}
class KontextualError(Exception):
"""Base class for explicit engine failures."""
code = "kontextual.error"
def __init__(self, message: str, *, details: dict[str, Any] | None = None) -> None:
super().__init__(message)
self.details = details or {}
def diagnostic(self, *, severity: str = "error") -> Diagnostic:
return Diagnostic(
severity=severity,
code=self.code,
message=str(self),
details=dict(self.details),
)
class NotFoundError(KontextualError):
code = "kontextual.not_found"
class DuplicateResourceError(KontextualError):
code = "kontextual.duplicate"
class ValidationError(KontextualError):
code = "kontextual.validation"
class AdapterUnavailableError(KontextualError):
code = "kontextual.adapter_unavailable"

View File

@@ -0,0 +1,125 @@
"""Format-agnostic ingestion contracts and first adapters."""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Protocol
from .artifacts import Artifact, ArtifactMetadata, ArtifactType
from .errors import AdapterUnavailableError
@dataclass
class IngestionRequest:
collection_id: str
name: str
content: str | bytes | None = None
path: str | Path | None = None
media_type: str = "text/plain"
artifact_type: ArtifactType = ArtifactType.CONTENT
metadata: dict[str, Any] = field(default_factory=dict)
def read_text(self) -> str:
if self.content is not None:
if isinstance(self.content, bytes):
return self.content.decode("utf-8")
return self.content
if self.path is None:
return ""
return Path(self.path).read_text(encoding="utf-8")
@dataclass
class IngestionResult:
artifacts: list[Artifact]
normalized: dict[str, Any] = field(default_factory=dict)
adapter: str = ""
class IngestionAdapter(Protocol):
media_types: tuple[str, ...]
def ingest(self, request: IngestionRequest) -> IngestionResult: ...
class PlainTextIngestionAdapter:
media_types = ("text/plain",)
def ingest(self, request: IngestionRequest) -> IngestionResult:
text = request.read_text()
metadata = ArtifactMetadata(
media_type=request.media_type,
source_uri=str(request.path) if request.path else None,
custom=dict(request.metadata),
)
artifact = Artifact.create(
request.collection_id,
request.name,
text,
artifact_type=request.artifact_type,
metadata=metadata,
)
return IngestionResult(
artifacts=[artifact],
normalized={"text": text, "media_type": request.media_type},
adapter="plain-text",
)
class MarkdownIngestionAdapter:
"""Adapter boundary to markitect-tool; no markdown parsing lives here."""
media_types = ("text/markdown", "text/x-markdown")
def ingest(self, request: IngestionRequest) -> IngestionResult:
try:
from markitect_tool.core.parser import parse_markdown
except Exception as exc: # pragma: no cover - exercised when optional dep absent
raise AdapterUnavailableError(
"markitect-tool is required for markdown ingestion",
details={"adapter": "markitect-tool", "media_type": request.media_type},
) from exc
text = request.read_text()
document = parse_markdown(text, source_path=str(request.path) if request.path else None)
metadata = ArtifactMetadata(
media_type=request.media_type,
source_uri=str(request.path) if request.path else None,
custom={
**request.metadata,
"frontmatter": dict(document.frontmatter),
"headings": [heading.__dict__ for heading in document.headings],
},
)
artifact = Artifact.create(
request.collection_id,
request.name,
text,
artifact_type=request.artifact_type,
metadata=metadata,
)
return IngestionResult(
artifacts=[artifact],
normalized={
"frontmatter": dict(document.frontmatter),
"headings": [heading.__dict__ for heading in document.headings],
"sections": len(document.sections),
},
adapter="markitect-tool",
)
class IngestionService:
def __init__(self, adapters: list[IngestionAdapter] | None = None) -> None:
self.adapters = adapters or [PlainTextIngestionAdapter(), MarkdownIngestionAdapter()]
def ingest(self, request: IngestionRequest) -> IngestionResult:
for adapter in self.adapters:
if request.media_type in adapter.media_types:
return adapter.ingest(request)
raise AdapterUnavailableError(
"No ingestion adapter registered for media type",
details={"media_type": request.media_type},
)

View File

@@ -0,0 +1,123 @@
"""Programmatic query and retrieval API."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from .artifacts import Artifact, Relationship
from .errors import Diagnostic, KontextualError
from .storage import KnowledgeRepository
@dataclass
class QueryResult:
query: str
results: list[dict[str, Any]]
metadata: dict[str, Any] = field(default_factory=dict)
diagnostics: list[Diagnostic] = field(default_factory=list)
success: bool = True
@property
def result_count(self) -> int:
return len(self.results)
def to_dict(self) -> dict[str, Any]:
return {
"query": self.query,
"result_count": self.result_count,
"results": self.results,
"metadata": dict(self.metadata),
"diagnostics": [d.to_dict() for d in self.diagnostics],
"success": self.success,
}
class QueryEngine:
"""Small deterministic query facade over repository state."""
def __init__(self, repository: KnowledgeRepository) -> None:
self.repository = repository
def artifacts(
self,
*,
collection_id: str | None = None,
name: str | None = None,
digest: str | None = None,
metadata: dict[str, Any] | None = None,
text_contains: str | None = None,
) -> QueryResult:
query_parts = ["artifacts"]
matches = self.repository.list_artifacts(collection_id=collection_id)
if collection_id:
query_parts.append(f"collection_id={collection_id}")
if name is not None:
matches = [a for a in matches if a.name == name]
query_parts.append(f"name={name}")
if digest is not None:
matches = [a for a in matches if a.content_digest == digest]
query_parts.append(f"digest={digest}")
if metadata:
for key, value in metadata.items():
matches = [a for a in matches if _metadata_value(a, key) == value]
query_parts.append(f"metadata.{key}={value}")
if text_contains:
needle = text_contains.lower()
matches = [a for a in matches if needle in a.content.lower()]
query_parts.append(f"text~={text_contains}")
return QueryResult(
query=" ".join(query_parts),
results=[a.to_dict() for a in matches],
metadata={"type": "artifact"},
)
def relationships(self, *, artifact_id: str | None = None) -> QueryResult:
relationships = self.repository.list_relationships(artifact_id=artifact_id)
query = "relationships" + (f" artifact_id={artifact_id}" if artifact_id else "")
return QueryResult(
query=query,
results=[r.to_dict() for r in relationships],
metadata={"type": "relationship"},
)
def related_artifacts(self, artifact_id: str) -> QueryResult:
try:
relationships = self.repository.list_relationships(artifact_id=artifact_id)
related: list[Artifact] = []
seen: set[str] = set()
for relationship in relationships:
other_id = _other_endpoint(relationship, artifact_id)
if other_id and other_id not in seen:
related.append(self.repository.get_artifact(other_id))
seen.add(other_id)
return QueryResult(
query=f"related artifact_id={artifact_id}",
results=[a.to_dict() for a in related],
metadata={"type": "related_artifacts", "relationship_count": len(relationships)},
)
except KontextualError as exc:
return QueryResult(
query=f"related artifact_id={artifact_id}",
results=[],
diagnostics=[exc.diagnostic()],
success=False,
)
def _metadata_value(artifact: Artifact, path: str) -> Any:
data: Any = artifact.metadata.to_dict()
for part in path.split("."):
if not isinstance(data, dict):
return None
data = data.get(part)
return data
def _other_endpoint(relationship: Relationship, artifact_id: str) -> str | None:
if relationship.source_artifact_id == artifact_id:
return relationship.target_artifact_id
if relationship.target_artifact_id == artifact_id:
return relationship.source_artifact_id
return None

View File

@@ -0,0 +1,83 @@
"""Relationship graph helpers."""
from __future__ import annotations
from dataclasses import dataclass, field
from .artifacts import Relationship
from .errors import ValidationError
@dataclass
class RelationshipGraph:
"""In-memory directed graph over artifact relationships."""
edges: list[Relationship] = field(default_factory=list)
@property
def nodes(self) -> set[str]:
nodes: set[str] = set()
for edge in self.edges:
nodes.add(edge.source_artifact_id)
nodes.add(edge.target_artifact_id)
return nodes
def add(self, relationship: Relationship) -> None:
self.edges.append(relationship)
def successors(self, artifact_id: str) -> set[str]:
return {edge.target_artifact_id for edge in self.edges if edge.source_artifact_id == artifact_id}
def predecessors(self, artifact_id: str) -> set[str]:
return {edge.source_artifact_id for edge in self.edges if edge.target_artifact_id == artifact_id}
def detect_cycles(self) -> list[list[str]]:
color = {node: "white" for node in self.nodes}
parent: dict[str, str | None] = {node: None for node in self.nodes}
cycles: list[list[str]] = []
def visit(node: str) -> None:
color[node] = "gray"
for neighbor in self.successors(node):
if color.get(neighbor) == "gray":
cycle = [neighbor]
current: str | None = node
while current and current != neighbor:
cycle.append(current)
current = parent[current]
cycle.append(neighbor)
cycle.reverse()
cycles.append(cycle)
elif color.get(neighbor, "white") == "white":
parent[neighbor] = node
visit(neighbor)
color[node] = "black"
for node in sorted(self.nodes):
if color[node] == "white":
visit(node)
return cycles
def topological_sort(self) -> list[str]:
cycles = self.detect_cycles()
if cycles:
raise ValidationError("Relationship graph contains cycles", details={"cycles": cycles})
visited: set[str] = set()
result: list[str] = []
def visit(node: str) -> None:
if node in visited:
return
visited.add(node)
for successor in sorted(self.successors(node)):
visit(successor)
result.append(node)
for node in sorted(self.nodes):
visit(node)
return result
@classmethod
def from_relationships(cls, relationships: list[Relationship]) -> "RelationshipGraph":
return cls(edges=list(relationships))

View File

@@ -0,0 +1,100 @@
"""Storage contracts and an in-memory repository backend."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable, Protocol
from .artifacts import Artifact, Collection, Relationship
from .errors import DuplicateResourceError, NotFoundError
class KnowledgeRepository(Protocol):
"""Repository contract for durable knowledge state."""
def save_collection(self, collection: Collection) -> Collection: ...
def get_collection(self, collection_id: str) -> Collection: ...
def list_collections(self) -> list[Collection]: ...
def save_artifact(self, artifact: Artifact) -> Artifact: ...
def get_artifact(self, artifact_id: str) -> Artifact: ...
def get_artifact_by_name(self, collection_id: str, name: str) -> Artifact: ...
def list_artifacts(self, *, collection_id: str | None = None) -> list[Artifact]: ...
def save_relationship(self, relationship: Relationship) -> Relationship: ...
def list_relationships(self, *, artifact_id: str | None = None) -> list[Relationship]: ...
@dataclass
class InMemoryKnowledgeRepository:
"""Deterministic repository for tests and first-contract development."""
collections: dict[str, Collection] = field(default_factory=dict)
artifacts: dict[str, Artifact] = field(default_factory=dict)
relationships: dict[str, Relationship] = field(default_factory=dict)
def save_collection(self, collection: Collection) -> Collection:
self.collections[collection.id] = collection
return collection
def get_collection(self, collection_id: str) -> Collection:
try:
return self.collections[collection_id]
except KeyError as exc:
raise NotFoundError("Collection not found", details={"collection_id": collection_id}) from exc
def list_collections(self) -> list[Collection]:
return sorted(self.collections.values(), key=lambda c: (c.name, c.id))
def save_artifact(self, artifact: Artifact) -> Artifact:
self.get_collection(artifact.collection_id)
for existing in self.artifacts.values():
if (
existing.id != artifact.id
and existing.collection_id == artifact.collection_id
and existing.name == artifact.name
):
raise DuplicateResourceError(
"Artifact name already exists in collection",
details={"collection_id": artifact.collection_id, "name": artifact.name},
)
self.artifacts[artifact.id] = artifact
return artifact
def get_artifact(self, artifact_id: str) -> Artifact:
try:
return self.artifacts[artifact_id]
except KeyError as exc:
raise NotFoundError("Artifact not found", details={"artifact_id": artifact_id}) from exc
def get_artifact_by_name(self, collection_id: str, name: str) -> Artifact:
for artifact in self.artifacts.values():
if artifact.collection_id == collection_id and artifact.name == name:
return artifact
raise NotFoundError(
"Artifact not found",
details={"collection_id": collection_id, "name": name},
)
def list_artifacts(self, *, collection_id: str | None = None) -> list[Artifact]:
artifacts: Iterable[Artifact] = self.artifacts.values()
if collection_id is not None:
artifacts = [a for a in artifacts if a.collection_id == collection_id]
return sorted(artifacts, key=lambda a: (a.collection_id, a.name, a.id))
def save_relationship(self, relationship: Relationship) -> Relationship:
self.get_artifact(relationship.source_artifact_id)
self.get_artifact(relationship.target_artifact_id)
self.relationships[relationship.id] = relationship
return relationship
def list_relationships(self, *, artifact_id: str | None = None) -> list[Relationship]:
relationships: Iterable[Relationship] = self.relationships.values()
if artifact_id is not None:
relationships = [
r for r in relationships
if r.source_artifact_id == artifact_id or r.target_artifact_id == artifact_id
]
return sorted(
relationships,
key=lambda r: (r.source_artifact_id, r.target_artifact_id, r.predicate, r.id),
)

View File

@@ -0,0 +1,172 @@
"""Workflow run and manifest models."""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
from .artifacts import Relationship, bundle_digest, utc_now
from .errors import Diagnostic
class OperationStage(str, Enum):
PENDING = "pending"
PREPARING = "preparing"
RUNNING = "running"
VALIDATING = "validating"
COMPLETE = "complete"
FAILED = "failed"
class RunStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class InputBundle:
operation: str
inputs: dict[str, str]
adapter: str | None = None
options: dict[str, Any] = field(default_factory=dict)
def calculate_hash(self) -> str:
return bundle_digest({
"operation": self.operation,
"inputs": self.inputs,
"adapter": self.adapter,
"options": self.options,
})
def to_dict(self) -> dict[str, Any]:
return {
"operation": self.operation,
"inputs": dict(self.inputs),
"adapter": self.adapter,
"options": dict(self.options),
"input_bundle_hash": self.calculate_hash(),
}
@dataclass
class WorkflowStep:
id: str
kind: str
depends_on: list[str] = field(default_factory=list)
adapter: str | None = None
inputs: dict[str, Any] = field(default_factory=dict)
outputs: dict[str, Any] = field(default_factory=dict)
status: RunStatus = RunStatus.PENDING
diagnostics: list[Diagnostic] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"kind": self.kind,
"depends_on": list(self.depends_on),
"adapter": self.adapter,
"inputs": dict(self.inputs),
"outputs": dict(self.outputs),
"status": self.status.value,
"diagnostics": [d.to_dict() for d in self.diagnostics],
}
@dataclass
class OperationRun:
id: str
operation: str
input_bundle_hash: str
status: RunStatus = RunStatus.PENDING
stage: OperationStage = OperationStage.PENDING
parent_run_id: str | None = None
depth: int = 0
started_at: datetime = field(default_factory=utc_now)
completed_at: datetime | None = None
diagnostics: list[Diagnostic] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
@classmethod
def create(
cls,
operation: str,
input_bundle_hash: str,
*,
parent_run_id: str | None = None,
depth: int = 0,
run_id: str | None = None,
) -> "OperationRun":
return cls(
id=run_id or str(uuid.uuid4()),
operation=operation,
input_bundle_hash=input_bundle_hash,
parent_run_id=parent_run_id,
depth=depth,
)
def advance(self, stage: OperationStage) -> None:
self.stage = stage
if stage in (OperationStage.PREPARING, OperationStage.RUNNING, OperationStage.VALIDATING):
self.status = RunStatus.RUNNING
def mark_complete(self) -> None:
self.stage = OperationStage.COMPLETE
self.status = RunStatus.SUCCESS
self.completed_at = utc_now()
def mark_failed(self, diagnostic: Diagnostic) -> None:
self.stage = OperationStage.FAILED
self.status = RunStatus.FAILED
self.completed_at = utc_now()
self.diagnostics.append(diagnostic)
def mark_skipped(self, reason: str) -> None:
self.status = RunStatus.SKIPPED
self.completed_at = utc_now()
self.metadata["skip_reason"] = reason
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"operation": self.operation,
"input_bundle_hash": self.input_bundle_hash,
"status": self.status.value,
"stage": self.stage.value,
"parent_run_id": self.parent_run_id,
"depth": self.depth,
"started_at": self.started_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"diagnostics": [d.to_dict() for d in self.diagnostics],
"metadata": dict(self.metadata),
}
@dataclass
class RunManifest:
run: OperationRun
input_bundle: InputBundle
steps: list[WorkflowStep] = field(default_factory=list)
produced_artifact_ids: list[str] = field(default_factory=list)
dependency_edges: list[Relationship] = field(default_factory=list)
validation_results: list[dict[str, Any]] = field(default_factory=list)
impact_debt: list[dict[str, Any]] = field(default_factory=list)
timing: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"run": self.run.to_dict(),
"input_bundle": self.input_bundle.to_dict(),
"steps": [step.to_dict() for step in self.steps],
"produced_artifact_ids": list(self.produced_artifact_ids),
"dependency_edges": [edge.to_dict() for edge in self.dependency_edges],
"validation_results": list(self.validation_results),
"impact_debt": list(self.impact_debt),
"timing": dict(self.timing),
}

75
tests/test_artifacts.py Normal file
View File

@@ -0,0 +1,75 @@
from kontextual_engine import (
Artifact,
ArtifactMetadata,
ArtifactReference,
ArtifactType,
Collection,
Relationship,
RelationshipGraph,
RelationshipType,
content_digest,
)
def test_artifact_create_updates_digest_and_roundtrips() -> None:
artifact = Artifact.create(
"collection-1",
"brief",
"first version",
artifact_type=ArtifactType.DOCUMENT,
metadata=ArtifactMetadata(tags=["example"], media_type="text/plain"),
)
assert artifact.content_digest == content_digest("first version")
assert artifact.content_size == len("first version".encode("utf-8"))
artifact.update_content("second version")
assert artifact.content_digest == content_digest("second version")
restored = Artifact.from_dict(artifact.to_dict())
assert restored.id == artifact.id
assert restored.metadata.tags == ["example"]
assert restored.artifact_type == ArtifactType.DOCUMENT
def test_artifact_reference_parse_formats() -> None:
assert ArtifactReference.parse("name") == ArtifactReference(name="name")
assert ArtifactReference.parse("space:name") == ArtifactReference(
name="name",
collection_id="space",
)
assert str(ArtifactReference.parse("space:name@v1")) == "space:name@v1"
def test_collection_and_relationship_roundtrip() -> None:
collection = Collection.create("project", domain="markitect")
restored_collection = Collection.from_dict(collection.to_dict())
assert restored_collection.name == "project"
assert restored_collection.domain == "markitect"
relationship = Relationship.create(
"a",
"b",
"depends on",
relationship_type=RelationshipType.DEPENDS_ON,
evidence="test evidence",
)
assert relationship.edge() == ("a", "b", "depends on")
restored_relationship = Relationship.from_dict(relationship.to_dict())
assert restored_relationship.relationship_type == RelationshipType.DEPENDS_ON
assert restored_relationship.evidence == "test evidence"
def test_relationship_graph_detects_cycles_and_orders_dependencies() -> None:
edge_ab = Relationship.create("A", "B", "depends on")
edge_bc = Relationship.create("B", "C", "depends on")
graph = RelationshipGraph.from_relationships([edge_ab, edge_bc])
assert graph.successors("A") == {"B"}
assert graph.predecessors("C") == {"B"}
assert graph.detect_cycles() == []
assert graph.topological_sort().index("C") < graph.topological_sort().index("A")
graph.add(Relationship.create("C", "A", "depends on"))
assert graph.detect_cycles()

View File

@@ -0,0 +1,100 @@
from kontextual_engine import (
ArtifactType,
Collection,
ContextAssembler,
Diagnostic,
InMemoryKnowledgeRepository,
IngestionRequest,
IngestionService,
InputBundle,
OperationRun,
OperationStage,
Relationship,
RunManifest,
RunStatus,
WorkflowStep,
)
def test_plain_text_ingestion_creates_normalized_artifact() -> None:
service = IngestionService()
result = service.ingest(
IngestionRequest(
collection_id="collection-1",
name="note",
content="hello",
media_type="text/plain",
artifact_type=ArtifactType.CONTENT,
)
)
assert result.adapter == "plain-text"
assert result.artifacts[0].name == "note"
assert result.artifacts[0].metadata.media_type == "text/plain"
assert result.normalized["text"] == "hello"
def test_input_bundle_hash_is_deterministic() -> None:
first = InputBundle(
operation="compose",
inputs={"b": "sha256:2", "a": "sha256:1"},
options={"heading_delta": 1},
)
second = InputBundle(
operation="compose",
inputs={"a": "sha256:1", "b": "sha256:2"},
options={"heading_delta": 1},
)
assert first.calculate_hash() == second.calculate_hash()
def test_operation_run_lifecycle_and_manifest() -> None:
bundle = InputBundle(operation="ingest", inputs={"source": "sha256:abc"})
run = OperationRun.create("ingest", bundle.calculate_hash(), depth=1)
run.advance(OperationStage.RUNNING)
assert run.status == RunStatus.RUNNING
run.mark_complete()
manifest = RunManifest(
run=run,
input_bundle=bundle,
steps=[WorkflowStep(id="step-1", kind="ingest", status=RunStatus.SUCCESS)],
produced_artifact_ids=["artifact-1"],
)
data = manifest.to_dict()
assert data["run"]["status"] == "success"
assert data["input_bundle"]["input_bundle_hash"] == bundle.calculate_hash()
assert data["steps"][0]["kind"] == "ingest"
def test_operation_run_failure_records_diagnostic() -> None:
bundle = InputBundle(operation="query", inputs={})
run = OperationRun.create("query", bundle.calculate_hash())
run.mark_failed(Diagnostic(severity="error", code="test", message="failed"))
assert run.status == RunStatus.FAILED
assert run.to_dict()["diagnostics"][0]["code"] == "test"
def test_context_assembler_includes_related_artifacts() -> None:
repo = InMemoryKnowledgeRepository()
collection = repo.save_collection(Collection.create("runtime"))
root = repo.save_artifact(repo_artifact(collection.id, "root", "Root summary\nDetails"))
related = repo.save_artifact(repo_artifact(collection.id, "related", "Related summary"))
repo.save_relationship(Relationship.create(root.id, related.id, "relates to"))
package = ContextAssembler(repo).for_artifact(root.id)
rendered = package.render_markdown()
assert len(package.items) == 2
assert "Root summary" in rendered
assert "Related summary" in rendered
def repo_artifact(collection_id: str, name: str, content: str):
from kontextual_engine import Artifact
return Artifact.create(collection_id, name, content)

View File

@@ -1,6 +1,12 @@
from kontextual_engine import __version__
from kontextual_engine import Artifact, Collection, InMemoryKnowledgeRepository, __version__
def test_package_exports_version() -> None:
assert __version__ == "0.1.0"
def test_package_exports_core_runtime_contracts() -> None:
repo = InMemoryKnowledgeRepository()
collection = repo.save_collection(Collection.create("export-check"))
artifact = repo.save_artifact(Artifact.create(collection.id, "note", "content"))
assert repo.get_artifact(artifact.id).name == "note"

View File

@@ -0,0 +1,61 @@
from kontextual_engine import (
Artifact,
ArtifactMetadata,
Collection,
DuplicateResourceError,
InMemoryKnowledgeRepository,
QueryEngine,
Relationship,
)
def test_in_memory_repository_stores_collections_artifacts_and_relationships() -> None:
repo = InMemoryKnowledgeRepository()
collection = repo.save_collection(Collection.create("runtime"))
source = repo.save_artifact(Artifact.create(collection.id, "source", "source text"))
derived = repo.save_artifact(Artifact.create(collection.id, "derived", "derived text"))
relationship = repo.save_relationship(Relationship.create(derived.id, source.id, "derived from"))
assert repo.get_collection(collection.id).name == "runtime"
assert repo.get_artifact_by_name(collection.id, "source").id == source.id
assert repo.list_relationships(artifact_id=source.id) == [relationship]
def test_repository_rejects_duplicate_artifact_names_in_collection() -> None:
repo = InMemoryKnowledgeRepository()
collection = repo.save_collection(Collection.create("runtime"))
repo.save_artifact(Artifact.create(collection.id, "same", "one"))
try:
repo.save_artifact(Artifact.create(collection.id, "same", "two"))
except DuplicateResourceError as exc:
assert exc.details["name"] == "same"
else:
raise AssertionError("Expected duplicate artifact name to fail")
def test_query_engine_filters_artifacts_and_related_artifacts() -> None:
repo = InMemoryKnowledgeRepository()
collection = repo.save_collection(Collection.create("runtime"))
source = repo.save_artifact(
Artifact.create(
collection.id,
"source",
"alpha content",
metadata=ArtifactMetadata(tags=["seed"], custom={"kind": "source"}),
)
)
derived = repo.save_artifact(Artifact.create(collection.id, "derived", "beta content"))
repo.save_relationship(Relationship.create(derived.id, source.id, "derived from"))
query = QueryEngine(repo)
by_text = query.artifacts(text_contains="alpha")
by_metadata = query.artifacts(metadata={"custom.kind": "source"})
related = query.related_artifacts(source.id)
assert by_text.result_count == 1
assert by_text.results[0]["id"] == source.id
assert by_metadata.result_count == 1
assert related.result_count == 1
assert related.results[0]["id"] == derived.id

View File

@@ -4,11 +4,11 @@ type: workplan
title: "Headless Knowledge Engine Implementation"
domain: markitect
repo: kontextual-engine
status: active
status: done
owner: codex
topic_slug: markitect
created: "2026-05-03"
updated: "2026-05-03"
updated: "2026-05-05"
state_hub_workstream_id: "0fd08391-e8c9-4f1b-ace4-06439f958e88"
---
@@ -24,7 +24,7 @@ retrieval, workflows, and agent-operable context.
```task
id: KONT-WP-0003-T001
status: todo
status: done
priority: high
state_hub_task_id: "3d3400a5-63ee-4c64-8bfb-fd3caa1ce787"
```
@@ -32,11 +32,13 @@ state_hub_task_id: "3d3400a5-63ee-4c64-8bfb-fd3caa1ce787"
Create the initial Python package, dependency metadata, test harness, and CI
commands after the stack decision in `KONT-WP-0001`.
Output: `pyproject.toml`, `src/kontextual_engine/`, and `tests/`.
## I3.2 - Implement artifact and collection model
```task
id: KONT-WP-0003-T002
status: todo
status: done
priority: high
state_hub_task_id: "638ce4d0-b96b-4d8f-8bd9-4c77acad0a59"
```
@@ -44,11 +46,14 @@ state_hub_task_id: "638ce4d0-b96b-4d8f-8bd9-4c77acad0a59"
Implement core models for artifacts, metadata, collections, domains, and
relationships. Cover FR-001 through FR-011 with focused unit tests.
Output: `src/kontextual_engine/artifacts.py`,
`src/kontextual_engine/relationships.py`, and `tests/test_artifacts.py`.
## I3.3 - Implement storage abstraction
```task
id: KONT-WP-0003-T003
status: todo
status: done
priority: high
state_hub_task_id: "41c84f25-96b0-40eb-b2b4-834938cf9bb9"
```
@@ -57,11 +62,14 @@ Define repository interfaces and the first backend. Start with the simplest
backend that can support deterministic tests, then document the path to SQLite
or service-backed persistence.
Output: `src/kontextual_engine/storage.py` and
`tests/test_storage_query.py`.
## I3.4 - Implement ingestion and normalization interfaces
```task
id: KONT-WP-0003-T004
status: todo
status: done
priority: high
state_hub_task_id: "0be5fff9-f4a2-4aee-86b9-b84feef477f7"
```
@@ -70,11 +78,13 @@ Define format-agnostic ingestion contracts. Add a markdown adapter boundary
that can later call `markitect-tool`, without embedding markdown primitives in
this repo.
Output: `src/kontextual_engine/ingestion.py`.
## I3.5 - Implement query and retrieval API
```task
id: KONT-WP-0003-T005
status: todo
status: done
priority: high
state_hub_task_id: "c8841fab-6dd1-47e1-a794-798bef1663e7"
```
@@ -82,11 +92,13 @@ state_hub_task_id: "c8841fab-6dd1-47e1-a794-798bef1663e7"
Provide programmatic query over identifiers, metadata, content references, and
relationships. Cover FR-030 and FR-031 with tests.
Output: `src/kontextual_engine/query.py`.
## I3.6 - Implement workflow run model
```task
id: KONT-WP-0003-T006
status: todo
status: done
priority: high
state_hub_task_id: "0d62f47c-339a-4d86-9ab0-737bd4ab553c"
```
@@ -94,11 +106,13 @@ state_hub_task_id: "0d62f47c-339a-4d86-9ab0-737bd4ab553c"
Represent workflow runs, steps, dependencies, inputs, outputs, structured
errors, and derived artifacts. Cover FR-040 through FR-052.
Output: `src/kontextual_engine/workflows.py`.
## I3.7 - Implement agent context surface
```task
id: KONT-WP-0003-T007
status: todo
status: done
priority: medium
state_hub_task_id: "96689817-e8e9-42e4-9af3-f03021e3ae4e"
```
@@ -106,11 +120,13 @@ state_hub_task_id: "96689817-e8e9-42e4-9af3-f03021e3ae4e"
Provide a provider-neutral context assembly API so agents can inspect
knowledge state, trigger operations, and receive traceable results.
Output: `src/kontextual_engine/context.py`.
## I3.8 - Define service API boundary
```task
id: KONT-WP-0003-T008
status: todo
status: done
priority: medium
state_hub_task_id: "0f4748f9-934a-438c-ac2d-e9796ad98818"
```
@@ -118,3 +134,4 @@ state_hub_task_id: "0f4748f9-934a-438c-ac2d-e9796ad98818"
Decide and document the first HTTP/RPC service boundary after the programmatic
API is stable enough to avoid framework-driven architecture.
Output: `docs/service-api-boundary.md`.