Files
tegwick e3e5b8ecc1
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat(infospace): systematic long-text processing — rich commit bodies, per-source eval/classify, chapters view
Three coordinated changes that let the pipeline produce a clean
chapter-by-chapter git history on long texts without archaeology after
the fact.

1. Richer commit messages. `SourcePipeline._git_commit` now diffs the
   staged changes, buckets added files by output subdirectory (entities,
   evaluations, classifications, mappings, analyses, metrics, logs), and
   includes counts in the commit body. So `git log` reads "entities:
   +23, evaluations: +23" per chapter instead of the same generic blurb
   on every commit. Zero behaviour change when no output changed; falls
   back to the original message if the diff query fails.

2. --eval-after-source / --classify-after-source on `infospace process`.
   After a source's stages succeed, the pipeline identifies which entity
   files are *new* (set diff of entity slugs before vs after), loads
   their EntityMeta, and runs per-entity evaluation and/or
   classification scoped to just those slugs before the per-source git
   commit lands. Result: each chapter's commit is self-contained —
   extraction + evaluation + classification in one atomic unit. Gated
   behind explicit flags because the cost is real (LLM latency per
   chapter rather than amortised across one bulk batch).

3. `markitect infospace chapters` subcommand. Lists source files in
   canonical order with entity count, evaluated count, classified
   count, and mean per-entity score per source. Text or JSON output.
   Natural triage surface for long-text infospaces — spot chapters that
   under-extracted or evaluated poorly.

Also: `docs/advanced-usage.md` gets a new "Systematic processing of
long texts" section with the recommended flag combo and the tradeoff
note on cost.

11 new unit tests cover the chapters command (text/json/no-sources),
the process flag wiring (help + provider requirement), and the
commit-body bucket logic. Full infospace+llm unit suite (315 tests)
green; 3 pre-existing infospace failures unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 08:24:26 +02:00

1589 lines
60 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CLI commands for infospace lifecycle management.
Provides ``markitect infospace`` subcommands for initialising,
inspecting, and evaluating infospaces.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Dict, Optional
import click
from markitect.infospace.config import (
DisciplineBinding,
InfospaceConfig,
SchemaRegistry,
TopicConfig,
find_infospace_config,
load_infospace_config,
save_infospace_config,
)
from markitect.infospace.entity_parser import parse_entity_directory
from markitect.infospace.state import build_state
def _load_config_or_exit(config_path: Optional[str] = None) -> tuple:
"""Resolve and load infospace.yaml, or exit with an error."""
if config_path:
p = Path(config_path)
else:
p = find_infospace_config()
if p is None:
click.echo("Error: No infospace.yaml found. Run 'markitect infospace init' first.", err=True)
raise SystemExit(1)
cfg = load_infospace_config(p)
return cfg, p
@click.group(name="infospace")
def infospace_commands():
"""Manage infospaces — create, inspect, evaluate."""
pass
# ── init ─────────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--topic", required=True, help="Topic name for the infospace.")
@click.option("--domain", default="", help="Knowledge domain.")
@click.option("--sources", default="", help="Path to source material directory.")
@click.option("--discipline", multiple=True, help="Discipline name (repeatable).")
@click.option("--output", "-o", default="infospace.yaml", help="Output config file path.")
def init(topic: str, domain: str, sources: str, discipline: tuple, output: str):
"""Initialise a new infospace configuration file."""
out_path = Path(output)
if out_path.exists():
click.echo(f"Error: {out_path} already exists.", err=True)
raise SystemExit(1)
disciplines = [DisciplineBinding(name=d) for d in discipline]
config = InfospaceConfig(
topic=TopicConfig(name=topic, domain=domain, sources=sources),
disciplines=disciplines,
)
save_infospace_config(config, out_path)
click.echo(f"Created {out_path}")
# ── status ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def status(config_path: Optional[str]):
"""Show infospace status — entity count, domains, evaluation state."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
# Parse entities
entities_dir = root / cfg.entities_dir
entities = []
if entities_dir.is_dir():
entities = parse_entity_directory(entities_dir)
# Load latest snapshot if available
snapshot = None
history_path = root / cfg.metrics_dir / "history.yaml"
if history_path.is_file():
from markitect.infospace.evaluation_io import read_history
history = read_history(history_path)
if history:
snapshot = history[-1]
state = build_state(cfg, entities=entities, snapshot=snapshot)
click.echo(f"Infospace: {state.topic_name}")
if cfg.topic.domain:
click.echo(f"Domain: {cfg.topic.domain}")
click.echo(f"Entities: {state.entity_count}")
if state.domains:
click.echo(f"Domains: {', '.join(state.domains)}")
if cfg.disciplines:
names = [d.name for d in cfg.disciplines]
click.echo(f"Disciplines: {', '.join(names)}")
if state.has_evaluations:
click.echo(f"Last evaluated: {state.latest_snapshot.created_at.isoformat()}")
else:
click.echo("Evaluations: none")
# ── entities ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--sort-by", "sort_key",
type=click.Choice(["slug", "domain", "words"]),
default="slug",
help="Sort entities by field.",
)
@click.option("--by-type", "by_type", is_flag=True, default=False,
help="Group entities by L2 entity type.")
def entities(config_path: Optional[str], sort_key: str, by_type: bool):
"""List entities with metadata summary."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("No entities directory found.")
return
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities found.")
return
if by_type:
_entities_by_type(cfg, root, entity_list)
return
# Sort
if sort_key == "domain":
entity_list.sort(key=lambda e: (e.domain or "", e.slug))
elif sort_key == "words":
entity_list.sort(key=lambda e: e.total_word_count, reverse=True)
else:
entity_list.sort(key=lambda e: e.slug)
# Format as table
click.echo(f"{'Slug':<40} {'Domain':<20} {'Words':>6}")
click.echo("-" * 68)
for e in entity_list:
click.echo(f"{e.slug:<40} {(e.domain or '-'):<20} {e.total_word_count:>6}")
click.echo(f"\nTotal: {len(entity_list)} entities")
def _entities_by_type(cfg, root: "Path", entity_list: list) -> None:
"""Print entities grouped by L2 entity type."""
from markitect.infospace.classification import ENTITY_TYPES
from markitect.infospace.classification_io import read_classifications_directory
from markitect.infospace.evaluation_io import read_entity_evaluation
# Load classifications
cls_dir = root / cfg.classifications_dir
cls_map: dict = {}
if cls_dir.is_dir():
from markitect.infospace.classification_io import read_classifications_directory
for c in read_classifications_directory(cls_dir):
cls_map[c.entity_slug] = c
# Load evaluation scores (best-effort)
eval_dir = root / cfg.evaluations_dir
eval_scores: dict = {} # slug → overall_score
if eval_dir.is_dir():
for ef in eval_dir.glob("*.md"):
try:
ev = read_entity_evaluation(ef)
eval_scores[ev.entity_slug] = ev.overall_score
except Exception:
pass
# Build index: entity_type → list of (entity, classification)
entity_index = {
t: [] for t in ENTITY_TYPES
}
entity_index["Unclassified"] = []
entity_map = {e.slug: e for e in entity_list}
for e in entity_list:
cls = cls_map.get(e.slug)
if cls is None:
entity_index["Unclassified"].append((e, None))
else:
bucket = cls.entity_type if cls.entity_type in entity_index else "Unclassified"
entity_index[bucket].append((e, cls))
# Print each type group
type_order = list(ENTITY_TYPES) + ["Unclassified"]
total = 0
for etype in type_order:
group = entity_index.get(etype, [])
if not group:
continue
click.echo(f"\n=== {etype} ({len(group)} entities) ===")
group.sort(key=lambda x: x[0].slug)
for e, cls in group:
vsm = cls.vsm_system if cls else ""
domain = (e.domain or "-")[:18]
score = eval_scores.get(e.slug)
score_str = f" \u2605{score:.1f}" if score is not None else ""
slug_col = f"{e.slug:<40}"
click.echo(f" {slug_col} {domain:<18} {vsm:<4}{score_str}")
if cls and cls.entity_type == "Relation" and cls.links_mechanism:
subj = cls.links_subject or cls.links_subject_slug or "?"
obj = cls.links_object or cls.links_object_slug or "?"
click.echo(f" \u2192 links: {subj} \u2194 {obj}")
mech = cls.links_mechanism
if len(mech) > 80:
mech = mech[:77] + "..."
click.echo(f" \u2192 mechanism: {mech}")
total += len(group)
click.echo(f"\nTotal: {total} entities")
# ── chapters (per-source triage view) ────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--format", "output_format",
type=click.Choice(["text", "json"]),
default="text",
help="Output format.",
)
def chapters(config_path: Optional[str], output_format: str):
"""List source files in canonical order with per-source stats.
For each source file in the sources directory, reports entity count,
mean per-entity score (if evaluated), classification coverage, and
processing status. Useful for triaging long-text infospaces.
"""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
sources_dir = root / cfg.topic.sources if cfg.topic.sources else root
if not sources_dir.is_dir():
click.echo(f"No sources directory at {sources_dir}.", err=True)
raise SystemExit(1)
source_files = sorted(sources_dir.glob("*.md"))
if not source_files:
click.echo(f"No source files in {sources_dir}.", err=True)
raise SystemExit(1)
entities_dir = root / cfg.entities_dir
entity_list = (
parse_entity_directory(entities_dir) if entities_dir.is_dir() else []
)
# Build a source_id → [entities] map using the source_chapter field.
# Matching is lenient: entities with a source_chapter substring-equal
# to a normalized form of the source stem count as belonging to it.
def _chapter_keys(source_id: str) -> list:
"""Return strings an entity's source_chapter might contain."""
keys = [source_id, source_id.replace("-", " ")]
m = re.match(r"book-(\d+)-chapter-(\d+)", source_id)
if m:
book, chap = m.group(1), m.group(2)
roman = {"1": "I", "2": "II", "3": "III", "4": "IV", "5": "V"}
if book in roman:
keys.append(f"Book {roman[book]}, Chapter {int(chap)}")
keys.append(f"Book {roman[book]} Chapter {int(chap)}")
return keys
# Precompute evaluation scores and classification slugs once.
evals_dir = root / cfg.evaluations_dir
cls_dir = root / cfg.classifications_dir
eval_scores: Dict[str, float] = {}
if evals_dir.is_dir():
from markitect.infospace.evaluation_io import read_entity_evaluation
for ev_path in evals_dir.glob("*.md"):
try:
ev = read_entity_evaluation(ev_path)
if ev.overall_score is not None:
eval_scores[ev_path.stem] = ev.overall_score
except Exception:
continue
classified_slugs = (
{p.stem for p in cls_dir.glob("*.md")} if cls_dir.is_dir() else set()
)
rows = []
for source_file in source_files:
source_id = source_file.stem
keys = _chapter_keys(source_id)
matched = [
e for e in entity_list
if any(k.lower() in (e.source_chapter or "").lower() for k in keys)
]
slugs = {e.slug for e in matched}
evaluated = slugs & set(eval_scores)
classified = slugs & classified_slugs
mean = (
sum(eval_scores[s] for s in evaluated) / len(evaluated)
if evaluated else None
)
rows.append({
"source_id": source_id,
"entities": len(matched),
"evaluated": len(evaluated),
"classified": len(classified),
"mean_score": round(mean, 2) if mean is not None else None,
})
if output_format == "json":
import json
click.echo(json.dumps(rows, indent=2))
return
# Text: aligned table.
headers = ("source", "entities", "evaluated", "classified", "mean_score")
widths = [
max(len(h), max((len(str(r[h.replace(' ', '_')])) if h != "source"
else len(r["source_id"]))
for r in rows)) if rows else len(h)
for h in headers
]
fmt = " ".join(f"{{:<{w}}}" for w in widths)
click.echo(fmt.format(*headers))
click.echo(fmt.format(*("-" * w for w in widths)))
for r in rows:
click.echo(fmt.format(
r["source_id"],
r["entities"],
r["evaluated"],
r["classified"],
"-" if r["mean_score"] is None else f"{r['mean_score']:.2f}",
))
totals = {
"entities": sum(r["entities"] for r in rows),
"evaluated": sum(r["evaluated"] for r in rows),
"classified": sum(r["classified"] for r in rows),
}
click.echo(
f"\n{len(rows)} source file(s); "
f"{totals['entities']} entities, "
f"{totals['evaluated']} evaluated, "
f"{totals['classified']} classified."
)
# ── entity (single lookup) ───────────────────────────────────────────
@infospace_commands.command()
@click.argument("name")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def entity(name: str, config_path: Optional[str]):
"""Look up one entity by name, tolerating case / hyphens / underscores.
Prints slug, source path, domain, chapter, word count, overall score,
VSM system (if classified), and evaluation-file path.
"""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("No entities directory found.", err=True)
raise SystemExit(1)
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities found.", err=True)
raise SystemExit(1)
# Normalize: lowercase, underscores.
def norm(s: str) -> str:
return s.lower().replace("-", "_").replace(" ", "_")
target = norm(name)
by_slug = {e.slug: e for e in entity_list}
match = by_slug.get(target)
if match is None:
# Substring fallback for partial input.
candidates = [e for e in entity_list if target in norm(e.slug)]
if len(candidates) == 1:
match = candidates[0]
elif len(candidates) > 1:
click.echo(f"Ambiguous — '{name}' matches multiple entities:", err=True)
for c in sorted(candidates, key=lambda e: e.slug)[:10]:
click.echo(f" {c.slug}", err=True)
if len(candidates) > 10:
click.echo(f" … and {len(candidates) - 10} more", err=True)
raise SystemExit(1)
else:
click.echo(f"No entity matching '{name}'.", err=True)
near = sorted(
e.slug for e in entity_list
if target.split("_", 1)[0] in e.slug
)[:5]
if near:
click.echo(f" Near matches: {', '.join(near)}", err=True)
raise SystemExit(1)
# Load score + classification (best-effort).
score: Optional[float] = None
evaluator: Optional[str] = None
eval_file = root / cfg.evaluations_dir / f"{match.slug}.md"
if eval_file.is_file():
try:
from markitect.infospace.evaluation_io import read_entity_evaluation
ev = read_entity_evaluation(eval_file)
score = ev.overall_score
evaluator = ev.evaluator
except Exception:
pass
vsm: Optional[str] = None
cls_file = root / cfg.classifications_dir / f"{match.slug}.md"
if cls_file.is_file():
try:
from markitect.infospace.classification_io import read_entity_classification
cls = read_entity_classification(cls_file)
vsm = cls.vsm_system
except Exception:
pass
# Output — one field per line so it's easy to grep or pipe.
click.echo(f"slug: {match.slug}")
click.echo(f"source_path: {match.source_path}")
click.echo(f"domain: {match.domain or '-'}")
click.echo(f"chapter: {match.source_chapter or '-'}")
click.echo(f"word_count: {match.total_word_count}")
click.echo(f"vsm_system: {vsm or '-'}")
if score is not None:
click.echo(f"overall_score: {score:.2f}")
click.echo(f"evaluator: {evaluator or '-'}")
click.echo(f"evaluation: {eval_file}")
else:
click.echo("evaluation: (not yet evaluated)")
# ── evaluate ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default="openrouter", help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option("--entity", "entity_slug", default=None, help="Evaluate a single entity by slug.")
@click.option("--chapter", default=None, help="Evaluate entities from a specific chapter.")
@click.option("--force", is_flag=True, default=False,
help="Re-evaluate entities whose evaluation file already exists.")
@click.option("--model-fallback", "model_fallback", default=None,
help="If the primary model hits a rate limit (429), retry the "
"failed entities once with this model. Useful on free tiers "
"where models have separate quota buckets (e.g. "
"gemini-2.5-flash → gemini-2.5-flash-lite).")
def evaluate(config_path, provider, model, entity_slug, chapter, force, model_fallback):
"""Evaluate entities using LLM-based quality assessment."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("Error: No entities directory found.", err=True)
raise SystemExit(1)
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities to evaluate.")
return
# Filter. Accept hyphenated input for --entity by normalizing to the
# underscore slug format produced by parse_entity_directory.
if entity_slug:
normalized = entity_slug.replace("-", "_")
matches = [e for e in entity_list if e.slug == normalized]
if not matches:
# Build a short "did you mean…" list from entities sharing a stem.
stem = normalized.split("_", 1)[0]
near = sorted(e.slug for e in entity_list if e.slug.startswith(stem))[:5]
msg = f"Error: Entity '{entity_slug}' not found."
if near:
msg += f" Did you mean: {', '.join(near)} ?"
click.echo(msg, err=True)
raise SystemExit(1)
entity_list = matches
elif chapter:
entity_list = [e for e in entity_list if chapter in e.source_chapter]
if not entity_list:
click.echo(f"No entities found for chapter '{chapter}'.")
return
# Skip entities that already have evaluation files (incremental resume).
# Applies uniformly to full-pass, --entity, and --chapter runs unless
# --force is set.
from markitect.infospace.evaluate import run_entity_evaluation
output_dir = root / cfg.evaluations_dir
if not force and output_dir.is_dir():
existing = {p.stem for p in output_dir.glob("*.md")}
before = len(entity_list)
entity_list = [e for e in entity_list if e.slug not in existing]
skipped = before - len(entity_list)
if not entity_list:
click.echo("All selected entities already evaluated. "
"Re-run with --force to overwrite.")
return
if skipped:
click.echo(f"Skipping {skipped} already-evaluated entities. "
"Use --force to re-evaluate.")
# Create adapter
from markitect.llm import create_adapter
from markitect.prompts.execution.models import RunConfig
adapter = create_adapter(provider, model=model)
run_config = RunConfig(model_name=model, temperature=0.3, max_tokens=2000)
# Progress callback — surface error detail so agents don't have to
# drop into Python to see whether an ERROR was 429, 503, or auth.
def on_progress(done, total, result):
status = result.status.upper()
if status == "ERROR" and result.error:
click.echo(f" [{done}/{total}] {result.key}: ERROR — {result.error}")
else:
click.echo(f" [{done}/{total}] {result.key}: {status}")
click.echo(f"Evaluating {len(entity_list)} entities via {provider}...")
summary = run_entity_evaluation(
config=cfg,
entities=entity_list,
adapter=adapter,
run_config=run_config,
output_dir=output_dir,
progress_callback=on_progress,
)
# Model fallback: if any entities failed with a rate-limit-looking
# error and the user opted in with --model-fallback, retry them once
# with a fresh adapter on the fallback model. Different free-tier
# models have separate quota buckets, so this often succeeds when
# the primary is exhausted.
if model_fallback and summary.failed > 0:
rate_limited = [
r for r in summary.results
if r.status == "error"
and r.error
and ("429" in r.error or "rate" in r.error.lower())
]
if rate_limited:
retry_slugs = {r.key for r in rate_limited}
retry_entities = [e for e in entity_list if e.slug in retry_slugs]
click.echo(
f"\n{len(retry_entities)} rate-limited entities — "
f"retrying with --model-fallback {model_fallback}..."
)
fb_adapter = create_adapter(provider, model=model_fallback)
fb_run_config = RunConfig(
model_name=model_fallback, temperature=0.3, max_tokens=2000
)
fb_summary = run_entity_evaluation(
config=cfg,
entities=retry_entities,
adapter=fb_adapter,
run_config=fb_run_config,
output_dir=output_dir,
progress_callback=on_progress,
)
summary.succeeded += fb_summary.succeeded
summary.failed = (summary.failed - len(retry_entities)) + fb_summary.failed
summary.total_prompt_tokens += fb_summary.total_prompt_tokens
summary.total_completion_tokens += fb_summary.total_completion_tokens
click.echo(f"\nDone: {summary.succeeded} succeeded, {summary.failed} failed, {summary.skipped} skipped")
if summary.total_tokens > 0:
click.echo(f"Tokens used: {summary.total_tokens}")
# ── eval-summary ──────────────────────────────────────────────────────
@infospace_commands.command(name="eval-summary")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--update-metrics", is_flag=True, default=False,
help="Merge per_entity_mean into metrics.yaml for viability checks.")
def eval_summary(config_path: Optional[str], update_metrics: bool):
"""Show aggregate statistics from per-entity evaluation files."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
evaluations_dir = root / cfg.evaluations_dir
if not evaluations_dir.is_dir():
click.echo("No evaluations directory found. Run 'markitect infospace evaluate' first.")
return
from markitect.infospace.evaluation_io import read_entity_evaluation
eval_files = sorted(evaluations_dir.glob("*.md"))
if not eval_files:
click.echo("No evaluation files found.")
return
overall_scores: list = []
dim_scores: dict = {}
failed: list = []
for ef in eval_files:
try:
ev = read_entity_evaluation(ef)
overall_scores.append(ev.overall_score)
for s in ev.scores:
dim_scores.setdefault(s.name, []).append(s.value)
except Exception as exc:
failed.append((ef.stem, str(exc)))
n = len(overall_scores)
if n == 0:
click.echo("No evaluations could be read.")
return
mean_overall = sum(overall_scores) / n
click.echo(f"Evaluation summary — {n} entities evaluated")
if failed:
click.echo(f" (failed to read: {len(failed)})")
click.echo()
click.echo(f" {'Dimension':<30} {'Mean':>6}")
click.echo(" " + "-" * 38)
click.echo(f" {'overall':<30} {mean_overall:>6.3f}")
for dim, vals in sorted(dim_scores.items()):
click.echo(f" {dim:<30} {sum(vals)/len(vals):>6.3f}")
score_min = min(overall_scores)
score_max = max(overall_scores)
click.echo()
click.echo(f" Range: {score_min:.2f} {score_max:.2f}")
if update_metrics:
from markitect.infospace.history import read_metrics_file, write_metrics_file
metrics_file = root / cfg.metrics_dir / "metrics.yaml"
existing = read_metrics_file(metrics_file)
existing["per_entity_mean"] = round(mean_overall, 6)
write_metrics_file(existing, metrics_file)
click.echo(f"\nUpdated metrics.yaml: per_entity_mean = {mean_overall:.4f}")
# ── relations ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--entity", "entity_slug", default=None,
help="Show only relations involving this entity slug.")
@click.option("--vsm", "vsm_filter", default=None,
help="Show only relations whose VSM channel contains this string (e.g. S2, S3).")
@click.option("--loops", "loops_only", is_flag=True, default=False,
help="Show only feedback loops (cycles in the relation graph).")
@click.option("--stats", "stats_only", is_flag=True, default=False,
help="Show aggregate statistics only, no individual relations.")
def relations(config_path: Optional[str], entity_slug: Optional[str],
vsm_filter: Optional[str], loops_only: bool, stats_only: bool):
"""Show the L3 relation graph — triplets, feedback loops, and VSM channels."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.relation_parser import parse_relations_directory
relations_dir = root / cfg.relations_dir
if not relations_dir.is_dir():
click.echo("No relations directory found. Create output/relations/ and add relation files.")
return
all_relations = parse_relations_directory(relations_dir)
if not all_relations:
click.echo("No relation files found in " + str(relations_dir))
return
# Build directed graph for cycle detection
try:
import networkx as nx
G = nx.DiGraph()
for r in all_relations:
G.add_edge(r.subject_slug, r.object_slug,
predicate=r.predicate,
relation_type=r.relation_type,
vsm_channel=r.vsm_channel,
slug=r.slug)
except ImportError:
G = None
# Find feedback loops
loops = []
if G is not None:
try:
loops = list(nx.simple_cycles(G))
except Exception:
loops = []
# Stats summary
import re as _re
def _vsm_code(channel: str) -> str:
"""Strip parenthetical description, returning just the system code (e.g. 'S3 → S1')."""
return _re.sub(r'\s*\(.*', '', channel).strip() or channel
n = len(all_relations)
vsm_counts: dict = {}
type_counts: dict = {}
for r in all_relations:
vsm_counts[_vsm_code(r.vsm_channel)] = vsm_counts.get(_vsm_code(r.vsm_channel), 0) + 1
type_counts[r.relation_type] = type_counts.get(r.relation_type, 0) + 1
click.echo(f"Relation graph — {n} relations")
if G is not None:
click.echo(f" Entities in graph: {G.number_of_nodes()}")
click.echo(f" Feedback loops: {len(loops)}")
click.echo()
if stats_only:
click.echo("Relation types:")
for rt, count in sorted(type_counts.items(), key=lambda x: -x[1]):
click.echo(f" {rt:<25} {count:>4}")
click.echo()
click.echo("VSM channels:")
for ch, count in sorted(vsm_counts.items(), key=lambda x: -x[1]):
click.echo(f" {ch:<20} {count:>4}")
return
# Feedback loops section
if loops or loops_only:
if loops:
click.echo(f"Feedback loops ({len(loops)}):")
for i, cycle in enumerate(loops, 1):
click.echo(f" Loop {i}: {''.join(cycle)}{cycle[0]}")
click.echo()
elif loops_only:
click.echo("No feedback loops detected in current relation set.")
return
if loops_only:
return
# Filter relations
filtered = all_relations
if entity_slug:
filtered = [r for r in filtered
if entity_slug in (r.subject_slug, r.object_slug)]
if not filtered:
click.echo(f"No relations found involving '{entity_slug}'.")
return
if vsm_filter:
filtered = [r for r in filtered if vsm_filter in r.vsm_channel]
if not filtered:
click.echo(f"No relations with VSM channel containing '{vsm_filter}'.")
return
# Display relations
click.echo(f"{'Subject':<35} {'Predicate':<30} {'Object':<35} {'VSM'}")
click.echo("-" * 110)
for r in filtered:
subj = r.subject[:33] + ".." if len(r.subject) > 35 else r.subject
obj = r.object[:33] + ".." if len(r.object) > 35 else r.object
pred = r.predicate[:28] + ".." if len(r.predicate) > 30 else r.predicate
click.echo(f"{subj:<35} {pred:<30} {obj:<35} {r.vsm_channel}")
# ── classify ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--entity", "entity_slug", default=None,
help="Classify a single entity by slug.")
@click.option("--provider", default="openrouter",
help="LLM provider (openrouter, gemini, openai, …).")
@click.option("--model", default=None, help="Model name override.")
@click.option("--rpm", default=0, type=int,
help="Max requests per minute (0 = unlimited). Use 10 for Gemini free tier.")
def classify(config_path: Optional[str], entity_slug: Optional[str],
provider: str, model: Optional[str], rpm: int):
"""Classify entities with Entity Type and VSM System (L2)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.classifier import run_entity_classification
from markitect.llm import create_adapter
from markitect.prompts.execution.models import RunConfig
entity_list = parse_entity_directory(root / cfg.entities_dir)
if not entity_list:
click.echo("No entities found in " + str(root / cfg.entities_dir), err=True)
return
output_dir = root / cfg.classifications_dir
if entity_slug:
entity_list = [e for e in entity_list if e.slug == entity_slug]
if not entity_list:
click.echo(f"Entity '{entity_slug}' not found.", err=True)
return
else:
# Incremental skip — entities already classified are omitted
if output_dir.is_dir():
done_slugs = {p.stem for p in output_dir.glob("*.md")}
before = len(entity_list)
entity_list = [e for e in entity_list if e.slug not in done_slugs]
skipped = before - len(entity_list)
if skipped:
click.echo(f"Skipping {skipped} already-classified entities.")
if not entity_list:
click.echo("All entities already classified. Nothing to do.")
return
delay = (60.0 / rpm) if rpm > 0 else 0.0
click.echo(f"Classifying {len(entity_list)} entities …" +
(f" (rate: {rpm} RPM, {delay:.1f}s delay)" if delay else ""))
output_dir.mkdir(parents=True, exist_ok=True)
adapter = create_adapter(provider, model=model)
run_config = RunConfig(model_name=model, temperature=0.1, max_tokens=2000)
def _progress(done: int, total: int, result) -> None:
if result.status == "success":
click.echo(f" [{done}/{total}] {result.key}")
else:
click.echo(f" [{done}/{total}] {result.key} — FAILED: {result.error}")
summary = run_entity_classification(
config=cfg,
entities=entity_list,
adapter=adapter,
run_config=run_config,
output_dir=output_dir,
progress_callback=_progress,
delay_seconds=delay,
)
click.echo(f"\nDone: {summary.succeeded} classified, {summary.failed} failed.")
# ── classify-summary ──────────────────────────────────────────────────
@infospace_commands.command(name="classify-summary")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--update-metrics", "update_metrics", is_flag=True, default=False,
help="Write type_distribution metrics to metrics.yaml.")
def classify_summary(config_path: Optional[str], update_metrics: bool):
"""Show type × VSM distribution across all classified entities (L2)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.classification import ENTITY_TYPES, VSM_SYSTEMS
from markitect.infospace.classification_io import read_classifications_directory
cls_dir = root / cfg.classifications_dir
if not cls_dir.is_dir():
click.echo("No classifications directory found. Run 'classify' first.")
return
all_cls = read_classifications_directory(cls_dir)
if not all_cls:
click.echo("No classification files found.")
return
n = len(all_cls)
type_counts: dict = {}
vsm_counts: dict = {}
matrix: dict = {} # (entity_type, vsm_system) → count
for c in all_cls:
type_counts[c.entity_type] = type_counts.get(c.entity_type, 0) + 1
vsm_counts[c.vsm_system] = vsm_counts.get(c.vsm_system, 0) + 1
key = (c.entity_type, c.vsm_system)
matrix[key] = matrix.get(key, 0) + 1
click.echo(f"Classification summary — {n} entities\n")
click.echo("Entity types:")
for t, count in sorted(type_counts.items(), key=lambda x: -x[1]):
pct = 100 * count / n if n else 0.0
click.echo(f" {t:<15} {count:>4} ({pct:.1f}%)")
click.echo()
vsm_order = ["S1", "S2", "S3", "S3*", "S4", "S5"]
click.echo("VSM systems:")
for v in vsm_order:
if v in vsm_counts:
count = vsm_counts[v]
pct = 100 * count / n if n else 0.0
click.echo(f" {v:<6} {count:>4} ({pct:.1f}%)")
click.echo()
# Type × VSM matrix
header = f"{'':15}" + "".join(f"{v:>7}" for v in vsm_order)
sep = "-" * (15 + 7 * len(vsm_order))
click.echo(header)
click.echo(sep)
for t in ENTITY_TYPES:
row = f"{t:<15}"
for v in vsm_order:
c = matrix.get((t, v), 0)
row += f"{c if c else '.':>7}"
click.echo(row)
click.echo()
filled_cells = len(matrix)
total_cells = len(ENTITY_TYPES) * len(vsm_order)
click.echo(f"Matrix fill: {filled_cells}/{total_cells} cells occupied")
click.echo()
if update_metrics:
import math
from markitect.infospace.history import read_metrics_file, write_metrics_file
metrics_dir = root / cfg.metrics_dir
metrics_dir.mkdir(parents=True, exist_ok=True)
# Type entropy
type_entropy = 0.0
for count in type_counts.values():
p = count / n
if p > 0:
type_entropy -= p * math.log2(p)
existing = read_metrics_file(metrics_dir / "metrics.yaml")
new_metrics = {
"type_distribution": type_counts,
"vsm_type_matrix_cells": filled_cells,
"type_entropy": round(type_entropy, 4),
}
merged = {**existing, **new_metrics}
write_metrics_file(merged, metrics_dir / "metrics.yaml")
click.echo(
f"Updated metrics.yaml: type_entropy={type_entropy:.4f}, "
f"vsm_type_matrix_cells={filled_cells}"
)
# ── classify-links ────────────────────────────────────────────────────
@infospace_commands.command(name="classify-links")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default="openrouter",
help="LLM provider (openrouter, gemini, openai, …).")
@click.option("--model", default=None, help="Model name override.")
def classify_links(config_path: Optional[str], provider: str, model: Optional[str]):
"""Capture relation endpoint data (subject, object, mechanism) for Relation-type entities."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.classification import ENTITY_TYPES
from markitect.infospace.classification_io import read_classifications_directory
from markitect.infospace.classifier import run_relation_link_capture
from markitect.llm import create_adapter
from markitect.prompts.execution.models import RunConfig
cls_dir = root / cfg.classifications_dir
if not cls_dir.is_dir():
click.echo("No classifications directory found. Run 'classify' first.", err=True)
raise SystemExit(1)
all_cls = read_classifications_directory(cls_dir)
cls_map = {c.entity_slug: c for c in all_cls}
# Filter to Relation-type entities that are missing links_mechanism
relation_slugs = [
c.entity_slug for c in all_cls
if c.entity_type == "Relation" and not c.links_mechanism
]
if not relation_slugs:
click.echo("All Relation-type entities already have endpoint data. Nothing to do.")
return
# Load entity metadata for these slugs
entity_list = parse_entity_directory(root / cfg.entities_dir)
entity_map = {e.slug: e for e in entity_list}
relation_entities = [entity_map[s] for s in relation_slugs if s in entity_map]
missing_from_entities = [s for s in relation_slugs if s not in entity_map]
if missing_from_entities:
click.echo(f"Warning: {len(missing_from_entities)} Relation-type slugs not found in "
f"entities directory and will be skipped.")
if not relation_entities:
click.echo("No Relation-type entities found to enrich.")
return
click.echo(f"Capturing relation links for {len(relation_entities)} Relation-type entities …")
adapter = create_adapter(provider, model=model)
run_config = RunConfig(model_name=model, temperature=0.1, max_tokens=512)
def _progress(done: int, total: int, result) -> None:
if result.status == "success":
click.echo(f" [{done}/{total}] {result.key}")
else:
click.echo(f" [{done}/{total}] {result.key} — FAILED: {result.error}")
summary = run_relation_link_capture(
config=cfg,
relation_entities=relation_entities,
classifications=cls_map,
adapter=adapter,
run_config=run_config,
output_dir=cls_dir,
progress_callback=_progress,
)
click.echo(f"\nDone: {summary.succeeded} enriched, {summary.failed} failed.")
# ── viability ────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def viability(config_path: Optional[str]):
"""Show viability dashboard — threshold checks and pass/fail."""
cfg, cfg_path = _load_config_or_exit(config_path)
if not cfg.viability:
click.echo("No viability thresholds configured in infospace.yaml.")
return
# Try to load latest metrics
root = cfg_path.parent
metrics: dict = {}
metrics_file = root / cfg.metrics_dir / "metrics.yaml"
if metrics_file.is_file():
import yaml
raw = yaml.safe_load(metrics_file.read_text(encoding="utf-8"))
if isinstance(raw, dict):
metrics = {k: float(v) for k, v in raw.items() if isinstance(v, (int, float))}
state = build_state(cfg, metrics=metrics if metrics else None)
if not state.viability_results:
click.echo("No metrics available. Run evaluations first.")
click.echo("\nConfigured thresholds:")
for name, t in cfg.viability.items():
bounds = []
if t.min is not None:
bounds.append(f"min={t.min}")
if t.max is not None:
bounds.append(f"max={t.max}")
click.echo(f" {name}: {', '.join(bounds)}")
return
click.echo(f"{'Metric':<30} {'Value':>8} {'Threshold':>15} {'Status':>8}")
click.echo("-" * 63)
for r in state.viability_results:
bounds = []
if r.threshold.min is not None:
bounds.append(f"min={r.threshold.min}")
if r.threshold.max is not None:
bounds.append(f"max={r.threshold.max}")
status_str = "PASS" if r.passed else "FAIL"
click.echo(
f"{r.metric:<30} {r.value:>8.4f} {', '.join(bounds):>15} {status_str:>8}"
)
click.echo()
if state.is_viable:
click.echo(f"Viable: YES ({state.viability_pass_count}/{state.viability_total_count} thresholds met)")
else:
click.echo(f"Viable: NO ({state.viability_pass_count}/{state.viability_total_count} thresholds met)")
# ── check ───────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--concern", "concerns", multiple=True,
type=click.Choice(["redundancy", "coverage", "coherence", "consistency", "granularity"]),
help="Run specific concern(s). Omit to run all five.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def check(config_path: Optional[str], concerns: tuple, as_json: bool):
"""Run collection-level quality checks (C1C5)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
entities_dir = root / cfg.entities_dir
if not entities_dir.is_dir():
click.echo("Error: No entities directory found.", err=True)
raise SystemExit(1)
entity_list = parse_entity_directory(entities_dir)
if not entity_list:
click.echo("No entities to check.")
return
from markitect.infospace.checks import run_all_checks
checks_list = list(concerns) if concerns else None
report = run_all_checks(
entities=entity_list,
checks=checks_list,
)
if as_json:
import json
click.echo(json.dumps(report.to_dict(), indent=2))
else:
click.echo(f"Collection checks — {len(entity_list)} entities\n")
d = report.to_dict()
for concern_name, concern_data in d.items():
label = concern_data.get("concern", concern_name.upper())
click.echo(f" {label}{concern_name}")
for k, v in concern_data.items():
if k == "concern":
continue
click.echo(f" {k}: {v}")
click.echo()
# Show summary metrics
m = report.metrics()
if m and not as_json:
click.echo("Metrics summary:")
for k, v in sorted(m.items()):
click.echo(f" {k}: {v:.4f}")
# Record to history
if m:
from markitect.infospace.history import record_check_results
snap = record_check_results(report, cfg, root, entity_count=len(entity_list))
if not as_json:
click.echo(f"\nRecorded snapshot {snap.snapshot_id}")
# ── history ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--metric", default=None, help="Show trend for a specific metric.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
def history(config_path: Optional[str], metric: Optional[str], as_json: bool):
"""Show metrics history — snapshots over time."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.history import get_history, metric_trend
snapshots = get_history(cfg, root)
if not snapshots:
click.echo("No history found. Run 'markitect infospace check' first.")
return
if metric:
trend = metric_trend(snapshots, metric)
if not trend:
click.echo(f"No data for metric '{metric}'.")
return
if as_json:
import json
click.echo(json.dumps(trend, indent=2))
else:
click.echo(f"Trend: {metric}\n")
for entry in trend:
click.echo(f" {entry['date'][:19]} {entry['value']:.4f}")
return
if as_json:
import json
click.echo(json.dumps([s.to_dict() for s in snapshots], indent=2, default=str))
return
click.echo(f"History: {len(snapshots)} snapshot(s)\n")
click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}")
click.echo("-" * 42)
for i, snap in enumerate(snapshots, 1):
date_str = snap.created_at.isoformat()[:19]
n_metrics = len(snap.collection_metrics)
click.echo(f"{i:<4} {date_str:<20} {snap.entity_count:>8} {n_metrics:>8}")
@infospace_commands.command(name="history-diff")
@click.argument("date_a")
@click.argument("date_b")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def history_diff(date_a: str, date_b: str, config_path: Optional[str]):
"""Compare two history snapshots by date (YYYY-MM-DD)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.history import find_snapshot_by_date, get_history
from markitect.infospace.evaluation_io import diff_snapshots
snapshots = get_history(cfg, root)
if len(snapshots) < 2:
click.echo("Need at least two snapshots to diff.")
return
snap_a = find_snapshot_by_date(snapshots, date_a)
snap_b = find_snapshot_by_date(snapshots, date_b)
if snap_a is None:
click.echo(f"No snapshot found near '{date_a}'.")
return
if snap_b is None:
click.echo(f"No snapshot found near '{date_b}'.")
return
if snap_a.snapshot_id == snap_b.snapshot_id:
click.echo("Both dates resolve to the same snapshot.")
return
diff = diff_snapshots(snap_a, snap_b)
click.echo(diff.summary())
# ── bind-discipline ─────────────────────────────────────────────────
@infospace_commands.command(name="bind-discipline")
@click.argument("discipline_path")
@click.option("--name", required=True, help="Name for the discipline.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def bind_discipline_cmd(discipline_path: str, name: str, config_path: Optional[str]):
"""Bind a discipline infospace to the current infospace."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.composition import bind_discipline
status = bind_discipline(cfg, name=name, path=discipline_path, root=root)
if status.error:
click.echo(f"Error: {status.error}", err=True)
raise SystemExit(1)
# Persist updated config
save_infospace_config(cfg, cfg_path)
click.echo(f"Bound discipline '{name}' from {discipline_path}")
click.echo(f" Entities: {status.entity_count}")
if status.has_config:
viable_str = "YES" if status.is_viable else "NO"
click.echo(f" Viable: {viable_str}")
# ── disciplines ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def disciplines(config_path: Optional[str]):
"""List bound disciplines and their viability status."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
if not cfg.disciplines:
click.echo("No disciplines bound.")
return
from markitect.infospace.composition import check_discipline_status
click.echo(f"{'Name':<30} {'Entities':>8} {'Viable':>8} {'Path'}")
click.echo("-" * 70)
for binding in cfg.disciplines:
status = check_discipline_status(binding, root)
viable_str = "YES" if status.is_viable else ("NO" if status.has_config else "?")
click.echo(
f"{status.name:<30} {status.entity_count:>8} {viable_str:>8} {status.path}"
)
if status.error:
click.echo(f" Error: {status.error}")
# ── process ─────────────────────────────────────────────────────
@infospace_commands.command()
@click.argument("glob_pattern", default=None, required=False)
@click.option("--all", "process_all", is_flag=True, help="Process all source files.")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--provider", default=None, help="LLM provider (openrouter, openai, etc.).")
@click.option("--model", default=None, help="LLM model name.")
@click.option(
"--check-after-each",
is_flag=True,
help="Run collection checks (C1C5) after each source file.",
)
@click.option("--no-commit", is_flag=True, help="Skip git commits.")
@click.option(
"--eval-after-source",
is_flag=True,
help="After each source's stages succeed, evaluate just the newly-"
"added entities so the per-source commit is self-contained.",
)
@click.option(
"--classify-after-source",
is_flag=True,
help="After each source's stages succeed, classify just the newly-"
"added entities so the per-source commit is self-contained.",
)
def process(
glob_pattern: Optional[str],
process_all: bool,
config_path: Optional[str],
provider: Optional[str],
model: Optional[str],
check_after_each: bool,
no_commit: bool,
eval_after_source: bool,
classify_after_source: bool,
):
"""Process source files through the pipeline defined in infospace.yaml.
GLOB_PATTERN is matched against the sources directory declared in
infospace.yaml (default ``*.md``). Use ``--all`` to process every
source file.
\b
Examples:
# Process chapters 1-3 from book 1
markitect infospace process "book-1-chapter-0[1-3].md" --provider openrouter
# Process all source files and check metrics after each
markitect infospace process --all --provider openrouter --check-after-each
# Dry run — load existing outputs only, no LLM calls
markitect infospace process --all
"""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
if not cfg.pipeline or not cfg.pipeline.stages:
click.echo(
"Error: No pipeline stages defined in infospace.yaml.\n"
"Add a 'pipeline.stages' section with at least one stage.",
err=True,
)
raise SystemExit(1)
# Resolve sources directory
sources_dir = root / cfg.topic.sources if cfg.topic.sources else root
if not sources_dir.is_dir():
click.echo(
f"Error: Sources directory not found: {sources_dir}\n"
f"Set 'topic.sources' in infospace.yaml.",
err=True,
)
raise SystemExit(1)
# Collect source files
if process_all:
source_files = sorted(sources_dir.glob("*.md"))
else:
pattern = glob_pattern or "*.md"
source_files = sorted(sources_dir.glob(pattern))
if not source_files:
if process_all:
click.echo(f"No source files found in {sources_dir}")
else:
click.echo(
f"No files matched: {glob_pattern or '*.md'}\n"
f"Sources directory: {sources_dir}"
)
return
click.echo(f"Found {len(source_files)} source file(s) in {sources_dir.name}/")
# Create LLM adapter
adapter = None
if provider:
from markitect.llm import create_adapter
_PROVIDER_DEFAULTS = {"openrouter": "arcee-ai/trinity-large-preview:free"}
resolved_model = model or _PROVIDER_DEFAULTS.get(provider)
adapter = create_adapter(provider, model=resolved_model)
click.echo(f"LLM: {provider} ({resolved_model or 'default'})")
else:
click.echo("No LLM provider — will use existing outputs only (manual mode).")
# Run pipeline
from markitect.infospace.pipeline import SourcePipeline
if (eval_after_source or classify_after_source) and adapter is None:
click.echo(
"Error: --eval-after-source / --classify-after-source require "
"--provider (they call the LLM).",
err=True,
)
raise SystemExit(1)
pipeline = SourcePipeline(
cfg, root,
adapter=adapter,
provider=provider or "",
model=(model or _PROVIDER_DEFAULTS.get(provider or "", "")) if provider else "",
no_commit=no_commit,
eval_after_source=eval_after_source,
classify_after_source=classify_after_source,
)
total = len(source_files)
completed = 0
for i, source_file in enumerate(source_files, 1):
click.echo(f"\n[{i}/{total}] {source_file.name}")
success = pipeline.process_source(source_file)
if success:
completed += 1
if check_after_each:
pipeline.run_collection_check()
click.echo(f"\nDone: {completed}/{total} source file(s) fully processed.")
# ── stale-mappings ──────────────────────────────────────────────────
@infospace_commands.command(name="stale-mappings")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def stale_mappings(config_path: Optional[str]):
"""Check for stale mappings due to discipline changes."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
if not cfg.disciplines:
click.echo("No disciplines bound — no mappings to check.")
return
from markitect.infospace.composition import find_stale_mappings
# Try to load mapping references from output
mapping_refs = _load_mapping_references(cfg, root)
stale = find_stale_mappings(cfg, root, mapping_references=mapping_refs)
if not stale:
click.echo("No stale mappings detected.")
return
click.echo(f"Found {len(stale)} stale mapping(s):\n")
for s in stale:
click.echo(f" {s.entity_slug} -> {s.discipline_entity}")
click.echo(f" {s.reason}")
# ── graph ──────────────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option(
"--format", "output_format",
type=click.Choice(["mermaid", "dot"]),
default="mermaid",
show_default=True,
help="Output format.",
)
@click.option(
"--color-by",
type=click.Choice(["type", "vsm"]),
default="type",
show_default=True,
help="Color nodes by entity type or VSM system.",
)
@click.option("--type", "filter_type", default=None,
help="Show only entities with this entity type (e.g. Relation, Process).")
@click.option("--vsm", "filter_vsm", default=None,
help="Show only entities with this VSM system (e.g. S1, S3).")
@click.option("--entity", "filter_entity", default=None,
help="Show neighborhood of a specific entity slug.")
@click.option("--loops", "loops_only", is_flag=True, default=False,
help="Show only the feedback loop subgraph.")
@click.option("--output", "-o", default=None,
help="Write to file instead of stdout.")
@click.option("--classified-only/--all-entities", "classified_only",
default=True, show_default=True,
help="Only include classified entities (default: true).")
def graph(
config_path: Optional[str],
output_format: str,
color_by: str,
filter_type: Optional[str],
filter_vsm: Optional[str],
filter_entity: Optional[str],
loops_only: bool,
output: Optional[str],
classified_only: bool,
):
"""Render the entity-relation graph as Mermaid or DOT."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.classification_io import read_classifications_directory
from markitect.infospace.relation_parser import parse_relations_directory
from markitect.infospace.graph_export import (
apply_filters,
build_entity_graph,
to_dot,
to_mermaid,
)
# Load classifications
cls_dir = root / cfg.classifications_dir
classifications = []
if cls_dir.is_dir():
classifications = read_classifications_directory(cls_dir)
classified_slugs = {c.entity_slug for c in classifications}
# Load relations
relations_dir = root / cfg.relations_dir
relations = []
if relations_dir.is_dir():
relations = parse_relations_directory(relations_dir)
if not classifications and not relations:
click.echo("No classifications or relations found. Run 'classify' and add relation files.")
return
# Detect feedback loops via networkx
feedback_cycles = []
if relations:
try:
import networkx as nx
G = nx.DiGraph()
for r in relations:
G.add_edge(r.subject_slug, r.object_slug)
feedback_cycles = list(nx.simple_cycles(G))
except ImportError:
pass
# Build graph
g = build_entity_graph(classifications, relations, feedback_cycles)
# Apply filters
filtered = apply_filters(
g,
filter_type=filter_type,
filter_vsm=filter_vsm,
filter_entity=filter_entity,
loops_only=loops_only,
classified_only=classified_only,
classified_slugs=classified_slugs,
)
if not filtered.nodes:
click.echo("No nodes match the given filters.")
return
# Export
if output_format == "dot":
result = to_dot(filtered, color_by=color_by)
else:
result = to_mermaid(filtered, color_by=color_by)
if output:
out_path = Path(output)
out_path.write_text(result, encoding="utf-8")
click.echo(
f"Wrote {output_format} graph ({len(filtered.nodes)} nodes, "
f"{sum(len(v) for v in filtered.edges.values())} edges) to {out_path}"
)
else:
click.echo(result, nl=False)
def _load_mapping_references(
cfg: InfospaceConfig, root: Path
) -> Optional[dict]:
"""Try to load mapping references from YAML file in output dir."""
mapping_file = root / cfg.metrics_dir / "mapping-references.yaml"
if not mapping_file.is_file():
return None
import yaml
data = yaml.safe_load(mapping_file.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
return None