Transferred deep scope functionality from the custodian

This commit is contained in:
2026-05-01 00:42:10 +02:00
parent b424dea01b
commit 2d9da98257
10 changed files with 1397 additions and 47 deletions

View File

@@ -0,0 +1,4 @@
from repo_registry.scope.generator import ScopeGenerator
from repo_registry.scope.validator import ScopeValidator
# Names re-exported from the generator and validator modules as this
# package's public API.
__all__ = ["ScopeGenerator", "ScopeValidator"]

View File

@@ -0,0 +1,323 @@
from __future__ import annotations
import re
from dataclasses import asdict
from repo_registry.core.service import RegistryService
from repo_registry.storage.sqlite import NotFoundError
# Canonical SCOPE.md section headings, in the order they are rendered.
# ScopeGenerator.generate iterates this list to emit one "## <heading>"
# block per entry.
SCOPE_SECTIONS = [
    "One-liner",
    "Core Idea",
    "In Scope",
    "Out of Scope",
    "Relevant When",
    "Not Relevant When",
    "Current State",
    "How It Fits",
    "Terminology",
    "Related / Overlapping Repositories",
    "Getting Oriented",
    "Provided Capabilities",
    "Notes",
]
# HTML comment used as a placeholder wherever the generator cannot derive
# the content of a section (or part of one) from the registry data.
NEEDS_INPUT = "<!-- needs curator input -->"
class ScopeGenerator:
    """Render SCOPE.md from approved repository characteristics.

    The generator reads the repository list, the ability map and the observed
    facts from the injected service and turns them into Markdown. Content
    that cannot be derived from that data is replaced by the ``NEEDS_INPUT``
    placeholder comment.
    """

    def __init__(self, service: RegistryService) -> None:
        self.service = service

    def generate(self, repo_slug: str) -> str:
        """Return the complete SCOPE.md text for the repository matching *repo_slug*.

        Raises:
            NotFoundError: if no repository known to the service matches the slug.
        """
        repository = self._repository_by_slug(repo_slug)
        # asdict() converts the service's dataclass results into plain nested
        # dicts, which is the shape every section helper below works on.
        ability_map = asdict(self.service.ability_map(repository.id))
        facts = [asdict(fact) for fact in self.service.list_observed_facts(repository.id)]
        sections = {
            "One-liner": self._one_liner(ability_map),
            "Core Idea": self._core_idea(ability_map),
            "In Scope": self._in_scope(ability_map),
            "Out of Scope": self._curator_stub(),
            "Relevant When": self._relevant_when(ability_map),
            "Not Relevant When": self._curator_stub(),
            "Current State": self._current_state(repository.status, facts),
            "How It Fits": self._how_it_fits(ability_map),
            "Terminology": self._terminology(ability_map, facts),
            "Related / Overlapping Repositories": self._curator_stub(),
            "Getting Oriented": self._getting_oriented(ability_map, facts),
            "Provided Capabilities": self._provided_capabilities(ability_map),
            "Notes": self._curator_stub(),
        }
        lines = [
            "# SCOPE",
            "",
            "> This file helps you quickly understand what this repository is about,",
            "> when it is relevant, and when it is not.",
            "> It was generated from approved repo-registry characteristics.",
            "",
            "---",
            "",
        ]
        for section in SCOPE_SECTIONS:
            lines.extend([f"## {section}", "", sections[section].rstrip(), "", "---", ""])
        # Drop the trailing separator padding and end with exactly one newline.
        return "\n".join(lines).rstrip() + "\n"

    def _repository_by_slug(self, repo_slug: str):
        """Return the repository whose name or URL tail slugifies to *repo_slug*.

        Raises:
            NotFoundError: if no repository matches.
        """
        wanted = self._slug(repo_slug)
        for repository in self.service.list_repositories():
            # The last URL path segment, without a ".git" suffix, is accepted
            # as an alternative to the repository name.
            url_tail = repository.url.rstrip("/").rsplit("/", 1)[-1].removesuffix(".git")
            candidates = {self._slug(repository.name), self._slug(url_tail)}
            if wanted in candidates:
                return repository
        raise NotFoundError(f"repository slug {repo_slug!r} was not found")

    def _one_liner(self, ability_map: dict) -> str:
        """Return the first sentence of the scope description, or a generic fallback."""
        scope = ability_map["scope"]
        description = self._sentence(scope.get("description", ""))
        if description:
            return description
        return (
            f"{scope['name']} defines the repository scope for "
            f"{ability_map['repository']['name']}."
        )

    def _core_idea(self, ability_map: dict) -> str:
        """Return the scope description followed by up to five approved abilities."""
        scope = ability_map["scope"]
        abilities = ability_map.get("abilities", [])
        lines = [scope.get("description") or self._one_liner(ability_map)]
        if abilities:
            lines.append("")
            lines.append("Approved abilities:")
            # Name and description are separated by ": " so the bullet stays
            # readable instead of running both texts together.
            lines.extend(
                f"- {ability['name']}: "
                f"{ability.get('description') or 'Approved repository ability.'}"
                for ability in abilities[:5]
            )
        else:
            lines.extend(["", NEEDS_INPUT])
        return "\n".join(lines)

    def _in_scope(self, ability_map: dict) -> str:
        """Return one bullet per ability, naming up to four of its capabilities."""
        abilities = ability_map.get("abilities", [])
        if not abilities:
            return self._curator_stub()
        lines = []
        for ability in abilities:
            capabilities = ", ".join(
                capability["name"] for capability in ability.get("capabilities", [])[:4]
            )
            suffix = f" Includes {capabilities}." if capabilities else ""
            description = ability.get("description") or "Approved ability."
            lines.append(f"- {ability['name']}: {description}{suffix}")
        return "\n".join(lines)

    def _relevant_when(self, ability_map: dict) -> str:
        """Return "You need ..." bullets, preferring use-case style features."""
        features = [
            feature
            for feature in self._features(ability_map)
            if self._is_usecase_feature(feature)
        ]
        if not features:
            # No use-case features: fall back to the first five features of any kind.
            features = self._features(ability_map)[:5]
        if not features:
            return self._curator_stub()
        lines = [
            f"- You need {feature['name']} "
            f"({feature.get('primary_class') or feature.get('type', 'feature')})."
            for feature in features
        ]
        # When only fallback features were available, ask for curator review.
        if not any(self._is_usecase_feature(feature) for feature in features):
            lines.append(NEEDS_INPUT)
        return "\n".join(lines)

    def _current_state(self, status: str, facts: list[dict]) -> str:
        """Return status bullets derived from *status* and the observed facts."""
        kinds = self._facts_by_kind(facts)
        languages = self._fact_names(kinds.get("language", []))
        frameworks = self._fact_names(kinds.get("framework", []))
        tests = kinds.get("test", [])
        interfaces = kinds.get("interface", [])
        manifests = kinds.get("manifest", [])
        implementation = "substantial" if interfaces or manifests else "partial"
        if not facts:
            implementation = "unknown"
        lines = [
            f"- Status: {status}",
            f"- Implementation: {implementation}",
            "- Stability: evolving",
            "- Usage: internal",
            f"- Languages: {', '.join(languages) if languages else 'unknown'}",
            f"- Frameworks: {', '.join(frameworks) if frameworks else 'none detected'}",
            f"- Tests observed: {len(tests)}",
            f"- Interfaces observed: {len(interfaces)}",
            f"- Manifests observed: {len(manifests)}",
        ]
        if not facts:
            lines.append(NEEDS_INPUT)
        return "\n".join(lines)

    def _how_it_fits(self, ability_map: dict) -> str:
        """Return dependency placeholders, preceded by evidence references if any."""
        evidence = [
            item
            for capability in self._capabilities(ability_map)
            for item in capability.get("evidence", [])
        ]
        # These three bullets always need curator input; they are emitted
        # with or without evidence.
        stubs = [
            "- Upstream dependencies: " + NEEDS_INPUT,
            "- Downstream consumers: " + NEEDS_INPUT,
            "- Often used with: " + NEEDS_INPUT,
        ]
        if not evidence:
            return "\n".join(stubs)
        refs = ", ".join(
            sorted({item.get("reference", "") for item in evidence if item.get("reference")})[:8]
        )
        return "\n".join(
            [f"- Supported by evidence references: {refs or 'available evidence'}", *stubs]
        )

    def _terminology(self, ability_map: dict, facts: list[dict]) -> str:
        """Return preferred terms collected from names, classes, attributes and facts."""
        terms = set()
        for item in [ability_map["scope"], *ability_map.get("abilities", [])]:
            terms.add(item.get("name", ""))
            terms.add(item.get("primary_class", ""))
            terms.update(item.get("attributes", []))
        for capability in self._capabilities(ability_map):
            terms.add(capability.get("name", ""))
            terms.add(capability.get("primary_class", ""))
            terms.update(capability.get("attributes", []))
        for fact in facts:
            if fact.get("kind") in {"framework", "llm_provider", "provider_registry"}:
                terms.add(fact.get("name", ""))
        # Filter before sorting: a None value (a field present but unset)
        # cannot be ordered against strings and would raise TypeError.
        visible = sorted(term for term in terms if term)
        if not visible:
            return self._curator_stub()
        return "\n".join(
            [
                "- Preferred terms: " + ", ".join(visible[:12]),
                "- Also known as: " + NEEDS_INPUT,
                "- Potentially confusing terms: " + NEEDS_INPUT,
            ]
        )

    def _getting_oriented(self, ability_map: dict, facts: list[dict]) -> str:
        """Return start/key-file/entry-point bullets built from known source paths."""
        paths = self._source_paths(ability_map, facts)
        if not paths:
            return self._curator_stub()
        return "\n".join(
            [
                f"- Start with: {paths[0]}",
                f"- Key files / directories: {', '.join(paths[:8])}",
                f"- Entry points: {', '.join(paths[:5])}",
            ]
        )

    def _provided_capabilities(self, ability_map: dict) -> str:
        """Return one fenced ``capability`` block per capability, or an empty-state note."""
        capabilities = self._capabilities(ability_map)
        if not capabilities:
            return f"<!-- No approved capabilities yet. -->\n{NEEDS_INPUT}"
        blocks = []
        for capability in capabilities:
            keywords = self._keywords_for_capability(capability)
            blocks.append(
                "\n".join(
                    [
                        "```capability",
                        f"type: {self._capability_type(capability.get('primary_class', 'other'))}",
                        f"title: {capability['name']}",
                        "description: >",
                        f" {capability.get('description') or 'Approved repository capability.'}",
                        f"keywords: [{', '.join(keywords)}]",
                        "```",
                    ]
                )
            )
        return "\n\n".join(blocks)

    def _capabilities(self, ability_map: dict) -> list[dict]:
        """Return every capability of every ability as one flat list."""
        return [
            capability
            for ability in ability_map.get("abilities", [])
            for capability in ability.get("capabilities", [])
        ]

    def _features(self, ability_map: dict) -> list[dict]:
        """Return every feature of every capability as one flat list."""
        return [
            feature
            for capability in self._capabilities(ability_map)
            for feature in capability.get("features", [])
        ]

    def _is_usecase_feature(self, feature: dict) -> bool:
        """Return True if the feature's class or attributes mark it as a use case."""
        labels = {str(feature.get("primary_class", "")).lower()}
        labels.update(str(item).lower() for item in feature.get("attributes", []))
        return bool(labels & {"business-usecase", "usecase", "workflow", "review"})

    def _keywords_for_capability(self, capability: dict) -> list[str]:
        """Return up to eight slugified keywords from the capability and its features."""
        keywords = [capability.get("primary_class", "")]
        keywords.extend(capability.get("attributes", []))
        for feature in capability.get("features", []):
            keywords.append(feature.get("primary_class", ""))
            keywords.extend(feature.get("attributes", []))
        # _unique already drops empty and None entries.
        return [self._keyword(item) for item in self._unique(keywords)[:8] if item]

    def _capability_type(self, primary_class: str | None) -> str:
        """Map a primary class onto one of the capability block ``type`` values."""
        # A missing class is treated like an unknown one instead of crashing.
        normalized = (primary_class or "").lower()
        if normalized in {"api", "infrastructure", "data", "security", "documentation"}:
            return normalized
        if normalized in {"interface", "integration", "llm-integration"}:
            return "api"
        if normalized in {"storage", "repository-structure"}:
            return "data"
        return "other"

    def _facts_by_kind(self, facts: list[dict]) -> dict[str, list[dict]]:
        """Group facts by their ``kind`` value (empty string when absent)."""
        grouped: dict[str, list[dict]] = {}
        for fact in facts:
            grouped.setdefault(fact.get("kind", ""), []).append(fact)
        return grouped

    def _fact_names(self, facts: list[dict]) -> list[str]:
        """Return the distinct, non-empty names of the given facts in input order."""
        return self._unique([fact.get("name", "") for fact in facts])

    def _source_paths(self, ability_map: dict, facts: list[dict]) -> list[str]:
        """Return distinct paths from facts, feature locations and source refs."""
        paths = [fact.get("path", "") for fact in facts if fact.get("path")]
        for feature in self._features(ability_map):
            paths.append(feature.get("location", ""))
            for source_ref in feature.get("source_refs", []):
                paths.append(source_ref.get("path", ""))
        return self._unique(paths)

    def _curator_stub(self) -> str:
        """Return the placeholder bullet for sections that need curator input."""
        return f"- {NEEDS_INPUT}"

    def _sentence(self, text: str | None) -> str:
        """Return the first sentence of *text* with whitespace collapsed.

        ``None`` and blank input yield an empty string.
        """
        cleaned = re.sub(r"\s+", " ", (text or "").strip())
        if not cleaned:
            return ""
        # Split after the first sentence-ending punctuation followed by space.
        return re.split(r"(?<=[.!?])\s+", cleaned, maxsplit=1)[0]

    def _slug(self, value: str) -> str:
        """Lowercase *value* and join its alphanumeric runs with single hyphens."""
        return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")

    def _keyword(self, value: str) -> str:
        """Return the slug of *value*, or "other" when the slug is empty."""
        return self._slug(value) or "other"

    def _unique(self, values: list[str]) -> list[str]:
        """Return stripped values with case-insensitive duplicates removed.

        Order of first occurrence is kept. Empty strings and ``None`` are
        skipped; ``None`` must be checked explicitly because ``str(None)``
        would otherwise produce the literal text "None".
        """
        result: list[str] = []
        seen: set[str] = set()
        for value in values:
            if value is None:
                continue
            item = str(value).strip()
            key = item.lower()
            if not item or key in seen:
                continue
            seen.add(key)
            result.append(item)
        return result

View File

@@ -0,0 +1,184 @@
from __future__ import annotations
import re
from dataclasses import dataclass
from pathlib import Path
from repo_registry.scope.generator import SCOPE_SECTIONS, ScopeGenerator
@dataclass(frozen=True)
class ScopeDiffSection:
    """Comparison result for a single SCOPE.md section."""

    # Section heading, one of the SCOPE_SECTIONS entries.
    section: str
    # "missing", "ok" or "stale", as assigned by ScopeValidator.diff.
    status: str
    # Body found in the existing file; None when the section is absent there.
    current_text: str | None
    # Body taken from the freshly generated SCOPE.md.
    proposed_text: str | None
@dataclass(frozen=True)
class ScopeDiff:
    """Per-section comparison of an existing SCOPE.md with a generated one."""

    sections: list[ScopeDiffSection]

    @property
    def needs_update(self) -> bool:
        """Return True when at least one section has a status other than "ok"."""
        for entry in self.sections:
            if entry.status != "ok":
                return True
        return False
@dataclass(frozen=True)
class ScopeValidationIssue:
    """A single finding reported by ScopeValidator.validate."""

    # Identifier of the check that produced the finding (e.g. "C5a", "C5b", "C5c").
    check: str
    # "error" or "warn"; only "error" makes ValidationResult.ok False.
    severity: str
    # Human-readable description of the finding.
    message: str
@dataclass(frozen=True)
class ValidationResult:
    """Collection of findings from validating one SCOPE.md file."""

    issues: list[ScopeValidationIssue]

    @property
    def ok(self) -> bool:
        """Return True when none of the issues has severity "error"."""
        return all(issue.severity != "error" for issue in self.issues)
class ScopeValidator:
    """Validate and diff SCOPE.md files."""

    # Level-two Markdown heading ("## Title"). Shared by section parsing and
    # heading-order detection so both agree on what counts as a heading.
    _HEADING_RE = re.compile(r"^##\s+(.+?)\s*$", re.MULTILINE)
    # Fenced ```capability ... ``` block; the group captures the block body.
    _CAPABILITY_BLOCK_RE = re.compile(r"```capability\s*(.*?)```", re.DOTALL | re.IGNORECASE)
    # "key:" at the start of a line inside a capability block.
    _CAPABILITY_KEY_RE = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*):", re.MULTILINE)

    def __init__(self, generator: ScopeGenerator | None = None) -> None:
        # The generator is only needed by diff(); validate() works without it.
        self.generator = generator

    def diff(self, repo_slug: str, existing_path: Path) -> ScopeDiff:
        """Compare the SCOPE.md at *existing_path* with a freshly generated one.

        A file that does not exist is treated as empty, so every section is
        reported as "missing".

        Raises:
            ValueError: if the validator was created without a generator.
        """
        if self.generator is None:
            raise ValueError("ScopeValidator.diff requires a ScopeGenerator")
        current = existing_path.read_text(encoding="utf-8") if existing_path.exists() else ""
        proposed = self.generator.generate(repo_slug)
        current_sections = self._parse_sections(current)
        proposed_sections = self._parse_sections(proposed)
        sections: list[ScopeDiffSection] = []
        for section in SCOPE_SECTIONS:
            current_text = current_sections.get(section)
            proposed_text = proposed_sections.get(section, "")
            if current_text is None:
                status = "missing"
            elif self._normalize(current_text) == self._normalize(proposed_text):
                # Only normalized text is compared, so comment and Markdown
                # punctuation differences do not count as changes.
                status = "ok"
            else:
                status = "stale"
            sections.append(
                ScopeDiffSection(
                    section=section,
                    status=status,
                    current_text=current_text,
                    proposed_text=proposed_text,
                )
            )
        return ScopeDiff(sections=sections)

    def validate(self, path: Path) -> ValidationResult:
        """Check the SCOPE.md at *path* for presence, sections, order and capabilities.

        Issues are reported in a fixed order: missing sections, section
        order, then the Provided Capabilities checks.
        """
        if not path.exists():
            return ValidationResult(
                issues=[
                    ScopeValidationIssue(
                        check="C5a",
                        severity="error",
                        message="SCOPE.md is missing.",
                    )
                ]
            )
        content = path.read_text(encoding="utf-8")
        sections = self._parse_sections(content)
        issues: list[ScopeValidationIssue] = []
        issues.extend(self._missing_section_issues(sections))
        issues.extend(self._order_issues(content, sections))
        issues.extend(self._capability_issues(sections.get("Provided Capabilities")))
        return ValidationResult(issues=issues)

    def _missing_section_issues(self, sections: dict[str, str]) -> list[ScopeValidationIssue]:
        """Report canonical sections that are absent from the parsed file."""
        missing = [section for section in SCOPE_SECTIONS if section not in sections]
        if not missing:
            return []
        # Lacking only "Provided Capabilities" is a warning; anything else is an error.
        severity = "warn" if missing == ["Provided Capabilities"] else "error"
        return [
            ScopeValidationIssue(
                check="C5b",
                severity=severity,
                message=f"Missing SCOPE.md section(s): {', '.join(missing)}.",
            )
        ]

    def _order_issues(self, content: str, sections: dict[str, str]) -> list[ScopeValidationIssue]:
        """Report when the known headings do not appear in canonical order."""
        ordered = self._heading_order(content)
        expected_order = [section for section in SCOPE_SECTIONS if section in sections]
        if ordered[: len(expected_order)] == expected_order:
            return []
        return [
            ScopeValidationIssue(
                check="C5b",
                severity="warn",
                message="SCOPE.md sections are not in canonical order.",
            )
        ]

    def _capability_issues(self, capabilities: str | None) -> list[ScopeValidationIssue]:
        """Report problems in the Provided Capabilities section body."""
        if capabilities is None:
            return [
                ScopeValidationIssue(
                    check="C5c",
                    severity="warn",
                    message="Provided Capabilities section is missing.",
                )
            ]
        if "```capability" in capabilities:
            found: list[ScopeValidationIssue] = []
            for index, block in enumerate(self._capability_blocks(capabilities), start=1):
                missing_keys = {"type", "title"} - self._capability_keys(block)
                if missing_keys:
                    found.append(
                        ScopeValidationIssue(
                            check="C5c",
                            severity="warn",
                            message=(
                                f"Capability block {index} is missing required field(s): "
                                f"{', '.join(sorted(missing_keys))}."
                            ),
                        )
                    )
            return found
        if "No approved capabilities yet" not in capabilities:
            return [
                ScopeValidationIssue(
                    check="C5c",
                    severity="warn",
                    message=(
                        "Provided Capabilities has no capability blocks or explicit "
                        "empty-state note."
                    ),
                )
            ]
        return []

    def _parse_sections(self, content: str) -> dict[str, str]:
        """Map each "## " heading title to its body text.

        A trailing "---" separator line is removed from each body. If a
        title occurs more than once, the last occurrence wins.
        """
        matches = list(self._HEADING_RE.finditer(content))
        sections: dict[str, str] = {}
        for index, match in enumerate(matches):
            title = match.group(1).strip()
            start = match.end()
            # A body runs up to the next heading, or to the end of the file.
            end = matches[index + 1].start() if index + 1 < len(matches) else len(content)
            body = content[start:end]
            body = re.sub(r"\n---\s*$", "", body.strip())
            sections[title] = body.strip()
        return sections

    def _heading_order(self, content: str) -> list[str]:
        """Return the canonical headings found in *content*, in file order."""
        return [
            match.group(1).strip()
            for match in self._HEADING_RE.finditer(content)
            if match.group(1).strip() in SCOPE_SECTIONS
        ]

    def _normalize(self, value: str | None) -> str:
        """Reduce text to lowercase words for a formatting-insensitive comparison."""
        if value is None:
            return ""
        without_comments = re.sub(r"<!--.*?-->", "", value, flags=re.DOTALL)
        without_markdown = re.sub(r"[`*_>#-]+", " ", without_comments)
        return re.sub(r"\s+", " ", without_markdown).strip().lower()

    def _capability_blocks(self, content: str) -> list[str]:
        """Return the bodies of all fenced capability blocks in *content*."""
        return self._CAPABILITY_BLOCK_RE.findall(content)

    def _capability_keys(self, block: str) -> set[str]:
        """Return the set of "key:" names that start a line in *block*."""
        return {match.group(1) for match in self._CAPABILITY_KEY_RE.finditer(block)}

View File

@@ -1,8 +1,11 @@
from __future__ import annotations
import logging
import json
from dataclasses import asdict
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.responses import PlainTextResponse
@@ -13,6 +16,7 @@ from repo_registry.core.service import RegistryService
from repo_registry.llm_extraction import LLMCandidateExtractor, create_llm_connect_adapter
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.semantic import HashingEmbeddingProvider
from repo_registry.scope import ScopeGenerator, ScopeValidator
from repo_registry.storage.sqlite import NotFoundError, RegistryStore
from repo_registry.web_api.schemas import (
AbilityCreate,
@@ -58,6 +62,12 @@ from repo_registry.web_api.schemas import (
)
def slugify(value: str) -> str:
    """Lowercase *value* and join its alphanumeric runs with single hyphens."""
    import re

    lowered = value.lower()
    hyphenated = re.sub(r"[^a-z0-9]+", "-", lowered)
    # Leading and trailing separators carry no information in a slug.
    return hyphenated.strip("-")
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="REPO_REGISTRY_")
@@ -67,6 +77,7 @@ class Settings(BaseSettings):
llm_provider: str | None = Field(default=None)
llm_model: str | None = Field(default=None)
embedding_provider: str | None = Field(default=None)
state_hub_base_url: str = Field(default="http://127.0.0.1:8000")
log_level: str = Field(default="INFO")
@@ -111,6 +122,7 @@ OPENAPI_TAGS = [
{"name": "analysis", "description": "Repository scans and extracted review inputs."},
{"name": "review", "description": "Candidate graph approval and correction workflow."},
{"name": "registry", "description": "Approved ability maps and manual registry CRUD."},
{"name": "scope", "description": "SCOPE.md generation, diffing, and writing."},
{"name": "search", "description": "Agent-facing discovery endpoints."},
{"name": "discovery", "description": "Comparison, gap analysis, and export helpers."},
]
@@ -1120,6 +1132,144 @@ def export_repository_registry_entry(
return PlainTextResponse(content, media_type="application/x-yaml")
@app.get(
    "/repos/{repo_slug}/scope",
    tags=["scope"],
    response_class=PlainTextResponse,
    responses={
        200: {
            "content": {"text/markdown": {}},
            "description": "Generated SCOPE.md preview from approved characteristics.",
        }
    },
)
def generate_repository_scope(
    repo_slug: str,
    service: RegistryService = Depends(get_service),
) -> PlainTextResponse:
    # Preview endpoint: renders SCOPE.md as Markdown without writing it.
    # An unknown slug or a repository without approved abilities is
    # reported as 404.
    try:
        ensure_scope_generation_ready(service, repo_slug)
        markdown = ScopeGenerator(service).generate(repo_slug)
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    else:
        return PlainTextResponse(markdown, media_type="text/markdown")
@app.get(
    "/repos/{repo_slug}/scope/diff",
    tags=["scope"],
)
def diff_repository_scope(
    repo_slug: str,
    service: RegistryService = Depends(get_service),
    settings: Settings = Depends(get_settings),
) -> dict[str, object]:
    # Compares the SCOPE.md found on this host with a freshly generated one.
    # NotFoundError maps to 404; ValueError (e.g. no local checkout path)
    # maps to 409.
    try:
        repository = ensure_scope_generation_ready(service, repo_slug)
        target = scope_file_path(service, repository, repo_slug, settings)
        validator = ScopeValidator(ScopeGenerator(service))
        result = validator.diff(repo_slug, target)
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    section_payload = [asdict(section) for section in result.sections]
    return {
        "sections": section_payload,
        "needs_update": result.needs_update,
    }
@app.post(
    "/repos/{repo_slug}/scope/write",
    tags=["scope"],
)
def write_repository_scope(
    repo_slug: str,
    service: RegistryService = Depends(get_service),
    settings: Settings = Depends(get_settings),
) -> dict[str, object]:
    # Generates SCOPE.md and writes it to the repository's local path.
    # NotFoundError maps to 404; ValueError (e.g. no local checkout path)
    # maps to 409. The write itself happens outside the try block, so I/O
    # errors are not translated.
    try:
        repository = ensure_scope_generation_ready(service, repo_slug)
        target = scope_file_path(service, repository, repo_slug, settings)
        markdown = ScopeGenerator(service).generate(repo_slug)
    except NotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    target.write_text(markdown, encoding="utf-8")
    outcome: dict[str, object] = {"written": True, "path": str(target)}
    return outcome
def ensure_scope_generation_ready(
    service: RegistryService,
    repo_slug: str,
):
    """Return the repository for *repo_slug* if it has at least one ability.

    Raises:
        NotFoundError: if the slug is unknown or the repository's ability
            map has no abilities.
    """
    repository = repository_by_slug(service, repo_slug)
    if service.ability_map(repository.id).abilities:
        return repository
    raise NotFoundError(
        f"repository {repo_slug!r} has no approved characteristics"
    )
def repository_by_slug(service: RegistryService, repo_slug: str):
    """Return the repository whose name or URL tail slugifies to *repo_slug*.

    Raises:
        NotFoundError: if no repository known to the service matches.
    """
    target = slugify(repo_slug)
    for candidate in service.list_repositories():
        name_slug = slugify(candidate.name)
        # Last URL path segment without a ".git" suffix.
        url_tail = candidate.url.rstrip("/").rsplit("/", 1)[-1].removesuffix(".git")
        url_slug = slugify(url_tail)
        if target in (name_slug, url_slug):
            return candidate
    raise NotFoundError(f"repository slug {repo_slug!r} was not found")
def scope_file_path(
    service: RegistryService,
    repository,
    repo_slug: str,
    settings: Settings,
) -> Path:
    """Return the SCOPE.md location for *repository* on this host.

    Sources are tried in order: the state hub lookup, the repository URL
    interpreted as a local directory, then the ingestion service's cached
    checkout.

    Raises:
        ValueError: if none of the sources yields a local path.
    """
    hub_path = state_hub_scope_file_path(repo_slug, settings)
    if hub_path is not None:
        return hub_path

    local_dir = Path(repository.url)
    if local_dir.exists() and local_dir.is_dir():
        return local_dir / "SCOPE.md"

    checkout = service.ingestion.cached_checkout(repository.url)
    if checkout is None or not checkout.source_path.exists():
        raise ValueError(
            "repository has no known local checkout path on this host"
        )
    return checkout.source_path / "SCOPE.md"
def state_hub_scope_file_path(repo_slug: str, settings: Settings) -> Path | None:
    """Ask the state hub where *repo_slug* is checked out and return its SCOPE.md path.

    Returns:
        The ``SCOPE.md`` path inside the reported local directory, or ``None``
        when no state hub is configured, the hub does not know the repository
        (404), the hub is unreachable, or its answer is not a JSON object.

    Raises:
        ValueError: if the hub answers with a non-404 HTTP error, reports no
            local path, or reports a path that is not an existing directory.
    """
    from urllib.parse import quote

    base_url = settings.state_hub_base_url.rstrip("/")
    if not base_url:
        return None
    # The slug comes from the request path, so it is percent-encoded as a
    # single path segment before being placed into the lookup URL.
    lookup_url = f"{base_url}/repos/{quote(repo_slug, safe='')}/"
    try:
        with urlopen(lookup_url, timeout=2) as response:
            repo = json.loads(response.read().decode("utf-8"))
    except HTTPError as exc:
        # HTTPError is handled first because it is a subclass of URLError.
        if exc.code == 404:
            return None
        raise ValueError("state hub repository path lookup failed") from exc
    except (URLError, TimeoutError, OSError, json.JSONDecodeError, UnicodeDecodeError):
        # Best effort: an unreachable hub or an undecodable body means the
        # caller falls back to its other path sources.
        return None
    if not isinstance(repo, dict):
        # A non-object payload is treated like any other malformed response.
        return None
    local_path = repo.get("local_path")
    if not local_path:
        raise ValueError(
            f"state hub repo {repo_slug!r} has no local path on this host"
        )
    path = Path(local_path)
    if path.exists() and path.is_dir():
        return path / "SCOPE.md"
    raise ValueError(
        f"state hub local path for repo {repo_slug!r} is not available: {path}"
    )
@app.get(
"/repository-comparisons",
tags=["discovery"],