# Tests for RegistryService: manual registry editing, dependency graphs,
# search, and fixture-repository analysis.

import json
import logging
import subprocess

from repo_scoping.core.logging import LOGGER_NAME
from repo_scoping.core.models import SourceReference
from repo_scoping.core.service import RegistryService
from repo_scoping.llm_extraction import (
    ExtractedAbility,
    ExtractedCapability,
    ExtractedFeature,
)
from repo_scoping.repo_ingestion.git import GitIngestionService
from repo_scoping.semantic import HashingEmbeddingProvider
from repo_scoping.storage.sqlite import NotFoundError, RegistryStore
from tests.fixtures import (
    write_dependency_only_repo,
    write_empty_repo,
    write_facade_repo,
    write_javascript_typescript_package_repo,
    write_key_cape_like_repo,
    write_llm_connect_like_repo,
    write_ops_bridge_like_repo,
    write_misleading_docs_repo,
    write_python_cli_repo,
    write_readme_only_repo,
)


def make_service(tmp_path):
    """Return a RegistryService backed by a freshly initialized store.

    The SQLite file is created at ``tmp_path / "registry.sqlite3"`` and the
    ingestion service is rooted at ``tmp_path / "checkouts"``.
    """
    store = RegistryStore(tmp_path / "registry.sqlite3")
    store.initialize()
    return RegistryService(store, ingestion=GitIngestionService(tmp_path / "checkouts"))


def add_candidate_capability(service, repository_id, analysis_run_id, ability_id, name):
    """Insert a candidate capability row directly and return its row id.

    Writes to ``candidate_capabilities`` through the store connection with
    fixed placeholder values for every column except the ids and ``name``.

    Returns:
        ``cursor.lastrowid`` converted to ``int``.
    """
    # NOTE(review): statement nesting under `with` was reconstructed from
    # whitespace-collapsed source; confirm `return` placement upstream.
    with service.store.connect() as connection:
        cursor = connection.execute(
            """
            INSERT INTO candidate_capabilities
            (repository_id, analysis_run_id, ability_id, name, description,
            inputs, outputs, primary_class, attributes, confidence, source_refs)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                repository_id,
                analysis_run_id,
                ability_id,
                name,
                "Review target capability inserted for review workflow tests.",
                "[]",
                "[]",
                "test-capability",
                json.dumps(["test-review-target"]),
                0.5,
                "[]",
            ),
        )
        return int(cursor.lastrowid)


class FakeLLMExtractor:
    """Test double that records ``extract`` calls and returns preset abilities."""

    def __init__(self, abilities):
        """Store the abilities to return and start with an empty call log."""
        self.abilities = abilities
        self.calls = []

    def extract(self, repository, chunks):
        """Record the ``(repository, chunks)`` pair and return the preset abilities."""
        self.calls.append((repository, chunks))
        return self.abilities


class FailingLLMExtractor:
    """Test double whose ``extract`` always fails."""

    def extract(self, repository, chunks):
        """Raise ``RuntimeError("provider unavailable")`` unconditionally."""
        raise RuntimeError("provider unavailable")


def test_manual_registry_builds_ability_map(tmp_path):
    """Manually added ability/capability/feature/evidence appear in the ability map.

    Also checks that ``update_scope`` returns a map carrying the new scope
    name and description.
    """
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="MailRouter",
        url="https://example.com/mail-router.git",
        description="Routes incoming customer email",
    )
    ability_id = service.add_ability(
        repository.id,
        name="Business Email Routing",
        description="Route inbound messages to the right department.",
        confidence=0.92,
    )
    capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Classify Incoming Email",
        description="Classify messages into intent categories.",
        inputs=["subject", "body"],
        outputs=["intent", "confidence"],
        confidence=0.88,
    )
    service.add_feature(
        repository.id,
        capability_id,
        name="POST /api/classify-email",
        type="REST endpoint",
        location="src/routes/classify_email.py",
        confidence=0.84,
    )
    service.add_evidence(
        repository.id,
        capability_id,
        type="unit_test",
        reference="tests/test_email_classification.py",
        strength="strong",
        reference_kind="fact",
        reference_id=42,
    )

    ability_map = service.ability_map(repository.id)

    assert ability_map.repository.name == "MailRouter"
    # The scope name is expected to equal the repository name before update_scope.
    assert ability_map.scope.name == "MailRouter"
    assert ability_map.scope.confidence_label == "high"
    assert ability_map.abilities[0].name == "Business Email Routing"
    capability = ability_map.abilities[0].capabilities[0]
    assert capability.name == "Classify Incoming Email"
    assert capability.inputs == ["subject", "body"]
    assert capability.features[0].location == "src/routes/classify_email.py"
    assert capability.evidence[0].strength == "strong"
    # No target was passed to add_evidence; the evidence is expected to
    # target the capability it was attached to.
    assert capability.evidence[0].target_kind == "capability"
    assert capability.evidence[0].target_id == capability_id
    assert capability.evidence[0].reference_kind == "fact"
    assert capability.evidence[0].reference_id == 42

    updated_map = service.update_scope(
        repository.id,
        name="MailRouter Product Scope",
        description="Email routing repository scope.",
        confidence=0.9,
    )
    assert updated_map.scope.name == "MailRouter Product Scope"
    assert updated_map.scope.description == "Email routing repository scope."


def test_dependency_impact_propagates_changed_fact_to_scope(tmp_path):
    """Removing the Click dependency marks everything referencing its fact as stale.

    A feature and an evidence row reference the Click framework fact from the
    first analysis run; after ``requirements.txt`` is rewritten without Click
    and re-analyzed, the impact report must include the feature, evidence,
    capability, ability and scope.
    """
    service = make_service(tmp_path)
    source = write_python_cli_repo(tmp_path)
    repository = service.register_repository(
        name="PyCLI",
        url=str(source),
        description="CLI command repository.",
    )
    base_summary = service.analyze_repository(
        repository.id,
        source_path=str(source),
        use_llm_assistance=False,
    )
    click_fact = next(
        fact
        for fact in base_summary.facts
        if fact.kind == "framework"
        and fact.path == "requirements.txt"
        and fact.name == "Click"
    )
    source_ref = SourceReference(
        fact_id=click_fact.id,
        path=click_fact.path,
        kind=click_fact.kind,
        name=click_fact.name,
    )
    ability_id = service.add_ability(
        repository.id,
        name="Command Line Operations",
        description="Expose command line workflows.",
    )
    capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Click Command Execution",
        description="Run commands through Click.",
    )
    # Created through the store (not the service) so source_refs can be supplied.
    feature_id = service.store.create_feature(
        repository.id,
        capability_id,
        name="Click decorator",
        type="interface",
        location="cli.py",
        confidence=0.9,
        source_refs=[source_ref],
    )
    evidence_id = service.store.create_evidence(
        repository.id,
        capability_id,
        type="dependency",
        reference="Click dependency",
        strength="strong",
        source_refs=[source_ref],
    )
    # Drop Click from the manifest so the second run no longer observes it.
    (source / "requirements.txt").write_text("typer\npytest\n", encoding="utf-8")
    target_summary = service.analyze_repository(
        repository.id,
        source_path=str(source),
        use_llm_assistance=False,
    )

    impact = service.analyze_dependency_impact(
        repository.id,
        base_summary.analysis_run.id,
        target_summary.analysis_run.id,
    )

    impacted_keys = {item.item_key for item in impact.impacts}
    assert f"feature:{feature_id}" in impacted_keys
    assert f"evidence:{evidence_id}" in impacted_keys
    assert f"capability:{capability_id}" in impacted_keys
    assert f"ability:{ability_id}" in impacted_keys
    assert f"scope:{service.store.get_ability_map(repository.id).scope.id}" in impacted_keys
    assert impact.scope_impacted is True
    assert impact.max_depth == 4
    assert any(
        "removed fact fact:framework:requirements.txt:Click" in reason
        for item in impact.impacts
        for reason in item.reasons
    )
    assert all(item.freshness_state == "stale" for item in impact.impacts)


def test_dependency_graph_flags_same_layer_edges(tmp_path):
    """Evidence linking one capability to another yields exactly one same-layer edge."""
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Same Layer",
        url="https://example.com/same-layer.git",
        description="Tests same-layer dependency normalization signals.",
    )
    ability_id = service.add_ability(repository.id, name="Operations")
    first_capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Source Capability",
    )
    second_capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Target Capability",
    )
    service.store.create_evidence(
        repository.id,
        second_capability_id,
        type="relationship",
        reference="Target depends on source capability",
        strength="medium",
        target_kind="capability",
        target_id=second_capability_id,
        reference_kind="capability",
        reference_id=first_capability_id,
    )

    graph = service.build_dependency_graph(repository.id)

    same_layer_edges = [edge for edge in graph.edges if edge.same_layer]
    assert len(same_layer_edges) == 1
    # The edge runs from the referenced capability to the evidence target.
    assert same_layer_edges[0].source_key == f"capability:{first_capability_id}"
    assert same_layer_edges[0].target_key == f"capability:{second_capability_id}"


def test_dependency_graph_enriches_layers_and_filters_with_profiles(tmp_path):
    """A saved profile's rules and manual overrides shape the graph element payload.

    The profile blurs facts and hides features, but a manual override shows
    the one feature again; an override for a non-existent node is reported as
    orphaned.
    """
    service = make_service(tmp_path)
    source = write_python_cli_repo(tmp_path)
    repository = service.register_repository(
        name="Graph Profile",
        url=str(source),
        description="Graph profile fixture.",
    )
    summary = service.analyze_repository(
        repository.id,
        source_path=str(source),
        use_llm_assistance=False,
    )
    fact = next(item for item in summary.facts if item.kind == "framework")
    source_ref = SourceReference(
        fact_id=fact.id,
        path=fact.path,
        kind=fact.kind,
        name=fact.name,
    )
    ability_id = service.add_ability(repository.id, name="Explore Graphs")
    capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Filter Dependency Graph",
    )
    feature_id = service.store.create_feature(
        repository.id,
        capability_id,
        name="Graph filter control",
        type="UI",
        location="src/ui.py",
        confidence=0.8,
        source_refs=[source_ref],
    )
    service.store.create_evidence(
        repository.id,
        capability_id,
        type="test",
        reference="tests/test_ui.py",
        strength="strong",
        target_kind="feature",
        target_id=feature_id,
        source_refs=[source_ref],
    )
    profile = service.create_dependency_graph_profile(
        repository.id,
        name="Evidence Audit",
        description="Blur non-evidence layers.",
        default_mode="full",
        filter_rules=[
            {"name": "blur facts", "action": "blur", "match": {"layer": "fact"}},
            {"name": "hide features", "action": "hide", "match": {"layer": "feature"}},
        ],
        # "missing:1" names no node in the graph; it should surface as orphaned.
        manual_overrides={f"feature:{feature_id}": "show", "missing:1": "hide"},
    )

    payload = service.dependency_graph_elements(repository.id, profile_id=profile.id)

    # Elements whose data has no "source" key are treated as nodes here.
    nodes = [
        element["data"]
        for element in payload["elements"]
        if "source" not in element["data"]
    ]
    fact_node = next(node for node in nodes if node["kind"] == "fact")
    feature_node = next(node for node in nodes if node["id"] == f"feature:{feature_id}")
    evidence_node = next(node for node in nodes if node["kind"] == "evidence")
    assert fact_node["layer"] == "fact"
    assert fact_node["path"] == fact.path
    assert fact_node["displayState"] == "blur"
    assert fact_node["reviewState"] == "accepted"
    assert fact_node["visualSize"] == 36
    assert feature_node["displayState"] == "show"
    assert feature_node["visibilitySource"] == "manual"
    assert feature_node["visualSize"] == 50
    assert evidence_node["layer"] == "evidence"
    assert evidence_node["visualSize"] == 53
    assert payload["filter"]["orphaned_overrides"] == ["missing:1"]
    # The only feature is re-shown by the manual override, so nothing stays hidden.
    assert payload["metrics"]["hidden_count"] == 0
    evidence_edge = next(
        element["data"]
        for element in payload["elements"]
        if element["data"].get("target") == f"feature:{feature_id}"
        and element["data"].get("sourceKind") == "evidence"
    )
    assert evidence_edge["edgeWidth"] == 5
    assert evidence_edge["reviewState"] == "accepted"


def test_dependency_graph_filters_review_state_and_marks_blurred_edges(tmp_path):
    """An ad-hoc rule matching ``reviewState`` blurs nodes and flags their edges."""
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Review State",
        url="https://example.com/review-state.git",
        description="Review state fixture.",
    )
    ability_id = service.add_ability(repository.id, name="Graph Review")
    capability_id = service.add_capability(repository.id, ability_id, name="Inspect")
    feature_id = service.add_feature(
        repository.id,
        capability_id,
        name="Inspector",
        type="UI",
        confidence=0.5,
    )

    payload = service.dependency_graph_elements(
        repository.id,
        rules=[
            {
                "name": "blur accepted",
                "action": "blur",
                "match": {"reviewState": "accepted"},
            }
        ],
        use_latest_profile=False,
    )

    feature = next(
        element["data"]
        for element in payload["elements"]
        if element["data"].get("id") == f"feature:{feature_id}"
    )
    edge = next(
        element["data"]
        for element in payload["elements"]
        if element["data"].get("source") == f"feature:{feature_id}"
    )
    assert feature["displayState"] == "blur"
    assert edge["connectedToBlurred"] is True


def test_dependency_graph_uses_latest_profile_by_default(tmp_path):
    """Without arguments the most recently created profile is applied.

    An explicit ``profile_id`` selects that profile instead, and
    ``use_latest_profile=False`` yields a payload with no profile.
    """
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Latest Profile",
        url="https://example.com/latest-profile.git",
        description="Latest profile fixture.",
    )
    ability_id = service.add_ability(repository.id, name="Profile Defaults")
    service.add_capability(repository.id, ability_id, name="Load Profile")
    first = service.create_dependency_graph_profile(
        repository.id,
        name="First",
        filter_rules=[
            {"name": "blur abilities", "action": "blur", "match": {"layer": "ability"}}
        ],
    )
    second = service.create_dependency_graph_profile(
        repository.id,
        name="Second",
        filter_rules=[
            {"name": "hide abilities", "action": "hide", "match": {"layer": "ability"}}
        ],
    )

    default_payload = service.dependency_graph_elements(repository.id)
    explicit_payload = service.dependency_graph_elements(
        repository.id,
        profile_id=first.id,
    )
    unsaved_payload = service.dependency_graph_elements(
        repository.id,
        use_latest_profile=False,
    )

    assert default_payload["profile"]["id"] == second.id
    # The second profile hides the ability layer, so at least one node is hidden.
    assert default_payload["metrics"]["hidden_count"] >= 1
    assert explicit_payload["profile"]["id"] == first.id
    assert unsaved_payload["profile"] is None


def test_dependency_graph_deduplicates_document_fact_nodes(tmp_path):
    """Only the README fact becomes a graph node when a feature cites two document facts.

    Two observed facts are inserted by hand: a README documentation fact and a
    SCOPE fact whose metadata marks it as ``derived_scope``. The payload is
    expected to contain a single fact node, keyed by the README path.
    """
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Docs",
        url="https://example.com/docs.git",
        description="Document graph fixture.",
    )
    ability_id = service.add_ability(repository.id, name="Documented Operation")
    capability_id = service.add_capability(repository.id, ability_id, name="Read Docs")
    run = service.store.create_analysis_run(repository.id)
    # NOTE(review): which statements sit inside this `with` was reconstructed
    # from whitespace-collapsed source; confirm against the upstream file.
    with service.store.connect() as connection:
        cursor = connection.execute(
            """
            INSERT INTO observed_facts
            (repository_id, analysis_run_id, snapshot_id, kind, path, name, value, metadata)
            VALUES (?, ?, NULL, 'documentation', 'README.md', 'README', '', '{}')
            """,
            (repository.id, run.id),
        )
        readme_fact_id = int(cursor.lastrowid)
        cursor = connection.execute(
            """
            INSERT INTO observed_facts
            (repository_id, analysis_run_id, snapshot_id, kind, path, name, value, metadata)
            VALUES (?, ?, NULL, 'scope', 'SCOPE.md', 'SCOPE', '', ?)
            """,
            (repository.id, run.id, '{"source_role": "derived_scope"}'),
        )
        scope_fact_id = int(cursor.lastrowid)
    service.store.create_feature(
        repository.id,
        capability_id,
        name="README backed feature",
        type="docs",
        location="README.md",
        confidence=0.7,
        source_refs=[
            SourceReference(
                fact_id=readme_fact_id,
                path="README.md",
                kind="documentation",
                name="README",
            ),
            SourceReference(
                fact_id=scope_fact_id,
                path="SCOPE.md",
                kind="scope",
                name="SCOPE",
            ),
        ],
    )

    payload = service.dependency_graph_elements(repository.id, use_latest_profile=False)

    fact_nodes = [
        element["data"]
        for element in payload["elements"]
        if element["data"].get("kind") == "fact"
    ]
    assert [node["id"] for node in fact_nodes] == ["fact:document:README.md"]
    assert fact_nodes[0]["label"] == "README.md (documentation)"


def test_dependency_graph_renders_candidate_fallback_when_approved_hierarchy_missing(tmp_path):
    """With nothing approved, the graph is built from candidate/draft items.

    The repository contains only a SCOPE.md with one capability block; after
    analysis (and no approval) the payload must still contain nodes, with
    ``candidate`` and ``draft`` review states and draft edge types.
    """
    service = make_service(tmp_path)
    source = tmp_path / "scope-candidate"
    source.mkdir()
    (source / "SCOPE.md").write_text(
        "# SCOPE\n\n"
        "## One-liner\n"
        "S5 Workloads and Experience layer.\n\n"
        "## Provided Capabilities\n\n"
        "```capability\n"
        "type: infrastructure\n"
        "title: Application workload deployment\n"
        "description: Deploy applications as Helm releases.\n"
        "keywords: [helm]\n"
        "```\n",
        encoding="utf-8",
    )
    repository = service.register_repository(name="Scope Candidate", url=str(source))
    service.analyze_repository(
        repository.id,
        source_path=str(source),
        use_llm_assistance=False,
    )

    payload = service.dependency_graph_elements(repository.id, use_latest_profile=False)

    # Split elements into nodes and edges by the presence of a "source" key.
    nodes = [
        element["data"]
        for element in payload["elements"]
        if "source" not in element["data"]
    ]
    edges = [
        element["data"]
        for element in payload["elements"]
        if "source" in element["data"]
    ]
    assert payload["metrics"]["node_count"] > 0
    assert any(node["reviewState"] == "candidate" for node in nodes)
    assert any(node["reviewState"] == "draft" for node in nodes)
    assert any(edge["dependencyType"] == "draft-realizes" for edge in edges)
    assert any(edge["dependencyType"] == "draft-supports" for edge in edges)


def test_dependency_graph_candidate_fallback_uses_latest_completed_run(tmp_path):
    """The candidate fallback graph reflects the second analysis run, not the first."""
    service = make_service(tmp_path)
    source = tmp_path / "latest-scope-candidate"
    source.mkdir()
    (source / "SCOPE.md").write_text(
        "# SCOPE\n\n## One-liner\nOld scope summary.\n",
        encoding="utf-8",
    )
    repository = service.register_repository(name="Latest Scope Candidate", url=str(source))
    service.analyze_repository(
        repository.id,
        source_path=str(source),
        use_llm_assistance=False,
    )
    # Rewrite SCOPE.md so the second run sees different content.
    (source / "SCOPE.md").write_text(
        "# SCOPE\n\n"
        "## One-liner\n"
        "Latest scope summary.\n\n"
        "## Provided Capabilities\n\n"
        "```capability\n"
        "type: review\n"
        "title: Latest Scope Capability\n"
        "description: The second run should drive graph fallback.\n"
        "```\n",
        encoding="utf-8",
    )
    latest = service.analyze_repository(
        repository.id,
        source_path=str(source),
        use_llm_assistance=False,
    )

    payload = service.dependency_graph_elements(repository.id, use_latest_profile=False)

    labels = {
        element["data"].get("label")
        for element in payload["elements"]
        if "source" not in element["data"]
    }
    # list_analysis_runs is expected to return the newest run first.
    assert latest.analysis_run.id == service.list_analysis_runs(repository.id)[0].id
    assert "Latest Scope Capability" in labels
    # NOTE(review): the first SCOPE.md text is "Old scope summary." but this
    # checks the title-cased "Old Scope Summary" — confirm labels are
    # title-cased, otherwise this assertion can never fail.
    assert "Old Scope Summary" not in labels


def test_manual_registry_updates_and_deletes_approved_entries(tmp_path):
    """Update and delete calls for each entry type are reflected in the ability map."""
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Manual",
        url="https://example.com/manual.git",
        description="Manual registry fixture.",
    )
    ability_id = service.add_ability(repository.id, name="Original Ability")
    capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Original Capability",
    )
    feature_id = service.add_feature(
        repository.id,
        capability_id,
        name="Original Feature",
        type="API",
    )
    evidence_id = service.add_evidence(
        repository.id,
        capability_id,
        type="test",
        reference="tests/test_original.py",
    )

    service.update_ability(repository.id, ability_id, name="Updated Ability")
    service.update_capability(
        repository.id,
        capability_id,
        name="Updated Capability",
        inputs=["request"],
        outputs=["response"],
    )
    service.update_feature(repository.id, feature_id, location="src/api.py")
    # update_evidence returns the ability map, which is inspected below.
    ability_map = service.update_evidence(
        repository.id,
        evidence_id,
        strength="strong",
        reference_kind="feature",
        reference_id=feature_id,
    )

    ability = ability_map.abilities[0]
    capability = ability.capabilities[0]
    assert ability.name == "Updated Ability"
    assert capability.name == "Updated Capability"
    assert capability.inputs == ["request"]
    assert capability.outputs == ["response"]
    assert capability.features[0].location == "src/api.py"
    assert capability.evidence[0].strength == "strong"
    assert capability.evidence[0].reference_kind == "feature"
    assert capability.evidence[0].reference_id == feature_id

    # Delete bottom-up: feature and evidence, then capability, then ability.
    service.delete_feature(repository.id, feature_id)
    service.delete_evidence(repository.id, evidence_id)
    ability_map = service.delete_capability(repository.id, capability_id)
    assert ability_map.abilities[0].capabilities == []
    ability_map = service.delete_ability(repository.id, ability_id)
    assert ability_map.abilities == []


def test_repository_update_and_delete(tmp_path):
    """Updating a repository keeps its abilities; deleting it makes lookups fail."""
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Original",
        url="https://example.com/original.git",
        description="Original description.",
    )
    ability_id = service.add_ability(repository.id, name="Original Ability")

    updated = service.update_repository(
        repository.id,
        name="Updated",
        description="Updated description.",
        branch="develop",
    )

    assert updated.name == "Updated"
    assert updated.description == "Updated description."
    assert updated.branch == "develop"
    assert service.ability_map(repository.id).abilities[0].id == ability_id

    service.delete_repository(repository.id)
    try:
        service.get_repository(repository.id)
    except NotFoundError as exc:
        assert "repository" in str(exc)
    else:
        # Reaching here means the lookup succeeded after deletion.
        raise AssertionError("expected a NotFoundError")


def test_search_matches_approved_abilities_and_capabilities(tmp_path):
    """Text search finds a capability by name; list endpoints expose parent names."""
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="MailRouter",
        url="https://example.com/mail-router.git",
        description="Manual test repository.",
    )
    ability_id = service.add_ability(
        repository.id,
        name="Business Email Routing",
        description="Route inbound messages.",
    )
    service.add_capability(
        repository.id,
        ability_id,
        name="Classify Incoming Email",
        description="Classify messages into intent categories.",
    )

    results = service.search("classify")

    assert len(results) == 1
    assert results[0].repository_name == "MailRouter"
    assert results[0].match_type == "capability"
    assert results[0].match_name == "Classify Incoming Email"
    abilities = service.list_abilities()
    capabilities = service.list_capabilities()
    assert abilities[0].repository_name == "MailRouter"
    assert abilities[0].name == "Business Email Routing"
    assert capabilities[0].ability_name == "Business Email Routing"
    assert capabilities[0].name == "Classify Incoming Email"


def test_search_matches_features_and_evidence_with_context(tmp_path):
    """Feature and evidence hits carry their ability/capability context."""
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="MailRouter",
        url="https://example.com/mail-router-feature.git",
        description="Manual test repository.",
    )
    ability_id = service.add_ability(repository.id, name="Business Email Routing")
    capability_id = service.add_capability(
        repository.id,
        ability_id,
        name="Classify Incoming Email",
    )
    service.add_feature(
        repository.id,
        capability_id,
        name="POST /api/classify-email",
        type="REST endpoint",
        location="src/routes/classify_email.py",
    )
    service.add_evidence(
        repository.id,
        capability_id,
        type="unit_test",
        reference="tests/test_email_classification.py",
        strength="strong",
    )

    # "classify_email" only occurs in the feature location and evidence
    # reference; "unit_test" only occurs as the evidence type.
    feature_results = service.search("classify_email")
    evidence_results = service.search("unit_test")

    assert feature_results[0].match_type == "feature"
    assert feature_results[0].matched_field == "location"
    assert feature_results[0].ability_name == "Business Email Routing"
    assert feature_results[0].capability_name == "Classify Incoming Email"
    assert feature_results[0].source_reference == "src/routes/classify_email.py"
    assert evidence_results[0].match_type == "evidence"
    assert evidence_results[0].evidence_level == "strong"
    # NOTE(review): exact float comparison; presumably 0.9 is the fixed
    # confidence assigned to "strong" evidence — confirm.
    assert evidence_results[0].confidence == 0.9


def test_search_filters_by_status_language_and_framework(tmp_path):
    """Search filters narrow results; a non-matching filter yields no results."""
    source = tmp_path / "repo"
    source.mkdir()
    (source / "README.md").write_text("# Filterable\n", encoding="utf-8")
    (source / "requirements.txt").write_text("fastapi\n", encoding="utf-8")
    # NOTE(review): the single-space indent inside the last literal may be an
    # artifact of whitespace collapsing in the reviewed source; confirm upstream.
    (source / "app.py").write_text(
        "from fastapi import FastAPI\n"
        "app = FastAPI()\n"
        '@app.get("/health")\n'
        "def health():\n"
        " return {}\n",
        encoding="utf-8",
    )
    service = make_service(tmp_path)
    repository = service.register_repository(name="Filterable", url=str(source))
    summary = service.analyze_repository(repository.id)
    service.approve_candidate_graph(repository.id, summary.analysis_run.id)

    results = service.search(
        "health",
        status="indexed",
        language="Python",
        framework="FastAPI",
        ability="Support Filterable",
        capability="Repository Interface",
    )
    wrong_language_results = service.search(
        "repository",
        status="indexed",
        language="TypeScript",
        framework="FastAPI",
    )
    wrong_capability_results = service.search(
        "repository",
        status="indexed",
        language="Python",
        framework="FastAPI",
        capability="Email Routing",
    )

    assert results
    assert {result.repository_name for result in results} == {"Filterable"}
    assert wrong_language_results == []
    assert wrong_capability_results == []


def test_fixture_breadth_readme_only_repo_stays_conservative(tmp_path):
    """A README-only repo yields a low-confidence ability with no capabilities."""
    source = write_readme_only_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Readme Only", url=str(source))

    summary = service.analyze_repository(repository.id)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    assert summary.analysis_run.status == "completed"
    assert graph.abilities[0].confidence == 0.45
    assert graph.abilities[0].capabilities == []
    # Nothing was approved, so the approved ability map stays empty.
    assert service.ability_map(repository.id).abilities == []


def test_fixture_breadth_python_cli_repo_extracts_reviewable_cli_claims(tmp_path):
    """The Python CLI fixture produces a CLI feature backed by its test file."""
    source = write_python_cli_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Python CLI", url=str(source))

    summary = service.analyze_repository(repository.id)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    capability = graph.abilities[0].capabilities[0]
    assert summary.analysis_run.status == "completed"
    assert capability.name == "Expose Repository Interface"
    assert capability.features[0].type == "CLI"
    assert capability.features[0].name.startswith("CLI command surface:")
    assert capability.evidence[0].reference == "tests/test_cli.py"
    assert service.ability_map(repository.id).abilities == []


def test_fixture_breadth_javascript_typescript_package_extracts_structure_and_api(tmp_path):
    """The JS/TS package fixture yields language, framework and API-feature facts."""
    source = write_javascript_typescript_package_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="JS TS Package", url=str(source))

    summary = service.analyze_repository(repository.id)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    fact_names = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
    capability_names = {
        capability.name
        for ability in graph.abilities
        for capability in ability.capabilities
    }
    feature_types = {
        feature.type
        for ability in graph.abilities
        for capability in ability.capabilities
        for feature in capability.features
    }
    # The language fact is expected with an empty path.
    assert ("language", "TypeScript", "") in fact_names
    assert ("framework", "React", "package.json") in fact_names
    assert ("framework", "Vitest", "package.json") in fact_names
    assert "Expose Repository Interface" in capability_names
    assert "API" in feature_types
    assert service.ability_map(repository.id).abilities == []


def test_fixture_breadth_misleading_docs_do_not_become_approved_truth(tmp_path):
    """Misleading docs give a low-confidence candidate and no approved abilities."""
    source = write_misleading_docs_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Misleading Docs", url=str(source))

    summary = service.analyze_repository(repository.id)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)
    ability_map = service.ability_map(repository.id)

    assert summary.analysis_run.status == "completed"
    assert graph.abilities[0].confidence == 0.45
    assert graph.abilities[0].capabilities == []
    assert ability_map.abilities == []


def test_regression_key_cape_like_repo_centers_iam_not_llm_provider_routing(tmp_path):
    """The KeyCape-like fixture yields identity capabilities, not LLM routing."""
    source = write_key_cape_like_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="KeyCape Like", url=str(source))

    summary = service.analyze_repository(repository.id, use_llm_assistance=False)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    capability_names = {
        capability.name
        for ability in graph.abilities
        for capability in ability.capabilities
    }
    assert "Enforce OIDC PKCE Profiles" in capability_names
    assert "Validate LDAP Schema Migrations" in capability_names
    assert "Run Migration Tooling For Identity Data" in capability_names
    assert "Route LLM Requests Across Providers" not in capability_names
    facts = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
    # CLAUDE.md must not be recorded as an llm_provider fact.
    assert ("llm_provider", "Claude", "CLAUDE.md") not in facts


def test_regression_llm_connect_like_repo_still_promotes_provider_routing(tmp_path):
    """The LLM-Connect-like fixture still produces the provider-routing capability."""
    source = write_llm_connect_like_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="LLM Connect Like", url=str(source))

    summary = service.analyze_repository(repository.id, use_llm_assistance=False)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    # next() raises StopIteration if the capability is missing, failing the test.
    capability = next(
        capability
        for ability in graph.abilities
        for capability in ability.capabilities
        if capability.name == "Route LLM Requests Across Providers"
    )
    assert {"utility-adapter", "llm-provider", "openrouter", "claude"} <= set(
        capability.attributes
    )


def test_regression_facade_repo_promotes_public_wrapper_as_facade(tmp_path):
    """The facade fixture's first capability is tagged ``utility-facade``."""
    source = write_facade_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Mail Facade", url=str(source))

    summary = service.analyze_repository(repository.id, use_llm_assistance=False)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    capability = graph.abilities[0].capabilities[0]
    assert capability.name == "Expose Repository Interface"
    assert "utility-facade" in capability.attributes
    assert "POST /classify" in {feature.name for feature in capability.features}


def test_regression_dependency_only_repo_keeps_libraries_as_context(tmp_path):
    """A dependency-only repo produces manifest facts but no capabilities."""
    source = write_dependency_only_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Dependency Only", url=str(source))

    summary = service.analyze_repository(repository.id, use_llm_assistance=False)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    capability_names = {
        capability.name
        for ability in graph.abilities
        for capability in ability.capabilities
    }
    assert "Route LLM Requests Across Providers" not in capability_names
    assert capability_names == set()
    assert any(fact.kind == "manifest" for fact in summary.facts)


def test_regression_ops_bridge_like_repo_is_it_operations_not_llm_provider(tmp_path):
    """The ops-bridge fixture is classified as IT operations, not an LLM provider."""
    source = write_ops_bridge_like_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Ops Bridge Marketing Name", url=str(source))

    summary = service.analyze_repository(repository.id, use_llm_assistance=False)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    ability = graph.abilities[0]
    capability_names = {
        capability.name
        for candidate_ability in graph.abilities
        for capability in candidate_ability.capabilities
    }
    facts = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
    # The ability name is expected to differ from the registered repository name.
    assert ability.name == "Manage SSH Reverse Tunnel Connectivity"
    assert ability.primary_class == "it-operations"
    assert {"remote-access", "connectivity", "operations"} <= set(ability.attributes)
    assert "repository" not in ability.attributes
    assert "llm-provider" not in ability.attributes
    assert "Route LLM Requests Across Providers" not in capability_names
    assert "Maintain Continuous Connectivity Between Remote Systems And Central Hub" in capability_names
    assert "Make Connectivity Observable Auditable And Controllable" in capability_names
    assert "Expose CLI And MCP Accessible Service" in capability_names
    cli_capability = next(
        capability
        for candidate_ability in graph.abilities
        for capability in candidate_ability.capabilities
        if capability.name == "Expose CLI And MCP Accessible Service"
    )
    assert {feature.name for feature in cli_capability.features} == {
        "CLI command surface: CLI command up"
    }
    assert ("llm_provider", "Claude", "scripts/register_mcp.py") not in facts
    assert ("llm_provider", "Claude", "workplans/BRIDGE-WP-0003.md") not in facts


def test_fixture_breadth_empty_repo_produces_no_candidate_claims(tmp_path):
    """An empty repo completes analysis with zero files, facts and abilities."""
    source = write_empty_repo(tmp_path)
    service = make_service(tmp_path)
    repository = service.register_repository(name="Empty Repo", url=str(source))

    summary = service.analyze_repository(repository.id)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)

    assert summary.analysis_run.status == "completed"
    assert summary.snapshot is not None
    assert summary.snapshot.file_count == 0
    assert summary.facts == []
    assert graph.abilities == []
    assert service.ability_map(repository.id).abilities == []


def test_semantic_search_adds_hybrid_matches_without_changing_text_default(tmp_path):
    """An embedding provider adds semantic hits; the plain service still returns none.

    The same store is queried twice with "customer queued": once through a
    service without an embedding provider (no results expected) and once
    through a service constructed with ``HashingEmbeddingProvider``.
    """
    source = tmp_path / "repo"
    source.mkdir()
    (source / "README.md").write_text(
        "# Queue Worker\n\nHandles postponed customer jobs.\n",
        encoding="utf-8",
    )
    text_service = make_service(tmp_path)
    repository = text_service.register_repository(
        name="Queue Worker",
        url=str(source),
        description="Processes deferred customer work.",
    )
    ability_id = text_service.add_ability(
        repository.id,
        name="Background Job Processing",
        description="Run deferred work outside request handling.",
        confidence=0.8,
    )
    capability_id = text_service.add_capability(
        repository.id,
        ability_id,
        name="Process Customer Tasks",
        description="Execute queued customer tasks asynchronously.",
        confidence=0.7,
    )
    text_service.add_feature(
        repository.id,
        capability_id,
        name="worker task loop",
        type="background worker",
        location="worker.py",
        confidence=0.6,
    )
    text_service.analyze_repository(repository.id)

    assert text_service.search("customer queued") == []

    # Reuse the populated store so both services see identical data.
    semantic_service = RegistryService(
        text_service.store,
        ingestion=GitIngestionService(tmp_path / "checkouts"),
        embedding_provider=HashingEmbeddingProvider(),
    )
    results = semantic_service.search("customer queued")

    assert results
    assert results[0].match_type in {"capability", "content_chunk"}
    assert results[0].matched_field == "semantic"
    assert results[0].vector_score > 0
    # NOTE(review): 0.35 is presumably the vector weight in the hybrid score — confirm.
    assert results[0].hybrid_score >= results[0].vector_score * 0.35
    assert any(result.match_type == "content_chunk" for result in results)


def test_register_repository_imports_metadata_when_name_is_omitted(tmp_path):
    """Registering by URL alone takes name and description from ``pyproject.toml``."""
    source = tmp_path / "metadata-source"
    source.mkdir()
    (source / "pyproject.toml").write_text(
        '[project]\nname = "metadata-source"\ndescription = "Imported description."\n',
        encoding="utf-8",
    )
    service = make_service(tmp_path)

    repository = service.register_repository(url=str(source))

    assert repository.name == "metadata-source"
    assert repository.description == "Imported description."
def test_operational_logging_records_analysis_and_review_events(tmp_path, caplog):
    """Register, analyze, and approve emit the four expected JSON "event" log records."""
    repo_dir = tmp_path / "repo"
    repo_dir.mkdir()
    (repo_dir / "README.md").write_text("# Logged\n", encoding="utf-8")
    (repo_dir / "app.py").write_text(
        "from fastapi import FastAPI\n"
        "app = FastAPI()\n"
        '@app.get("/health")\n'
        "def health():\n"
        " return {}\n",
        encoding="utf-8",
    )
    service = make_service(tmp_path)

    with caplog.at_level(logging.INFO, logger=LOGGER_NAME):
        repository = service.register_repository(name="Logged", url=str(repo_dir))
        summary = service.analyze_repository(repository.id)
        service.approve_candidate_graph(repository.id, summary.analysis_run.id)

    # Each captured message is parsed as JSON and its "event" field collected.
    events = [json.loads(entry.message)["event"] for entry in caplog.records]
    assert "repository_registered" in events
    assert "analysis_started" in events
    assert "analysis_completed" in events
    assert "review_decision_recorded" in events


def test_capability_must_belong_to_repository(tmp_path):
    """Adding a capability under another repository's ability raises NotFoundError."""
    service = make_service(tmp_path)
    owner_repo = service.register_repository(
        name="First",
        url="https://example.com/first.git",
        description="Manual first repository.",
    )
    other_repo = service.register_repository(
        name="Second",
        url="https://example.com/second.git",
        description="Manual second repository.",
    )
    ability_id = service.add_ability(owner_repo.id, name="Document Classification")

    try:
        service.add_capability(other_repo.id, ability_id, name="Classify Document")
    except NotFoundError as exc:
        assert "ability" in str(exc)
    else:
        raise AssertionError("expected a NotFoundError")


def test_analyze_repository_records_snapshot_and_observed_facts(tmp_path):
    """Analysis of a three-file FastAPI repo records snapshot, facts, chunks, and candidates."""
    source = tmp_path / "repo"
    source.mkdir()
    (source / "README.md").write_text("# Example\n", encoding="utf-8")
    (source / "requirements.txt").write_text("fastapi\n", encoding="utf-8")
    (source / "app.py").write_text(
        "from fastapi import FastAPI\n"
        "app = FastAPI()\n"
        '@app.get("/health")\n'
        "def health():\n"
        " return {}\n",
        encoding="utf-8",
    )
    service = make_service(tmp_path)
    repository = service.register_repository(
        name="Example",
        url=str(source),
        description="A local fixture repository",
    )

    summary = service.analyze_repository(repository.id)

    # Run status, snapshot size, and repository status.
    assert summary.analysis_run.status == "completed"
    assert summary.snapshot is not None
    assert summary.snapshot.file_count == 3
    assert service.get_repository(repository.id).status == "analyzed"

    # Observed facts, keyed by (kind, name, path).
    fact_names = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
    assert ("documentation", "README", "README.md") in fact_names
    assert ("framework", "FastAPI", "requirements.txt") in fact_names
    assert ("interface", "python route decorator", "app.py") in fact_names

    # Content chunks, keyed by (kind, path).
    chunks = service.list_content_chunks(repository.id, summary.analysis_run.id)
    chunk_sources = {(chunk.kind, chunk.path) for chunk in chunks}
    assert ("documentation", "README.md") in chunk_sources
    assert ("manifest", "requirements.txt") in chunk_sources
    assert ("interface", "app.py") in chunk_sources

    # Candidate graph derived from the run.
    candidate_graph = service.candidate_graph(repository.id, summary.analysis_run.id)
    assert candidate_graph.repository.name == "Example"
    assert candidate_graph.abilities
    first_ability = candidate_graph.abilities[0]
    assert "Example" in first_ability.description
    assert "@app.get" in first_ability.capabilities[0].description
    assert first_ability.capabilities[0].features[0].name == "GET /health"
    capability_names = {
        capability.name
        for ability in candidate_graph.abilities
        for capability in ability.capabilities
    }
    assert "Expose Repository Interface" in capability_names


def test_analyze_repository_can_use_optional_llm_extractor(tmp_path):
    """A configured extractor is called and its abilities appear in the candidate graph."""
    source = tmp_path / "repo"
    source.mkdir()
    (source / "README.md").write_text(
        "# Email Router\nRoutes incoming customer email.\n",
        encoding="utf-8",
    )
    store = RegistryStore(tmp_path / "registry.sqlite3")
    store.initialize()
    fake_abilities = [
        ExtractedAbility(
            name="Business Email Routing",
            description="Route incoming messages.",
            source_paths=["README.md"],
            capabilities=[
                ExtractedCapability(
                    name="Classify Incoming Email",
                    description="Classify messages by intent.",
                    source_paths=["README.md"],
                )
            ],
        )
    ]
    extractor = FakeLLMExtractor(fake_abilities)
    service = RegistryService(
        store,
        ingestion=GitIngestionService(tmp_path / "checkouts"),
        llm_extractor=extractor,
    )
    repository = service.register_repository(name="Email Router", url=str(source))

    summary = service.analyze_repository(repository.id)
    graph = service.candidate_graph(repository.id, summary.analysis_run.id)
    decisions = service.list_review_decisions(repository.id, summary.analysis_run.id)

    # The extractor was invoked and received a non-empty second argument (chunks).
    assert extractor.calls
    assert extractor.calls[0][1]
    assert graph.abilities[0].name == "Business Email Routing"
    assert graph.abilities[0].capabilities[0].name == "Classify Incoming Email"
    assert graph.abilities[0].source_refs[0].path == "README.md"
    assert decisions[0].action == "llm_extraction_used"
    assert "llm+deterministic candidate generation" in decisions[0].notes
    assert {ability.name for ability in graph.abilities} >= {
        "Business Email Routing",
        "Route Incoming Customer Email",
    }


def test_analyze_repository_folds_llm_capabilities_when_ability_comes_from_scope(tmp_path):
    """An LLM ability sourced from SCOPE.md is dropped while its capability is kept."""
    source = tmp_path / "repo"
    source.mkdir()
    (source / "INTENT.md").write_text(
        "# INTENT\n\n"
        "Provide lightweight IAM.\n\n"
        "## Intended Capabilities\n\n"
        "- Enforce OIDC PKCE profiles.\n",
        encoding="utf-8",
    )
    (source / "SCOPE.md").write_text(
        "# SCOPE\n\nOld approved entry: route LLM provider requests.\n",
        encoding="utf-8",
    )
    (source / "providers.py").write_text(
        "provider_registry = {'openrouter': object()}\n",
        encoding="utf-8",
    )
    store = RegistryStore(tmp_path / "registry.sqlite3")
    store.initialize()
    extractor = FakeLLMExtractor(
        [
            ExtractedAbility(
                name="Old LLM Routing",
                description="Stale prior scope claim.",
                source_paths=["SCOPE.md"],
                capabilities=[
                    ExtractedCapability(
                        name="Configure OpenRouter Adapter",
                        description="Source-linked provider adapter.",
                        source_paths=["providers.py"],
                        features=[
                            ExtractedFeature(
                                name="OpenRouter provider registry",
                                type="backend",
                                location="providers.py",
                                source_paths=["providers.py"],
                            )
                        ],
                    )
                ],
            )
        ]
    )
service = RegistryService( store, ingestion=GitIngestionService(tmp_path / "checkouts"), llm_extractor=extractor, ) repository = service.register_repository(name="KeyCape Like", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) decisions = service.list_review_decisions(repository.id, summary.analysis_run.id) capability_names = { capability.name for ability in graph.abilities for capability in ability.capabilities } ability_names = {ability.name for ability in graph.abilities} assert "Old LLM Routing" not in ability_names assert "Configure OpenRouter Adapter" in capability_names assert "Enforce OIDC PKCE Profiles" in capability_names assert decisions[0].action == "llm_extraction_used" assert "llm+deterministic candidate generation" in decisions[0].notes def test_analysis_isolation_between_repositories_with_stale_approved_data(tmp_path): poisoned_source = write_llm_connect_like_repo(tmp_path) target_source = write_key_cape_like_repo(tmp_path) service = make_service(tmp_path) poisoned = service.register_repository( name="Poisoned LLM Connect", url=str(poisoned_source), ) target = service.register_repository( name="Isolated KeyCape", url=str(target_source), ) poisoned_summary = service.analyze_repository( poisoned.id, use_llm_assistance=False, ) service.approve_candidate_graph(poisoned.id, poisoned_summary.analysis_run.id) assert any( capability.name == "Route LLM Requests Across Providers" for ability in service.ability_map(poisoned.id).abilities for capability in ability.capabilities ) target_summary = service.analyze_repository( target.id, use_llm_assistance=False, ) target_graph = service.candidate_graph(target.id, target_summary.analysis_run.id) target_facts = service.list_observed_facts(target.id, target_summary.analysis_run.id) target_chunks = service.list_content_chunks(target.id, target_summary.analysis_run.id) target_capability_names = { capability.name for ability in 
target_graph.abilities for capability in ability.capabilities } assert "Enforce OIDC PKCE Profiles" in target_capability_names assert "Route LLM Requests Across Providers" not in target_capability_names assert all(fact.repository_id == target.id for fact in target_facts) assert all(chunk.repository_id == target.id for chunk in target_chunks) assert all(ref.path != "providers.py" for ability in target_graph.abilities for ref in ability.source_refs) assert service.ability_map(target.id).abilities == [] def test_analyze_repository_can_disable_optional_llm_extractor(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text( "# Email Router\nRoutes incoming customer email.\n", encoding="utf-8", ) store = RegistryStore(tmp_path / "registry.sqlite3") store.initialize() extractor = FakeLLMExtractor( [ ExtractedAbility( name="Business Email Routing", description="Route incoming messages.", source_paths=["README.md"], ) ] ) service = RegistryService( store, ingestion=GitIngestionService(tmp_path / "checkouts"), llm_extractor=extractor, ) repository = service.register_repository(name="Email Router", url=str(source)) summary = service.analyze_repository( repository.id, use_llm_assistance=False, ) graph = service.candidate_graph(repository.id, summary.analysis_run.id) decisions = service.list_review_decisions(repository.id, summary.analysis_run.id) assert extractor.calls == [] assert graph.abilities[0].name == "Route Incoming Customer Email" assert all(decision.action != "llm_extraction_used" for decision in decisions) def test_analyze_repository_normalizes_duplicate_llm_candidates(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text( "# LLM Connect\nSupports OpenRouter providers.\n", encoding="utf-8", ) store = RegistryStore(tmp_path / "registry.sqlite3") store.initialize() extractor = FakeLLMExtractor( [ ExtractedAbility( name="LLM Provider Integration", description="Connects to model providers.", 
source_paths=["README.md"], ), ExtractedAbility( name="LLM Provider Integrations", description="Connects prompts to OpenRouter providers.", source_paths=["README.md"], ), ] ) service = RegistryService( store, ingestion=GitIngestionService(tmp_path / "checkouts"), llm_extractor=extractor, ) repository = service.register_repository(name="LLM Connect", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) assert len(graph.abilities) == 2 assert graph.abilities[0].name == "LLM Provider Integrations" assert graph.abilities[1].name == "Support OpenRouter Providers" def test_analyze_repository_falls_back_when_optional_llm_extractor_returns_no_candidates(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Fallback\n", encoding="utf-8") store = RegistryStore(tmp_path / "registry.sqlite3") store.initialize() service = RegistryService( store, ingestion=GitIngestionService(tmp_path / "checkouts"), llm_extractor=FakeLLMExtractor([]), ) repository = service.register_repository(name="Fallback", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) assert graph.abilities[0].name == "Support Fallback" def test_analyze_repository_routes_legacy_auto_approve_to_agentic_review(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text( "# Auto Approved\nReports health over HTTP.\n", encoding="utf-8", ) (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Auto Approved", url=str(source)) summary = service.analyze_repository( repository.id, trusted_auto_approve=True, use_llm_assistance=False, ) ability_map = service.ability_map(repository.id) graph = 
service.candidate_graph(repository.id, summary.analysis_run.id) decisions = service.list_review_decisions(repository.id, summary.analysis_run.id) assert service.get_repository(repository.id).status == "analyzed" statuses_by_capability = { capability.name: capability.status for capability in graph.abilities[0].capabilities } assert statuses_by_capability["Expose Repository Interface"] == "candidate" assert ability_map.abilities == [] assert decisions[0].action == "agentic_review_unconfigured" assert "deterministic candidate generation" in decisions[0].notes assert "Deprecated trusted_auto_approve request was routed" in decisions[0].notes assert "candidates remain pending human review" in decisions[0].notes def test_rebuild_characteristics_dry_run_preserves_approved_map(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Rebuild\nReports health over HTTP.\n", encoding="utf-8") (source / "app.py").write_text('@app.get("/health")\ndef health():\n return {}\n', encoding="utf-8") service = make_service(tmp_path) repository = service.register_repository(name="Rebuild", url=str(source)) summary = service.analyze_repository(repository.id, use_llm_assistance=False) service.approve_candidate_graph(repository.id, summary.analysis_run.id) result = service.rebuild_characteristics_from_scratch( repository.id, dry_run=True, source_path=str(source), use_llm_assistance=False, ) assert result.dry_run is True assert result.cleared_approved is False assert result.previous_counts["abilities"] == 1 assert result.previous_ids["abilities"] assert result.candidate_counts["abilities"] == 1 assert service.ability_map(repository.id).abilities decisions = service.list_review_decisions(repository.id, result.analysis_run.id) assert decisions[-1].action == "dry_run_rebuild_characteristics_from_scratch" def test_rebuild_characteristics_requires_confirmation_before_clearing(tmp_path): service = make_service(tmp_path) source = tmp_path / "repo" source.mkdir() 
(source / "README.md").write_text("# Rebuild\n", encoding="utf-8") repository = service.register_repository(name="Rebuild", url=str(source)) try: service.rebuild_characteristics_from_scratch( repository.id, dry_run=False, confirm=False, source_path=str(source), use_llm_assistance=False, ) except ValueError as exc: assert "confirm=True" in str(exc) else: raise AssertionError("expected confirmed rebuild to require confirm=True") def test_rebuild_characteristics_confirmed_clears_approved_map(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Rebuild\nReports health over HTTP.\n", encoding="utf-8") (source / "app.py").write_text('@app.get("/health")\ndef health():\n return {}\n', encoding="utf-8") service = make_service(tmp_path) repository = service.register_repository(name="Rebuild", url=str(source)) summary = service.analyze_repository(repository.id, use_llm_assistance=False) service.approve_candidate_graph(repository.id, summary.analysis_run.id) result = service.rebuild_characteristics_from_scratch( repository.id, dry_run=False, confirm=True, source_path=str(source), use_llm_assistance=False, ) assert result.cleared_approved is True assert result.previous_counts["abilities"] == 1 assert result.previous_ids["abilities"] assert service.ability_map(repository.id).abilities == [] assert service.get_repository(repository.id).status == "analyzed" decisions = service.list_review_decisions(repository.id, result.analysis_run.id) assert decisions[-1].action == "rebuild_characteristics_from_scratch" assert "Previous approved IDs" in decisions[-1].notes def test_analyze_repository_records_llm_failure_and_falls_back(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Failing LLM\n", encoding="utf-8") store = RegistryStore(tmp_path / "registry.sqlite3") store.initialize() service = RegistryService( store, ingestion=GitIngestionService(tmp_path / "checkouts"), llm_extractor=FailingLLMExtractor(), ) 
repository = service.register_repository(name="Failing LLM", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) decisions = service.list_review_decisions(repository.id, summary.analysis_run.id) assert summary.analysis_run.status == "completed" assert graph.abilities[0].name == "Support Failing LLM" assert decisions[0].action == "llm_extraction_failed" assert "provider unavailable" in decisions[0].notes def test_approve_candidate_graph_publishes_ability_map_once(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Example\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) (source / "cli.py").write_text( "import click\n\n" "@click.command()\n" "def health():\n" " click.echo('ok')\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Example", url=str(source)) summary = service.analyze_repository(repository.id) ability_map = service.approve_candidate_graph( repository.id, summary.analysis_run.id, notes="Looks good for the first pass.", ) second_approval = service.approve_candidate_graph( repository.id, summary.analysis_run.id, ) assert service.get_repository(repository.id).status == "indexed" assert len(ability_map.abilities) == 1 assert len(second_approval.abilities) == 1 assert ability_map.abilities[0].name == "Support Example" assert ability_map.abilities[0].primary_class == "developer-tooling" assert ability_map.abilities[0].attributes == ["interface"] assert ability_map.abilities[0].capabilities[0].primary_class == "interface" assert ability_map.abilities[0].capabilities[0].features[0].location == "app.py" assert ability_map.abilities[0].capabilities[0].features[0].primary_class == "API" assert ability_map.abilities[0].capabilities[0].features[0].attributes 
== [ "API", "surface", "http", ] assert ability_map.abilities[0].capabilities[0].features[0].source_refs assert ability_map.abilities[0].capabilities[0].features[0].source_refs[0].line == 3 assert ability_map.abilities[0].capabilities[0].evidence[0].source_refs candidate_graph = service.candidate_graph(repository.id, summary.analysis_run.id) assert candidate_graph.abilities[0].status == "approved" assert candidate_graph.abilities[0].primary_class == "developer-tooling" assert candidate_graph.abilities[0].capabilities[0].primary_class == "interface" assert ( candidate_graph.abilities[0].capabilities[0].features[0].primary_class == "API" ) assert candidate_graph.abilities[0].capabilities[0].features[0].attributes == [ "API", "surface", "http", ] decisions = service.list_review_decisions(repository.id, summary.analysis_run.id) assert decisions[0].action == "approve_candidate_graph" assert decisions[0].notes == "Looks good for the first pass." def test_accept_candidate_feature_promotes_parent_context_once(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text( "# Feature Accept\nReports health over HTTP.\n", encoding="utf-8", ) (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Feature Accept", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) candidate_feature = graph.abilities[0].capabilities[0].features[0] ability_map = service.accept_candidate_feature( repository.id, summary.analysis_run.id, candidate_feature.id, ) graph_after_feature_accept = service.candidate_graph( repository.id, summary.analysis_run.id, ) assert len(ability_map.abilities) == 1 assert ability_map.abilities[0].capabilities[0].features[0].name == "GET /health" assert 
graph_after_feature_accept.abilities[0].capabilities[0].features[0].status == ( "approved" ) final_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) assert len(final_map.abilities) == 1 interface_capabilities = [ capability for capability in final_map.abilities[0].capabilities if capability.name == "Expose Repository Interface" ] assert len(interface_capabilities) == 1 assert len(interface_capabilities[0].features) == 1 decisions = service.list_review_decisions(repository.id, summary.analysis_run.id) assert {decision.action for decision in decisions} >= { "accept_candidate_feature", "approve_candidate_graph", } def test_accept_candidate_evidence_promotes_parent_context(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text( "# Support Accept\nDocuments an HTTP health interface.\n", encoding="utf-8", ) (source / "tests").mkdir() (source / "tests" / "test_health.py").write_text( "def test_health(): pass\n", encoding="utf-8", ) (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Support Accept", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) candidate_evidence = graph.abilities[0].capabilities[0].evidence[0] ability_map = service.accept_candidate_evidence( repository.id, summary.analysis_run.id, candidate_evidence.id, ) graph_after_accept = service.candidate_graph( repository.id, summary.analysis_run.id, ) approved_evidence = ability_map.abilities[0].capabilities[0].evidence[0] assert approved_evidence.reference == candidate_evidence.reference assert approved_evidence.target_kind == "capability" assert graph_after_accept.abilities[0].capabilities[0].evidence[0].status == ( "approved" ) decisions = 
service.list_review_decisions(repository.id, summary.analysis_run.id) assert decisions[0].action == "accept_candidate_evidence" def test_analysis_run_diff_keeps_approved_map_stable_until_change_approval(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Example\n", encoding="utf-8") app_file = source / "app.py" app_file.write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Example", url=str(source)) first_summary = service.analyze_repository(repository.id) approved_before = service.approve_candidate_graph( repository.id, first_summary.analysis_run.id, ) app_file.write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/ready")\n' "def ready():\n" " return {}\n", encoding="utf-8", ) second_summary = service.analyze_repository(repository.id) approved_after_analysis = service.ability_map(repository.id) diff = service.diff_analysis_runs( repository.id, first_summary.analysis_run.id, second_summary.analysis_run.id, ) assert approved_after_analysis.abilities[0].capabilities[0].features[0].name == ( approved_before.abilities[0].capabilities[0].features[0].name ) assert any(item.item_type == "feature" for item in diff.candidates.added) assert any(item.item_type == "feature" for item in diff.candidates.removed) assert any(item.item_type == "feature" for item in diff.approved_entries.added) assert any(item.item_type == "feature" for item in diff.approved_entries.removed) approved_after_review = service.approve_analysis_run_changes( repository.id, second_summary.analysis_run.id, notes="Accept route change.", ) assert approved_after_review.abilities[0].capabilities[0].features[0].name == ( "GET /ready" ) decisions = service.list_review_decisions( repository.id, second_summary.analysis_run.id, ) assert decisions[0].action == "approve_analysis_run_changes" 
assert decisions[0].notes == "Accept route change." def test_reject_candidate_ability_excludes_it_from_approval(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Rejectable\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Rejectable", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) candidate = graph.abilities[0] rejected_graph = service.reject_candidate_ability( repository.id, summary.analysis_run.id, candidate.id, notes="Too generic.", ) ability_map = service.approve_candidate_graph( repository.id, summary.analysis_run.id, ) assert service.get_repository(repository.id).status == "reviewing" assert rejected_graph.abilities[0].status == "rejected" assert rejected_graph.abilities[0].capabilities[0].status == "rejected" assert rejected_graph.abilities[0].capabilities[0].features[0].status == "rejected" assert ability_map.abilities == [] def test_edit_candidate_graph_values_before_approval(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Editable\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Editable", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) candidate_ability = graph.abilities[0] candidate_capability = candidate_ability.capabilities[0] service.edit_candidate_ability( repository.id, summary.analysis_run.id, candidate_ability.id, name="Service Health Monitoring", 
description="Expose health state for operational monitoring.", confidence=0.91, notes="Curator renamed the generic ability.", ) service.edit_candidate_capability( repository.id, summary.analysis_run.id, candidate_capability.id, name="Report HTTP Health", description="Return a lightweight health response over HTTP.", confidence=0.87, ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) assert service.get_repository(repository.id).status == "indexed" assert ability_map.abilities[0].name == "Service Health Monitoring" assert ability_map.abilities[0].description == ( "Expose health state for operational monitoring." ) assert ability_map.abilities[0].confidence == 0.91 assert ability_map.abilities[0].capabilities[0].name == "Report HTTP Health" assert ability_map.abilities[0].capabilities[0].confidence == 0.87 def test_reject_candidate_capability_excludes_it_from_approval(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Capability Reject\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Capability Reject", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) candidate_capability = graph.abilities[0].capabilities[0] rejected_graph = service.reject_candidate_capability( repository.id, summary.analysis_run.id, candidate_capability.id, notes="Interface is not relevant.", ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) assert rejected_graph.abilities[0].capabilities[0].status == "rejected" assert rejected_graph.abilities[0].capabilities[0].features[0].status == "rejected" approved_capability_names = { capability.name for capability in 
ability_map.abilities[0].capabilities } assert candidate_capability.name not in approved_capability_names def test_reject_candidate_feature_and_evidence_excludes_only_those_leaves(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Leaf Reject\n", encoding="utf-8") (source / "tests").mkdir() (source / "tests" / "test_health.py").write_text( "def test_health(): pass\n", encoding="utf-8", ) (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Leaf Reject", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) capability = graph.abilities[0].capabilities[0] service.reject_candidate_feature( repository.id, summary.analysis_run.id, capability.features[0].id, notes="Feature is incidental.", ) service.reject_candidate_evidence( repository.id, summary.analysis_run.id, capability.evidence[0].id, notes="Evidence is too weak.", ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) approved_capability = ability_map.abilities[0].capabilities[0] assert approved_capability.name == capability.name assert approved_capability.features == [] assert len(approved_capability.evidence) == len(capability.evidence) - 1 def test_relink_candidate_capability_to_another_ability_before_approval(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Relink Capability\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Relink Capability", url=str(source)) summary = service.analyze_repository(repository.id) graph 
= service.candidate_graph(repository.id, summary.analysis_run.id) capability = graph.abilities[0].capabilities[0] with service.store.connect() as connection: cursor = connection.execute( """ INSERT INTO candidate_abilities (repository_id, analysis_run_id, name, description, confidence) VALUES (?, ?, ?, ?, ?) """, ( repository.id, summary.analysis_run.id, "Operations Visibility", "Curator-created target ability.", 0.72, ), ) target_ability_id = int(cursor.lastrowid) relinked_graph = service.relink_candidate_capability( repository.id, summary.analysis_run.id, capability.id, target_ability_id=target_ability_id, notes="Move interface under the operational ability.", ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) target_candidate = [ ability for ability in relinked_graph.abilities if ability.id == target_ability_id ][0] assert target_candidate.capabilities[0].id == capability.id approved_target = [ ability for ability in ability_map.abilities if ability.name == "Operations Visibility" ][0] assert approved_target.capabilities[0].name == capability.name def test_relink_candidate_feature_and_evidence_to_another_capability(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Relink Leaves\n", encoding="utf-8") (source / "requirements.txt").write_text("fastapi\n", encoding="utf-8") (source / "tests").mkdir() (source / "tests" / "test_health.py").write_text( "def test_health(): pass\n", encoding="utf-8", ) (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Relink Leaves", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) add_candidate_capability( service, repository.id, summary.analysis_run.id, graph.abilities[0].id, 
"Review Target Capability", ) graph = service.candidate_graph(repository.id, summary.analysis_run.id) source_capability = graph.abilities[0].capabilities[0] target_capability = graph.abilities[0].capabilities[1] feature = source_capability.features[0] evidence = source_capability.evidence[0] service.relink_candidate_feature( repository.id, summary.analysis_run.id, feature.id, target_capability_id=target_capability.id, ) service.relink_candidate_evidence( repository.id, summary.analysis_run.id, evidence.id, target_capability_id=target_capability.id, ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) approved_capabilities = { capability.name: capability for capability in ability_map.abilities[0].capabilities } assert approved_capabilities[source_capability.name].features == [] assert feature.name in { item.name for item in approved_capabilities[target_capability.name].features } assert evidence.reference in { item.reference for item in approved_capabilities[target_capability.name].evidence } def test_merge_candidate_ability_moves_capabilities_to_target(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Merge Ability\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Merge Ability", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) source_ability = graph.abilities[0] with service.store.connect() as connection: cursor = connection.execute( """ INSERT INTO candidate_abilities (repository_id, analysis_run_id, name, description, confidence) VALUES (?, ?, ?, ?, ?) 
""", ( repository.id, summary.analysis_run.id, "Merged Operational Ability", "Preferred duplicate ability.", 0.83, ), ) target_ability_id = int(cursor.lastrowid) graph = service.merge_candidate_ability( repository.id, summary.analysis_run.id, source_ability.id, target_ability_id=target_ability_id, ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) merged_source = [ability for ability in graph.abilities if ability.id == source_ability.id][0] target = [ability for ability in graph.abilities if ability.id == target_ability_id][0] assert merged_source.status == "merged" assert target.capabilities assert [ability.name for ability in ability_map.abilities] == [ "Merged Operational Ability" ] assert ability_map.abilities[0].capabilities def test_merge_candidate_capability_moves_children_to_target(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Merge Capability\n", encoding="utf-8") (source / "requirements.txt").write_text("fastapi\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Merge Capability", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) add_candidate_capability( service, repository.id, summary.analysis_run.id, graph.abilities[0].id, "Review Target Capability", ) graph = service.candidate_graph(repository.id, summary.analysis_run.id) source_capability = graph.abilities[0].capabilities[0] target_capability = graph.abilities[0].capabilities[1] graph = service.merge_candidate_capability( repository.id, summary.analysis_run.id, source_capability.id, target_capability_id=target_capability.id, ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) 
merged_source = [ capability for ability in graph.abilities for capability in ability.capabilities if capability.id == source_capability.id ][0] target = [ capability for ability in graph.abilities for capability in ability.capabilities if capability.id == target_capability.id ][0] assert merged_source.status == "merged" assert target.features assert [capability.name for capability in ability_map.abilities[0].capabilities] == [ target_capability.name ] assert ability_map.abilities[0].capabilities[0].features def test_merge_candidate_feature_and_evidence_omits_duplicate_leaves(tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Merge Leaves\n", encoding="utf-8") (source / "tests").mkdir() (source / "tests" / "test_health.py").write_text( "def test_health(): pass\n", encoding="utf-8", ) (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n" '@app.get("/ready")\n' "def ready():\n" " return {}\n", encoding="utf-8", ) (source / "cli.py").write_text( "import click\n\n" "@click.command()\n" "def health():\n" " click.echo('ok')\n", encoding="utf-8", ) service = make_service(tmp_path) repository = service.register_repository(name="Merge Leaves", url=str(source)) summary = service.analyze_repository(repository.id) graph = service.candidate_graph(repository.id, summary.analysis_run.id) capability = graph.abilities[0].capabilities[0] service.merge_candidate_feature( repository.id, summary.analysis_run.id, capability.features[1].id, target_feature_id=capability.features[0].id, ) service.merge_candidate_evidence( repository.id, summary.analysis_run.id, capability.evidence[1].id, target_evidence_id=capability.evidence[0].id, ) ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id) approved_capability = ability_map.abilities[0].capabilities[0] assert len(approved_capability.features) == len(capability.features) - 1 assert 
len(approved_capability.evidence) == len(capability.evidence) - 1 def test_analyze_repository_failure_is_recorded(tmp_path): service = make_service(tmp_path) repository = service.register_repository( name="Missing", url=str(tmp_path / "does-not-exist"), description="Manual missing repository.", ) summary = service.analyze_repository(repository.id) assert summary.analysis_run.status == "failed" assert summary.snapshot is None assert "does not exist" in (summary.analysis_run.error_message or "") assert service.get_repository(repository.id).status == "analysis_failed" def test_analyze_repository_clones_git_url_before_scanning(tmp_path): source = tmp_path / "git-source" source.mkdir() subprocess.run(["git", "init", "-b", "main"], cwd=source, check=True) subprocess.run( ["git", "config", "user.email", "tests@example.com"], cwd=source, check=True, ) subprocess.run( ["git", "config", "user.name", "Tests"], cwd=source, check=True, ) (source / "README.md").write_text("# Git Source\n", encoding="utf-8") (source / "requirements.txt").write_text("pytest\n", encoding="utf-8") subprocess.run(["git", "add", "."], cwd=source, check=True) subprocess.run(["git", "commit", "-m", "initial"], cwd=source, check=True) service = make_service(tmp_path) repository = service.register_repository(name="Git Source", url=source.as_uri()) summary = service.analyze_repository(repository.id) assert summary.analysis_run.status == "completed" assert summary.snapshot is not None assert str(tmp_path / "checkouts") in summary.snapshot.source_path fact_names = {(fact.kind, fact.name, fact.path) for fact in summary.facts} assert ("documentation", "README", "README.md") in fact_names assert ("framework", "pytest", "requirements.txt") in fact_names def test_analyze_repository_can_use_cached_checkout_without_fetching(tmp_path, monkeypatch): service = make_service(tmp_path) url = "https://example.com/private/repo.git" cached = tmp_path / "checkouts" / "repo-b5d250ec3c59" cached.mkdir(parents=True) (cached / 
"README.md").write_text("# Cached Repo\n", encoding="utf-8") def fail_run_git(*args, **kwargs): raise AssertionError("cached analysis should not run git") monkeypatch.setattr(service.ingestion, "_run_git", fail_run_git) repository = service.register_repository( name="Cached", url=url, description="Already cloned.", ) summary = service.analyze_repository( repository.id, use_cached_checkout=True, ) assert summary.analysis_run.status == "completed" assert summary.snapshot is not None assert str(cached) == summary.snapshot.source_path assert ("documentation", "README", "README.md") in { (fact.kind, fact.name, fact.path) for fact in summary.facts } def test_operational_logging_records_analysis_and_review_events(caplog, tmp_path): source = tmp_path / "repo" source.mkdir() (source / "README.md").write_text("# Logged Service\n", encoding="utf-8") (source / "requirements.txt").write_text("fastapi\n", encoding="utf-8") (source / "app.py").write_text( "from fastapi import FastAPI\n" "app = FastAPI()\n" '@app.get("/health")\n' "def health():\n" " return {}\n", encoding="utf-8", ) service = make_service(tmp_path) caplog.set_level(logging.INFO, logger=LOGGER_NAME) repository = service.register_repository(name="Logged", url=str(source)) summary = service.analyze_repository(repository.id) service.approve_candidate_graph( repository.id, summary.analysis_run.id, notes="Logged approval.", ) payloads = [ json.loads(record.message) for record in caplog.records if record.name == LOGGER_NAME ] events = {payload["event"] for payload in payloads} assert "repository_registered" in events assert "analysis_started" in events assert "analysis_completed" in events assert "review_decision_recorded" in events assert all(payload["repository_id"] == repository.id for payload in payloads)