acceptance matrix and workflow generation

This commit is contained in:
2026-05-14 16:01:32 +02:00
parent 4026f34174
commit 55405d8a5a
11 changed files with 1051 additions and 14 deletions

View File

@@ -21,6 +21,7 @@ Start with:
- `docs/markitect-tool-adapter.md`
- `docs/entity-relation-model.md`
- `docs/evaluation-history-and-metrics.md`
- `docs/workflow-generation-pipeline.md`
- `docs/orthogonal-successor-roadmap.md`
- `docs/legacy-infospace-feature-inventory.md`
- `docs/successor-boundary-interface-map.md`

View File

@@ -22,9 +22,13 @@ infospace.
```text
artifacts/sources/
artifacts/entities/
artifacts/relations/
artifacts/generated/
workflows/templates/
output/evaluations/
output/metrics/
output/workflows/runs/
reports/
exports/
```

View File

@@ -15,17 +15,17 @@ considered a replacement for each in-scope legacy infospace behavior from
| Create an infospace config | File-backed `infospaces/<slug>/infospace.yaml` and manifest | Lifecycle tests, CLI create/inspect docs | `IB-WP-0002` | baseline done |
| Load and inspect an infospace | Structured config/artifact model and JSON CLI | Tests for load, missing config, structured errors | `IB-WP-0002` | baseline done |
| Add/list artifacts | Manifest-backed artifact registration | Tests for add, duplicate rejection, inspect output | `IB-WP-0002` | baseline done |
| Parse entity Markdown | Entity parser built on `markitect-tool` document sections | Entity fixture tests, diagnostics for missing sections | `IB-WP-0006`, `IB-WP-0007` | planned |
| Validate entity/relation shape | Contract/schema validation through `markitect-tool` | Valid/invalid fixture tests, diagnostic mapping | `IB-WP-0006`, `IB-WP-0007` | planned |
| List entities | Entity listing from parsed artifact model | CLI/API example and fixture tests | `IB-WP-0007` | planned |
| Parse and list relations | Relation triplet model and endpoint checks | Relation fixture tests, graph edge tests | `IB-WP-0007` | planned |
| Parse entity Markdown | Entity parser built on `markitect-tool` document sections | Entity fixture tests, diagnostics for missing sections | `IB-WP-0006`, `IB-WP-0007` | done |
| Validate entity/relation shape | Contract/schema validation through `markitect-tool` | Valid/invalid fixture tests, diagnostic mapping | `IB-WP-0006`, `IB-WP-0007` | baseline done |
| List entities | Entity listing from parsed artifact model | CLI/API example and fixture tests | `IB-WP-0007` | done |
| Parse and list relations | Relation triplet model and endpoint checks | Relation fixture tests, graph edge tests | `IB-WP-0007` | done |
| Export semantic graph | Infospace graph model with Mermaid/DOT output | Graph output tests and pilot report | `IB-WP-0007`, `IB-WP-0008` | partial baseline |
| Run collection checks | Methodology-owned metrics in `infospace-bench` | Deterministic metric tests and fixture output | `IB-WP-0008` | partial baseline |
| Check viability | Threshold report from metrics | Viability tests and CLI/report output | `IB-WP-0008` | partial baseline |
| Write evaluation results | Evaluation files with structured metadata | Round-trip tests and pilot fixture | `IB-WP-0008` | planned |
| Maintain metrics history | Snapshot history append/read/diff | History and history-diff tests | `IB-WP-0008` | planned |
| Evaluate with LLM assistance | Provider-neutral assisted evaluation workflow | Dry-run plan, adapter contract, audited output | `IB-WP-0009` | planned |
| Process source chapters | Explicit infospace workflow stages | Deterministic runner tests, generated artifact provenance | `IB-WP-0009` | planned |
| Write evaluation results | Evaluation files with structured metadata | Round-trip tests and pilot fixture | `IB-WP-0008` | done |
| Maintain metrics history | Snapshot history append/read/diff | History and history-diff tests | `IB-WP-0008` | done |
| Evaluate with LLM assistance | Provider-neutral assisted evaluation workflow | Dry-run plan, adapter contract, audited output | `IB-WP-0009` | boundary done |
| Process source chapters | Explicit infospace workflow stages | Deterministic runner tests, generated artifact provenance | `IB-WP-0009` | deterministic baseline done |
| Track stale outputs | Digest/provenance comparison | Tests after workflow provenance exists | `IB-WP-0009` | deferred |
| Persist durable assets | Optional engine-backed repository adapter | Dry-run sync tests and integration design | `IB-WP-0010` | planned |
| Run a legacy-derived pilot | Pruned `infospace-with-history` migration | Pilot corpus, migration report, parity comparison | `IB-WP-0011` | planned |

View File

@@ -0,0 +1,84 @@
# Workflow Generation Pipeline
`infospace-bench` replaces the old `markitect infospace process` concept with
explicit workflow declarations in `infospace.yaml`.
The boundary is intentionally narrow:
- `infospace-bench` owns concrete workflow declarations, source artifact
selection, generated artifact provenance, run records, and CLI behavior.
- `markitect-tool` owns deterministic Markdown rendering and parsing. The
workflow layer calls it only through `infospace_bench.markdown_adapter`.
- Assisted generation is represented as provider-neutral requests. Actual
model/provider calls must arrive through an explicit adapter, not through the
workflow definition itself.
## Declaration Shape
```yaml
workflows:
- id: source-summary
description: Render deterministic summaries for source artifacts.
inputs:
source:
kind: source
static_macros:
discipline: Viable System Model
stages:
- id: render-summary
kind: template
input: source
template: workflows/templates/summary.md
output:
path: artifacts/generated/{{ input.slug }}-summary.md
artifact_id: generated/{{ input.slug }}-summary.md
kind: generated
title: "{{ input.title }} Summary"
expected_evaluations:
- metrics
```
Workflow template files use the `markitect-tool` `{{ variable.path }}` template
syntax. Current deterministic stage data includes:
- `input`: selected artifact metadata and content
- `macros`: workflow-level plus stage-level static macros
- `workflow`: the workflow declaration
- `stage`: the current stage declaration
- `stages`: previous deterministic stage outputs, keyed by stage ID
## Deterministic Runs
`kind: template` stages render Markdown and write generated artifacts. Each
generated artifact is registered in `artifacts/index.yaml` with provenance:
- `workflow_id`
- `stage_id`
- `input_artifact_id`
Runs write trace records under:
```text
output/workflows/runs/<run-id>.yaml
```
These records are file-backed evidence for later `kontextual-engine` integration
without making this repo a workflow engine.
## Assisted Boundary
`kind: assisted` stages render a provider-neutral prompt request during planning.
Running an assisted stage requires an explicit `AssistedGenerationAdapter`.
Without one, the runner raises `assisted_stage_requires_adapter`, which keeps
provider behavior optional, auditable, and outside the application workflow
declaration.
## CLI
```bash
python3 -m infospace_bench workflow inspect infospaces/bootstrap-pilot
python3 -m infospace_bench workflow plan infospaces/bootstrap-pilot source-summary
python3 -m infospace_bench workflow run infospaces/bootstrap-pilot source-summary
```
All commands emit JSON for scripted migration and parity checks.

View File

@@ -17,7 +17,7 @@ from .history import (
record_check_results,
write_metrics_file,
)
from .lifecycle import add_artifact, create_infospace, load_infospace
from .lifecycle import add_artifact, create_infospace, load_infospace, register_artifact
from .models import (
DisciplineBinding,
Infospace,
@@ -27,6 +27,7 @@ from .models import (
ViabilityThreshold,
)
from .semantics import EntityRecord, RelationRecord, list_entities, list_relations
from .workflow import load_workflows, plan_workflow, run_workflow
__all__ = [
"DisciplineBinding",
@@ -57,6 +58,10 @@ __all__ = [
"read_metrics_file",
"read_snapshot",
"record_check_results",
"register_artifact",
"load_workflows",
"plan_workflow",
"run_workflow",
"write_entity_evaluation",
"write_metrics_file",
"write_snapshot",

View File

@@ -11,6 +11,7 @@ from .history import find_snapshot, get_history, metric_trend, record_check_resu
from .lifecycle import add_artifact, create_infospace, load_infospace
from .markdown_adapter import validate_infospace_artifacts
from .semantics import list_entities, list_relations
from .workflow import load_workflows, plan_workflow, run_workflow
def build_parser() -> argparse.ArgumentParser:
@@ -62,6 +63,29 @@ def build_parser() -> argparse.ArgumentParser:
)
metrics.add_argument("root")
workflow = sub.add_parser("workflow", help="Inspect, plan, and run workflows")
workflow_sub = workflow.add_subparsers(dest="workflow_command", required=True)
workflow_inspect = workflow_sub.add_parser(
"inspect",
help="Inspect workflow declarations",
)
workflow_inspect.add_argument("root")
workflow_plan = workflow_sub.add_parser(
"plan",
help="Plan a workflow without writing outputs",
)
workflow_plan.add_argument("root")
workflow_plan.add_argument("workflow_id")
workflow_run = workflow_sub.add_parser(
"run",
help="Run a deterministic workflow",
)
workflow_run.add_argument("root")
workflow_run.add_argument("workflow_id")
return parser
@@ -150,6 +174,26 @@ def main(argv: list[str] | None = None) -> int:
run_collection_checks(infospace.artifacts),
)
_write_json(result.to_dict())
elif args.command == "workflow":
if args.workflow_command == "inspect":
_write_json(
{
"workflows": [
workflow.to_dict()
for workflow in load_workflows(Path(args.root))
]
}
)
elif args.workflow_command == "plan":
_write_json(
plan_workflow(Path(args.root), args.workflow_id).to_dict()
)
elif args.workflow_command == "run":
_write_json(
run_workflow(Path(args.root), args.workflow_id).to_dict()
)
else:
parser.error(f"Unhandled workflow command: {args.workflow_command}")
else:
parser.error(f"Unhandled command: {args.command}")
except InfospaceError as exc:

View File

@@ -18,8 +18,10 @@ LAYOUT_DIRS = (
"artifacts/entities",
"artifacts/relations",
"artifacts/generated",
"workflows/templates",
"output/evaluations",
"output/metrics",
"output/workflows/runs",
"reports",
"exports",
)
@@ -142,6 +144,50 @@ def add_artifact(
return artifact
def register_artifact(
root: Path | str,
*,
artifact_id: str,
path: Path | str,
kind: str,
title: str = "",
provenance: dict[str, Any] | None = None,
relationships: list[dict[str, Any]] | None = None,
) -> KnowledgeArtifact:
infospace = load_infospace(root)
if kind not in KIND_DIRS:
raise InfospaceError(
"invalid_artifact_kind",
f"Unsupported artifact kind: {kind}",
{"kind": kind, "valid_kinds": sorted(KIND_DIRS)},
)
relative_path = _relative_to_root(infospace.root, path)
artifact = KnowledgeArtifact(
id=artifact_id,
path=relative_path,
kind=kind,
title=title,
provenance=provenance or {},
relationships=relationships or [],
)
artifacts: list[KnowledgeArtifact] = []
replaced = False
for item in infospace.artifacts:
if item.id == artifact_id:
artifacts.append(artifact)
replaced = True
else:
artifacts.append(item)
if not replaced:
artifacts.append(artifact)
_write_yaml(
infospace.root / ARTIFACT_INDEX,
{"artifacts": [item.to_dict() for item in artifacts]},
)
return artifact
def _validate_slug(slug: str) -> None:
if not SLUG_RE.match(slug):
raise InfospaceError(
@@ -171,6 +217,21 @@ def _read_yaml(path: Path) -> dict[str, Any]:
return data
def _relative_to_root(root: Path, path: Path | str) -> str:
raw = Path(path)
target = raw if raw.is_absolute() else root / raw
root_resolved = root.resolve()
target_resolved = target.resolve()
try:
return str(target_resolved.relative_to(root_resolved))
except ValueError as exc:
raise InfospaceError(
"artifact_path_escapes_infospace",
f"Artifact path escapes infospace root: {path}",
{"root": str(root), "path": str(path)},
) from exc
def _write_yaml(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:

View File

@@ -6,6 +6,7 @@ from typing import Any
from markitect_tool import Heading, Section, parse_markdown_file
from markitect_tool.contract import check_markdown_file
from markitect_tool.template import render_template
from .errors import InfospaceError
from .lifecycle import load_infospace
@@ -91,6 +92,27 @@ class ArtifactValidationResult:
}
@dataclass(frozen=True)
class RenderedMarkdownTemplate:
markdown: str
variables: list[str]
missing_variables: list[str]
strict: bool = True
@property
def complete(self) -> bool:
return not self.missing_variables
def to_dict(self) -> dict[str, Any]:
return {
"markdown": self.markdown,
"variables": self.variables,
"missing_variables": self.missing_variables,
"strict": self.strict,
"complete": self.complete,
}
def parse_markdown_artifact(path: str | Path) -> ParsedMarkdownArtifact:
artifact_path = Path(path)
document = parse_markdown_file(artifact_path)
@@ -102,6 +124,21 @@ def parse_markdown_artifact(path: str | Path) -> ParsedMarkdownArtifact:
)
def render_markdown_template(
template_text: str,
data: dict[str, Any],
*,
strict: bool = True,
) -> RenderedMarkdownTemplate:
rendered = render_template(template_text, data, strict=strict)
return RenderedMarkdownTemplate(
markdown=rendered.markdown,
variables=rendered.variables,
missing_variables=rendered.missing_variables,
strict=rendered.strict,
)
def extract_section_text(
parsed: ParsedMarkdownArtifact,
heading: str,

View File

@@ -0,0 +1,584 @@
from __future__ import annotations
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Protocol
import yaml
from .errors import InfospaceError
from .lifecycle import load_infospace, register_artifact
from .markdown_adapter import render_markdown_template
from .models import KnowledgeArtifact
@dataclass(frozen=True)
class WorkflowInputSpec:
kind: str = ""
artifact_ids: list[str] = field(default_factory=list)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "WorkflowInputSpec":
return cls(
kind=str(data.get("kind") or ""),
artifact_ids=[str(item) for item in data.get("artifact_ids", [])],
)
def to_dict(self) -> dict[str, Any]:
data: dict[str, Any] = {}
if self.kind:
data["kind"] = self.kind
if self.artifact_ids:
data["artifact_ids"] = self.artifact_ids
return data
@dataclass(frozen=True)
class WorkflowOutputSpec:
path: str
kind: str = "generated"
artifact_id: str = ""
title: str = ""
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "WorkflowOutputSpec":
return cls(
path=str(data["path"]),
kind=str(data.get("kind") or "generated"),
artifact_id=str(data.get("artifact_id") or ""),
title=str(data.get("title") or ""),
)
def to_dict(self) -> dict[str, Any]:
data = asdict(self)
return {key: value for key, value in data.items() if value not in ("", [])}
@dataclass(frozen=True)
class WorkflowStage:
id: str
kind: str
input: str
template: str
output: WorkflowOutputSpec | None = None
static_macros: dict[str, Any] = field(default_factory=dict)
provider_hint: str | None = None
optional: bool = False
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "WorkflowStage":
output = data.get("output")
return cls(
id=str(data["id"]),
kind=str(data.get("kind") or "template"),
input=str(data.get("input") or ""),
template=str(data["template"]),
output=WorkflowOutputSpec.from_dict(output) if isinstance(output, dict) else None,
static_macros=dict(data.get("static_macros") or {}),
provider_hint=(
str(data["provider_hint"]) if data.get("provider_hint") else None
),
optional=bool(data.get("optional", False)),
)
def to_dict(self) -> dict[str, Any]:
data: dict[str, Any] = {
"id": self.id,
"kind": self.kind,
"input": self.input,
"template": self.template,
"static_macros": self.static_macros,
"optional": self.optional,
}
if self.output is not None:
data["output"] = self.output.to_dict()
if self.provider_hint:
data["provider_hint"] = self.provider_hint
return {key: value for key, value in data.items() if value not in ("", [], {})}
@dataclass(frozen=True)
class WorkflowDefinition:
id: str
description: str = ""
inputs: dict[str, WorkflowInputSpec] = field(default_factory=dict)
stages: list[WorkflowStage] = field(default_factory=list)
static_macros: dict[str, Any] = field(default_factory=dict)
expected_evaluations: list[str] = field(default_factory=list)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "WorkflowDefinition":
return cls(
id=str(data["id"]),
description=str(data.get("description") or ""),
inputs={
str(name): WorkflowInputSpec.from_dict(spec)
for name, spec in (data.get("inputs") or {}).items()
if isinstance(spec, dict)
},
stages=[
WorkflowStage.from_dict(item)
for item in data.get("stages", [])
if isinstance(item, dict)
],
static_macros=dict(data.get("static_macros") or {}),
expected_evaluations=[
str(item) for item in data.get("expected_evaluations", [])
],
)
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"inputs": {
name: spec.to_dict() for name, spec in self.inputs.items()
},
"stages": [stage.to_dict() for stage in self.stages],
"static_macros": self.static_macros,
"expected_evaluations": self.expected_evaluations,
}
@dataclass(frozen=True)
class WorkflowInputRecord:
name: str
artifact_id: str
kind: str
title: str
path: str
slug: str
content: str
def to_template_data(self) -> dict[str, Any]:
return asdict(self)
def to_dict(self) -> dict[str, Any]:
data = asdict(self)
data["content"] = self.content
return data
@dataclass(frozen=True)
class WorkflowOutputRecord:
stage_id: str
artifact_id: str
path: str
kind: str
title: str
input_artifact_id: str
written: bool
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass(frozen=True)
class AssistedGenerationRequest:
stage_id: str
workflow_id: str
input_artifact_id: str
prompt: str
provider_hint: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"stage_id": self.stage_id,
"workflow_id": self.workflow_id,
"input_artifact_id": self.input_artifact_id,
"prompt": self.prompt,
"provider_hint": self.provider_hint,
"metadata": self.metadata,
}
@dataclass(frozen=True)
class AssistedGenerationResult:
markdown: str
provider: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
class AssistedGenerationAdapter(Protocol):
def generate(
self,
request: AssistedGenerationRequest,
) -> AssistedGenerationResult:
"""Generate Markdown for an assisted workflow request."""
@dataclass(frozen=True)
class WorkflowStageRecord:
stage_id: str
kind: str
status: str
input_artifact_id: str
output_artifact_id: str = ""
message: str = ""
def to_dict(self) -> dict[str, Any]:
data = asdict(self)
return {key: value for key, value in data.items() if value != ""}
@dataclass(frozen=True)
class WorkflowRunResult:
run_id: str
workflow_id: str
status: str
dry_run: bool
inputs: list[WorkflowInputRecord] = field(default_factory=list)
stages: list[WorkflowStageRecord] = field(default_factory=list)
outputs: list[WorkflowOutputRecord] = field(default_factory=list)
assisted_requests: list[AssistedGenerationRequest] = field(default_factory=list)
run_record_path: str = ""
def to_dict(self) -> dict[str, Any]:
data = {
"run_id": self.run_id,
"workflow_id": self.workflow_id,
"status": self.status,
"dry_run": self.dry_run,
"inputs": [item.to_dict() for item in self.inputs],
"stages": [item.to_dict() for item in self.stages],
"outputs": [item.to_dict() for item in self.outputs],
"assisted_requests": [
item.to_dict() for item in self.assisted_requests
],
"run_record_path": self.run_record_path,
}
return {key: value for key, value in data.items() if value not in ("", [])}
def load_workflows(root: str | Path) -> list[WorkflowDefinition]:
infospace = load_infospace(root)
return [
WorkflowDefinition.from_dict(item)
for item in infospace.config.workflows
if isinstance(item, dict)
]
def get_workflow(root: str | Path, workflow_id: str) -> WorkflowDefinition:
for workflow in load_workflows(root):
if workflow.id == workflow_id:
return workflow
raise InfospaceError(
"missing_workflow",
f"Workflow is not declared: {workflow_id}",
{"workflow_id": workflow_id},
)
def plan_workflow(root: str | Path, workflow_id: str) -> WorkflowRunResult:
return _execute_workflow(root, workflow_id, dry_run=True)
def run_workflow(
root: str | Path,
workflow_id: str,
*,
assisted_adapter: AssistedGenerationAdapter | None = None,
) -> WorkflowRunResult:
return _execute_workflow(
root,
workflow_id,
dry_run=False,
assisted_adapter=assisted_adapter,
)
def _execute_workflow(
root: str | Path,
workflow_id: str,
*,
dry_run: bool,
assisted_adapter: AssistedGenerationAdapter | None = None,
) -> WorkflowRunResult:
infospace = load_infospace(root)
workflow = get_workflow(infospace.root, workflow_id)
run_id = uuid.uuid4().hex[:12]
inputs = _collect_inputs(infospace.root, infospace.artifacts, workflow)
stages: list[WorkflowStageRecord] = []
outputs: list[WorkflowOutputRecord] = []
assisted_requests: list[AssistedGenerationRequest] = []
stage_outputs: dict[str, dict[str, Any]] = {}
for stage in workflow.stages:
selected_inputs = [item for item in inputs if item.name == stage.input]
if not selected_inputs:
raise InfospaceError(
"workflow_stage_has_no_inputs",
f"Workflow stage has no matching inputs: {stage.id}",
{"workflow_id": workflow.id, "stage_id": stage.id},
)
for input_record in selected_inputs:
data = _template_data(workflow, stage, input_record, stage_outputs)
template_text = _read_template(infospace.root, stage.template)
rendered = render_markdown_template(template_text, data)
if stage.kind == "template":
output = _resolve_output(
workflow,
stage,
input_record,
rendered.markdown,
data,
infospace.root,
dry_run=dry_run,
)
outputs.append(output)
stage_outputs[stage.id] = {
"content": rendered.markdown,
"artifact_id": output.artifact_id,
"path": output.path,
}
stages.append(
WorkflowStageRecord(
stage_id=stage.id,
kind=stage.kind,
status="planned" if dry_run else "completed",
input_artifact_id=input_record.artifact_id,
output_artifact_id=output.artifact_id,
)
)
elif stage.kind == "assisted":
request = AssistedGenerationRequest(
stage_id=stage.id,
workflow_id=workflow.id,
input_artifact_id=input_record.artifact_id,
prompt=rendered.markdown,
provider_hint=stage.provider_hint,
metadata={"output": stage.output.to_dict() if stage.output else {}},
)
assisted_requests.append(request)
if dry_run:
stages.append(
WorkflowStageRecord(
stage_id=stage.id,
kind=stage.kind,
status="requires_adapter",
input_artifact_id=input_record.artifact_id,
)
)
continue
if assisted_adapter is None:
raise InfospaceError(
"assisted_stage_requires_adapter",
"Assisted workflow stages require an explicit adapter",
{
"workflow_id": workflow.id,
"stage_id": stage.id,
"input_artifact_id": input_record.artifact_id,
},
)
result = assisted_adapter.generate(request)
output = _resolve_output(
workflow,
stage,
input_record,
result.markdown,
data,
infospace.root,
dry_run=False,
provider=result.provider,
)
outputs.append(output)
stages.append(
WorkflowStageRecord(
stage_id=stage.id,
kind=stage.kind,
status="completed",
input_artifact_id=input_record.artifact_id,
output_artifact_id=output.artifact_id,
)
)
else:
raise InfospaceError(
"unsupported_workflow_stage",
f"Unsupported workflow stage kind: {stage.kind}",
{
"workflow_id": workflow.id,
"stage_id": stage.id,
"kind": stage.kind,
},
)
status = "planned" if dry_run else "completed"
run_record_path = ""
result = WorkflowRunResult(
run_id=run_id,
workflow_id=workflow.id,
status=status,
dry_run=dry_run,
inputs=inputs,
stages=stages,
outputs=outputs,
assisted_requests=assisted_requests,
)
if not dry_run:
run_record_path = _write_run_record(infospace.root, result)
result = WorkflowRunResult(
run_id=run_id,
workflow_id=workflow.id,
status=status,
dry_run=dry_run,
inputs=inputs,
stages=stages,
outputs=outputs,
assisted_requests=assisted_requests,
run_record_path=run_record_path,
)
return result
def _collect_inputs(
root: Path,
artifacts: list[KnowledgeArtifact],
workflow: WorkflowDefinition,
) -> list[WorkflowInputRecord]:
records: list[WorkflowInputRecord] = []
for name, spec in workflow.inputs.items():
selected = [
artifact
for artifact in artifacts
if _matches_input_spec(artifact, spec)
]
for artifact in selected:
artifact_path = root / artifact.path
records.append(
WorkflowInputRecord(
name=name,
artifact_id=artifact.id,
kind=artifact.kind,
title=artifact.title or Path(artifact.path).stem,
path=artifact.path,
slug=Path(artifact.path).stem,
content=artifact_path.read_text(encoding="utf-8"),
)
)
return records
def _matches_input_spec(
artifact: KnowledgeArtifact,
spec: WorkflowInputSpec,
) -> bool:
if spec.artifact_ids and artifact.id not in spec.artifact_ids:
return False
if spec.kind and artifact.kind != spec.kind:
return False
return True
def _template_data(
workflow: WorkflowDefinition,
stage: WorkflowStage,
input_record: WorkflowInputRecord,
stage_outputs: dict[str, dict[str, Any]],
) -> dict[str, Any]:
return {
"workflow": workflow.to_dict(),
"stage": stage.to_dict(),
"input": input_record.to_template_data(),
"macros": {**workflow.static_macros, **stage.static_macros},
"stages": stage_outputs,
}
def _read_template(root: Path, relative_path: str) -> str:
path = root / relative_path
if not path.is_file():
raise InfospaceError(
"missing_workflow_template",
f"Workflow template does not exist: {relative_path}",
{"template": relative_path},
)
return path.read_text(encoding="utf-8")
def _resolve_output(
workflow: WorkflowDefinition,
stage: WorkflowStage,
input_record: WorkflowInputRecord,
markdown: str,
data: dict[str, Any],
root: Path,
*,
dry_run: bool,
provider: str = "",
) -> WorkflowOutputRecord:
if stage.output is None:
raise InfospaceError(
"missing_workflow_output",
f"Workflow stage has no output declaration: {stage.id}",
{"workflow_id": workflow.id, "stage_id": stage.id},
)
output_path = _render_inline(stage.output.path, data)
artifact_id = _render_inline(stage.output.artifact_id, data)
if not artifact_id:
artifact_id = f"{stage.output.kind}/{Path(output_path).name}"
title = _render_inline(stage.output.title, data)
target = _safe_target(root, output_path)
if not dry_run:
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(markdown, encoding="utf-8")
register_artifact(
root,
artifact_id=artifact_id,
path=output_path,
kind=stage.output.kind,
title=title,
provenance={
"workflow_id": workflow.id,
"stage_id": stage.id,
"input_artifact_id": input_record.artifact_id,
**({"provider": provider} if provider else {}),
},
relationships=[
{
"type": "generated_from",
"target": input_record.artifact_id,
}
],
)
return WorkflowOutputRecord(
stage_id=stage.id,
artifact_id=artifact_id,
path=output_path,
kind=stage.output.kind,
title=title,
input_artifact_id=input_record.artifact_id,
written=not dry_run,
)
def _render_inline(template_text: str, data: dict[str, Any]) -> str:
if not template_text:
return ""
return render_markdown_template(template_text, data).markdown
def _safe_target(root: Path, relative_path: str) -> Path:
target = (root / relative_path).resolve()
root_resolved = root.resolve()
try:
target.relative_to(root_resolved)
except ValueError as exc:
raise InfospaceError(
"workflow_output_escapes_infospace",
f"Workflow output path escapes infospace: {relative_path}",
{"root": str(root), "path": relative_path},
) from exc
return target
def _write_run_record(root: Path, result: WorkflowRunResult) -> str:
run_path = root / "output" / "workflows" / "runs" / f"{result.run_id}.yaml"
payload = result.to_dict()
payload["recorded_at"] = datetime.now(timezone.utc).isoformat()
run_path.parent.mkdir(parents=True, exist_ok=True)
run_path.write_text(yaml.safe_dump(payload, sort_keys=False), encoding="utf-8")
return str(run_path)

217
tests/test_workflow.py Normal file
View File

@@ -0,0 +1,217 @@
import json
import os
import subprocess
import sys
from pathlib import Path
import pytest
import yaml
from infospace_bench import InfospaceError, add_artifact, create_infospace, load_infospace
from infospace_bench.workflow import (
load_workflows,
plan_workflow,
run_workflow,
)
SOURCE = """# Chapter One
Division of labour increases output by specializing tasks.
"""
SUMMARY_TEMPLATE = """# {{ input.title }} Summary
Lens: {{ macros.discipline }}
Source:
{{ input.content }}
"""
ASSISTED_TEMPLATE = """Review {{ input.title }} through {{ macros.discipline }}.
{{ input.content }}
"""
def cli_env() -> dict[str, str]:
env = os.environ.copy()
env["PYTHONPATH"] = "src:/home/worsch/markitect-tool/src"
return env
def make_workflow_infospace(tmp_path: Path) -> Path:
infospace = create_infospace(tmp_path, "pilot", name="Pilot")
source = tmp_path / "chapter.md"
source.write_text(SOURCE, encoding="utf-8")
add_artifact(infospace.root, source, kind="source", title="Chapter One")
template_dir = infospace.root / "workflows" / "templates"
template_dir.mkdir(parents=True, exist_ok=True)
(template_dir / "summary.md").write_text(SUMMARY_TEMPLATE, encoding="utf-8")
(template_dir / "review.md").write_text(ASSISTED_TEMPLATE, encoding="utf-8")
config_path = infospace.root / "infospace.yaml"
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
config["workflows"] = [
{
"id": "source-summary",
"description": "Render deterministic summaries for source artifacts.",
"inputs": {"source": {"kind": "source"}},
"static_macros": {"discipline": "Viable System Model"},
"stages": [
{
"id": "render-summary",
"kind": "template",
"input": "source",
"template": "workflows/templates/summary.md",
"output": {
"path": "artifacts/generated/{{ input.slug }}-summary.md",
"artifact_id": "generated/{{ input.slug }}-summary.md",
"kind": "generated",
"title": "{{ input.title }} Summary",
},
}
],
"expected_evaluations": ["metrics"],
},
{
"id": "assisted-review",
"description": "Plan an assisted review without binding to a provider.",
"inputs": {"source": {"kind": "source"}},
"static_macros": {"discipline": "Viable System Model"},
"stages": [
{
"id": "draft-review",
"kind": "assisted",
"input": "source",
"template": "workflows/templates/review.md",
"output": {
"path": "artifacts/generated/{{ input.slug }}-review.md",
"artifact_id": "generated/{{ input.slug }}-review.md",
"kind": "generated",
"title": "{{ input.title }} Review",
},
}
],
},
]
config_path.write_text(yaml.safe_dump(config, sort_keys=False), encoding="utf-8")
return infospace.root
def test_load_workflow_definitions_from_infospace_yaml(tmp_path: Path) -> None:
root = make_workflow_infospace(tmp_path)
workflows = load_workflows(root)
assert [workflow.id for workflow in workflows] == [
"source-summary",
"assisted-review",
]
assert workflows[0].inputs["source"].kind == "source"
assert workflows[0].stages[0].kind == "template"
assert workflows[0].expected_evaluations == ["metrics"]
def test_plan_workflow_resolves_inputs_outputs_and_assisted_requests(
tmp_path: Path,
) -> None:
root = make_workflow_infospace(tmp_path)
plan = plan_workflow(root, "source-summary")
assisted_plan = plan_workflow(root, "assisted-review")
assert plan.dry_run is True
assert plan.status == "planned"
assert plan.inputs[0].artifact_id == "source/chapter.md"
assert plan.outputs[0].artifact_id == "generated/chapter-summary.md"
assert plan.outputs[0].path == "artifacts/generated/chapter-summary.md"
assert not (root / "artifacts" / "generated" / "chapter-summary.md").exists()
assert assisted_plan.assisted_requests[0].stage_id == "draft-review"
assert "Chapter One" in assisted_plan.assisted_requests[0].prompt
assert assisted_plan.assisted_requests[0].provider_hint is None
def test_run_workflow_writes_generated_artifact_manifest_and_run_record(
tmp_path: Path,
) -> None:
root = make_workflow_infospace(tmp_path)
result = run_workflow(root, "source-summary")
output_path = root / "artifacts" / "generated" / "chapter-summary.md"
run_record = Path(result.run_record_path)
loaded = load_infospace(root)
generated = next(item for item in loaded.artifacts if item.kind == "generated")
assert result.status == "completed"
assert output_path.is_file()
assert "Lens: Viable System Model" in output_path.read_text(encoding="utf-8")
assert generated.id == "generated/chapter-summary.md"
assert generated.provenance["workflow_id"] == "source-summary"
assert generated.provenance["input_artifact_id"] == "source/chapter.md"
assert run_record.is_file()
assert yaml.safe_load(run_record.read_text(encoding="utf-8"))["status"] == "completed"
def test_assisted_stage_requires_explicit_adapter_for_run(tmp_path: Path) -> None:
root = make_workflow_infospace(tmp_path)
with pytest.raises(InfospaceError) as raised:
run_workflow(root, "assisted-review")
assert raised.value.code == "assisted_stage_requires_adapter"
assert raised.value.detail["stage_id"] == "draft-review"
def test_cli_workflow_inspect_plan_and_run(tmp_path: Path) -> None:
root = make_workflow_infospace(tmp_path)
inspected = subprocess.run(
[sys.executable, "-m", "infospace_bench", "workflow", "inspect", str(root)],
check=False,
env=cli_env(),
text=True,
capture_output=True,
)
planned = subprocess.run(
[
sys.executable,
"-m",
"infospace_bench",
"workflow",
"plan",
str(root),
"source-summary",
],
check=False,
env=cli_env(),
text=True,
capture_output=True,
)
run = subprocess.run(
[
sys.executable,
"-m",
"infospace_bench",
"workflow",
"run",
str(root),
"source-summary",
],
check=False,
env=cli_env(),
text=True,
capture_output=True,
)
assert inspected.returncode == 0, inspected.stderr
assert planned.returncode == 0, planned.stderr
assert run.returncode == 0, run.stderr
assert json.loads(inspected.stdout)["workflows"][0]["id"] == "source-summary"
assert json.loads(planned.stdout)["status"] == "planned"
assert json.loads(run.stdout)["outputs"][0]["artifact_id"] == (
"generated/chapter-summary.md"
)

View File

@@ -4,7 +4,7 @@ type: workplan
title: "Applied Workflow And Generation Pipeline"
domain: markitect
repo: infospace-bench
status: planned
status: completed
owner: markitect
topic_slug: markitect
created: "2026-05-14"
@@ -27,7 +27,7 @@ keeps LLM/provider behavior behind explicit adapters.
```task
id: IB-WP-0009-T01
status: todo
status: done
priority: high
state_hub_task_id: "32c1dafc-c523-4ab7-9985-0820a65be514"
```
@@ -41,7 +41,7 @@ state_hub_task_id: "32c1dafc-c523-4ab7-9985-0820a65be514"
```task
id: IB-WP-0009-T02
status: todo
status: done
priority: high
state_hub_task_id: "2bb1f82b-f9a1-4d64-9de8-29a9f4eb78d0"
```
@@ -54,7 +54,7 @@ state_hub_task_id: "2bb1f82b-f9a1-4d64-9de8-29a9f4eb78d0"
```task
id: IB-WP-0009-T03
status: todo
status: done
priority: high
state_hub_task_id: "2dd1ad70-8b5a-4afc-8f63-b1db3915ff31"
```
@@ -67,7 +67,7 @@ state_hub_task_id: "2dd1ad70-8b5a-4afc-8f63-b1db3915ff31"
```task
id: IB-WP-0009-T04
status: todo
status: done
priority: medium
state_hub_task_id: "63d34a33-63c6-45cb-8625-2e59ae0b6d57"
```