chore(consistency): sync task status from DB [auto]

Updated by fix-consistency on 2026-05-15:
  - update .custodian-brief.md for repo-scoping
This commit is contained in:
2026-05-15 21:14:21 +02:00
parent f38ed6847c
commit 084159e51c
42 changed files with 5 additions and 5 deletions

View File

@@ -0,0 +1,5 @@
"""Repository Scoping."""
__all__ = ["__version__"]
__version__ = "0.1.0"

View File

@@ -0,0 +1,37 @@
from repo_registry.acceptance.agentic import (
AgenticReviewer,
AgenticReviewDecision,
AgenticReviewRequest,
validate_agentic_review_decision,
validate_agentic_review_decisions,
)
from repo_registry.acceptance.criteria import (
active_quality_criteria_version,
criteria_registry_dict,
criteria_registry_json,
criteria_registry_markdown,
load_quality_criteria,
)
from repo_registry.acceptance.gates import (
blocking_quality_gate_outcomes,
evaluate_candidate_capability_quality,
evaluate_candidate_graph_quality,
quality_gate_outcome_dicts,
)
__all__ = [
"active_quality_criteria_version",
"AgenticReviewDecision",
"AgenticReviewer",
"AgenticReviewRequest",
"blocking_quality_gate_outcomes",
"criteria_registry_dict",
"criteria_registry_json",
"criteria_registry_markdown",
"evaluate_candidate_capability_quality",
"evaluate_candidate_graph_quality",
"load_quality_criteria",
"quality_gate_outcome_dicts",
"validate_agentic_review_decision",
"validate_agentic_review_decisions",
]

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from typing import Protocol
from repo_registry.acceptance.gates import QualityGateOutcome
from repo_registry.core.models import CandidateGraph, Repository
# Every action an agentic reviewer may put on a decision; any other value is
# rejected by validate_agentic_review_decision.
AGENTIC_REVIEW_ACTIONS = {
    "approve",
    "approve_with_edits",
    "reject",
    "downgrade",
    "request_human_review",
    "propose_edit",
    "relink",
}
# Actions that count as approvals; validate_agentic_review_decision requires
# evidence refs for these.
AGENTIC_APPROVAL_ACTIONS = {"approve", "approve_with_edits"}
@dataclass(frozen=True)
class AgenticReviewRequest:
    """Immutable bundle of inputs handed to an ``AgenticReviewer.review`` call."""

    # Repository the candidate graph belongs to.
    repository: Repository
    # Candidate graph under review.
    candidate_graph: CandidateGraph
    # Version string of the quality criteria registry in use.
    criteria_version: str
    # Deterministic quality gate results supplied alongside the graph.
    quality_gate_outcomes: list[QualityGateOutcome]
    # Free-form context text; format is not constrained here.
    context: str
@dataclass(frozen=True)
class AgenticReviewDecision:
    """Immutable structured decision produced by an agentic reviewer.

    Instances are checked by ``validate_agentic_review_decision``.
    """

    # Must be one of AGENTIC_REVIEW_ACTIONS.
    action: str
    # Kind of element the decision targets; must be non-empty.
    target_type: str
    # Identifier of the targeted element; must be non-negative.
    target_id: int
    # Explanation for the decision; must contain non-whitespace text.
    rationale: str
    # Quality criterion ids the decision relies on; must be non-empty.
    criterion_ids: list[str]
    # Evidence references; must be non-empty for approval actions.
    evidence_refs: list[str]
    # Optional free-form notes.
    notes: str = ""
    # Optional payload describing proposed changes; its schema is not defined here.
    proposed_changes: dict[str, Any] | None = None
class AgenticReviewer(Protocol):
    """Structural interface an agentic reviewer implementation must satisfy.

    Any object with these attributes and a compatible ``review`` method
    conforms; no inheritance is required.
    """

    # Identifier of the reviewer implementation.
    reviewer_id: str
    # Version of the review policy the reviewer applies.
    policy_version: str

    def review(self, request: AgenticReviewRequest) -> list[AgenticReviewDecision]:
        """Review a candidate graph and return structured decisions."""
def validate_agentic_review_decision(decision: AgenticReviewDecision) -> None:
    """Raise ``ValueError`` when *decision* violates a structural rule.

    Rules are checked in a fixed order and the first violation wins: the
    action must be supported, the target type non-empty, the target id
    non-negative, the rationale non-blank, at least one criterion id present,
    and approval actions must carry evidence refs.
    """
    action = decision.action
    # Each predicate is a lambda so later rules are only evaluated once the
    # earlier ones have passed.
    rules = (
        (
            lambda: action not in AGENTIC_REVIEW_ACTIONS,
            f"unsupported agentic review action: {action}",
        ),
        (
            lambda: not decision.target_type,
            "agentic review decision target_type is required",
        ),
        (
            lambda: decision.target_id < 0,
            "agentic review decision target_id must be non-negative",
        ),
        (
            lambda: not decision.rationale.strip(),
            "agentic review decision rationale is required",
        ),
        (
            lambda: not decision.criterion_ids,
            "agentic review decision criterion_ids are required",
        ),
        (
            lambda: action in AGENTIC_APPROVAL_ACTIONS and not decision.evidence_refs,
            "agentic approval requires evidence refs tied to the rationale",
        ),
    )
    for is_violated, message in rules:
        if is_violated():
            raise ValueError(message)
def validate_agentic_review_decisions(
    decisions: list[AgenticReviewDecision],
) -> list[AgenticReviewDecision]:
    """Validate every decision in order and return the same list.

    Raises:
        ValueError: propagated from the first decision that fails validation.
    """
    for candidate in decisions:
        validate_agentic_review_decision(candidate)
    return decisions

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
# Only registry payloads declaring exactly this schema_version are accepted.
CRITERIA_SCHEMA_VERSION = "quality-criteria-registry/v1"
# Registry file used when no explicit path is given. parents[3] is the
# directory three levels above this module's own directory.
# NOTE(review): presumably the repository root — confirm against the layout.
DEFAULT_CRITERIA_PATH = (
    Path(__file__).resolve().parents[3]
    / "docs"
    / "quality-criteria"
    / "acceptance-quality-criteria.v1.json"
)
# Keys every criterion object must contain; a criterion missing any of them is
# rejected when the registry is loaded.
REQUIRED_CRITERION_FIELDS = {
    "id",
    "title",
    "category",
    "severity",
    "applies_to",
    "description",
    "deterministic_action",
    "deterministic_action_when",
    "reviewer_guidance",
}
@dataclass(frozen=True)
class QualityCriterion:
    """One immutable entry of the quality criteria registry."""

    # Identifier; must be unique within a registry.
    id: str
    title: str
    category: str
    severity: str
    # Element kinds the criterion applies to; non-empty when loaded from JSON.
    applies_to: list[str]
    description: str
    # Outcome recorded when a deterministic gate triggers this criterion.
    deterministic_action: str
    # Human-readable description of the deterministic trigger.
    deterministic_action_when: str
    reviewer_guidance: str
    # Optional guidance for agentic reviewers.
    agentic_guidance: str = ""
    # Optional examples; the JSON loader always supplies a list (possibly empty).
    examples: list[str] | None = None
@dataclass(frozen=True)
class QualityCriteriaRegistry:
    """Immutable, versioned collection of quality criteria."""

    # Schema identifier of the payload the registry was built from.
    schema_version: str
    # Version of the criteria set; copied onto every gate outcome.
    criteria_version: str
    status: str
    # Stored as the string found in the payload; not parsed as a date here.
    updated_at: str
    criteria: list[QualityCriterion]
def load_quality_criteria(path: str | Path | None = None) -> QualityCriteriaRegistry:
    """Read and validate a quality criteria registry from a JSON file.

    Args:
        path: Registry file to read; ``DEFAULT_CRITERIA_PATH`` when ``None``.

    Raises:
        ValueError: when the payload fails registry validation.
    """
    source = DEFAULT_CRITERIA_PATH if path is None else Path(path)
    raw_text = source.read_text(encoding="utf-8")
    return _registry_from_payload(json.loads(raw_text))
def active_quality_criteria_version(path: str | Path | None = None) -> str:
    """Return the ``criteria_version`` of the registry loaded from *path*."""
    registry = load_quality_criteria(path)
    return registry.criteria_version
def criteria_registry_dict(registry: QualityCriteriaRegistry) -> dict[str, Any]:
    """Convert *registry*, including nested criteria, into plain dicts and lists."""
    as_mapping: dict[str, Any] = asdict(registry)
    return as_mapping
def criteria_registry_json(registry: QualityCriteriaRegistry) -> str:
    """Serialize *registry* as indented, key-sorted JSON ending in a newline."""
    serialized = json.dumps(
        criteria_registry_dict(registry),
        indent=2,
        sort_keys=True,
    )
    return f"{serialized}\n"
def criteria_registry_markdown(registry: QualityCriteriaRegistry) -> str:
    """Render *registry* as a Markdown document.

    The output has a title and metadata bullet list, then one ``##`` section
    per criterion. Because the last rendered line is always empty, the result
    ends with a newline.
    """
    header = [
        f"# Quality Criteria Registry: {registry.criteria_version}",
        "",
        f"- Schema: `{registry.schema_version}`",
        f"- Status: `{registry.status}`",
        f"- Updated: `{registry.updated_at}`",
        "",
    ]
    sections: list[str] = []
    for entry in registry.criteria:
        applies = ", ".join(entry.applies_to)
        sections += [
            f"## {entry.id}: {entry.title}",
            "",
            f"- Category: `{entry.category}`",
            f"- Severity: `{entry.severity}`",
            f"- Applies to: `{applies}`",
            f"- Deterministic action: `{entry.deterministic_action}`",
            "",
            entry.description,
            "",
            f"Deterministic trigger: {entry.deterministic_action_when}",
            "",
            f"Reviewer guidance: {entry.reviewer_guidance}",
            "",
        ]
    return "\n".join(header + sections)
def _registry_from_payload(payload: dict[str, Any]) -> QualityCriteriaRegistry:
    """Build a validated registry from a decoded JSON payload.

    Raises:
        ValueError: when the schema version is unsupported, the ``criteria``
            entry is missing, empty or not a list, any criterion is invalid,
            or two criteria share an id.
    """

    def text_field(key: str) -> str:
        # Missing metadata keys become empty strings rather than errors.
        return str(payload.get(key, ""))

    if payload.get("schema_version") != CRITERIA_SCHEMA_VERSION:
        declared = payload.get("schema_version", "<missing>")
        raise ValueError("unsupported quality criteria schema: " f"{declared}")

    raw_criteria = payload.get("criteria")
    if not (isinstance(raw_criteria, list) and raw_criteria):
        raise ValueError("quality criteria registry must contain criteria")

    parsed = [_criterion_from_payload(entry) for entry in raw_criteria]
    # A set of the ids is smaller than the list exactly when an id repeats.
    if len({criterion.id for criterion in parsed}) != len(parsed):
        raise ValueError("quality criteria ids must be unique")

    return QualityCriteriaRegistry(
        schema_version=text_field("schema_version"),
        criteria_version=text_field("criteria_version"),
        status=text_field("status"),
        updated_at=text_field("updated_at"),
        criteria=parsed,
    )
def _criterion_from_payload(payload: dict[str, Any]) -> QualityCriterion:
    """Build one ``QualityCriterion`` from its decoded JSON object.

    Scalar fields are coerced with ``str``. ``agentic_guidance`` defaults to
    an empty string and ``examples`` to an empty list.

    Raises:
        ValueError: when a required field is missing, ``applies_to`` is not a
            non-empty list, or ``examples`` is present but not a list.
    """
    missing = sorted(REQUIRED_CRITERION_FIELDS - set(payload))
    if missing:
        raise ValueError(
            f"quality criterion {payload.get('id', '<unknown>')} missing fields: "
            f"{', '.join(missing)}"
        )
    applies_to = payload.get("applies_to")
    if not isinstance(applies_to, list) or not applies_to:
        raise ValueError(
            f"quality criterion {payload.get('id', '<unknown>')} must list applies_to"
        )
    # Falsy values (missing, null, empty) all mean "no examples".
    examples = payload.get("examples") or []
    if not isinstance(examples, list):
        # Iterating a string would split it into single characters, and a
        # mapping would yield only its keys; reject both instead.
        raise ValueError(
            f"quality criterion {payload.get('id', '<unknown>')} examples must be a list"
        )
    return QualityCriterion(
        id=str(payload["id"]),
        title=str(payload["title"]),
        category=str(payload["category"]),
        severity=str(payload["severity"]),
        applies_to=[str(item) for item in applies_to],
        description=str(payload["description"]),
        deterministic_action=str(payload["deterministic_action"]),
        deterministic_action_when=str(payload["deterministic_action_when"]),
        reviewer_guidance=str(payload["reviewer_guidance"]),
        agentic_guidance=str(payload.get("agentic_guidance", "")),
        examples=[str(item) for item in examples],
    )

View File

@@ -0,0 +1,215 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
from repo_registry.acceptance.criteria import (
QualityCriteriaRegistry,
QualityCriterion,
load_quality_criteria,
)
from repo_registry.core.models import (
CandidateCapability,
CandidateFeature,
CandidateGraph,
SourceReference,
)
# Exact capability name that marks a candidate as provider routing.
PROVIDER_ROUTING_CAPABILITY = "Route LLM Requests Across Providers"
# Outcome values that blocking_quality_gate_outcomes treats as blocking.
BLOCKING_OUTCOMES = {"downgraded", "rejected", "invalidated", "requires_review"}
@dataclass(frozen=True)
class QualityGateOutcome:
    """Immutable record of one criterion triggered for one graph element."""

    # Version of the registry the criterion came from.
    criteria_version: str
    criterion_id: str
    criterion_title: str
    severity: str
    # Taken from the criterion's deterministic_action; compared against
    # BLOCKING_OUTCOMES to decide whether the outcome blocks.
    outcome: str
    # "capability" or "feature" for the outcomes produced in this module.
    element_type: str
    element_id: int
    element_name: str
    # Human-readable explanation of why the criterion fired.
    reason: str
def evaluate_candidate_graph_quality(
    graph: CandidateGraph,
    registry: QualityCriteriaRegistry | None = None,
) -> list[QualityGateOutcome]:
    """Run the capability quality gates over every capability in *graph*.

    Args:
        graph: Candidate graph whose abilities and capabilities are walked.
        registry: Criteria registry to apply; loaded from the default
            location when omitted.

    Returns:
        All outcomes, in ability order and then capability order.
    """
    # Resolve the registry once so every capability sees the same criteria.
    shared_registry = registry or load_quality_criteria()
    return [
        outcome
        for ability in graph.abilities
        for capability in ability.capabilities
        for outcome in evaluate_candidate_capability_quality(
            capability, shared_registry
        )
    ]
def evaluate_candidate_capability_quality(
    capability: CandidateCapability,
    registry: QualityCriteriaRegistry | None = None,
) -> list[QualityGateOutcome]:
    """Evaluate the deterministic quality gates for one candidate capability.

    Evidence gates are mutually exclusive and checked in this order: no source
    refs (RREG-QC-004), only generated SCOPE.md refs (RREG-QC-005), only weak
    refs (RREG-QC-001 and RREG-QC-006). Independently of those, provider
    routing capabilities trigger RREG-QC-002, and each API/CLI feature nested
    under one triggers RREG-QC-003.

    Args:
        capability: Candidate capability to evaluate.
        registry: Criteria registry to apply; loaded from the default
            location when omitted.

    Returns:
        The triggered outcomes; empty when no gate fires.

    Raises:
        KeyError: when a gate fires but the registry does not define the
            criterion that gate refers to.
    """
    active_registry = registry or load_quality_criteria()
    criteria = {criterion.id: criterion for criterion in active_registry.criteria}
    outcomes: list[QualityGateOutcome] = []

    def flag(criterion_id, reason, *, element=capability, element_type="capability"):
        """Append the outcome of *criterion_id* for *element*."""
        try:
            criterion = criteria[criterion_id]
        except KeyError:
            # Still a KeyError, but one that says which registry is incomplete.
            raise KeyError(
                f"quality criteria registry {active_registry.criteria_version!r} "
                f"does not define criterion {criterion_id}"
            ) from None
        outcomes.append(
            _outcome(
                active_registry,
                criterion,
                element_type=element_type,
                element_id=element.id,
                element_name=element.name,
                reason=reason,
            )
        )

    refs = _capability_refs(capability)
    if not refs:
        flag(
            "RREG-QC-004",
            "Candidate capability has no source refs supporting the abstraction.",
        )
    elif _all_generated_scope_refs(refs):
        flag(
            "RREG-QC-005",
            "Candidate is supported only by generated SCOPE.md evidence.",
        )
    elif _all_weak_source_refs(refs):
        flag(
            "RREG-QC-001",
            "All supporting refs are weak source roles for capability truth.",
        )
        flag(
            "RREG-QC-006",
            "Candidate is primarily supported by tests, fixtures, schemas, or examples.",
        )

    if _looks_like_provider_routing(capability):
        flag(
            "RREG-QC-002",
            (
                "Provider-routing or LLM-integration vocabulary requires "
                "explicit product evidence before it can be native utility."
            ),
        )

    for feature in capability.features:
        if _feature_misplaced_under_provider_routing(capability, feature):
            flag(
                "RREG-QC-003",
                (
                    "API/CLI surface is nested below provider-routing or "
                    "LLM-integration capability."
                ),
                element=feature,
                element_type="feature",
            )
    return outcomes
def blocking_quality_gate_outcomes(
    outcomes: list[QualityGateOutcome],
) -> list[QualityGateOutcome]:
    """Return, in order, the outcomes whose ``outcome`` is in ``BLOCKING_OUTCOMES``."""
    blocking: list[QualityGateOutcome] = []
    for item in outcomes:
        if item.outcome in BLOCKING_OUTCOMES:
            blocking.append(item)
    return blocking
def quality_gate_outcome_dicts(
    outcomes: list[QualityGateOutcome],
) -> list[dict[str, object]]:
    """Convert each outcome dataclass into a plain dict, preserving order."""
    return list(map(asdict, outcomes))
def _outcome(
    registry: QualityCriteriaRegistry,
    criterion: QualityCriterion,
    *,
    element_type: str,
    element_id: int,
    element_name: str,
    reason: str,
) -> QualityGateOutcome:
    """Build the outcome record for *criterion* firing on one element.

    Version comes from the registry; id, title, severity and outcome come from
    the criterion. The outcome value is the criterion's ``deterministic_action``.
    """
    from_criterion = {
        "criterion_id": criterion.id,
        "criterion_title": criterion.title,
        "severity": criterion.severity,
        "outcome": criterion.deterministic_action,
    }
    about_element = {
        "element_type": element_type,
        "element_id": element_id,
        "element_name": element_name,
        "reason": reason,
    }
    return QualityGateOutcome(
        criteria_version=registry.criteria_version,
        **from_criterion,
        **about_element,
    )
def _capability_refs(capability: CandidateCapability) -> list[SourceReference]:
refs = list(capability.source_refs)
for feature in capability.features:
refs.extend(feature.source_refs)
for evidence in capability.evidence:
refs.extend(evidence.source_refs)
return refs
def _looks_like_provider_routing(capability: CandidateCapability) -> bool:
    """Tell whether *capability* is provider routing or LLM integration.

    It qualifies by its exact name or by its primary class.
    """
    if capability.name == PROVIDER_ROUTING_CAPABILITY:
        return True
    return capability.primary_class in {"llm-integration", "provider-routing"}
def _feature_misplaced_under_provider_routing(
    capability: CandidateCapability,
    feature: CandidateFeature,
) -> bool:
    """Tell whether *feature* is an API/CLI surface under a routing capability.

    Always false when *capability* is not provider routing. Otherwise true
    when the feature's type or primary class is ``API`` or ``CLI``, compared
    case-insensitively.
    """
    if not _looks_like_provider_routing(capability):
        return False
    surface_labels = {"API", "CLI"}
    # The generator stops at the first match, so the primary class is only
    # upper-cased when the type did not already match.
    return any(
        label.upper() in surface_labels
        for label in (feature.type, feature.primary_class)
    )
def _all_generated_scope_refs(refs: list[SourceReference]) -> bool:
return bool(refs) and all(ref.path.endswith("SCOPE.md") for ref in refs)
def _all_weak_source_refs(refs: list[SourceReference]) -> bool:
    """Tell whether *refs* is non-empty and consists only of weak source refs."""
    if not refs:
        return False
    for ref in refs:
        if not _is_weak_source_ref(ref):
            return False
    return True
def _is_weak_source_ref(ref: SourceReference) -> bool:
path = ref.path.lower()
kind = ref.kind.lower()
return (
path.startswith("tests/")
or "/tests/" in path
or "fixture" in path
or path.startswith("docs/schemas/")
or "schema" in kind
or "example" in kind
or kind in {"test", "fixture", "schema-example", "generated-scope"}
)

View File

@@ -0,0 +1 @@
"""Candidate ability graph generation."""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,273 @@
from __future__ import annotations
import re
from dataclasses import replace
from repo_registry.candidate_graph.generator import (
CandidateAbilityDraft,
CandidateCapabilityDraft,
CandidateEvidenceDraft,
CandidateFeatureDraft,
)
from repo_registry.core.models import SourceReference
# Words dropped by _tokens before names are compared; matching is against the
# lower-cased word before stemming, which is why both "model" and "models"
# are listed.
STOP_WORDS = {
    "a",
    "an",
    "and",
    "capability",
    "feature",
    "for",
    "models",
    "model",
    "of",
    "support",
    "supports",
    "the",
    "to",
    "use",
    "uses",
    "using",
}
# Tokens that let _names_overlap accept a high-containment match even when
# only a single token is shared.
DISTINCTIVE_TOKENS = {
    "anthropic",
    "claude",
    "gemini",
    "openai",
    "openrouter",
}
def normalize_candidate_drafts(
    abilities: list[CandidateAbilityDraft],
) -> list[CandidateAbilityDraft]:
    """Merge overlapping ability drafts, and their nested drafts, into one list."""
    normalized = _merge_abilities(abilities)
    return normalized
def _merge_abilities(
    abilities: list[CandidateAbilityDraft],
) -> list[CandidateAbilityDraft]:
    """Fold ability drafts with overlapping names together, keeping first-seen order.

    A draft with no earlier overlap is kept, with its own capabilities merged;
    otherwise it is combined into the first earlier draft whose name overlaps.
    """
    result: list[CandidateAbilityDraft] = []
    for draft in abilities:
        match = _find_overlap(result, draft.name)
        if match is not None:
            result[match] = _combine_abilities(result[match], draft)
        else:
            deduplicated = _merge_capabilities(draft.capabilities)
            result.append(replace(draft, capabilities=deduplicated))
    return result
def _combine_abilities(
    left: CandidateAbilityDraft,
    right: CandidateAbilityDraft,
) -> CandidateAbilityDraft:
    """Merge two overlapping ability drafts into a new draft.

    Text fields keep the preferred variant, confidence keeps the maximum,
    and list fields are merged with duplicates removed.
    """
    combined_capabilities = _merge_capabilities(
        left.capabilities + right.capabilities
    )
    combined_refs = _merge_source_refs(left.source_refs, right.source_refs)
    combined_attributes = _merge_strings(left.attributes, right.attributes)
    return CandidateAbilityDraft(
        name=_preferred_name(left.name, right.name),
        description=_preferred_description(left.description, right.description),
        confidence=max(left.confidence, right.confidence),
        source_refs=combined_refs,
        primary_class=_preferred_text(left.primary_class, right.primary_class),
        attributes=combined_attributes,
        capabilities=combined_capabilities,
    )
def _merge_capabilities(
    capabilities: list[CandidateCapabilityDraft],
) -> list[CandidateCapabilityDraft]:
    """Fold capability drafts with overlapping names together, keeping first-seen order.

    A draft with no earlier overlap is kept, with its own features and
    evidence merged; otherwise it is combined into the first earlier draft
    whose name overlaps.
    """
    result: list[CandidateCapabilityDraft] = []
    for draft in capabilities:
        match = _find_overlap(result, draft.name)
        if match is not None:
            result[match] = _combine_capabilities(result[match], draft)
        else:
            result.append(
                replace(
                    draft,
                    features=_merge_features(draft.features),
                    evidence=_merge_evidence(draft.evidence),
                )
            )
    return result
def _combine_capabilities(
    left: CandidateCapabilityDraft,
    right: CandidateCapabilityDraft,
) -> CandidateCapabilityDraft:
    """Merge two overlapping capability drafts into a new draft.

    Text fields keep the preferred variant, confidence keeps the maximum,
    and list fields are merged with duplicates removed.
    """
    combined_features = _merge_features(left.features + right.features)
    combined_evidence = _merge_evidence(left.evidence + right.evidence)
    combined_refs = _merge_source_refs(left.source_refs, right.source_refs)
    return CandidateCapabilityDraft(
        name=_preferred_name(left.name, right.name),
        description=_preferred_description(left.description, right.description),
        inputs=_merge_strings(left.inputs, right.inputs),
        outputs=_merge_strings(left.outputs, right.outputs),
        confidence=max(left.confidence, right.confidence),
        source_refs=combined_refs,
        primary_class=_preferred_text(left.primary_class, right.primary_class),
        attributes=_merge_strings(left.attributes, right.attributes),
        features=combined_features,
        evidence=combined_evidence,
    )
def _merge_features(
    features: list[CandidateFeatureDraft],
) -> list[CandidateFeatureDraft]:
    """Fold feature drafts with overlapping names together, keeping first-seen order."""
    result: list[CandidateFeatureDraft] = []
    for incoming in features:
        match = _find_overlap(result, incoming.name)
        if match is None:
            result.append(incoming)
            continue
        kept = result[match]
        result[match] = CandidateFeatureDraft(
            name=_preferred_name(kept.name, incoming.name),
            type=_preferred_text(kept.type, incoming.type),
            location=_preferred_text(kept.location, incoming.location),
            confidence=max(kept.confidence, incoming.confidence),
            source_refs=_merge_source_refs(kept.source_refs, incoming.source_refs),
            primary_class=_preferred_text(kept.primary_class, incoming.primary_class),
            attributes=_merge_strings(kept.attributes, incoming.attributes),
        )
    return result
def _merge_evidence(
    evidence_items: list[CandidateEvidenceDraft],
) -> list[CandidateEvidenceDraft]:
    """Merge evidence drafts that share a normalized type and reference.

    The first item of each group fixes its position in the result. Later
    items are folded in: preferred text for type and reference, the stronger
    strength, and the de-duplicated union of source refs.
    """
    merged: list[CandidateEvidenceDraft] = []
    # Maps each key to its slot in ``merged`` so a duplicate is found without
    # rescanning the list. A merged item keeps the key of its group because
    # its type and reference are always taken from one of the two inputs.
    slot_by_key: dict[tuple[str, str], int] = {}
    for evidence in evidence_items:
        key = (_normalize_text(evidence.type), _normalize_path(evidence.reference))
        slot = slot_by_key.get(key)
        if slot is None:
            slot_by_key[key] = len(merged)
            merged.append(evidence)
            continue
        existing = merged[slot]
        merged[slot] = CandidateEvidenceDraft(
            type=_preferred_text(existing.type, evidence.type),
            reference=_preferred_text(existing.reference, evidence.reference),
            strength=_stronger_evidence(existing.strength, evidence.strength),
            source_refs=_merge_source_refs(existing.source_refs, evidence.source_refs),
        )
    return merged
def _find_overlap(items: list, name: str) -> int | None:
    """Return the index of the first item whose ``name`` overlaps *name*, else ``None``."""
    return next(
        (
            position
            for position, candidate in enumerate(items)
            if _names_overlap(candidate.name, name)
        ),
        None,
    )
def _unfiltered_tokens(value: str) -> set[str]:
    """Return the stemmed lower-case words of *value* without stop-word filtering."""
    return {_stem(word) for word in re.findall(r"[a-z0-9]+", value.lower())}


def _names_overlap(left: str, right: str) -> bool:
    """Decide whether two draft names describe the same thing.

    Names overlap when their token sets are equal, when their Jaccard overlap
    is at least 0.6, or when at least 80% of the smaller set is shared and
    either two or more tokens are shared or a distinctive token is shared.

    Names consisting only of stop words have no tokens. Those are compared on
    their unfiltered stemmed words instead, so that unrelated names such as
    "Support" and "Feature" are not merged just because both reduce to nothing.
    """
    left_tokens = _tokens(left)
    right_tokens = _tokens(right)
    if not left_tokens and not right_tokens:
        return _unfiltered_tokens(left) == _unfiltered_tokens(right)
    if not left_tokens or not right_tokens:
        return False
    if left_tokens == right_tokens:
        return True
    intersection = left_tokens & right_tokens
    union = left_tokens | right_tokens
    overlap = len(intersection) / len(union)
    containment = len(intersection) / min(len(left_tokens), len(right_tokens))
    if intersection & DISTINCTIVE_TOKENS and containment >= 0.8:
        return True
    return overlap >= 0.6 or (containment >= 0.8 and len(intersection) >= 2)
def _tokens(value: str) -> set[str]:
    """Return the stemmed, lower-cased alphanumeric words of *value*.

    Stop words are dropped before stemming.
    """
    stems: set[str] = set()
    for word in re.findall(r"[a-z0-9]+", value.lower()):
        if word in STOP_WORDS:
            continue
        stems.add(_stem(word))
    return stems
def _stem(token: str) -> str:
if token.endswith("ies") and len(token) > 4:
return f"{token[:-3]}y"
if token.endswith("s") and len(token) > 3:
return token[:-1]
return token
def _normalize_text(value: str) -> str:
    """Return the tokens of *value*, sorted and joined by single spaces."""
    ordered_tokens = sorted(_tokens(value))
    return " ".join(ordered_tokens)
def _normalize_path(value: str) -> str:
return value.strip().lower()
def _preferred_name(left: str, right: str) -> str:
    """Pick the name to keep; names use the same rule as any other text."""
    chosen = _preferred_text(left, right)
    return chosen
def _preferred_description(left: str, right: str) -> str:
if not left.strip():
return right.strip()
if not right.strip():
return left.strip()
if _normalize_sentence(left) == _normalize_sentence(right):
return max((left.strip(), right.strip()), key=len)
return max((left.strip(), right.strip()), key=len)
def _normalize_sentence(value: str) -> str:
return re.sub(r"\s+", " ", value.strip().lower())
def _preferred_text(left: str, right: str) -> str:
    """Pick the more informative of two strings.

    An empty side loses outright. Otherwise the string with more tokens wins,
    then the longer string; *left* wins exact ties.
    """
    if not left:
        return right
    if not right:
        return left

    def weight(text: str) -> tuple[int, int]:
        return (len(_tokens(text)), len(text))

    # Only a strictly heavier right-hand side displaces the left one.
    return right if weight(right) > weight(left) else left
def _merge_strings(left: list[str], right: list[str]) -> list[str]:
    """Concatenate two string lists, dropping values that normalize identically.

    The first spelling of each value is kept, in first-seen order.
    """
    first_by_key: dict[str, str] = {}
    for text in left + right:
        # setdefault leaves the earlier spelling in place for a repeated key.
        first_by_key.setdefault(_normalize_value(text), text)
    return list(first_by_key.values())
def _normalize_value(value: str) -> str:
return " ".join(re.findall(r"[a-z0-9]+", value.lower()))
def _merge_source_refs(
left: list[SourceReference],
right: list[SourceReference],
) -> list[SourceReference]:
merged: list[SourceReference] = []
seen: set[tuple[int | None, str, str, str, int | None]] = set()
for ref in left + right:
key = (ref.fact_id, ref.path, ref.kind, ref.name, ref.line)
if key in seen:
continue
seen.add(key)
merged.append(ref)
return merged
def _stronger_evidence(left: str, right: str) -> str:
ranking = {"weak": 0, "medium": 1, "strong": 2}
return left if ranking.get(left, 1) >= ranking.get(right, 1) else right

473
src/repo_scoping/cli.py Normal file
View File

@@ -0,0 +1,473 @@
from __future__ import annotations
import argparse
import json
from dataclasses import asdict
from pathlib import Path
from typing import Sequence
from repo_registry.acceptance import (
criteria_registry_json,
criteria_registry_markdown,
load_quality_criteria,
)
from repo_registry.core.models import CharacteristicRebuildResult, Repository
from repo_registry.core.service import RegistryService
from repo_registry.llm_extraction import LLMCandidateExtractor, create_llm_connect_adapter
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.self_scoping.assessment import artifact_json, export_assessment_artifact
from repo_registry.self_scoping.comparison import (
compare_assessment_to_golden,
comparison_json,
comparison_markdown,
load_json,
)
from repo_registry.storage.sqlite import NotFoundError, RegistryStore
from repo_registry.web_api.app import Settings
def build_parser() -> argparse.ArgumentParser:
    """Build the ``repo-scoping`` argument parser with all its subcommands.

    A subcommand is required and is stored in ``args.command``. The
    subcommands are ``rebuild-characteristics``, ``export-assessment``,
    ``compare-assessment``, ``self-assess``, ``list-quality-criteria`` and
    ``list-legacy-auto-approvals``.
    """
    parser = argparse.ArgumentParser(
        prog="repo-scoping",
        description="Repository Scoping maintenance commands.",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # rebuild-characteristics: exactly one of --repo / --all must be given.
    rebuild = subparsers.add_parser(
        "rebuild-characteristics",
        help="Rebuild candidate characteristics for one or more repositories.",
    )
    target = rebuild.add_mutually_exclusive_group(required=True)
    target.add_argument("--repo", help="Repository id or exact repository name.")
    target.add_argument("--all", action="store_true", help="Rebuild every repository.")
    rebuild.add_argument("--dry-run", action="store_true", help="Preview without clearing approved characteristics.")
    rebuild.add_argument("--no-llm", action="store_true", help="Disable configured LLM assistance.")
    rebuild.add_argument(
        "--agentic-review",
        action="store_true",
        help="Request configured agentic review after a confirmed rebuild.",
    )
    rebuild.add_argument(
        "--confirm",
        action="store_true",
        help="Confirm a destructive rebuild for selected repositories.",
    )
    rebuild.add_argument(
        "--confirm-all",
        action="store_true",
        help="Confirm a destructive all-repository rebuild.",
    )
    rebuild.add_argument("--database-path", help="Override REPO_REGISTRY_DATABASE_PATH.")
    rebuild.add_argument("--checkout-root", help="Override REPO_REGISTRY_CHECKOUT_ROOT.")

    # export-assessment: export one analysis run as an assessment artifact.
    export = subparsers.add_parser(
        "export-assessment",
        help="Export a completed analysis run as a self-scoping assessment artifact.",
    )
    export.add_argument("--repo", required=True, help="Repository id or exact repository name.")
    export.add_argument("--analysis-run", type=int, required=True, help="Completed analysis run id.")
    export.add_argument("--output", help="Write artifact JSON to this path instead of stdout.")
    export.add_argument(
        "--role",
        choices=["baseline", "challenger", "negative_regression_seed"],
        default="challenger",
        help="Assessment artifact role.",
    )
    export.add_argument(
        "--outcome",
        choices=[
            "baseline",
            "challenger",
            "preferred",
            "tied",
            "rejected",
            "superseded",
            "needs-human",
        ],
        default="challenger",
        help="Initial assessment outcome.",
    )
    export.add_argument("--reviewer", default="codex", help="Reviewer name recorded in the artifact.")
    export.add_argument("--summary", help="Assessment summary override.")
    export.add_argument("--database-path", help="Override REPO_REGISTRY_DATABASE_PATH.")
    export.add_argument("--checkout-root", help="Override REPO_REGISTRY_CHECKOUT_ROOT.")

    # compare-assessment: works on two JSON files and takes no database options.
    compare = subparsers.add_parser(
        "compare-assessment",
        help="Compare a self-scoping assessment artifact against a golden profile.",
    )
    compare.add_argument("--golden", required=True, help="Golden profile JSON path.")
    compare.add_argument(
        "--assessment",
        required=True,
        help="Assessment artifact JSON path.",
    )
    compare.add_argument("--output", help="Write comparison report to this path instead of stdout.")
    compare.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Comparison report format.",
    )

    # self-assess: analyze a source tree and compare it against a golden profile.
    self_assess = subparsers.add_parser(
        "self-assess",
        help="Run repo-scoping against a source tree and compare the result to a golden profile.",
    )
    self_assess.add_argument(
        "--repo",
        default="repo-scoping",
        help="Repository id or exact repository name to reuse; created by name when absent.",
    )
    self_assess.add_argument(
        "--source-path",
        default=".",
        help="Source tree to analyze; defaults to the current working directory.",
    )
    self_assess.add_argument(
        "--golden",
        default="docs/self-scoping/golden/repo-scoping-golden-profile.v1.json",
        help="Golden profile JSON path.",
    )
    self_assess.add_argument(
        "--assessment-output",
        help="Write challenger assessment artifact JSON to this path.",
    )
    self_assess.add_argument(
        "--comparison-output",
        help="Write comparison report to this path instead of stdout.",
    )
    self_assess.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Comparison report format.",
    )
    # --with-llm stores False into no_llm; set_defaults below makes no_llm
    # True otherwise, so LLM assistance is opt-in for this subcommand.
    self_assess.add_argument(
        "--with-llm",
        action="store_false",
        dest="no_llm",
        help="Use configured LLM assistance during the self-assessment run.",
    )
    self_assess.add_argument(
        "--agentic-review",
        action="store_true",
        help="Request configured agentic review; leaves candidates pending when none is configured.",
    )
    self_assess.add_argument(
        "--fail-on-regression",
        action="store_true",
        help="Return exit code 1 only when comparison status is regression.",
    )
    self_assess.add_argument("--database-path", help="Override REPO_REGISTRY_DATABASE_PATH.")
    self_assess.add_argument("--checkout-root", help="Override REPO_REGISTRY_CHECKOUT_ROOT.")
    self_assess.set_defaults(no_llm=True)

    # list-quality-criteria: print the criteria registry.
    criteria = subparsers.add_parser(
        "list-quality-criteria",
        help="List the active characteristic quality criteria registry.",
    )
    criteria.add_argument(
        "--criteria-path",
        help="Override the default quality criteria registry JSON path.",
    )
    criteria.add_argument("--output", help="Write criteria output to this path instead of stdout.")
    criteria.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Criteria output format.",
    )

    # list-legacy-auto-approvals: inventory of historical auto-approval records.
    legacy = subparsers.add_parser(
        "list-legacy-auto-approvals",
        help="List historical trusted deterministic auto-approval records.",
    )
    legacy.add_argument("--database-path", help="Override REPO_REGISTRY_DATABASE_PATH.")
    legacy.add_argument("--checkout-root", help="Override REPO_REGISTRY_CHECKOUT_ROOT.")
    legacy.add_argument("--output", help="Write inventory output to this path instead of stdout.")
    legacy.add_argument(
        "--format",
        choices=["json", "markdown"],
        default="markdown",
        help="Inventory output format.",
    )
    return parser
def main(argv: Sequence[str] | None = None) -> int:
    """Parse *argv* and run the selected subcommand.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The exit code of the executed subcommand.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    # Lambdas defer the call until dispatch and hide the fact that only some
    # handlers take the parser.
    handlers = {
        "rebuild-characteristics": lambda: rebuild_characteristics_command(args, parser),
        "export-assessment": lambda: export_assessment_command(args, parser),
        "compare-assessment": lambda: compare_assessment_command(args),
        "self-assess": lambda: self_assess_command(args, parser),
        "list-quality-criteria": lambda: list_quality_criteria_command(args),
        "list-legacy-auto-approvals": lambda: list_legacy_auto_approvals_command(args),
    }
    handler = handlers.get(args.command)
    if handler is None:
        # parser.error() exits; the return only matters if it is stubbed out.
        parser.error(f"unknown command: {args.command}")
        return 2
    return handler()
def rebuild_characteristics_command(
    args: argparse.Namespace,
    parser: argparse.ArgumentParser,
) -> int:
    """Rebuild candidate characteristics for the selected repositories.

    Unless ``--dry-run`` is given, the run is destructive and needs
    ``--confirm`` or ``--confirm-all``; with ``--all`` only ``--confirm-all``
    is accepted. One summary line is printed per repository.

    Returns:
        ``0`` once every selected repository has been processed.
    """
    destructive = not bool(args.dry_run)
    if destructive:
        if args.all and not args.confirm_all:
            parser.error("--all destructive rebuilds require --confirm-all")
        if not (args.confirm or args.confirm_all):
            parser.error("destructive rebuilds require --confirm or --confirm-all")

    service = service_from_args(args)
    repositories = selected_repositories(service, args)
    if not repositories:
        parser.error("no repositories matched the requested target")

    for repository in repositories:
        result = service.rebuild_characteristics_from_scratch(
            repository.id,
            dry_run=not destructive,
            confirm=destructive,
            use_llm_assistance=not args.no_llm,
        )
        # Agentic review is only requested for real, completed rebuilds.
        review_wanted = (
            args.agentic_review
            and destructive
            and result.analysis_run.status == "completed"
        )
        if review_wanted:
            service.request_agentic_review(
                repository.id,
                result.analysis_run.id,
                notes="CLI agentic review request after rebuild.",
            )
        print(rebuild_summary_line(service, result, args))
    return 0
def compare_assessment_command(args: argparse.Namespace) -> int:
    """Compare an assessment artifact with a golden profile and emit the report.

    The report is JSON or Markdown according to ``--format``. It goes to
    ``--output`` when given, otherwise to stdout with a newline appended if
    the report lacks one.
    """
    golden_profile = load_json(args.golden)
    assessment = load_json(args.assessment)
    comparison = compare_assessment_to_golden(golden_profile, assessment)

    if args.format == "json":
        content = comparison_json(comparison)
    else:
        content = comparison_markdown(comparison)

    if args.output:
        write_text(args.output, content)
        return 0
    terminator = "" if content.endswith("\n") else "\n"
    print(content, end=terminator)
    return 0
def list_quality_criteria_command(args: argparse.Namespace) -> int:
    """Emit the quality criteria registry as JSON or Markdown.

    Output goes to ``--output`` when given, otherwise to stdout with a
    newline appended if the text lacks one.
    """
    registry = load_quality_criteria(args.criteria_path)
    if args.format == "json":
        content = criteria_registry_json(registry)
    else:
        content = criteria_registry_markdown(registry)

    if args.output:
        write_text(args.output, content)
        return 0
    terminator = "" if content.endswith("\n") else "\n"
    print(content, end=terminator)
    return 0
def list_legacy_auto_approvals_command(args: argparse.Namespace) -> int:
    """Emit the legacy trusted auto-approval records as JSON or Markdown.

    Output goes to ``--output`` when given, otherwise to stdout with a
    newline appended if the text lacks one.
    """
    service = service_from_args(args)
    records = service.list_trusted_auto_approval_migration_records()

    if args.format == "json":
        as_dicts = [asdict(record) for record in records]
        content = json.dumps(as_dicts, indent=2) + "\n"
    else:
        content = legacy_auto_approval_records_markdown(records)

    if args.output:
        write_text(args.output, content)
        return 0
    terminator = "" if content.endswith("\n") else "\n"
    print(content, end=terminator)
    return 0
def legacy_auto_approval_records_markdown(records) -> str:
    """Render legacy auto-approval records as a Markdown bullet list.

    Each record becomes one bullet followed by three continuation lines. A
    missing or empty scanner version is shown as ``unknown``. With no records
    a single explanatory sentence is returned. The result always ends with a
    newline.
    """
    if not records:
        return "No legacy trusted auto-approval records found.\n"

    rendered = ["# Legacy Trusted Auto-Approval Records", ""]
    for entry in records:
        scanner = entry.scanner_version or "unknown"
        bullet = (
            f"- repo={entry.repository_id}:{entry.repository_name} "
            f"run={entry.analysis_run_id} decision={entry.review_decision_id}"
        )
        rendered.append(bullet)
        rendered.append(f" status={entry.analysis_run_status} scanner={scanner}")
        rendered.append(f" approved_abilities={entry.current_approved_ability_count}")
        rendered.append(f" next={entry.recommended_next_step}")
    return "\n".join(rendered) + "\n"
def self_assess_command(
    args: argparse.Namespace,
    parser: argparse.ArgumentParser,
) -> int:
    """Analyze a source tree and compare the result with a golden profile.

    The analysis run is exported as a ``challenger`` artifact, which is
    optionally written to ``--assessment-output``. The comparison report is
    written to ``--comparison-output`` or printed to stdout.

    Returns:
        ``1`` when ``--fail-on-regression`` is set and the comparison status
        is ``regression``; otherwise ``0``.
    """
    service = service_from_args(args)

    source_path = Path(args.source_path).expanduser().resolve()
    if not source_path.is_dir():
        parser.error(f"source path does not exist or is not a directory: {source_path}")

    repository = self_assessment_repository(service, args.repo, source_path)
    summary = service.analyze_repository(
        repository.id,
        source_path=str(source_path),
        use_llm_assistance=not args.no_llm,
        agentic_review=args.agentic_review,
        trusted_auto_approve=False,
    )
    run = summary.analysis_run
    if run.status != "completed":
        parser.error(run.error_message or "analysis failed")

    artifact = export_assessment_artifact(
        service,
        repository.id,
        run.id,
        role="challenger",
        outcome="challenger",
        reviewer="self-assess",
    )
    comparison = compare_assessment_to_golden(load_json(args.golden), artifact)

    if args.assessment_output:
        write_text(args.assessment_output, artifact_json(artifact))

    if args.format == "json":
        report = comparison_json(comparison)
    else:
        report = comparison_markdown(comparison)

    if args.comparison_output:
        write_text(args.comparison_output, report)
    else:
        terminator = "" if report.endswith("\n") else "\n"
        print(report, end=terminator)

    regressed = args.fail_on_regression and comparison["status"] == "regression"
    return 1 if regressed else 0
def export_assessment_command(
    args: argparse.Namespace,
    parser: argparse.ArgumentParser,
) -> int:
    """Export the assessment artifact of one repository as JSON.

    Exactly one repository must match the requested target; zero or several
    matches are reported through ``parser.error`` (which exits).

    Args:
        args: Parsed CLI arguments (target selection plus ``analysis_run``,
            ``role``, ``outcome``, ``reviewer``, ``summary``, ``output``).
        parser: Parser used to report usage errors.

    Returns:
        ``0`` after the artifact was written to ``args.output`` or printed.
    """
    service = service_from_args(args)
    repositories = selected_repositories(service, args)
    if not repositories:
        parser.error("no repositories matched the requested target")
    if len(repositories) > 1:
        parser.error("assessment export requires exactly one repository")
    repository = repositories[0]
    try:
        artifact = export_assessment_artifact(
            service,
            repository.id,
            args.analysis_run,
            role=args.role,
            outcome=args.outcome,
            reviewer=args.reviewer,
            summary=args.summary,
        )
    except (NotFoundError, ValueError) as exc:
        parser.error(str(exc))
    content = artifact_json(artifact)
    if args.output:
        write_text(args.output, content)
    else:
        # Same convention as the other commands in this file: add a newline
        # only when the rendered content does not already end with one.
        print(content, end="" if content.endswith("\n") else "\n")
    return 0
def service_from_args(args: argparse.Namespace) -> RegistryService:
    """Build a ``RegistryService`` from CLI arguments and ``Settings``.

    CLI values for the database path and checkout root win over the settings
    defaults. The database's parent directory is created if missing and the
    store is initialized. An LLM extractor is attached only when the command
    did not request ``no_llm`` (a missing attribute counts as ``True``) and the
    settings both enable LLM use and name a provider.
    """
    settings = Settings()
    db_file = Path(args.database_path or settings.database_path)
    checkouts = args.checkout_root or settings.checkout_root
    db_file.parent.mkdir(parents=True, exist_ok=True)

    registry_store = RegistryStore(db_file)
    registry_store.initialize()

    llm_requested = not getattr(args, "no_llm", True)
    extractor = None
    if llm_requested and settings.llm_enabled and settings.llm_provider:
        extractor = LLMCandidateExtractor(
            create_llm_connect_adapter(settings.llm_provider, model=settings.llm_model)
        )

    return RegistryService(
        registry_store,
        ingestion=GitIngestionService(checkouts),
        llm_extractor=extractor,
    )
def selected_repositories(
    service: RegistryService,
    args: argparse.Namespace,
) -> list[Repository]:
    """Resolve the repositories targeted by the CLI arguments.

    Returns every repository when ``args.all`` is truthy. Otherwise
    ``args.repo`` is interpreted as a numeric id when it consists only of
    digits (an unknown id yields an empty list), or else as an exact
    repository name.
    """
    everything = service.list_repositories()
    if getattr(args, "all", False):
        return everything

    wanted = str(args.repo)
    if not wanted.isdigit():
        return [candidate for candidate in everything if candidate.name == wanted]

    try:
        found = service.get_repository(int(wanted))
    except NotFoundError:
        return []
    return [found]
def self_assessment_repository(
    service: RegistryService,
    repo: str,
    source_path: Path,
) -> Repository:
    """Return the repository to self-assess, registering it when unknown.

    An existing match (by id or name) is returned as-is. An unmatched numeric
    ``repo`` is treated as an unknown id and raises ``NotFoundError``; an
    unmatched name is registered with ``source_path`` as its URL.
    """
    matches = selected_repositories(service, argparse.Namespace(repo=repo, all=False))
    if not matches:
        if repo.isdigit():
            raise NotFoundError(f"repository {repo} was not found")
        return service.register_repository(
            name=repo,
            url=str(source_path),
            description="Self-scoping assessment target.",
        )
    return matches[0]
def write_text(path: str | Path, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8, creating parent directories."""
    destination = Path(path)
    folder = destination.parent
    folder.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
def rebuild_summary_line(
    service: RegistryService,
    result: CharacteristicRebuildResult,
    args: argparse.Namespace,
) -> str:
    """Format a one-line ``key=value`` summary of a characteristic rebuild.

    The review-queue figure counts capabilities whose status is
    ``"candidate"`` in the run's candidate graph; the graph is only fetched
    when the analysis run completed, otherwise the count stays ``0``.
    """
    pending_review = 0
    if result.analysis_run.status == "completed":
        graph = service.candidate_graph(result.repository.id, result.analysis_run.id)
        if graph is not None:
            for ability in graph.abilities:
                for capability in ability.capabilities:
                    if capability.status == "candidate":
                        pending_review += 1

    if args.no_llm:
        candidate_source = "deterministic"
    else:
        candidate_source = "configured"

    segments = [
        f"repo={result.repository.id}:{result.repository.name}",
        f"latest_analysis_run={result.analysis_run.id}",
        f"candidate_source={candidate_source}",
        f"dry_run={result.dry_run}",
        f"cleared_approved={result.cleared_approved}",
        f"approved_superseded={result.previous_counts}",
        f"candidates={result.candidate_counts}",
        f"remaining_review_queue={pending_review}",
    ]
    return " ".join(segments)
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,3 @@
from repo_registry.content_indexing.extractor import ContentChunkCandidate, ContentExtractor
__all__ = ["ContentChunkCandidate", "ContentExtractor"]

View File

@@ -0,0 +1,134 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from repo_registry.core.models import ObservedFact
# Fact kinds that ContentExtractor.extract turns into chunks; facts of any
# other kind are skipped.
INDEXED_FACT_KINDS = {
    "intent",
    "scope",
    "documentation",
    "example",
    "test",
    "manifest",
    "interface",
    "config",
    "llm_provider",
    "credential_config",
    "provider_registry",
    "fallback_policy",
}
# Number of lines per chunk when a fact carries no integer "line" anchor.
MAX_CHUNK_LINES = 40
# Files larger than this many bytes produce no chunks.
MAX_FILE_BYTES = 200_000
@dataclass(frozen=True)
class ContentChunkCandidate:
    """Immutable text chunk cut from a file, identified by path, kind and line range."""

    path: str  # root-relative POSIX path (as built by ContentExtractor._chunk)
    kind: str
    start_line: int  # 1-based, inclusive
    end_line: int  # 1-based, inclusive
    text: str
    metadata: dict[str, object] = field(default_factory=dict)
class ContentExtractor:
    """Extract deterministic text chunks from source-linked observed facts.

    Only facts whose ``kind`` is in ``INDEXED_FACT_KINDS`` and that carry a
    non-empty ``path`` resolving to a regular file inside the source root are
    considered. A fact with an integer ``line`` in its metadata yields one
    context window around that line; any other fact yields the whole file
    split into ``MAX_CHUNK_LINES``-sized chunks.
    """

    # Lines of context kept before / after a fact's anchor line.
    CONTEXT_LINES_BEFORE = 5
    CONTEXT_LINES_AFTER = 10

    def extract(
        self,
        source_path: str | Path,
        facts: list[ObservedFact],
    ) -> list[ContentChunkCandidate]:
        """Return de-duplicated chunks for ``facts``, sorted by path, start line and kind.

        Args:
            source_path: Root directory the facts' relative paths refer to.
            facts: Observed facts to turn into chunks.

        Returns:
            Chunks unique per ``(path, kind, start_line, end_line)``.
        """
        root = Path(source_path).expanduser().resolve()
        chunks: list[ContentChunkCandidate] = []
        seen: set[tuple[str, str, int, int]] = set()
        for fact in facts:
            if fact.kind not in INDEXED_FACT_KINDS or not fact.path:
                continue
            path = (root / fact.path).resolve()
            # Resolving first means a fact path that escapes the root
            # (for example through "..") is rejected here.
            if not self._is_within(root, path) or not path.is_file():
                continue
            for chunk in self._chunks_for_fact(path, root, fact):
                key = (chunk.path, chunk.kind, chunk.start_line, chunk.end_line)
                if key in seen:
                    continue
                seen.add(key)
                chunks.append(chunk)
        return sorted(chunks, key=lambda chunk: (chunk.path, chunk.start_line, chunk.kind))

    def _chunks_for_fact(
        self,
        path: Path,
        root: Path,
        fact: ObservedFact,
    ) -> list[ContentChunkCandidate]:
        """Build the chunk(s) for one fact; unreadable, oversized or empty files give ``[]``."""
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                return []
            lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
        except OSError:
            return []
        if not lines:
            return []
        line = fact.metadata.get("line")
        if isinstance(line, int):
            start_line = max(1, line - self.CONTEXT_LINES_BEFORE)
            end_line = min(len(lines), line + self.CONTEXT_LINES_AFTER)
            if start_line > end_line:
                # The anchor lies far outside the file (stale or negative line
                # number): there is no valid window, so emit nothing rather
                # than an inverted range with empty text.
                return []
            return [
                self._chunk(
                    path,
                    root,
                    fact.kind,
                    fact.metadata,
                    lines,
                    start_line,
                    end_line,
                )
            ]
        # No line anchor: cover the whole file in fixed-size chunks.
        chunks: list[ContentChunkCandidate] = []
        for start_index in range(0, len(lines), MAX_CHUNK_LINES):
            start_line = start_index + 1
            end_line = min(len(lines), start_index + MAX_CHUNK_LINES)
            chunks.append(
                self._chunk(
                    path,
                    root,
                    fact.kind,
                    fact.metadata,
                    lines,
                    start_line,
                    end_line,
                )
            )
        return chunks

    def _chunk(
        self,
        path: Path,
        root: Path,
        kind: str,
        fact_metadata: dict[str, object],
        lines: list[str],
        start_line: int,
        end_line: int,
    ) -> ContentChunkCandidate:
        """Create a candidate for the 1-based inclusive range ``start_line``..``end_line``.

        Only ``source_role`` is carried over from the fact metadata (empty
        string when absent).
        """
        return ContentChunkCandidate(
            path=path.relative_to(root).as_posix(),
            kind=kind,
            start_line=start_line,
            end_line=end_line,
            text="\n".join(lines[start_line - 1 : end_line]).strip(),
            metadata={"source_role": fact_metadata.get("source_role", "")},
        )

    def _is_within(self, root: Path, path: Path) -> bool:
        """Return whether ``path`` is ``root`` itself or located below it."""
        try:
            path.relative_to(root)
        except ValueError:
            return False
        return True

View File

@@ -0,0 +1 @@
"""Core registry domain objects and services."""

View File

@@ -0,0 +1,15 @@
from __future__ import annotations
import json
import logging
from typing import Any
# Name of the logger that log_operation writes its JSON records to.
LOGGER_NAME = "repo_registry.operations"
def log_operation(event: str, **fields: Any) -> None:
    """Log ``event`` and its fields as one JSON object at INFO level.

    Keys are sorted; values that JSON cannot encode natively are rendered
    with ``str``.
    """
    operations_logger = logging.getLogger(LOGGER_NAME)
    record = {"event": event, **fields}
    message = json.dumps(record, sort_keys=True, default=str)
    operations_logger.info(message)

View File

@@ -0,0 +1,516 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
def confidence_label(confidence: float) -> str:
    """Map a numeric confidence to ``"high"`` (>= 0.8), ``"medium"`` (>= 0.5) or ``"low"``."""
    for floor, label in ((0.8, "high"), (0.5, "medium")):
        if confidence >= floor:
            return label
    return "low"
@dataclass(frozen=True)
class Repository:
    """Immutable repository record: id, name, URL, optional description, branch and status."""

    id: int
    name: str
    url: str
    description: str | None
    branch: str
    status: str
@dataclass(frozen=True)
class RepositorySnapshot:
    """Immutable snapshot record linking a repository to a commit hash, branch, source path and file count."""

    id: int
    repository_id: int
    commit_hash: str
    branch: str
    source_path: str
    file_count: int
@dataclass(frozen=True)
class AnalysisRun:
    """Immutable analysis-run record: status, timestamps, error message and scanner version.

    ``snapshot_id``, ``completed_at`` and ``error_message`` may be ``None``.
    """

    id: int
    repository_id: int
    snapshot_id: int | None
    status: str
    started_at: str
    completed_at: str | None
    error_message: str | None
    scanner_version: str
@dataclass(frozen=True)
class ReviewDecision:
    """Immutable review-decision record.

    The defaulted fields from ``reviewer_type`` through ``decision_kind`` are
    the audit fields that ``review_decision_audit_fields`` derives from
    ``action`` and ``notes``.
    """

    id: int
    repository_id: int
    analysis_run_id: int | None
    action: str
    notes: str
    created_at: str
    reviewer_type: str = "unknown"
    reviewer_id: str = ""
    policy_version: str = ""
    criteria_version: str = ""
    criterion_ids: list[str] = field(default_factory=list)
    evidence_refs: list[str] = field(default_factory=list)
    rationale: str = ""
    accepted_after_edits: bool = False
    decision_kind: str = "other"
@dataclass(frozen=True)
class TrustedAutoApprovalMigrationRecord:
    """Immutable record pairing a review decision with its repository and analysis-run details.

    Also carries the current approved-ability count and a recommended next step.
    """

    repository_id: int
    repository_name: str
    repository_url: str
    repository_status: str
    analysis_run_id: int | None
    analysis_run_status: str
    scanner_version: str
    review_decision_id: int
    decision_created_at: str
    notes: str
    current_approved_ability_count: int
    recommended_next_step: str
def enrich_review_decision(decision: ReviewDecision) -> ReviewDecision:
    """Return a copy of ``decision`` with audit fields derived from its action and notes."""
    return replace_review_decision(
        decision,
        **review_decision_audit_fields(decision.action, decision.notes),
    )
def replace_review_decision(
    decision: ReviewDecision,
    **fields: object,
) -> ReviewDecision:
    """Return a copy of ``decision`` with the given fields overridden.

    Delegates to ``dataclasses.replace`` so every field of the dataclass is
    carried over automatically; a hand-maintained field list would silently
    reset any field added later to its default.

    Args:
        decision: The decision to copy.
        **fields: Field names and their replacement values.

    Returns:
        A new instance; ``decision`` itself is left unchanged.

    Raises:
        TypeError: If a name in ``fields`` is not a field of the dataclass.
    """
    # Function-scope import: the module only imports ``dataclass`` and ``field``.
    from dataclasses import replace

    return replace(decision, **fields)
def review_decision_audit_fields(action: str, notes: str) -> dict[str, object]:
    """Derive the audit fields of a review decision from its action and notes.

    ``notes`` is parsed as ``key=value`` pairs; ``reviewer``,
    ``policy_version``, ``criteria_version``, ``criteria``, ``evidence``,
    ``rationale`` and ``proposed_changes`` are the keys consulted.

    Returns:
        A mapping keyed by the audit field names of ``ReviewDecision``.
    """
    parsed = _parse_review_decision_notes(notes)
    # "agentic_approve_with_edits" also ends with "_with_edits", so the suffix
    # test alone covers it.
    edited = action.endswith("_with_edits") or bool(parsed.get("proposed_changes"))
    audit: dict[str, object] = {}
    audit["reviewer_type"] = _reviewer_type(action)
    audit["reviewer_id"] = parsed.get("reviewer", "")
    audit["policy_version"] = parsed.get("policy_version", "")
    audit["criteria_version"] = parsed.get("criteria_version", "")
    audit["criterion_ids"] = _split_audit_list(parsed.get("criteria", ""))
    audit["evidence_refs"] = _split_audit_list(parsed.get("evidence", ""))
    audit["rationale"] = parsed.get("rationale", "")
    audit["accepted_after_edits"] = edited
    audit["decision_kind"] = _decision_kind(action)
    return audit
def _parse_review_decision_notes(notes: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for part in notes.split(";"):
key, separator, value = part.strip().partition("=")
if separator and key:
parsed[key] = value.strip()
return parsed
def _split_audit_list(value: str) -> list[str]:
if not value or value == "none":
return []
return [item.strip() for item in value.split(",") if item.strip()]
def _reviewer_type(action: str) -> str:
if action == "quality_gate_override":
return "human"
if action.startswith("agentic_"):
return "agent"
if action == "trusted_auto_approve_candidate_graph":
return "migration"
if action.startswith("quality_gate_"):
return "deterministic-gate"
if action.startswith("approve") or action.startswith("accept"):
return "human"
if action.startswith("reject") or action.startswith("edit") or action.startswith("merge"):
return "human"
if action.startswith("relink"):
return "human"
return "migration" if action.startswith("llm_extraction") else "unknown"
def _decision_kind(action: str) -> str:
if "approve_with_edits" in action:
return "accepted_after_edits"
if "approve" in action or action.startswith("accept"):
return "accepted_as_is"
if "reject" in action:
return "rejected"
if "downgrade" in action:
return "downgraded"
if "request_human_review" in action:
return "needs_human"
if "override" in action:
return "override"
if "propose_edit" in action:
return "proposed_edit"
if "relink" in action:
return "relinked"
return "other"
@dataclass(frozen=True)
class ExpectationGap:
    """Immutable record of an expected item (type and name) with its source, notes and status."""

    id: int
    repository_id: int
    analysis_run_id: int | None
    expected_type: str
    expected_name: str
    source: str
    notes: str
    status: str
    created_at: str
@dataclass(frozen=True)
class AnalysisRunDiffItem:
    """One diff entry: change type, item type and key, with optional base and target payloads."""

    change_type: str
    item_type: str
    key: str
    base: dict[str, Any] | None = None
    target: dict[str, Any] | None = None
@dataclass(frozen=True)
class AnalysisRunDiffSection:
    """Diff items grouped into added, removed, changed and weakened lists (all empty by default)."""

    added: list[AnalysisRunDiffItem] = field(default_factory=list)
    removed: list[AnalysisRunDiffItem] = field(default_factory=list)
    changed: list[AnalysisRunDiffItem] = field(default_factory=list)
    weakened: list[AnalysisRunDiffItem] = field(default_factory=list)
@dataclass(frozen=True)
class AnalysisRunDiff:
    """Diff between a base and a target analysis run, with one section each for facts, chunks, candidates and approved entries."""

    repository: Repository
    base_run: AnalysisRun
    target_run: AnalysisRun
    facts: AnalysisRunDiffSection
    chunks: AnalysisRunDiffSection
    candidates: AnalysisRunDiffSection
    approved_entries: AnalysisRunDiffSection
@dataclass(frozen=True)
class ObservedFact:
    """Immutable fact record tied to a repository and analysis run: kind, path, name, value and free-form metadata."""

    id: int
    repository_id: int
    analysis_run_id: int
    snapshot_id: int | None
    kind: str
    path: str
    name: str
    value: str
    metadata: dict[str, Any]
@dataclass(frozen=True)
class ContentChunk:
    """Immutable text-chunk record tied to a repository and analysis run: path, kind, line range, text and metadata."""

    id: int
    repository_id: int
    analysis_run_id: int
    snapshot_id: int | None
    path: str
    kind: str
    start_line: int
    end_line: int
    text: str
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class ScanSummary:
    """Bundle of an analysis run, its optional snapshot and the observed facts."""

    analysis_run: AnalysisRun
    snapshot: RepositorySnapshot | None
    facts: list[ObservedFact]
@dataclass(frozen=True)
class CharacteristicRebuildResult:
    """Result record of a rebuild: repository, analysis run, flags and previous/candidate counts."""

    repository: Repository
    analysis_run: AnalysisRun
    dry_run: bool
    confirmed: bool
    cleared_approved: bool
    previous_counts: dict[str, int]
    previous_ids: dict[str, list[int]]
    candidate_counts: dict[str, int]
@dataclass(frozen=True)
class SourceReference:
    """Pointer to a source location: optional fact id, path, kind, name and optional line."""

    fact_id: int | None
    path: str
    kind: str
    name: str
    line: int | None = None
@dataclass(frozen=True)
class DependencyEdge:
    """Immutable edge from a source item to a target item with dependency type, strength and ownership details."""

    source_kind: str
    source_id: int | None
    source_key: str
    target_kind: str
    target_id: int
    target_key: str
    dependency_type: str
    strength: str
    source: str
    target_ownership: str
    same_layer: bool = False
@dataclass(frozen=True)
class DependencyGraphViewProfile:
    """Immutable per-repository view profile: name, description, default mode, filter rules and manual overrides."""

    id: int
    repository_id: int
    name: str
    description: str
    default_mode: str
    filter_rules: list[dict[str, Any]]
    manual_overrides: dict[str, str]
    created_at: str
    updated_at: str
@dataclass(frozen=True)
class DependencyGraph:
    """A repository, its scope and the list of dependency edges."""

    repository: Repository
    scope: "Scope"  # quoted: Scope is declared further down in this module
    edges: list[DependencyEdge]
@dataclass(frozen=True)
class DependencyImpactItem:
    """One impacted item: identity, freshness state, ownership, recommended action, impact depth and reasons."""

    item_kind: str
    item_id: int
    item_key: str
    name: str
    freshness_state: str
    ownership: str
    recommended_action: str
    impact_depth: int
    reasons: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class DependencyImpactAnalysis:
    """Impact analysis between a base and a target run: changed fact keys, impacted items, summary figures and the graph."""

    repository: Repository
    base_run: AnalysisRun
    target_run: AnalysisRun
    changed_fact_keys: list[str]
    impacts: list[DependencyImpactItem]
    max_depth: int
    scope_impacted: bool
    propagation_breadth: int
    graph: DependencyGraph
@dataclass(frozen=True)
class CandidateEvidence:
    """Candidate evidence entry with status and source references.

    ``target_kind`` defaults to ``"capability"`` and ``reference_kind`` to
    ``"source"``.
    """

    id: int
    type: str
    reference: str
    strength: str
    status: str
    source_refs: list[SourceReference]
    target_kind: str = "capability"
    target_id: int | None = None
    reference_kind: str = "source"
    reference_id: int | None = None
@dataclass(frozen=True)
class CandidateFeature:
    """Candidate feature: name, type, location, confidence, status and source references."""

    id: int
    name: str
    type: str
    location: str
    confidence: float
    status: str
    source_refs: list[SourceReference]
    confidence_label: str = ""
    primary_class: str = ""
    attributes: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateCapability:
    """Candidate capability with inputs, outputs, confidence, status and nested candidate features and evidence."""

    id: int
    name: str
    description: str
    inputs: list[str]
    outputs: list[str]
    confidence: float
    status: str
    source_refs: list[SourceReference]
    confidence_label: str = ""
    primary_class: str = "capability"
    attributes: list[str] = field(default_factory=list)
    features: list[CandidateFeature] = field(default_factory=list)
    evidence: list[CandidateEvidence] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateAbility:
    """Candidate ability with confidence, status, source references and nested candidate capabilities."""

    id: int
    name: str
    description: str
    confidence: float
    status: str
    source_refs: list[SourceReference]
    confidence_label: str = ""
    primary_class: str = "ability"
    attributes: list[str] = field(default_factory=list)
    capabilities: list[CandidateCapability] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateGraph:
    """The candidate abilities of one analysis run of a repository."""

    repository: Repository
    analysis_run: AnalysisRun
    abilities: list[CandidateAbility]
@dataclass(frozen=True)
class Evidence:
    """Evidence entry: type, reference, strength and source references.

    ``target_kind`` defaults to ``"capability"`` and ``reference_kind`` to
    ``"source"``.
    """

    id: int
    type: str
    reference: str
    strength: str
    source_refs: list[SourceReference] = field(default_factory=list)
    target_kind: str = "capability"
    target_id: int | None = None
    reference_kind: str = "source"
    reference_id: int | None = None
@dataclass(frozen=True)
class Scope:
    """Scope record: id, name, description, confidence and optional confidence label."""

    id: int
    name: str
    description: str
    confidence: float
    confidence_label: str = ""
@dataclass(frozen=True)
class Feature:
    """Feature record: name, type, location, confidence, source references and classification attributes."""

    id: int
    name: str
    type: str
    location: str
    confidence: float
    confidence_label: str = ""
    source_refs: list[SourceReference] = field(default_factory=list)
    primary_class: str = ""
    attributes: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class Capability:
    """Capability record with inputs, outputs, confidence and nested features and evidence."""

    id: int
    name: str
    description: str
    inputs: list[str]
    outputs: list[str]
    confidence: float
    confidence_label: str = ""
    primary_class: str = "capability"
    attributes: list[str] = field(default_factory=list)
    features: list[Feature] = field(default_factory=list)
    evidence: list[Evidence] = field(default_factory=list)
@dataclass(frozen=True)
class Ability:
    """Ability record with confidence, classification attributes and nested capabilities."""

    id: int
    name: str
    description: str
    confidence: float
    confidence_label: str = ""
    primary_class: str = "ability"
    attributes: list[str] = field(default_factory=list)
    capabilities: list[Capability] = field(default_factory=list)
@dataclass(frozen=True)
class RepositoryAbilityMap:
    """A repository together with its scope and abilities."""

    repository: Repository
    scope: Scope
    abilities: list[Ability]
@dataclass(frozen=True)
class SearchResult:
    """One search hit: repository, match type and name, confidence, optional ability/capability context and scores.

    The three score fields (``text_score``, ``vector_score``,
    ``hybrid_score``) default to ``0.0``.
    """

    repository_id: int
    repository_name: str
    match_type: str
    match_name: str
    confidence: float
    confidence_label: str = ""
    match_description: str = ""
    matched_field: str = ""
    ability_id: int | None = None
    ability_name: str | None = None
    capability_id: int | None = None
    capability_name: str | None = None
    evidence_level: str | None = None
    source_reference: str | None = None
    text_score: float = 0.0
    vector_score: float = 0.0
    hybrid_score: float = 0.0
@dataclass(frozen=True)
class AbilitySummary:
    """Flat ability summary including the owning repository's id and name."""

    id: int
    repository_id: int
    repository_name: str
    name: str
    description: str
    confidence: float
    confidence_label: str = ""
@dataclass(frozen=True)
class CapabilitySummary:
    """Flat capability summary including the owning repository and ability ids and names."""

    id: int
    repository_id: int
    repository_name: str
    ability_id: int
    ability_name: str
    name: str
    description: str
    confidence: float
    confidence_label: str = ""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
"""Intent-file helpers for repository scoping."""

View File

@@ -0,0 +1,130 @@
from __future__ import annotations
import argparse
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Iterable
# Markdown blockquote that scope_to_intent_text inserts below the heading of a
# bootstrapped INTENT.md (a "Bootstrap date" line is appended to it there).
BOOTSTRAP_NOTE = (
    "> Bootstrapped from `SCOPE.md` by repo-scoping.\n"
    "> Review and edit this file as design intent. `SCOPE.md` remains the\n"
    "> derived current-scope artifact."
)
@dataclass(frozen=True)
class IntentBootstrapResult:
    """Outcome of one bootstrap attempt: the paths involved, a status code and a message."""

    repo_path: str
    scope_path: str
    intent_path: str
    # One of the status strings produced by bootstrap_intent_from_scope:
    # missing_repo, missing_scope, exists, would_create, would_overwrite,
    # created, overwritten.
    status: str
    message: str
def bootstrap_intent_from_scope(
    repo_path: str | Path,
    *,
    dry_run: bool = False,
    overwrite: bool = False,
    today: date | None = None,
) -> IntentBootstrapResult:
    """Create ``INTENT.md`` from ``SCOPE.md`` inside one repository checkout.

    Args:
        repo_path: Repository checkout directory.
        dry_run: Report what would happen without writing anything.
        overwrite: Replace an existing ``INTENT.md`` instead of leaving it.
        today: Date stamped into the bootstrap note (passed through to
            ``scope_to_intent_text``).

    Returns:
        A result whose status is one of ``missing_repo``, ``missing_scope``,
        ``exists``, ``would_create``, ``would_overwrite``, ``created`` or
        ``overwritten``.
    """
    root = Path(repo_path).expanduser().resolve()
    scope_path = root / "SCOPE.md"
    intent_path = root / "INTENT.md"

    def report(status: str, message: str) -> IntentBootstrapResult:
        return _result(root, scope_path, intent_path, status, message)

    if not root.is_dir():
        return report("missing_repo", "repository path does not exist")
    if not scope_path.is_file():
        return report("missing_scope", "SCOPE.md is not present")

    intent_present = intent_path.exists()
    if intent_present and not overwrite:
        return report("exists", "INTENT.md already exists")

    planned = "would_overwrite" if intent_present else "would_create"
    if dry_run:
        return report(planned, f"{planned} INTENT.md from SCOPE.md")

    scope_text = scope_path.read_text(encoding="utf-8")
    intent_path.write_text(scope_to_intent_text(scope_text, today=today), encoding="utf-8")
    if planned == "would_overwrite":
        done = "overwritten"
    else:
        done = "created"
    return report(done, f"{done} INTENT.md from SCOPE.md")
def bootstrap_many(
    repo_paths: Iterable[str | Path],
    *,
    dry_run: bool = False,
    overwrite: bool = False,
    today: date | None = None,
) -> list[IntentBootstrapResult]:
    """Run ``bootstrap_intent_from_scope`` for each path and collect the results in order."""
    outcomes: list[IntentBootstrapResult] = []
    for checkout in repo_paths:
        outcome = bootstrap_intent_from_scope(
            checkout,
            dry_run=dry_run,
            overwrite=overwrite,
            today=today,
        )
        outcomes.append(outcome)
    return outcomes
def scope_to_intent_text(scope_text: str, *, today: date | None = None) -> str:
    """Convert the text of a ``SCOPE.md`` into the text of a bootstrapped ``INTENT.md``.

    Leading blank lines are dropped. A first line beginning with ``# scope``
    (case-insensitive) becomes ``# INTENT``; when the text is empty or its
    first line is not a heading, a ``# INTENT`` heading is prepended. The
    bootstrap note, dated with ``today`` (default: the current date), is then
    inserted before the first non-blank line after the heading.

    Returns:
        The intent text, ending in exactly one newline.
    """
    stamp = today or date.today()

    raw_lines = scope_text.splitlines()
    first_content = next(
        (index for index, text in enumerate(raw_lines) if text.strip()),
        len(raw_lines),
    )
    body = raw_lines[first_content:]

    if body and body[0].lstrip().lower().startswith("# scope"):
        body[0] = "# INTENT"
    elif not body or not body[0].startswith("#"):
        body.insert(0, "# INTENT")

    note = f"{BOOTSTRAP_NOTE}\n> Bootstrap date: {stamp.isoformat()}"

    # ``body`` always holds at least the heading here, so scanning starts
    # right after it and skips any blank lines that follow.
    cursor = 1
    while cursor < len(body) and not body[cursor].strip():
        cursor += 1

    assembled = body[:cursor] + ["", note, ""] + body[cursor:]
    return "\n".join(assembled).rstrip() + "\n"
def _result(
    root: Path,
    scope_path: Path,
    intent_path: Path,
    status: str,
    message: str,
) -> IntentBootstrapResult:
    """Build an ``IntentBootstrapResult`` with the three paths rendered as strings."""
    repo_text, scope_text, intent_text = (
        str(location) for location in (root, scope_path, intent_path)
    )
    return IntentBootstrapResult(
        repo_path=repo_text,
        scope_path=scope_text,
        intent_path=intent_text,
        status=status,
        message=message,
    )
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: bootstrap ``INTENT.md`` for each given checkout.

    Prints one tab-separated ``status``/``repo_path``/``message`` line per
    repository.

    Returns:
        ``1`` when any repository reported ``missing_repo`` or
        ``missing_scope``; otherwise ``0``.
    """
    parser = argparse.ArgumentParser(
        description="Bootstrap INTENT.md from SCOPE.md for repositories that do not have intent files yet."
    )
    parser.add_argument("repo_paths", nargs="+", help="Repository checkout path(s) to inspect")
    parser.add_argument("--dry-run", action="store_true", help="Report planned writes without writing files")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing INTENT.md files")
    options = parser.parse_args(argv)

    outcomes = bootstrap_many(
        options.repo_paths,
        dry_run=options.dry_run,
        overwrite=options.overwrite,
    )
    for outcome in outcomes:
        print(f"{outcome.status}\t{outcome.repo_path}\t{outcome.message}")

    failure_statuses = {"missing_repo", "missing_scope"}
    for outcome in outcomes:
        if outcome.status in failure_statuses:
            return 1
    return 0
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,21 @@
from repo_registry.llm_extraction.extractor import (
ExtractedAbility,
ExtractedCapability,
ExtractedEvidence,
ExtractedFeature,
LLMCandidateExtractor,
LLMExtractionError,
create_llm_connect_adapter,
)
from repo_registry.llm_extraction.mapper import LLMExtractionMapper
__all__ = [
"ExtractedAbility",
"ExtractedCapability",
"ExtractedEvidence",
"ExtractedFeature",
"LLMCandidateExtractor",
"LLMExtractionError",
"LLMExtractionMapper",
"create_llm_connect_adapter",
]

View File

@@ -0,0 +1,262 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Protocol
from repo_registry.core.models import ContentChunk, Repository
class LLMExtractionError(ValueError):
    """Error raised by this module for unusable LLM responses or a missing llm-connect install."""
class LLMResponseLike(Protocol):
    """Structural type for an adapter response: only a string ``content`` attribute is required."""

    content: str
class LLMAdapterLike(Protocol):
    """Structural type for an LLM adapter exposing ``execute_prompt``."""

    def execute_prompt(self, prompt: str, config: Any) -> LLMResponseLike:
        """Run ``prompt`` with ``config`` and return an object carrying the response text."""
        pass
@dataclass(frozen=True)
class ExtractedEvidence:
    """Evidence entry parsed from an LLM response; ``strength`` defaults to ``"medium"``."""

    type: str
    reference: str
    strength: str = "medium"
    source_paths: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class ExtractedFeature:
    """Feature entry parsed from an LLM response: name, type, optional location and source paths."""

    name: str
    type: str
    location: str = ""
    source_paths: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class ExtractedCapability:
    """Capability entry parsed from an LLM response, with nested features and evidence."""

    name: str
    description: str = ""
    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)
    features: list[ExtractedFeature] = field(default_factory=list)
    evidence: list[ExtractedEvidence] = field(default_factory=list)
    source_paths: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class ExtractedAbility:
    """Ability entry parsed from an LLM response, with nested capabilities."""

    name: str
    description: str = ""
    capabilities: list[ExtractedCapability] = field(default_factory=list)
    source_paths: list[str] = field(default_factory=list)
class LLMCandidateExtractor:
    """Structured candidate extraction over llm-connect-style adapters.

    Builds a prompt from a repository and a bounded selection of its content
    chunks, sends it through the adapter and parses the JSON answer into
    ``ExtractedAbility`` objects. Responses that do not have the expected
    shape raise ``LLMExtractionError``.
    """

    # Upper bound on the number of chunks embedded in a single prompt.
    MAX_PROMPT_CHUNKS = 12

    def __init__(self, adapter: LLMAdapterLike, run_config: Any | None = None) -> None:
        """Store the adapter and the run configuration.

        A falsy ``run_config`` is replaced by the llm-connect default, which
        is ``None`` when llm-connect is not installed.
        """
        self.adapter = adapter
        self.run_config = run_config or self._default_run_config()

    def extract(
        self,
        repository: Repository,
        chunks: list[ContentChunk],
    ) -> list[ExtractedAbility]:
        """Prompt the adapter about ``repository`` and return the parsed abilities.

        Raises:
            LLMExtractionError: If the response is not valid JSON or lacks a
                well-formed ``abilities`` list.
        """
        prompt = self.build_prompt(repository, chunks)
        response = self.adapter.execute_prompt(prompt, self.run_config)
        return self.parse_response(response.content)

    def build_prompt(self, repository: Repository, chunks: list[ContentChunk]) -> str:
        """Return the extraction prompt: instructions, JSON shape, repository header and chunk texts."""
        chunk_text = "\n\n".join(
            (
                f"Source: {chunk.path}:{chunk.start_line}-{chunk.end_line} "
                f"({chunk.kind}; source_role={self._source_role(chunk)})\n{chunk.text}"
            )
            for chunk in self._prompt_chunks(chunks)
        )
        return (
            "Extract a conservative, source-linked repository ability map.\n"
            "Use original repository utility only: capabilities the repository "
            "owns, intentionally exposes as a facade, or implements as an adapter.\n"
            "Prefer source_role=intent_summary, product_documentation, "
            "implementation_source, and test_evidence. Do not use SCOPE.md or "
            "source_role=derived_scope as primary evidence; it is a derived prior "
            "registry view and may be stale. Ignore agent guidance, CI/tooling, "
            "dependency-only, and mention-only context unless owned product "
            "evidence supports the same claim.\n"
            "Return strict JSON only with this shape:\n"
            "{\n"
            ' "abilities": [\n'
            " {\n"
            ' "name": "...",\n'
            ' "description": "...",\n'
            ' "source_paths": ["README.md"],\n'
            ' "capabilities": [\n'
            " {\n"
            ' "name": "...",\n'
            ' "description": "...",\n'
            ' "inputs": ["..."],\n'
            ' "outputs": ["..."],\n'
            ' "source_paths": ["..."],\n'
            ' "features": [{"name": "...", "type": "...", "location": "...", "source_paths": ["..."]}],\n'
            ' "evidence": [{"type": "documentation", "reference": "...", "strength": "medium", "source_paths": ["..."]}]\n'
            " }\n"
            " ]\n"
            " }\n"
            " ]\n"
            "}\n"
            "Do not invent unsupported claims. If sources are weak, keep names generic.\n\n"
            f"Repository: {repository.name}\n"
            f"Description: {repository.description or ''}\n\n"
            f"{chunk_text}\n"
        )

    def _prompt_chunks(self, chunks: list[ContentChunk]) -> list[ContentChunk]:
        """Select the chunks that go into the prompt.

        Agent-guidance and derived-scope chunks are excluded; the rest is
        ordered by source-role priority, then path, then start line, and
        truncated to ``MAX_PROMPT_CHUNKS`` entries.
        """
        promptable = [
            chunk
            for chunk in chunks
            if self._source_role(chunk) not in {"agent_guidance", "derived_scope"}
        ]
        return sorted(
            promptable,
            key=lambda chunk: (
                self._source_role_priority(self._source_role(chunk)),
                chunk.path,
                chunk.start_line,
            ),
        )[: self.MAX_PROMPT_CHUNKS]

    def _source_role(self, chunk: ContentChunk) -> str:
        """Return the chunk's source role.

        A non-empty string ``source_role`` in the metadata wins; otherwise the
        role is inferred from the file name, and ``""`` is returned when
        nothing matches.
        """
        role = chunk.metadata.get("source_role")
        if isinstance(role, str) and role:
            return role
        path = chunk.path.lower()
        if path.endswith("intent.md"):
            return "intent_summary"
        if path.endswith("scope.md"):
            return "derived_scope"
        if path.endswith(("agents.md", "claude.md")) or "/.claude/" in path:
            return "agent_guidance"
        return ""

    def _source_role_priority(self, source_role: str) -> int:
        """Return the sort rank of a source role (lower sorts first; unknown roles rank last)."""
        priorities = {
            "intent_summary": 0,
            "product_documentation": 1,
            "implementation_source": 2,
            "test_evidence": 3,
            "configuration": 4,
            "dependency_declaration": 5,
            "ci_tooling": 6,
        }
        return priorities.get(source_role, 7)

    def parse_response(self, content: str) -> list[ExtractedAbility]:
        """Parse the adapter's raw text into abilities.

        Entries of ``abilities`` that are not JSON objects are ignored, the
        same way non-object capabilities, features and evidence are.

        Raises:
            LLMExtractionError: If the text is not valid JSON, if the JSON
                root is not an object holding an ``abilities`` list, or if a
                required string field is missing.
        """
        try:
            payload = json.loads(self._json_text(content))
        except json.JSONDecodeError as exc:
            raise LLMExtractionError(f"LLM response was not valid JSON: {exc}") from exc
        # A JSON array or scalar at the root has no ``.get``; report it as a
        # malformed response instead of letting AttributeError escape.
        abilities = payload.get("abilities") if isinstance(payload, dict) else None
        if not isinstance(abilities, list):
            raise LLMExtractionError("LLM response must contain an abilities list")
        return [self._ability(item) for item in self._dict_items(abilities)]

    def _dict_items(self, value: Any) -> list[dict[str, Any]]:
        """Return the dict entries of ``value``; anything that is not a list yields ``[]``."""
        if not isinstance(value, list):
            return []
        return [item for item in value if isinstance(item, dict)]

    def _ability(self, item: dict[str, Any]) -> ExtractedAbility:
        """Convert one ability object; a missing or non-list ``capabilities`` value counts as empty."""
        return ExtractedAbility(
            name=self._required_str(item, "name"),
            description=self._optional_str(item, "description"),
            source_paths=self._str_list(item.get("source_paths")),
            capabilities=[
                self._capability(capability)
                for capability in self._dict_items(item.get("capabilities"))
            ],
        )

    def _capability(self, item: dict[str, Any]) -> ExtractedCapability:
        """Convert one capability object; missing or non-list ``features``/``evidence`` count as empty."""
        return ExtractedCapability(
            name=self._required_str(item, "name"),
            description=self._optional_str(item, "description"),
            inputs=self._str_list(item.get("inputs")),
            outputs=self._str_list(item.get("outputs")),
            source_paths=self._str_list(item.get("source_paths")),
            features=[
                self._feature(feature)
                for feature in self._dict_items(item.get("features"))
            ],
            evidence=[
                self._evidence(evidence)
                for evidence in self._dict_items(item.get("evidence"))
            ],
        )

    def _feature(self, item: dict[str, Any]) -> ExtractedFeature:
        """Convert one feature object; ``name`` and ``type`` are required."""
        return ExtractedFeature(
            name=self._required_str(item, "name"),
            type=self._required_str(item, "type"),
            location=self._optional_str(item, "location"),
            source_paths=self._str_list(item.get("source_paths")),
        )

    def _evidence(self, item: dict[str, Any]) -> ExtractedEvidence:
        """Convert one evidence object; ``strength`` falls back to ``"medium"``."""
        return ExtractedEvidence(
            type=self._required_str(item, "type"),
            reference=self._required_str(item, "reference"),
            strength=self._optional_str(item, "strength") or "medium",
            source_paths=self._str_list(item.get("source_paths")),
        )

    def _json_text(self, content: str) -> str:
        """Strip surrounding whitespace and an enclosing Markdown code fence, if any."""
        stripped = content.strip()
        if stripped.startswith("```"):
            lines = stripped.splitlines()
            if lines and lines[0].startswith("```"):
                lines = lines[1:]
            if lines and lines[-1].startswith("```"):
                lines = lines[:-1]
            return "\n".join(lines).strip()
        return stripped

    def _required_str(self, item: dict[str, Any], key: str) -> str:
        """Return the trimmed string at ``key``.

        Raises:
            LLMExtractionError: If the value is missing, not a string, or blank.
        """
        value = item.get(key)
        if not isinstance(value, str) or not value.strip():
            raise LLMExtractionError(f"Missing required string field: {key}")
        return value.strip()

    def _optional_str(self, item: dict[str, Any], key: str) -> str:
        """Return the trimmed string at ``key``, or ``""`` when it is absent or not a string."""
        value = item.get(key, "")
        return value.strip() if isinstance(value, str) else ""

    def _str_list(self, value: Any) -> list[str]:
        """Return the trimmed, non-empty strings of ``value``; anything that is not a list yields ``[]``."""
        if not isinstance(value, list):
            return []
        return [item.strip() for item in value if isinstance(item, str) and item.strip()]

    def _default_run_config(self) -> Any:
        """Return llm-connect's ``RunConfig`` defaults, or ``None`` when llm-connect is not installed."""
        try:
            from llm_connect import RunConfig
        except ModuleNotFoundError:
            return None
        return RunConfig(temperature=0.1, max_tokens=2000)
def create_llm_connect_adapter(
    provider: str,
    model: str | None = None,
    **kwargs: Any,
) -> LLMAdapterLike:
    """Create an adapter through llm-connect's ``create_adapter``.

    The import happens at call time so that the package stays an optional
    dependency.

    Raises:
        LLMExtractionError: If the ``llm_connect`` module cannot be found.
    """
    try:
        from llm_connect import create_adapter as adapter_factory
    except ModuleNotFoundError as exc:
        install_hint = (
            "llm-connect is not installed. Install the sibling project with "
            "`python -m pip install -e ../llm-connect`."
        )
        raise LLMExtractionError(install_hint) from exc
    else:
        return adapter_factory(provider, model=model, **kwargs)

View File

@@ -0,0 +1,145 @@
from __future__ import annotations
from repo_registry.candidate_graph.generator import (
CandidateAbilityDraft,
CandidateCapabilityDraft,
CandidateEvidenceDraft,
CandidateFeatureDraft,
)
from repo_registry.core.models import ContentChunk, ObservedFact, SourceReference
from repo_registry.llm_extraction.extractor import ExtractedAbility
class LLMExtractionMapper:
    """Map structured LLM extraction drafts into reviewable candidate drafts."""

    def map(
        self,
        abilities: list[ExtractedAbility],
        facts: list[ObservedFact],
        chunks: list[ContentChunk],
    ) -> list[CandidateAbilityDraft]:
        """Turn extracted abilities into candidate drafts.

        Every ability, capability and feature gets source references resolved
        against ``facts`` and ``chunks`` plus a confidence derived from them.
        Features without source paths fall back to their location, evidence
        without source paths to its reference.
        """
        ability_drafts: list[CandidateAbilityDraft] = []
        for ability in abilities:
            capability_drafts: list[CandidateCapabilityDraft] = []
            for capability in ability.capabilities:
                feature_drafts: list[CandidateFeatureDraft] = []
                for feature in capability.features:
                    feature_paths = feature.source_paths or [feature.location]
                    feature_drafts.append(
                        CandidateFeatureDraft(
                            name=feature.name,
                            type=feature.type,
                            location=feature.location,
                            confidence=self._confidence(feature_paths, facts, chunks, 0.45),
                            source_refs=self._source_refs(feature_paths, facts, chunks),
                        )
                    )

                evidence_drafts: list[CandidateEvidenceDraft] = []
                for evidence in capability.evidence:
                    evidence_paths = evidence.source_paths or [evidence.reference]
                    evidence_drafts.append(
                        CandidateEvidenceDraft(
                            type=evidence.type,
                            reference=evidence.reference,
                            strength=evidence.strength,
                            source_refs=self._source_refs(evidence_paths, facts, chunks),
                        )
                    )

                capability_drafts.append(
                    CandidateCapabilityDraft(
                        name=capability.name,
                        description=capability.description,
                        inputs=capability.inputs,
                        outputs=capability.outputs,
                        confidence=self._confidence(capability.source_paths, facts, chunks, 0.5),
                        source_refs=self._source_refs(capability.source_paths, facts, chunks),
                        features=feature_drafts,
                        evidence=evidence_drafts,
                    )
                )

            ability_drafts.append(
                CandidateAbilityDraft(
                    name=ability.name,
                    description=ability.description,
                    confidence=self._confidence(ability.source_paths, facts, chunks, 0.45),
                    source_refs=self._source_refs(ability.source_paths, facts, chunks),
                    capabilities=capability_drafts,
                )
            )
        return ability_drafts

    def _confidence(
        self,
        source_paths: list[str],
        facts: list[ObservedFact],
        chunks: list[ContentChunk],
        base: float,
    ) -> float:
        """Score a draft from the kinds of sources that back it.

        Without any resolved reference the score is ``base``. Otherwise it is
        ``base + 0.15`` plus ``0.10`` for each of: documentation, test or
        example, interface. The result is rounded to two decimals and capped
        at ``0.95``.
        """
        resolved = self._source_refs(source_paths, facts, chunks)
        if not resolved:
            return base
        kinds_seen = {reference.kind for reference in resolved}
        total = base + 0.15
        bonus_groups = (
            {"documentation"},
            {"test", "example"},
            {"interface"},
        )
        for group in bonus_groups:
            if kinds_seen & group:
                total += 0.10
        return min(0.95, round(total, 2))

    def _source_refs(
        self,
        source_paths: list[str],
        facts: list[ObservedFact],
        chunks: list[ContentChunk],
    ) -> list[SourceReference]:
        """Resolve source paths to references, preferring facts over chunks.

        Anything after the first ``:`` of a path is ignored. All facts with
        the same path become references; only when no reference exists for
        the path is the first matching chunk used instead. Duplicates (same
        fact id, path, kind and line) are emitted once.
        """
        collected: list[SourceReference] = []
        known: set[tuple[int | None, str, str, int | None]] = set()

        def remember(candidate: SourceReference) -> None:
            identity = (candidate.fact_id, candidate.path, candidate.kind, candidate.line)
            if identity not in known:
                known.add(identity)
                collected.append(candidate)

        for raw_path in source_paths:
            normalized = raw_path.split(":", 1)[0]
            for fact in facts:
                if fact.path == normalized:
                    remember(
                        SourceReference(
                            fact_id=fact.id,
                            path=fact.path,
                            kind=fact.kind,
                            name=fact.name,
                            line=fact.metadata.get("line"),
                        )
                    )
            if any(reference.path == normalized for reference in collected):
                continue
            # No fact covers this path: fall back to the first chunk of it.
            first_chunk = next(
                (chunk for chunk in chunks if chunk.path == normalized),
                None,
            )
            if first_chunk is not None:
                remember(
                    SourceReference(
                        fact_id=None,
                        path=first_chunk.path,
                        kind=first_chunk.kind,
                        name=first_chunk.path,
                        line=first_chunk.start_line,
                    )
                )
        return collected

View File

@@ -0,0 +1 @@
"""Repository checkout and ingestion helpers."""

View File

@@ -0,0 +1,181 @@
from __future__ import annotations
import hashlib
import shutil
import subprocess
import os
from dataclasses import dataclass
from pathlib import Path
from base64 import b64encode
from urllib.parse import urlparse
@dataclass(frozen=True)
class Checkout:
    """Working tree resolved by ``GitIngestionService``."""

    # Absolute path of the working tree.
    source_path: Path
    # True when the tree lives under the service's checkout root (managed via
    # git); False when the caller pointed at an existing local path.
    was_cloned: bool


class GitIngestionService:
    """Resolve a repository URL or local path to a working tree on disk.

    Remote repositories are cloned into ``checkout_root`` under a key derived
    from the URL and refreshed on later calls. Local paths are used in place.
    """

    def __init__(self, checkout_root: str | Path = "var/checkouts") -> None:
        self.checkout_root = Path(checkout_root)

    def resolve(
        self,
        url_or_path: str,
        *,
        branch: str = "main",
        access_username: str | None = None,
        access_password: str | None = None,
    ) -> Checkout:
        """Return a checkout for ``url_or_path``, cloning or fetching as needed.

        Args:
            url_or_path: Existing local path, or a git URL.
            branch: Branch to check out; an empty string keeps the current one.
            access_username: Optional HTTP basic-auth user name.
            access_password: Optional HTTP basic-auth password or token.

        Raises:
            RuntimeError: If git is missing, times out, or fails.
        """
        local_path = self._local_path(url_or_path)
        if local_path is not None:
            return Checkout(source_path=local_path.resolve(), was_cloned=False)

        checkout_path = self.checkout_root / self._checkout_key(url_or_path)
        self.checkout_root.mkdir(parents=True, exist_ok=True)
        if checkout_path.exists():
            self._run_git(
                ["fetch", "--all", "--prune"],
                cwd=checkout_path,
                access_username=access_username,
                access_password=access_password,
            )
        else:
            try:
                self._run_git(
                    # "--" stops option parsing so a value starting with "-"
                    # can never be interpreted as a git option.
                    ["clone", "--", url_or_path, str(checkout_path)],
                    cwd=None,
                    access_username=access_username,
                    access_password=access_password,
                )
            except RuntimeError:
                # A failed or timed-out clone may leave a partial directory
                # that later calls would mistake for a valid cached checkout.
                shutil.rmtree(checkout_path, ignore_errors=True)
                raise
        self._checkout_branch(
            checkout_path,
            branch,
            access_username=access_username,
            access_password=access_password,
        )
        return Checkout(source_path=checkout_path.resolve(), was_cloned=True)

    def cached_checkout(self, url_or_path: str) -> Checkout | None:
        """Return the checkout for ``url_or_path`` without running git.

        Returns ``None`` when the value is not an existing local path and no
        cached clone directory exists for it.
        """
        local_path = self._local_path(url_or_path)
        if local_path is not None:
            return Checkout(source_path=local_path.resolve(), was_cloned=False)
        checkout_path = self.checkout_root / self._checkout_key(url_or_path)
        if not checkout_path.exists():
            return None
        return Checkout(source_path=checkout_path.resolve(), was_cloned=True)

    def _checkout_branch(
        self,
        checkout_path: Path,
        branch: str,
        *,
        access_username: str | None = None,
        access_password: str | None = None,
    ) -> None:
        """Check out ``branch`` (when given) and fast-forward it.

        Raises:
            RuntimeError: If the branch name starts with ``-`` (it would be
                parsed as a git option) or a git command fails.
        """
        if branch:
            if branch.startswith("-"):
                raise RuntimeError(
                    f"git checkout failed: invalid branch name {branch!r}."
                )
            self._run_git(
                ["checkout", branch],
                cwd=checkout_path,
                access_username=access_username,
                access_password=access_password,
            )
        self._run_git(
            ["pull", "--ff-only"],
            cwd=checkout_path,
            access_username=access_username,
            access_password=access_password,
        )

    def _local_path(self, value: str) -> Path | None:
        """Return ``value`` as a path if it has no URL scheme and exists."""
        if urlparse(value).scheme:
            return None
        path = Path(value).expanduser()
        return path if path.exists() else None

    def _checkout_key(self, url: str) -> str:
        """Return a directory name: sanitized repo name plus a short URL hash."""
        parsed = urlparse(url)
        name = Path(parsed.path.rstrip("/")).name or "repository"
        if name.endswith(".git"):
            name = name[:-4]
        digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:12]
        return f"{self._safe_name(name)}-{digest}"

    def _safe_name(self, value: str) -> str:
        """Replace every character other than alphanumerics, ``-`` and ``_`` with ``-``."""
        safe = "".join(char if char.isalnum() or char in "-_" else "-" for char in value)
        return safe.strip("-") or "repository"

    def _run_git(
        self,
        args: list[str],
        *,
        cwd: Path | None,
        access_username: str | None = None,
        access_password: str | None = None,
    ) -> None:
        """Run one git command non-interactively with a 120 second timeout.

        Raises:
            RuntimeError: If git is not installed, the command times out, or
                it exits non-zero (with a dedicated message when the output
                looks like an authentication failure).
        """
        if shutil.which("git") is None:
            raise RuntimeError("git executable was not found")
        auth_config = self._auth_config(access_username, access_password)
        command = ["git", *auth_config, *args]
        env = {
            **os.environ,
            # Never block waiting for credentials on a terminal.
            "GIT_TERMINAL_PROMPT": "0",
            "GIT_ASKPASS": "echo",
        }
        try:
            result = subprocess.run(
                command,
                cwd=cwd,
                check=False,
                capture_output=True,
                text=True,
                timeout=120,
                env=env,
            )
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError(
                f"git {' '.join(args)} timed out after {exc.timeout} seconds. "
                "If this is a private repository, provide HTTP access credentials."
            ) from exc
        if result.returncode != 0:
            message = result.stderr.strip() or result.stdout.strip()
            if self._looks_like_auth_failure(message):
                raise RuntimeError(
                    f"git {' '.join(args)} failed: authentication required. "
                    "Provide a username and password or access token for this repository."
                )
            raise RuntimeError(f"git {' '.join(args)} failed: {message}")

    def _auth_config(
        self,
        access_username: str | None,
        access_password: str | None,
    ) -> list[str]:
        """Return ``-c`` arguments adding an HTTP basic-auth header, or ``[]``.

        Both user name and password must be non-empty for a header to be added.
        """
        # NOTE(review): the header is passed as a command-line argument, so it
        # may be visible in the local process list; consider the GIT_CONFIG_*
        # environment variables instead.
        if not access_username or not access_password:
            return []
        token = b64encode(
            f"{access_username}:{access_password}".encode("utf-8")
        ).decode("ascii")
        return ["-c", f"http.extraHeader=Authorization: Basic {token}"]

    def _looks_like_auth_failure(self, message: str) -> bool:
        """Return True when git output contains a known authentication phrase."""
        lowered = message.lower()
        return any(
            phrase in lowered
            for phrase in (
                "authentication failed",
                "could not read username",
                "could not read password",
                "terminal prompts disabled",
                "authentication required",
                "access denied",
                "401",
                "403",
            )
        )

View File

@@ -0,0 +1,86 @@
from __future__ import annotations
import json
import tomllib
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlparse
@dataclass(frozen=True)
class RepositoryMetadata:
name: str
description: str | None
class RepositoryMetadataExtractor:
def extract(self, source_path: str | Path, url: str) -> RepositoryMetadata:
root = Path(source_path)
pyproject = self._from_pyproject(root)
package = self._from_package_json(root)
readme = self._from_readme(root)
fallback_name = self._name_from_url_or_path(url)
return RepositoryMetadata(
name=pyproject.name or package.name or readme.name or fallback_name,
description=(
pyproject.description
or package.description
or readme.description
),
)
def _from_pyproject(self, root: Path) -> RepositoryMetadata:
path = root / "pyproject.toml"
if not path.exists():
return RepositoryMetadata(name="", description=None)
try:
project = tomllib.loads(path.read_text(encoding="utf-8")).get("project", {})
except (OSError, tomllib.TOMLDecodeError):
return RepositoryMetadata(name="", description=None)
return RepositoryMetadata(
name=str(project.get("name") or ""),
description=project.get("description"),
)
def _from_package_json(self, root: Path) -> RepositoryMetadata:
path = root / "package.json"
if not path.exists():
return RepositoryMetadata(name="", description=None)
try:
package = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return RepositoryMetadata(name="", description=None)
return RepositoryMetadata(
name=str(package.get("name") or ""),
description=package.get("description"),
)
def _from_readme(self, root: Path) -> RepositoryMetadata:
for readme in sorted(root.glob("README*")):
if not readme.is_file():
continue
try:
lines = readme.read_text(encoding="utf-8", errors="ignore").splitlines()
except OSError:
continue
title = ""
for line in lines:
stripped = line.strip()
cleaned = stripped.strip("#").strip()
if stripped.startswith("#") and cleaned and not title:
title = cleaned
continue
if cleaned:
return RepositoryMetadata(name=title, description=cleaned)
if title:
return RepositoryMetadata(name=title, description=None)
return RepositoryMetadata(name="", description=None)
def _name_from_url_or_path(self, value: str) -> str:
parsed = urlparse(value)
path = parsed.path if parsed.scheme else value
name = Path(path.rstrip("/")).name or "repository"
if name.endswith(".git"):
name = name[:-4]
return name or "repository"

View File

@@ -0,0 +1 @@
"""Deterministic repository scanning."""

View File

@@ -0,0 +1,574 @@
from __future__ import annotations
import subprocess
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
# A file is skipped when any component of its path relative to the scan root
# (directory names and the file name itself) appears in this set.
IGNORED_DIRS = {
    ".git",
    ".hg",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    ".tox",
    ".venv",
    "__pycache__",
    "build",
    "dist",
    "node_modules",
    "target",
    "var",
    "vendor",
}

# File suffix (case-sensitive) -> language name used in "language" facts.
LANGUAGE_BY_EXTENSION = {
    ".go": "Go",
    ".java": "Java",
    ".js": "JavaScript",
    ".jsx": "JavaScript",
    ".kt": "Kotlin",
    ".php": "PHP",
    ".py": "Python",
    ".rb": "Ruby",
    ".rs": "Rust",
    ".ts": "TypeScript",
    ".tsx": "TypeScript",
}

# Manifest file name -> {substring searched in the lower-cased manifest text:
# framework name reported}.
MANIFEST_FRAMEWORK_HINTS = {
    "pyproject.toml": {
        "fastapi": "FastAPI",
        "django": "Django",
        "flask": "Flask",
        "typer": "Typer",
        "click": "Click",
        "pytest": "pytest",
    },
    "requirements.txt": {
        "fastapi": "FastAPI",
        "django": "Django",
        "flask": "Flask",
        "typer": "Typer",
        "click": "Click",
        "pytest": "pytest",
    },
    "package.json": {
        "next": "Next.js",
        "react": "React",
        "express": "Express",
        "vite": "Vite",
        "jest": "Jest",
        "vitest": "Vitest",
    },
    "Cargo.toml": {
        "axum": "Axum",
        "actix-web": "Actix Web",
        "clap": "Clap",
        "tokio": "Tokio",
    },
}

# Token searched in file text -> provider name reported.
LLM_PROVIDER_HINTS = {
    "openrouter": "OpenRouter",
    "anthropic": "Anthropic",
    "claude": "Claude",
    "openai": "OpenAI",
    "gemini": "Gemini",
    "google-generativeai": "Gemini",
}

# Environment variable name -> label reported in "credential_config" facts.
LLM_CREDENTIAL_HINTS = {
    "OPENROUTER_API_KEY": "OpenRouter API key",
    "ANTHROPIC_API_KEY": "Anthropic API key",
    "OPENAI_API_KEY": "OpenAI API key",
    "GEMINI_API_KEY": "Gemini API key",
    "GOOGLE_API_KEY": "Google API key",
}

# Lower-cased file and directory names classified as "agent_guidance".
AGENT_GUIDANCE_FILES = {
    "agents.md",
    "claude.md",
}
AGENT_GUIDANCE_DIRS = {
    ".claude",
    ".codex",
    ".cursor",
}

# File names are compared after lower-casing, so the manifest keys (which
# include the mixed-case "Cargo.toml") must be lower-cased too.
_MANIFEST_NAMES = frozenset(name.lower() for name in MANIFEST_FRAMEWORK_HINTS)

# Lock and module files recorded as manifests in addition to the keys above.
_EXTRA_MANIFEST_NAMES = frozenset(
    {
        "requirements.txt",
        "poetry.lock",
        "package-lock.json",
        "pnpm-lock.yaml",
        "yarn.lock",
        "go.mod",
    }
)

# Suffixes of files whose text is searched for LLM provider hints.
_LLM_SCAN_SUFFIXES = frozenset(
    {".py", ".ts", ".js", ".json", ".toml", ".yaml", ".yml", ".md", ".txt", ".env"}
)

_CONFIG_SUFFIXES = (".yaml", ".yml", ".toml", ".ini", ".env.example")


@dataclass(frozen=True)
class FactCandidate:
    """One observation produced by the scanner."""

    kind: str
    name: str
    # Path relative to the scan root (POSIX separators); empty for
    # repository-wide facts such as languages.
    path: str = ""
    value: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class ScanResult:
    """Outcome of scanning one source tree."""

    source_path: str
    # "working-tree" when no commit hash could be read from git.
    commit_hash: str
    # "unknown" when no current branch could be read from git.
    branch: str
    file_count: int
    facts: list[FactCandidate]


class DeterministicScanner:
    """Collect facts about a source tree using only file names and text heuristics."""

    version = "deterministic-v0.1"

    def scan(self, source_path: str | Path) -> ScanResult:
        """Scan ``source_path`` and return its facts sorted by kind, path and name.

        Raises:
            ValueError: If the path does not exist or is not a directory.
        """
        root = Path(source_path).expanduser().resolve()
        if not root.exists() or not root.is_dir():
            raise ValueError(f"source path does not exist or is not a directory: {root}")
        files = list(self._iter_files(root))
        facts: list[FactCandidate] = []
        facts.extend(self._language_facts(files, root))
        facts.extend(self._classified_file_facts(files, root))
        facts.extend(self._framework_facts(files, root))
        facts.extend(self._interface_facts(files, root))
        facts.extend(self._llm_provider_facts(files, root))
        return ScanResult(
            source_path=str(root),
            commit_hash=self._git_value(root, "rev-parse", "HEAD") or "working-tree",
            branch=self._git_value(root, "branch", "--show-current") or "unknown",
            file_count=len(files),
            facts=sorted(facts, key=lambda fact: (fact.kind, fact.path, fact.name)),
        )

    def _iter_files(self, root: Path) -> list[Path]:
        """Return every regular file under ``root`` not excluded by ``IGNORED_DIRS``."""
        files: list[Path] = []
        for path in root.rglob("*"):
            if not path.is_file():
                continue
            relative_parts = path.relative_to(root).parts
            if any(part in IGNORED_DIRS for part in relative_parts):
                continue
            files.append(path)
        return files

    def _language_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
        """Return one "language" fact per detected language with its file count."""
        counts: dict[str, int] = {}
        for path in files:
            language = LANGUAGE_BY_EXTENSION.get(path.suffix)
            if language is None:
                continue
            counts[language] = counts.get(language, 0) + 1
        return [
            FactCandidate(
                kind="language",
                name=language,
                value=str(count),
                metadata={"file_count": count, "source_role": "implementation_source"},
            )
            for language, count in counts.items()
        ]

    def _classified_file_facts(
        self, files: list[Path], root: Path
    ) -> list[FactCandidate]:
        """Classify files by name and location.

        A file gets at most one of intent / scope / documentation, and may
        additionally be recorded as example, test, manifest and config.
        """
        facts: list[FactCandidate] = []
        for path in files:
            relative = path.relative_to(root).as_posix()
            lower = relative.lower()
            name = path.name.lower()
            source_role = self._source_role(relative)
            if name == "intent.md":
                facts.append(
                    FactCandidate(
                        "intent",
                        "INTENT",
                        relative,
                        metadata={"source_role": "intent_summary"},
                    )
                )
            elif name == "scope.md":
                facts.append(
                    FactCandidate(
                        "scope",
                        "SCOPE",
                        relative,
                        metadata={"source_role": "derived_scope"},
                    )
                )
            elif name.startswith("readme"):
                facts.append(
                    FactCandidate(
                        "documentation",
                        "README",
                        relative,
                        metadata={"source_role": "product_documentation"},
                    )
                )
            elif lower.startswith(("docs/", "doc/")):
                facts.append(
                    FactCandidate(
                        "documentation",
                        path.name,
                        relative,
                        metadata={"source_role": "product_documentation"},
                    )
                )
            if lower.startswith(("examples/", "example/")):
                facts.append(
                    FactCandidate(
                        "example",
                        path.name,
                        relative,
                        metadata={"source_role": "product_documentation"},
                    )
                )
            if (
                lower.startswith(("tests/", "test/"))
                or name.startswith("test_")
                or name.endswith(("_test.py", ".test.ts", ".spec.ts"))
            ):
                facts.append(
                    FactCandidate(
                        "test",
                        path.name,
                        relative,
                        metadata={"source_role": "test_evidence"},
                    )
                )
            if name in _MANIFEST_NAMES or name in _EXTRA_MANIFEST_NAMES:
                facts.append(
                    FactCandidate(
                        "manifest",
                        path.name,
                        relative,
                        metadata={"source_role": "dependency_declaration"},
                    )
                )
            if lower.endswith(_CONFIG_SUFFIXES):
                facts.append(
                    FactCandidate(
                        "config",
                        path.name,
                        relative,
                        metadata={"source_role": source_role},
                    )
                )
        return facts

    def _framework_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
        """Report frameworks whose hint substring occurs in a known manifest."""
        facts: list[FactCandidate] = []
        seen: set[tuple[str, str]] = set()
        for path in files:
            hints = MANIFEST_FRAMEWORK_HINTS.get(path.name)
            if hints is None:
                continue
            try:
                text = path.read_text(encoding="utf-8", errors="ignore").lower()
            except OSError:
                continue
            relative = path.relative_to(root).as_posix()
            for needle, framework in hints.items():
                if needle not in text:
                    continue
                key = (framework, relative)
                if key in seen:
                    continue
                seen.add(key)
                facts.append(
                    FactCandidate(
                        kind="framework",
                        name=framework,
                        path=relative,
                        metadata={
                            "source": "manifest_hint",
                            "needle": needle,
                            "source_role": "dependency_declaration",
                        },
                    )
                )
        return facts

    def _interface_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
        """Report decorator-based Python interfaces and path-name interface hints."""
        facts: list[FactCandidate] = []
        for path in files:
            relative = path.relative_to(root).as_posix()
            lower = relative.lower()
            if path.suffix == ".py":
                facts.extend(self._python_interface_facts(path, relative))
            # NOTE(review): the path-name hints below are applied to every
            # file, not only Python files - confirm against upstream.
            if "cli" in lower or lower.endswith("/commands.py"):
                facts.append(
                    FactCandidate(
                        "interface",
                        "possible CLI",
                        relative,
                        metadata={"source_role": self._source_role(relative)},
                    )
                )
            if "routes" in lower or "api" in lower:
                facts.append(
                    FactCandidate(
                        "interface",
                        "possible API surface",
                        relative,
                        metadata={"source_role": self._source_role(relative)},
                    )
                )
        return facts

    def _llm_provider_facts(self, files: list[Path], root: Path) -> list[FactCandidate]:
        """Report LLM providers, credential variables, registries and fallback hints.

        Only text-like files are read, and agent-guidance files are skipped.
        Each (kind, name, path) combination is reported once.
        """
        facts: list[FactCandidate] = []
        seen: set[tuple[str, str, str]] = set()
        for path in files:
            if (
                path.suffix.lower() not in _LLM_SCAN_SUFFIXES
                and not path.name.lower().startswith(".env")
            ):
                continue
            try:
                text = path.read_text(encoding="utf-8", errors="ignore")
            except OSError:
                continue
            lower_text = text.lower()
            relative = path.relative_to(root).as_posix()
            source_role = self._source_role(relative)
            if source_role == "agent_guidance":
                continue
            utility_relationship = self._provider_utility_relationship(
                source_role,
                relative,
            )
            # One regex pass per provider token; the result is reused by the
            # registry and fallback checks below.
            matched = [
                (needle, provider)
                for needle, provider in LLM_PROVIDER_HINTS.items()
                if self._has_provider_signal(lower_text, needle)
            ]
            provider_present = bool(matched)
            for needle, provider in matched:
                self._append_once(
                    facts,
                    seen,
                    FactCandidate(
                        kind="llm_provider",
                        name=provider,
                        path=relative,
                        value=needle,
                        metadata={
                            "source": "provider_hint",
                            "source_role": source_role,
                            "utility_relationship": utility_relationship,
                        },
                    ),
                )
            for env_name, label in LLM_CREDENTIAL_HINTS.items():
                if env_name.lower() not in lower_text:
                    continue
                self._append_once(
                    facts,
                    seen,
                    FactCandidate(
                        kind="credential_config",
                        name=label,
                        path=relative,
                        value=env_name,
                        metadata={
                            "source": "environment_variable",
                            "source_role": source_role,
                            "utility_relationship": "configure",
                        },
                    ),
                )
            registry_hint = (
                "provider_registry" in lower_text
                or "providers =" in lower_text
                or ("adapter" in lower_text and source_role == "implementation_source")
            )
            if registry_hint and provider_present:
                self._append_once(
                    facts,
                    seen,
                    FactCandidate(
                        kind="provider_registry",
                        name="LLM provider registry",
                        path=relative,
                        metadata={
                            "source": "provider_registry_hint",
                            "source_role": source_role,
                            "utility_relationship": utility_relationship,
                        },
                    ),
                )
            # NOTE(review): fallback detection is treated as independent of
            # the registry hint - confirm against upstream.
            if "fallback" in lower_text and provider_present:
                self._append_once(
                    facts,
                    seen,
                    FactCandidate(
                        kind="fallback_policy",
                        name="LLM provider fallback policy",
                        path=relative,
                        metadata={
                            "source": "fallback_hint",
                            "source_role": source_role,
                            "utility_relationship": utility_relationship,
                        },
                    ),
                )
        return facts

    def _provider_utility_relationship(
        self,
        source_role: str,
        relative_path: str,
    ) -> str:
        """Map a source role (and path) to the relationship stored on provider facts."""
        if source_role == "implementation_source":
            lower = relative_path.lower()
            if "adapter" in lower or "provider" in lower:
                return "adapter"
            return "owned"
        if source_role == "configuration":
            return "configure"
        if source_role == "dependency_declaration":
            return "dependency"
        if source_role in {"ci_tooling", "agent_guidance"}:
            return "tooling"
        return "mention"

    def _source_role(self, relative_path: str) -> str:
        """Classify a root-relative POSIX path into a source role.

        Checks run in priority order and the first match wins;
        ``implementation_source`` is the default.
        """
        lower = relative_path.lower()
        parts = lower.split("/")
        name = parts[-1]
        if name == "intent.md":
            return "intent_summary"
        if name == "scope.md":
            return "derived_scope"
        if name in AGENT_GUIDANCE_FILES or any(part in AGENT_GUIDANCE_DIRS for part in parts):
            return "agent_guidance"
        if lower.startswith((".github/workflows/", ".gitea/workflows/")):
            return "ci_tooling"
        if lower.startswith(("tests/", "test/")) or name.startswith("test_"):
            return "test_evidence"
        if (
            name.startswith("readme")
            or name.endswith(".md")
            or lower.startswith(("docs/", "doc/", "wiki/", "workplans/", "architecture/"))
        ):
            return "product_documentation"
        if name in _MANIFEST_NAMES or name.endswith((".lock", ".mod")):
            return "dependency_declaration"
        if lower.endswith(_CONFIG_SUFFIXES):
            return "configuration"
        return "implementation_source"

    def _has_provider_signal(self, lower_text: str, needle: str) -> bool:
        """Return True when ``needle`` occurs as a standalone token in ``lower_text``.

        ``<needle>_api_key`` always counts. For "claude", occurrences whose
        surrounding 20 characters mention tooling (CLAUDE.md, Claude Code,
        MCP, ...) are ignored.
        """
        if f"{needle.lower()}_api_key" in lower_text:
            return True
        pattern = re.compile(rf"(?<![a-z0-9_-]){re.escape(needle.lower())}(?![a-z0-9_-])")
        for match in pattern.finditer(lower_text):
            context = lower_text[max(0, match.start() - 20) : match.end() + 20]
            if needle == "claude" and (
                "claude.md" in context
                or "claude code" in context
                or "claude.ai/code" in context
                or "claude mcp" in context
                or "mcp" in context
                or ".claude" in context
                or "claude.json" in context
                or "claude plugin" in context
                or "claude prompt" in context
            ):
                continue
            return True
        return False

    def _append_once(
        self,
        facts: list[FactCandidate],
        seen: set[tuple[str, str, str]],
        fact: FactCandidate,
    ) -> None:
        """Append ``fact`` unless one with the same kind, name and path was added."""
        key = (fact.kind, fact.name, fact.path)
        if key in seen:
            return
        seen.add(key)
        facts.append(fact)

    def _python_interface_facts(self, path: Path, relative: str) -> list[FactCandidate]:
        """Report route and CLI command decorators found in one Python file."""
        facts: list[FactCandidate] = []
        try:
            lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
        except OSError:
            return facts
        for line_number, line in enumerate(lines, start=1):
            stripped = line.strip()
            # CLI decorators are tested first: "@app.command" also starts
            # with "@app." and would otherwise be reported as a route.
            if stripped.startswith(("@click.command", "@app.command")):
                name = "python CLI command decorator"
            elif stripped.startswith(("@app.", "@router.")):
                name = "python route decorator"
            else:
                continue
            facts.append(
                FactCandidate(
                    kind="interface",
                    name=name,
                    path=relative,
                    value=stripped,
                    metadata={
                        "line": line_number,
                        "source_role": self._source_role(relative),
                    },
                )
            )
        return facts

    def _git_value(self, root: Path, *args: str) -> str | None:
        """Run ``git <args>`` in ``root`` and return its stripped output.

        Returns ``None`` when git cannot be run, times out (5 seconds), exits
        non-zero, or prints nothing.
        """
        try:
            result = subprocess.run(
                ["git", *args],
                cwd=root,
                check=False,
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            return None
        if result.returncode != 0:
            return None
        return result.stdout.strip() or None

View File

@@ -0,0 +1,4 @@
from repo_registry.scope.generator import ScopeGenerator
from repo_registry.scope.validator import ScopeValidator
__all__ = ["ScopeGenerator", "ScopeValidator"]

View File

@@ -0,0 +1,323 @@
from __future__ import annotations
import re
from dataclasses import asdict
from repo_registry.core.service import RegistryService
from repo_registry.storage.sqlite import NotFoundError
# Canonical SCOPE.md section titles, in rendering order.
SCOPE_SECTIONS = [
    "One-liner",
    "Core Idea",
    "In Scope",
    "Out of Scope",
    "Relevant When",
    "Not Relevant When",
    "Current State",
    "How It Fits",
    "Terminology",
    "Related / Overlapping",
    "Getting Oriented",
    "Provided Capabilities",
    "Notes",
]

# Marker emitted wherever the generator has nothing to derive content from.
NEEDS_INPUT = "<!-- needs curator input -->"


class ScopeGenerator:
    """Render SCOPE.md from approved repository characteristics.

    Ability-map values are read defensively: a ``None`` name, class,
    description or list is treated as missing instead of raising.
    """

    def __init__(self, service: RegistryService) -> None:
        self.service = service

    def generate(self, repo_slug: str) -> str:
        """Return the full SCOPE.md text for the repository matching ``repo_slug``.

        Raises:
            NotFoundError: If no registered repository matches the slug.
        """
        repository = self._repository_by_slug(repo_slug)
        ability_map = asdict(self.service.ability_map(repository.id))
        facts = [asdict(fact) for fact in self.service.list_observed_facts(repository.id)]
        sections = {
            "One-liner": self._one_liner(ability_map),
            "Core Idea": self._core_idea(ability_map),
            "In Scope": self._in_scope(ability_map),
            "Out of Scope": self._curator_stub(),
            "Relevant When": self._relevant_when(ability_map),
            "Not Relevant When": self._curator_stub(),
            "Current State": self._current_state(repository.status, facts),
            "How It Fits": self._how_it_fits(ability_map),
            "Terminology": self._terminology(ability_map, facts),
            "Related / Overlapping": self._curator_stub(),
            "Getting Oriented": self._getting_oriented(ability_map, facts),
            "Provided Capabilities": self._provided_capabilities(ability_map),
            "Notes": self._curator_stub(),
        }
        lines = [
            "# SCOPE",
            "",
            "> This file helps you quickly understand what this repository is about,",
            "> when it is relevant, and when it is not.",
            "> It was generated from approved repo-scoping characteristics.",
            "",
            "---",
            "",
        ]
        for section in SCOPE_SECTIONS:
            lines.extend([f"## {section}", "", sections[section].rstrip(), "", "---", ""])
        return "\n".join(lines).rstrip() + "\n"

    def _repository_by_slug(self, repo_slug: str):
        """Find the repository whose name or URL basename slugifies to ``repo_slug``."""
        wanted = self._slug(repo_slug)
        for repository in self.service.list_repositories():
            candidates = {
                self._slug(repository.name),
                self._slug(repository.url.rstrip("/").rsplit("/", 1)[-1].removesuffix(".git")),
            }
            if wanted in candidates:
                return repository
        raise NotFoundError(f"repository slug {repo_slug!r} was not found")

    def _one_liner(self, ability_map: dict) -> str:
        """Return the first sentence of the scope description, or a generic line."""
        scope = ability_map["scope"]
        # ``or ""`` because the key can be present with a None value.
        description = self._sentence(scope.get("description") or "")
        if description:
            return description
        return f"{scope['name']} defines the repository scope for {ability_map['repository']['name']}."

    def _core_idea(self, ability_map: dict) -> str:
        """Return the scope description followed by up to five approved abilities."""
        scope = ability_map["scope"]
        abilities = ability_map.get("abilities") or []
        lines = [scope.get("description") or self._one_liner(ability_map)]
        if abilities:
            lines.append("")
            lines.append("Approved abilities:")
            lines.extend(
                f"- {ability['name']}: {ability.get('description') or 'Approved repository ability.'}"
                for ability in abilities[:5]
            )
        else:
            lines.extend(["", NEEDS_INPUT])
        return "\n".join(lines)

    def _in_scope(self, ability_map: dict) -> str:
        """Return one bullet per ability, naming up to four of its capabilities."""
        abilities = ability_map.get("abilities") or []
        if not abilities:
            return self._curator_stub()
        lines = []
        for ability in abilities:
            capabilities = ", ".join(
                capability["name"] for capability in (ability.get("capabilities") or [])[:4]
            )
            suffix = f" Includes {capabilities}." if capabilities else ""
            lines.append(
                f"- {ability['name']}: {ability.get('description') or 'Approved ability.'}{suffix}"
            )
        return "\n".join(lines)

    def _relevant_when(self, ability_map: dict) -> str:
        """Return bullets for use-case features, else for the first five features.

        The curator marker is appended when none of the listed features is a
        use-case feature.
        """
        features = [
            feature
            for feature in self._features(ability_map)
            if self._is_usecase_feature(feature)
        ]
        if not features:
            features = self._features(ability_map)[:5]
        if not features:
            return self._curator_stub()
        lines = [
            f"- You need {feature['name']} ({feature.get('primary_class') or feature.get('type', 'feature')})."
            for feature in features
        ]
        if not any(self._is_usecase_feature(feature) for feature in features):
            lines.append(NEEDS_INPUT)
        return "\n".join(lines)

    def _current_state(self, status: str, facts: list[dict]) -> str:
        """Summarize status, languages, frameworks and fact counts as bullets."""
        kinds = self._facts_by_kind(facts)
        languages = self._fact_names(kinds.get("language", []))
        frameworks = self._fact_names(kinds.get("framework", []))
        tests = kinds.get("test", [])
        interfaces = kinds.get("interface", [])
        manifests = kinds.get("manifest", [])
        implementation = "substantial" if interfaces or manifests else "partial"
        if not facts:
            implementation = "unknown"
        lines = [
            f"- Status: {status}",
            f"- Implementation: {implementation}",
            "- Stability: evolving",
            "- Usage: internal",
            f"- Languages: {', '.join(languages) if languages else 'unknown'}",
            f"- Frameworks: {', '.join(frameworks) if frameworks else 'none detected'}",
            f"- Tests observed: {len(tests)}",
            f"- Interfaces observed: {len(interfaces)}",
            f"- Manifests observed: {len(manifests)}",
        ]
        if not facts:
            lines.append(NEEDS_INPUT)
        return "\n".join(lines)

    def _how_it_fits(self, ability_map: dict) -> str:
        """List up to eight evidence references plus curator placeholders."""
        evidence = [
            item
            for capability in self._capabilities(ability_map)
            for item in capability.get("evidence") or []
        ]
        placeholders = [
            "- Upstream dependencies: " + NEEDS_INPUT,
            "- Downstream consumers: " + NEEDS_INPUT,
            "- Often used with: " + NEEDS_INPUT,
        ]
        if not evidence:
            return "\n".join(placeholders)
        refs = ", ".join(
            sorted({item.get("reference", "") for item in evidence if item.get("reference")})[:8]
        )
        return "\n".join(
            [f"- Supported by evidence references: {refs or 'available evidence'}", *placeholders]
        )

    def _terminology(self, ability_map: dict, facts: list[dict]) -> str:
        """List up to twelve terms drawn from names, classes, attributes and facts."""
        terms = set()
        for item in [ability_map["scope"], *(ability_map.get("abilities") or [])]:
            terms.add(item.get("name", ""))
            terms.add(item.get("primary_class", ""))
            terms.update(item.get("attributes") or [])
        for capability in self._capabilities(ability_map):
            terms.add(capability.get("name", ""))
            terms.add(capability.get("primary_class", ""))
            terms.update(capability.get("attributes") or [])
        for fact in facts:
            if fact.get("kind") in {"framework", "llm_provider", "provider_registry"}:
                terms.add(fact.get("name", ""))
        # Drop empty/None entries before sorting: None cannot be ordered
        # against str.
        visible = sorted(str(term) for term in terms if term)
        if not visible:
            return self._curator_stub()
        return "\n".join(
            [
                "- Preferred terms: " + ", ".join(visible[:12]),
                "- Also known as: " + NEEDS_INPUT,
                "- Potentially confusing terms: " + NEEDS_INPUT,
            ]
        )

    def _getting_oriented(self, ability_map: dict, facts: list[dict]) -> str:
        """Point at the first known source paths, or ask for curator input."""
        paths = self._source_paths(ability_map, facts)
        if not paths:
            return self._curator_stub()
        return "\n".join(
            [
                f"- Start with: {paths[0]}",
                f"- Key files / directories: {', '.join(paths[:8])}",
                f"- Entry points: {', '.join(paths[:5])}",
            ]
        )

    def _provided_capabilities(self, ability_map: dict) -> str:
        """Render one fenced ``capability`` block per approved capability."""
        capabilities = self._capabilities(ability_map)
        if not capabilities:
            return f"<!-- No approved capabilities yet. -->\n{NEEDS_INPUT}"
        blocks = []
        for capability in capabilities:
            keywords = self._keywords_for_capability(capability)
            # Collapse whitespace so a multi-line description stays inside
            # the indented folded block.
            description = " ".join(
                str(capability.get("description") or "Approved repository capability.").split()
            )
            blocks.append(
                "\n".join(
                    [
                        "```capability",
                        f"type: {self._capability_type(capability.get('primary_class', 'other'))}",
                        f"title: {capability['name']}",
                        "description: >",
                        f"  {description}",
                        f"keywords: [{', '.join(keywords)}]",
                        "```",
                    ]
                )
            )
        return "\n\n".join(blocks)

    def _capabilities(self, ability_map: dict) -> list[dict]:
        """Return every capability of every ability, in order."""
        return [
            capability
            for ability in ability_map.get("abilities") or []
            for capability in ability.get("capabilities") or []
        ]

    def _features(self, ability_map: dict) -> list[dict]:
        """Return every feature of every capability, in order."""
        return [
            feature
            for capability in self._capabilities(ability_map)
            for feature in capability.get("features") or []
        ]

    def _is_usecase_feature(self, feature: dict) -> bool:
        """Return True when the class or an attribute marks a use case."""
        labels = {str(feature.get("primary_class", "")).lower()}
        labels.update(str(item).lower() for item in feature.get("attributes") or [])
        return bool(labels & {"business-usecase", "usecase", "workflow", "review"})

    def _keywords_for_capability(self, capability: dict) -> list[str]:
        """Return up to eight slugified keywords from classes and attributes."""
        keywords = [capability.get("primary_class", "")]
        keywords.extend(capability.get("attributes") or [])
        for feature in capability.get("features") or []:
            keywords.append(feature.get("primary_class", ""))
            keywords.extend(feature.get("attributes") or [])
        return [self._keyword(item) for item in self._unique(keywords)[:8] if item]

    def _capability_type(self, primary_class: str) -> str:
        """Map a primary class onto the capability block's ``type`` vocabulary."""
        normalized = (primary_class or "").lower()
        if normalized in {"api", "infrastructure", "data", "security", "documentation"}:
            return normalized
        if normalized in {"interface", "integration", "llm-integration"}:
            return "api"
        if normalized in {"storage", "repository-structure"}:
            return "data"
        return "other"

    def _facts_by_kind(self, facts: list[dict]) -> dict[str, list[dict]]:
        """Group fact dicts by their ``kind`` value."""
        grouped: dict[str, list[dict]] = {}
        for fact in facts:
            grouped.setdefault(fact.get("kind", ""), []).append(fact)
        return grouped

    def _fact_names(self, facts: list[dict]) -> list[str]:
        """Return the distinct fact names in first-seen order."""
        return self._unique([fact.get("name", "") for fact in facts])

    def _source_paths(self, ability_map: dict, facts: list[dict]) -> list[str]:
        """Return distinct paths from facts, feature locations and source refs."""
        paths = [fact.get("path", "") for fact in facts if fact.get("path")]
        for feature in self._features(ability_map):
            paths.append(feature.get("location", ""))
            for source_ref in feature.get("source_refs") or []:
                paths.append(source_ref.get("path", ""))
        return self._unique(paths)

    def _curator_stub(self) -> str:
        """Return the bullet used for sections that need curator input."""
        return f"- {NEEDS_INPUT}"

    def _sentence(self, text: str) -> str:
        """Return the first sentence of ``text`` with whitespace collapsed."""
        cleaned = re.sub(r"\s+", " ", text.strip())
        if not cleaned:
            return ""
        return re.split(r"(?<=[.!?])\s+", cleaned, maxsplit=1)[0]

    def _slug(self, value: str) -> str:
        """Lower-case ``value`` and join its alphanumeric runs with ``-``."""
        return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")

    def _keyword(self, value: str) -> str:
        """Return the slug of ``value``, or "other" when the slug is empty."""
        return self._slug(value) or "other"

    def _unique(self, values: list[str]) -> list[str]:
        """Return stripped, non-empty values de-duplicated case-insensitively.

        ``None`` entries are skipped instead of being rendered as "None".
        """
        result: list[str] = []
        seen: set[str] = set()
        for value in values:
            if value is None:
                continue
            item = str(value).strip()
            key = item.lower()
            if not item or key in seen:
                continue
            seen.add(key)
            result.append(item)
        return result

View File

@@ -0,0 +1,184 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from pathlib import Path
from repo_registry.scope.generator import SCOPE_SECTIONS, ScopeGenerator
@dataclass(frozen=True)
class ScopeDiffSection:
    """Comparison result for a single SCOPE.md section."""

    # Section title.
    section: str
    # Comparison outcome label assigned by the code that builds the diff.
    status: str
    # Section body found in the existing file, if any.
    current_text: str | None
    # Section body the generator would write, if any.
    proposed_text: str | None
@dataclass(frozen=True)
class ScopeDiff:
    """Section-by-section comparison of an existing SCOPE.md with a proposal."""

    sections: list[ScopeDiffSection]

    @property
    def needs_update(self) -> bool:
        """True when at least one section has a status other than ``"ok"``."""
        for entry in self.sections:
            if entry.status != "ok":
                return True
        return False
@dataclass(frozen=True)
class ScopeValidationIssue:
    """A single finding produced while validating a SCOPE.md file."""

    # Identifier of the check that produced the finding.
    check: str
    # Severity label of the finding.
    severity: str
    # Human-readable explanation.
    message: str
@dataclass(frozen=True)
class ValidationResult:
    """All findings from validating one SCOPE.md file."""

    issues: list[ScopeValidationIssue]

    @property
    def ok(self) -> bool:
        """True unless at least one issue has severity ``"error"``."""
        return all(entry.severity != "error" for entry in self.issues)
class ScopeValidator:
"""Validate and diff SCOPE.md files."""
def __init__(self, generator: ScopeGenerator | None = None) -> None:
self.generator = generator
def diff(self, repo_slug: str, existing_path: Path) -> ScopeDiff:
    """Compare the SCOPE.md at ``existing_path`` with a freshly generated one.

    A file that does not exist is treated as empty. Each canonical section is
    reported as "missing" (absent from the current file), "ok" (normalized
    text equals the proposal) or "stale" (anything else).

    Raises:
        ValueError: If the validator was created without a generator.
    """
    if self.generator is None:
        raise ValueError("ScopeValidator.diff requires a ScopeGenerator")

    if existing_path.exists():
        current = existing_path.read_text(encoding="utf-8")
    else:
        current = ""
    proposed = self.generator.generate(repo_slug)
    existing_by_title = self._parse_sections(current)
    proposed_by_title = self._parse_sections(proposed)

    entries: list[ScopeDiffSection] = []
    for title in SCOPE_SECTIONS:
        have = existing_by_title.get(title)
        want = proposed_by_title.get(title, "")
        if have is None:
            verdict = "missing"
        elif self._normalize(have) != self._normalize(want):
            verdict = "stale"
        else:
            verdict = "ok"
        entries.append(
            ScopeDiffSection(
                section=title,
                status=verdict,
                current_text=have,
                proposed_text=want,
            )
        )
    return ScopeDiff(sections=entries)
def validate(self, path: Path) -> ValidationResult:
issues: list[ScopeValidationIssue] = []
if not path.exists():
return ValidationResult(
issues=[
ScopeValidationIssue(
check="C5a",
severity="error",
message="SCOPE.md is missing.",
)
]
)
content = path.read_text(encoding="utf-8")
sections = self._parse_sections(content)
missing = [section for section in SCOPE_SECTIONS if section not in sections]
if missing:
severity = "warn" if missing == ["Provided Capabilities"] else "error"
issues.append(
ScopeValidationIssue(
check="C5b",
severity=severity,
message=f"Missing SCOPE.md section(s): {', '.join(missing)}.",
)
)
ordered = self._heading_order(content)
expected_order = [section for section in SCOPE_SECTIONS if section in sections]
if ordered[: len(expected_order)] != expected_order:
issues.append(
ScopeValidationIssue(
check="C5b",
severity="warn",
message="SCOPE.md sections are not in canonical order.",
)
)
capabilities = sections.get("Provided Capabilities")
if capabilities is None:
issues.append(
ScopeValidationIssue(
check="C5c",
severity="warn",
message="Provided Capabilities section is missing.",
)
)
elif "```capability" in capabilities:
for index, block in enumerate(self._capability_blocks(capabilities), start=1):
keys = self._capability_keys(block)
missing_keys = {"type", "title"} - keys
if missing_keys:
issues.append(
ScopeValidationIssue(
check="C5c",
severity="warn",
message=(
f"Capability block {index} is missing required field(s): "
f"{', '.join(sorted(missing_keys))}."
),
)
)
elif "No approved capabilities yet" not in capabilities:
issues.append(
ScopeValidationIssue(
check="C5c",
severity="warn",
message=(
"Provided Capabilities has no capability blocks or explicit "
"empty-state note."
),
)
)
return ValidationResult(issues=issues)
def _parse_sections(self, content: str) -> dict[str, str]:
matches = list(re.finditer(r"^##\s+(.+?)\s*$", content, re.MULTILINE))
sections: dict[str, str] = {}
for index, match in enumerate(matches):
title = match.group(1).strip()
start = match.end()
end = matches[index + 1].start() if index + 1 < len(matches) else len(content)
body = content[start:end]
body = re.sub(r"\n---\s*$", "", body.strip())
sections[title] = body.strip()
return sections
def _heading_order(self, content: str) -> list[str]:
return [
match.group(1).strip()
for match in re.finditer(r"^##\s+(.+?)\s*$", content, re.MULTILINE)
if match.group(1).strip() in SCOPE_SECTIONS
]
def _normalize(self, value: str | None) -> str:
if value is None:
return ""
without_comments = re.sub(r"<!--.*?-->", "", value, flags=re.DOTALL)
without_markdown = re.sub(r"[`*_>#-]+", " ", without_comments)
return re.sub(r"\s+", " ", without_markdown).strip().lower()
def _capability_blocks(self, content: str) -> list[str]:
return re.findall(
r"```capability\s*(.*?)```",
content,
flags=re.DOTALL | re.IGNORECASE,
)
def _capability_keys(self, block: str) -> set[str]:
return {
match.group(1)
for match in re.finditer(r"^([A-Za-z_][A-Za-z0-9_-]*):", block, re.MULTILINE)
}

View File

@@ -0,0 +1,13 @@
from repo_registry.self_scoping.assessment import export_assessment_artifact
from repo_registry.self_scoping.comparison import compare_assessment_to_golden
from repo_registry.self_scoping.review_store import (
record_assessment_outcome,
record_assessment_pair_outcome,
)
__all__ = [
"compare_assessment_to_golden",
"export_assessment_artifact",
"record_assessment_outcome",
"record_assessment_pair_outcome",
]

View File

@@ -0,0 +1,478 @@
from __future__ import annotations
import json
import subprocess
from collections import Counter
from dataclasses import asdict
from datetime import UTC, datetime
from importlib import metadata
from pathlib import Path
from typing import Any
from repo_registry.acceptance import (
active_quality_criteria_version,
evaluate_candidate_graph_quality,
quality_gate_outcome_dicts,
)
from repo_registry.core.models import (
Ability,
CandidateAbility,
CandidateCapability,
CandidateEvidence,
CandidateFeature,
ContentChunk,
ObservedFact,
RepositoryAbilityMap,
ReviewDecision,
SourceReference,
)
from repo_registry.core.service import RegistryService
SCHEMA_VERSION = "self-scoping-assessment/v1"
KNOWN_PROVIDER_ROUTING_CAPABILITY = "Route LLM Requests Across Providers"
def export_assessment_artifact(
    service: RegistryService,
    repository_id: int,
    analysis_run_id: int,
    *,
    role: str = "challenger",
    outcome: str = "challenger",
    reviewer: str = "codex",
    summary: str | None = None,
    engine_root: str | Path | None = None,
) -> dict[str, Any]:
    """Build the self-scoping assessment artifact for a completed analysis run.

    Args:
        service: Registry service used to load the run and its derived data.
        repository_id: Repository the run belongs to.
        analysis_run_id: Run to export.
        role: Assessment role recorded in the artifact.
        outcome: Assessment outcome recorded in the artifact.
        reviewer: Reviewer name recorded in the artifact.
        summary: Optional summary; a default is derived when omitted or empty.
        engine_root: Directory inspected for engine git metadata; defaults to
            the current working directory.

    Raises:
        ValueError: If the run's status is not "completed".
    """
    repository = service.get_repository(repository_id)
    run = service.get_analysis_run(repository_id, analysis_run_id)
    if run.status != "completed":
        raise ValueError(
            f"analysis run {analysis_run_id} is {run.status}, not completed"
        )

    snapshot = None
    if run.snapshot_id is not None:
        snapshot = service.store.get_snapshot(run.snapshot_id)

    facts = service.list_observed_facts(repository_id, analysis_run_id)
    chunks = service.list_content_chunks(repository_id, analysis_run_id)
    graph = service.candidate_graph(repository_id, analysis_run_id)
    gate_outcomes = evaluate_candidate_graph_quality(graph)
    ability_map = service.ability_map(repository_id)
    decisions = service.list_review_decisions(repository_id, analysis_run_id)

    engine_identity = _engine_identity(
        run.scanner_version,
        Path(engine_root or Path.cwd()),
    )
    regression_patterns = _known_regression_patterns(graph.abilities, decisions)
    eligibility = _comparison_eligibility(
        role,
        engine_identity["release_binding_status"],
    )
    artifact_summary = summary or _summary(role, regression_patterns)

    artifact_id = _artifact_id(repository.name, analysis_run_id, role)
    created_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Snapshot-backed fields fall back to repository data or placeholders.
    if snapshot is None:
        target_repository = {
            "repo_slug": _slug(repository.name),
            "repository_id": repository.id,
            "source": repository.url,
            "target_commit": "unknown",
            "target_branch": repository.branch,
            "dirty_state": "unknown",
            "file_count": None,
        }
    else:
        target_repository = {
            "repo_slug": _slug(repository.name),
            "repository_id": repository.id,
            "source": snapshot.source_path,
            "target_commit": snapshot.commit_hash,
            "target_branch": snapshot.branch,
            "dirty_state": _dirty_state(Path(snapshot.source_path)),
            "file_count": snapshot.file_count,
        }

    execution = {
        "mode": _execution_mode(decisions),
        "analysis_run_id": run.id,
        "candidate_source": _candidate_source(decisions),
        "acceptance_mode": _acceptance_mode(decisions),
        "started_at": _timestamp(run.started_at),
        "completed_at": _timestamp(run.completed_at),
    }
    assessment = {
        "role": role,
        "outcome": outcome,
        "summary": artifact_summary,
        "reviewer": reviewer,
        "comparison_eligibility": eligibility,
        "rationale": _rationale(regression_patterns, eligibility),
    }

    if eligibility == "not_comparable":
        comparability_note = (
            "Artifact is not comparable as a preferred baseline until engine "
            "identity is complete."
        )
    else:
        comparability_note = (
            "Artifact has enough engine identity metadata for comparison."
        )

    return {
        "schema_version": SCHEMA_VERSION,
        "artifact_id": artifact_id,
        "artifact_type": "assessment_run",
        "created_at": created_at,
        "target_repository": target_repository,
        "engine_identity": engine_identity,
        "execution": execution,
        "assessment": assessment,
        "fact_summary": _fact_summary(facts),
        "content_chunk_summary": _content_chunk_summary(chunks),
        "generated_tree": {
            "abilities": [_candidate_ability(item) for item in graph.abilities]
        },
        "approved_map": _approved_map(ability_map),
        "review_decisions": [_review_decision(item) for item in decisions],
        "quality_gate_outcomes": quality_gate_outcome_dicts(gate_outcomes),
        "known_regression_patterns": regression_patterns,
        "notes": [
            "Generated by repo-scoping self-scoping assessment exporter.",
            comparability_note,
        ],
    }
def _engine_identity(scanner_version: str, engine_root: Path) -> dict[str, Any]:
    """Describe the engine build: package version, git commit/tag and dirty state.

    The release binding is "complete" when a git commit could be read from
    ``engine_root`` and "unbound" otherwise.
    """
    commit = _git_value(engine_root, "rev-parse", "HEAD")
    dirty = _dirty_state(engine_root)
    # Only set when HEAD sits exactly on a tag.
    tag = _git_value(engine_root, "describe", "--tags", "--exact-match")

    if commit:
        binding_status = "complete"
        binding_note = "Engine commit was captured from git."
    else:
        binding_status = "unbound"
        binding_note = (
            "Engine commit could not be captured; artifact is not comparable."
        )

    return {
        "repo_scoping_version": _package_version(),
        "engine_commit": commit,
        "engine_release": tag,
        "engine_dirty_state": dirty,
        "scanner_version": scanner_version,
        "candidate_generator_version": "unversioned",
        "quality_criteria_version": active_quality_criteria_version(),
        "prompt_version": None,
        "release_binding_status": binding_status,
        "release_binding_note": binding_note,
    }
def _package_version() -> str:
try:
return metadata.version("repo-registry")
except metadata.PackageNotFoundError:
return "unknown"
def _git_value(root: Path, *args: str) -> str | None:
try:
result = subprocess.run(
["git", "-C", str(root), *args],
check=False,
capture_output=True,
text=True,
)
except OSError:
return None
value = result.stdout.strip()
return value if result.returncode == 0 and value else None
def _dirty_state(root: Path) -> str:
if not (root / ".git").exists():
return "unknown"
try:
result = subprocess.run(
["git", "-C", str(root), "status", "--short"],
check=False,
capture_output=True,
text=True,
)
except OSError:
return "unknown"
if result.returncode != 0:
return "unknown"
return "dirty" if result.stdout.strip() else "clean"
def _comparison_eligibility(role: str, release_binding_status: str) -> str:
if role == "negative_regression_seed":
return "eligible_as_negative_seed"
if release_binding_status == "complete":
return "eligible"
return "not_comparable"
def _summary(role: str, regression_patterns: list[dict[str, str]]) -> str:
if role == "negative_regression_seed":
return "Historical run captured as a negative self-scoping regression seed."
if regression_patterns:
return "Generated self-scoping assessment repeats known regression patterns."
return "Generated self-scoping assessment artifact for comparison."
def _rationale(
regression_patterns: list[dict[str, str]],
comparison_eligibility: str,
) -> list[str]:
rationale: list[str] = []
if comparison_eligibility == "not_comparable":
rationale.append("Engine identity is incomplete, so this cannot be a comparable baseline.")
for pattern in regression_patterns:
rationale.append(f"{pattern['id']}: {pattern['description']}")
return rationale
def _fact_summary(facts: list[ObservedFact]) -> dict[str, Any]:
    """Summarise observed facts: counts per kind plus suspicious provider sources."""
    kind_counts = Counter(fact.kind for fact in facts)
    return {
        # Keys are emitted in sorted order.
        "counts_by_kind": {kind: kind_counts[kind] for kind in sorted(kind_counts)},
        "contamination_sources": _contamination_sources(facts),
    }
def _contamination_sources(facts: list[ObservedFact]) -> list[dict[str, str]]:
provider_kinds = {
"llm_provider",
"credential_config",
"provider_registry",
"fallback_policy",
}
suspicious_segments = (
"test",
"tests/",
"fixtures",
"expectations",
"schemas.py",
"scanner.py",
"normalization.py",
"workplans/",
)
results: list[dict[str, str]] = []
seen: set[str] = set()
for fact in facts:
lower = fact.path.lower()
if fact.kind not in provider_kinds or not any(segment in lower for segment in suspicious_segments):
continue
if fact.path in seen:
continue
seen.add(fact.path)
results.append(
{
"path": fact.path,
"reason": (
"Provider-related fact came from scanner rules, tests, fixtures, "
"schemas, or workplan context and needs native-utility review."
),
}
)
return sorted(results, key=lambda item: item["path"])
def _content_chunk_summary(chunks: list[ContentChunk]) -> dict[str, Any]:
source_roles = Counter(
str(chunk.metadata.get("source_role", "") or "unknown") for chunk in chunks
)
return {
"total": len(chunks),
"counts_by_kind": dict(sorted(Counter(chunk.kind for chunk in chunks).items())),
"counts_by_source_role": dict(sorted(source_roles.items())),
"paths": sorted({chunk.path for chunk in chunks}),
}
def _candidate_ability(ability: CandidateAbility) -> dict[str, Any]:
return {
"name": ability.name,
"status": ability.status,
"primary_class": ability.primary_class,
"source_refs": [_source_ref(ref) for ref in ability.source_refs],
"capabilities": [
_candidate_capability(capability) for capability in ability.capabilities
],
}
def _candidate_capability(capability: CandidateCapability) -> dict[str, Any]:
return {
"name": capability.name,
"status": capability.status,
"primary_class": capability.primary_class,
"source_refs": [_source_ref(ref) for ref in capability.source_refs],
"features": [_candidate_feature(feature) for feature in capability.features],
"evidence": [_candidate_evidence(evidence) for evidence in capability.evidence],
}
def _candidate_feature(feature: CandidateFeature) -> dict[str, Any]:
return {
"name": feature.name,
"type": feature.type,
"status": feature.status,
"primary_class": feature.primary_class,
"location": feature.location,
"source_refs": [_source_ref(ref) for ref in feature.source_refs],
}
def _candidate_evidence(evidence: CandidateEvidence) -> dict[str, Any]:
return {
"type": evidence.type,
"reference": evidence.reference,
"strength": evidence.strength,
"status": evidence.status,
"source_refs": [_source_ref(ref) for ref in evidence.source_refs],
}
def _approved_map(ability_map: RepositoryAbilityMap) -> dict[str, Any]:
return {
"scope": asdict(ability_map.scope),
"abilities": [_approved_ability(ability) for ability in ability_map.abilities],
}
def _approved_ability(ability: Ability) -> dict[str, Any]:
return {
"name": ability.name,
"primary_class": ability.primary_class,
"capabilities": [
{
"name": capability.name,
"primary_class": capability.primary_class,
"features": [
{
"name": feature.name,
"type": feature.type,
"primary_class": feature.primary_class,
"location": feature.location,
"source_refs": [
_source_ref(ref) for ref in feature.source_refs
],
}
for feature in capability.features
],
"evidence": [asdict(evidence) for evidence in capability.evidence],
}
for capability in ability.capabilities
],
}
def _source_ref(ref: SourceReference) -> dict[str, Any]:
return asdict(ref)
def _review_decision(decision: ReviewDecision) -> dict[str, Any]:
    """Serialise a review decision, stamped with the active criteria version."""
    return {
        **asdict(decision),
        "quality_criteria_version": active_quality_criteria_version(),
    }
def _known_regression_patterns(
abilities: list[CandidateAbility],
decisions: list[ReviewDecision],
) -> list[dict[str, str]]:
patterns: list[dict[str, str]] = []
llm_capabilities = [
capability
for ability in abilities
for capability in ability.capabilities
if capability.name == KNOWN_PROVIDER_ROUTING_CAPABILITY
]
if llm_capabilities:
patterns.append(
{
"id": "RREG-SELF-REG-001",
"title": "LLM provider vocabulary promoted as native capability",
"severity": "critical",
"description": (
"Generated tree contains Route LLM Requests Across Providers "
"as a repo-scoping capability."
),
"detection_hint": (
"Flag the provider-routing capability unless product intent "
"and public implementation explicitly support it."
),
}
)
if any(
feature.type in {"API", "CLI"}
for capability in llm_capabilities
for feature in capability.features
):
patterns.append(
{
"id": "RREG-SELF-REG-002",
"title": "Native API and CLI surfaces attached under false capability",
"severity": "high",
"description": (
"API or CLI surface features are nested below provider routing."
),
"detection_hint": (
"Flag API/CLI surface features whose parent capability is "
"llm-integration or provider-routing."
),
}
)
if any(decision.action == "trusted_auto_approve_candidate_graph" for decision in decisions):
patterns.append(
{
"id": "RREG-SELF-REG-003",
"title": "Deterministic trusted auto-approval accepted candidate truth",
"severity": "high",
"description": (
"Candidate characteristics were approved through trusted "
"auto-approval instead of human or agentic judgement."
),
"detection_hint": "Flag trusted_auto_approve_candidate_graph review decisions.",
}
)
return patterns
def _execution_mode(decisions: list[ReviewDecision]) -> str:
if any(decision.action.startswith("agentic_review") for decision in decisions):
return "agentic-review"
if any(decision.action == "trusted_auto_approve_candidate_graph" for decision in decisions):
return "trusted-auto-review"
if any(decision.action == "llm_extraction_used" for decision in decisions):
return "llm-assisted"
if any(decision.action.startswith("approve") for decision in decisions):
return "manual-review"
return "deterministic-only"
def _candidate_source(decisions: list[ReviewDecision]) -> str:
return "llm+deterministic" if any(
decision.action == "llm_extraction_used" for decision in decisions
) else "deterministic"
def _acceptance_mode(decisions: list[ReviewDecision]) -> str:
agentic_decision = next(
(decision for decision in decisions if decision.action.startswith("agentic_review")),
None,
)
if agentic_decision is not None:
return agentic_decision.action
if any(decision.action == "trusted_auto_approve_candidate_graph" for decision in decisions):
return "trusted_auto_approve_candidate_graph"
if any(decision.action == "approve_candidate_graph" for decision in decisions):
return "manual_candidate_graph_approval"
if any(decision.action == "approve_analysis_run_changes" for decision in decisions):
return "manual_change_approval"
return "pending_review"
def _timestamp(value: str | None) -> str | None:
if value is None:
return None
if "T" in value:
return value
return value.replace(" ", "T") + "Z"
def _artifact_id(repository_name: str, analysis_run_id: int, role: str) -> str:
    """Compose the artifact id as "<repo-slug>-<role>-run-<run id>"."""
    parts = (_slug(repository_name), role, f"run-{analysis_run_id}")
    return "-".join(parts)
def _slug(value: str) -> str:
return "-".join(
token for token in "".join(char.lower() if char.isalnum() else "-" for char in value).split("-") if token
)
def artifact_json(artifact: dict[str, Any]) -> str:
    """Render an artifact as indented, key-sorted JSON ending with a newline."""
    serialized = json.dumps(artifact, sort_keys=True, indent=2)
    return f"{serialized}\n"

View File

@@ -0,0 +1,238 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
COMPARISON_SCHEMA_VERSION = "self-scoping-comparison/v1"
def load_json(path: str | Path) -> dict[str, Any]:
    """Read the UTF-8 file at ``path`` and parse it as JSON."""
    raw = Path(path).read_text(encoding="utf-8")
    return json.loads(raw)
def compare_assessment_to_golden(
    golden_profile: dict[str, Any],
    assessment: dict[str, Any],
) -> dict[str, Any]:
    """Compare a generated assessment with a golden profile.

    Capabilities are matched by name. The returned comparison lists matched,
    missing, unexpected and forbidden capabilities, echoes the assessment's
    regression patterns, and carries an overall status with review hints.
    """
    expected = _expected_capabilities(golden_profile)
    forbidden = _forbidden_capabilities(golden_profile)
    generated = _generated_capabilities(assessment)
    generated_names = set(generated)

    missing = sorted(expected - generated_names)
    matched = sorted(expected & generated_names)
    forbidden_hits = sorted(forbidden & generated_names)
    regressions = assessment.get("known_regression_patterns", [])
    misplaced = _misplaced_features(generated)

    status = _status(
        missing_expected=missing,
        forbidden_present=forbidden_hits,
        known_regressions=regressions,
        misplaced_features=misplaced,
    )
    target = assessment.get("target_repository", {})
    unexpected = _unexpected_capabilities(generated_names, expected, forbidden)

    return {
        "schema_version": COMPARISON_SCHEMA_VERSION,
        "comparison_id": _comparison_id(golden_profile, assessment),
        "created_at": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "golden_profile_id": golden_profile.get("profile_id", ""),
        "assessment_artifact_id": assessment.get("artifact_id", ""),
        "target_repo_slug": target.get("repo_slug", ""),
        "status": status,
        "summary": _summary(status, missing, forbidden_hits, regressions),
        "matched_expected_capabilities": matched,
        "missing_expected_capabilities": missing,
        "unexpected_native_capabilities": unexpected,
        "forbidden_native_capabilities_present": forbidden_hits,
        "known_regression_patterns": regressions,
        "misplaced_features": misplaced,
        "comparison_hints": _comparison_hints(status),
    }
def comparison_json(comparison: dict[str, Any]) -> str:
    """Render a comparison as indented, key-sorted JSON ending with a newline."""
    serialized = json.dumps(comparison, sort_keys=True, indent=2)
    return f"{serialized}\n"
def comparison_markdown(comparison: dict[str, Any]) -> str:
    """Render a comparison as a Markdown report.

    The report has a header with status details, then one bulleted section
    per finding list; empty lists are shown as "- None".
    """
    report = [
        f"# Self-Scoping Comparison: {comparison['assessment_artifact_id']}",
        "",
        f"- Status: `{comparison['status']}`",
        f"- Golden profile: `{comparison['golden_profile_id']}`",
        f"- Target repo: `{comparison['target_repo_slug']}`",
        f"- Summary: {comparison['summary']}",
        "",
    ]
    # (heading, renderer, comparison key) in output order.
    sections = (
        ("## Missing Expected Capabilities", _bullets, "missing_expected_capabilities"),
        (
            "## Forbidden Native Capabilities Present",
            _bullets,
            "forbidden_native_capabilities_present",
        ),
        ("## Known Regression Patterns", _regression_bullets, "known_regression_patterns"),
        ("## Misplaced Features", _misplaced_feature_bullets, "misplaced_features"),
        ("## Matched Expected Capabilities", _bullets, "matched_expected_capabilities"),
        ("## Review Hints", _bullets, "comparison_hints"),
    )
    for heading, render, key in sections:
        report.append(heading)
        report.extend(render(comparison[key]))
        report.append("")
    return "\n".join(report)
def _expected_capabilities(golden_profile: dict[str, Any]) -> set[str]:
return {
capability["name"]
for capability in golden_profile.get("ability", {}).get("expected_capabilities", [])
if capability.get("name")
}
def _forbidden_capabilities(golden_profile: dict[str, Any]) -> set[str]:
return {
capability["name"]
for capability in golden_profile.get("forbidden_native_capabilities", [])
if capability.get("name")
}
def _generated_capabilities(assessment: dict[str, Any]) -> dict[str, dict[str, Any]]:
result: dict[str, dict[str, Any]] = {}
for ability in assessment.get("generated_tree", {}).get("abilities", []):
for capability in ability.get("capabilities", []):
name = capability.get("name")
if name:
result[name] = capability
return result
def _unexpected_capabilities(
generated_names: set[str],
expected: set[str],
forbidden: set[str],
) -> list[str]:
return sorted(generated_names - expected - forbidden)
def _misplaced_features(
generated: dict[str, dict[str, Any]],
) -> list[dict[str, str]]:
misplaced: list[dict[str, str]] = []
for capability_name, capability in generated.items():
primary_class = capability.get("primary_class", "")
if primary_class not in {"llm-integration", "provider-routing"}:
continue
for feature in capability.get("features", []):
if feature.get("type") not in {"API", "CLI"}:
continue
misplaced.append(
{
"capability": capability_name,
"feature": feature.get("name", ""),
"feature_type": feature.get("type", ""),
"reason": "API/CLI surface is nested below provider-routing capability.",
}
)
return misplaced
def _status(
*,
missing_expected: list[str],
forbidden_present: list[str],
known_regressions: list[dict[str, Any]],
misplaced_features: list[dict[str, str]],
) -> str:
if forbidden_present or misplaced_features or any(
item.get("severity") in {"high", "critical"} for item in known_regressions
):
return "regression"
if missing_expected or known_regressions:
return "needs_review"
return "candidate_improvement"
def _summary(
status: str,
missing_expected: list[str],
forbidden_present: list[str],
known_regressions: list[dict[str, Any]],
) -> str:
if status == "regression":
return (
"Assessment repeats known or forbidden self-scoping patterns; prefer "
"the golden profile until the engine is corrected."
)
if status == "needs_review":
return (
f"Assessment needs review: {len(missing_expected)} expected "
f"capability(s) missing and {len(known_regressions)} regression "
"pattern(s) reported."
)
return "Assessment covers the golden profile without known regression patterns."
def _comparison_hints(status: str) -> list[str]:
if status == "regression":
return [
"Do not promote this assessment as a preferred baseline.",
"Inspect forbidden capabilities and misplaced features first.",
"Use the findings as signal for scanner, generator, or acceptance-policy changes.",
]
if status == "needs_review":
return [
"Review missing expected capabilities before choosing old or new output.",
"Check whether the golden profile needs a curator-approved update.",
]
return [
"Candidate appears better than the known golden checks.",
"Human or agentic review should still confirm source evidence quality.",
]
def _comparison_id(
golden_profile: dict[str, Any],
assessment: dict[str, Any],
) -> str:
return (
f"{golden_profile.get('profile_id', 'golden')}"
f"__{assessment.get('artifact_id', 'assessment')}"
)
def _bullets(items: list[str]) -> list[str]:
if not items:
return ["- None"]
return [f"- {item}" for item in items]
def _regression_bullets(items: list[dict[str, Any]]) -> list[str]:
if not items:
return ["- None"]
return [
f"- `{item.get('id', '')}` {item.get('title', '')}: {item.get('description', '')}"
for item in items
]
def _misplaced_feature_bullets(items: list[dict[str, str]]) -> list[str]:
if not items:
return ["- None"]
return [
(
f"- `{item['feature']}` under `{item['capability']}` "
f"({item['feature_type']}): {item['reason']}"
)
for item in items
]

View File

@@ -0,0 +1,217 @@
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from uuid import uuid4
SELF_SCOPING_ROOT_ENV = "REPO_REGISTRY_SELF_SCOPING_ROOT"
OUTCOME_SCHEMA_VERSION = "self-scoping-review-outcome/v1"
ALLOWED_OUTCOMES = {
"prefer_golden",
"prefer_assessment",
"prefer_baseline",
"prefer_challenger",
"tie",
"needs_human",
"reject_assessment",
"reject_challenger",
}
@dataclass(frozen=True)
class ReviewArtifact:
    """Listing entry for one JSON artifact under the self-scoping root."""

    # POSIX-style path relative to the self-scoping root.
    path: str
    artifact_id: str
    title: str
    # Taken from the payload's "updated_at" or "created_at"; empty when absent.
    updated_at: str
def self_scoping_root(root: str | Path | None = None) -> Path:
    """Resolve the self-scoping directory.

    Precedence: the ``root`` argument, then the environment variable named by
    ``SELF_SCOPING_ROOT_ENV``, then "docs/self-scoping".
    """
    chosen = root
    if not chosen:
        chosen = os.environ.get(SELF_SCOPING_ROOT_ENV)
    if not chosen:
        chosen = "docs/self-scoping"
    return Path(chosen).resolve()
def list_golden_profiles(root: str | Path | None = None) -> list[ReviewArtifact]:
    """List the JSON artifacts in the "golden" directory of the self-scoping root."""
    profiles = _list_artifacts("golden", root=root)
    return profiles
def list_assessment_artifacts(root: str | Path | None = None) -> list[ReviewArtifact]:
    """List the JSON artifacts in the "assessments" directory of the self-scoping root."""
    assessments = _list_artifacts("assessments", root=root)
    return assessments
def load_json_artifact(
    relative_path: str,
    root: str | Path | None = None,
) -> dict[str, Any]:
    """Load a JSON artifact by its path relative to the self-scoping root.

    The path is checked by ``_safe_artifact_path`` before the file is read.
    """
    resolved = _safe_artifact_path(relative_path, root=root)
    raw = resolved.read_text(encoding="utf-8")
    return json.loads(raw)
def list_outcome_records(root: str | Path | None = None) -> list[dict[str, Any]]:
    """Load every outcome record, in reverse-sorted file name order.

    Returns an empty list when the "outcomes" directory does not exist.
    Files that are not valid JSON are skipped.
    """
    directory = self_scoping_root(root) / "outcomes"
    loaded: list[dict[str, Any]] = []
    if directory.exists():
        for candidate in sorted(directory.glob("*.json"), reverse=True):
            text = candidate.read_text(encoding="utf-8")
            try:
                payload = json.loads(text)
            except json.JSONDecodeError:
                continue
            loaded.append(payload)
    return loaded
def record_assessment_outcome(
    *,
    golden_path: str,
    assessment_path: str,
    outcome: str,
    reviewer: str,
    notes: str,
    comparison_status: str,
    root: str | Path | None = None,
) -> dict[str, Any]:
    """Persist a reviewer's verdict on an assessment versus a golden profile.

    Both artifacts are loaded relative to the self-scoping root and the
    record is written to its "outcomes" directory.

    Returns:
        The record that was written.

    Raises:
        ValueError: If ``outcome`` is not in ``ALLOWED_OUTCOMES``.
    """
    if outcome not in ALLOWED_OUTCOMES:
        raise ValueError(f"unsupported review outcome: {outcome}")

    base = self_scoping_root(root)
    golden = load_json_artifact(golden_path, root=base)
    assessment = load_json_artifact(assessment_path, root=base)
    created_at = _created_at()

    record: dict[str, Any] = {
        "schema_version": OUTCOME_SCHEMA_VERSION,
        "outcome_id": _outcome_id(created_at, assessment_path, outcome),
        "created_at": created_at,
        # A blank reviewer falls back to "codex".
        "reviewer": reviewer.strip() or "codex",
        "outcome": outcome,
        "notes": notes.strip(),
        "comparison_status": comparison_status,
        "golden_profile_path": golden_path,
        "golden_profile_id": golden.get("profile_id", ""),
        "assessment_artifact_path": assessment_path,
        "assessment_artifact_id": assessment.get("artifact_id", ""),
        "engine_identity": assessment.get("engine_identity", {}),
        "decision_scope": "baseline-comparison",
    }
    _write_outcome(record, base)
    return record
def record_assessment_pair_outcome(
    *,
    baseline_path: str,
    challenger_path: str,
    outcome: str,
    reviewer: str,
    notes: str,
    comparison_status: str,
    root: str | Path | None = None,
) -> dict[str, Any]:
    """Persist a reviewer's verdict on a baseline versus a challenger assessment.

    Both artifacts are loaded relative to the self-scoping root and the
    record is written to its "outcomes" directory.

    Returns:
        The record that was written.

    Raises:
        ValueError: If ``outcome`` is not in ``ALLOWED_OUTCOMES``.
    """
    if outcome not in ALLOWED_OUTCOMES:
        raise ValueError(f"unsupported review outcome: {outcome}")
    base = self_scoping_root(root)
    baseline = load_json_artifact(baseline_path, root=base)
    challenger = load_json_artifact(challenger_path, root=base)
    created_at = _created_at()
    # _outcome_id takes Path(...).stem of whatever it is given. Dots are
    # replaced here so a dotted stem (e.g. "run.v2") is not mistaken for a
    # file extension and cut off the combined label.
    baseline_label = Path(baseline_path).stem.replace(".", "-")
    challenger_label = Path(challenger_path).stem.replace(".", "-")
    outcome_id = _outcome_id(
        created_at,
        f"{baseline_label}__{challenger_label}",
        outcome,
    )
    record = {
        "schema_version": OUTCOME_SCHEMA_VERSION,
        "outcome_id": outcome_id,
        "created_at": created_at,
        # A blank reviewer falls back to "codex".
        "reviewer": reviewer.strip() or "codex",
        "outcome": outcome,
        "notes": notes.strip(),
        "comparison_status": comparison_status,
        "baseline_assessment_path": baseline_path,
        "baseline_assessment_artifact_id": baseline.get("artifact_id", ""),
        "baseline_engine_identity": baseline.get("engine_identity", {}),
        "challenger_assessment_path": challenger_path,
        "challenger_assessment_artifact_id": challenger.get("artifact_id", ""),
        "challenger_engine_identity": challenger.get("engine_identity", {}),
        "decision_scope": "assessment-pair-comparison",
    }
    _write_outcome(record, base)
    return record
def _created_at() -> str:
return (
datetime.now(UTC)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z")
)
def _write_outcome(record: dict[str, Any], base: Path) -> None:
outcomes_dir = base / "outcomes"
outcomes_dir.mkdir(parents=True, exist_ok=True)
output_path = outcomes_dir / f"{record['outcome_id']}.json"
output_path.write_text(
json.dumps(record, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
def _list_artifacts(kind: str, root: str | Path | None = None) -> list[ReviewArtifact]:
    """List the JSON artifacts in ``<self-scoping root>/<kind>``, sorted by path.

    Files that are not valid JSON, or whose top-level value is not a JSON
    object, are skipped so that one stray file cannot break the listing.
    Missing fields fall back in order: the id uses ``artifact_id``, then
    ``profile_id``, then the file stem; the title uses ``title``, then the
    assessment summary, then ``artifact_type``, then the file stem.
    """
    base = self_scoping_root(root)
    artifacts: list[ReviewArtifact] = []
    for path in sorted((base / kind).glob("*.json")):
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            continue
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, string, number, null)
            # carries no artifact fields and has no ``.get``.
            continue
        assessment = payload.get("assessment")
        # Only read the summary when "assessment" is really an object.
        assessment_summary = (
            assessment.get("summary") if isinstance(assessment, dict) else None
        )
        artifacts.append(
            ReviewArtifact(
                path=path.relative_to(base).as_posix(),
                artifact_id=str(
                    payload.get("artifact_id") or payload.get("profile_id") or path.stem
                ),
                title=str(
                    payload.get("title")
                    or assessment_summary
                    or payload.get("artifact_type")
                    or path.stem
                ),
                updated_at=str(
                    payload.get("updated_at") or payload.get("created_at") or ""
                ),
            )
        )
    return artifacts
def _safe_artifact_path(relative_path: str, root: str | Path | None = None) -> Path:
    """Resolve ``relative_path`` under the self-scoping root and validate it.

    Raises:
        ValueError: If the resolved path lies outside the root, or its suffix
            is not ".json".
        FileNotFoundError: If the resolved path does not exist.
    """
    base = self_scoping_root(root)
    candidate = (base / relative_path).resolve()
    # Resolving first means ".." segments cannot slip past the containment check.
    try:
        candidate.relative_to(base)
    except ValueError as exc:
        message = f"artifact path escapes self-scoping root: {relative_path}"
        raise ValueError(message) from exc
    if candidate.suffix != ".json":
        raise ValueError(f"artifact path is not JSON: {relative_path}")
    if not candidate.exists():
        raise FileNotFoundError(relative_path)
    return candidate
def _outcome_id(created_at: str, assessment_path: str, outcome: str) -> str:
timestamp = (
created_at.replace("-", "")
.replace(":", "")
.replace("T", "-")
.replace("Z", "")
)
assessment_stem = Path(assessment_path).stem.replace(".", "-")
return f"{timestamp}__{assessment_stem}__{outcome}__{uuid4().hex[:8]}"

View File

@@ -0,0 +1,11 @@
from repo_registry.semantic.embeddings import (
EmbeddingProvider,
HashingEmbeddingProvider,
cosine_similarity,
)
__all__ = [
"EmbeddingProvider",
"HashingEmbeddingProvider",
"cosine_similarity",
]

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
import hashlib
import math
import re
from typing import Protocol
class EmbeddingProvider(Protocol):
    """Structural interface for text embedders: a ``name`` and an ``embed`` method."""

    name: str

    def embed(self, text: str) -> list[float]:
        """Return a deterministic vector for the supplied text."""
        ...
class HashingEmbeddingProvider:
    """Offline test provider using hashed token buckets.

    This is intentionally simple: it gives tests and local development a stable
    semantic path without depending on an external model service.
    """

    name = "hashing-v1"

    def __init__(self, dimensions: int = 64) -> None:
        """Create a provider that emits vectors of length ``dimensions``.

        Raises:
            ValueError: If ``dimensions`` is less than 1.
        """
        # Reject this early: embed() takes a modulo by the dimension count.
        if dimensions < 1:
            raise ValueError(
                f"dimensions must be a positive integer, got {dimensions}"
            )
        self.dimensions = dimensions

    def embed(self, text: str) -> list[float]:
        """Return a unit-length bucket vector for ``text``.

        Each token adds +1 or -1 to one bucket, both chosen from its SHA-256
        digest. Text without tokens yields the all-zero vector.
        """
        vector = [0.0] * self.dimensions
        for token in _tokens(text):
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            # First two digest bytes pick the bucket; the third picks the sign.
            index = int.from_bytes(digest[:2], "big") % self.dimensions
            sign = 1.0 if digest[2] % 2 == 0 else -1.0
            vector[index] += sign
        norm = math.sqrt(sum(value * value for value in vector))
        if norm == 0:
            return vector
        return [value / norm for value in vector]
def cosine_similarity(left: list[float], right: list[float]) -> float:
if not left or not right or len(left) != len(right):
return 0.0
return sum(a * b for a, b in zip(left, right, strict=True))
def _tokens(text: str) -> list[str]:
    """Split ``text`` into lower-cased ASCII alphanumeric runs and stem each one."""
    words = re.findall(r"[A-Za-z0-9]+", text.lower())
    return [_stem(word) for word in words]
def _stem(token: str) -> str:
for suffix in ("ing", "ed", "es", "s"):
if len(token) > len(suffix) + 3 and token.endswith(suffix):
return token[: -len(suffix)]
return token

View File

@@ -0,0 +1 @@
"""Persistence adapters."""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
"""HTTP API package."""

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1 @@
"""Small server-rendered curator UI."""

File diff suppressed because it is too large Load Diff