# ── examples/infospace-with-history/infospace.yaml ──────────────────
# Pipeline section of the example infospace. Each stage entry uses the
# keys read by ``PipelineStage.from_dict`` further down.
#
#   pipeline:
#     stages:
#       - name: extract-entities
#         template: templates/extract-entities.md
#         output_dir: output/entities
#         output_macro: entities
#         split_entities: true
#         macros:
#           extraction_rules: artifacts/guidelines/extraction-rules.md
#           vsm_framework: artifacts/vsm-reference/vsm-framework.md
#       - name: map-to-vsm
#         template: templates/map-to-vsm.md
#         output_dir: output/mappings
#         output_macro: mappings
#         macros:
#           mapping_rules: artifacts/guidelines/mapping-rules.md
#           vsm_framework: artifacts/vsm-reference/vsm-framework.md
#       - name: synthesize-analysis
#         template: templates/synthesize-analysis.md
#         output_dir: output/analyses
#         output_macro: analysis
#         macros:
#           vsm_framework: artifacts/vsm-reference/vsm-framework.md
#     post_batch:
#       - name: assess-metrics
#         template: templates/assess-metrics.md


# ── markitect/infospace/cli.py ──────────────────────────────────────

# ── process ─────────────────────────────────────────────────────


@infospace_commands.command()
@click.argument("glob_pattern", default=None, required=False)
@click.option("--all", "process_all", is_flag=True, help="Process all source files.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default=None, help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option(
    "--check-after-each",
    is_flag=True,
    help="Run collection checks (C1–C5) after each source file.",
)
@click.option("--no-commit", is_flag=True, help="Skip git commits.")
def process(
    glob_pattern: Optional[str],
    process_all: bool,
    config_path: Optional[str],
    provider: Optional[str],
    model: Optional[str],
    check_after_each: bool,
    no_commit: bool,
):
    """Process source files through the pipeline defined in infospace.yaml.

    GLOB_PATTERN is matched against the sources directory declared in
    infospace.yaml (default ``*.md``). Use ``--all`` to process every
    source file.

    \b
    Examples:
      # Process chapters 1-3 from book 1
      markitect infospace process "book-1-chapter-0[1-3].md" --provider openrouter

      # Process all source files and check metrics after each
      markitect infospace process --all --provider openrouter --check-after-each

      # Dry run — load existing outputs only, no LLM calls
      markitect infospace process --all
    """
    # NOTE: the docstring above is the command's --help text; it is kept
    # verbatim.  Without --provider the pipeline runs with adapter=None.
    cfg, cfg_path = _load_config_or_exit(config_path)
    root = cfg_path.parent

    if not cfg.pipeline or not cfg.pipeline.stages:
        click.echo(
            "Error: No pipeline stages defined in infospace.yaml.\n"
            "Add a 'pipeline.stages' section with at least one stage.",
            err=True,
        )
        raise SystemExit(1)

    # Resolve sources directory (falls back to the infospace root)
    sources_dir = root / cfg.topic.sources if cfg.topic.sources else root
    if not sources_dir.is_dir():
        click.echo(
            f"Error: Sources directory not found: {sources_dir}\n"
            f"Set 'topic.sources' in infospace.yaml.",
            err=True,
        )
        raise SystemExit(1)

    # Collect source files.  --all takes precedence over GLOB_PATTERN.
    # Directories that happen to match the glob are dropped, because the
    # pipeline reads every entry with read_text().
    pattern = "*.md" if process_all else (glob_pattern or "*.md")
    source_files = sorted(p for p in sources_dir.glob(pattern) if p.is_file())

    if not source_files:
        if process_all:
            click.echo(f"No source files found in {sources_dir}")
        else:
            click.echo(
                f"No files matched: {glob_pattern or '*.md'}\n"
                f"Sources directory: {sources_dir}"
            )
        return

    click.echo(f"Found {len(source_files)} source file(s) in {sources_dir.name}/")

    # Imported lazily so the rest of the CLI does not pay for the pipeline
    # module unless this command is actually invoked.
    from markitect.infospace.pipeline import SourcePipeline

    # Create LLM adapter
    adapter = None
    if provider:
        from markitect.llm import create_adapter

        # Single source of truth for per-provider default models: the table
        # on SourcePipeline, rather than a second copy kept in the CLI.
        resolved_model = model or SourcePipeline._PROVIDER_DEFAULTS.get(provider)
        adapter = create_adapter(provider, model=resolved_model)
        click.echo(f"LLM: {provider} ({resolved_model or 'default'})")
    else:
        click.echo("No LLM provider — will use existing outputs only (manual mode).")

    # Run pipeline
    pipeline = SourcePipeline(cfg, root, adapter=adapter, no_commit=no_commit)

    total = len(source_files)
    completed = 0
    for i, source_file in enumerate(source_files, 1):
        click.echo(f"\n[{i}/{total}] {source_file.name}")
        if pipeline.process_source(source_file):
            completed += 1
        if check_after_each:
            pipeline.run_collection_check()

    click.echo(f"\nDone: {completed}/{total} source file(s) fully processed.")


# ── stale-mappings ──────────────────────────────────────────────────


# ── markitect/infospace/config.py ───────────────────────────────────


@dataclass
class PipelineStage:
    """A single stage in the processing pipeline.

    Attributes:
        template: Path to the template file (relative to infospace root).
        name: Human-readable stage name used in progress output.
        output_dir: Directory for stage outputs (relative to root).
        output_macro: Macro name for this stage's output, also used as
            the filename suffix (e.g. ``entities`` → ``-entities.md``).
        split_entities: If True, parse ``--- ENTITY: <name> ---`` delimiters
            from LLM output and write individual entity files.
        macros: Static macros loaded from files (macro name → relative path).
        spaces: Legacy space IDs for SQLite-based resolver (unused by
            :class:`SourcePipeline`).
    """

    template: str
    name: str = ""
    output_dir: str = ""
    output_macro: str = ""
    split_entities: bool = False
    macros: Dict[str, str] = field(default_factory=dict)
    spaces: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the stage, omitting every field left at its default.

        Returns:
            A dict that always contains ``template``; the other keys are
            present only when their value is truthy.  Containers are
            copied so callers cannot mutate the stage through the result.
        """
        d: Dict[str, Any] = {"template": self.template}
        if self.name:
            d["name"] = self.name
        if self.output_dir:
            d["output_dir"] = self.output_dir
        if self.output_macro:
            d["output_macro"] = self.output_macro
        if self.split_entities:
            d["split_entities"] = self.split_entities
        if self.macros:
            d["macros"] = dict(self.macros)
        if self.spaces:
            d["spaces"] = list(self.spaces)
        return d

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> PipelineStage:
        """Build a stage from a parsed ``pipeline.stages`` entry.

        Args:
            data: Mapping with a required ``template`` key and optional
                ``name``, ``output_dir``, ``output_macro``,
                ``split_entities``, ``macros`` and ``spaces`` keys.

        Returns:
            The populated :class:`PipelineStage`.

        Raises:
            KeyError: If ``template`` is missing.
        """
        # ``or`` (rather than a .get() default) also covers keys that are
        # present but null, e.g. a bare ``macros:`` line in the YAML, which
        # would otherwise leave ``None`` in a field that is iterated later.
        return cls(
            template=data["template"],
            name=data.get("name") or "",
            output_dir=data.get("output_dir") or "",
            output_macro=data.get("output_macro") or "",
            split_entities=bool(data.get("split_entities", False)),
            macros=dict(data.get("macros") or {}),
            spaces=list(data.get("spaces") or []),
        )


# ── markitect/infospace/pipeline.py (new file) ──────────────────────
# The new module opens with this docstring summary:
#
#   Source file processing pipeline for infospace tooling.
#
#   Processes source files through the pipeline stages defined in
#   ``infospace.yaml`` using simple ``@{macro}`` template substitution —
#   no SQLite or space-resolver dependencies.
"""
Source file processing pipeline for infospace tooling.

Processes source files through the pipeline stages defined in
``infospace.yaml`` using simple ``@{macro}`` template substitution —
no SQLite or space-resolver dependencies.

Example usage from CLI::

    markitect infospace process "book-1-chapter-0[1-3].md" \\
        --provider openrouter --check-after-each

Or programmatically::

    from markitect.infospace.pipeline import SourcePipeline
    from markitect.llm import create_adapter

    adapter = create_adapter("openrouter")
    pipeline = SourcePipeline(config, root, adapter=adapter)
    pipeline.process_sources(source_files)
"""

from __future__ import annotations

import re
import subprocess
import time
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

if TYPE_CHECKING:
    # Used in annotations only; annotations are lazy in this module, so the
    # config module does not have to be imported at runtime.
    from markitect.infospace.config import InfospaceConfig, PipelineStage


class SourcePipeline:
    """Processes source files through infospace pipeline stages.

    Each stage in ``config.pipeline.stages`` is run in order for every
    source file.  A stage's output is handed to the following stages as
    a macro named after the stage's ``output_macro`` field.

    Template substitution uses ``@{macro_name}`` tokens.  Static macros
    are loaded from the files listed under ``stage.macros``; dynamic
    macros are ``chapter_text`` (source file content),
    ``existing_entities`` (only for ``split_entities`` stages) and the
    outputs of prior stages.

    Attributes:
        config: The loaded infospace configuration.
        root: Infospace root; every relative path is resolved against it.
        adapter: LLM adapter, or None for manual mode (no generation).
        no_commit: When True, skip the git commit after each source.
    """

    _PROVIDER_DEFAULTS: Dict[str, str] = {
        "openrouter": "arcee-ai/trinity-large-preview:free",
    }

    def __init__(
        self,
        config: InfospaceConfig,
        root: Path,
        adapter=None,
        no_commit: bool = False,
    ) -> None:
        self.config = config
        self.root = root
        self.adapter = adapter
        self.no_commit = no_commit

    # ── Public API ────────────────────────────────────────────────────

    def process_sources(
        self,
        source_files: List[Path],
        check_after_each: bool = False,
    ) -> int:
        """Process a list of source files through all pipeline stages.

        Args:
            source_files: Ordered list of source files to process.
            check_after_each: If True, run collection checks after each
                source and record a metrics snapshot.

        Returns:
            Number of source files fully processed (all stages complete).
        """
        completed = 0
        for source_file in source_files:
            if self.process_source(source_file):
                completed += 1
            if check_after_each:
                self.run_collection_check()
        return completed

    def process_source(self, source_file: Path) -> bool:
        """Run the full pipeline for a single source markdown file.

        Stages run in configuration order.  The first stage that yields
        no output stops processing of this file.  After the last stage a
        git commit is made unless ``no_commit`` is set.

        Args:
            source_file: Path to the source markdown file.

        Returns:
            True if all stages completed successfully.  False if no
            stages are configured, the source cannot be read, or a stage
            produced no output.
        """
        source_id = source_file.stem

        print(f"\nProcessing: {source_id}")
        print("=" * 60)

        # Checked before touching the file: nothing to do without stages.
        if not self.config.pipeline or not self.config.pipeline.stages:
            print(" No pipeline stages configured.")
            return False

        # One unreadable file must not abort a whole batch run.
        try:
            source_content = source_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            print(f" ERROR: cannot read source file {source_file}: {exc}")
            return False

        stage_outputs: Dict[str, str] = {}
        for stage in self.config.pipeline.stages:
            content = self._run_stage(stage, source_id, source_content, stage_outputs)
            if content is None:
                stage_label = stage.name or stage.template
                print(f"\n Pipeline paused at stage '{stage_label}'.")
                return False
            if stage.output_macro:
                stage_outputs[stage.output_macro] = content

        print(f"\n {source_id}: all stages complete.")
        if not self.no_commit:
            self._git_commit(source_id)

        return True

    def run_collection_check(self) -> None:
        """Run collection-level quality checks (C1–C5) and print metrics.

        Parses the entities under ``config.entities_dir``, runs all checks,
        prints each metric and records a snapshot.  Returns early, with a
        message, when the directory is missing or holds no entities.
        """
        # Imported lazily so these modules are only needed when checks run.
        from markitect.infospace.entity_parser import parse_entity_directory
        from markitect.infospace.checks import run_all_checks
        from markitect.infospace.history import record_check_results

        entities_dir = self.root / self.config.entities_dir
        if not entities_dir.is_dir():
            print("\n Collection check: no entities directory.")
            return

        entities = parse_entity_directory(entities_dir)
        if not entities:
            print("\n Collection check: no entities found.")
            return

        print(f"\n Collection check ({len(entities)} entities):")
        report = run_all_checks(entities=entities)
        m = report.metrics()
        for k, v in sorted(m.items()):
            print(f" {k}: {v:.4f}")

        snap = record_check_results(
            report, self.config, self.root, entity_count=len(entities)
        )
        print(f" Snapshot recorded: {snap.snapshot_id}")

    # ── Stage Execution ───────────────────────────────────────────────

    def _run_stage(
        self,
        stage: PipelineStage,
        source_id: str,
        source_content: str,
        stage_outputs: Dict[str, str],
    ) -> Optional[str]:
        """Run a single pipeline stage.

        An existing output file is reused without calling the LLM.
        Otherwise the template is resolved, the prompt is written next to
        the expected output for inspection, and the LLM is called when an
        adapter is available.

        Args:
            stage: Stage configuration.
            source_id: Stem of the source file, used in output filenames.
            source_content: Text of the source file.
            stage_outputs: Outputs of earlier stages keyed by macro name.

        Returns:
            The stage's output content, or None when the template is
            missing, no adapter is configured, or the LLM call failed.
        """
        stage_label = stage.name or stage.template
        print(f"\n [{stage_label}]")

        output_file = self._get_output_file(stage, source_id)

        # Skip-if-exists — load from disk
        if output_file and output_file.exists():
            print(f" Found existing output: {output_file.name}")
            if stage.split_entities:
                # For split stages the output file is a view of includes;
                # downstream stages need the reassembled entity content.
                return self._load_from_view(output_file)
            return output_file.read_text(encoding="utf-8")

        # Fail fast on a missing template before reading any macro files.
        template_path = self.root / stage.template
        if not template_path.exists():
            print(f" ERROR: Template not found: {stage.template}")
            return None

        macros = self._build_macros(stage, source_content, stage_outputs)
        prompt = self._resolve_macros(
            template_path.read_text(encoding="utf-8"), macros
        )

        # Write prompt for inspection
        if output_file:
            prompt_file = output_file.parent / f"{source_id}-prompt.md"
            prompt_file.parent.mkdir(parents=True, exist_ok=True)
            prompt_file.write_text(prompt, encoding="utf-8")
            print(f" Prompt written to {prompt_file.name}")

        # Without an adapter, we cannot generate output
        if self.adapter is None:
            print(" No LLM adapter — skipping generation (manual mode).")
            return None

        content = self._call_llm(prompt, stage_label)
        if content is None:
            return None

        # Persist output
        if stage.split_entities:
            entity_files = self._split_and_write_entities(stage, content)
            self._write_entity_view(source_id, entity_files, output_file)
        elif output_file:
            output_file.parent.mkdir(parents=True, exist_ok=True)
            output_file.write_text(content, encoding="utf-8")
            print(f" Output written to {output_file.name}")
        return content

    # ── Output File Resolution ────────────────────────────────────────

    def _get_output_file(
        self, stage: PipelineStage, source_id: str
    ) -> Optional[Path]:
        """Return the expected output file path for a stage, or None.

        None is returned when the stage lacks ``output_dir`` or
        ``output_macro``; such a stage's output is never written to disk.
        """
        if not stage.output_dir or not stage.output_macro:
            return None
        return self.root / stage.output_dir / f"{source_id}-{stage.output_macro}.md"

    # ── Macro Resolution ──────────────────────────────────────────────

    def _build_macros(
        self,
        stage: PipelineStage,
        source_content: str,
        stage_outputs: Dict[str, str],
    ) -> Dict[str, str]:
        """Assemble the macro dict for template substitution.

        Precedence, lowest to highest: ``chapter_text``, static file
        macros, ``existing_entities``, outputs of prior stages.
        """
        macros: Dict[str, str] = {"chapter_text": source_content}

        # Static macros loaded from files
        for macro_name, rel_path in stage.macros.items():
            file_path = self.root / rel_path
            if file_path.exists():
                macros[macro_name] = file_path.read_text(encoding="utf-8")
            else:
                print(f" WARNING: macro file not found: {rel_path}")
                macros[macro_name] = f"(File not found: {rel_path})"

        # Existing entities list (for split_entities stages)
        if stage.split_entities:
            entities_dir = self._get_entities_dir(stage)
            # The stage's own view files live in the same directory and
            # are named after its output_macro, so exclude that suffix.
            existing = self._list_existing_entities(
                entities_dir, stage.output_macro or "entities"
            )
            if existing:
                macros["existing_entities"] = "\n".join(
                    f"- {s}" for s in existing
                )
            else:
                macros["existing_entities"] = (
                    "(none — this is the first source file)"
                )

        # Outputs from prior stages (override static macros if same name)
        macros.update(stage_outputs)

        return macros

    @staticmethod
    def _resolve_macros(template: str, macros: Dict[str, str]) -> str:
        """Substitute ``@{macro_name}`` tokens with values from *macros*.

        Tokens whose name is not in *macros* are left in the text
        unchanged.
        """

        def replacer(match: re.Match) -> str:
            key = match.group(1)
            return macros.get(key, match.group(0))

        return re.sub(r"@\{([^}]+)\}", replacer, template)

    # ── Entity Splitting ──────────────────────────────────────────────

    def _get_entities_dir(self, stage: Optional[PipelineStage] = None) -> Path:
        """Return the entities output directory.

        Uses *stage*'s ``output_dir`` when set, otherwise the first
        ``split_entities`` stage with an ``output_dir``, otherwise
        ``<root>/output/entities``.
        """
        if stage and stage.output_dir:
            return self.root / stage.output_dir
        # Fall back to the first split_entities stage's output_dir
        if self.config.pipeline:
            for s in self.config.pipeline.stages:
                if s.split_entities and s.output_dir:
                    return self.root / s.output_dir
        return self.root / "output" / "entities"

    def _list_existing_entities(
        self, entities_dir: Path, view_macro: str = "entities"
    ) -> List[str]:
        """Return sorted slugs of all canonical entity files on disk.

        Per-source view files (``*-<view_macro>.md``) and prompt files
        (``*-prompt.md``) share the directory and are not entities.

        Args:
            entities_dir: Directory to scan; a missing directory yields
                an empty list.
            view_macro: ``output_macro`` of the stage that writes views
                into this directory.  ``*-entities.md`` is always
                excluded as well.

        Returns:
            Sorted filename stems of the remaining ``*.md`` files.
        """
        if not entities_dir.is_dir():
            return []
        suffixes = {"-entities.md", "-prompt.md"}
        if view_macro:
            suffixes.add(f"-{view_macro}.md")
        excluded = tuple(suffixes)
        return sorted(
            f.stem
            for f in entities_dir.glob("*.md")
            if not f.name.endswith(excluded)
        )

    @staticmethod
    def _normalize_entity_name(name: str) -> str:
        """Normalise entity name to a kebab-case filename stem.

        Lower-cases, turns underscores and spaces into hyphens, drops any
        character outside ``[a-z0-9-]``, collapses hyphen runs and trims
        leading and trailing hyphens.  May return an empty string.
        """
        slug = name.lower().strip()
        slug = slug.replace("_", "-").replace(" ", "-")
        slug = re.sub(r"[^a-z0-9-]", "", slug)
        slug = re.sub(r"-{2,}", "-", slug)
        return slug.strip("-")

    def _split_and_write_entities(
        self,
        stage: PipelineStage,
        combined_content: str,
    ) -> List[Tuple[str, Path]]:
        """Split ``--- ENTITY: <name> ---`` delimited output into files.

        Writes each entity to ``<entities_dir>/<slug>.md``.  Files already
        on disk are not overwritten (first occurrence wins).  A name that
        normalises to a slug already seen in this same output is ignored,
        so the caller never gets the same file twice.

        Args:
            stage: Stage whose ``output_dir`` receives the entity files.
            combined_content: Raw LLM output containing the delimiters.

        Returns:
            List of ``(entity_name, file_path)``, one per distinct entity
            in *combined_content* (new and pre-existing alike), in order
            of first appearance.
        """
        entities_dir = self._get_entities_dir(stage)
        entities_dir.mkdir(parents=True, exist_ok=True)

        # One capture group, so the result alternates:
        # [preamble, name1, body1, name2, body2, ...]
        parts = re.split(
            r"^---\s*ENTITY:\s*(.+?)\s*---\s*$",
            combined_content,
            flags=re.MULTILINE,
        )

        entity_files: List[Tuple[str, Path]] = []
        seen_slugs = set()
        new_count = 0
        skipped_count = 0
        duplicate_count = 0

        for entity_name, raw_body in zip(parts[1::2], parts[2::2]):
            slug = self._normalize_entity_name(entity_name)
            if not slug:
                continue
            if slug in seen_slugs:
                duplicate_count += 1
                continue
            seen_slugs.add(slug)

            file_path = entities_dir / f"{slug}.md"
            if file_path.exists():
                skipped_count += 1
            else:
                file_path.write_text(raw_body.strip() + "\n", encoding="utf-8")
                new_count += 1

            entity_files.append((entity_name, file_path))

        msg = f" {new_count} new entities written"
        if skipped_count:
            msg += f", {skipped_count} pre-existing (skipped)"
        if duplicate_count:
            msg += f", {duplicate_count} repeated in output (ignored)"
        print(msg)
        return entity_files

    def _write_entity_view(
        self,
        source_id: str,
        entity_files: List[Tuple[str, Path]],
        view_file: Optional[Path],
    ) -> None:
        """Write a per-source view file with ``{{ include }}`` directives.

        The view holds a heading followed by one include per entity, with
        ``---`` separators between entities.  Does nothing when
        *view_file* is None.
        """
        if view_file is None:
            return

        lines = [f"# Entities: {source_id}\n"]
        for _name, file_path in entity_files:
            lines.append(f'{{{{ include "{file_path.name}" }}}}')
            lines.append("")
            lines.append("---")
            lines.append("")

        # Remove trailing separator after last entity
        if len(lines) >= 3 and lines[-1] == "" and lines[-2] == "---":
            lines = lines[:-2]

        view_file.parent.mkdir(parents=True, exist_ok=True)
        view_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
        print(f" View written to {view_file.name}")

    def _load_from_view(self, view_file: Path) -> str:
        """Reconstruct combined entity content from a view file.

        Parses ``{{ include "filename.md" }}`` directives, resolves them
        relative to the view file's directory, and assembles
        ``--- ENTITY: <slug> ---`` delimited content for downstream
        stages.  The slug is the included file's stem.

        Returns:
            The reassembled content, or an empty string when no included
            file could be read.
        """
        view_content = view_file.read_text(encoding="utf-8")
        includes = re.findall(
            r'\{\{\s*include\s+"([^"]+)"\s*\}\}', view_content
        )

        entities_dir = view_file.parent
        parts: List[str] = []
        for rel_path in includes:
            file_path = entities_dir / rel_path
            if not file_path.exists():
                # Report the gap rather than silently handing downstream
                # stages an incomplete entity set.
                print(f" WARNING: included entity file not found: {rel_path}")
                continue
            body = file_path.read_text(encoding="utf-8").strip()
            parts.append(f"--- ENTITY: {file_path.stem} ---\n\n{body}")

        return "\n\n".join(parts) + "\n" if parts else ""

    # ── LLM Execution ─────────────────────────────────────────────────

    def _call_llm(
        self, prompt: str, stage_name: str, max_tokens: int = 8192
    ) -> Optional[str]:
        """Call the LLM adapter, retrying on rate limits.

        Up to three retries are made after a rate-limit error, waiting
        15s, 30s and 45s (a linearly increasing back-off).  Any other
        exception ends the attempt immediately.

        Args:
            prompt: Fully resolved prompt text.
            stage_name: Label used in progress output.
            max_tokens: Completion token limit passed to the adapter.

        Returns:
            The response content string, or None on failure or when the
            response is empty or whitespace only.
        """
        from markitect.prompts.execution.models import RunConfig
        from markitect.llm.exceptions import LLMRateLimitError

        print(f" Calling LLM ({stage_name})...")
        t0 = time.time()
        max_retries = 3
        response = None

        for attempt in range(max_retries + 1):
            try:
                response = self.adapter.execute_prompt(
                    prompt, RunConfig(max_tokens=max_tokens)
                )
                break
            except LLMRateLimitError as exc:
                if attempt >= max_retries:
                    print(f" Rate limit exceeded after {max_retries} retries: {exc}")
                    return None
                wait = 15 * (attempt + 1)
                print(
                    f" Rate limited — retrying in {wait}s "
                    f"(attempt {attempt + 1}/{max_retries})..."
                )
                time.sleep(wait)
            except Exception as exc:
                # Boundary with third-party adapter code: report the error
                # and let the caller pause the pipeline at this stage.
                print(f" LLM error: {exc}")
                return None

        if response is None:
            return None

        elapsed = time.time() - t0
        # Tolerate a response that carries no usage data.
        usage = getattr(response, "usage", None) or {}
        print(
            f" Done in {elapsed:.1f}s — "
            f"prompt {usage.get('prompt_tokens', '?')} tok, "
            f"completion {usage.get('completion_tokens', '?')} tok"
        )

        content = response.content
        if not content or not content.strip():
            print(" LLM returned empty content.")
            return None

        return content

    # ── Git Integration ───────────────────────────────────────────────

    def _git_commit(self, source_id: str) -> None:
        """Stage all output changes and commit them for *source_id*.

        Best effort: every failure (git missing, ``git add`` failing,
        nothing to commit) is reported on stdout and never raised, so a
        git problem cannot fail a source whose stages all succeeded.
        """
        output_dir = self.root / "output"
        try:
            subprocess.run(
                ["git", "add", str(output_dir)],
                cwd=str(self.root),
                check=True,
                capture_output=True,
            )
            result = subprocess.run(
                [
                    "git", "commit", "-m",
                    f"infospace: process {source_id}\n\n"
                    f"Extract entities, map to VSM, and synthesize analysis.",
                ],
                cwd=str(self.root),
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "")
            print(f" Warning: Git error: {stderr.strip()}")
            return
        except OSError as exc:
            # Raised when the git executable cannot be started at all.
            print(f" Warning: Git error: {exc}")
            return

        if result.returncode == 0:
            print(f" Git commit: process {source_id}")
            return

        output = result.stdout + result.stderr
        if "nothing to commit" in output or "nothing added" in output:
            print(f" Git: nothing to commit for {source_id}")
        else:
            print(f" Warning: Git commit failed: {output.strip()}")