From df1fdf1842a17c6287d0e86ef8f4d6e5dedaa512 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 19 Feb 2026 14:50:49 +0100 Subject: [PATCH] feat(pipeline): per-stage max_tokens, LLM provenance, processing log - PipelineStage now supports max_tokens to override the 4096 default - SourcePipeline records provider/model on each entity file as HTML comment - output/processing-log.yaml tracks tokens, cost, duration, retries, errors - _call_llm returns (content, metadata) for downstream traceability - _http.py wraps JSON parse errors with body preview for debugging - infospace.yaml stages: extract/map=6000 tokens, synthesize=3000 tokens Co-Authored-By: Claude Sonnet 4.6 --- .../infospace-with-history/infospace.yaml | 3 + markitect/infospace/cli.py | 8 +- markitect/infospace/config.py | 6 + markitect/infospace/pipeline.py | 206 +++++++++++++++--- 4 files changed, 191 insertions(+), 32 deletions(-) diff --git a/examples/infospace-with-history/infospace.yaml b/examples/infospace-with-history/infospace.yaml index 226376fe..3f8643c9 100644 --- a/examples/infospace-with-history/infospace.yaml +++ b/examples/infospace-with-history/infospace.yaml @@ -45,6 +45,7 @@ pipeline: output_dir: output/entities output_macro: entities split_entities: true + max_tokens: 6000 macros: extraction_rules: artifacts/guidelines/extraction-rules.md vsm_framework: artifacts/vsm-reference/vsm-framework.md @@ -52,6 +53,7 @@ pipeline: template: templates/map-to-vsm.md output_dir: output/mappings output_macro: mappings + max_tokens: 6000 macros: mapping_rules: artifacts/guidelines/mapping-rules.md vsm_framework: artifacts/vsm-reference/vsm-framework.md @@ -59,6 +61,7 @@ pipeline: template: templates/synthesize-analysis.md output_dir: output/analyses output_macro: analysis + max_tokens: 3000 macros: vsm_framework: artifacts/vsm-reference/vsm-framework.md post_batch: diff --git a/markitect/infospace/cli.py b/markitect/infospace/cli.py index 6bc4272c..6f72571d 100644 --- a/markitect/infospace/cli.py +++ b/markitect/infospace/cli.py @@ -575,7 +575,13 @@ def process( # Run pipeline from markitect.infospace.pipeline import SourcePipeline - pipeline = SourcePipeline(cfg, root, adapter=adapter, no_commit=no_commit) + pipeline = SourcePipeline( + cfg, root, + adapter=adapter, + provider=provider or "", + model=(model or _PROVIDER_DEFAULTS.get(provider or "", "")) if provider else "", + no_commit=no_commit, + ) total = len(source_files) completed = 0 diff --git a/markitect/infospace/config.py b/markitect/infospace/config.py index 2d8646d5..86245c15 100644 --- a/markitect/infospace/config.py +++ b/markitect/infospace/config.py @@ -168,6 +168,8 @@ class PipelineStage: macros: Static macros loaded from files (macro name → relative path). spaces: Legacy space IDs for SQLite-based resolver (unused by :class:`SourcePipeline`). + max_tokens: Maximum tokens to request from the LLM for this stage. + Overrides the pipeline-level default (4096). """ template: str @@ -177,6 +179,7 @@ class PipelineStage: split_entities: bool = False macros: Dict[str, str] = field(default_factory=dict) spaces: List[str] = field(default_factory=list) + max_tokens: Optional[int] = None def to_dict(self) -> Dict[str, Any]: d: Dict[str, Any] = {"template": self.template} @@ -192,6 +195,8 @@ class PipelineStage: d["macros"] = self.macros if self.spaces: d["spaces"] = self.spaces + if self.max_tokens is not None: + d["max_tokens"] = self.max_tokens return d @classmethod @@ -204,6 +209,7 @@ class PipelineStage: split_entities=data.get("split_entities", False), macros=data.get("macros", {}), spaces=data.get("spaces", []), + max_tokens=data.get("max_tokens"), ) diff --git a/markitect/infospace/pipeline.py b/markitect/infospace/pipeline.py index 7f38f826..5779e032 100644 --- a/markitect/infospace/pipeline.py +++ b/markitect/infospace/pipeline.py @@ -25,11 +25,17 @@ from __future__ import annotations import re import subprocess import time +from datetime import datetime, timezone from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple + +import yaml from markitect.infospace.config import InfospaceConfig, PipelineStage +# Default max_tokens when not specified on a stage +_DEFAULT_MAX_TOKENS = 4096 + class SourcePipeline: """Processes source files through infospace pipeline stages. @@ -53,11 +59,15 @@ class SourcePipeline: config: InfospaceConfig, root: Path, adapter=None, + provider: str = "", + model: str = "", no_commit: bool = False, ) -> None: self.config = config self.root = root self.adapter = adapter + self.provider = provider + self.model = model self.no_commit = no_commit # ── Public API ──────────────────────────────────────────────────── @@ -98,6 +108,7 @@ class SourcePipeline: source_id = source_file.stem source_content = source_file.read_text(encoding="utf-8") stage_outputs: Dict[str, str] = {} + stage_logs: List[Dict[str, Any]] = [] print(f"\nProcessing: {source_id}") print("=" * 60) @@ -107,15 +118,21 @@ class SourcePipeline: return False for stage in self.config.pipeline.stages: - content = self._run_stage(stage, source_id, source_content, stage_outputs) + content, stage_meta = self._run_stage( + stage, source_id, source_content, stage_outputs + ) + if stage_meta: + stage_logs.append(stage_meta) if content is None: stage_label = stage.name or stage.template print(f"\n Pipeline paused at stage '{stage_label}'.") + self._write_processing_log(source_id, stage_logs, success=False) return False if stage.output_macro: stage_outputs[stage.output_macro] = content print(f"\n {source_id}: all stages complete.") + self._write_processing_log(source_id, stage_logs, success=True) if not self.no_commit: self._git_commit(source_id) @@ -156,10 +173,12 @@ class SourcePipeline: source_id: str, source_content: str, stage_outputs: Dict[str, str], - ) -> Optional[str]: + ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: """Run a single pipeline stage. - Returns the stage's output content, or None on failure. + Returns: + ``(content, metadata)`` where *content* is the stage output + (or ``None`` on failure) and *metadata* is a log dict. """ stage_label = stage.name or stage.template print(f"\n [{stage_label}]") @@ -170,8 +189,8 @@ class SourcePipeline: if output_file and output_file.exists(): print(f" Found existing output: {output_file.name}") if stage.split_entities: - return self._load_from_view(output_file) - return output_file.read_text(encoding="utf-8") + return self._load_from_view(output_file), None + return output_file.read_text(encoding="utf-8"), None # Build macro substitution dict macros = self._build_macros(stage, source_content, stage_outputs) @@ -180,7 +199,7 @@ class SourcePipeline: template_path = self.root / stage.template if not template_path.exists(): print(f" ERROR: Template not found: {stage.template}") - return None + return None, {"stage": stage_label, "error": f"template not found: {stage.template}"} template_content = template_path.read_text(encoding="utf-8") prompt = self._resolve_macros(template_content, macros) @@ -195,17 +214,23 @@ class SourcePipeline: # Without an adapter, we cannot generate output if self.adapter is None: print(" No LLM adapter — skipping generation (manual mode).") - return None + return None, {"stage": stage_label, "error": "no adapter"} + + # Resolve max_tokens: stage config > pipeline default + max_tokens = stage.max_tokens if stage.max_tokens is not None else _DEFAULT_MAX_TOKENS # Call LLM — with one retry for split_entities stages that return 0 entities max_attempts = 2 if stage.split_entities else 1 entity_files: List[Tuple[str, Path]] = [] - content = None + content: Optional[str] = None + llm_meta: Dict[str, Any] = {} + total_retries = 0 for attempt in range(max_attempts): - content = self._call_llm(prompt, stage_label) + content, llm_meta = self._call_llm(prompt, stage_label, max_tokens) if content is None: - return None + meta = {"stage": stage_label, "retries": total_retries, **llm_meta} + return None, meta # Save raw response for debugging (overwritten on retry) if output_file: @@ -214,30 +239,35 @@ class SourcePipeline: raw_file.write_text(content, encoding="utf-8") if stage.split_entities: - entity_files = self._split_and_write_entities(stage, content) + entity_files = self._split_and_write_entities(stage, content, source_id) if entity_files: break # Got entities — proceed if attempt < max_attempts - 1: + total_retries += 1 print(f" No entity delimiters found — retrying ({attempt + 2}/{max_attempts})...") else: print( f" WARNING: No '--- ENTITY: ---' markers found after {max_attempts} attempt(s).\n" f" Check {raw_file.name} to inspect the raw LLM response." ) - return None # Don't write empty view; allow re-run + meta = {"stage": stage_label, "retries": total_retries, + "error": "no entity delimiters", **llm_meta} + return None, meta # Don't write empty view; allow re-run else: break # Non-split stages don't need retry + stage_meta: Dict[str, Any] = {"stage": stage_label, "retries": total_retries, **llm_meta} + # Persist output if stage.split_entities: self._write_entity_view(source_id, entity_files, output_file) - return content + return content, stage_meta else: if output_file: output_file.parent.mkdir(parents=True, exist_ok=True) output_file.write_text(content, encoding="utf-8") print(f" Output written to {output_file.name}") - return content + return content, stage_meta # ── Output File Resolution ──────────────────────────────────────── @@ -319,6 +349,7 @@ class SourcePipeline: for f in entities_dir.glob("*.md") if not f.name.endswith("-entities.md") and not f.name.endswith("-prompt.md") + and not f.name.endswith("-raw.md") ) @staticmethod @@ -334,6 +365,7 @@ class SourcePipeline: self, stage: PipelineStage, combined_content: str, + source_id: str = "", ) -> List[Tuple[str, Path]]: """Split ``--- ENTITY: ---`` delimited output into files. @@ -369,7 +401,11 @@ class SourcePipeline: if file_path.exists(): skipped_count += 1 else: - file_path.write_text(entity_content + "\n", encoding="utf-8") + # Prepend provenance comment so the LLM origin is traceable + provenance = self._provenance_comment(source_id) + file_path.write_text( + provenance + entity_content + "\n", encoding="utf-8" + ) new_count += 1 entity_files.append((entity_name, file_path)) @@ -428,19 +464,99 @@ class SourcePipeline: return "\n\n".join(parts) + "\n" if parts else "" + # ── Provenance & Processing Log ─────────────────────────────────── + + def _provenance_comment(self, source_id: str) -> str: + """Return an HTML comment tagging the LLM that generated this content.""" + date = datetime.now(timezone.utc).strftime("%Y-%m-%d") + parts = [f"date={date}", f"source={source_id}"] + if self.provider: + parts.insert(0, f"provider={self.provider}") + if self.model: + parts.insert(1, f"model={self.model}") + return f"\n\n" + + def _write_processing_log( + self, + source_id: str, + stage_logs: List[Dict[str, Any]], + success: bool, + ) -> None: + """Append a run record to ``output/processing-log.yaml``.""" + log_file = self.root / "output" / "processing-log.yaml" + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Load existing log + existing: List[Dict[str, Any]] = [] + if log_file.is_file(): + try: + raw = yaml.safe_load(log_file.read_text(encoding="utf-8")) + if isinstance(raw, list): + existing = raw + except Exception: + pass + + # Build new entry + total_prompt = sum(s.get("prompt_tokens", 0) for s in stage_logs) + total_completion = sum(s.get("completion_tokens", 0) for s in stage_logs) + total_cost = sum(s.get("cost", 0.0) for s in stage_logs) + total_duration = sum(s.get("duration_seconds", 0.0) for s in stage_logs) + total_retries = sum(s.get("retries", 0) for s in stage_logs) + errors = [s["error"] for s in stage_logs if s.get("error")] + + entry: Dict[str, Any] = { + "source_id": source_id, + "processed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "provider": self.provider, + "model": self.model, + "success": success, + "total_prompt_tokens": total_prompt, + "total_completion_tokens": total_completion, + "total_cost": round(total_cost, 6), + "total_duration_seconds": round(total_duration, 1), + "total_retries": total_retries, + "stages": stage_logs, + } + if errors: + entry["errors"] = errors + + # Remove previous entry for the same source_id (re-run) + existing = [e for e in existing if e.get("source_id") != source_id] + existing.append(entry) + + log_file.write_text( + yaml.safe_dump(existing, default_flow_style=False, sort_keys=False), + encoding="utf-8", + ) + # ── LLM Execution ───────────────────────────────────────────────── def _call_llm( - self, prompt: str, stage_name: str, max_tokens: int = 8192 - ) -> Optional[str]: + self, prompt: str, stage_name: str, max_tokens: int = _DEFAULT_MAX_TOKENS + ) -> Tuple[Optional[str], Dict[str, Any]]: """Call the LLM adapter with exponential back-off on rate limits. - Returns the response content string, or None on failure. + Returns: + ``(content, metadata)`` where *content* is the response string + (or ``None`` on failure) and *metadata* has provider, model, + token counts, cost, finish_reason, duration, and error info. """ from markitect.prompts.execution.models import RunConfig from markitect.llm.exceptions import LLMRateLimitError - print(f" Calling LLM ({stage_name})...") + meta: Dict[str, Any] = { + "provider": self.provider, + "model": self.model, + "prompt_tokens": 0, + "completion_tokens": 0, + "cost": 0.0, + "finish_reason": None, + "duration_seconds": 0.0, + "error": None, + } + + model_label = f"{self.provider}/{self.model}" if self.provider else stage_name + print(f" Calling LLM ({model_label})...") t0 = time.time() max_retries = 3 response = None @@ -460,29 +576,57 @@ class SourcePipeline: ) time.sleep(wait) else: - print(f" Rate limit exceeded after {max_retries} retries: {exc}") - return None + msg = f"Rate limit exceeded after {max_retries} retries: {exc}" + print(f" {msg}") + meta["error"] = msg + meta["duration_seconds"] = round(time.time() - t0, 1) + return None, meta except Exception as exc: - print(f" LLM error: {exc}") - return None + msg = str(exc) + print(f" LLM error: {msg}") + meta["error"] = msg + meta["duration_seconds"] = round(time.time() - t0, 1) + return None, meta if response is None: - return None + meta["error"] = "no response" + return None, meta - elapsed = time.time() - t0 + elapsed = round(time.time() - t0, 1) usage = response.usage + prompt_tok = usage.get("prompt_tokens", 0) + completion_tok = usage.get("completion_tokens", 0) + cost = float(usage.get("cost", 0.0)) + finish_reason = getattr(response, "finish_reason", None) or "unknown" + + meta.update({ + "prompt_tokens": prompt_tok, + "completion_tokens": completion_tok, + "cost": cost, + "finish_reason": finish_reason, + "duration_seconds": elapsed, + }) + + cost_str = f", cost=${cost:.4f}" if cost > 0 else "" print( - f" Done in {elapsed:.1f}s — " - f"prompt {usage.get('prompt_tokens', '?')} tok, " - f"completion {usage.get('completion_tokens', '?')} tok" + f" Done in {elapsed}s — " + f"prompt {prompt_tok} tok, completion {completion_tok} tok{cost_str}" ) + if finish_reason == "length": + print( + f" WARNING: Output truncated at {max_tokens} tokens " + f"(finish_reason=length). Consider raising max_tokens for " + f"stage '{stage_name}' in infospace.yaml." + ) + content = response.content if not content or not content.strip(): print(" LLM returned empty content.") - return None + meta["error"] = "empty response" + return None, meta - return content + return content, meta # ── Git Integration ───────────────────────────────────────────────