Cross repo isolation

This commit is contained in:
2026-05-02 21:55:35 +02:00
parent a462827eda
commit bf2dc4ae98
10 changed files with 695 additions and 14 deletions

View File

@@ -73,3 +73,93 @@ def write_empty_repo(root: Path) -> Path:
repo = root / "empty-repo"
repo.mkdir()
return repo
def write_key_cape_like_repo(root: Path) -> Path:
repo = root / "key-cape-like"
repo.mkdir()
(repo / "INTENT.md").write_text(
"# INTENT\n\n"
"Provide lightweight IAM profile enforcement for small deployments.\n\n"
"## Intended Capabilities\n\n"
"- Enforce OIDC PKCE profiles.\n"
"- Validate LDAP schema migrations.\n"
"- Run migration tooling for identity data.\n",
encoding="utf-8",
)
(repo / "SCOPE.md").write_text(
"# SCOPE\n\n"
"Old polluted scope mentions routing LLM provider requests.\n",
encoding="utf-8",
)
(repo / "README.md").write_text(
"# KeyCape\n\n"
"Lightweight IAM service with OIDC profile enforcement and LDAP schema "
"validation. Backend adapters live under src/internal/adapters.\n"
"See CLAUDE.md for agent workflow.\n",
encoding="utf-8",
)
(repo / "CLAUDE.md").write_text(
"# CLAUDE.md\n\n"
"Guidance for Claude Code when working in this repository.\n",
encoding="utf-8",
)
(repo / "src" / "internal" / "adapters").mkdir(parents=True)
(repo / "src" / "internal" / "adapters" / "oidc.py").write_text(
"def enforce_pkce_profile(client):\n"
" return client.require_pkce\n",
encoding="utf-8",
)
return repo
def write_llm_connect_like_repo(root: Path) -> Path:
repo = root / "llm-connect-like"
repo.mkdir()
(repo / "README.md").write_text(
"# LLM Connect\n\nSupports OpenRouter and Claude fallback for prompts.\n",
encoding="utf-8",
)
(repo / ".env.example").write_text(
"OPENROUTER_API_KEY=\nANTHROPIC_API_KEY=\n",
encoding="utf-8",
)
(repo / "providers.py").write_text(
"provider_registry = {'openrouter': OpenRouterAdapter, 'anthropic': ClaudeAdapter}\n"
"fallback_provider = 'claude'\n",
encoding="utf-8",
)
return repo
def write_facade_repo(root: Path) -> Path:
repo = root / "facade-repo"
repo.mkdir()
(repo / "README.md").write_text(
"# Mail Facade\n\n"
"Provides a public HTTP facade that wraps the upstream mail classifier.\n",
encoding="utf-8",
)
(repo / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.post("/classify")\n'
"def classify():\n"
" return {}\n",
encoding="utf-8",
)
return repo
def write_dependency_only_repo(root: Path) -> Path:
repo = root / "dependency-only"
repo.mkdir()
(repo / "README.md").write_text(
"# Dependency Only\n\nUses OpenRouter during experiments but exposes no API.\n",
encoding="utf-8",
)
(repo / "requirements.txt").write_text(
"openai\nanthropic\n",
encoding="utf-8",
)
return repo

100
tests/test_cli.py Normal file
View File

@@ -0,0 +1,100 @@
import pytest
from repo_registry.cli import main
from repo_registry.core.service import RegistryService
from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.storage.sqlite import RegistryStore
def make_service(tmp_path):
store = RegistryStore(tmp_path / "registry.sqlite3")
store.initialize()
return RegistryService(store, ingestion=GitIngestionService(tmp_path / "checkouts"))
def write_repo(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "README.md").write_text("# CLI Rebuild\nReports health.\n", encoding="utf-8")
(source / "app.py").write_text('@app.get("/health")\ndef health():\n return {}\n', encoding="utf-8")
return source
def approved_repository(tmp_path):
service = make_service(tmp_path)
source = write_repo(tmp_path)
repository = service.register_repository(name="CLI Rebuild", url=str(source))
summary = service.analyze_repository(repository.id, use_llm_assistance=False)
service.approve_candidate_graph(repository.id, summary.analysis_run.id)
return service, repository
def test_rebuild_cli_dry_run_preserves_approved_characteristics(tmp_path, capsys):
service, repository = approved_repository(tmp_path)
exit_code = main(
[
"rebuild-characteristics",
"--repo",
str(repository.id),
"--dry-run",
"--no-llm",
"--database-path",
str(tmp_path / "registry.sqlite3"),
"--checkout-root",
str(tmp_path / "checkouts"),
]
)
output = capsys.readouterr().out
assert exit_code == 0
assert "repo=1:CLI Rebuild" in output
assert "latest_analysis_run=2" in output
assert "candidate_source=deterministic" in output
assert "dry_run=True" in output
assert "cleared_approved=False" in output
assert service.ability_map(repository.id).abilities
def test_rebuild_cli_confirmed_single_repo_clears_approved_characteristics(tmp_path, capsys):
_service, repository = approved_repository(tmp_path)
exit_code = main(
[
"rebuild-characteristics",
"--repo",
str(repository.id),
"--no-llm",
"--confirm",
"--database-path",
str(tmp_path / "registry.sqlite3"),
"--checkout-root",
str(tmp_path / "checkouts"),
]
)
service = make_service(tmp_path)
output = capsys.readouterr().out
assert exit_code == 0
assert "dry_run=False" in output
assert "cleared_approved=True" in output
assert service.ability_map(repository.id).abilities == []
def test_rebuild_cli_refuses_destructive_all_without_confirm_all(tmp_path):
approved_repository(tmp_path)
with pytest.raises(SystemExit) as exc:
main(
[
"rebuild-characteristics",
"--all",
"--confirm",
"--database-path",
str(tmp_path / "registry.sqlite3"),
"--checkout-root",
str(tmp_path / "checkouts"),
]
)
assert exc.value.code == 2

View File

@@ -50,6 +50,58 @@ def chunk():
)
def test_llm_prompt_filters_derived_scope_and_labels_source_roles():
adapter = FakeAdapter('{"abilities": []}')
extractor = LLMCandidateExtractor(adapter)
chunks = [
ContentChunk(
id=1,
repository_id=1,
analysis_run_id=1,
snapshot_id=1,
path="SCOPE.md",
kind="scope",
start_line=1,
end_line=3,
text="# SCOPE\n\nOld approved LLM routing entry.",
metadata={"source_role": "derived_scope"},
),
ContentChunk(
id=2,
repository_id=1,
analysis_run_id=1,
snapshot_id=1,
path="INTENT.md",
kind="intent",
start_line=1,
end_line=3,
text="# INTENT\n\nProvide lightweight IAM.",
metadata={"source_role": "intent_summary"},
),
ContentChunk(
id=3,
repository_id=1,
analysis_run_id=1,
snapshot_id=1,
path="CLAUDE.md",
kind="documentation",
start_line=1,
end_line=2,
text="# CLAUDE\n\nAgent guidance.",
metadata={"source_role": "agent_guidance"},
),
]
extractor.extract(repository(), chunks)
assert "Source: INTENT.md" in adapter.last_prompt
assert "source_role=intent_summary" in adapter.last_prompt
assert "Source: SCOPE.md" not in adapter.last_prompt
assert "Old approved LLM routing entry" not in adapter.last_prompt
assert "Source: CLAUDE.md" not in adapter.last_prompt
assert "Do not use SCOPE.md" in adapter.last_prompt
def test_llm_candidate_extractor_parses_structured_response():
adapter = FakeAdapter(
"""

View File

@@ -9,8 +9,12 @@ from repo_registry.repo_ingestion.git import GitIngestionService
from repo_registry.semantic import HashingEmbeddingProvider
from repo_registry.storage.sqlite import NotFoundError, RegistryStore
from tests.fixtures import (
write_dependency_only_repo,
write_empty_repo,
write_facade_repo,
write_javascript_typescript_package_repo,
write_key_cape_like_repo,
write_llm_connect_like_repo,
write_misleading_docs_repo,
write_python_cli_repo,
write_readme_only_repo,
@@ -396,6 +400,80 @@ def test_fixture_breadth_misleading_docs_do_not_become_approved_truth(tmp_path):
assert ability_map.abilities == []
def test_regression_key_cape_like_repo_centers_iam_not_llm_provider_routing(tmp_path):
source = write_key_cape_like_repo(tmp_path)
service = make_service(tmp_path)
repository = service.register_repository(name="KeyCape Like", url=str(source))
summary = service.analyze_repository(repository.id, use_llm_assistance=False)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
capability_names = {
capability.name
for ability in graph.abilities
for capability in ability.capabilities
}
assert "Enforce OIDC PKCE Profiles" in capability_names
assert "Validate LDAP Schema Migrations" in capability_names
assert "Run Migration Tooling For Identity Data" in capability_names
assert "Route LLM Requests Across Providers" not in capability_names
facts = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
assert ("llm_provider", "Claude", "CLAUDE.md") not in facts
def test_regression_llm_connect_like_repo_still_promotes_provider_routing(tmp_path):
source = write_llm_connect_like_repo(tmp_path)
service = make_service(tmp_path)
repository = service.register_repository(name="LLM Connect Like", url=str(source))
summary = service.analyze_repository(repository.id, use_llm_assistance=False)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
capability = next(
capability
for ability in graph.abilities
for capability in ability.capabilities
if capability.name == "Route LLM Requests Across Providers"
)
assert {"utility-adapter", "llm-provider", "openrouter", "claude"} <= set(
capability.attributes
)
def test_regression_facade_repo_promotes_public_wrapper_as_facade(tmp_path):
source = write_facade_repo(tmp_path)
service = make_service(tmp_path)
repository = service.register_repository(name="Mail Facade", url=str(source))
summary = service.analyze_repository(repository.id, use_llm_assistance=False)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
capability = graph.abilities[0].capabilities[0]
assert capability.name == "Expose Repository Interface"
assert "utility-facade" in capability.attributes
assert "POST /classify" in {feature.name for feature in capability.features}
def test_regression_dependency_only_repo_keeps_libraries_as_context(tmp_path):
source = write_dependency_only_repo(tmp_path)
service = make_service(tmp_path)
repository = service.register_repository(name="Dependency Only", url=str(source))
summary = service.analyze_repository(repository.id, use_llm_assistance=False)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
capability_names = {
capability.name
for ability in graph.abilities
for capability in ability.capabilities
}
assert "Route LLM Requests Across Providers" not in capability_names
assert capability_names == {"Describe Repository Structure"}
structure = graph.abilities[0].capabilities[0]
assert "utility-dependency" in structure.attributes
assert "review-required-structural-context" in structure.attributes
def test_fixture_breadth_empty_repo_produces_no_candidate_claims(tmp_path):
source = write_empty_repo(tmp_path)
service = make_service(tmp_path)
@@ -622,7 +700,110 @@ def test_analyze_repository_can_use_optional_llm_extractor(tmp_path):
assert graph.abilities[0].capabilities[0].name == "Classify Incoming Email"
assert graph.abilities[0].source_refs[0].path == "README.md"
assert decisions[0].action == "llm_extraction_used"
assert "1 candidate ability" in decisions[0].notes
assert "llm+deterministic candidate generation" in decisions[0].notes
assert {ability.name for ability in graph.abilities} >= {
"Business Email Routing",
"Route Incoming Customer Email",
}
def test_analyze_repository_keeps_deterministic_candidates_when_llm_returns_stale_entries(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "INTENT.md").write_text(
"# INTENT\n\n"
"Provide lightweight IAM.\n\n"
"## Intended Capabilities\n\n"
"- Enforce OIDC PKCE profiles.\n",
encoding="utf-8",
)
(source / "SCOPE.md").write_text(
"# SCOPE\n\nOld approved entry: route LLM provider requests.\n",
encoding="utf-8",
)
store = RegistryStore(tmp_path / "registry.sqlite3")
store.initialize()
extractor = FakeLLMExtractor(
[
ExtractedAbility(
name="Old LLM Routing",
description="Stale prior scope claim.",
source_paths=["SCOPE.md"],
capabilities=[
ExtractedCapability(
name="Route LLM Provider Requests",
description="Old scope reuse.",
source_paths=["SCOPE.md"],
)
],
)
]
)
service = RegistryService(
store,
ingestion=GitIngestionService(tmp_path / "checkouts"),
llm_extractor=extractor,
)
repository = service.register_repository(name="KeyCape Like", url=str(source))
summary = service.analyze_repository(repository.id)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
decisions = service.list_review_decisions(repository.id, summary.analysis_run.id)
capability_names = {
capability.name
for ability in graph.abilities
for capability in ability.capabilities
}
assert "Route LLM Provider Requests" in capability_names
assert "Enforce OIDC PKCE Profiles" in capability_names
assert decisions[0].action == "llm_extraction_used"
assert "llm+deterministic candidate generation" in decisions[0].notes
def test_analysis_isolation_between_repositories_with_stale_approved_data(tmp_path):
poisoned_source = write_llm_connect_like_repo(tmp_path)
target_source = write_key_cape_like_repo(tmp_path)
service = make_service(tmp_path)
poisoned = service.register_repository(
name="Poisoned LLM Connect",
url=str(poisoned_source),
)
target = service.register_repository(
name="Isolated KeyCape",
url=str(target_source),
)
poisoned_summary = service.analyze_repository(
poisoned.id,
use_llm_assistance=False,
)
service.approve_candidate_graph(poisoned.id, poisoned_summary.analysis_run.id)
assert any(
capability.name == "Route LLM Requests Across Providers"
for ability in service.ability_map(poisoned.id).abilities
for capability in ability.capabilities
)
target_summary = service.analyze_repository(
target.id,
use_llm_assistance=False,
)
target_graph = service.candidate_graph(target.id, target_summary.analysis_run.id)
target_facts = service.list_observed_facts(target.id, target_summary.analysis_run.id)
target_chunks = service.list_content_chunks(target.id, target_summary.analysis_run.id)
target_capability_names = {
capability.name
for ability in target_graph.abilities
for capability in ability.capabilities
}
assert "Enforce OIDC PKCE Profiles" in target_capability_names
assert "Route LLM Requests Across Providers" not in target_capability_names
assert all(fact.repository_id == target.id for fact in target_facts)
assert all(chunk.repository_id == target.id for chunk in target_chunks)
assert all(ref.path != "providers.py" for ability in target_graph.abilities for ref in ability.source_refs)
assert service.ability_map(target.id).abilities == []
def test_analyze_repository_can_disable_optional_llm_extractor(tmp_path):
@@ -695,8 +876,9 @@ def test_analyze_repository_normalizes_duplicate_llm_candidates(tmp_path):
summary = service.analyze_repository(repository.id)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
assert len(graph.abilities) == 1
assert len(graph.abilities) == 2
assert graph.abilities[0].name == "LLM Provider Integrations"
assert graph.abilities[1].name == "Support OpenRouter Providers"
def test_analyze_repository_falls_back_when_optional_llm_extractor_returns_no_candidates(tmp_path):