"""Workflow definition loading and deterministic execution.""" from __future__ import annotations import glob import re from dataclasses import asdict, dataclass, field from pathlib import Path from typing import Any import yaml from markitect_tool.cache import scan_markdown_files from markitect_tool.contract import check_markdown_file, collect_metrics, load_contract_file from markitect_tool.core import Document, parse_markdown_file from markitect_tool.diagnostics import Diagnostic, SourceLocation, has_error from markitect_tool.extension import ProcessingProvenance, ProcessingTrace from markitect_tool.generation import ( GenerationHook, GenerationHookRequest, generate_stub_from_contract, ) from markitect_tool.ops import compose_files, resolve_includes, transform_markdown from markitect_tool.query import ( extract_document_with_engine, query_document_with_engine, ) from markitect_tool.template import MissingTemplateVariable, TemplateError, render_template WORKFLOW_FENCE_TAGS = {"workflow", "markitect-workflow", "mkt-workflow"} KNOWN_TOP_LEVEL = { "metadata", "intent", "inputs", "outputs", "steps", "dependencies", "conditions", "artifacts", "permissions", "resources", "timeouts", "retry_policies", "escalation_rules", "observability", "responsibilities", } _EXPRESSION_RE = re.compile(r"\$\{(?P[^}]+)\}") class WorkflowError(ValueError): """Raised when a workflow definition cannot be loaded or executed.""" @dataclass(frozen=True) class WorkflowPlan: """Loaded declarative workflow definition.""" metadata: dict[str, Any] = field(default_factory=dict) intent: dict[str, Any] = field(default_factory=dict) inputs: dict[str, dict[str, Any]] = field(default_factory=dict) steps: list[dict[str, Any]] = field(default_factory=list) outputs: dict[str, dict[str, Any]] = field(default_factory=dict) dependencies: list[Any] = field(default_factory=list) conditions: dict[str, Any] = field(default_factory=dict) artifacts: dict[str, Any] = field(default_factory=dict) permissions: dict[str, Any] = field(default_factory=dict) resources: dict[str, Any] = field(default_factory=dict) timeouts: dict[str, Any] = field(default_factory=dict) retry_policies: dict[str, Any] = field(default_factory=dict) escalation_rules: dict[str, Any] = field(default_factory=dict) observability: dict[str, Any] = field(default_factory=dict) responsibilities: dict[str, Any] = field(default_factory=dict) extensions: dict[str, Any] = field(default_factory=dict) source_path: str | None = None @property def id(self) -> str: return str(self.metadata.get("id") or self.metadata.get("name") or "workflow") def to_dict(self) -> dict[str, Any]: data = asdict(self) return {key: value for key, value in data.items() if value not in (None, [], {})} @dataclass(frozen=True) class WorkflowOutputRecord: """One output considered or written by a workflow run.""" id: str path: str | None content: str written: bool = False artifact: str | None = None def to_dict(self) -> dict[str, Any]: return {key: value for key, value in asdict(self).items() if value not in (None, "")} @dataclass(frozen=True) class WorkflowRunResult: """Result envelope for workflow inspect/plan/run operations.""" workflow_id: str plan_path: str | None dry_run: bool sources: dict[str, Any] = field(default_factory=dict) steps: dict[str, Any] = field(default_factory=dict) outputs: list[WorkflowOutputRecord] = field(default_factory=list) diagnostics: list[Diagnostic] = field(default_factory=list) provenance: list[ProcessingProvenance] = field(default_factory=list) trace: list[ProcessingTrace] = field(default_factory=list) @property def valid(self) -> bool: return not has_error(self.diagnostics) def to_dict(self) -> dict[str, Any]: data = { "workflow_id": self.workflow_id, "plan_path": self.plan_path, "dry_run": self.dry_run, "valid": self.valid, "sources": self.sources, "steps": self.steps, "outputs": [output.to_dict() for output in self.outputs], "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], "provenance": [event.to_dict() for event in self.provenance], "trace": [event.to_dict() for event in self.trace], } return {key: value for key, value in data.items() if value not in (None, [], {})} class WorkflowRunner: """Execute deterministic Markitect workflows.""" def __init__( self, plan: WorkflowPlan, *, base_dir: str | Path | None = None, output_dir: str | Path | None = None, assisted_hook: GenerationHook | None = None, ) -> None: self.plan = plan self.base_dir = Path(base_dir or Path(plan.source_path or ".").parent).resolve() self.output_dir = Path(output_dir).resolve() if output_dir else self.base_dir self.assisted_hook = assisted_hook def inspect(self) -> WorkflowRunResult: diagnostics = validate_workflow_plan(self.plan) return WorkflowRunResult( workflow_id=self.plan.id, plan_path=self.plan.source_path, dry_run=True, diagnostics=diagnostics, trace=[ProcessingTrace(event="workflow.inspected", metadata={"id": self.plan.id})], ) def run(self, *, dry_run: bool = False) -> WorkflowRunResult: diagnostics = validate_workflow_plan(self.plan) trace = [ProcessingTrace(event="workflow.started", metadata={"id": self.plan.id})] provenance: list[ProcessingProvenance] = [] sources: dict[str, Any] = {} steps: dict[str, Any] = {} outputs: list[WorkflowOutputRecord] = [] if has_error(diagnostics): return WorkflowRunResult( workflow_id=self.plan.id, plan_path=self.plan.source_path, dry_run=dry_run, diagnostics=diagnostics, trace=trace, ) context = _base_context(self.plan, sources, steps) for input_id, spec in self.plan.inputs.items(): try: sources[input_id] = self._collect_input(input_id, spec) trace.append(ProcessingTrace(event="workflow.input.collected", metadata={"id": input_id})) except Exception as exc: diagnostics.append(_diagnostic("workflow.input_failed", str(exc), details={"input": input_id})) if has_error(diagnostics): return WorkflowRunResult(self.plan.id, self.plan.source_path, dry_run, sources, steps, outputs, diagnostics, provenance, trace) context = _base_context(self.plan, sources, steps) for step in _ordered_steps(self.plan.steps, diagnostics): step_id = str(step["id"]) try: resolved_step = resolve_workflow_bindings(step, context) step_result = self._run_step(resolved_step) steps[step_id] = step_result context = _base_context(self.plan, sources, steps) trace.append(ProcessingTrace(event="workflow.step.completed", metadata={"id": step_id, "kind": step.get("kind")})) provenance.append( ProcessingProvenance( operation=f"workflow.step.{step.get('kind', 'unknown')}", source_path=self.plan.source_path, metadata={"step_id": step_id}, ) ) except Exception as exc: optional = bool(step.get("optional", False)) diagnostics.append( _diagnostic( "workflow.step_failed", str(exc), severity="warning" if optional else "error", details={"step": step_id, "kind": step.get("kind"), "optional": optional}, ) ) if not optional: break if has_error(diagnostics): return WorkflowRunResult(self.plan.id, self.plan.source_path, dry_run, sources, steps, outputs, diagnostics, provenance, trace) context = _base_context(self.plan, sources, steps) for output_id, spec in self.plan.outputs.items(): try: output = self._render_output(output_id, spec, context, dry_run=dry_run) outputs.append(output) trace.append(ProcessingTrace(event="workflow.output.ready", metadata={"id": output_id, "written": output.written})) provenance.append( ProcessingProvenance( operation="workflow.output", source_path=self.plan.source_path, dependencies=[output.path] if output.path else [], metadata={"output_id": output_id, "written": output.written}, ) ) except Exception as exc: diagnostics.append(_diagnostic("workflow.output_failed", str(exc), details={"output": output_id})) trace.append(ProcessingTrace(event="workflow.completed", metadata={"id": self.plan.id, "valid": not has_error(diagnostics)})) return WorkflowRunResult( workflow_id=self.plan.id, plan_path=self.plan.source_path, dry_run=dry_run, sources=sources, steps=steps, outputs=outputs, diagnostics=diagnostics, provenance=provenance, trace=trace, ) def _collect_input(self, input_id: str, spec: dict[str, Any]) -> Any: if "value" in spec: return {"id": input_id, "kind": "value", "value": spec["value"]} paths = _input_paths(spec, self.base_dir) selector = spec.get("selector") extract_specs = _extract_specs(spec.get("extract")) include_metrics = bool(spec.get("metrics", True)) include_frontmatter = bool(spec.get("frontmatter", True)) where = dict(spec.get("where") or {}) items: list[dict[str, Any]] = [] aggregate_extracts: dict[str, list[Any]] = {name: [] for name in extract_specs} aggregate_matches: list[dict[str, Any]] = [] for path in paths: document = parse_markdown_file(path) if where and not _matches_where(document.to_dict(), where): continue item: dict[str, Any] = { "path": _relative(path, self.base_dir), "markdown": path.read_text(encoding="utf-8"), "document": document.to_dict(), } if include_frontmatter: item["frontmatter"] = document.frontmatter if include_metrics: item["metrics"] = collect_metrics(document).to_dict() if selector: matches = [match.to_dict() for match in query_document_with_engine(document, str(selector), engine=str(spec.get("engine", "selector")))] item["matches"] = matches aggregate_matches.extend(matches) if extract_specs: item_extracts: dict[str, list[str]] = {} for name, extract_spec in extract_specs.items(): selected = extract_document_with_engine( document, str(extract_spec["selector"]), engine=str(extract_spec.get("engine", spec.get("engine", "selector"))), ) item_extracts[name] = selected aggregate_extracts[name].extend(selected) item["extracts"] = item_extracts items.append(item) return { "id": input_id, "kind": "markdown_collection", "count": len(items), "items": items, "paths": [item["path"] for item in items], "extracts": aggregate_extracts, "matches": aggregate_matches, } def _run_step(self, step: dict[str, Any]) -> dict[str, Any]: kind = str(step.get("kind", "")).strip() if not kind: raise WorkflowError("Workflow step requires `kind`") if kind == "shape": return {"kind": kind, "data": step.get("data", step.get("value", {}))} if kind == "extract": return self._step_extract(step) if kind == "query": return self._step_query(step) if kind == "template": return self._step_template(step) if kind == "compose": return self._step_compose(step) if kind == "transform": return self._step_transform(step) if kind == "include": return self._step_include(step) if kind == "contract_stub": return self._step_contract_stub(step) if kind == "contract_check": return self._step_contract_check(step) if kind == "assisted": return self._step_assisted(step) raise WorkflowError(f"Unsupported workflow step kind `{kind}`") def _step_extract(self, step: dict[str, Any]) -> dict[str, Any]: selector = str(step.get("selector", "")) if not selector: raise WorkflowError("extract step requires `selector`") values = _query_like_step(step, selector, query=False) return {"kind": "extract", "items": values, "count": len(values), "text": "\n\n".join(values)} def _step_query(self, step: dict[str, Any]) -> dict[str, Any]: selector = str(step.get("selector", "")) if not selector: raise WorkflowError("query step requires `selector`") matches = _query_like_step(step, selector, query=True) return {"kind": "query", "matches": matches, "count": len(matches)} def _step_template(self, step: dict[str, Any]) -> dict[str, Any]: template_text = _template_text(step, self.base_dir) try: rendered = render_template(template_text, dict(step.get("data") or {}), strict=bool(step.get("strict", True))) except MissingTemplateVariable as exc: raise WorkflowError(str(exc)) from exc except TemplateError as exc: raise WorkflowError(str(exc)) from exc return rendered.to_dict() | {"kind": "template"} def _step_compose(self, step: dict[str, Any]) -> dict[str, Any]: if step.get("files"): result = compose_files( [_safe_input_path(self.base_dir, value) for value in step["files"]], title=step.get("title"), heading_delta=int(step.get("heading_delta", 0)), include_frontmatter=bool(step.get("include_frontmatter", False)), separator=str(step.get("separator", "\n\n---\n\n")), ) return result.to_dict() | {"kind": "compose"} items = step.get("items", step.get("input", [])) if not isinstance(items, list): items = [items] separator = str(step.get("separator", "\n\n---\n\n")) parts = [str(item).strip() for item in items if item is not None and str(item).strip()] title = step.get("title") if title: parts.insert(0, f"# {str(title).strip()}") return {"kind": "compose", "markdown": separator.join(parts).strip() + "\n", "sources": []} def _step_transform(self, step: dict[str, Any]) -> dict[str, Any]: markdown = _markdown_input(step, self.base_dir) result = transform_markdown( markdown, strip_frontmatter=bool(step.get("strip_frontmatter", False)), set_frontmatter=dict(step.get("set_frontmatter") or {}), heading_delta=int(step.get("heading_delta", 0)), extract_selector=step.get("extract_selector"), source_path=step.get("source_path"), ) return result.to_dict() | {"kind": "transform"} def _step_include(self, step: dict[str, Any]) -> dict[str, Any]: markdown = _markdown_input(step, self.base_dir) result = resolve_includes( markdown, base_dir=_safe_dir(self.base_dir, step.get("base_dir", ".")), current_path=step.get("current_path"), max_depth=int(step.get("max_depth", 10)), ) return result.to_dict() | {"kind": "include"} def _step_contract_stub(self, step: dict[str, Any]) -> dict[str, Any]: contract = load_contract_file(_safe_input_path(self.base_dir, step.get("contract"))) generated = generate_stub_from_contract( contract, data=dict(step.get("data") or {}), include_optional=bool(step.get("include_optional", False)), ) return generated.to_dict() | {"kind": "contract_stub"} def _step_contract_check(self, step: dict[str, Any]) -> dict[str, Any]: document_path = _safe_input_path(self.base_dir, step.get("document")) contract_path = _safe_input_path(self.base_dir, step.get("contract")) result = check_markdown_file(document_path, contract_path) return result.to_dict() | {"kind": "contract_check"} def _step_assisted(self, step: dict[str, Any]) -> dict[str, Any]: optional = bool(step.get("optional", True)) if self.assisted_hook is None: diagnostic = _diagnostic( "workflow.assisted_unavailable", "Assisted workflow step has no generation hook adapter.", severity="warning" if optional else "error", details={"step": step.get("id"), "optional": optional}, ) if optional: return {"kind": "assisted", "skipped": True, "diagnostics": [diagnostic.to_dict()]} raise WorkflowError(diagnostic.message) prompt = str(step.get("prompt_text") or "") if step.get("prompt"): prompt = _safe_input_path(self.base_dir, step["prompt"]).read_text(encoding="utf-8") request = GenerationHookRequest( prompt=prompt, data=dict(step.get("data") or {}), template=step.get("template"), contract_id=step.get("contract_id"), metadata={"workflow_id": self.plan.id, "step_id": step.get("id")}, ) generated = self.assisted_hook.generate(request) return generated.to_dict() | {"kind": "assisted"} def _render_output( self, output_id: str, spec: dict[str, Any], context: dict[str, Any], *, dry_run: bool, ) -> WorkflowOutputRecord: resolved = resolve_workflow_bindings(spec, context) if "template" in resolved: template_text = _safe_input_path(self.base_dir, resolved["template"]).read_text(encoding="utf-8") content = render_template(template_text, dict(resolved.get("data") or {}), strict=bool(resolved.get("strict", True))).markdown else: content = _format_output_value(resolved.get("content", resolved.get("value", ""))) output_path: Path | None = None written = False if resolved.get("path"): output_path = _safe_output_path(self.output_dir, resolved["path"]) if not dry_run: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(content, encoding="utf-8") written = True return WorkflowOutputRecord( id=output_id, path=str(output_path) if output_path else None, content=content, written=written, artifact=resolved.get("artifact"), ) def load_workflow_file(path: str | Path) -> WorkflowPlan: """Load a YAML or Markdown-fenced workflow definition.""" file_path = Path(path) text = file_path.read_text(encoding="utf-8") data = _load_workflow_mapping(text, file_path) return _workflow_from_mapping(data, source_path=str(file_path)) def validate_workflow_plan(plan: WorkflowPlan) -> list[Diagnostic]: diagnostics: list[Diagnostic] = [] if not plan.inputs and not plan.steps: diagnostics.append(_diagnostic("workflow.empty", "Workflow requires at least inputs or steps.")) seen_steps: set[str] = set() for step in plan.steps: step_id = str(step.get("id", "")).strip() if not step_id: diagnostics.append(_diagnostic("workflow.step_missing_id", "Workflow step requires `id`.")) continue if step_id in seen_steps: diagnostics.append(_diagnostic("workflow.step_duplicate_id", f"Duplicate workflow step id `{step_id}`.")) seen_steps.add(step_id) if not step.get("kind"): diagnostics.append(_diagnostic("workflow.step_missing_kind", f"Workflow step `{step_id}` requires `kind`.")) return diagnostics def resolve_workflow_bindings(value: Any, context: dict[str, Any]) -> Any: """Resolve `${...}` expressions recursively.""" if isinstance(value, dict): return {key: resolve_workflow_bindings(item, context) for key, item in value.items()} if isinstance(value, list): return [resolve_workflow_bindings(item, context) for item in value] if not isinstance(value, str): return value matches = list(_EXPRESSION_RE.finditer(value)) if not matches: return value if len(matches) == 1 and matches[0].span() == (0, len(value)): return _resolve_path(context, matches[0].group("path").strip()) def replace(match: re.Match[str]) -> str: return _format_output_value(_resolve_path(context, match.group("path").strip())) return _EXPRESSION_RE.sub(replace, value) def _load_workflow_mapping(text: str, path: Path) -> dict[str, Any]: if path.suffix.lower() in {".yaml", ".yml"}: data = yaml.safe_load(text) or {} else: data = _extract_markdown_workflow_block(text) if not isinstance(data, dict): raise WorkflowError("Workflow definition must be a mapping") return data def _extract_markdown_workflow_block(text: str) -> dict[str, Any]: fence_re = re.compile(r"```(?P[^\n`]*)\n(?P.*?)\n```", re.DOTALL) for match in fence_re.finditer(text): info = set(match.group("info").strip().lower().split()) if info & WORKFLOW_FENCE_TAGS: data = yaml.safe_load(match.group("body")) or {} if not isinstance(data, dict): raise WorkflowError("Workflow fenced block must contain a YAML mapping") return data raise WorkflowError("No fenced workflow YAML block found") def _workflow_from_mapping(data: dict[str, Any], *, source_path: str | None) -> WorkflowPlan: metadata = _mapping(data.get("metadata")) inputs = _mapping(data.get("inputs")) outputs = _normalize_outputs(data.get("outputs")) steps = _normalize_steps(data.get("steps")) known = {key: data.get(key) for key in KNOWN_TOP_LEVEL} extensions = {key: value for key, value in data.items() if key not in KNOWN_TOP_LEVEL} return WorkflowPlan( metadata=metadata, intent=_intent(data.get("intent")), inputs=inputs, steps=steps, outputs=outputs, dependencies=list(data.get("dependencies") or []), conditions=_mapping(data.get("conditions")), artifacts=_mapping(data.get("artifacts")), permissions=_mapping(data.get("permissions")), resources=_mapping(data.get("resources")), timeouts=_mapping(data.get("timeouts")), retry_policies=_mapping(data.get("retry_policies")), escalation_rules=_mapping(data.get("escalation_rules")), observability=_mapping(data.get("observability")), responsibilities=_mapping(data.get("responsibilities")), extensions=extensions, source_path=source_path, ) def _normalize_steps(raw_steps: Any) -> list[dict[str, Any]]: if raw_steps is None: return [] if isinstance(raw_steps, dict): return [dict(spec or {}) | {"id": step_id} for step_id, spec in raw_steps.items()] if isinstance(raw_steps, list): steps: list[dict[str, Any]] = [] for item in raw_steps: if not isinstance(item, dict): raise WorkflowError("Workflow steps list must contain mappings") steps.append(dict(item)) return steps raise WorkflowError("Workflow `steps` must be a mapping or list") def _normalize_outputs(raw_outputs: Any) -> dict[str, dict[str, Any]]: if raw_outputs is None: return {} if isinstance(raw_outputs, dict): return {str(output_id): dict(spec or {}) for output_id, spec in raw_outputs.items()} if isinstance(raw_outputs, list): outputs: dict[str, dict[str, Any]] = {} for item in raw_outputs: if not isinstance(item, dict) or not item.get("id"): raise WorkflowError("Workflow output list entries require `id`") outputs[str(item["id"])] = dict(item) return outputs raise WorkflowError("Workflow `outputs` must be a mapping or list") def _input_paths(spec: dict[str, Any], base_dir: Path) -> list[Path]: paths: list[Path] = [] if spec.get("file") or spec.get("path"): paths.append(_safe_input_path(base_dir, spec.get("file") or spec.get("path"))) for raw_path in spec.get("files") or []: paths.append(_safe_input_path(base_dir, raw_path)) if spec.get("glob"): pattern = str((base_dir / str(spec["glob"])).resolve()) paths.extend(Path(path) for path in glob.glob(pattern, recursive=bool(spec.get("recursive", False)))) if spec.get("directory"): paths.extend(scan_markdown_files([_safe_dir(base_dir, spec["directory"])], recursive=bool(spec.get("recursive", True)))) return sorted({path.resolve() for path in paths if path.exists() and path.is_file()}) def _extract_specs(raw_extract: Any) -> dict[str, dict[str, Any]]: if raw_extract is None: return {} if not isinstance(raw_extract, dict): raise WorkflowError("Input `extract` must be a mapping") specs: dict[str, dict[str, Any]] = {} for name, spec in raw_extract.items(): if isinstance(spec, str): specs[str(name)] = {"selector": spec} elif isinstance(spec, dict) and spec.get("selector"): specs[str(name)] = dict(spec) else: raise WorkflowError(f"Input extract `{name}` requires a selector") return specs def _ordered_steps(steps: list[dict[str, Any]], diagnostics: list[Diagnostic]) -> list[dict[str, Any]]: by_id = {str(step.get("id")): step for step in steps if step.get("id")} ordered: list[dict[str, Any]] = [] temporary: set[str] = set() permanent: set[str] = set() def visit(step_id: str) -> None: if step_id in permanent: return if step_id in temporary: diagnostics.append(_diagnostic("workflow.dependency_cycle", f"Workflow dependency cycle includes `{step_id}`.")) return step = by_id.get(step_id) if step is None: diagnostics.append(_diagnostic("workflow.unknown_step_dependency", f"Unknown workflow step dependency `{step_id}`.")) return temporary.add(step_id) for dep in _as_list(step.get("depends_on")): visit(str(dep)) temporary.remove(step_id) permanent.add(step_id) ordered.append(step) for step in steps: if step.get("id"): visit(str(step["id"])) return ordered def _query_like_step(step: dict[str, Any], selector: str, *, query: bool) -> list[Any]: source = step.get("source", step.get("input")) engine = str(step.get("engine", "selector")) documents = _documents_from_source(source) results: list[Any] = [] for document in documents: if query: results.extend(match.to_dict() for match in query_document_with_engine(document, selector, engine=engine)) else: results.extend(extract_document_with_engine(document, selector, engine=engine)) return results def _documents_from_source(source: Any) -> list[Document]: if isinstance(source, dict) and "items" in source: return [Document.from_dict(item["document"]) for item in source["items"] if isinstance(item, dict) and "document" in item] if isinstance(source, dict) and "document" in source: return [Document.from_dict(source["document"])] if isinstance(source, dict) and {"headings", "blocks", "sections"} <= set(source): return [Document.from_dict(source)] raise WorkflowError("query/extract step requires a source collection or document") def _template_text(step: dict[str, Any], base_dir: Path) -> str: if step.get("template_text") is not None: return str(step["template_text"]) if step.get("template"): return _safe_input_path(base_dir, step["template"]).read_text(encoding="utf-8") raise WorkflowError("template step requires `template` or `template_text`") def _markdown_input(step: dict[str, Any], base_dir: Path) -> str: if step.get("markdown") is not None: return str(step["markdown"]) if step.get("input") is not None: return _format_output_value(step["input"]) if step.get("file"): return _safe_input_path(base_dir, step["file"]).read_text(encoding="utf-8") raise WorkflowError("step requires `markdown`, `input`, or `file`") def _base_context(plan: WorkflowPlan, sources: dict[str, Any], steps: dict[str, Any]) -> dict[str, Any]: return { "metadata": plan.metadata, "intent": plan.intent, "sources": sources, "steps": steps, "artifacts": plan.artifacts, "workflow": plan.to_dict(), } def _resolve_path(context: dict[str, Any], path: str) -> Any: current: Any = context for part in path.split("."): if isinstance(current, dict) and part in current: current = current[part] elif isinstance(current, list): if part.isdigit(): current = current[int(part)] else: current = [ item[part] for item in current if isinstance(item, dict) and part in item ] else: raise WorkflowError(f"Cannot resolve workflow binding `${{{path}}}`") return current def _matches_where(document: dict[str, Any], where: dict[str, Any]) -> bool: context = {"frontmatter": document.get("frontmatter", {}), "document": document} for path, expected in where.items(): try: value = _resolve_path(context, str(path)) except WorkflowError: return False if value != expected: return False return True def _safe_input_path(base_dir: Path, raw_path: Any) -> Path: if not raw_path: raise WorkflowError("Expected path") path = (base_dir / str(raw_path)).resolve() if not _is_within(path, base_dir): raise WorkflowError(f"Path escapes workflow directory: {raw_path}") if not path.exists(): raise WorkflowError(f"Path does not exist: {raw_path}") return path def _safe_dir(base_dir: Path, raw_path: Any) -> Path: path = _safe_input_path(base_dir, raw_path) if not path.is_dir(): raise WorkflowError(f"Expected directory: {raw_path}") return path def _safe_output_path(output_dir: Path, raw_path: Any) -> Path: path = (output_dir / str(raw_path)).resolve() if not _is_within(path, output_dir): raise WorkflowError(f"Output path escapes output directory: {raw_path}") return path def _is_within(path: Path, root: Path) -> bool: try: path.relative_to(root) return True except ValueError: return False def _relative(path: Path, root: Path) -> str: try: return path.resolve().relative_to(root).as_posix() except ValueError: return str(path) def _mapping(value: Any) -> dict[str, Any]: if value is None: return {} if isinstance(value, dict): return dict(value) raise WorkflowError("Expected mapping") def _intent(value: Any) -> dict[str, Any]: if value is None: return {} if isinstance(value, str): return {"summary": value} if isinstance(value, dict): return dict(value) raise WorkflowError("Workflow `intent` must be a string or mapping") def _as_list(value: Any) -> list[Any]: if value is None: return [] if isinstance(value, list): return value return [value] def _format_output_value(value: Any) -> str: if value is None: return "" if isinstance(value, str): return value if isinstance(value, list): return "\n".join(f"- {_format_output_value(item)}" for item in value) if isinstance(value, dict): return yaml.safe_dump(value, sort_keys=False).strip() return str(value) def _diagnostic( code: str, message: str, *, severity: str = "error", details: dict[str, Any] | None = None, ) -> Diagnostic: return Diagnostic( severity=severity, code=code, message=message, source=SourceLocation(), details=details or {}, )