Files
repo-scoping/src/repo_scoping/candidate_graph/generator.py

1778 lines
66 KiB
Python

from __future__ import annotations
import re
from dataclasses import dataclass, field, replace
from repo_scoping.core.models import ContentChunk, ObservedFact, Repository, SourceReference
@dataclass(frozen=True)
class CandidateEvidenceDraft:
type: str
reference: str
strength: str
source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateFeatureDraft:
name: str
type: str
location: str
confidence: float
source_refs: list[SourceReference]
primary_class: str = ""
attributes: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateCapabilityDraft:
name: str
description: str
inputs: list[str]
outputs: list[str]
confidence: float
source_refs: list[SourceReference]
primary_class: str = "capability"
attributes: list[str] = field(default_factory=list)
features: list[CandidateFeatureDraft] = field(default_factory=list)
evidence: list[CandidateEvidenceDraft] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateAbilityDraft:
name: str
description: str
confidence: float
source_refs: list[SourceReference]
primary_class: str = "ability"
attributes: list[str] = field(default_factory=list)
capabilities: list[CandidateCapabilityDraft] = field(default_factory=list)
REPO_SCOPING_NATIVE_CAPABILITY_SEEDS = [
{
"name": "Register And Track Repositories",
"primary_class": "ingestion",
"attributes": ["metadata", "git", "analysis-run"],
"features": [
(
"Create and update repository records",
"api",
["src/repo_scoping/core/service.py", "src/repo_scoping/web_api/app.py"],
),
(
"Resolve local or remote Git checkouts",
"backend",
["src/repo_scoping/repo_ingestion/git.py", "tests/test_git_ingestion.py"],
),
(
"Import repository metadata",
"backend",
[
"src/repo_scoping/repo_ingestion/metadata.py",
"tests/test_repository_metadata.py",
],
),
],
},
{
"name": "Scan Repositories Into Observed Facts",
"primary_class": "analysis",
"attributes": ["deterministic", "facts", "provenance"],
"features": [
(
"Detect source languages, manifests, docs, tests, config, and interfaces",
"backend",
["src/repo_scoping/repo_scanning/scanner.py", "tests/test_repository_scanner.py"],
),
(
"Classify source roles for facts",
"backend",
["src/repo_scoping/repo_scanning/scanner.py", "docs/characteristic-evidence-model.md"],
),
(
"Preserve analysis snapshots and fact records",
"storage",
["src/repo_scoping/storage/sqlite.py", "migrations/0001_initial.sql"],
),
],
},
{
"name": "Index Source Content With Provenance",
"primary_class": "analysis",
"attributes": ["content-chunks", "source-role"],
"features": [
(
"Create source-linked content chunks from observed facts",
"backend",
["src/repo_scoping/content_indexing/extractor.py", "tests/test_content_indexing.py"],
),
(
"Carry source-role metadata into downstream generation",
"backend",
[
"src/repo_scoping/content_indexing/extractor.py",
"src/repo_scoping/llm_extraction/extractor.py",
],
),
],
},
{
"name": "Generate Reviewable Candidate Characteristics",
"primary_class": "analysis",
"attributes": ["candidate-graph", "review-required"],
"features": [
(
"Build candidate abilities, capabilities, features, and evidence",
"backend",
[
"src/repo_scoping/candidate_graph/generator.py",
"src/repo_scoping/candidate_graph/normalization.py",
"tests/test_candidate_graph.py",
],
),
(
"Optionally map structured LLM extraction into candidates",
"integration",
[
"src/repo_scoping/llm_extraction/extractor.py",
"src/repo_scoping/llm_extraction/mapper.py",
"tests/test_llm_extraction.py",
],
),
],
},
{
"name": "Review And Approve Candidate Characteristics",
"primary_class": "review",
"attributes": ["curation", "approval", "audit"],
"features": [
(
"Edit, reject, merge, and relink candidate graph entries",
"api",
[
"src/repo_scoping/core/service.py",
"src/repo_scoping/web_api/app.py",
"tests/test_registry_service.py",
],
),
(
"Publish approved characteristic maps after review",
"storage",
["src/repo_scoping/core/service.py", "src/repo_scoping/storage/sqlite.py"],
),
(
"Record review decisions and expectation gaps",
"audit",
["src/repo_scoping/core/service.py", "src/repo_scoping/web_api/schemas.py"],
),
],
},
{
"name": "Search Compare And Export Approved Profiles",
"primary_class": "discovery",
"attributes": ["search", "comparison", "export"],
"features": [
(
"Search approved abilities, capabilities, features, and evidence",
"api",
["src/repo_scoping/core/service.py", "tests/test_registry_service.py"],
),
(
"Compare repositories and identify capability gaps",
"api",
["src/repo_scoping/core/service.py", "src/repo_scoping/web_api/app.py"],
),
(
"Export repository profiles",
"api",
["src/repo_scoping/web_api/app.py", "docs/api-contract.md"],
),
],
},
{
"name": "Generate And Maintain SCOPE.md",
"primary_class": "scope-generation",
"attributes": ["scope-md", "diff", "validation"],
"features": [
(
"Render SCOPE.md from approved characteristics",
"backend",
[
"src/repo_scoping/scope/generator.py",
"tests/test_scope_generator.py",
"docs/scope-md-spec.md",
],
),
(
"Diff, validate, and write scope files",
"api",
[
"src/repo_scoping/scope/validator.py",
"src/repo_scoping/web_api/app.py",
],
),
],
},
{
"name": "Explore Dependency And Impact Graphs",
"primary_class": "dependency-analysis",
"attributes": ["graph", "impact", "visualization"],
"features": [
(
"Model dependencies between facts, evidence, features, capabilities, abilities, and scope",
"backend",
[
"src/repo_scoping/core/service.py",
"docs/dependency-aware-scope-propagation.md",
"docs/dependency-visualization-exploration.md",
],
),
(
"Render dependency graph views and profiles",
"ui",
["src/repo_scoping/web_ui/views.py", "tests/test_web_api.py"],
),
],
},
{
"name": "Provide Scope Context To Downstream Agents",
"primary_class": "coordination",
"attributes": ["activity-core", "api-contract"],
"features": [
(
"Return compact JSON scope context by repository slug",
"api",
[
"src/repo_scoping/web_api/app.py",
"docs/schemas/repo-scope-context-response.json",
"tests/test_scope_context_api.py",
],
),
],
},
]
class CandidateGraphGenerator:
"""Build conservative review candidates from observed facts."""
def generate(
self,
repository: Repository,
facts: list[ObservedFact],
chunks: list[ContentChunk] | None = None,
) -> list[CandidateAbilityDraft]:
if not facts:
return []
chunks = chunks or []
docs = self._facts(facts, "intent") + self._facts(facts, "documentation")
tests = self._facts(facts, "test")
examples = self._facts(facts, "example")
interfaces = self._facts(facts, "interface")
manifests = self._facts(facts, "manifest")
frameworks = self._facts(facts, "framework")
languages = self._facts(facts, "language")
configs = self._facts(facts, "config")
scope_facts = self._facts(facts, "scope")
llm_providers = self._facts(facts, "llm_provider")
credential_configs = self._facts(facts, "credential_config")
provider_registries = self._facts(facts, "provider_registry")
fallback_policies = self._facts(facts, "fallback_policy")
intent_facts = self._facts(facts, "intent")
ability_primary_class, ability_attributes = self._ability_classification(
repository,
facts,
chunks,
)
ability_sources = docs or scope_facts or manifests or languages or configs
ability = CandidateAbilityDraft(
name=self._ability_name(repository, chunks),
description=self._ability_description(chunks),
confidence=self._ability_confidence(
docs=docs,
interfaces=interfaces,
tests=tests,
examples=examples,
frameworks=frameworks,
languages=languages,
),
source_refs=self._source_refs(ability_sources),
primary_class=ability_primary_class,
attributes=ability_attributes,
capabilities=[],
)
capabilities: list[CandidateCapabilityDraft] = []
capabilities.extend(
self._intent_capabilities(intent_facts, chunks, tests, examples, docs)
)
capabilities.extend(
self._scope_capabilities(
scope_facts,
chunks,
tests,
examples,
allow_summary_fallback=not intent_facts,
)
)
capabilities.extend(
self._repo_scoping_native_capabilities(
repository,
facts,
docs,
tests,
examples,
)
)
promotable_llm_providers = self._promotable_llm_facts(llm_providers)
promotable_provider_registries = self._promotable_llm_facts(provider_registries)
promotable_fallback_policies = self._promotable_llm_facts(fallback_policies)
promotable_llm_facts = (
promotable_llm_providers
+ promotable_provider_registries
+ promotable_fallback_policies
)
if promotable_llm_facts:
capabilities.append(
self._llm_provider_capability(
promotable_llm_providers,
credential_configs,
promotable_provider_registries,
promotable_fallback_policies,
tests,
examples,
docs,
)
)
if interfaces and capabilities:
capabilities = self._attach_interface_features(
capabilities,
interfaces,
chunks,
)
elif interfaces:
capabilities.append(
self._interface_capability(interfaces, tests, examples, docs, chunks)
)
if not capabilities:
capabilities.extend(
self._fact_derived_capabilities(
configs=configs,
manifests=manifests,
frameworks=frameworks,
languages=languages,
docs=docs,
tests=tests,
chunks=chunks,
)
)
return [
CandidateAbilityDraft(
name=ability.name,
description=ability.description,
confidence=ability.confidence,
source_refs=ability.source_refs,
primary_class=ability.primary_class,
attributes=ability.attributes,
capabilities=capabilities,
)
]
def _interface_capability(
self,
interfaces: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
chunks: list[ContentChunk],
) -> CandidateCapabilityDraft:
features = self._interface_features(interfaces, chunks)
return CandidateCapabilityDraft(
name="Expose Repository Interface",
description=self._interface_description(chunks),
inputs=self._interface_inputs(interfaces),
outputs=self._interface_outputs(interfaces),
confidence=self._interface_confidence(
interfaces=interfaces,
tests=tests,
examples=examples,
docs=docs,
),
source_refs=self._source_refs(interfaces),
primary_class="interface",
attributes=self._interface_attributes(interfaces, docs, chunks),
features=features,
evidence=self._evidence(tests, examples, docs),
)
def _llm_provider_capability(
self,
providers: list[ObservedFact],
credentials: list[ObservedFact],
registries: list[ObservedFact],
fallback_policies: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> CandidateCapabilityDraft:
provider_names = sorted({fact.name for fact in providers})
provider_summary = ", ".join(provider_names) if provider_names else "LLM providers"
features = [
CandidateFeatureDraft(
name=f"Use {provider} Models",
type="integration",
location=self._grouped_location(
[fact for fact in providers if fact.name == provider]
),
confidence=0.75,
source_refs=self._source_refs(
[fact for fact in providers if fact.name == provider]
),
primary_class="integration",
attributes=["llm-provider", provider.lower()],
)
for provider in provider_names
]
if credentials:
features.append(
CandidateFeatureDraft(
name="Configure LLM Provider Credentials",
type="configuration",
location=self._grouped_location(credentials),
confidence=0.7,
source_refs=self._source_refs(credentials),
primary_class="configuration",
attributes=["credential", "llm-provider"],
)
)
if registries:
features.append(
CandidateFeatureDraft(
name="Maintain LLM Provider Registry",
type="backend",
location=self._grouped_location(registries),
confidence=0.65,
source_refs=self._source_refs(registries),
primary_class="backend",
attributes=["provider-registry", "llm-provider"],
)
)
if fallback_policies:
features.append(
CandidateFeatureDraft(
name="Apply LLM Provider Fallback Policy",
type="backend",
location=self._grouped_location(fallback_policies),
confidence=0.6,
source_refs=self._source_refs(fallback_policies),
primary_class="backend",
attributes=["fallback-policy", "llm-provider"],
)
)
return CandidateCapabilityDraft(
name="Route LLM Requests Across Providers",
description=(
"Expose or configure model-provider integrations detected from "
f"source-linked provider hints: {provider_summary}."
),
inputs=["LLM request", "provider configuration"],
outputs=["provider-specific model response"],
confidence=self._llm_provider_confidence(
providers=providers,
credentials=credentials,
registries=registries,
fallback_policies=fallback_policies,
docs=docs,
),
source_refs=self._source_refs(
providers + credentials + registries + fallback_policies
),
primary_class="llm-integration",
attributes=self._llm_provider_attributes(
providers,
credentials,
registries,
fallback_policies,
) + self._utility_relationship_attributes(
providers + credentials + registries + fallback_policies
),
features=features,
evidence=self._evidence(tests, examples, docs),
)
def _intent_capabilities(
self,
intent_facts: list[ObservedFact],
chunks: list[ContentChunk],
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> list[CandidateCapabilityDraft]:
intent_chunks = [
chunk
for chunk in chunks
if chunk.kind == "intent"
and (
chunk.metadata.get("source_role") == "intent_summary"
or chunk.path.lower().endswith("intent.md")
)
]
if not intent_chunks:
return []
source_refs = self._source_refs(intent_facts)
capabilities: list[CandidateCapabilityDraft] = []
seen: set[str] = set()
for item in self._intent_capability_items(intent_chunks):
name = self._intent_capability_name(item)
key = name.lower()
if not name or key in seen:
continue
seen.add(key)
capabilities.append(
CandidateCapabilityDraft(
name=name,
description=(
"Reviewable intended capability extracted from repository "
f"intent: {item}"
),
inputs=[],
outputs=[name],
confidence=self._confidence(
0.45,
[
(0.15, bool(source_refs)),
(0.10, bool(tests)),
(0.05, bool(examples)),
(0.05, bool(docs)),
],
),
source_refs=source_refs,
primary_class="intent-capability",
attributes=[
"intent-derived",
"utility-owned",
"review-required-intent",
],
evidence=self._evidence(tests, examples, docs),
)
)
return capabilities
def _intent_capability_items(self, chunks: list[ContentChunk]) -> list[str]:
items: list[str] = []
in_capability_section = False
for chunk in sorted(chunks, key=lambda item: (item.path, item.start_line)):
for raw_line in chunk.text.splitlines():
line = raw_line.strip()
if not line:
continue
if line.startswith("#"):
heading = line.lstrip("#").strip().lower()
in_capability_section = (
"capabilit" in heading
or heading in {"primary utility", "core utility"}
)
continue
if not in_capability_section:
continue
item = re.sub(r"^(?:[-*]|\d+[.)])\s+", "", line).strip()
item = re.sub(r"^(?:capability|intended capability)\s*:\s*", "", item, flags=re.I)
if item and item != line or raw_line.lstrip().startswith(("-", "*")):
items.append(item)
return items
def _intent_capability_name(self, text: str) -> str:
lowered = re.sub(r"[*_`]", "", text.lower())
if "continuous connectivity" in lowered and "remote systems" in lowered:
return "Maintain Continuous Connectivity Between Remote Systems And Central Hub"
if "observable" in lowered and "auditable" in lowered and "controllable" in lowered:
return "Make Connectivity Observable Auditable And Controllable"
if "cli tool" in lowered and "mcp" in lowered:
return "Expose CLI And MCP Accessible Service"
candidate = re.split(r"\s+-\s+|\s*:\s*|[.!?]\s+", text.strip(), maxsplit=1)[0]
candidate = candidate.strip(" .:-")
if not candidate:
return ""
words = candidate.split()
if words:
words[0] = self._imperative_verb(words[0])
while words and words[-1].lower().strip(",;:") in {"a", "an", "the", "and", "or", "as", "both"}:
words.pop()
return self._title_from_words(words[:10])
def _scope_capabilities(
self,
scope_facts: list[ObservedFact],
chunks: list[ContentChunk],
tests: list[ObservedFact],
examples: list[ObservedFact],
*,
allow_summary_fallback: bool = True,
) -> list[CandidateCapabilityDraft]:
scope_chunks = [
chunk
for chunk in chunks
if chunk.kind == "scope"
or chunk.metadata.get("source_role") == "derived_scope"
or chunk.path.lower().endswith("scope.md")
]
if not scope_chunks:
return []
source_refs = self._source_refs(scope_facts)
capabilities: list[CandidateCapabilityDraft] = []
seen: set[str] = set()
for block in self._scope_capability_blocks(scope_chunks):
title = block.get("title", "").strip()
if not title:
continue
key = title.lower()
if key in seen:
continue
seen.add(key)
capability_type = block.get("type", "scope-derived").strip() or "scope-derived"
description = block.get("description", "").strip()
keywords = self._scope_keywords(block.get("keywords", ""))
attributes = self._unique(
[
capability_type,
*keywords,
"scope-derived",
"current-state",
"review-required-scope",
]
)
feature = CandidateFeatureDraft(
name=title,
type=capability_type,
location="SCOPE.md",
confidence=0.55,
source_refs=source_refs,
primary_class=capability_type,
attributes=self._unique(
[capability_type, "scope-defined", "review-required-scope"]
),
)
capabilities.append(
CandidateCapabilityDraft(
name=title,
description=(
"Reviewable current-state capability extracted from "
f"SCOPE.md: {description or title}"
),
inputs=[],
outputs=[title],
confidence=self._confidence(
0.45,
[
(0.10, bool(description)),
(0.05, bool(keywords)),
(0.05, bool(tests)),
(0.05, bool(examples)),
],
),
source_refs=source_refs,
primary_class=capability_type,
attributes=attributes,
features=[feature],
evidence=[
CandidateEvidenceDraft(
type="scope-current-state",
reference="SCOPE.md",
strength="medium",
source_refs=source_refs,
)
],
)
)
if capabilities or not allow_summary_fallback:
return capabilities
fallback_name = self._scope_summary_capability_name(scope_chunks)
if not fallback_name:
return []
return [
CandidateCapabilityDraft(
name=fallback_name,
description=(
"Reviewable current-state capability inferred from SCOPE.md "
"summary text. A curator should split this into more precise "
"capabilities when reviewing."
),
inputs=[],
outputs=[fallback_name],
confidence=0.45,
source_refs=source_refs,
primary_class="scope-derived",
attributes=[
"scope-derived",
"current-state",
"review-required-scope",
],
evidence=[
CandidateEvidenceDraft(
type="scope-current-state",
reference="SCOPE.md",
strength="weak",
source_refs=source_refs,
)
],
)
]
def _scope_capability_blocks(
self,
chunks: list[ContentChunk],
) -> list[dict[str, str]]:
blocks: list[dict[str, str]] = []
in_block = False
current: dict[str, str] = {}
current_key = ""
for chunk in sorted(chunks, key=lambda item: (item.path, item.start_line)):
for raw_line in chunk.text.splitlines():
line = raw_line.rstrip()
stripped = line.strip()
if stripped.startswith("```capability"):
in_block = True
current = {}
current_key = ""
continue
if in_block and stripped.startswith("```"):
if current:
blocks.append(current)
in_block = False
current = {}
current_key = ""
continue
if not in_block:
continue
key, separator, value = stripped.partition(":")
if separator and re.match(r"^[A-Za-z_][A-Za-z0-9_-]*$", key):
current_key = key.lower()
current[current_key] = value.strip().strip('"')
elif current_key and stripped:
current[current_key] = (
f"{current[current_key]} {stripped.strip()}"
).strip()
return blocks
def _scope_keywords(self, value: str) -> list[str]:
cleaned = value.strip()
if cleaned.startswith("[") and cleaned.endswith("]"):
cleaned = cleaned[1:-1]
return [
item.strip(" `\"'")
for item in cleaned.split(",")
if item.strip(" `\"'")
][:8]
def _scope_summary_capability_name(self, chunks: list[ContentChunk]) -> str:
one_liner = self._scope_one_liner(chunks)
if one_liner:
return self._imperative_purpose(one_liner)
return ""
def _fact_derived_capabilities(
self,
*,
configs: list[ObservedFact],
manifests: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
docs: list[ObservedFact],
tests: list[ObservedFact],
chunks: list[ContentChunk],
) -> list[CandidateCapabilityDraft]:
if not configs:
return []
capability_facts = configs + manifests + frameworks + languages
if not capability_facts:
return []
features: list[CandidateFeatureDraft] = []
for label, kind, facts in (
("Manage Repository Configuration", "configuration", configs),
("Declare Runtime And Package Manifests", "manifest", manifests),
("Use Detected Frameworks", "framework", frameworks),
("Provide Implementation In Detected Languages", "implementation", languages),
):
if not facts:
continue
features.append(
CandidateFeatureDraft(
name=label,
type=kind,
location=self._grouped_location(facts),
confidence=0.45,
source_refs=self._source_refs(facts),
primary_class=kind,
attributes=[kind, "fact-derived", "review-required"],
)
)
if not features:
return []
name = self._fact_derived_capability_name(chunks, features)
return [
CandidateCapabilityDraft(
name=name,
description=(
"Reviewable capability inferred from deterministic facts. "
"This fills the hierarchy when no stronger intent, scope "
"capability, or interface candidate exists."
),
inputs=self._feature_inputs(features),
outputs=self._feature_outputs(features),
confidence=self._confidence(
0.35,
[
(0.10, bool(configs)),
(0.10, bool(manifests)),
(0.05, bool(frameworks)),
(0.05, bool(tests)),
(0.05, bool(docs)),
],
),
source_refs=self._source_refs(capability_facts),
primary_class="fact-derived",
attributes=["fact-derived", "review-required", "partial-hierarchy"],
features=features,
evidence=self._evidence(tests, [], docs),
)
]
def _fact_derived_capability_name(
self,
chunks: list[ContentChunk],
features: list[CandidateFeatureDraft],
) -> str:
scope_name = self._scope_summary_capability_name(chunks)
if scope_name:
return scope_name
if any(feature.type == "configuration" for feature in features):
return "Manage Repository Configuration"
if any(feature.type == "manifest" for feature in features):
return "Declare Repository Runtime"
return "Describe Repository Implementation"
def _repo_scoping_native_capabilities(
self,
repository: Repository,
facts: list[ObservedFact],
docs: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
) -> list[CandidateCapabilityDraft]:
if not self._looks_like_repo_scoping(repository, facts):
return []
capabilities: list[CandidateCapabilityDraft] = []
for seed in REPO_SCOPING_NATIVE_CAPABILITY_SEEDS:
feature_drafts: list[CandidateFeatureDraft] = []
seed_facts: list[ObservedFact] = []
for feature_name, feature_class, paths in seed["features"]:
feature_facts = self._facts_for_paths(facts, paths)
if not feature_facts:
continue
seed_facts.extend(feature_facts)
feature_drafts.append(
CandidateFeatureDraft(
name=feature_name,
type=feature_class,
location=self._grouped_location(feature_facts),
confidence=0.7,
source_refs=self._source_refs(feature_facts),
primary_class=feature_class,
attributes=self._unique(
[feature_class, "source-linked", "repo-owned"]
),
)
)
seed_facts = self._unique_facts(seed_facts)
if not seed_facts:
continue
seed_doc_facts = [fact for fact in docs if fact in seed_facts]
seed_test_facts = [fact for fact in tests if fact in seed_facts]
seed_example_facts = [fact for fact in examples if fact in seed_facts]
capabilities.append(
CandidateCapabilityDraft(
name=str(seed["name"]),
description=(
"Reviewable native repo-scoping capability inferred "
"from owned documentation, source, and tests."
),
inputs=[],
outputs=[str(seed["name"])],
confidence=self._confidence(
0.45,
[
(0.10, bool(seed_doc_facts)),
(0.10, bool(seed_test_facts)),
(0.05, bool(seed_example_facts)),
(0.05, len(feature_drafts) > 1),
],
),
source_refs=self._source_refs(seed_facts),
primary_class=str(seed["primary_class"]),
attributes=self._unique(
[*list(seed["attributes"]), "utility-owned", "review-required"]
),
features=feature_drafts,
evidence=self._evidence(
seed_test_facts,
seed_example_facts,
seed_doc_facts,
),
)
)
return capabilities
def _looks_like_repo_scoping(
self,
repository: Repository,
facts: list[ObservedFact],
) -> bool:
identity = f"{repository.name} {repository.url} {repository.description or ''}".lower()
if "repo-scoping" in identity or "repository scoping" in identity:
return True
return any(fact.path.startswith("src/repo_scoping/") for fact in facts)
def _facts_for_paths(
self,
facts: list[ObservedFact],
paths: list[str],
) -> list[ObservedFact]:
matched: list[ObservedFact] = []
for fact in facts:
if any(fact.path == path or fact.path.startswith(f"{path}/") for path in paths):
matched.append(fact)
return self._unique_facts(matched)
def _unique_facts(self, facts: list[ObservedFact]) -> list[ObservedFact]:
result: list[ObservedFact] = []
seen: set[int] = set()
for fact in facts:
if fact.id in seen:
continue
seen.add(fact.id)
result.append(fact)
return result
def _attach_interface_features(
self,
capabilities: list[CandidateCapabilityDraft],
interfaces: list[ObservedFact],
chunks: list[ContentChunk],
) -> list[CandidateCapabilityDraft]:
features = self._interface_features(interfaces, chunks)
if not features:
return capabilities
capability_features: dict[int, list[CandidateFeatureDraft]] = {
index: [] for index, _ in enumerate(capabilities)
}
for feature in features:
index = self._best_feature_capability_index(feature, capabilities)
capability_features[index].append(feature)
updated: list[CandidateCapabilityDraft] = []
for index, capability in enumerate(capabilities):
attached = capability_features[index]
if not attached:
updated.append(capability)
continue
updated.append(
replace(
capability,
inputs=capability.inputs or self._feature_inputs(attached),
outputs=capability.outputs or self._feature_outputs(attached),
features=[*capability.features, *attached],
)
)
return updated
def _best_feature_capability_index(
self,
feature: CandidateFeatureDraft,
capabilities: list[CandidateCapabilityDraft],
) -> int:
feature_text = f"{feature.name} {feature.type} {feature.location}".lower()
feature_terms = self._significant_terms(feature_text)
best_index = 0
best_score = -1
for index, capability in enumerate(capabilities):
capability_text = " ".join(
[
capability.name,
capability.description,
" ".join(capability.outputs),
" ".join(capability.attributes),
]
).lower()
capability_terms = self._significant_terms(capability_text)
score = len(feature_terms & capability_terms)
if feature.type == "CLI" and any(
token in capability_text for token in ("cli", "command", "mcp")
):
score += 3
if feature.type == "API" and any(
token in capability_text for token in ("api", "http", "service")
):
score += 3
if score > best_score:
best_index = index
best_score = score
return best_index
def _interface_features(
self,
interfaces: list[ObservedFact],
chunks: list[ContentChunk],
) -> list[CandidateFeatureDraft]:
by_type: dict[str, list[ObservedFact]] = {}
for fact in interfaces:
by_type.setdefault(self._feature_type(fact), []).append(fact)
features: list[CandidateFeatureDraft] = []
for feature_type, facts in sorted(by_type.items()):
if len(facts) == 1:
fact = facts[0]
features.append(
CandidateFeatureDraft(
name=self._feature_name(fact, chunks),
type=feature_type,
location=fact.path,
confidence=0.65 if fact.value else 0.45,
source_refs=self._source_refs([fact]),
primary_class=feature_type,
attributes=self._feature_attributes(feature_type, [fact]),
)
)
continue
features.append(
CandidateFeatureDraft(
name=self._grouped_interface_feature_name(
feature_type,
facts,
chunks,
),
type=feature_type,
location=self._grouped_location(facts),
confidence=self._grouped_interface_confidence(facts),
source_refs=self._source_refs(facts),
primary_class=feature_type,
attributes=self._feature_attributes(feature_type, facts),
)
)
return features
def _grouped_interface_feature_name(
self,
feature_type: str,
facts: list[ObservedFact],
chunks: list[ContentChunk],
) -> str:
summary = self._grouped_interface_summary(facts, chunks)
if feature_type == "API":
return f"HTTP API surface: {summary}"
if feature_type == "CLI":
return f"CLI command surface: {summary}"
return f"Callable interface surface: {summary}"
def _grouped_interface_summary(
self,
facts: list[ObservedFact],
chunks: list[ContentChunk],
) -> str:
names = [self._feature_name(fact, chunks) for fact in facts]
compact_names = self._unique([name for name in names if name])
if not compact_names:
return f"{len(facts)} entry points"
visible = compact_names[:3]
suffix = f", +{len(compact_names) - 3} more" if len(compact_names) > 3 else ""
return f"{', '.join(visible)}{suffix}"
def _grouped_location(self, facts: list[ObservedFact]) -> str:
paths = sorted({fact.path for fact in facts if fact.path})
if not paths:
return ""
if len(paths) == 1:
return paths[0]
return "multiple files"
def _grouped_interface_confidence(self, facts: list[ObservedFact]) -> float:
valued = sum(1 for fact in facts if fact.value)
return 0.7 if valued == len(facts) else 0.55
def _evidence(
self,
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> list[CandidateEvidenceDraft]:
evidence: list[CandidateEvidenceDraft] = []
for fact in tests:
evidence.append(
CandidateEvidenceDraft(
type="test",
reference=fact.path,
strength="strong",
source_refs=self._source_refs([fact]),
)
)
for fact in examples:
evidence.append(
CandidateEvidenceDraft(
type="example",
reference=fact.path,
strength="strong",
source_refs=self._source_refs([fact]),
)
)
for fact in docs:
evidence.append(
CandidateEvidenceDraft(
type="documentation",
reference=fact.path,
strength="medium",
source_refs=self._source_refs([fact]),
)
)
return evidence
def _feature_type(self, fact: ObservedFact) -> str:
lower = f"{fact.name} {fact.path} {fact.value}".lower()
if "cli" in lower or "command" in lower:
return "CLI"
if "api" in lower or "route" in lower or "@app." in lower or "@router." in lower:
return "API"
return "interface"
def _ability_classification(
self,
repository: Repository,
facts: list[ObservedFact],
chunks: list[ContentChunk],
) -> tuple[str, list[str]]:
text = " ".join(
[
repository.name,
repository.description or "",
" ".join(
chunk.text[:600]
for chunk in chunks
if chunk.kind in {"intent", "documentation"}
and chunk.metadata.get("source_role") != "agent_guidance"
),
" ".join(
f"{fact.kind} {fact.name} {fact.value}"
for fact in facts
if not (
fact.kind == "llm_provider"
and self._utility_relationship(fact)
not in {"facade", "adapter"}
)
),
]
).lower()
attributes: list[str] = []
if any(token in text for token in ("ssh", "tunnel", "reverse tunnel", "remote access", "connectivity")):
attributes.extend(["remote-access", "connectivity"])
if any(token in text for token in ("audit", "health check", "lifecycle", "ops", "operator")):
attributes.append("operations")
return "it-operations", self._unique(attributes)
if any(token in text for token in ("ability", "capability", "feature")):
return "repository-intelligence", self._unique(attributes + ["capability-mapping"])
promotable_llm = any(
fact.kind == "llm_provider"
and self._utility_relationship(fact) in {"owned", "facade", "adapter"}
for fact in facts
)
if promotable_llm:
return "ai-integration", self._unique(attributes + ["llm-provider"])
if any(fact.kind == "interface" for fact in facts):
attributes.append("interface")
return "developer-tooling", self._unique(attributes)
def _interface_attributes(
self,
interfaces: list[ObservedFact],
docs: list[ObservedFact] | None = None,
chunks: list[ContentChunk] | None = None,
) -> list[str]:
feature_types = {self._feature_type(fact) for fact in interfaces}
attributes = ["api" if item == "API" else "cli" if item == "CLI" else "callable" for item in feature_types]
utility = self._interface_utility_relationship(docs or [], chunks or [])
return self._unique(["surface", *attributes, f"utility-{utility}"])
def _interface_utility_relationship(
self,
docs: list[ObservedFact],
chunks: list[ContentChunk],
) -> str:
doc_paths = {fact.path for fact in docs}
text = " ".join(
chunk.text.lower()
for chunk in chunks
if chunk.path in doc_paths
and chunk.kind in {"intent", "documentation"}
and chunk.metadata.get("source_role") != "derived_scope"
)
if any(token in text for token in ("facade", "proxy", "wrapper", "wraps ")):
return "facade"
return "owned"
def _feature_attributes(
self,
feature_type: str,
facts: list[ObservedFact],
) -> list[str]:
attributes = [feature_type]
if feature_type == "API":
attributes.extend(["surface", "http"])
elif feature_type == "CLI":
attributes.extend(["surface", "command"])
else:
attributes.append("surface")
paths = " ".join(fact.path.lower() for fact in facts)
if "test" in paths:
attributes.append("test-linked")
return self._unique(attributes)
def _structure_attributes(
self,
manifests: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
) -> list[str]:
return self._unique(
[
"manifest" if manifests else "",
*[fact.name for fact in frameworks],
*[fact.name for fact in languages],
"utility-dependency" if manifests or frameworks else "",
"utility-tooling" if languages and not (manifests or frameworks) else "",
"review-required-structural-context",
]
)
def _llm_provider_attributes(
self,
providers: list[ObservedFact],
credentials: list[ObservedFact],
registries: list[ObservedFact],
fallback_policies: list[ObservedFact],
) -> list[str]:
return self._unique(
[
"llm-provider",
*[fact.name.lower() for fact in providers],
"credential" if credentials else "",
"provider-registry" if registries else "",
"fallback-policy" if fallback_policies else "",
]
)
def _unique(self, values: list[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
item = value.strip()
key = item.lower()
if not item or key in seen:
continue
seen.add(key)
result.append(item)
return result
def _significant_terms(self, text: str) -> set[str]:
stop_words = {
"and",
"the",
"this",
"that",
"with",
"from",
"into",
"for",
"capability",
"repository",
"service",
}
return {
term
for term in re.findall(r"[a-z0-9]+", text.lower())
if len(term) > 2 and term not in stop_words
}
def _interface_inputs(self, interfaces: list[ObservedFact]) -> list[str]:
feature_types = {self._feature_type(fact) for fact in interfaces}
inputs: list[str] = []
if "API" in feature_types:
inputs.append("HTTP request")
if "CLI" in feature_types:
inputs.append("CLI arguments")
if not inputs:
inputs.append("caller input")
return inputs
def _interface_outputs(self, interfaces: list[ObservedFact]) -> list[str]:
feature_types = {self._feature_type(fact) for fact in interfaces}
outputs: list[str] = []
if "API" in feature_types:
outputs.append("HTTP response")
if "CLI" in feature_types:
outputs.append("command output")
if not outputs:
outputs.append("callable interface result")
return outputs
def _feature_inputs(self, features: list[CandidateFeatureDraft]) -> list[str]:
feature_types = {feature.type for feature in features}
inputs: list[str] = []
if "API" in feature_types:
inputs.append("HTTP request")
if "CLI" in feature_types:
inputs.append("CLI arguments")
if not inputs:
inputs.append("caller input")
return inputs
def _feature_outputs(self, features: list[CandidateFeatureDraft]) -> list[str]:
feature_types = {feature.type for feature in features}
outputs: list[str] = []
if "API" in feature_types:
outputs.append("HTTP response")
if "CLI" in feature_types:
outputs.append("command output")
if not outputs:
outputs.append("callable interface result")
return outputs
def _feature_name(self, fact: ObservedFact, chunks: list[ContentChunk]) -> str:
route_name = self._route_feature_name(fact.value)
if route_name:
return route_name
if self._feature_type(fact) == "CLI":
function_name = self._function_name_near_fact(fact, chunks)
if function_name:
return f"CLI command {function_name}"
return fact.value or fact.name
def _route_feature_name(self, value: str) -> str:
match = re.search(r"@(?:app|router)\.(get|post|put|patch|delete)\((['\"])(.*?)\2", value)
if match is None:
return ""
method = match.group(1).upper()
path = match.group(3)
return f"{method} {path}"
def _function_name_near_fact(
self,
fact: ObservedFact,
chunks: list[ContentChunk],
) -> str:
line = fact.metadata.get("line")
for chunk in chunks:
if chunk.path != fact.path or chunk.kind != "interface":
continue
if isinstance(line, int) and not (chunk.start_line <= line <= chunk.end_line):
continue
match = re.search(r"^\s*def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\(", chunk.text, re.MULTILINE)
if match is not None:
return match.group(1)
return ""
def _ability_confidence(
self,
*,
docs: list[ObservedFact],
interfaces: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
) -> float:
return self._confidence(
0.25,
[
(0.20, bool(docs)),
(0.15, bool(interfaces)),
(0.15, bool(tests)),
(0.10, bool(examples)),
(0.10, bool(frameworks)),
(0.05, bool(languages)),
],
)
def _interface_confidence(
self,
*,
interfaces: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> float:
return self._confidence(
0.30,
[
(0.20, bool(interfaces)),
(0.15, bool(tests)),
(0.10, bool(examples)),
(0.10, bool(docs)),
(0.05, len(interfaces) > 1),
],
)
def _structure_confidence(
self,
*,
manifests: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
docs: list[ObservedFact],
) -> float:
return self._confidence(
0.25,
[
(0.20, bool(manifests)),
(0.15, bool(frameworks)),
(0.10, bool(languages)),
(0.05, bool(docs)),
],
)
def _llm_provider_confidence(
self,
*,
providers: list[ObservedFact],
credentials: list[ObservedFact],
registries: list[ObservedFact],
fallback_policies: list[ObservedFact],
docs: list[ObservedFact],
) -> float:
return self._confidence(
0.35,
[
(0.20, bool(providers)),
(0.10, len({fact.name for fact in providers}) > 1),
(0.10, bool(credentials)),
(0.10, bool(registries)),
(0.10, bool(fallback_policies)),
(0.05, bool(docs)),
],
)
def _confidence(
self,
base: float,
factors: list[tuple[float, bool]],
) -> float:
score = base + sum(weight for weight, applies in factors if applies)
return min(1.0, round(score, 2))
def _ability_description(self, chunks: list[ContentChunk]) -> str:
doc_summary = self._document_summary(chunks)
if doc_summary:
return (
"Candidate repository purpose inferred from repository content: "
f"{doc_summary} Review is required before treating this as an "
"approved domain ability."
)
return (
"Candidate repository purpose inferred from observed repository "
"documentation, manifests, languages, and interfaces. Review is "
"required before treating this as an approved domain ability."
)
def _ability_name(
self,
repository: Repository,
chunks: list[ContentChunk],
) -> str:
ops_name = self._operations_ability_name(chunks)
if ops_name:
return ops_name
purpose_text = (
self._intent_purpose_sentence(chunks)
or self._scope_one_liner(chunks)
or self._documentation_purpose_sentence(chunks)
or repository.description
)
if purpose_text:
normalized = self._imperative_purpose(purpose_text)
if normalized:
return normalized
return f"Support {self._humanize_identifier(repository.name)}"
def _intent_purpose_sentence(self, chunks: list[ContentChunk]) -> str:
return self._purpose_sentence_for_chunks(
[
chunk
for chunk in self._purpose_chunks(chunks)
if chunk.kind == "intent"
or chunk.metadata.get("source_role") == "intent_summary"
or chunk.path.lower().endswith("intent.md")
]
)
def _documentation_purpose_sentence(self, chunks: list[ContentChunk]) -> str:
return self._purpose_sentence_for_chunks(
[
chunk
for chunk in self._purpose_chunks(chunks)
if chunk.kind == "documentation"
and chunk.metadata.get("source_role") != "derived_scope"
and not chunk.path.lower().endswith("scope.md")
]
)
def _purpose_sentence_for_chunks(self, chunks: list[ContentChunk]) -> str:
for chunk in chunks:
if chunk.kind not in {"intent", "documentation"}:
continue
lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
paragraph = next((line for line in lines if not line.startswith("#")), "")
if paragraph and not self._is_template_boilerplate(paragraph):
return paragraph
return ""
def _scope_one_liner(self, chunks: list[ContentChunk]) -> str:
for chunk in sorted(chunks, key=lambda item: (item.path, item.start_line)):
if not (
chunk.kind == "scope"
or chunk.metadata.get("source_role") == "derived_scope"
or chunk.path.lower().endswith("scope.md")
):
continue
lines = chunk.text.splitlines()
for index, raw_line in enumerate(lines):
if raw_line.strip().lower() == "## one-liner":
for following in lines[index + 1 :]:
candidate = following.strip()
if not candidate or candidate.startswith("---"):
continue
if candidate.startswith(">"):
continue
return candidate.strip(" .")
before_first_section: list[str] = []
for raw_line in lines:
candidate = raw_line.strip()
if candidate.startswith("## "):
break
before_first_section.append(candidate)
for candidate in before_first_section:
if (
candidate
and not candidate.startswith("#")
and not candidate.startswith(">")
and not candidate.startswith("---")
and not self._is_template_boilerplate(candidate)
):
return candidate.strip(" .")
return ""
def _is_template_boilerplate(self, text: str) -> bool:
lowered = text.lower()
return (
"git repository template to bootstrap" in lowered
or "this file helps you quickly understand" in lowered
or "intentionally lightweight and may be incomplete" in lowered
)
def _purpose_chunks(self, chunks: list[ContentChunk]) -> list[ContentChunk]:
def priority(chunk: ContentChunk) -> tuple[int, str, int]:
role = chunk.metadata.get("source_role")
path = chunk.path.lower()
if role == "intent_summary" or path.endswith("intent.md"):
return (0, path, chunk.start_line)
if role == "derived_scope" or path.endswith("scope.md"):
return (1, path, chunk.start_line)
if role == "product_documentation" or path.startswith("readme"):
return (2, path, chunk.start_line)
return (3, path, chunk.start_line)
return sorted(
[
chunk
for chunk in chunks
if chunk.kind in {"intent", "documentation", "scope"}
and chunk.metadata.get("source_role") != "agent_guidance"
],
key=priority,
)
def _operations_ability_name(self, chunks: list[ContentChunk]) -> str:
text = " ".join(
chunk.text
for chunk in self._documentation_chunks(chunks)
if chunk.kind == "intent"
).lower()
if "ssh reverse tunnel" in text or "ssh reverse tunneling" in text:
return "Manage SSH Reverse Tunnel Connectivity"
return ""
def _imperative_purpose(self, text: str) -> str:
cleaned = re.sub(r"\s+", " ", text.strip())
cleaned = re.split(r"[.!?]\s+", cleaned, maxsplit=1)[0]
cleaned = re.sub(
r"(?i)^this\s+repository\s+exists\s+to\s+provide\s+(?:an?\s+)?",
"Provide ",
cleaned,
)
cleaned = re.sub(r"^[A-Z][A-Za-z0-9_-]*\s+(?:is|provides|offers)\s+", "", cleaned)
cleaned = cleaned.strip(" .:-")
if not cleaned:
return ""
words = cleaned.split()
if not words:
return ""
words[0] = self._imperative_verb(words[0])
return self._title_from_words(words[:10])
def _imperative_verb(self, word: str) -> str:
if word.isupper():
return word
lower = word.lower().strip(",;:")
irregular = {
"does": "do",
"has": "have",
"is": "be",
}
if lower in irregular:
return irregular[lower]
if lower in {"this"}:
return lower
if lower.endswith("ies") and len(lower) > 4:
return f"{lower[:-3]}y"
if lower.endswith(("des", "ses", "tes", "ves", "zes")) and len(lower) > 4:
return lower[:-1]
if lower.endswith("es") and len(lower) > 3:
return lower[:-2]
if lower.endswith("s") and len(lower) > 3:
return lower[:-1]
return lower
def _title_from_words(self, words: list[str]) -> str:
cleaned_words = [
re.sub(r"[^A-Za-z0-9_/{}-]", "", word)
for word in words
]
return " ".join(
word if word.isupper() else word[:1].upper() + word[1:]
for word in cleaned_words
if word
)
def _humanize_identifier(self, value: str) -> str:
spaced = re.sub(r"[_-]+", " ", value)
spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", spaced)
return self._title_from_words(spaced.split())
def _interface_description(self, chunks: list[ContentChunk]) -> str:
interface_summary = self._interface_summary(chunks)
if interface_summary:
return (
"Expose one or more likely user-facing API or CLI entry points. "
f"Source context: {interface_summary} Review is required to name "
"the concrete domain behavior."
)
return (
"Expose one or more likely user-facing API or CLI entry points. "
"Review is required to name the concrete domain behavior."
)
def _document_summary(self, chunks: list[ContentChunk]) -> str:
for chunk in self._documentation_chunks(chunks):
lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
if not lines:
continue
if chunk.kind == "scope" or chunk.metadata.get("source_role") == "derived_scope":
one_liner = self._scope_one_liner([chunk])
if one_liner:
return f"SCOPE. {one_liner}"
heading = next((line.lstrip("#").strip() for line in lines if line.startswith("#")), "")
paragraph = next((line for line in lines if not line.startswith("#")), "")
if self._is_template_boilerplate(paragraph):
paragraph = ""
if heading and paragraph:
return f"{heading}. {paragraph}"
return heading or paragraph
return ""
def _documentation_chunks(self, chunks: list[ContentChunk]) -> list[ContentChunk]:
def priority(chunk: ContentChunk) -> tuple[int, str, int]:
role = chunk.metadata.get("source_role")
path = chunk.path.lower()
if chunk.kind == "intent" or role == "intent_summary" or path.endswith("intent.md"):
return (0, path, chunk.start_line)
if chunk.kind == "scope" or role == "derived_scope" or path.endswith("scope.md"):
return (1, path, chunk.start_line)
return (2, path, chunk.start_line)
return sorted(
[
chunk
for chunk in chunks
if chunk.kind in {"intent", "documentation", "scope"}
and chunk.metadata.get("source_role") != "agent_guidance"
],
key=priority,
)
def _interface_summary(self, chunks: list[ContentChunk]) -> str:
for chunk in chunks:
if chunk.kind != "interface":
continue
lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
if not lines:
continue
return " ".join(lines[:3])
return ""
def _facts(self, facts: list[ObservedFact], kind: str) -> list[ObservedFact]:
return [fact for fact in facts if fact.kind == kind]
def _promotable_llm_facts(self, facts: list[ObservedFact]) -> list[ObservedFact]:
return [
fact
for fact in facts
if self._utility_relationship(fact) in {"facade", "adapter"}
]
def _utility_relationship(self, fact: ObservedFact) -> str:
relationship = fact.metadata.get("utility_relationship")
if isinstance(relationship, str) and relationship:
return relationship
source_role = fact.metadata.get("source_role")
if source_role == "implementation_source":
lower_path = fact.path.lower()
if "adapter" in lower_path or "provider" in lower_path:
return "adapter"
return "owned"
if source_role == "configuration":
return "configure"
if source_role == "dependency_declaration":
return "dependency"
if source_role in {"agent_guidance", "ci_tooling"}:
return "tooling"
if not source_role and fact.path.lower().endswith((".py", ".ts", ".js")):
return "owned"
return "mention"
def _utility_relationship_attributes(self, facts: list[ObservedFact]) -> list[str]:
relationships = sorted({self._utility_relationship(fact) for fact in facts})
return [f"utility-{relationship}" for relationship in relationships]
def _source_refs(self, facts: list[ObservedFact]) -> list[SourceReference]:
return [
SourceReference(
fact_id=fact.id,
path=fact.path,
kind=fact.kind,
name=fact.name,
line=fact.metadata.get("line"),
)
for fact in facts
]