""" Source file processing pipeline for infospace tooling. Processes source files through the pipeline stages defined in ``infospace.yaml`` using simple ``@{macro}`` template substitution — no SQLite or space-resolver dependencies. Example usage from CLI:: markitect infospace process "book-1-chapter-0[1-3].md" \\ --provider openrouter --check-after-each Or programmatically:: from markitect.infospace.pipeline import SourcePipeline from markitect.llm import create_adapter adapter = create_adapter("openrouter") pipeline = SourcePipeline(config, root, adapter=adapter) pipeline.process_sources(source_files) """ from __future__ import annotations import re import subprocess import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import yaml from markitect.infospace.config import InfospaceConfig, PipelineStage # Default max_tokens when not specified on a stage _DEFAULT_MAX_TOKENS = 4096 class SourcePipeline: """Processes source files through infospace pipeline stages. Each stage in :attr:`config.pipeline.stages` is run in order for every source file. Stage outputs are passed as macros to subsequent stages via the ``output_macro`` field. Template substitution uses ``@{macro_name}`` tokens. Static macros are loaded from files listed under ``stage.macros``; dynamic macros include ``chapter_text`` (source file content), ``existing_entities`` (for split-entities stages), and outputs from prior stages. """ _PROVIDER_DEFAULTS: Dict[str, str] = { "openrouter": "arcee-ai/trinity-large-preview:free", } def __init__( self, config: InfospaceConfig, root: Path, adapter=None, provider: str = "", model: str = "", no_commit: bool = False, eval_after_source: bool = False, classify_after_source: bool = False, ) -> None: self.config = config self.root = root self.adapter = adapter self.provider = provider self.model = model self.no_commit = no_commit self.eval_after_source = eval_after_source self.classify_after_source = classify_after_source # ── Public API ──────────────────────────────────────────────────── def process_sources( self, source_files: List[Path], check_after_each: bool = False, ) -> int: """Process a list of source files through all pipeline stages. Args: source_files: Ordered list of source files to process. check_after_each: If True, run collection checks after each source and record a metrics snapshot. Returns: Number of source files fully processed (all stages complete). """ completed = 0 for source_file in source_files: success = self.process_source(source_file) if success: completed += 1 if check_after_each: self.run_collection_check() return completed def process_source(self, source_file: Path) -> bool: """Run the full pipeline for a single source file. Args: source_file: Path to the source markdown file. Returns: True if all stages completed successfully. """ source_id = source_file.stem source_content = source_file.read_text(encoding="utf-8") stage_outputs: Dict[str, str] = {} stage_logs: List[Dict[str, Any]] = [] # Snapshot entity slugs before any stage runs so we can identify # which entities were newly produced by this source. Used to scope # --eval-after-source / --classify-after-source to only the new # entities. pre_entity_slugs = self._current_entity_slugs() print(f"\nProcessing: {source_id}") print("=" * 60) if not self.config.pipeline or not self.config.pipeline.stages: print(" No pipeline stages configured.") return False for stage in self.config.pipeline.stages: content, stage_meta = self._run_stage( stage, source_id, source_content, stage_outputs ) if stage_meta: stage_logs.append(stage_meta) if content is None: stage_label = stage.name or stage.template print(f"\n Pipeline paused at stage '{stage_label}'.") self._write_processing_log(source_id, stage_logs, success=False) return False if stage.output_macro: stage_outputs[stage.output_macro] = content print(f"\n {source_id}: all stages complete.") self._write_processing_log(source_id, stage_logs, success=True) # Per-source follow-ups: evaluate and/or classify just the new # entities this source produced, so the next commit contains a # fully-processed chapter. new_slugs = self._current_entity_slugs() - pre_entity_slugs if new_slugs and (self.eval_after_source or self.classify_after_source): self._run_per_source_followups(new_slugs) if not self.no_commit: self._git_commit(source_id) return True def run_collection_check(self) -> None: """Run collection-level quality checks (C1–C5) and print metrics.""" from markitect.infospace.entity_parser import parse_entity_directory from markitect.infospace.checks import run_all_checks from markitect.infospace.history import record_check_results entities_dir = self.root / self.config.entities_dir if not entities_dir.is_dir(): print("\n Collection check: no entities directory.") return entities = parse_entity_directory(entities_dir) if not entities: print("\n Collection check: no entities found.") return print(f"\n Collection check ({len(entities)} entities):") report = run_all_checks(entities=entities) m = report.metrics() for k, v in sorted(m.items()): print(f" {k}: {v:.4f}") snap = record_check_results( report, self.config, self.root, entity_count=len(entities) ) print(f" Snapshot recorded: {snap.snapshot_id}") # ── Stage Execution ─────────────────────────────────────────────── def _run_stage( self, stage: PipelineStage, source_id: str, source_content: str, stage_outputs: Dict[str, str], ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: """Run a single pipeline stage. Returns: ``(content, metadata)`` where *content* is the stage output (or ``None`` on failure) and *metadata* is a log dict. """ stage_label = stage.name or stage.template print(f"\n [{stage_label}]") output_file = self._get_output_file(stage, source_id) # Skip-if-exists — load from disk if output_file and output_file.exists(): print(f" Found existing output: {output_file.name}") if stage.split_entities: return self._load_from_view(output_file), None return output_file.read_text(encoding="utf-8"), None # Build macro substitution dict macros = self._build_macros(stage, source_content, stage_outputs) # Load and resolve template template_path = self.root / stage.template if not template_path.exists(): print(f" ERROR: Template not found: {stage.template}") return None, {"stage": stage_label, "error": f"template not found: {stage.template}"} template_content = template_path.read_text(encoding="utf-8") prompt = self._resolve_macros(template_content, macros) # Write prompt for inspection if output_file: prompt_file = output_file.parent / f"{source_id}-prompt.md" prompt_file.parent.mkdir(parents=True, exist_ok=True) prompt_file.write_text(prompt, encoding="utf-8") print(f" Prompt written to {prompt_file.name}") # Without an adapter, we cannot generate output if self.adapter is None: print(" No LLM adapter — skipping generation (manual mode).") return None, {"stage": stage_label, "error": "no adapter"} # Resolve max_tokens: stage config > pipeline default max_tokens = stage.max_tokens if stage.max_tokens is not None else _DEFAULT_MAX_TOKENS # Call LLM — with one retry for split_entities stages that return 0 entities max_attempts = 2 if stage.split_entities else 1 entity_files: List[Tuple[str, Path]] = [] content: Optional[str] = None llm_meta: Dict[str, Any] = {} total_retries = 0 for attempt in range(max_attempts): content, llm_meta = self._call_llm(prompt, stage_label, max_tokens) if content is None: meta = {"stage": stage_label, "retries": total_retries, **llm_meta} return None, meta # Save raw response for debugging (overwritten on retry) if output_file: raw_file = output_file.parent / f"{source_id}-{stage.name or 'stage'}-raw.md" raw_file.parent.mkdir(parents=True, exist_ok=True) raw_file.write_text(content, encoding="utf-8") if stage.split_entities: entity_files = self._split_and_write_entities(stage, content, source_id) if entity_files: break # Got entities — proceed if attempt < max_attempts - 1: total_retries += 1 print(f" No entity delimiters found — retrying ({attempt + 2}/{max_attempts})...") else: print( f" WARNING: No '--- ENTITY: ---' markers found after {max_attempts} attempt(s).\n" f" Check {raw_file.name} to inspect the raw LLM response." ) meta = {"stage": stage_label, "retries": total_retries, "error": "no entity delimiters", **llm_meta} return None, meta # Don't write empty view; allow re-run else: break # Non-split stages don't need retry stage_meta: Dict[str, Any] = {"stage": stage_label, "retries": total_retries, **llm_meta} # Persist output if stage.split_entities: self._write_entity_view(source_id, entity_files, output_file) return content, stage_meta else: if output_file: output_file.parent.mkdir(parents=True, exist_ok=True) output_file.write_text(content, encoding="utf-8") print(f" Output written to {output_file.name}") return content, stage_meta # ── Output File Resolution ──────────────────────────────────────── def _get_output_file( self, stage: PipelineStage, source_id: str ) -> Optional[Path]: """Return the expected output file path for a stage, or None.""" if not stage.output_dir or not stage.output_macro: return None return self.root / stage.output_dir / f"{source_id}-{stage.output_macro}.md" # ── Macro Resolution ────────────────────────────────────────────── def _build_macros( self, stage: PipelineStage, source_content: str, stage_outputs: Dict[str, str], ) -> Dict[str, str]: """Assemble the macro dict for template substitution.""" macros: Dict[str, str] = {"chapter_text": source_content} # Static macros loaded from files for macro_name, rel_path in stage.macros.items(): file_path = self.root / rel_path if file_path.exists(): macros[macro_name] = file_path.read_text(encoding="utf-8") else: print(f" WARNING: macro file not found: {rel_path}") macros[macro_name] = f"(File not found: {rel_path})" # Existing entities list (for split_entities stages) if stage.split_entities: entities_dir = self._get_entities_dir(stage) existing = self._list_existing_entities(entities_dir) if existing: macros["existing_entities"] = "\n".join( f"- {s}" for s in existing ) else: macros["existing_entities"] = ( "(none — this is the first source file)" ) # Outputs from prior stages (override static macros if same name) macros.update(stage_outputs) return macros @staticmethod def _resolve_macros(template: str, macros: Dict[str, str]) -> str: """Substitute ``@{macro_name}`` tokens with values from *macros*.""" def replacer(match: re.Match) -> str: key = match.group(1) return macros.get(key, match.group(0)) return re.sub(r"@\{([^}]+)\}", replacer, template) # ── Entity Splitting ────────────────────────────────────────────── def _get_entities_dir(self, stage: Optional[PipelineStage] = None) -> Path: """Return the entities output directory.""" if stage and stage.output_dir: return self.root / stage.output_dir # Fall back to the first split_entities stage's output_dir if self.config.pipeline: for s in self.config.pipeline.stages: if s.split_entities and s.output_dir: return self.root / s.output_dir return self.root / "output" / "entities" def _list_existing_entities(self, entities_dir: Path) -> List[str]: """Return sorted slugs of all canonical entity files on disk.""" if not entities_dir.is_dir(): return [] return sorted( f.stem for f in entities_dir.glob("*.md") if not f.name.endswith("-entities.md") and not f.name.endswith("-prompt.md") and not f.name.endswith("-raw.md") ) @staticmethod def _normalize_entity_name(name: str) -> str: """Normalise entity name to a kebab-case filename stem.""" slug = name.lower().strip() slug = slug.replace("_", "-").replace(" ", "-") slug = re.sub(r"[^a-z0-9-]", "", slug) slug = re.sub(r"-{2,}", "-", slug) return slug.strip("-") def _split_and_write_entities( self, stage: PipelineStage, combined_content: str, source_id: str = "", ) -> List[Tuple[str, Path]]: """Split ``--- ENTITY: ---`` delimited output into files. Writes each entity to ``/.md``. Existing files are skipped (first-occurrence wins). Returns: List of ``(entity_name, file_path)`` for all entities in *combined_content* (new and pre-existing alike). """ entities_dir = self._get_entities_dir(stage) entities_dir.mkdir(parents=True, exist_ok=True) parts = re.split( r"^---\s*ENTITY:\s*(.+?)\s*---\s*$", combined_content, flags=re.MULTILINE, ) entity_files: List[Tuple[str, Path]] = [] new_count = 0 skipped_count = 0 for i in range(1, len(parts), 2): entity_name = parts[i] entity_content = parts[i + 1].strip() if i + 1 < len(parts) else "" slug = self._normalize_entity_name(entity_name) if not slug: continue file_path = entities_dir / f"{slug}.md" if file_path.exists(): skipped_count += 1 else: # Prepend provenance comment so the LLM origin is traceable provenance = self._provenance_comment(source_id) file_path.write_text( provenance + entity_content + "\n", encoding="utf-8" ) new_count += 1 entity_files.append((entity_name, file_path)) msg = f" {new_count} new entities written" if skipped_count: msg += f", {skipped_count} pre-existing (skipped)" print(msg) return entity_files def _write_entity_view( self, source_id: str, entity_files: List[Tuple[str, Path]], view_file: Optional[Path], ) -> None: """Write a per-source view file with ``{{ include }}`` directives.""" if view_file is None: return lines = [f"# Entities: {source_id}\n"] for _name, file_path in entity_files: lines.append(f'{{{{ include "{file_path.name}" }}}}') lines.append("") lines.append("---") lines.append("") # Remove trailing separator after last entity if lines and lines[-1] == "" and len(lines) >= 3 and lines[-2] == "---": lines = lines[:-2] view_file.parent.mkdir(parents=True, exist_ok=True) view_file.write_text("\n".join(lines) + "\n", encoding="utf-8") print(f" View written to {view_file.name}") def _load_from_view(self, view_file: Path) -> str: """Reconstruct combined entity content from a view file. Parses ``{{ include "filename.md" }}`` directives and assembles the ``--- ENTITY: ---`` delimited content for downstream stages. """ view_content = view_file.read_text(encoding="utf-8") includes = re.findall( r'\{\{\s*include\s+"([^"]+)"\s*\}\}', view_content ) entities_dir = view_file.parent parts: List[str] = [] for rel_path in includes: file_path = entities_dir / rel_path if file_path.exists(): slug = file_path.stem body = file_path.read_text(encoding="utf-8").strip() parts.append(f"--- ENTITY: {slug} ---\n\n{body}") return "\n\n".join(parts) + "\n" if parts else "" # ── Provenance & Processing Log ─────────────────────────────────── def _provenance_comment(self, source_id: str) -> str: """Return an HTML comment tagging the LLM that generated this content.""" date = datetime.now(timezone.utc).strftime("%Y-%m-%d") parts = [f"date={date}", f"source={source_id}"] if self.provider: parts.insert(0, f"provider={self.provider}") if self.model: parts.insert(1, f"model={self.model}") return f"\n\n" def _write_processing_log( self, source_id: str, stage_logs: List[Dict[str, Any]], success: bool, ) -> None: """Append a run record to ``output/processing-log.yaml``.""" log_file = self.root / "output" / "processing-log.yaml" log_file.parent.mkdir(parents=True, exist_ok=True) # Load existing log existing: List[Dict[str, Any]] = [] if log_file.is_file(): try: raw = yaml.safe_load(log_file.read_text(encoding="utf-8")) if isinstance(raw, list): existing = raw except Exception: pass # Build new entry total_prompt = sum(s.get("prompt_tokens", 0) for s in stage_logs) total_completion = sum(s.get("completion_tokens", 0) for s in stage_logs) total_cost = sum(s.get("cost", 0.0) for s in stage_logs) total_duration = sum(s.get("duration_seconds", 0.0) for s in stage_logs) total_retries = sum(s.get("retries", 0) for s in stage_logs) errors = [s["error"] for s in stage_logs if s.get("error")] entry: Dict[str, Any] = { "source_id": source_id, "processed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "provider": self.provider, "model": self.model, "success": success, "total_prompt_tokens": total_prompt, "total_completion_tokens": total_completion, "total_cost": round(total_cost, 6), "total_duration_seconds": round(total_duration, 1), "total_retries": total_retries, "stages": stage_logs, } if errors: entry["errors"] = errors # Remove previous entry for the same source_id (re-run) existing = [e for e in existing if e.get("source_id") != source_id] existing.append(entry) log_file.write_text( yaml.safe_dump(existing, default_flow_style=False, sort_keys=False), encoding="utf-8", ) # ── LLM Execution ───────────────────────────────────────────────── def _call_llm( self, prompt: str, stage_name: str, max_tokens: int = _DEFAULT_MAX_TOKENS ) -> Tuple[Optional[str], Dict[str, Any]]: """Call the LLM adapter with exponential back-off on rate limits. Returns: ``(content, metadata)`` where *content* is the response string (or ``None`` on failure) and *metadata* has provider, model, token counts, cost, finish_reason, duration, and error info. """ from markitect.prompts.execution.models import RunConfig from markitect.llm.exceptions import LLMRateLimitError meta: Dict[str, Any] = { "provider": self.provider, "model": self.model, "prompt_tokens": 0, "completion_tokens": 0, "cost": 0.0, "finish_reason": None, "duration_seconds": 0.0, "error": None, } model_label = f"{self.provider}/{self.model}" if self.provider else stage_name print(f" Calling LLM ({model_label})...") t0 = time.time() max_retries = 3 response = None for attempt in range(max_retries + 1): try: response = self.adapter.execute_prompt( prompt, RunConfig(max_tokens=max_tokens) ) break except LLMRateLimitError as exc: if attempt < max_retries: wait = 15 * (attempt + 1) print( f" Rate limited — retrying in {wait}s " f"(attempt {attempt + 1}/{max_retries})..." ) time.sleep(wait) else: msg = f"Rate limit exceeded after {max_retries} retries: {exc}" print(f" {msg}") meta["error"] = msg meta["duration_seconds"] = round(time.time() - t0, 1) return None, meta except Exception as exc: msg = str(exc) print(f" LLM error: {msg}") if attempt < max_retries: wait = 5 * (attempt + 1) print(f" Retrying in {wait}s (attempt {attempt + 1}/{max_retries})...") time.sleep(wait) else: meta["error"] = msg meta["duration_seconds"] = round(time.time() - t0, 1) return None, meta if response is None: meta["error"] = "no response" return None, meta elapsed = round(time.time() - t0, 1) usage = response.usage prompt_tok = usage.get("prompt_tokens", 0) completion_tok = usage.get("completion_tokens", 0) cost = float(usage.get("cost", 0.0)) finish_reason = getattr(response, "finish_reason", None) or "unknown" meta.update({ "prompt_tokens": prompt_tok, "completion_tokens": completion_tok, "cost": cost, "finish_reason": finish_reason, "duration_seconds": elapsed, }) cost_str = f", cost=${cost:.4f}" if cost > 0 else "" print( f" Done in {elapsed}s — " f"prompt {prompt_tok} tok, completion {completion_tok} tok{cost_str}" ) if finish_reason == "length": print( f" WARNING: Output truncated at {max_tokens} tokens " f"(finish_reason=length). Consider raising max_tokens for " f"stage '{stage_name}' in infospace.yaml." ) content = response.content if not content or not content.strip(): print(" LLM returned empty content.") meta["error"] = "empty response" return None, meta return content, meta # ── Git Integration ─────────────────────────────────────────────── def _git_commit(self, source_id: str) -> None: """Stage all output changes and commit them for *source_id*. The commit message body summarises what actually changed — counts of entities / evaluations / classifications / analyses added — so ``git log`` reads like the chapter-by-chapter story of the infospace growing, not a wall of identical messages. """ output_dir = self.root / "output" try: subprocess.run( ["git", "add", str(output_dir)], cwd=str(self.root), check=True, capture_output=True, ) body = self._compose_commit_body(source_id) result = subprocess.run( [ "git", "commit", "-m", f"infospace: process {source_id}\n\n{body}", ], cwd=str(self.root), capture_output=True, text=True, ) if result.returncode == 0: print(f" Git commit: process {source_id}") else: output = result.stdout + result.stderr if "nothing to commit" in output or "nothing added" in output: print(f" Git: nothing to commit for {source_id}") else: print(f" Warning: Git commit failed: {output.strip()}") except subprocess.CalledProcessError as e: stderr = e.stderr.decode() if isinstance(e.stderr, bytes) else (e.stderr or "") print(f" Warning: Git error: {stderr.strip()}") # ── Per-source helpers ──────────────────────────────────────────── def _current_entity_slugs(self) -> set: """Return the set of entity file stems currently on disk.""" entities_dir = self.root / self.config.entities_dir if not entities_dir.is_dir(): return set() return {p.stem for p in entities_dir.glob("*.md")} def _run_per_source_followups(self, new_slugs: set) -> None: """Run per-source evaluation and/or classification on *new_slugs*. Called after a source's pipeline stages succeed, before the git commit, so each chapter's commit contains the full set of artefacts derived from it. """ from markitect.infospace.entity_parser import parse_entity_directory entities_dir = self.root / self.config.entities_dir all_entities = parse_entity_directory(entities_dir) new_entities = [e for e in all_entities if e.slug in new_slugs] if not new_entities: return if self.adapter is None: print( " Skipping per-source eval/classify: no LLM adapter " "configured (run with --provider)." ) return from markitect.prompts.execution.models import RunConfig run_config = RunConfig( model_name=self.model or None, temperature=0.3, max_tokens=2000 ) if self.eval_after_source: from markitect.infospace.evaluate import run_entity_evaluation print(f" Evaluating {len(new_entities)} new entity/entities…") try: run_entity_evaluation( config=self.config, entities=new_entities, adapter=self.adapter, run_config=run_config, output_dir=self.root / self.config.evaluations_dir, ) except Exception as exc: print(f" Warning: per-source evaluation failed: {exc}") if self.classify_after_source: from markitect.infospace.classifier import run_entity_classification print(f" Classifying {len(new_entities)} new entity/entities…") try: run_entity_classification( config=self.config, entities=new_entities, adapter=self.adapter, run_config=run_config, output_dir=self.root / self.config.classifications_dir, ) except Exception as exc: print(f" Warning: per-source classification failed: {exc}") def _compose_commit_body(self, source_id: str) -> str: """Summarise staged output changes into a commit-message body. Counts added files per output subdirectory (entities, evaluations, classifications, analyses, mappings…) and produces one line per bucket that actually saw additions. Modified/deleted files are noted separately for auditability. """ default = "Extract entities, map to VSM, and synthesize analysis." try: result = subprocess.run( ["git", "diff", "--cached", "--name-status", "--", "output"], cwd=str(self.root), check=True, capture_output=True, text=True, ) except subprocess.CalledProcessError: return default added_by_bucket: Dict[str, int] = {} modified = 0 deleted = 0 for line in result.stdout.splitlines(): parts = line.split("\t") if len(parts) < 2: continue status = parts[0] path = parts[-1] if status.startswith("A"): bucket = self._bucket_for(path) if bucket: added_by_bucket[bucket] = added_by_bucket.get(bucket, 0) + 1 elif status.startswith("M"): modified += 1 elif status.startswith("D"): deleted += 1 if not added_by_bucket and not modified and not deleted: return default # Emit buckets in a deterministic, reader-friendly order. order = ["entities", "mappings", "analyses", "evaluations", "classifications", "metrics", "logs", "other"] lines: List[str] = [] for bucket in order: n = added_by_bucket.get(bucket, 0) if n: lines.append(f"- {bucket}: +{n}") if modified: lines.append(f"- modified: {modified}") if deleted: lines.append(f"- deleted: {deleted}") return "\n".join(lines) if lines else default def _bucket_for(self, path: str) -> Optional[str]: """Map an ``output/...`` path to a commit-summary bucket name.""" # Use configured directory basenames where possible so non-default # layouts still bucket correctly. buckets = { Path(self.config.entities_dir).name: "entities", Path(self.config.evaluations_dir).name: "evaluations", Path(self.config.classifications_dir).name: "classifications", } parts = Path(path).parts if len(parts) < 2 or parts[0] != "output": return None sub = parts[1] if sub in buckets: return buckets[sub] # Heuristic fallback for common additional output subdirectories. known = {"mappings", "analyses", "metrics", "logs"} if sub in known: return sub return "other"