# Source: repo-scoping/src/repo_registry/candidate_graph/generator.py (Python; file viewer reported 1095 lines, 40 KiB)

from __future__ import annotations
import re
from dataclasses import dataclass, field, replace
from repo_registry.core.models import ContentChunk, ObservedFact, Repository, SourceReference
@dataclass(frozen=True)
class CandidateEvidenceDraft:
    """Immutable draft of one piece of evidence attached to a capability."""

    # Evidence category; the generator emits "test", "example" or
    # "documentation".
    type: str
    # Location of the evidence; the generator passes the fact's path.
    reference: str
    # Qualitative weight; the generator emits "strong" or "medium".
    strength: str
    # References back to the observed fact(s) behind this evidence.
    source_refs: list[SourceReference]
@dataclass(frozen=True)
class CandidateFeatureDraft:
    """Immutable draft of a concrete feature backing a capability."""

    # Display name of the feature.
    name: str
    # Feature category, e.g. "API", "CLI", "interface", "integration".
    type: str
    # Path of the backing fact, or a grouped-location summary.
    location: str
    # Heuristic confidence score assigned by the generator.
    confidence: float
    # References back to the observed fact(s) behind this feature.
    source_refs: list[SourceReference]
    # Classification label; empty unless the generator sets one.
    primary_class: str = ""
    # Free-form classification tags; each instance gets its own list.
    attributes: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateCapabilityDraft:
    """Immutable draft of a capability grouped under a candidate ability."""

    # Display name of the capability.
    name: str
    # Human-readable explanation shown to reviewers.
    description: str
    # Labels for what the capability consumes (may be empty).
    inputs: list[str]
    # Labels for what the capability produces.
    outputs: list[str]
    # Heuristic confidence score assigned by the generator.
    confidence: float
    # References back to the observed fact(s) behind this capability.
    source_refs: list[SourceReference]
    # Classification label; defaults to the generic "capability".
    primary_class: str = "capability"
    # Free-form classification tags; each instance gets its own list.
    attributes: list[str] = field(default_factory=list)
    # Concrete features attached to this capability.
    features: list[CandidateFeatureDraft] = field(default_factory=list)
    # Supporting evidence (tests, examples, documentation).
    evidence: list[CandidateEvidenceDraft] = field(default_factory=list)
@dataclass(frozen=True)
class CandidateAbilityDraft:
    """Immutable draft of the top-level ability proposed for a repository."""

    # Display name of the ability.
    name: str
    # Human-readable explanation shown to reviewers.
    description: str
    # Heuristic confidence score assigned by the generator.
    confidence: float
    # References back to the observed fact(s) behind this ability.
    source_refs: list[SourceReference]
    # Classification label; defaults to the generic "ability".
    primary_class: str = "ability"
    # Free-form classification tags; each instance gets its own list.
    attributes: list[str] = field(default_factory=list)
    # Capabilities grouped under this ability.
    capabilities: list[CandidateCapabilityDraft] = field(default_factory=list)
class CandidateGraphGenerator:
"""Build conservative review candidates from observed facts."""
def generate(
    self,
    repository: Repository,
    facts: list[ObservedFact],
    chunks: list[ContentChunk] | None = None,
) -> list[CandidateAbilityDraft]:
    """Assemble the candidate ability (with capabilities) for a repository.

    Returns an empty list when ``facts`` is empty. Otherwise returns a
    one-element list whose ability carries, in order: intent-derived
    capabilities, an optional LLM-provider capability, and interface
    features. Interface features are attached to the existing capabilities
    when there are any; otherwise they are wrapped in one standalone
    interface capability.
    """
    if not facts:
        return []
    available_chunks = chunks or []

    def of_kind(kind: str) -> list[ObservedFact]:
        return self._facts(facts, kind)

    intent_facts = of_kind("intent")
    # Intent facts count as documentation for confidence and evidence.
    docs = intent_facts + of_kind("documentation")
    tests = of_kind("test")
    examples = of_kind("example")
    interfaces = of_kind("interface")
    manifests = of_kind("manifest")
    frameworks = of_kind("framework")
    languages = of_kind("language")
    credential_configs = of_kind("credential_config")

    primary_class, attributes = self._ability_classification(
        repository,
        facts,
        available_chunks,
    )
    ability_name = self._ability_name(repository, available_chunks)
    ability_description = self._ability_description(available_chunks)
    ability_confidence = self._ability_confidence(
        docs=docs,
        interfaces=interfaces,
        tests=tests,
        examples=examples,
        frameworks=frameworks,
        languages=languages,
    )
    # Prefer documentation as the ability's provenance, then manifests,
    # then languages.
    ability_refs = self._source_refs(docs or manifests or languages)

    capabilities: list[CandidateCapabilityDraft] = list(
        self._intent_capabilities(intent_facts, available_chunks, tests, examples, docs)
    )

    llm_providers = self._promotable_llm_facts(of_kind("llm_provider"))
    provider_registries = self._promotable_llm_facts(of_kind("provider_registry"))
    fallback_policies = self._promotable_llm_facts(of_kind("fallback_policy"))
    if llm_providers or provider_registries or fallback_policies:
        capabilities.append(
            self._llm_provider_capability(
                llm_providers,
                credential_configs,
                provider_registries,
                fallback_policies,
                tests,
                examples,
                docs,
            )
        )

    if interfaces:
        if capabilities:
            capabilities = self._attach_interface_features(
                capabilities,
                interfaces,
                available_chunks,
            )
        else:
            capabilities.append(
                self._interface_capability(
                    interfaces, tests, examples, docs, available_chunks
                )
            )

    return [
        CandidateAbilityDraft(
            name=ability_name,
            description=ability_description,
            confidence=ability_confidence,
            source_refs=ability_refs,
            primary_class=primary_class,
            attributes=attributes,
            capabilities=capabilities,
        )
    ]
def _interface_capability(
    self,
    interfaces: list[ObservedFact],
    tests: list[ObservedFact],
    examples: list[ObservedFact],
    docs: list[ObservedFact],
    chunks: list[ContentChunk],
) -> CandidateCapabilityDraft:
    """Wrap all interface facts in one generic "expose interface" capability."""
    interface_features = self._interface_features(interfaces, chunks)
    description = self._interface_description(chunks)
    confidence = self._interface_confidence(
        interfaces=interfaces,
        tests=tests,
        examples=examples,
        docs=docs,
    )
    attributes = self._interface_attributes(interfaces, docs, chunks)
    return CandidateCapabilityDraft(
        name="Expose Repository Interface",
        description=description,
        inputs=self._interface_inputs(interfaces),
        outputs=self._interface_outputs(interfaces),
        confidence=confidence,
        source_refs=self._source_refs(interfaces),
        primary_class="interface",
        attributes=attributes,
        features=interface_features,
        evidence=self._evidence(tests, examples, docs),
    )
def _llm_provider_capability(
    self,
    providers: list[ObservedFact],
    credentials: list[ObservedFact],
    registries: list[ObservedFact],
    fallback_policies: list[ObservedFact],
    tests: list[ObservedFact],
    examples: list[ObservedFact],
    docs: list[ObservedFact],
) -> CandidateCapabilityDraft:
    """Build the LLM-provider routing capability from provider-related facts.

    Emits one feature per distinct provider name (sorted), followed by
    optional credential, registry and fallback-policy features for whichever
    of those fact groups is non-empty.
    """
    distinct_providers = sorted({fact.name for fact in providers})
    if distinct_providers:
        provider_summary = ", ".join(distinct_providers)
    else:
        provider_summary = "LLM providers"

    features: list[CandidateFeatureDraft] = []
    for provider in distinct_providers:
        matching = [fact for fact in providers if fact.name == provider]
        features.append(
            CandidateFeatureDraft(
                name=f"Use {provider} Models",
                type="integration",
                location=self._grouped_location(matching),
                confidence=0.75,
                source_refs=self._source_refs(matching),
                primary_class="integration",
                attributes=["llm-provider", provider.lower()],
            )
        )

    # (facts, feature name, type/primary class, confidence, attributes)
    optional_features = (
        (
            credentials,
            "Configure LLM Provider Credentials",
            "configuration",
            0.7,
            ["credential", "llm-provider"],
        ),
        (
            registries,
            "Maintain LLM Provider Registry",
            "backend",
            0.65,
            ["provider-registry", "llm-provider"],
        ),
        (
            fallback_policies,
            "Apply LLM Provider Fallback Policy",
            "backend",
            0.6,
            ["fallback-policy", "llm-provider"],
        ),
    )
    for group, title, category, score, tags in optional_features:
        if not group:
            continue
        features.append(
            CandidateFeatureDraft(
                name=title,
                type=category,
                location=self._grouped_location(group),
                confidence=score,
                source_refs=self._source_refs(group),
                primary_class=category,
                attributes=tags,
            )
        )

    all_facts = providers + credentials + registries + fallback_policies
    attributes = self._llm_provider_attributes(
        providers,
        credentials,
        registries,
        fallback_policies,
    ) + self._utility_relationship_attributes(all_facts)
    return CandidateCapabilityDraft(
        name="Route LLM Requests Across Providers",
        description=(
            "Expose or configure model-provider integrations detected from "
            f"source-linked provider hints: {provider_summary}."
        ),
        inputs=["LLM request", "provider configuration"],
        outputs=["provider-specific model response"],
        confidence=self._llm_provider_confidence(
            providers=providers,
            credentials=credentials,
            registries=registries,
            fallback_policies=fallback_policies,
            docs=docs,
        ),
        source_refs=self._source_refs(all_facts),
        primary_class="llm-integration",
        attributes=attributes,
        features=features,
        evidence=self._evidence(tests, examples, docs),
    )
def _intent_capabilities(
self,
intent_facts: list[ObservedFact],
chunks: list[ContentChunk],
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> list[CandidateCapabilityDraft]:
intent_chunks = [
chunk
for chunk in chunks
if chunk.kind == "intent"
and (
chunk.metadata.get("source_role") == "intent_summary"
or chunk.path.lower().endswith("intent.md")
)
]
if not intent_chunks:
return []
source_refs = self._source_refs(intent_facts)
capabilities: list[CandidateCapabilityDraft] = []
seen: set[str] = set()
for item in self._intent_capability_items(intent_chunks):
name = self._intent_capability_name(item)
key = name.lower()
if not name or key in seen:
continue
seen.add(key)
capabilities.append(
CandidateCapabilityDraft(
name=name,
description=(
"Reviewable intended capability extracted from repository "
f"intent: {item}"
),
inputs=[],
outputs=[name],
confidence=self._confidence(
0.45,
[
(0.15, bool(source_refs)),
(0.10, bool(tests)),
(0.05, bool(examples)),
(0.05, bool(docs)),
],
),
source_refs=source_refs,
primary_class="intent-capability",
attributes=[
"intent-derived",
"utility-owned",
"review-required-intent",
],
evidence=self._evidence(tests, examples, docs),
)
)
return capabilities
def _intent_capability_items(self, chunks: list[ContentChunk]) -> list[str]:
items: list[str] = []
in_capability_section = False
for chunk in sorted(chunks, key=lambda item: (item.path, item.start_line)):
for raw_line in chunk.text.splitlines():
line = raw_line.strip()
if not line:
continue
if line.startswith("#"):
heading = line.lstrip("#").strip().lower()
in_capability_section = (
"capabilit" in heading
or heading in {"primary utility", "core utility"}
)
continue
if not in_capability_section:
continue
item = re.sub(r"^(?:[-*]|\d+[.)])\s+", "", line).strip()
item = re.sub(r"^(?:capability|intended capability)\s*:\s*", "", item, flags=re.I)
if item and item != line or raw_line.lstrip().startswith(("-", "*")):
items.append(item)
return items
def _intent_capability_name(self, text: str) -> str:
lowered = re.sub(r"[*_`]", "", text.lower())
if "continuous connectivity" in lowered and "remote systems" in lowered:
return "Maintain Continuous Connectivity Between Remote Systems And Central Hub"
if "observable" in lowered and "auditable" in lowered and "controllable" in lowered:
return "Make Connectivity Observable Auditable And Controllable"
if "cli tool" in lowered and "mcp" in lowered:
return "Expose CLI And MCP Accessible Service"
candidate = re.split(r"\s+-\s+|\s*:\s*|[.!?]\s+", text.strip(), maxsplit=1)[0]
candidate = candidate.strip(" .:-")
if not candidate:
return ""
words = candidate.split()
if words:
words[0] = self._imperative_verb(words[0])
while words and words[-1].lower().strip(",;:") in {"a", "an", "the", "and", "or", "as", "both"}:
words.pop()
return self._title_from_words(words[:10])
def _attach_interface_features(
self,
capabilities: list[CandidateCapabilityDraft],
interfaces: list[ObservedFact],
chunks: list[ContentChunk],
) -> list[CandidateCapabilityDraft]:
features = self._interface_features(interfaces, chunks)
if not features:
return capabilities
capability_features: dict[int, list[CandidateFeatureDraft]] = {
index: [] for index, _ in enumerate(capabilities)
}
for feature in features:
index = self._best_feature_capability_index(feature, capabilities)
capability_features[index].append(feature)
updated: list[CandidateCapabilityDraft] = []
for index, capability in enumerate(capabilities):
attached = capability_features[index]
if not attached:
updated.append(capability)
continue
updated.append(
replace(
capability,
inputs=capability.inputs or self._feature_inputs(attached),
outputs=capability.outputs or self._feature_outputs(attached),
features=[*capability.features, *attached],
)
)
return updated
def _best_feature_capability_index(
self,
feature: CandidateFeatureDraft,
capabilities: list[CandidateCapabilityDraft],
) -> int:
feature_text = f"{feature.name} {feature.type} {feature.location}".lower()
feature_terms = self._significant_terms(feature_text)
best_index = 0
best_score = -1
for index, capability in enumerate(capabilities):
capability_text = " ".join(
[
capability.name,
capability.description,
" ".join(capability.outputs),
" ".join(capability.attributes),
]
).lower()
capability_terms = self._significant_terms(capability_text)
score = len(feature_terms & capability_terms)
if feature.type == "CLI" and any(
token in capability_text for token in ("cli", "command", "mcp")
):
score += 3
if feature.type == "API" and any(
token in capability_text for token in ("api", "http", "service")
):
score += 3
if score > best_score:
best_index = index
best_score = score
return best_index
def _interface_features(
self,
interfaces: list[ObservedFact],
chunks: list[ContentChunk],
) -> list[CandidateFeatureDraft]:
by_type: dict[str, list[ObservedFact]] = {}
for fact in interfaces:
by_type.setdefault(self._feature_type(fact), []).append(fact)
features: list[CandidateFeatureDraft] = []
for feature_type, facts in sorted(by_type.items()):
if len(facts) == 1:
fact = facts[0]
features.append(
CandidateFeatureDraft(
name=self._feature_name(fact, chunks),
type=feature_type,
location=fact.path,
confidence=0.65 if fact.value else 0.45,
source_refs=self._source_refs([fact]),
primary_class=feature_type,
attributes=self._feature_attributes(feature_type, [fact]),
)
)
continue
features.append(
CandidateFeatureDraft(
name=self._grouped_interface_feature_name(
feature_type,
facts,
chunks,
),
type=feature_type,
location=self._grouped_location(facts),
confidence=self._grouped_interface_confidence(facts),
source_refs=self._source_refs(facts),
primary_class=feature_type,
attributes=self._feature_attributes(feature_type, facts),
)
)
return features
def _grouped_interface_feature_name(
self,
feature_type: str,
facts: list[ObservedFact],
chunks: list[ContentChunk],
) -> str:
summary = self._grouped_interface_summary(facts, chunks)
if feature_type == "API":
return f"HTTP API surface: {summary}"
if feature_type == "CLI":
return f"CLI command surface: {summary}"
return f"Callable interface surface: {summary}"
def _grouped_interface_summary(
self,
facts: list[ObservedFact],
chunks: list[ContentChunk],
) -> str:
names = [self._feature_name(fact, chunks) for fact in facts]
compact_names = self._unique([name for name in names if name])
if not compact_names:
return f"{len(facts)} entry points"
visible = compact_names[:3]
suffix = f", +{len(compact_names) - 3} more" if len(compact_names) > 3 else ""
return f"{', '.join(visible)}{suffix}"
def _grouped_location(self, facts: list[ObservedFact]) -> str:
paths = sorted({fact.path for fact in facts if fact.path})
if not paths:
return ""
if len(paths) == 1:
return paths[0]
return "multiple files"
def _grouped_interface_confidence(self, facts: list[ObservedFact]) -> float:
valued = sum(1 for fact in facts if fact.value)
return 0.7 if valued == len(facts) else 0.55
def _evidence(
self,
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> list[CandidateEvidenceDraft]:
evidence: list[CandidateEvidenceDraft] = []
for fact in tests:
evidence.append(
CandidateEvidenceDraft(
type="test",
reference=fact.path,
strength="strong",
source_refs=self._source_refs([fact]),
)
)
for fact in examples:
evidence.append(
CandidateEvidenceDraft(
type="example",
reference=fact.path,
strength="strong",
source_refs=self._source_refs([fact]),
)
)
for fact in docs:
evidence.append(
CandidateEvidenceDraft(
type="documentation",
reference=fact.path,
strength="medium",
source_refs=self._source_refs([fact]),
)
)
return evidence
def _feature_type(self, fact: ObservedFact) -> str:
lower = f"{fact.name} {fact.path} {fact.value}".lower()
if "cli" in lower or "command" in lower:
return "CLI"
if "api" in lower or "route" in lower or "@app." in lower or "@router." in lower:
return "API"
return "interface"
def _ability_classification(
self,
repository: Repository,
facts: list[ObservedFact],
chunks: list[ContentChunk],
) -> tuple[str, list[str]]:
text = " ".join(
[
repository.name,
repository.description or "",
" ".join(
chunk.text[:600]
for chunk in chunks
if chunk.kind in {"intent", "documentation"}
and chunk.metadata.get("source_role") != "agent_guidance"
),
" ".join(
f"{fact.kind} {fact.name} {fact.value}"
for fact in facts
if not (
fact.kind == "llm_provider"
and self._utility_relationship(fact) not in {"owned", "facade", "adapter"}
)
),
]
).lower()
attributes: list[str] = []
if any(token in text for token in ("ssh", "tunnel", "reverse tunnel", "remote access", "connectivity")):
attributes.extend(["remote-access", "connectivity"])
if any(token in text for token in ("audit", "health check", "lifecycle", "ops", "operator")):
attributes.append("operations")
return "it-operations", self._unique(attributes)
if any(token in text for token in ("ability", "capability", "feature")):
return "repository-intelligence", self._unique(attributes + ["capability-mapping"])
promotable_llm = any(
fact.kind == "llm_provider"
and self._utility_relationship(fact) in {"owned", "facade", "adapter"}
for fact in facts
)
if promotable_llm:
return "ai-integration", self._unique(attributes + ["llm-provider"])
if any(fact.kind == "interface" for fact in facts):
attributes.append("interface")
return "developer-tooling", self._unique(attributes)
def _interface_attributes(
self,
interfaces: list[ObservedFact],
docs: list[ObservedFact] | None = None,
chunks: list[ContentChunk] | None = None,
) -> list[str]:
feature_types = {self._feature_type(fact) for fact in interfaces}
attributes = ["api" if item == "API" else "cli" if item == "CLI" else "callable" for item in feature_types]
utility = self._interface_utility_relationship(docs or [], chunks or [])
return self._unique(["surface", *attributes, f"utility-{utility}"])
def _interface_utility_relationship(
self,
docs: list[ObservedFact],
chunks: list[ContentChunk],
) -> str:
doc_paths = {fact.path for fact in docs}
text = " ".join(
chunk.text.lower()
for chunk in chunks
if chunk.path in doc_paths
and chunk.kind in {"intent", "documentation"}
and chunk.metadata.get("source_role") != "derived_scope"
)
if any(token in text for token in ("facade", "proxy", "wrapper", "wraps ")):
return "facade"
return "owned"
def _feature_attributes(
self,
feature_type: str,
facts: list[ObservedFact],
) -> list[str]:
attributes = [feature_type]
if feature_type == "API":
attributes.extend(["surface", "http"])
elif feature_type == "CLI":
attributes.extend(["surface", "command"])
else:
attributes.append("surface")
paths = " ".join(fact.path.lower() for fact in facts)
if "test" in paths:
attributes.append("test-linked")
return self._unique(attributes)
def _structure_attributes(
self,
manifests: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
) -> list[str]:
return self._unique(
[
"manifest" if manifests else "",
*[fact.name for fact in frameworks],
*[fact.name for fact in languages],
"utility-dependency" if manifests or frameworks else "",
"utility-tooling" if languages and not (manifests or frameworks) else "",
"review-required-structural-context",
]
)
def _llm_provider_attributes(
self,
providers: list[ObservedFact],
credentials: list[ObservedFact],
registries: list[ObservedFact],
fallback_policies: list[ObservedFact],
) -> list[str]:
return self._unique(
[
"llm-provider",
*[fact.name.lower() for fact in providers],
"credential" if credentials else "",
"provider-registry" if registries else "",
"fallback-policy" if fallback_policies else "",
]
)
def _unique(self, values: list[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
item = value.strip()
key = item.lower()
if not item or key in seen:
continue
seen.add(key)
result.append(item)
return result
def _significant_terms(self, text: str) -> set[str]:
stop_words = {
"and",
"the",
"this",
"that",
"with",
"from",
"into",
"for",
"capability",
"repository",
"service",
}
return {
term
for term in re.findall(r"[a-z0-9]+", text.lower())
if len(term) > 2 and term not in stop_words
}
def _interface_inputs(self, interfaces: list[ObservedFact]) -> list[str]:
feature_types = {self._feature_type(fact) for fact in interfaces}
inputs: list[str] = []
if "API" in feature_types:
inputs.append("HTTP request")
if "CLI" in feature_types:
inputs.append("CLI arguments")
if not inputs:
inputs.append("caller input")
return inputs
def _interface_outputs(self, interfaces: list[ObservedFact]) -> list[str]:
feature_types = {self._feature_type(fact) for fact in interfaces}
outputs: list[str] = []
if "API" in feature_types:
outputs.append("HTTP response")
if "CLI" in feature_types:
outputs.append("command output")
if not outputs:
outputs.append("callable interface result")
return outputs
def _feature_inputs(self, features: list[CandidateFeatureDraft]) -> list[str]:
feature_types = {feature.type for feature in features}
inputs: list[str] = []
if "API" in feature_types:
inputs.append("HTTP request")
if "CLI" in feature_types:
inputs.append("CLI arguments")
if not inputs:
inputs.append("caller input")
return inputs
def _feature_outputs(self, features: list[CandidateFeatureDraft]) -> list[str]:
feature_types = {feature.type for feature in features}
outputs: list[str] = []
if "API" in feature_types:
outputs.append("HTTP response")
if "CLI" in feature_types:
outputs.append("command output")
if not outputs:
outputs.append("callable interface result")
return outputs
def _feature_name(self, fact: ObservedFact, chunks: list[ContentChunk]) -> str:
route_name = self._route_feature_name(fact.value)
if route_name:
return route_name
if self._feature_type(fact) == "CLI":
function_name = self._function_name_near_fact(fact, chunks)
if function_name:
return f"CLI command {function_name}"
return fact.value or fact.name
def _route_feature_name(self, value: str) -> str:
match = re.search(r"@(?:app|router)\.(get|post|put|patch|delete)\((['\"])(.*?)\2", value)
if match is None:
return ""
method = match.group(1).upper()
path = match.group(3)
return f"{method} {path}"
def _function_name_near_fact(
self,
fact: ObservedFact,
chunks: list[ContentChunk],
) -> str:
line = fact.metadata.get("line")
for chunk in chunks:
if chunk.path != fact.path or chunk.kind != "interface":
continue
if isinstance(line, int) and not (chunk.start_line <= line <= chunk.end_line):
continue
match = re.search(r"^\s*def\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\(", chunk.text, re.MULTILINE)
if match is not None:
return match.group(1)
return ""
def _ability_confidence(
self,
*,
docs: list[ObservedFact],
interfaces: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
) -> float:
return self._confidence(
0.25,
[
(0.20, bool(docs)),
(0.15, bool(interfaces)),
(0.15, bool(tests)),
(0.10, bool(examples)),
(0.10, bool(frameworks)),
(0.05, bool(languages)),
],
)
def _interface_confidence(
self,
*,
interfaces: list[ObservedFact],
tests: list[ObservedFact],
examples: list[ObservedFact],
docs: list[ObservedFact],
) -> float:
return self._confidence(
0.30,
[
(0.20, bool(interfaces)),
(0.15, bool(tests)),
(0.10, bool(examples)),
(0.10, bool(docs)),
(0.05, len(interfaces) > 1),
],
)
def _structure_confidence(
self,
*,
manifests: list[ObservedFact],
frameworks: list[ObservedFact],
languages: list[ObservedFact],
docs: list[ObservedFact],
) -> float:
return self._confidence(
0.25,
[
(0.20, bool(manifests)),
(0.15, bool(frameworks)),
(0.10, bool(languages)),
(0.05, bool(docs)),
],
)
def _llm_provider_confidence(
self,
*,
providers: list[ObservedFact],
credentials: list[ObservedFact],
registries: list[ObservedFact],
fallback_policies: list[ObservedFact],
docs: list[ObservedFact],
) -> float:
return self._confidence(
0.35,
[
(0.20, bool(providers)),
(0.10, len({fact.name for fact in providers}) > 1),
(0.10, bool(credentials)),
(0.10, bool(registries)),
(0.10, bool(fallback_policies)),
(0.05, bool(docs)),
],
)
def _confidence(
self,
base: float,
factors: list[tuple[float, bool]],
) -> float:
score = base + sum(weight for weight, applies in factors if applies)
return min(1.0, round(score, 2))
def _ability_description(self, chunks: list[ContentChunk]) -> str:
doc_summary = self._document_summary(chunks)
if doc_summary:
return (
"Candidate repository purpose inferred from repository content: "
f"{doc_summary} Review is required before treating this as an "
"approved domain ability."
)
return (
"Candidate repository purpose inferred from observed repository "
"documentation, manifests, languages, and interfaces. Review is "
"required before treating this as an approved domain ability."
)
def _ability_name(
self,
repository: Repository,
chunks: list[ContentChunk],
) -> str:
ops_name = self._operations_ability_name(chunks)
if ops_name:
return ops_name
purpose_text = self._document_purpose_sentence(chunks) or repository.description
if purpose_text:
normalized = self._imperative_purpose(purpose_text)
if normalized:
return normalized
return f"Support {self._humanize_identifier(repository.name)}"
def _document_purpose_sentence(self, chunks: list[ContentChunk]) -> str:
for chunk in self._purpose_chunks(chunks):
if chunk.kind not in {"intent", "documentation"}:
continue
lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
paragraph = next((line for line in lines if not line.startswith("#")), "")
if paragraph:
return paragraph
return ""
def _purpose_chunks(self, chunks: list[ContentChunk]) -> list[ContentChunk]:
def priority(chunk: ContentChunk) -> tuple[int, str, int]:
role = chunk.metadata.get("source_role")
path = chunk.path.lower()
if role == "intent_summary" or path.endswith("intent.md"):
return (0, path, chunk.start_line)
if role == "product_documentation" or path.startswith("readme"):
return (1, path, chunk.start_line)
if role == "derived_scope" or path.endswith("scope.md"):
return (3, path, chunk.start_line)
return (2, path, chunk.start_line)
return sorted(
[
chunk
for chunk in chunks
if chunk.kind in {"intent", "documentation"}
and chunk.metadata.get("source_role") != "agent_guidance"
],
key=priority,
)
def _operations_ability_name(self, chunks: list[ContentChunk]) -> str:
text = " ".join(
chunk.text
for chunk in self._documentation_chunks(chunks)
if chunk.kind == "intent"
).lower()
if "ssh reverse tunnel" in text or "ssh reverse tunneling" in text:
return "Manage SSH Reverse Tunnel Connectivity"
return ""
def _imperative_purpose(self, text: str) -> str:
cleaned = re.sub(r"\s+", " ", text.strip())
cleaned = re.split(r"[.!?]\s+", cleaned, maxsplit=1)[0]
cleaned = re.sub(
r"(?i)^this\s+repository\s+exists\s+to\s+provide\s+(?:an?\s+)?",
"Provide ",
cleaned,
)
cleaned = re.sub(r"^[A-Z][A-Za-z0-9_-]*\s+(?:is|provides|offers)\s+", "", cleaned)
cleaned = cleaned.strip(" .:-")
if not cleaned:
return ""
words = cleaned.split()
if not words:
return ""
words[0] = self._imperative_verb(words[0])
return self._title_from_words(words[:8])
def _imperative_verb(self, word: str) -> str:
lower = word.lower().strip(",;:")
irregular = {
"does": "do",
"has": "have",
"is": "be",
}
if lower in irregular:
return irregular[lower]
if lower in {"this"}:
return lower
if lower.endswith("ies") and len(lower) > 4:
return f"{lower[:-3]}y"
if lower.endswith(("des", "ses", "tes", "ves", "zes")) and len(lower) > 4:
return lower[:-1]
if lower.endswith("es") and len(lower) > 3:
return lower[:-2]
if lower.endswith("s") and len(lower) > 3:
return lower[:-1]
return lower
def _title_from_words(self, words: list[str]) -> str:
cleaned_words = [
re.sub(r"[^A-Za-z0-9_/{}-]", "", word)
for word in words
]
return " ".join(
word[:1].upper() + word[1:]
for word in cleaned_words
if word
)
def _humanize_identifier(self, value: str) -> str:
spaced = re.sub(r"[_-]+", " ", value)
spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", spaced)
return self._title_from_words(spaced.split())
def _interface_description(self, chunks: list[ContentChunk]) -> str:
interface_summary = self._interface_summary(chunks)
if interface_summary:
return (
"Expose one or more likely user-facing API or CLI entry points. "
f"Source context: {interface_summary} Review is required to name "
"the concrete domain behavior."
)
return (
"Expose one or more likely user-facing API or CLI entry points. "
"Review is required to name the concrete domain behavior."
)
def _document_summary(self, chunks: list[ContentChunk]) -> str:
for chunk in self._documentation_chunks(chunks):
lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
if not lines:
continue
heading = next((line.lstrip("#").strip() for line in lines if line.startswith("#")), "")
paragraph = next((line for line in lines if not line.startswith("#")), "")
if heading and paragraph:
return f"{heading}. {paragraph}"
return heading or paragraph
return ""
def _documentation_chunks(self, chunks: list[ContentChunk]) -> list[ContentChunk]:
return sorted(
[chunk for chunk in chunks if chunk.kind in {"intent", "documentation"}],
key=lambda chunk: (0 if chunk.kind == "intent" else 1, chunk.path, chunk.start_line),
)
def _interface_summary(self, chunks: list[ContentChunk]) -> str:
for chunk in chunks:
if chunk.kind != "interface":
continue
lines = [line.strip() for line in chunk.text.splitlines() if line.strip()]
if not lines:
continue
return " ".join(lines[:3])
return ""
def _facts(self, facts: list[ObservedFact], kind: str) -> list[ObservedFact]:
return [fact for fact in facts if fact.kind == kind]
def _promotable_llm_facts(self, facts: list[ObservedFact]) -> list[ObservedFact]:
return [
fact
for fact in facts
if self._utility_relationship(fact) in {"owned", "facade", "adapter"}
]
def _utility_relationship(self, fact: ObservedFact) -> str:
relationship = fact.metadata.get("utility_relationship")
if isinstance(relationship, str) and relationship:
return relationship
source_role = fact.metadata.get("source_role")
if source_role == "implementation_source":
lower_path = fact.path.lower()
if "adapter" in lower_path or "provider" in lower_path:
return "adapter"
return "owned"
if source_role == "configuration":
return "configure"
if source_role == "dependency_declaration":
return "dependency"
if source_role in {"agent_guidance", "ci_tooling"}:
return "tooling"
if not source_role and fact.path.lower().endswith((".py", ".ts", ".js")):
return "owned"
return "mention"
def _utility_relationship_attributes(self, facts: list[ObservedFact]) -> list[str]:
relationships = sorted({self._utility_relationship(fact) for fact in facts})
return [f"utility-{relationship}" for relationship in relationships]
def _source_refs(self, facts: list[ObservedFact]) -> list[SourceReference]:
return [
SourceReference(
fact_id=fact.id,
path=fact.path,
kind=fact.kind,
name=fact.name,
line=fact.metadata.get("line"),
)
for fact in facts
]