diff --git a/docs/markdown-workflows.md b/docs/markdown-workflows.md new file mode 100644 index 0000000..626ff09 --- /dev/null +++ b/docs/markdown-workflows.md @@ -0,0 +1,102 @@ +# Markdown Workflows + +Markitect workflows provide declarative orchestration for Markdown-centered +document pipelines. + +Use them when you want to: + +- collect Markdown files, globs, or directories +- extract frontmatter, sections, blocks, metrics, or selector results +- bind named data products into later steps +- run deterministic template/compose/transform/include steps +- define optional assisted-generation boundaries without requiring a provider +- write one or more Markdown outputs with provenance and diagnostics + +The workflow definition standard is documented in +`docs/workflow-definition-standard.md`. + +## Commands + +Inspect a workflow definition: + +```text +mkt workflow inspect examples/workflows/adr-release-notes.workflow.md +``` + +Plan a run without writing outputs: + +```text +mkt workflow plan examples/workflows/adr-release-notes.workflow.md +``` + +Run and write outputs: + +```text +mkt workflow run examples/workflows/adr-release-notes.workflow.md --output-dir build +``` + +JSON/YAML output is available for agents: + +```text +mkt workflow run workflow.md --format json +``` + +## Execution Model + +The first runner is deterministic and local-first: + +1. Load a YAML or Markdown-fenced workflow definition. +2. Validate required ids/kinds and duplicate step ids. +3. Collect inputs from Markdown files, directories, globs, or literal values. +4. Resolve `${...}` bindings. +5. Execute steps in dependency order. +6. Render outputs and enforce output path safety. +7. Return diagnostics, provenance, and trace events. + +Assisted steps are explicit boundaries. Without an injected generation hook: + +- optional assisted steps are skipped with a warning +- required assisted steps fail + +This makes workflows useful without provider dependencies. + +## Supported Step Kinds + +| Kind | Result | +| --- | --- | +| `shape` | Structured data object. | +| `extract` | `items`, `count`, and joined `text`. | +| `query` | query `matches` and `count`. | +| `template` | rendered `markdown`, variables, missing variables, completion flag. | +| `compose` | composed `markdown` and sources. | +| `transform` | transformed `markdown`, operations, provenance. | +| `include` | include-resolved `markdown`, included paths, provenance. | +| `contract_stub` | generated contract stub Markdown. | +| `contract_check` | contract diagnostics and metrics. | +| `assisted` | generated Markdown if a hook is supplied, otherwise skipped/diagnostic. | + +## Data Bindings + +Bindings use `${...}`: + +```yaml +data: + decisions: ${sources.adrs.extracts.decisions} + summary: ${steps.render.markdown} +``` + +If the full string is one expression, the native type is preserved. If the +expression appears inside a longer string, it is rendered as text. + +List projection is supported: + +```yaml +paths: ${sources.adrs.items.path} +``` + +## Relationship To Extensions + +The workflow engine is registered as the built-in extension +`workflow.markdown-dataflow`. It uses the canonical architecture for +diagnostics, provenance, trace events, capabilities, and CLI affordance +metadata, but workflow files remain business-facing orchestration artifacts. diff --git a/docs/workflow-definition-standard.md b/docs/workflow-definition-standard.md new file mode 100644 index 0000000..b80908e --- /dev/null +++ b/docs/workflow-definition-standard.md @@ -0,0 +1,304 @@ +# Markitect Workflow Definition Standard + +## Purpose + +Markitect workflows describe repeatable Markdown-centered dataflow: + +```text +Markdown inputs -> extracted data products -> deterministic/assisted steps + -> artifacts and Markdown outputs +``` + +The workflow standard is business-facing orchestration. It uses the internal +extension framework for execution semantics, diagnostics, provenance, +capabilities, and future policy gates, but it is not itself the extension +framework. + +## File Format + +A workflow can be either: + +- a YAML file +- a Markdown file with a fenced YAML block tagged `workflow`, + `markitect-workflow`, or `mkt-workflow` + +Example: + +````markdown +# Release Notes Workflow + +```yaml workflow +metadata: + id: release-notes + title: Release Notes +intent: + summary: Build release notes from accepted ADR decisions. +inputs: + adrs: + glob: docs/adr/*.md + extract: + decisions: + selector: sections[heading=Decision] +outputs: + release_notes: + path: out/release-notes.md + content: ${steps.render.markdown} +steps: + render: + kind: template + template: templates/release-notes.md + data: + decisions: ${sources.adrs.extracts.decisions} +``` +```` + +## Top-Level Sections + +| Section | Required | Purpose | +| --- | --- | --- | +| `metadata` | recommended | Stable id, title, owner, version, tags, timestamps. | +| `intent` | recommended | Why the workflow exists and what success means. | +| `inputs` | yes | Markdown files, directories, globs, literal values, or future index references. | +| `steps` | yes for processing | Deterministic or assisted operations over bound data. | +| `outputs` | optional | Files/artifacts produced from step or source data. | +| `dependencies` | optional | Workflow-level dependencies on files, workplans, contracts, or other workflows. | +| `conditions` | optional | Preconditions or skip rules. First version records these for inspection. | +| `artifacts` | optional | Named non-output products such as manifests, traces, or reports. | +| `permissions` | optional | Declared filesystem/network/provider/capability requirements. | +| `resources` | optional | CPU, memory, token, model, or storage expectations. | +| `timeouts` | optional | Workflow and step timeout budgets. First version records these. | +| `retry_policies` | optional | Retry rules by step kind or id. First version records these. | +| `escalation_rules` | optional | When human approval or operator attention is needed. | +| `observability` | optional | Events, trace detail, metrics, and audit expectations. | +| `responsibilities` | optional | Human/agent/system boundaries for review, approval, and execution. | + +Unknown top-level sections are preserved as `extensions` in the loaded model so +the standard can evolve without immediately breaking older runners. + +## Metadata + +```yaml +metadata: + id: release-notes + title: Release Notes + version: "1" + owner: documentation + tags: [adr, release] +``` + +`metadata.id` should be stable. It is used in diagnostics and provenance when +available. + +## Intent + +```yaml +intent: + summary: Build release notes from accepted decisions. + success_criteria: + - One output file is generated. + - Every accepted ADR contributes its decision section. +``` + +Intent is descriptive in the first implementation. Later policy and assessment +layers may use it for review or LLM-assisted checks. + +## Inputs + +Inputs are named source collections or literal values. + +```yaml +inputs: + adrs: + glob: docs/adr/*.md + recursive: false + where: + frontmatter.status: accepted + extract: + decisions: + selector: sections[heading=Decision] + status: + selector: frontmatter.status + static_context: + value: + product: Markitect +``` + +Supported source fields: + +- `file`: one Markdown file +- `path`: alias for `file` +- `files`: list of Markdown files +- `glob`: glob pattern relative to the workflow directory +- `directory`: directory of Markdown files +- `recursive`: recurse when using `directory` +- `selector`: selector to collect matches +- `extract`: named selector map +- `metrics`: include document metrics +- `frontmatter`: include frontmatter +- `value`: literal structured value, no file parsing + +Each Markdown input produces a collection: + +```yaml +items: + - path: docs/adr/001.md + frontmatter: {...} + metrics: {...} + extracts: + decisions: + - "## Decision\n\n..." +extracts: + decisions: + - "## Decision\n\n..." +``` + +## Steps + +Steps are named operations. `steps` may be either a mapping or a list with `id`. + +```yaml +steps: + render: + kind: template + template: templates/release-notes.md + data: + decisions: ${sources.adrs.extracts.decisions} +``` + +Common fields: + +- `kind`: step kind +- `depends_on`: other step ids +- `when`: condition expression, reserved for future execution gating +- `optional`: do not fail the whole workflow when this step is skipped or blocked +- `permissions`: step-level permissions +- `timeout`: step-level timeout declaration +- `retry`: step-level retry policy reference or inline rule +- `responsibility`: `human`, `agent`, `system`, or `mixed` + +First implementation step kinds: + +| Kind | Purpose | +| --- | --- | +| `shape` | Resolve data bindings into a structured object. | +| `extract` | Extract text from a source collection or document. | +| `query` | Return query match envelopes from a source collection or document. | +| `template` | Render a deterministic Markdown template. | +| `compose` | Join Markdown strings or files into one Markdown document. | +| `transform` | Apply deterministic Markdown transforms. | +| `include` | Resolve include markers in Markdown. | +| `contract_stub` | Generate a Markdown stub from a contract. | +| `contract_check` | Check a Markdown document against a contract. | +| `assisted` | Provider-neutral assisted step boundary, optional by default. | + +## Data Bindings + +Workflow expressions use `${...}` references: + +```yaml +data: + decisions: ${sources.adrs.extracts.decisions} + summary: ${steps.render.markdown} +``` + +If a string is exactly one expression, the resolved value keeps its native type. +If an expression appears inside a longer string, it is rendered as text. + +Supported roots: + +- `metadata` +- `intent` +- `sources` +- `steps` +- `artifacts` +- `workflow` + +Path behavior: + +- dictionary keys use dot notation +- numeric list indexes are supported +- applying a field to a list maps that field over every dictionary item + +## Outputs + +```yaml +outputs: + release_notes: + path: out/release-notes.md + content: ${steps.render.markdown} +``` + +Supported output fields: + +- `path`: output path relative to `--output-dir` or workflow directory +- `content`: Markdown/string/structured value to write +- `template`: optional template path +- `data`: data for template rendering +- `artifact`: optional artifact name + +Output paths must stay within the output root. + +## Assisted Steps + +Assisted steps define the boundary; they do not require a provider dependency: + +```yaml +review: + kind: assisted + optional: true + prompt: prompts/review.md + input: ${steps.render.markdown} + data: + rubric: concise release note review +``` + +When no assisted adapter is supplied: + +- optional assisted steps are skipped with a warning diagnostic +- required assisted steps fail with an error diagnostic + +This keeps workflows runnable in deterministic environments. + +## Permissions And Responsibilities + +Permissions and responsibilities are declarative in the first runner and become +policy inputs for later access-control work. + +```yaml +permissions: + filesystem: + read: [docs, templates] + write: [out] + network: false + assisted_generation: false +responsibilities: + human: + approves_outputs: true + agent: + may_run_deterministic_steps: true + system: + enforces_path_safety: true +``` + +## Observability + +```yaml +observability: + events: + - workflow.started + - step.completed + - output.written + trace: summary +``` + +Workflow runs return trace events and diagnostics in JSON/YAML output. Future +backends can persist these events. + +## Design Rules + +- Keep deterministic execution useful without providers. +- Keep all outputs explainable through provenance and diagnostics. +- Preserve unknown metadata rather than rejecting reasonable future extensions. +- Make assisted, external, networked, or sensitive steps explicit. +- Keep internal extension registration separate from business workflow + orchestration. diff --git a/docs/workplan-planning-map.md b/docs/workplan-planning-map.md index 8903fc0..8fd9de2 100644 --- a/docs/workplan-planning-map.md +++ b/docs/workplan-planning-map.md @@ -36,7 +36,7 @@ and descriptions mirror the operational view. | `MKTT-WP-0007` | complete | done | `MKTT-WP-0006` | Advanced query and local index backend is complete: AST inspection, optional JSONPath, SQLite snapshots/metadata, FTS5 search, incremental refresh, and local index CLI. | | `MKTT-WP-0013` | complete | done | `MKTT-WP-0003`, `MKTT-WP-0004`, `MKTT-WP-0006`, `MKTT-WP-0007`, `MKTT-WP-0010` | Internal extension framework is complete: characterization tests, canonical processing model, descriptors, registries, lifecycle callbacks, query-engine registry, built-in extension catalog, CLI command specs, and authoring guide. | | `MKTT-WP-0005` | P2 | todo | `MKTT-WP-0003`, `MKTT-WP-0004` | Pick up when generation/form/context or semantic assessment pressure appears. | -| `MKTT-WP-0011` | P2 | todo | `MKTT-WP-0003`; task-level triggers: `MKTT-WP-0010-T001`, `MKTT-WP-0010-T005` | Declarative Markdown dataflow workflows: source extraction, deterministic/assisted processing, and multi-output generation. | +| `MKTT-WP-0011` | complete | done | `MKTT-WP-0003`; task-level triggers: `MKTT-WP-0010-T001`, `MKTT-WP-0010-T005` | Markdown dataflow workflow layer is complete: workflow standard, source collectors, binding model, deterministic steps, assisted boundary, safe outputs, CLI, docs, and examples. | | `MKTT-WP-0009` | P2 | todo | `MKTT-WP-0006` | Establish access-control gateway before security-sensitive cache/context use. | | `MKTT-WP-0012` | P3 | todo | `MKTT-WP-0004`, `MKTT-WP-0010`, `MKTT-WP-0011` | Future Quarkdown-inspired document function layer: reusable Markdown-native function calls over processors, references, contracts, workflows, and later assisted steps. | | `MKTT-WP-0008` | P3 | todo | `MKTT-WP-0006`, `MKTT-WP-0007`, `MKTT-WP-0009` | Agent working-memory cache after backend and policy floor are available. | diff --git a/examples/workflows/adr-release-notes.workflow.md b/examples/workflows/adr-release-notes.workflow.md new file mode 100644 index 0000000..ab44b2f --- /dev/null +++ b/examples/workflows/adr-release-notes.workflow.md @@ -0,0 +1,55 @@ +# ADR Release Notes Workflow + +```yaml workflow +metadata: + id: adr-release-notes + title: ADR Release Notes + version: "1" + owner: documentation +intent: + summary: Build release notes from accepted ADR decisions. +inputs: + adrs: + glob: adrs/*.md + where: + frontmatter.status: accepted + extract: + decisions: + selector: sections[heading=Decision] + statuses: + selector: frontmatter.status +steps: + render: + kind: template + template: templates/release-notes.md + data: + title: ADR Release Notes + decisions: ${sources.adrs.extracts.decisions} +outputs: + release_notes: + path: out/release-notes.md + content: ${steps.render.markdown} +permissions: + filesystem: + read: [adrs, templates] + write: [out] + network: false +resources: + cpu: local +timeouts: + workflow: 30s +retry_policies: + default: + max_attempts: 1 +escalation_rules: + output_review: + when: before_publish + responsible: human +observability: + events: [workflow.started, workflow.step.completed, workflow.output.ready] +responsibilities: + agent: + may_run_deterministic_steps: true + human: + approves_publication: true +``` diff --git a/examples/workflows/adrs/001-record-contract-framework.md b/examples/workflows/adrs/001-record-contract-framework.md new file mode 100644 index 0000000..2386c3d --- /dev/null +++ b/examples/workflows/adrs/001-record-contract-framework.md @@ -0,0 +1,14 @@ +--- +status: accepted +--- + +# Record Contract Framework + +## Context + +Document generation needs clear semantic contracts. + +## Decision + +Use Markdown-native contracts with deterministic assertions before optional +assisted assessments. diff --git a/examples/workflows/adrs/002-record-cache-backend.md b/examples/workflows/adrs/002-record-cache-backend.md new file mode 100644 index 0000000..5ee24fa --- /dev/null +++ b/examples/workflows/adrs/002-record-cache-backend.md @@ -0,0 +1,13 @@ +--- +status: accepted +--- + +# Record Cache Backend + +## Context + +Repeated query and context workflows benefit from cached parsed structure. + +## Decision + +Use an optional local SQLite backend for snapshots, metadata, JSON, and FTS. diff --git a/examples/workflows/assisted-review.workflow.md b/examples/workflows/assisted-review.workflow.md new file mode 100644 index 0000000..6cca2fc --- /dev/null +++ b/examples/workflows/assisted-review.workflow.md @@ -0,0 +1,24 @@ +# Assisted Review Workflow + +```yaml workflow +metadata: + id: assisted-review-boundary +intent: + summary: Show an optional assisted-generation boundary. +steps: + review: + kind: assisted + optional: true + prompt_text: Review the generated Markdown for clarity. + data: + rubric: concise +outputs: + review: + path: out/review.md + content: ${steps.review.skipped} +permissions: + assisted_generation: false +responsibilities: + human: + approves_assisted_use: true +``` diff --git a/examples/workflows/snippets/source.md b/examples/workflows/snippets/source.md new file mode 100644 index 0000000..5c80d61 --- /dev/null +++ b/examples/workflows/snippets/source.md @@ -0,0 +1,6 @@ +# Source + +```python +def example(): + return "markitect" +``` diff --git a/examples/workflows/source-snippets.workflow.md b/examples/workflows/source-snippets.workflow.md new file mode 100644 index 0000000..cc1163d --- /dev/null +++ b/examples/workflows/source-snippets.workflow.md @@ -0,0 +1,32 @@ +# Source Snippets Workflow + +```yaml workflow +metadata: + id: source-snippets + title: Source Snippets +intent: + summary: Extract code fences from Markdown documents into a generated note. +inputs: + docs: + file: snippets/source.md + extract: + code: + selector: blocks[type=code] +steps: + render: + kind: template + template: templates/snippets.md + data: + snippets: ${sources.docs.extracts.code} +outputs: + snippets: + path: out/snippets.md + content: ${steps.render.markdown} +permissions: + filesystem: + read: [snippets, templates] + write: [out] +responsibilities: + agent: + may_run_deterministic_steps: true +``` diff --git a/examples/workflows/templates/release-notes.md b/examples/workflows/templates/release-notes.md new file mode 100644 index 0000000..9ea69d2 --- /dev/null +++ b/examples/workflows/templates/release-notes.md @@ -0,0 +1,3 @@ +# {{title}} + +{{decisions}} diff --git a/examples/workflows/templates/snippets.md b/examples/workflows/templates/snippets.md new file mode 100644 index 0000000..8b954d8 --- /dev/null +++ b/examples/workflows/templates/snippets.md @@ -0,0 +1,3 @@ +# Extracted Snippets + +{{snippets}} diff --git a/src/markitect_tool/__init__.py b/src/markitect_tool/__init__.py index 04c23cb..2e72e46 100644 --- a/src/markitect_tool/__init__.py +++ b/src/markitect_tool/__init__.py @@ -156,6 +156,15 @@ from markitect_tool.template import ( analyze_template, render_template, ) +from markitect_tool.workflow import ( + WorkflowError, + WorkflowOutputRecord, + WorkflowPlan, + WorkflowRunResult, + WorkflowRunner, + load_workflow_file, + resolve_workflow_bindings, +) __all__ = [ "ContentBlock", @@ -287,4 +296,11 @@ __all__ = [ "TemplateRenderResult", "analyze_template", "render_template", + "WorkflowError", + "WorkflowOutputRecord", + "WorkflowPlan", + "WorkflowRunResult", + "WorkflowRunner", + "load_workflow_file", + "resolve_workflow_bindings", ] diff --git a/src/markitect_tool/cli/main.py b/src/markitect_tool/cli/main.py index 2277b87..9894c70 100644 --- a/src/markitect_tool/cli/main.py +++ b/src/markitect_tool/cli/main.py @@ -72,6 +72,7 @@ from markitect_tool.template import ( analyze_template, render_template, ) +from markitect_tool.workflow import WorkflowError, WorkflowRunner, load_workflow_file @click.group() @@ -1124,6 +1125,90 @@ def search( _emit_search_results(data, output_format) +@main.group() +def workflow() -> None: + """Inspect, plan, and run declarative Markdown workflows.""" + + +@workflow.command("inspect") +@click.argument("workflow_file", type=click.Path(exists=True, dir_okay=False, path_type=Path)) +@click.option( + "--format", + "output_format", + type=click.Choice(["json", "yaml", "text"], case_sensitive=False), + default="text", + show_default=True, +) +def workflow_inspect(workflow_file: Path, output_format: str) -> None: + """Inspect a workflow definition without executing steps.""" + + try: + plan = load_workflow_file(workflow_file) + result = WorkflowRunner(plan).inspect() + except WorkflowError as exc: + raise click.ClickException(str(exc)) from exc + _emit_workflow_result(result.to_dict() | {"workflow": plan.to_dict()}, output_format) + raise click.exceptions.Exit(0 if result.valid else 1) + + +@workflow.command("plan") +@click.argument("workflow_file", type=click.Path(exists=True, dir_okay=False, path_type=Path)) +@click.option( + "--output-dir", + type=click.Path(file_okay=False, path_type=Path), + help="Output root for path-safety checks. No files are written in plan mode.", +) +@click.option( + "--format", + "output_format", + type=click.Choice(["json", "yaml", "text"], case_sensitive=False), + default="text", + show_default=True, +) +def workflow_plan(workflow_file: Path, output_dir: Path | None, output_format: str) -> None: + """Run a workflow in dry-run mode and report planned outputs.""" + + try: + plan = load_workflow_file(workflow_file) + result = WorkflowRunner(plan, output_dir=output_dir).run(dry_run=True) + except WorkflowError as exc: + raise click.ClickException(str(exc)) from exc + _emit_workflow_result(result.to_dict(), output_format) + raise click.exceptions.Exit(0 if result.valid else 1) + + +@workflow.command("run") +@click.argument("workflow_file", type=click.Path(exists=True, dir_okay=False, path_type=Path)) +@click.option( + "--output-dir", + type=click.Path(file_okay=False, path_type=Path), + help="Output root for workflow outputs. Defaults to the workflow directory.", +) +@click.option("--dry-run", is_flag=True, help="Execute without writing output files.") +@click.option( + "--format", + "output_format", + type=click.Choice(["json", "yaml", "text"], case_sensitive=False), + default="text", + show_default=True, +) +def workflow_run( + workflow_file: Path, + output_dir: Path | None, + dry_run: bool, + output_format: str, +) -> None: + """Run a deterministic Markdown workflow.""" + + try: + plan = load_workflow_file(workflow_file) + result = WorkflowRunner(plan, output_dir=output_dir).run(dry_run=dry_run) + except WorkflowError as exc: + raise click.ClickException(str(exc)) from exc + _emit_workflow_result(result.to_dict(), output_format) + raise click.exceptions.Exit(0 if result.valid else 1) + + @main.group() def template() -> None: """Render and inspect deterministic Markdown templates.""" @@ -1560,6 +1645,32 @@ def _emit_search_results(data: dict, output_format: str) -> None: click.echo(f" {preview[:160]}") +def _emit_workflow_result(data: dict, output_format: str) -> None: + if output_format == "json": + click.echo(json.dumps(data, indent=2, ensure_ascii=False)) + elif output_format == "yaml": + click.echo(yaml.safe_dump(data, sort_keys=False)) + else: + click.echo("valid" if data.get("valid", True) else "invalid") + click.echo(f"workflow: {data.get('workflow_id') or data.get('workflow', {}).get('id', '')}") + if data.get("sources"): + click.echo(f"sources: {len(data['sources'])}") + for source_id, source_data in data["sources"].items(): + click.echo(f"- {source_id}: {source_data.get('count', 1)}") + if data.get("steps"): + click.echo(f"steps: {len(data['steps'])}") + for step_id, step_data in data["steps"].items(): + click.echo(f"- {step_id}: {step_data.get('kind', '')}") + if data.get("outputs"): + click.echo(f"outputs: {len(data['outputs'])}") + for output in data["outputs"]: + status = "written" if output.get("written") else "planned" + path = output.get("path") or "" + click.echo(f"- {output['id']}: {status} {path}") + for diagnostic in data.get("diagnostics", []): + click.echo(f"! [{diagnostic['severity']}] {diagnostic['code']}: {diagnostic['message']}") + + def _emit_reference_result(data: dict, output_format: str) -> None: if output_format == "json": click.echo(json.dumps(data, indent=2, ensure_ascii=False)) diff --git a/src/markitect_tool/extension/builtins.py b/src/markitect_tool/extension/builtins.py index 5037f02..f021b46 100644 --- a/src/markitect_tool/extension/builtins.py +++ b/src/markitect_tool/extension/builtins.py @@ -11,7 +11,10 @@ def builtin_extension_registry() -> ExtensionRegistry: """Return descriptors for built-in Markitect extensions.""" registry = default_query_engine_registry().extension_registry() - for descriptor in _processor_descriptors() + [_local_sqlite_backend_descriptor()]: + for descriptor in _processor_descriptors() + [ + _local_sqlite_backend_descriptor(), + _workflow_engine_descriptor(), + ]: registry.register(descriptor) return registry @@ -90,3 +93,30 @@ def _local_sqlite_backend_descriptor() -> ExtensionDescriptor: docs=["docs/local-index-backend.md", "docs/backend-fabric.md"], examples=["examples/backends/local-sqlite-backend.md"], ) + + +def _workflow_engine_descriptor() -> ExtensionDescriptor: + return ExtensionDescriptor( + id="workflow.markdown-dataflow", + kind="workflow-engine", + summary="Declarative Markdown dataflow workflow engine.", + capabilities=[ + ProcessingCapability(id="workflow", kind="execute"), + ProcessingCapability(id="markdown", kind="read"), + ProcessingCapability(id="templates", kind="execute"), + ProcessingCapability(id="provenance", kind="emit"), + ], + safety={ + "reads_files": True, + "writes_output_files": True, + "network": False, + "assisted_generation": "adapter-only", + }, + input_contract="Markdown/YAML workflow definition", + output_contract="WorkflowRunResult", + diagnostics_namespace="workflow", + provenance_prefix="workflow", + cli={"commands": ["mkt workflow inspect", "mkt workflow plan", "mkt workflow run"]}, + docs=["docs/workflow-definition-standard.md"], + examples=["examples/workflows/adr-release-notes.workflow.md"], + ) diff --git a/src/markitect_tool/workflow/__init__.py b/src/markitect_tool/workflow/__init__.py new file mode 100644 index 0000000..c10e224 --- /dev/null +++ b/src/markitect_tool/workflow/__init__.py @@ -0,0 +1,21 @@ +"""Declarative Markdown dataflow workflows.""" + +from markitect_tool.workflow.engine import ( + WorkflowError, + WorkflowOutputRecord, + WorkflowPlan, + WorkflowRunResult, + WorkflowRunner, + load_workflow_file, + resolve_workflow_bindings, +) + +__all__ = [ + "WorkflowError", + "WorkflowOutputRecord", + "WorkflowPlan", + "WorkflowRunResult", + "WorkflowRunner", + "load_workflow_file", + "resolve_workflow_bindings", +] diff --git a/src/markitect_tool/workflow/engine.py b/src/markitect_tool/workflow/engine.py new file mode 100644 index 0000000..2f0a5a7 --- /dev/null +++ b/src/markitect_tool/workflow/engine.py @@ -0,0 +1,827 @@ +"""Workflow definition loading and deterministic execution.""" + +from __future__ import annotations + +import glob +import re +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any + +import yaml + +from markitect_tool.cache import scan_markdown_files +from markitect_tool.contract import check_markdown_file, collect_metrics, load_contract_file +from markitect_tool.core import Document, parse_markdown_file +from markitect_tool.diagnostics import Diagnostic, SourceLocation, has_error +from markitect_tool.extension import ProcessingProvenance, ProcessingTrace +from markitect_tool.generation import ( + GenerationHook, + GenerationHookRequest, + generate_stub_from_contract, +) +from markitect_tool.ops import compose_files, resolve_includes, transform_markdown +from markitect_tool.query import ( + extract_document_with_engine, + query_document_with_engine, +) +from markitect_tool.template import MissingTemplateVariable, TemplateError, render_template + + +WORKFLOW_FENCE_TAGS = {"workflow", "markitect-workflow", "mkt-workflow"} +KNOWN_TOP_LEVEL = { + "metadata", + "intent", + "inputs", + "outputs", + "steps", + "dependencies", + "conditions", + "artifacts", + "permissions", + "resources", + "timeouts", + "retry_policies", + "escalation_rules", + "observability", + "responsibilities", +} +_EXPRESSION_RE = re.compile(r"\$\{(?P[^}]+)\}") + + +class WorkflowError(ValueError): + """Raised when a workflow definition cannot be loaded or executed.""" + + +@dataclass(frozen=True) +class WorkflowPlan: + """Loaded declarative workflow definition.""" + + metadata: dict[str, Any] = field(default_factory=dict) + intent: dict[str, Any] = field(default_factory=dict) + inputs: dict[str, dict[str, Any]] = field(default_factory=dict) + steps: list[dict[str, Any]] = field(default_factory=list) + outputs: dict[str, dict[str, Any]] = field(default_factory=dict) + dependencies: list[Any] = field(default_factory=list) + conditions: dict[str, Any] = field(default_factory=dict) + artifacts: dict[str, Any] = field(default_factory=dict) + permissions: dict[str, Any] = field(default_factory=dict) + resources: dict[str, Any] = field(default_factory=dict) + timeouts: dict[str, Any] = field(default_factory=dict) + retry_policies: dict[str, Any] = field(default_factory=dict) + escalation_rules: dict[str, Any] = field(default_factory=dict) + observability: dict[str, Any] = field(default_factory=dict) + responsibilities: dict[str, Any] = field(default_factory=dict) + extensions: dict[str, Any] = field(default_factory=dict) + source_path: str | None = None + + @property + def id(self) -> str: + return str(self.metadata.get("id") or self.metadata.get("name") or "workflow") + + def to_dict(self) -> dict[str, Any]: + data = asdict(self) + return {key: value for key, value in data.items() if value not in (None, [], {})} + + +@dataclass(frozen=True) +class WorkflowOutputRecord: + """One output considered or written by a workflow run.""" + + id: str + path: str | None + content: str + written: bool = False + artifact: str | None = None + + def to_dict(self) -> dict[str, Any]: + return {key: value for key, value in asdict(self).items() if value not in (None, "")} + + +@dataclass(frozen=True) +class WorkflowRunResult: + """Result envelope for workflow inspect/plan/run operations.""" + + workflow_id: str + plan_path: str | None + dry_run: bool + sources: dict[str, Any] = field(default_factory=dict) + steps: dict[str, Any] = field(default_factory=dict) + outputs: list[WorkflowOutputRecord] = field(default_factory=list) + diagnostics: list[Diagnostic] = field(default_factory=list) + provenance: list[ProcessingProvenance] = field(default_factory=list) + trace: list[ProcessingTrace] = field(default_factory=list) + + @property + def valid(self) -> bool: + return not has_error(self.diagnostics) + + def to_dict(self) -> dict[str, Any]: + data = { + "workflow_id": self.workflow_id, + "plan_path": self.plan_path, + "dry_run": self.dry_run, + "valid": self.valid, + "sources": self.sources, + "steps": self.steps, + "outputs": [output.to_dict() for output in self.outputs], + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + "provenance": [event.to_dict() for event in self.provenance], + "trace": [event.to_dict() for event in self.trace], + } + return {key: value for key, value in data.items() if value not in (None, [], {})} + + +class WorkflowRunner: + """Execute deterministic Markitect workflows.""" + + def __init__( + self, + plan: WorkflowPlan, + *, + base_dir: str | Path | None = None, + output_dir: str | Path | None = None, + assisted_hook: GenerationHook | None = None, + ) -> None: + self.plan = plan + self.base_dir = Path(base_dir or Path(plan.source_path or ".").parent).resolve() + self.output_dir = Path(output_dir).resolve() if output_dir else self.base_dir + self.assisted_hook = assisted_hook + + def inspect(self) -> WorkflowRunResult: + diagnostics = validate_workflow_plan(self.plan) + return WorkflowRunResult( + workflow_id=self.plan.id, + plan_path=self.plan.source_path, + dry_run=True, + diagnostics=diagnostics, + trace=[ProcessingTrace(event="workflow.inspected", metadata={"id": self.plan.id})], + ) + + def run(self, *, dry_run: bool = False) -> WorkflowRunResult: + diagnostics = validate_workflow_plan(self.plan) + trace = [ProcessingTrace(event="workflow.started", metadata={"id": self.plan.id})] + provenance: list[ProcessingProvenance] = [] + sources: dict[str, Any] = {} + steps: dict[str, Any] = {} + outputs: list[WorkflowOutputRecord] = [] + + if has_error(diagnostics): + return WorkflowRunResult( + workflow_id=self.plan.id, + plan_path=self.plan.source_path, + dry_run=dry_run, + diagnostics=diagnostics, + trace=trace, + ) + + context = _base_context(self.plan, sources, steps) + for input_id, spec in self.plan.inputs.items(): + try: + sources[input_id] = self._collect_input(input_id, spec) + trace.append(ProcessingTrace(event="workflow.input.collected", metadata={"id": input_id})) + except Exception as exc: + diagnostics.append(_diagnostic("workflow.input_failed", str(exc), details={"input": input_id})) + if has_error(diagnostics): + return WorkflowRunResult(self.plan.id, self.plan.source_path, dry_run, sources, steps, outputs, diagnostics, provenance, trace) + + context = _base_context(self.plan, sources, steps) + for step in _ordered_steps(self.plan.steps, diagnostics): + step_id = str(step["id"]) + try: + resolved_step = resolve_workflow_bindings(step, context) + step_result = self._run_step(resolved_step) + steps[step_id] = step_result + context = _base_context(self.plan, sources, steps) + trace.append(ProcessingTrace(event="workflow.step.completed", metadata={"id": step_id, "kind": step.get("kind")})) + provenance.append( + ProcessingProvenance( + operation=f"workflow.step.{step.get('kind', 'unknown')}", + source_path=self.plan.source_path, + metadata={"step_id": step_id}, + ) + ) + except Exception as exc: + optional = bool(step.get("optional", False)) + diagnostics.append( + _diagnostic( + "workflow.step_failed", + str(exc), + severity="warning" if optional else "error", + details={"step": step_id, "kind": step.get("kind"), "optional": optional}, + ) + ) + if not optional: + break + if has_error(diagnostics): + return WorkflowRunResult(self.plan.id, self.plan.source_path, dry_run, sources, steps, outputs, diagnostics, provenance, trace) + + context = _base_context(self.plan, sources, steps) + for output_id, spec in self.plan.outputs.items(): + try: + output = self._render_output(output_id, spec, context, dry_run=dry_run) + outputs.append(output) + trace.append(ProcessingTrace(event="workflow.output.ready", metadata={"id": output_id, "written": output.written})) + provenance.append( + ProcessingProvenance( + operation="workflow.output", + source_path=self.plan.source_path, + dependencies=[output.path] if output.path else [], + metadata={"output_id": output_id, "written": output.written}, + ) + ) + except Exception as exc: + diagnostics.append(_diagnostic("workflow.output_failed", str(exc), details={"output": output_id})) + + trace.append(ProcessingTrace(event="workflow.completed", metadata={"id": self.plan.id, "valid": not has_error(diagnostics)})) + return WorkflowRunResult( + workflow_id=self.plan.id, + plan_path=self.plan.source_path, + dry_run=dry_run, + sources=sources, + steps=steps, + outputs=outputs, + diagnostics=diagnostics, + provenance=provenance, + trace=trace, + ) + + def _collect_input(self, input_id: str, spec: dict[str, Any]) -> Any: + if "value" in spec: + return {"id": input_id, "kind": "value", "value": spec["value"]} + + paths = _input_paths(spec, self.base_dir) + selector = spec.get("selector") + extract_specs = _extract_specs(spec.get("extract")) + include_metrics = bool(spec.get("metrics", True)) + include_frontmatter = bool(spec.get("frontmatter", True)) + where = dict(spec.get("where") or {}) + items: list[dict[str, Any]] = [] + aggregate_extracts: dict[str, list[Any]] = {name: [] for name in extract_specs} + aggregate_matches: list[dict[str, Any]] = [] + + for path in paths: + document = parse_markdown_file(path) + if where and not _matches_where(document.to_dict(), where): + continue + item: dict[str, Any] = { + "path": _relative(path, self.base_dir), + "markdown": path.read_text(encoding="utf-8"), + "document": document.to_dict(), + } + if include_frontmatter: + item["frontmatter"] = document.frontmatter + if include_metrics: + item["metrics"] = collect_metrics(document).to_dict() + if selector: + matches = [match.to_dict() for match in query_document_with_engine(document, str(selector), engine=str(spec.get("engine", "selector")))] + item["matches"] = matches + aggregate_matches.extend(matches) + if extract_specs: + item_extracts: dict[str, list[str]] = {} + for name, extract_spec in extract_specs.items(): + selected = extract_document_with_engine( + document, + str(extract_spec["selector"]), + engine=str(extract_spec.get("engine", spec.get("engine", "selector"))), + ) + item_extracts[name] = selected + aggregate_extracts[name].extend(selected) + item["extracts"] = item_extracts + items.append(item) + + return { + "id": input_id, + "kind": "markdown_collection", + "count": len(items), + "items": items, + "paths": [item["path"] for item in items], + "extracts": aggregate_extracts, + "matches": aggregate_matches, + } + + def _run_step(self, step: dict[str, Any]) -> dict[str, Any]: + kind = str(step.get("kind", "")).strip() + if not kind: + raise WorkflowError("Workflow step requires `kind`") + if kind == "shape": + return {"kind": kind, "data": step.get("data", step.get("value", {}))} + if kind == "extract": + return self._step_extract(step) + if kind == "query": + return self._step_query(step) + if kind == "template": + return self._step_template(step) + if kind == "compose": + return self._step_compose(step) + if kind == "transform": + return self._step_transform(step) + if kind == "include": + return self._step_include(step) + if kind == "contract_stub": + return self._step_contract_stub(step) + if kind == "contract_check": + return self._step_contract_check(step) + if kind == "assisted": + return self._step_assisted(step) + raise WorkflowError(f"Unsupported workflow step kind `{kind}`") + + def _step_extract(self, step: dict[str, Any]) -> dict[str, Any]: + selector = str(step.get("selector", "")) + if not selector: + raise WorkflowError("extract step requires `selector`") + values = _query_like_step(step, selector, query=False) + return {"kind": "extract", "items": values, "count": len(values), "text": "\n\n".join(values)} + + def _step_query(self, step: dict[str, Any]) -> dict[str, Any]: + selector = str(step.get("selector", "")) + if not selector: + raise WorkflowError("query step requires `selector`") + matches = _query_like_step(step, selector, query=True) + return {"kind": "query", "matches": matches, "count": len(matches)} + + def _step_template(self, step: dict[str, Any]) -> dict[str, Any]: + template_text = _template_text(step, self.base_dir) + try: + rendered = render_template(template_text, dict(step.get("data") or {}), strict=bool(step.get("strict", True))) + except MissingTemplateVariable as exc: + raise WorkflowError(str(exc)) from exc + except TemplateError as exc: + raise WorkflowError(str(exc)) from exc + return rendered.to_dict() | {"kind": "template"} + + def _step_compose(self, step: dict[str, Any]) -> dict[str, Any]: + if step.get("files"): + result = compose_files( + [_safe_input_path(self.base_dir, value) for value in step["files"]], + title=step.get("title"), + heading_delta=int(step.get("heading_delta", 0)), + include_frontmatter=bool(step.get("include_frontmatter", False)), + separator=str(step.get("separator", "\n\n---\n\n")), + ) + return result.to_dict() | {"kind": "compose"} + items = step.get("items", step.get("input", [])) + if not isinstance(items, list): + items = [items] + separator = str(step.get("separator", "\n\n---\n\n")) + parts = [str(item).strip() for item in items if item is not None and str(item).strip()] + title = step.get("title") + if title: + parts.insert(0, f"# {str(title).strip()}") + return {"kind": "compose", "markdown": separator.join(parts).strip() + "\n", "sources": []} + + def _step_transform(self, step: dict[str, Any]) -> dict[str, Any]: + markdown = _markdown_input(step, self.base_dir) + result = transform_markdown( + markdown, + strip_frontmatter=bool(step.get("strip_frontmatter", False)), + set_frontmatter=dict(step.get("set_frontmatter") or {}), + heading_delta=int(step.get("heading_delta", 0)), + extract_selector=step.get("extract_selector"), + source_path=step.get("source_path"), + ) + return result.to_dict() | {"kind": "transform"} + + def _step_include(self, step: dict[str, Any]) -> dict[str, Any]: + markdown = _markdown_input(step, self.base_dir) + result = resolve_includes( + markdown, + base_dir=_safe_dir(self.base_dir, step.get("base_dir", ".")), + current_path=step.get("current_path"), + max_depth=int(step.get("max_depth", 10)), + ) + return result.to_dict() | {"kind": "include"} + + def _step_contract_stub(self, step: dict[str, Any]) -> dict[str, Any]: + contract = load_contract_file(_safe_input_path(self.base_dir, step.get("contract"))) + generated = generate_stub_from_contract( + contract, + data=dict(step.get("data") or {}), + include_optional=bool(step.get("include_optional", False)), + ) + return generated.to_dict() | {"kind": "contract_stub"} + + def _step_contract_check(self, step: dict[str, Any]) -> dict[str, Any]: + document_path = _safe_input_path(self.base_dir, step.get("document")) + contract_path = _safe_input_path(self.base_dir, step.get("contract")) + result = check_markdown_file(document_path, contract_path) + return result.to_dict() | {"kind": "contract_check"} + + def _step_assisted(self, step: dict[str, Any]) -> dict[str, Any]: + optional = bool(step.get("optional", True)) + if self.assisted_hook is None: + diagnostic = _diagnostic( + "workflow.assisted_unavailable", + "Assisted workflow step has no generation hook adapter.", + severity="warning" if optional else "error", + details={"step": step.get("id"), "optional": optional}, + ) + if optional: + return {"kind": "assisted", "skipped": True, "diagnostics": [diagnostic.to_dict()]} + raise WorkflowError(diagnostic.message) + prompt = str(step.get("prompt_text") or "") + if step.get("prompt"): + prompt = _safe_input_path(self.base_dir, step["prompt"]).read_text(encoding="utf-8") + request = GenerationHookRequest( + prompt=prompt, + data=dict(step.get("data") or {}), + template=step.get("template"), + contract_id=step.get("contract_id"), + metadata={"workflow_id": self.plan.id, "step_id": step.get("id")}, + ) + generated = self.assisted_hook.generate(request) + return generated.to_dict() | {"kind": "assisted"} + + def _render_output( + self, + output_id: str, + spec: dict[str, Any], + context: dict[str, Any], + *, + dry_run: bool, + ) -> WorkflowOutputRecord: + resolved = resolve_workflow_bindings(spec, context) + if "template" in resolved: + template_text = _safe_input_path(self.base_dir, resolved["template"]).read_text(encoding="utf-8") + content = render_template(template_text, dict(resolved.get("data") or {}), strict=bool(resolved.get("strict", True))).markdown + else: + content = _format_output_value(resolved.get("content", resolved.get("value", ""))) + output_path: Path | None = None + written = False + if resolved.get("path"): + output_path = _safe_output_path(self.output_dir, resolved["path"]) + if not dry_run: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(content, encoding="utf-8") + written = True + return WorkflowOutputRecord( + id=output_id, + path=str(output_path) if output_path else None, + content=content, + written=written, + artifact=resolved.get("artifact"), + ) + + +def load_workflow_file(path: str | Path) -> WorkflowPlan: + """Load a YAML or Markdown-fenced workflow definition.""" + + file_path = Path(path) + text = file_path.read_text(encoding="utf-8") + data = _load_workflow_mapping(text, file_path) + return _workflow_from_mapping(data, source_path=str(file_path)) + + +def validate_workflow_plan(plan: WorkflowPlan) -> list[Diagnostic]: + diagnostics: list[Diagnostic] = [] + if not plan.inputs and not plan.steps: + diagnostics.append(_diagnostic("workflow.empty", "Workflow requires at least inputs or steps.")) + seen_steps: set[str] = set() + for step in plan.steps: + step_id = str(step.get("id", "")).strip() + if not step_id: + diagnostics.append(_diagnostic("workflow.step_missing_id", "Workflow step requires `id`.")) + continue + if step_id in seen_steps: + diagnostics.append(_diagnostic("workflow.step_duplicate_id", f"Duplicate workflow step id `{step_id}`.")) + seen_steps.add(step_id) + if not step.get("kind"): + diagnostics.append(_diagnostic("workflow.step_missing_kind", f"Workflow step `{step_id}` requires `kind`.")) + return diagnostics + + +def resolve_workflow_bindings(value: Any, context: dict[str, Any]) -> Any: + """Resolve `${...}` expressions recursively.""" + + if isinstance(value, dict): + return {key: resolve_workflow_bindings(item, context) for key, item in value.items()} + if isinstance(value, list): + return [resolve_workflow_bindings(item, context) for item in value] + if not isinstance(value, str): + return value + matches = list(_EXPRESSION_RE.finditer(value)) + if not matches: + return value + if len(matches) == 1 and matches[0].span() == (0, len(value)): + return _resolve_path(context, matches[0].group("path").strip()) + + def replace(match: re.Match[str]) -> str: + return _format_output_value(_resolve_path(context, match.group("path").strip())) + + return _EXPRESSION_RE.sub(replace, value) + + +def _load_workflow_mapping(text: str, path: Path) -> dict[str, Any]: + if path.suffix.lower() in {".yaml", ".yml"}: + data = yaml.safe_load(text) or {} + else: + data = _extract_markdown_workflow_block(text) + if not isinstance(data, dict): + raise WorkflowError("Workflow definition must be a mapping") + return data + + +def _extract_markdown_workflow_block(text: str) -> dict[str, Any]: + fence_re = re.compile(r"```(?P[^\n`]*)\n(?P.*?)\n```", re.DOTALL) + for match in fence_re.finditer(text): + info = set(match.group("info").strip().lower().split()) + if info & WORKFLOW_FENCE_TAGS: + data = yaml.safe_load(match.group("body")) or {} + if not isinstance(data, dict): + raise WorkflowError("Workflow fenced block must contain a YAML mapping") + return data + raise WorkflowError("No fenced workflow YAML block found") + + +def _workflow_from_mapping(data: dict[str, Any], *, source_path: str | None) -> WorkflowPlan: + metadata = _mapping(data.get("metadata")) + inputs = _mapping(data.get("inputs")) + outputs = _normalize_outputs(data.get("outputs")) + steps = _normalize_steps(data.get("steps")) + known = {key: data.get(key) for key in KNOWN_TOP_LEVEL} + extensions = {key: value for key, value in data.items() if key not in KNOWN_TOP_LEVEL} + return WorkflowPlan( + metadata=metadata, + intent=_intent(data.get("intent")), + inputs=inputs, + steps=steps, + outputs=outputs, + dependencies=list(data.get("dependencies") or []), + conditions=_mapping(data.get("conditions")), + artifacts=_mapping(data.get("artifacts")), + permissions=_mapping(data.get("permissions")), + resources=_mapping(data.get("resources")), + timeouts=_mapping(data.get("timeouts")), + retry_policies=_mapping(data.get("retry_policies")), + escalation_rules=_mapping(data.get("escalation_rules")), + observability=_mapping(data.get("observability")), + responsibilities=_mapping(data.get("responsibilities")), + extensions=extensions, + source_path=source_path, + ) + + +def _normalize_steps(raw_steps: Any) -> list[dict[str, Any]]: + if raw_steps is None: + return [] + if isinstance(raw_steps, dict): + return [dict(spec or {}) | {"id": step_id} for step_id, spec in raw_steps.items()] + if isinstance(raw_steps, list): + steps: list[dict[str, Any]] = [] + for item in raw_steps: + if not isinstance(item, dict): + raise WorkflowError("Workflow steps list must contain mappings") + steps.append(dict(item)) + return steps + raise WorkflowError("Workflow `steps` must be a mapping or list") + + +def _normalize_outputs(raw_outputs: Any) -> dict[str, dict[str, Any]]: + if raw_outputs is None: + return {} + if isinstance(raw_outputs, dict): + return {str(output_id): dict(spec or {}) for output_id, spec in raw_outputs.items()} + if isinstance(raw_outputs, list): + outputs: dict[str, dict[str, Any]] = {} + for item in raw_outputs: + if not isinstance(item, dict) or not item.get("id"): + raise WorkflowError("Workflow output list entries require `id`") + outputs[str(item["id"])] = dict(item) + return outputs + raise WorkflowError("Workflow `outputs` must be a mapping or list") + + +def _input_paths(spec: dict[str, Any], base_dir: Path) -> list[Path]: + paths: list[Path] = [] + if spec.get("file") or spec.get("path"): + paths.append(_safe_input_path(base_dir, spec.get("file") or spec.get("path"))) + for raw_path in spec.get("files") or []: + paths.append(_safe_input_path(base_dir, raw_path)) + if spec.get("glob"): + pattern = str((base_dir / str(spec["glob"])).resolve()) + paths.extend(Path(path) for path in glob.glob(pattern, recursive=bool(spec.get("recursive", False)))) + if spec.get("directory"): + paths.extend(scan_markdown_files([_safe_dir(base_dir, spec["directory"])], recursive=bool(spec.get("recursive", True)))) + return sorted({path.resolve() for path in paths if path.exists() and path.is_file()}) + + +def _extract_specs(raw_extract: Any) -> dict[str, dict[str, Any]]: + if raw_extract is None: + return {} + if not isinstance(raw_extract, dict): + raise WorkflowError("Input `extract` must be a mapping") + specs: dict[str, dict[str, Any]] = {} + for name, spec in raw_extract.items(): + if isinstance(spec, str): + specs[str(name)] = {"selector": spec} + elif isinstance(spec, dict) and spec.get("selector"): + specs[str(name)] = dict(spec) + else: + raise WorkflowError(f"Input extract `{name}` requires a selector") + return specs + + +def _ordered_steps(steps: list[dict[str, Any]], diagnostics: list[Diagnostic]) -> list[dict[str, Any]]: + by_id = {str(step.get("id")): step for step in steps if step.get("id")} + ordered: list[dict[str, Any]] = [] + temporary: set[str] = set() + permanent: set[str] = set() + + def visit(step_id: str) -> None: + if step_id in permanent: + return + if step_id in temporary: + diagnostics.append(_diagnostic("workflow.dependency_cycle", f"Workflow dependency cycle includes `{step_id}`.")) + return + step = by_id.get(step_id) + if step is None: + diagnostics.append(_diagnostic("workflow.unknown_step_dependency", f"Unknown workflow step dependency `{step_id}`.")) + return + temporary.add(step_id) + for dep in _as_list(step.get("depends_on")): + visit(str(dep)) + temporary.remove(step_id) + permanent.add(step_id) + ordered.append(step) + + for step in steps: + if step.get("id"): + visit(str(step["id"])) + return ordered + + +def _query_like_step(step: dict[str, Any], selector: str, *, query: bool) -> list[Any]: + source = step.get("source", step.get("input")) + engine = str(step.get("engine", "selector")) + documents = _documents_from_source(source) + results: list[Any] = [] + for document in documents: + if query: + results.extend(match.to_dict() for match in query_document_with_engine(document, selector, engine=engine)) + else: + results.extend(extract_document_with_engine(document, selector, engine=engine)) + return results + + +def _documents_from_source(source: Any) -> list[Document]: + if isinstance(source, dict) and "items" in source: + return [Document.from_dict(item["document"]) for item in source["items"] if isinstance(item, dict) and "document" in item] + if isinstance(source, dict) and "document" in source: + return [Document.from_dict(source["document"])] + if isinstance(source, dict) and {"headings", "blocks", "sections"} <= set(source): + return [Document.from_dict(source)] + raise WorkflowError("query/extract step requires a source collection or document") + + +def _template_text(step: dict[str, Any], base_dir: Path) -> str: + if step.get("template_text") is not None: + return str(step["template_text"]) + if step.get("template"): + return _safe_input_path(base_dir, step["template"]).read_text(encoding="utf-8") + raise WorkflowError("template step requires `template` or `template_text`") + + +def _markdown_input(step: dict[str, Any], base_dir: Path) -> str: + if step.get("markdown") is not None: + return str(step["markdown"]) + if step.get("input") is not None: + return _format_output_value(step["input"]) + if step.get("file"): + return _safe_input_path(base_dir, step["file"]).read_text(encoding="utf-8") + raise WorkflowError("step requires `markdown`, `input`, or `file`") + + +def _base_context(plan: WorkflowPlan, sources: dict[str, Any], steps: dict[str, Any]) -> dict[str, Any]: + return { + "metadata": plan.metadata, + "intent": plan.intent, + "sources": sources, + "steps": steps, + "artifacts": plan.artifacts, + "workflow": plan.to_dict(), + } + + +def _resolve_path(context: dict[str, Any], path: str) -> Any: + current: Any = context + for part in path.split("."): + if isinstance(current, dict) and part in current: + current = current[part] + elif isinstance(current, list): + if part.isdigit(): + current = current[int(part)] + else: + current = [ + item[part] + for item in current + if isinstance(item, dict) and part in item + ] + else: + raise WorkflowError(f"Cannot resolve workflow binding `${{{path}}}`") + return current + + +def _matches_where(document: dict[str, Any], where: dict[str, Any]) -> bool: + context = {"frontmatter": document.get("frontmatter", {}), "document": document} + for path, expected in where.items(): + try: + value = _resolve_path(context, str(path)) + except WorkflowError: + return False + if value != expected: + return False + return True + + +def _safe_input_path(base_dir: Path, raw_path: Any) -> Path: + if not raw_path: + raise WorkflowError("Expected path") + path = (base_dir / str(raw_path)).resolve() + if not _is_within(path, base_dir): + raise WorkflowError(f"Path escapes workflow directory: {raw_path}") + if not path.exists(): + raise WorkflowError(f"Path does not exist: {raw_path}") + return path + + +def _safe_dir(base_dir: Path, raw_path: Any) -> Path: + path = _safe_input_path(base_dir, raw_path) + if not path.is_dir(): + raise WorkflowError(f"Expected directory: {raw_path}") + return path + + +def _safe_output_path(output_dir: Path, raw_path: Any) -> Path: + path = (output_dir / str(raw_path)).resolve() + if not _is_within(path, output_dir): + raise WorkflowError(f"Output path escapes output directory: {raw_path}") + return path + + +def _is_within(path: Path, root: Path) -> bool: + try: + path.relative_to(root) + return True + except ValueError: + return False + + +def _relative(path: Path, root: Path) -> str: + try: + return path.resolve().relative_to(root).as_posix() + except ValueError: + return str(path) + + +def _mapping(value: Any) -> dict[str, Any]: + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + raise WorkflowError("Expected mapping") + + +def _intent(value: Any) -> dict[str, Any]: + if value is None: + return {} + if isinstance(value, str): + return {"summary": value} + if isinstance(value, dict): + return dict(value) + raise WorkflowError("Workflow `intent` must be a string or mapping") + + +def _as_list(value: Any) -> list[Any]: + if value is None: + return [] + if isinstance(value, list): + return value + return [value] + + +def _format_output_value(value: Any) -> str: + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, list): + return "\n".join(f"- {_format_output_value(item)}" for item in value) + if isinstance(value, dict): + return yaml.safe_dump(value, sort_keys=False).strip() + return str(value) + + +def _diagnostic( + code: str, + message: str, + *, + severity: str = "error", + details: dict[str, Any] | None = None, +) -> Diagnostic: + return Diagnostic( + severity=severity, + code=code, + message=message, + source=SourceLocation(), + details=details or {}, + ) diff --git a/tests/test_builtin_extension_catalog.py b/tests/test_builtin_extension_catalog.py index d9e2185..ca7a037 100644 --- a/tests/test_builtin_extension_catalog.py +++ b/tests/test_builtin_extension_catalog.py @@ -13,6 +13,7 @@ def test_builtin_extension_registry_lists_query_processors_and_backend(): assert "processor.uppercase" in ids assert "processor.include" in ids assert "backend.local-sqlite" in ids + assert "workflow.markdown-dataflow" in ids def test_builtin_processor_descriptors_capture_safety_and_provenance(): @@ -48,3 +49,18 @@ def test_builtin_local_sqlite_descriptor_exposes_backend_capabilities(): "mkt cache query", "mkt search", ] + + +def test_builtin_workflow_descriptor_exposes_cli_and_safety(): + registry = builtin_extension_registry() + + descriptor = registry.get("workflow.markdown-dataflow") + + assert descriptor.kind == "workflow-engine" + assert descriptor.safety["writes_output_files"] is True + assert descriptor.safety["assisted_generation"] == "adapter-only" + assert descriptor.cli["commands"] == [ + "mkt workflow inspect", + "mkt workflow plan", + "mkt workflow run", + ] diff --git a/tests/test_workflow_engine.py b/tests/test_workflow_engine.py new file mode 100644 index 0000000..2861a0e --- /dev/null +++ b/tests/test_workflow_engine.py @@ -0,0 +1,228 @@ +from pathlib import Path + +from click.testing import CliRunner + +from markitect_tool.cli import main +from markitect_tool.generation import GenerationHookRequest, GenerationHookResult +from markitect_tool.workflow import ( + WorkflowRunner, + load_workflow_file, + resolve_workflow_bindings, +) + + +WORKFLOW = """# ADR Release Workflow + +```yaml workflow +metadata: + id: adr-release + title: ADR Release Notes +intent: + summary: Collect accepted ADR decisions. +permissions: + filesystem: + read: [adrs, templates] + write: [out] +responsibilities: + agent: + may_run_deterministic_steps: true +inputs: + adrs: + glob: adrs/*.md + where: + frontmatter.status: accepted + extract: + decisions: + selector: sections[heading=Decision] +steps: + render: + kind: template + template: templates/release.md + data: + decisions: ${sources.adrs.extracts.decisions} +outputs: + release_notes: + path: out/release-notes.md + content: ${steps.render.markdown} +observability: + events: [workflow.started, workflow.completed] +``` +""" + + +def _write_workflow_fixture(tmp_path: Path) -> Path: + (tmp_path / "adrs").mkdir() + (tmp_path / "templates").mkdir() + (tmp_path / "adrs" / "one.md").write_text( + "---\nstatus: accepted\n---\n# One\n\n## Decision\n\nUse the workflow engine.\n", + encoding="utf-8", + ) + (tmp_path / "adrs" / "two.md").write_text( + "---\nstatus: proposed\n---\n# Two\n\n## Decision\n\nIgnore this one.\n", + encoding="utf-8", + ) + (tmp_path / "templates" / "release.md").write_text( + "# Release Notes\n\n{{decisions}}\n", + encoding="utf-8", + ) + workflow = tmp_path / "workflow.md" + workflow.write_text(WORKFLOW, encoding="utf-8") + return workflow + + +def test_load_workflow_file_preserves_standard_sections(tmp_path: Path): + workflow = _write_workflow_fixture(tmp_path) + + plan = load_workflow_file(workflow) + + assert plan.id == "adr-release" + assert plan.intent["summary"] == "Collect accepted ADR decisions." + assert plan.permissions["filesystem"]["read"] == ["adrs", "templates"] + assert plan.responsibilities["agent"]["may_run_deterministic_steps"] is True + assert plan.observability["events"] == ["workflow.started", "workflow.completed"] + assert plan.steps[0]["id"] == "render" + + +def test_workflow_runner_collects_sources_and_renders_output(tmp_path: Path): + workflow = _write_workflow_fixture(tmp_path) + plan = load_workflow_file(workflow) + + result = WorkflowRunner(plan).run(dry_run=True) + + assert result.valid + assert result.sources["adrs"]["count"] == 1 + assert "Use the workflow engine." in result.sources["adrs"]["extracts"]["decisions"][0] + assert result.steps["render"]["complete"] + assert result.outputs[0].path.endswith("out/release-notes.md") + assert not result.outputs[0].written + assert "Ignore this one" not in result.outputs[0].content + + +def test_workflow_runner_writes_outputs_under_output_dir(tmp_path: Path): + workflow = _write_workflow_fixture(tmp_path) + plan = load_workflow_file(workflow) + output_dir = tmp_path / "build" + + result = WorkflowRunner(plan, output_dir=output_dir).run() + + assert result.valid + output = output_dir / "out" / "release-notes.md" + assert output.exists() + assert "Use the workflow engine" in output.read_text(encoding="utf-8") + assert result.outputs[0].written + + +def test_resolve_workflow_bindings_preserves_native_types_and_projects_lists(): + context = { + "sources": { + "docs": { + "items": [ + {"path": "a.md", "frontmatter": {"status": "accepted"}}, + {"path": "b.md", "frontmatter": {"status": "proposed"}}, + ], + } + } + } + + value = resolve_workflow_bindings( + { + "paths": "${sources.docs.items.path}", + "sentence": "Files: ${sources.docs.items.path}", + }, + context, + ) + + assert value["paths"] == ["a.md", "b.md"] + assert value["sentence"] == "Files: - a.md\n- b.md" + + +def test_optional_assisted_step_skips_without_hook(tmp_path: Path): + workflow = tmp_path / "workflow.yaml" + workflow.write_text( + """ +metadata: + id: assisted-demo +steps: + review: + kind: assisted + optional: true + prompt_text: Review this. +outputs: + review: + content: ${steps.review.skipped} +""", + encoding="utf-8", + ) + + result = WorkflowRunner(load_workflow_file(workflow)).run(dry_run=True) + + assert result.valid + assert result.steps["review"]["skipped"] is True + assert result.steps["review"]["diagnostics"][0]["code"] == "workflow.assisted_unavailable" + + +class _FakeHook: + def generate(self, request: GenerationHookRequest) -> GenerationHookResult: + return GenerationHookResult( + markdown=f"Reviewed: {request.prompt}", + provider="fake", + ) + + +def test_assisted_step_uses_injected_hook(tmp_path: Path): + workflow = tmp_path / "workflow.yaml" + workflow.write_text( + """ +metadata: + id: assisted-demo +steps: + review: + kind: assisted + optional: false + prompt_text: Review this. +outputs: + review: + content: ${steps.review.markdown} +""", + encoding="utf-8", + ) + + result = WorkflowRunner(load_workflow_file(workflow), assisted_hook=_FakeHook()).run(dry_run=True) + + assert result.valid + assert result.steps["review"]["provider"] == "fake" + assert result.outputs[0].content == "Reviewed: Review this." + + +def test_mkt_workflow_inspect_plan_and_run(tmp_path: Path): + workflow = _write_workflow_fixture(tmp_path) + runner = CliRunner() + + inspected = runner.invoke(main, ["workflow", "inspect", str(workflow)]) + planned = runner.invoke(main, ["workflow", "plan", str(workflow)]) + run = runner.invoke(main, ["workflow", "run", str(workflow), "--output-dir", str(tmp_path / "build")]) + + assert inspected.exit_code == 0 + assert "valid" in inspected.output + assert planned.exit_code == 0 + assert "outputs: 1" in planned.output + assert run.exit_code == 0 + assert "written" in run.output + + +def test_example_adr_release_notes_workflow_runs(tmp_path: Path): + workflow = Path("examples/workflows/adr-release-notes.workflow.md") + result = WorkflowRunner(load_workflow_file(workflow), output_dir=tmp_path).run() + + assert result.valid + assert result.sources["adrs"]["count"] == 2 + assert (tmp_path / "out" / "release-notes.md").exists() + assert "local SQLite backend" in (tmp_path / "out" / "release-notes.md").read_text(encoding="utf-8") + + +def test_example_assisted_review_workflow_is_deterministic_without_hook(tmp_path: Path): + workflow = Path("examples/workflows/assisted-review.workflow.md") + result = WorkflowRunner(load_workflow_file(workflow), output_dir=tmp_path).run(dry_run=True) + + assert result.valid + assert result.steps["review"]["skipped"] is True diff --git a/workplans/MKTT-WP-0011-markdown-dataflow-pipeline-workflows.md b/workplans/MKTT-WP-0011-markdown-dataflow-pipeline-workflows.md index 7cf8d96..53f0a71 100644 --- a/workplans/MKTT-WP-0011-markdown-dataflow-pipeline-workflows.md +++ b/workplans/MKTT-WP-0011-markdown-dataflow-pipeline-workflows.md @@ -3,7 +3,7 @@ id: MKTT-WP-0011 type: workplan title: "Markdown Dataflow Pipeline Workflows" domain: markitect -status: todo +status: done owner: markitect-tool topic_slug: markitect planning_priority: P2 @@ -51,7 +51,7 @@ See `docs/markdown-dataflow-workflow-assessment.md`. ```task id: MKTT-WP-0011-T001 -status: todo +status: done priority: high state_hub_task_id: "c335cbaa-dfb9-4df5-b1ae-87aaf6097bd8" ``` @@ -61,11 +61,18 @@ steps, outputs, variables, dry-run behavior, diagnostics, and provenance. Output: workflow schema, examples, and validation diagnostics. +Implemented: `docs/workflow-definition-standard.md` defines the workflow +standard with metadata, intent, inputs, outputs, steps, dependencies, +conditions, artifacts, permissions, resource requirements, timeouts, retry +policies, escalation rules, observability events, and human/agent/system +responsibility boundaries. `WorkflowPlan` preserves all standard sections and +unknown extension fields. + ## P11.2 - Implement Markdown source collectors ```task id: MKTT-WP-0011-T002 -status: todo +status: done priority: high state_hub_task_id: "16a89801-d96d-437f-a883-81d09586f47a" ``` @@ -75,11 +82,15 @@ selectors, sections, blocks, metrics, and future reference/index backends. Output: source collector API, selector integration, and tests. +Implemented: workflow inputs collect `file`, `path`, `files`, `glob`, and +`directory` Markdown sources, support frontmatter filters, metrics, +frontmatter, selectors, and named extractions. + ## P11.3 - Implement deterministic step registry ```task id: MKTT-WP-0011-T003 -status: todo +status: done priority: high state_hub_task_id: "808bed93-c7e2-4b34-90f4-f6f961fef503" ``` @@ -89,11 +100,15 @@ render, contract stub generation, contract checks, and data shaping. Output: deterministic workflow runner with dependency ordering. +Implemented: deterministic workflow runner supports dependency ordering and +step kinds `shape`, `extract`, `query`, `template`, `compose`, `transform`, +`include`, `contract_stub`, `contract_check`, and `assisted` boundary. + ## P11.4 - Implement data expression and binding model ```task id: MKTT-WP-0011-T004 -status: todo +status: done priority: high state_hub_task_id: "ea1ad9d2-3668-4b65-afb4-f490e5bfd0c6" ``` @@ -103,11 +118,15 @@ for example `${sources.adrs.decisions}` or `${steps.summary.markdown}`. Output: expression resolver, type checks, and missing-reference diagnostics. +Implemented: `${...}` bindings preserve native types for full-expression +values, interpolate text inside longer strings, support dictionary paths, +numeric list indexes, and list projection over dictionaries. + ## P11.5 - Add optional assisted processing step boundary ```task id: MKTT-WP-0011-T005 -status: todo +status: done priority: medium state_hub_task_id: "ed1adc60-fdd8-4d4c-b4d7-7ce906e641c6" ``` @@ -118,11 +137,15 @@ dry-run, optional steps, and policy gates before sending data to a provider. Output: hook adapter interface and tests with fake providers. +Implemented: assisted steps use the provider-neutral generation hook boundary. +Without a hook, optional assisted steps are skipped with warning diagnostics and +required assisted steps fail. Tests include an injected fake hook. + ## P11.6 - Implement multi-output sinks ```task id: MKTT-WP-0011-T006 -status: todo +status: done priority: high state_hub_task_id: "902707d7-46fe-45d6-a9ec-b85763065ff9" ``` @@ -133,11 +156,15 @@ their source data. Output: output sink API, path-safety checks, and provenance manifests. +Implemented: outputs can render content or templates, write multiple Markdown +files under a safe output root, support dry-run planning, and emit output +records plus provenance/trace events. + ## P11.7 - Add workflow CLI ```task id: MKTT-WP-0011-T007 -status: todo +status: done priority: high state_hub_task_id: "ccc26867-5724-4205-b3fe-a8b9d046775d" ``` @@ -152,11 +179,19 @@ mkt workflow run Include JSON/YAML outputs for agent use. +Implemented: + +- `mkt workflow inspect ` +- `mkt workflow plan ` +- `mkt workflow run ` + +All commands support text, JSON, and YAML output. + ## P11.8 - Add representative end-to-end examples ```task id: MKTT-WP-0011-T008 -status: todo +status: done priority: high state_hub_task_id: "f8501ea6-1ead-477d-8f64-c196e7edfe68" ``` @@ -168,6 +203,9 @@ Create examples covering: - source snippets -> docs - deterministic summary -> optional assisted review -> final Markdown +Implemented: examples under `examples/workflows/` cover ADR release notes, +source snippet extraction, and an optional assisted review boundary. + ## Exit Criteria - A non-programmer can write a Markdown/YAML workflow that extracts data from