from __future__ import annotations

import argparse
import json
from dataclasses import asdict
from pathlib import Path
from typing import Sequence

from repo_scoping.acceptance import (
    criteria_registry_json,
    criteria_registry_markdown,
    load_quality_criteria,
)
from repo_scoping.core.models import CharacteristicRebuildResult, Repository
from repo_scoping.core.service import RegistryService
from repo_scoping.llm_extraction import LLMCandidateExtractor, create_llm_connect_adapter
from repo_scoping.repo_ingestion.git import GitIngestionService
from repo_scoping.self_scoping.assessment import artifact_json, export_assessment_artifact
from repo_scoping.self_scoping.comparison import (
    compare_assessment_to_golden,
    comparison_json,
    comparison_markdown,
    load_json,
)
from repo_scoping.storage.sqlite import NotFoundError, RegistryStore
from repo_scoping.web_api.app import Settings


def _add_storage_arguments(subparser: argparse.ArgumentParser) -> None:
    """Add the ``--database-path`` and ``--checkout-root`` overrides to *subparser*."""
    subparser.add_argument("--database-path", help="Override REPO_SCOPING_DATABASE_PATH.")
    subparser.add_argument("--checkout-root", help="Override REPO_SCOPING_CHECKOUT_ROOT.")


def build_parser() -> argparse.ArgumentParser:
    """Build the ``repo-scoping`` argument parser with all of its subcommands.

    Returns:
        A parser whose required ``command`` destination names the chosen
        subcommand; ``main`` dispatches on that value.
    """
    parser = argparse.ArgumentParser(
        prog="repo-scoping",
        description="Repository Scoping maintenance commands.",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # rebuild-characteristics
    rebuild = subparsers.add_parser(
        "rebuild-characteristics",
        help="Rebuild candidate characteristics for one or more repositories.",
    )
    target = rebuild.add_mutually_exclusive_group(required=True)
    target.add_argument("--repo", help="Repository id or exact repository name.")
    target.add_argument("--all", action="store_true", help="Rebuild every repository.")
    rebuild.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview without clearing approved characteristics.",
    )
    rebuild.add_argument("--no-llm", action="store_true", help="Disable configured LLM assistance.")
    rebuild.add_argument(
        "--agentic-review",
        action="store_true",
        help="Request configured agentic review after a confirmed rebuild.",
    )
    rebuild.add_argument(
        "--confirm",
        action="store_true",
        help="Confirm a destructive rebuild for selected repositories.",
    )
    rebuild.add_argument(
        "--confirm-all",
        action="store_true",
        help="Confirm a destructive all-repository rebuild.",
    )
    _add_storage_arguments(rebuild)

    # export-assessment
    export = subparsers.add_parser(
        "export-assessment",
        help="Export a completed analysis run as a self-scoping assessment artifact.",
    )
    export.add_argument("--repo", required=True, help="Repository id or exact repository name.")
    export.add_argument("--analysis-run", type=int, required=True, help="Completed analysis run id.")
    export.add_argument("--output", help="Write artifact JSON to this path instead of stdout.")
    export.add_argument(
        "--role",
        choices=["baseline", "challenger", "negative_regression_seed"],
        default="challenger",
        help="Assessment artifact role.",
    )
    export.add_argument(
        "--outcome",
        choices=[
            "baseline",
            "challenger",
            "preferred",
            "tied",
            "rejected",
            "superseded",
            "needs-human",
        ],
        default="challenger",
        help="Initial assessment outcome.",
    )
    export.add_argument("--reviewer", default="codex", help="Reviewer name recorded in the artifact.")
    export.add_argument("--summary", help="Assessment summary override.")
    _add_storage_arguments(export)

    # compare-assessment
    compare = subparsers.add_parser(
        "compare-assessment",
        help="Compare a self-scoping assessment artifact against a golden profile.",
    )
    compare.add_argument("--golden", required=True, help="Golden profile JSON path.")
    compare.add_argument(
        "--assessment",
        required=True,
        help="Assessment artifact JSON path.",
    )
    compare.add_argument("--output", help="Write comparison report to this path instead of stdout.")
    compare.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Comparison report format.",
    )

    # self-assess
    self_assess = subparsers.add_parser(
        "self-assess",
        help="Run repo-scoping against a source tree and compare the result to a golden profile.",
    )
    self_assess.add_argument(
        "--repo",
        default="repo-scoping",
        help="Repository id or exact repository name to reuse; created by name when absent.",
    )
    self_assess.add_argument(
        "--source-path",
        default=".",
        help="Source tree to analyze; defaults to the current working directory.",
    )
    self_assess.add_argument(
        "--golden",
        default="docs/self-scoping/golden/repo-scoping-golden-profile.v1.json",
        help="Golden profile JSON path.",
    )
    self_assess.add_argument(
        "--assessment-output",
        help="Write challenger assessment artifact JSON to this path.",
    )
    self_assess.add_argument(
        "--comparison-output",
        help="Write comparison report to this path instead of stdout.",
    )
    self_assess.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Comparison report format.",
    )
    # --with-llm flips the shared ``no_llm`` flag off; the default set below
    # keeps LLM assistance disabled unless the flag is given.
    self_assess.add_argument(
        "--with-llm",
        action="store_false",
        dest="no_llm",
        help="Use configured LLM assistance during the self-assessment run.",
    )
    self_assess.add_argument(
        "--agentic-review",
        action="store_true",
        help="Request configured agentic review; leaves candidates pending when none is configured.",
    )
    self_assess.add_argument(
        "--fail-on-regression",
        action="store_true",
        help="Return exit code 1 only when comparison status is regression.",
    )
    _add_storage_arguments(self_assess)
    self_assess.set_defaults(no_llm=True)

    # list-quality-criteria
    criteria = subparsers.add_parser(
        "list-quality-criteria",
        help="List the active characteristic quality criteria registry.",
    )
    criteria.add_argument(
        "--criteria-path",
        help="Override the default quality criteria registry JSON path.",
    )
    criteria.add_argument("--output", help="Write criteria output to this path instead of stdout.")
    criteria.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Criteria output format.",
    )

    # list-legacy-auto-approvals
    legacy = subparsers.add_parser(
        "list-legacy-auto-approvals",
        help="List historical trusted deterministic auto-approval records.",
    )
    _add_storage_arguments(legacy)
    legacy.add_argument("--output", help="Write inventory output to this path instead of stdout.")
    legacy.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Inventory output format.",
    )

    # assess-dataset
    dataset = subparsers.add_parser(
        "assess-dataset",
        help="Summarize repository generation coverage across the local dataset.",
    )
    _add_storage_arguments(dataset)
    dataset.add_argument("--output", help="Write dataset assessment to this path instead of stdout.")
    dataset.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Dataset assessment output format.",
    )
    return parser


def main(argv: Sequence[str] | None = None) -> int:
    """Parse *argv* and run the selected subcommand.

    Args:
        argv: Argument list passed to ``parse_args``; ``None`` lets argparse
            read the process arguments.

    Returns:
        The exit code produced by the subcommand handler.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    if args.command == "rebuild-characteristics":
        return rebuild_characteristics_command(args, parser)
    if args.command == "export-assessment":
        return export_assessment_command(args, parser)
    if args.command == "compare-assessment":
        return compare_assessment_command(args)
    if args.command == "self-assess":
        return self_assess_command(args, parser)
    if args.command == "list-quality-criteria":
        return list_quality_criteria_command(args)
    if args.command == "list-legacy-auto-approvals":
        return list_legacy_auto_approvals_command(args)
    if args.command == "assess-dataset":
        return assess_dataset_command(args)
    parser.error(f"unknown command: {args.command}")
    # parser.error() exits the process; the return only satisfies the
    # declared ``int`` return type.
    return 2


def rebuild_characteristics_command(
    args: argparse.Namespace,
    parser: argparse.ArgumentParser,
) -> int:
    """Rebuild characteristics for the selected repositories and print a summary line each.

    A non-dry-run rebuild is refused through ``parser.error`` unless it was
    confirmed: ``--all`` needs ``--confirm-all``, any other target needs
    ``--confirm`` or ``--confirm-all``.

    Returns:
        ``0`` after every selected repository has been processed.
    """
    dry_run = bool(args.dry_run)
    if not dry_run and args.all and not args.confirm_all:
        parser.error("--all destructive rebuilds require --confirm-all")
    if not dry_run and not (args.confirm or args.confirm_all):
        parser.error("destructive rebuilds require --confirm or --confirm-all")

    service = service_from_args(args)
    repositories = selected_repositories(service, args)
    if not repositories:
        parser.error("no repositories matched the requested target")

    for repository in repositories:
        result = service.rebuild_characteristics_from_scratch(
            repository.id,
            dry_run=dry_run,
            confirm=not dry_run,
            use_llm_assistance=not args.no_llm,
        )
        # Agentic review is only requested for a real rebuild whose run completed.
        if args.agentic_review and not dry_run and result.analysis_run.status == "completed":
            service.request_agentic_review(
                repository.id,
                result.analysis_run.id,
                notes="CLI agentic review request after rebuild.",
            )
        print(rebuild_summary_line(service, result, args))
    return 0


def _emit_text(content: str, output: str | Path | None) -> None:
    """Write *content* to *output* when given, otherwise print it to stdout.

    Printed output always ends with exactly the newline already present in
    *content*, or a single added newline when *content* has none.
    """
    if output:
        write_text(output, content)
        return
    print(content, end="" if content.endswith("\n") else "\n")


def compare_assessment_command(args: argparse.Namespace) -> int:
    """Compare an assessment artifact file against a golden profile file and emit the report."""
    comparison = compare_assessment_to_golden(
        load_json(args.golden),
        load_json(args.assessment),
    )
    content = (
        comparison_json(comparison)
        if args.format == "json"
        else comparison_markdown(comparison)
    )
    _emit_text(content, args.output)
    return 0


def list_quality_criteria_command(args: argparse.Namespace) -> int:
    """Load the quality criteria registry and emit it as JSON or markdown."""
    registry = load_quality_criteria(args.criteria_path)
    content = (
        criteria_registry_json(registry)
        if args.format == "json"
        else criteria_registry_markdown(registry)
    )
    _emit_text(content, args.output)
    return 0


def list_legacy_auto_approvals_command(args: argparse.Namespace) -> int:
    """Emit the service's trusted auto-approval migration records as JSON or markdown."""
    service = service_from_args(args)
    records = service.list_trusted_auto_approval_migration_records()
    if args.format == "json":
        content = json.dumps([asdict(record) for record in records], indent=2) + "\n"
    else:
        content = legacy_auto_approval_records_markdown(records)
    _emit_text(content, args.output)
    return 0


def assess_dataset_command(args: argparse.Namespace) -> int:
    """Build the dataset assessment report and emit it as JSON or markdown."""
    service = service_from_args(args)
    report = dataset_assessment(service)
    content = (
        json.dumps(report, indent=2) + "\n"
        if args.format == "json"
        else dataset_assessment_markdown(report)
    )
    _emit_text(content, args.output)
    return 0


def dataset_assessment(service: RegistryService) -> dict[str, object]:
    """Collect per-repository and total coverage counts for every registered repository.

    Returns:
        A dict with ``schema_version``, ``summary`` (the running totals) and
        ``repositories`` (one entry per repository, including detected issues).
    """
    repositories = []
    totals = {
        "repositories": 0,
        "facts": 0,
        "content_chunks": 0,
        "candidate_abilities": 0,
        "candidate_capabilities": 0,
        "candidate_features": 0,
        "candidate_evidence": 0,
        "approved_abilities": 0,
        "approved_capabilities": 0,
        "approved_features": 0,
        "approved_evidence": 0,
        "dependency_graph_nodes": 0,
        "dependency_graph_edges": 0,
    }
    for repository in service.list_repositories():
        runs = service.list_analysis_runs(repository.id)
        # First completed run in the order the service returns them.
        # NOTE(review): presumably newest-first — confirm against the service.
        latest_run = next((run for run in runs if run.status == "completed"), None)
        facts = service.list_observed_facts(repository.id, latest_run.id) if latest_run else []
        chunks = service.list_content_chunks(repository.id, latest_run.id) if latest_run else []

        candidate_counts = {
            "abilities": 0,
            "capabilities": 0,
            "features": 0,
            "evidence": 0,
        }
        candidate_names: list[str] = []
        if latest_run is not None:
            try:
                graph = service.candidate_graph(repository.id, latest_run.id)
            except NotFoundError:
                graph = None
            if graph is not None:
                candidate_counts = candidate_graph_counts(graph)
                # Only the first five ability names are reported.
                candidate_names = [ability.name for ability in graph.abilities][:5]

        ability_map = service.ability_map(repository.id)
        approved_counts = approved_graph_counts(ability_map)

        # A missing or unparsable dependency graph is reported as empty.
        graph_metrics = {"node_count": 0, "edge_count": 0}
        try:
            dependency_graph = service.dependency_graph_elements(repository.id)
            graph_metrics = {
                "node_count": int(dependency_graph["metrics"]["node_count"]),
                "edge_count": int(dependency_graph["metrics"]["edge_count"]),
            }
        except (NotFoundError, ValueError):
            pass

        snapshot = (
            service.store.get_snapshot(latest_run.snapshot_id)
            if latest_run is not None and latest_run.snapshot_id is not None
            else None
        )
        doc_presence = document_presence(snapshot.source_path if snapshot else "")
        issues = dataset_assessment_issues(
            fact_count=len(facts),
            chunk_count=len(chunks),
            candidate_counts=candidate_counts,
            approved_counts=approved_counts,
            graph_metrics=graph_metrics,
            doc_presence=doc_presence,
            candidate_names=candidate_names,
        )
        repositories.append(
            {
                "repository_id": repository.id,
                "name": repository.name,
                "status": repository.status,
                "latest_analysis_run_id": latest_run.id if latest_run else None,
                "latest_analysis_run_status": latest_run.status if latest_run else None,
                "facts": len(facts),
                "content_chunks": len(chunks),
                "candidate_counts": candidate_counts,
                "approved_counts": approved_counts,
                "dependency_graph": graph_metrics,
                "documents": doc_presence,
                "candidate_ability_names": candidate_names,
                "issues": issues,
            }
        )

        totals["repositories"] += 1
        totals["facts"] += len(facts)
        totals["content_chunks"] += len(chunks)
        totals["candidate_abilities"] += candidate_counts["abilities"]
        totals["candidate_capabilities"] += candidate_counts["capabilities"]
        totals["candidate_features"] += candidate_counts["features"]
        totals["candidate_evidence"] += candidate_counts["evidence"]
        totals["approved_abilities"] += approved_counts["abilities"]
        totals["approved_capabilities"] += approved_counts["capabilities"]
        totals["approved_features"] += approved_counts["features"]
        totals["approved_evidence"] += approved_counts["evidence"]
        totals["dependency_graph_nodes"] += graph_metrics["node_count"]
        totals["dependency_graph_edges"] += graph_metrics["edge_count"]
    return {
        "schema_version": "repo-scoping-dataset-assessment/v1",
        "summary": totals,
        "repositories": repositories,
    }


def candidate_graph_counts(graph) -> dict[str, int]:
    """Count abilities, capabilities, features and capability evidence in a candidate graph."""
    capabilities = [
        capability
        for ability in graph.abilities
        for capability in ability.capabilities
    ]
    return {
        "abilities": len(graph.abilities),
        "capabilities": len(capabilities),
        "features": sum(len(capability.features) for capability in capabilities),
        "evidence": sum(len(capability.evidence) for capability in capabilities),
    }


def approved_graph_counts(ability_map) -> dict[str, int]:
    """Count the approved hierarchy in *ability_map*; ``scope`` is 1 when a scope is set."""
    capabilities = [
        capability
        for ability in ability_map.abilities
        for capability in ability.capabilities
    ]
    return {
        "scope": 1 if ability_map.scope else 0,
        "abilities": len(ability_map.abilities),
        "capabilities": len(capabilities),
        "features": sum(len(capability.features) for capability in capabilities),
        "evidence": sum(len(capability.evidence) for capability in capabilities),
    }


def document_presence(source_path: str) -> dict[str, bool]:
    """Report which well-known documents exist directly under *source_path*.

    An empty *source_path* yields ``False`` for every document. ``README``
    is true when any entry matching ``README*`` exists in the directory.
    """
    if not source_path:
        return {
            "INTENT.md": False,
            "SCOPE.md": False,
            "README": False,
            "CLAUDE.md": False,
            "AGENTS.md": False,
        }
    root = Path(source_path)
    return {
        "INTENT.md": (root / "INTENT.md").is_file(),
        "SCOPE.md": (root / "SCOPE.md").is_file(),
        "README": any(root.glob("README*")),
        "CLAUDE.md": (root / "CLAUDE.md").is_file(),
        "AGENTS.md": (root / "AGENTS.md").is_file(),
    }


def dataset_assessment_issues(
    *,
    fact_count: int,
    chunk_count: int,
    candidate_counts: dict[str, int],
    approved_counts: dict[str, int],
    graph_metrics: dict[str, int],
    doc_presence: dict[str, bool],
    candidate_names: list[str],
) -> list[str]:
    """Derive issue identifiers for one repository from its collected counts.

    Returns:
        The issue identifiers that apply, in a fixed check order; empty when
        none of the conditions hold.
    """
    issues: list[str] = []
    if fact_count and not candidate_counts["capabilities"]:
        issues.append("facts-without-candidate-capabilities")
    if chunk_count and doc_presence.get("SCOPE.md") and not candidate_counts["capabilities"]:
        issues.append("scope-text-unused-for-lower-hierarchy")
    if fact_count and not graph_metrics["node_count"]:
        issues.append("facts-with-empty-dependency-graph")
    if approved_counts["abilities"] == 0 and graph_metrics["node_count"] == 0:
        issues.append("approved-hierarchy-missing-and-no-graph-fallback")
    if any("repo-seed" in name.lower() for name in candidate_names):
        issues.append("template-readme-contamination")
    return issues


def dataset_assessment_markdown(report: dict[str, object]) -> str:
    """Render a ``dataset_assessment`` report as a markdown summary plus a per-repository table."""
    lines = ["# Repo-Scoping Dataset Assessment", ""]
    summary = report["summary"]
    lines.extend(
        [
            f"- Repositories: {summary['repositories']}",
            f"- Facts: {summary['facts']}",
            f"- Candidate hierarchy: {summary['candidate_abilities']} abilities / "
            f"{summary['candidate_capabilities']} capabilities / "
            f"{summary['candidate_features']} features / "
            f"{summary['candidate_evidence']} evidence",
            f"- Approved hierarchy: {summary['approved_abilities']} abilities / "
            f"{summary['approved_capabilities']} capabilities / "
            f"{summary['approved_features']} features / "
            f"{summary['approved_evidence']} evidence",
            f"- Dependency graph: {summary['dependency_graph_nodes']} nodes / "
            f"{summary['dependency_graph_edges']} edges",
            "",
            "| Repo | Run | Facts | Chunks | Candidate | Approved | Graph | Issues |",
            "| --- | ---: | ---: | ---: | --- | --- | --- | --- |",
        ]
    )
    for item in report["repositories"]:
        candidate = item["candidate_counts"]
        approved = item["approved_counts"]
        graph = item["dependency_graph"]
        lines.append(
            f"| {item['name']} | {item['latest_analysis_run_id'] or '-'} | "
            f"{item['facts']} | {item['content_chunks']} | "
            f"{candidate['abilities']}/{candidate['capabilities']}/"
            f"{candidate['features']}/{candidate['evidence']} | "
            f"{approved['abilities']}/{approved['capabilities']}/"
            f"{approved['features']}/{approved['evidence']} | "
            f"{graph['node_count']}/{graph['edge_count']} | "
            f"{', '.join(item['issues']) or '-'} |"
        )
    return "\n".join(lines) + "\n"


def legacy_auto_approval_records_markdown(records) -> str:
    """Render legacy auto-approval records as a markdown list, or a placeholder line when empty."""
    if not records:
        return "No legacy trusted auto-approval records found.\n"
    lines = ["# Legacy Trusted Auto-Approval Records", ""]
    for record in records:
        lines.extend(
            [
                (
                    f"- repo={record.repository_id}:{record.repository_name} "
                    f"run={record.analysis_run_id} decision={record.review_decision_id}"
                ),
                f" status={record.analysis_run_status} scanner={record.scanner_version or 'unknown'}",
                f" approved_abilities={record.current_approved_ability_count}",
                f" next={record.recommended_next_step}",
            ]
        )
    return "\n".join(lines) + "\n"


def self_assess_command(
    args: argparse.Namespace,
    parser: argparse.ArgumentParser,
) -> int:
    """Analyze a source tree, export a challenger artifact and compare it to the golden profile.

    Invalid input (missing source directory, unknown numeric repository id,
    analysis run that did not complete) is reported through ``parser.error``.

    Returns:
        ``1`` when ``--fail-on-regression`` is set and the comparison status
        is ``"regression"``; otherwise ``0``.
    """
    service = service_from_args(args)
    source_path = Path(args.source_path).expanduser().resolve()
    if not source_path.is_dir():
        parser.error(f"source path does not exist or is not a directory: {source_path}")

    # An unknown numeric id raises NotFoundError; report it as a CLI usage
    # error, matching export_assessment_command, rather than a traceback.
    try:
        repository = self_assessment_repository(service, args.repo, source_path)
    except NotFoundError as exc:
        parser.error(str(exc))

    summary = service.analyze_repository(
        repository.id,
        source_path=str(source_path),
        use_llm_assistance=not args.no_llm,
        agentic_review=args.agentic_review,
        trusted_auto_approve=False,
    )
    if summary.analysis_run.status != "completed":
        parser.error(summary.analysis_run.error_message or "analysis failed")

    artifact = export_assessment_artifact(
        service,
        repository.id,
        summary.analysis_run.id,
        role="challenger",
        outcome="challenger",
        reviewer="self-assess",
    )
    comparison = compare_assessment_to_golden(load_json(args.golden), artifact)
    if args.assessment_output:
        write_text(args.assessment_output, artifact_json(artifact))

    report = (
        comparison_json(comparison)
        if args.format == "json"
        else comparison_markdown(comparison)
    )
    _emit_text(report, args.comparison_output)

    if args.fail_on_regression and comparison["status"] == "regression":
        return 1
    return 0


def export_assessment_command(
    args: argparse.Namespace,
    parser: argparse.ArgumentParser,
) -> int:
    """Export one repository's analysis run as an assessment artifact JSON document.

    Selection problems and ``NotFoundError`` / ``ValueError`` raised by the
    export are reported through ``parser.error``.
    """
    service = service_from_args(args)
    repositories = selected_repositories(service, args)
    if not repositories:
        parser.error("no repositories matched the requested target")
    if len(repositories) > 1:
        parser.error("assessment export requires exactly one repository")
    repository = repositories[0]
    try:
        artifact = export_assessment_artifact(
            service,
            repository.id,
            args.analysis_run,
            role=args.role,
            outcome=args.outcome,
            reviewer=args.reviewer,
            summary=args.summary,
        )
    except (NotFoundError, ValueError) as exc:
        parser.error(str(exc))
    # Same output handling as the other commands, so stdout always ends with
    # a newline.
    _emit_text(artifact_json(artifact), args.output)
    return 0


def service_from_args(args: argparse.Namespace) -> RegistryService:
    """Create a ``RegistryService`` from CLI overrides and ``Settings`` defaults.

    Creates the database's parent directory if needed and initializes the
    store. An LLM extractor is attached only when ``args.no_llm`` is false
    and the settings enable an LLM provider; commands that define no
    ``no_llm`` attribute run without one.
    """
    settings = Settings()
    database_path = Path(args.database_path or settings.database_path)
    checkout_root = args.checkout_root or settings.checkout_root
    database_path.parent.mkdir(parents=True, exist_ok=True)
    store = RegistryStore(database_path)
    store.initialize()

    llm_extractor = None
    no_llm = getattr(args, "no_llm", True)
    if not no_llm and settings.llm_enabled and settings.llm_provider:
        adapter = create_llm_connect_adapter(settings.llm_provider, model=settings.llm_model)
        llm_extractor = LLMCandidateExtractor(adapter)
    return RegistryService(
        store,
        ingestion=GitIngestionService(checkout_root),
        llm_extractor=llm_extractor,
    )


def selected_repositories(
    service: RegistryService,
    args: argparse.Namespace,
) -> list[Repository]:
    """Resolve the repositories targeted by ``args.all`` / ``args.repo``.

    An all-digit ``args.repo`` is looked up by id only; any other value is
    matched against exact repository names.

    Returns:
        The matching repositories; empty when nothing matches.
    """
    if getattr(args, "all", False):
        return service.list_repositories()
    repo = str(args.repo)
    if repo.isdigit():
        # Lookup by id does not need the full repository listing.
        try:
            return [service.get_repository(int(repo))]
        except NotFoundError:
            return []
    return [
        repository
        for repository in service.list_repositories()
        if repository.name == repo
    ]


def self_assessment_repository(
    service: RegistryService,
    repo: str,
    source_path: Path,
) -> Repository:
    """Return the repository named or identified by *repo*, registering it by name if absent.

    Raises:
        NotFoundError: *repo* is all digits (an id) and no such repository exists.
    """
    selected = selected_repositories(service, argparse.Namespace(repo=repo, all=False))
    if selected:
        return selected[0]
    if repo.isdigit():
        raise NotFoundError(f"repository {repo} was not found")
    return service.register_repository(
        name=repo,
        url=str(source_path),
        description="Self-scoping assessment target.",
    )


def write_text(path: str | Path, content: str) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories as needed."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding="utf-8")


def rebuild_summary_line(
    service: RegistryService,
    result: CharacteristicRebuildResult,
    args: argparse.Namespace,
) -> str:
    """Format the one-line summary printed after a repository rebuild.

    ``remaining_review_queue`` counts capabilities still in ``"candidate"``
    status in the run's candidate graph; it is 0 when the run did not complete.
    """
    graph = (
        service.candidate_graph(result.repository.id, result.analysis_run.id)
        if result.analysis_run.status == "completed"
        else None
    )
    remaining_review = 0
    if graph is not None:
        remaining_review = sum(
            1
            for ability in graph.abilities
            for capability in ability.capabilities
            if capability.status == "candidate"
        )
    candidate_source = "deterministic" if args.no_llm else "configured"
    return (
        f"repo={result.repository.id}:{result.repository.name} "
        f"latest_analysis_run={result.analysis_run.id} "
        f"candidate_source={candidate_source} "
        f"dry_run={result.dry_run} "
        f"cleared_approved={result.cleared_approved} "
        f"approved_superseded={result.previous_counts} "
        f"candidates={result.candidate_counts} "
        f"remaining_review_queue={remaining_review}"
    )


if __name__ == "__main__":
    raise SystemExit(main())