generated from coulomb/repo-seed
225 lines
8.3 KiB
Python
225 lines
8.3 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
from pathlib import Path
|
|
from typing import Sequence
|
|
|
|
from repo_registry.core.models import CharacteristicRebuildResult, Repository
|
|
from repo_registry.core.service import RegistryService
|
|
from repo_registry.llm_extraction import LLMCandidateExtractor, create_llm_connect_adapter
|
|
from repo_registry.repo_ingestion.git import GitIngestionService
|
|
from repo_registry.self_scoping.assessment import artifact_json, export_assessment_artifact
|
|
from repo_registry.storage.sqlite import NotFoundError, RegistryStore
|
|
from repo_registry.web_api.app import Settings
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
prog="repo-scoping",
|
|
description="Repository Scoping maintenance commands.",
|
|
)
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
rebuild = subparsers.add_parser(
|
|
"rebuild-characteristics",
|
|
help="Rebuild candidate characteristics for one or more repositories.",
|
|
)
|
|
target = rebuild.add_mutually_exclusive_group(required=True)
|
|
target.add_argument("--repo", help="Repository id or exact repository name.")
|
|
target.add_argument("--all", action="store_true", help="Rebuild every repository.")
|
|
rebuild.add_argument("--dry-run", action="store_true", help="Preview without clearing approved characteristics.")
|
|
rebuild.add_argument("--no-llm", action="store_true", help="Disable configured LLM assistance.")
|
|
rebuild.add_argument(
|
|
"--trusted-auto-approve",
|
|
action="store_true",
|
|
help="Run trusted auto-approval after a confirmed rebuild.",
|
|
)
|
|
rebuild.add_argument(
|
|
"--confirm",
|
|
action="store_true",
|
|
help="Confirm a destructive rebuild for selected repositories.",
|
|
)
|
|
rebuild.add_argument(
|
|
"--confirm-all",
|
|
action="store_true",
|
|
help="Confirm a destructive all-repository rebuild.",
|
|
)
|
|
rebuild.add_argument("--database-path", help="Override REPO_REGISTRY_DATABASE_PATH.")
|
|
rebuild.add_argument("--checkout-root", help="Override REPO_REGISTRY_CHECKOUT_ROOT.")
|
|
export = subparsers.add_parser(
|
|
"export-assessment",
|
|
help="Export a completed analysis run as a self-scoping assessment artifact.",
|
|
)
|
|
export.add_argument("--repo", required=True, help="Repository id or exact repository name.")
|
|
export.add_argument("--analysis-run", type=int, required=True, help="Completed analysis run id.")
|
|
export.add_argument("--output", help="Write artifact JSON to this path instead of stdout.")
|
|
export.add_argument(
|
|
"--role",
|
|
choices=["baseline", "challenger", "negative_regression_seed"],
|
|
default="challenger",
|
|
help="Assessment artifact role.",
|
|
)
|
|
export.add_argument(
|
|
"--outcome",
|
|
choices=[
|
|
"baseline",
|
|
"challenger",
|
|
"preferred",
|
|
"tied",
|
|
"rejected",
|
|
"superseded",
|
|
"needs-human",
|
|
],
|
|
default="challenger",
|
|
help="Initial assessment outcome.",
|
|
)
|
|
export.add_argument("--reviewer", default="codex", help="Reviewer name recorded in the artifact.")
|
|
export.add_argument("--summary", help="Assessment summary override.")
|
|
export.add_argument("--database-path", help="Override REPO_REGISTRY_DATABASE_PATH.")
|
|
export.add_argument("--checkout-root", help="Override REPO_REGISTRY_CHECKOUT_ROOT.")
|
|
return parser
|
|
|
|
|
|
def main(argv: Sequence[str] | None = None) -> int:
|
|
parser = build_parser()
|
|
args = parser.parse_args(argv)
|
|
if args.command == "rebuild-characteristics":
|
|
return rebuild_characteristics_command(args, parser)
|
|
if args.command == "export-assessment":
|
|
return export_assessment_command(args, parser)
|
|
parser.error(f"unknown command: {args.command}")
|
|
return 2
|
|
|
|
|
|
def rebuild_characteristics_command(
|
|
args: argparse.Namespace,
|
|
parser: argparse.ArgumentParser,
|
|
) -> int:
|
|
dry_run = bool(args.dry_run)
|
|
if not dry_run and args.all and not args.confirm_all:
|
|
parser.error("--all destructive rebuilds require --confirm-all")
|
|
if not dry_run and not (args.confirm or args.confirm_all):
|
|
parser.error("destructive rebuilds require --confirm or --confirm-all")
|
|
|
|
service = service_from_args(args)
|
|
repositories = selected_repositories(service, args)
|
|
if not repositories:
|
|
parser.error("no repositories matched the requested target")
|
|
|
|
for repository in repositories:
|
|
result = service.rebuild_characteristics_from_scratch(
|
|
repository.id,
|
|
dry_run=dry_run,
|
|
confirm=not dry_run,
|
|
use_llm_assistance=not args.no_llm,
|
|
)
|
|
if args.trusted_auto_approve and not dry_run and result.analysis_run.status == "completed":
|
|
service.trusted_auto_approve_candidate_graph(
|
|
repository.id,
|
|
result.analysis_run.id,
|
|
notes="CLI trusted auto-approve after rebuild.",
|
|
)
|
|
print(rebuild_summary_line(service, result, args))
|
|
return 0
|
|
|
|
|
|
def export_assessment_command(
|
|
args: argparse.Namespace,
|
|
parser: argparse.ArgumentParser,
|
|
) -> int:
|
|
service = service_from_args(args)
|
|
repositories = selected_repositories(service, args)
|
|
if not repositories:
|
|
parser.error("no repositories matched the requested target")
|
|
if len(repositories) > 1:
|
|
parser.error("assessment export requires exactly one repository")
|
|
repository = repositories[0]
|
|
try:
|
|
artifact = export_assessment_artifact(
|
|
service,
|
|
repository.id,
|
|
args.analysis_run,
|
|
role=args.role,
|
|
outcome=args.outcome,
|
|
reviewer=args.reviewer,
|
|
summary=args.summary,
|
|
)
|
|
except (NotFoundError, ValueError) as exc:
|
|
parser.error(str(exc))
|
|
|
|
content = artifact_json(artifact)
|
|
if args.output:
|
|
Path(args.output).write_text(content, encoding="utf-8")
|
|
else:
|
|
print(content, end="")
|
|
return 0
|
|
|
|
|
|
def service_from_args(args: argparse.Namespace) -> RegistryService:
|
|
settings = Settings()
|
|
database_path = Path(args.database_path or settings.database_path)
|
|
checkout_root = args.checkout_root or settings.checkout_root
|
|
database_path.parent.mkdir(parents=True, exist_ok=True)
|
|
store = RegistryStore(database_path)
|
|
store.initialize()
|
|
llm_extractor = None
|
|
no_llm = getattr(args, "no_llm", True)
|
|
if not no_llm and settings.llm_enabled and settings.llm_provider:
|
|
adapter = create_llm_connect_adapter(settings.llm_provider, model=settings.llm_model)
|
|
llm_extractor = LLMCandidateExtractor(adapter)
|
|
return RegistryService(
|
|
store,
|
|
ingestion=GitIngestionService(checkout_root),
|
|
llm_extractor=llm_extractor,
|
|
)
|
|
|
|
|
|
def selected_repositories(
|
|
service: RegistryService,
|
|
args: argparse.Namespace,
|
|
) -> list[Repository]:
|
|
repositories = service.list_repositories()
|
|
if getattr(args, "all", False):
|
|
return repositories
|
|
repo = str(args.repo)
|
|
if repo.isdigit():
|
|
try:
|
|
return [service.get_repository(int(repo))]
|
|
except NotFoundError:
|
|
return []
|
|
return [repository for repository in repositories if repository.name == repo]
|
|
|
|
|
|
def rebuild_summary_line(
|
|
service: RegistryService,
|
|
result: CharacteristicRebuildResult,
|
|
args: argparse.Namespace,
|
|
) -> str:
|
|
graph = (
|
|
service.candidate_graph(result.repository.id, result.analysis_run.id)
|
|
if result.analysis_run.status == "completed"
|
|
else None
|
|
)
|
|
remaining_review = 0
|
|
if graph is not None:
|
|
remaining_review = sum(
|
|
1
|
|
for ability in graph.abilities
|
|
for capability in ability.capabilities
|
|
if capability.status == "candidate"
|
|
)
|
|
candidate_source = "deterministic" if args.no_llm else "configured"
|
|
return (
|
|
f"repo={result.repository.id}:{result.repository.name} "
|
|
f"latest_analysis_run={result.analysis_run.id} "
|
|
f"candidate_source={candidate_source} "
|
|
f"dry_run={result.dry_run} "
|
|
f"cleared_approved={result.cleared_approved} "
|
|
f"approved_superseded={result.previous_counts} "
|
|
f"candidates={result.candidate_counts} "
|
|
f"remaining_review_queue={remaining_review}"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|