from __future__ import annotations import argparse from collections import Counter import json import sys from pathlib import Path from .archive import ( annotate_retention, archive_infospace, list_archives, restore_archive, ) from .checks import run_collection_checks from .engine import engine_capability_contract, plan_asset_sync, sync_assets from .errors import InfospaceError from .evaluation_io import read_entity_evaluations from .generator import ( init_generation_infospace, plan_generation, run_generation, status_generation, ) from .history import ( build_viability_report, find_snapshot, get_history, metric_trend, read_metrics_file, record_check_results, ) from .inspection import export_mermaid, relationship_summary from .lifecycle import add_artifact, create_infospace, load_infospace from .markdown_adapter import validate_infospace_artifacts from .semantics import list_entities, list_relations from .workflow import ( FixtureAssistedGenerationAdapter, load_workflows, plan_workflow, run_workflow, ) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="infospace-bench") sub = parser.add_subparsers(dest="command", required=True) create = sub.add_parser("create", help="Create an infospace") create.add_argument("workspace") create.add_argument("slug") create.add_argument("--name", required=True) create.add_argument("--topic-domain", default="") inspect = sub.add_parser("inspect", help="Inspect an infospace") inspect.add_argument("root") status = sub.add_parser("status", help="Summarize replacement readiness state") status.add_argument("root") add = sub.add_parser("add-artifact", help="Add an artifact to an infospace") add.add_argument("root") add.add_argument("source") add.add_argument("--kind", required=True) add.add_argument("--title", default="") export = sub.add_parser("export", help="Print the infospace representation") export.add_argument("root") validate = sub.add_parser("validate", help="Validate infospace artifacts") validate.add_argument("root") entities = sub.add_parser("entities", help="List parsed entity artifacts") entities.add_argument("root") relations = sub.add_parser("relations", help="List parsed relation artifacts") relations.add_argument("root") history = sub.add_parser("history", help="List evaluation snapshot history") history.add_argument("root") history.add_argument("--metric", default="") history_diff = sub.add_parser( "history-diff", help="Diff two evaluation snapshots by snapshot ID or date", ) history_diff.add_argument("root") history_diff.add_argument("before") history_diff.add_argument("after") metrics = sub.add_parser( "metrics", help="Run collection checks and persist metrics/history", ) metrics.add_argument("root") check = sub.add_parser( "check", help="Run collection checks and persist metrics/history", ) check.add_argument("root") viability = sub.add_parser("viability", help="Evaluate configured thresholds") viability.add_argument("root") graph = sub.add_parser("graph", help="Export the artifact relationship graph") graph.add_argument("root") graph.add_argument("--format", choices=["json", "mermaid"], default="json") workflow = sub.add_parser("workflow", help="Inspect, plan, and run workflows") workflow_sub = workflow.add_subparsers(dest="workflow_command", required=True) workflow_inspect = workflow_sub.add_parser( "inspect", help="Inspect workflow declarations", ) workflow_inspect.add_argument("root") workflow_plan = workflow_sub.add_parser( "plan", help="Plan a workflow without writing outputs", ) workflow_plan.add_argument("root") workflow_plan.add_argument("workflow_id") workflow_run = workflow_sub.add_parser( "run", help="Run a deterministic workflow", ) workflow_run.add_argument("root") workflow_run.add_argument("workflow_id") workflow_run.add_argument( "--fixture-responses", default="", help="Run assisted stages with deterministic fixture responses", ) generate = sub.add_parser("generate", help="Generate infospaces from sources") generate_sub = generate.add_subparsers(dest="generate_command", required=True) generate_init = generate_sub.add_parser( "init", help="Create a generation infospace from a local source", ) generate_init.add_argument("source") generate_init.add_argument("--workspace", default=".") generate_init.add_argument("--slug", required=True) generate_init.add_argument("--name", required=True) generate_init.add_argument("--profile", default="general-knowledge") generate_init.add_argument("--max-chunks", type=int, default=0) generate_plan = generate_sub.add_parser( "plan", help="Plan generator work without provider calls", ) generate_plan.add_argument("root") generate_plan.add_argument("--stage", default="all") generate_run = generate_sub.add_parser( "run", help="Run generator workflows for an infospace", ) generate_run.add_argument("root") generate_run.add_argument("--stage", default="all") generate_run.add_argument("--provider", choices=["fixture", "openrouter"], default="fixture") generate_run.add_argument("--model", default="") generate_run.add_argument("--fixture-responses", default="") generate_run.add_argument("--resume", action="store_true") generate_run.add_argument("--force", action="store_true") generate_resume = generate_sub.add_parser( "resume", help="Resume generator workflows for an infospace", ) generate_resume.add_argument("root") generate_resume.add_argument("--stage", default="all") generate_resume.add_argument("--provider", choices=["fixture", "openrouter"], default="fixture") generate_resume.add_argument("--model", default="") generate_resume.add_argument("--fixture-responses", default="") generate_resume.add_argument("--force", action="store_true") generate_status = generate_sub.add_parser( "status", help="Inspect generator status for an infospace", ) generate_status.add_argument("root") generate_from_source = generate_sub.add_parser( "from-source", help="Initialize and optionally run generation from a local source", ) generate_from_source.add_argument("source") generate_from_source.add_argument("--workspace", default=".") generate_from_source.add_argument("--slug", required=True) generate_from_source.add_argument("--name", required=True) generate_from_source.add_argument("--profile", default="general-knowledge") generate_from_source.add_argument("--stage", default="all") generate_from_source.add_argument("--provider", choices=["fixture", "openrouter"], default="fixture") generate_from_source.add_argument("--model", default="") generate_from_source.add_argument("--fixture-responses", default="") generate_from_source.add_argument("--max-chunks", type=int, default=0) generate_from_source.add_argument("--apply", action="store_true") archive = sub.add_parser( "archive", help="Archive an infospace into artifact-store (durable, content-addressed)", ) archive.add_argument("root") archive.add_argument( "--retention-class", default="release-evidence", help="artifact-store retention class id (default: release-evidence)", ) archive.add_argument( "--include", action="append", default=[], help="Relative path to include (repeatable). Default: infospace.yaml, artifacts/, workflows/, output/, reports/, exports/", ) archive.add_argument( "--exclude", action="append", default=[], help="Relative path or glob to exclude (repeatable)", ) archive.add_argument("--note", default="", help="Free-text note for the archive record") archive.add_argument( "--store-root", default="", help="Override the artifact-store location (default: /output/archives/.store)", ) archive_list = sub.add_parser( "archive-list", help="List recorded archives for an infospace", ) archive_list.add_argument("root") archive_list.add_argument( "--with-retention", action="store_true", help="Annotate each archive with its current retention state", ) archive_list.add_argument( "--store-root", default="", help="Override the artifact-store location for retention lookups", ) restore = sub.add_parser( "restore", help="Restore an archived infospace package into a target directory", ) restore.add_argument("package_id") restore.add_argument("--target", required=True, help="Directory to restore into") restore.add_argument( "--from", dest="from_root", default="", help="Source infospace whose archive store holds the package", ) restore.add_argument( "--store-root", default="", help="Direct path to the artifact-store location", ) restore.add_argument( "--force", action="store_true", help="Overwrite into a non-empty target directory", ) engine = sub.add_parser("engine", help="Inspect and sync engine boundary state") engine_sub = engine.add_subparsers(dest="engine_command", required=True) engine_inspect = engine_sub.add_parser( "inspect", help="Inspect the optional engine capability contract", ) engine_inspect.add_argument("root") engine_plan = engine_sub.add_parser( "plan-sync", help="Plan artifact-to-asset sync without mutation", ) engine_plan.add_argument("root") engine_sync = engine_sub.add_parser( "sync", help="Dry-run artifact-to-asset sync unless --apply is passed", ) engine_sync.add_argument("root") engine_sync.add_argument("--apply", action="store_true") return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) try: if args.command == "create": infospace = create_infospace( Path(args.workspace), args.slug, name=args.name, topic_domain=args.topic_domain, ) _write_json({"slug": infospace.config.slug, "root": str(infospace.root)}) elif args.command == "inspect": _write_json(load_infospace(Path(args.root)).to_dict()) elif args.command == "status": _write_json(_status_payload(Path(args.root))) elif args.command == "add-artifact": artifact = add_artifact( Path(args.root), Path(args.source), kind=args.kind, title=args.title, ) _write_json({"artifact": artifact.to_dict()}) elif args.command == "export": _write_json(load_infospace(Path(args.root)).to_dict()) elif args.command == "validate": results = validate_infospace_artifacts(Path(args.root)) valid = all(result.valid for result in results) _write_json( { "valid": valid, "results": [result.to_dict() for result in results], } ) return 0 if valid else 1 elif args.command == "entities": _write_json( { "entities": [ entity.to_dict() for entity in list_entities(Path(args.root)) ] } ) elif args.command == "relations": _write_json( { "relations": [ relation.to_dict() for relation in list_relations(Path(args.root)) ] } ) elif args.command == "history": history = get_history(Path(args.root)) if args.metric: _write_json( { "metric": args.metric, "trend": metric_trend(history, args.metric), } ) else: _write_json({"history": [item.to_dict() for item in history]}) elif args.command == "history-diff": history = get_history(Path(args.root)) before = find_snapshot(history, args.before) after = find_snapshot(history, args.after) if before is None or after is None: missing = [] if before is None: missing.append(args.before) if after is None: missing.append(args.after) raise InfospaceError( "missing_snapshot", "Could not resolve requested snapshot reference", {"missing_refs": missing}, ) _write_json({"diff": before.diff(after).to_dict()}) elif args.command == "metrics": _write_json(_record_checks(Path(args.root)).to_dict()) elif args.command == "check": infospace = load_infospace(Path(args.root)) report = run_collection_checks(infospace.artifacts) result = record_check_results( infospace.root, report, artifact_evaluations=_read_output_evaluations(infospace.root), ) _write_json( { **result.to_dict(), "details": report.details, } ) elif args.command == "viability": _write_json(build_viability_report(Path(args.root))) elif args.command == "graph": infospace = load_infospace(Path(args.root)) summary = relationship_summary(infospace.artifacts) if args.format == "mermaid": print(export_mermaid(summary), end="") else: _write_json(_relationship_summary_payload(summary)) elif args.command == "workflow": if args.workflow_command == "inspect": _write_json( { "workflows": [ workflow.to_dict() for workflow in load_workflows(Path(args.root)) ] } ) elif args.workflow_command == "plan": _write_json( plan_workflow(Path(args.root), args.workflow_id).to_dict() ) elif args.workflow_command == "run": adapter = ( FixtureAssistedGenerationAdapter.from_file( Path(args.fixture_responses) ) if args.fixture_responses else None ) _write_json( run_workflow( Path(args.root), args.workflow_id, assisted_adapter=adapter, ).to_dict() ) else: parser.error(f"Unhandled workflow command: {args.workflow_command}") elif args.command == "generate": if args.generate_command == "init": infospace = init_generation_infospace( Path(args.workspace), Path(args.source), args.slug, name=args.name, profile=args.profile, max_chunks=_optional_positive(args.max_chunks), ) _write_json( { "slug": infospace.config.slug, "root": str(infospace.root), "status": "initialized", } ) elif args.generate_command == "plan": _write_json(plan_generation(Path(args.root), stage=args.stage)) elif args.generate_command == "run": _write_json( run_generation( Path(args.root), stage=args.stage, provider=args.provider, model=args.model, fixture_responses=args.fixture_responses or None, resume=args.resume, force=args.force, ).to_dict() ) elif args.generate_command == "resume": _write_json( run_generation( Path(args.root), stage=args.stage, provider=args.provider, model=args.model, fixture_responses=args.fixture_responses or None, resume=True, force=args.force, ).to_dict() ) elif args.generate_command == "status": _write_json(status_generation(Path(args.root))) elif args.generate_command == "from-source": infospace = init_generation_infospace( Path(args.workspace), Path(args.source), args.slug, name=args.name, profile=args.profile, max_chunks=_optional_positive(args.max_chunks), ) if args.apply: result = run_generation( infospace.root, stage=args.stage, provider=args.provider, model=args.model, fixture_responses=args.fixture_responses or None, ) _write_json(result.to_dict()) else: _write_json(plan_generation(infospace.root, stage=args.stage)) else: parser.error(f"Unhandled generate command: {args.generate_command}") elif args.command == "archive": record = archive_infospace( Path(args.root), retention_class=args.retention_class, include=args.include or None, exclude=args.exclude or None, note=args.note, store_root=args.store_root or None, ) _write_json(record.to_dict()) elif args.command == "archive-list": archives = list_archives(Path(args.root)) if args.with_retention: payload = annotate_retention( archives, store_root=args.store_root or None, source_infospace=Path(args.root) if not args.store_root else None, ) _write_json({"archives": payload}) else: _write_json({"archives": [rec.to_dict() for rec in archives]}) elif args.command == "restore": result = restore_archive( args.package_id, target=Path(args.target), store_root=args.store_root or None, source_infospace=Path(args.from_root) if args.from_root else None, force=args.force, ) _write_json(result.to_dict()) elif args.command == "engine": if args.engine_command == "inspect": _write_json( { "root": str(Path(args.root)), "contract": engine_capability_contract().to_dict(), } ) elif args.engine_command == "plan-sync": _write_json(plan_asset_sync(Path(args.root)).to_dict()) elif args.engine_command == "sync": _write_json( sync_assets(Path(args.root), dry_run=not args.apply).to_dict() ) else: parser.error(f"Unhandled engine command: {args.engine_command}") else: parser.error(f"Unhandled command: {args.command}") except InfospaceError as exc: print(json.dumps(exc.to_dict(), indent=2), file=sys.stderr) return 2 return 0 def _status_payload(root: Path) -> dict: infospace = load_infospace(root) validation = validate_infospace_artifacts(infospace.root) entities = list_entities(infospace.root) relations = list_relations(infospace.root) history = get_history(infospace.root) metrics = read_metrics_file( infospace.root / "output" / "metrics" / "metrics.yaml" ) viability = ( build_viability_report(infospace.root) if infospace.config.viability else None ) graph = relationship_summary(infospace.artifacts) return { "slug": infospace.config.slug, "name": infospace.config.name, "topic": infospace.config.topic.to_dict(), "artifact_count": len(infospace.artifacts), "artifact_kinds": dict( sorted(Counter(item.kind for item in infospace.artifacts).items()) ), "entity_count": len(entities), "relation_count": len(relations), "validation": { "checked_artifact_count": len(validation), "valid": all(result.valid for result in validation), }, "history": { "snapshot_count": len(history), "latest_snapshot_id": history[-1].snapshot_id if history else "", }, "metrics": metrics, "viability": viability, "graph": { "node_count": graph.node_count, "edge_count": graph.edge_count, "relationship_types": graph.relationship_types, }, } def _record_checks(root: Path): infospace = load_infospace(root) return record_check_results( infospace.root, run_collection_checks(infospace.artifacts), artifact_evaluations=_read_output_evaluations(infospace.root), ) def _read_output_evaluations(root: Path): return read_entity_evaluations(root / "output" / "evaluations") def _relationship_summary_payload(summary) -> dict: return { "node_count": summary.node_count, "edge_count": summary.edge_count, "nodes": summary.nodes, "edges": [ {"source": edge.source, "target": edge.target, "type": edge.type} for edge in summary.edges ], "relationship_types": summary.relationship_types, } def _write_json(payload: dict) -> None: print(json.dumps(payload, indent=2)) def _optional_positive(value: int) -> int | None: return value if value > 0 else None