merge-duplicates slice and did a first polish

This commit is contained in:
2026-04-25 23:50:58 +02:00
parent 1d6d103bc2
commit 19d34efa37
6 changed files with 788 additions and 1 deletions

View File

@@ -394,6 +394,102 @@ class RegistryService:
self.store.update_repository_status(repository_id, "reviewing")
return self.store.get_candidate_graph(repository_id, analysis_run_id)
def merge_candidate_ability(
self,
repository_id: int,
analysis_run_id: int,
source_ability_id: int,
*,
target_ability_id: int,
notes: str = "",
) -> CandidateGraph:
self.store.merge_candidate_ability(
repository_id,
analysis_run_id,
source_ability_id,
target_ability_id,
)
self.store.create_review_decision(
repository_id,
analysis_run_id,
action="merge_candidate_ability",
notes=notes,
)
self.store.update_repository_status(repository_id, "reviewing")
return self.store.get_candidate_graph(repository_id, analysis_run_id)
def merge_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
source_capability_id: int,
*,
target_capability_id: int,
notes: str = "",
) -> CandidateGraph:
self.store.merge_candidate_capability(
repository_id,
analysis_run_id,
source_capability_id,
target_capability_id,
)
self.store.create_review_decision(
repository_id,
analysis_run_id,
action="merge_candidate_capability",
notes=notes,
)
self.store.update_repository_status(repository_id, "reviewing")
return self.store.get_candidate_graph(repository_id, analysis_run_id)
def merge_candidate_feature(
self,
repository_id: int,
analysis_run_id: int,
source_feature_id: int,
*,
target_feature_id: int,
notes: str = "",
) -> CandidateGraph:
self.store.merge_candidate_feature(
repository_id,
analysis_run_id,
source_feature_id,
target_feature_id,
)
self.store.create_review_decision(
repository_id,
analysis_run_id,
action="merge_candidate_feature",
notes=notes,
)
self.store.update_repository_status(repository_id, "reviewing")
return self.store.get_candidate_graph(repository_id, analysis_run_id)
def merge_candidate_evidence(
self,
repository_id: int,
analysis_run_id: int,
source_evidence_id: int,
*,
target_evidence_id: int,
notes: str = "",
) -> CandidateGraph:
self.store.merge_candidate_evidence(
repository_id,
analysis_run_id,
source_evidence_id,
target_evidence_id,
)
self.store.create_review_decision(
repository_id,
analysis_run_id,
action="merge_candidate_evidence",
notes=notes,
)
self.store.update_repository_status(repository_id, "reviewing")
return self.store.get_candidate_graph(repository_id, analysis_run_id)
def add_ability(
self,
repository_id: int,

View File

@@ -643,6 +643,132 @@ class RegistryStore:
target_capability_id=target_capability_id,
)
def merge_candidate_ability(
self,
repository_id: int,
analysis_run_id: int,
source_ability_id: int,
target_ability_id: int,
) -> None:
if source_ability_id == target_ability_id:
raise ValueError("source and target candidate ability must be different")
self._ensure_candidate_row(
table="candidate_abilities",
label="target candidate ability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_ability_id,
)
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_abilities
SET status = 'merged'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(source_ability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"source candidate ability "
f"{source_ability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
connection.execute(
"""
UPDATE candidate_capabilities
SET ability_id = ?
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(target_ability_id, source_ability_id, repository_id, analysis_run_id),
)
def merge_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
source_capability_id: int,
target_capability_id: int,
) -> None:
if source_capability_id == target_capability_id:
raise ValueError("source and target candidate capability must be different")
self._ensure_candidate_row(
table="candidate_capabilities",
label="target candidate capability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_capability_id,
)
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET status = 'merged'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(source_capability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"source candidate capability "
f"{source_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
for table in ("candidate_features", "candidate_evidence"):
connection.execute(
f"""
UPDATE {table}
SET capability_id = ?
WHERE capability_id = ?
AND repository_id = ?
AND analysis_run_id = ?
""",
(
target_capability_id,
source_capability_id,
repository_id,
analysis_run_id,
),
)
def merge_candidate_feature(
self,
repository_id: int,
analysis_run_id: int,
source_feature_id: int,
target_feature_id: int,
) -> None:
self._merge_candidate_leaf(
table="candidate_features",
label="candidate feature",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
source_id=source_feature_id,
target_id=target_feature_id,
)
def merge_candidate_evidence(
self,
repository_id: int,
analysis_run_id: int,
source_evidence_id: int,
target_evidence_id: int,
) -> None:
self._merge_candidate_leaf(
table="candidate_evidence",
label="candidate evidence",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
source_id=source_evidence_id,
target_id=target_evidence_id,
)
def _ensure_candidate_row(
self,
*,
@@ -703,6 +829,43 @@ class RegistryStore:
f"{repository_id} analysis run {analysis_run_id}"
)
def _merge_candidate_leaf(
self,
*,
table: str,
label: str,
repository_id: int,
analysis_run_id: int,
source_id: int,
target_id: int,
) -> None:
if source_id == target_id:
raise ValueError(f"source and target {label} must be different")
self._ensure_candidate_row(
table=table,
label=f"target {label}",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_id,
)
with self.connect() as connection:
cursor = connection.execute(
f"""
UPDATE {table}
SET status = 'merged'
WHERE id = ?
AND repository_id = ?
AND analysis_run_id = ?
AND status = 'candidate'
""",
(source_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
f"source {label} {source_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def _reject_candidate_leaf(
self,
*,

View File

@@ -94,6 +94,26 @@ class CandidateLeafRelink(BaseModel):
notes: str = ""
class CandidateAbilityMerge(BaseModel):
target_ability_id: int
notes: str = ""
class CandidateCapabilityMerge(BaseModel):
target_capability_id: int
notes: str = ""
class CandidateFeatureMerge(BaseModel):
target_feature_id: int
notes: str = ""
class CandidateEvidenceMerge(BaseModel):
target_evidence_id: int
notes: str = ""
app = FastAPI(title="Repository Ability Registry", version="0.1.0")
@@ -426,6 +446,102 @@ def relink_candidate_evidence(
raise HTTPException(status_code=404, detail=str(exc)) from exc
@app.post(
"/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-abilities/{source_ability_id}/merge"
)
def merge_candidate_ability(
repository_id: int,
analysis_run_id: int,
source_ability_id: int,
payload: CandidateAbilityMerge,
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
service.merge_candidate_ability(
repository_id,
analysis_run_id,
source_ability_id,
**payload.model_dump(),
)
)
except (NotFoundError, ValueError) as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post(
"/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-capabilities/{source_capability_id}/merge"
)
def merge_candidate_capability(
repository_id: int,
analysis_run_id: int,
source_capability_id: int,
payload: CandidateCapabilityMerge,
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
service.merge_candidate_capability(
repository_id,
analysis_run_id,
source_capability_id,
**payload.model_dump(),
)
)
except (NotFoundError, ValueError) as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post(
"/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-features/{source_feature_id}/merge"
)
def merge_candidate_feature(
repository_id: int,
analysis_run_id: int,
source_feature_id: int,
payload: CandidateFeatureMerge,
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
service.merge_candidate_feature(
repository_id,
analysis_run_id,
source_feature_id,
**payload.model_dump(),
)
)
except (NotFoundError, ValueError) as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post(
"/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-evidence/{source_evidence_id}/merge"
)
def merge_candidate_evidence(
repository_id: int,
analysis_run_id: int,
source_evidence_id: int,
payload: CandidateEvidenceMerge,
service: RegistryService = Depends(get_service),
) -> dict[str, object]:
try:
return asdict(
service.merge_candidate_evidence(
repository_id,
analysis_run_id,
source_evidence_id,
**payload.model_dump(),
)
)
except (NotFoundError, ValueError) as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.post("/repos/{repository_id}/abilities", status_code=201)
def create_ability(
repository_id: int,

View File

@@ -124,7 +124,10 @@ def page(title: str, body: str) -> HTMLResponse:
<body>
<header>
<a href="/ui"><strong>Repository Ability Registry</strong></a>
<a href="/docs">API Docs</a>
<nav class="actions">
<a href="/ui/search">Search</a>
<a href="/docs">API Docs</a>
</nav>
</header>
<main>{body}</main>
</body>
@@ -170,6 +173,49 @@ def repository_index(service: RegistryService = Depends(get_service)) -> HTMLRes
return page("Repositories", body)
@router.get("/ui/search")
def search_page(
q: str = "",
service: RegistryService = Depends(get_service),
) -> HTMLResponse:
results = service.search(q) if q.strip() else []
rows = "\n".join(
f"""
<tr>
<td><a href="/ui/repos/{result.repository_id}">{escape(result.repository_name)}</a></td>
<td><span class="pill">{escape(result.match_type)}</span></td>
<td>{escape(result.match_name)}</td>
<td>{result.confidence:.2f}</td>
</tr>
"""
for result in results
)
empty = (
'<tr><td colspan="4" class="muted">No matches.</td></tr>'
if q.strip()
else '<tr><td colspan="4" class="muted">Enter a need, capability, or repository name.</td></tr>'
)
body = f"""
<div class="actions">
<h1 style="margin-right:auto">Search</h1>
<a class="button secondary" href="/ui">Repositories</a>
</div>
<section class="panel">
<form class="actions" method="get" action="/ui/search">
<input name="q" value="{escape(q)}" placeholder="Search approved registry entries">
<button type="submit">Search</button>
</form>
</section>
<section class="panel" style="margin-top:18px">
<table>
<thead><tr><th>Repository</th><th>Match</th><th>Name</th><th>Confidence</th></tr></thead>
<tbody>{rows or empty}</tbody>
</table>
</section>
"""
return page("Search", body)
@router.post("/ui/repos")
def create_repository_from_form(
url: str = Form(...),
@@ -524,6 +570,102 @@ def relink_candidate_evidence_from_form(
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-abilities/{source_ability_id}/merge"
)
def merge_candidate_ability_from_form(
repository_id: int,
analysis_run_id: int,
source_ability_id: int,
target_ability_id: int = Form(...),
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.merge_candidate_ability(
repository_id,
analysis_run_id,
source_ability_id,
target_ability_id=target_ability_id,
notes="Merged from web UI",
)
return RedirectResponse(
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}",
status_code=303,
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-capabilities/{source_capability_id}/merge"
)
def merge_candidate_capability_from_form(
repository_id: int,
analysis_run_id: int,
source_capability_id: int,
target_capability_id: int = Form(...),
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.merge_candidate_capability(
repository_id,
analysis_run_id,
source_capability_id,
target_capability_id=target_capability_id,
notes="Merged from web UI",
)
return RedirectResponse(
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}",
status_code=303,
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-features/{source_feature_id}/merge"
)
def merge_candidate_feature_from_form(
repository_id: int,
analysis_run_id: int,
source_feature_id: int,
target_feature_id: int = Form(...),
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.merge_candidate_feature(
repository_id,
analysis_run_id,
source_feature_id,
target_feature_id=target_feature_id,
notes="Merged from web UI",
)
return RedirectResponse(
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}",
status_code=303,
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-evidence/{source_evidence_id}/merge"
)
def merge_candidate_evidence_from_form(
repository_id: int,
analysis_run_id: int,
source_evidence_id: int,
target_evidence_id: int = Form(...),
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.merge_candidate_evidence(
repository_id,
analysis_run_id,
source_evidence_id,
target_evidence_id=target_evidence_id,
notes="Merged from web UI",
)
return RedirectResponse(
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}",
status_code=303,
)
def render_candidate_graph(graph: dict, repository_id: int, analysis_run_id: int) -> str:
abilities = graph.get("abilities", [])
if not abilities:
@@ -538,11 +680,13 @@ def render_candidate_graph(graph: dict, repository_id: int, analysis_run_id: int
f"""
<li>
<strong>{escape(ability['name'])}</strong>
<span class="pill">ID {ability['id']}</span>
<span class="pill">{escape(ability['status'])}</span>
<span class="pill">{ability['confidence']:.2f}</span>
{render_candidate_ability_actions(ability, repository_id, analysis_run_id)}
<p class="muted">{escape(ability['description'])}</p>
{render_candidate_edit_form('candidate-abilities', ability, repository_id, analysis_run_id)}
{render_candidate_merge_form('candidate-abilities', ability, repository_id, analysis_run_id, 'target_ability_id', 'Merge into ability ID')}
{render_sources(ability['source_refs'])}
<ul>{capabilities}</ul>
</li>
@@ -608,12 +752,14 @@ def render_candidate_capability(
return f"""
<li>
<strong>{escape(capability['name'])}</strong>
<span class="pill">ID {capability['id']}</span>
<span class="pill">{escape(capability['status'])}</span>
<span class="pill">{capability['confidence']:.2f}</span>
{render_candidate_reject_form('candidate-capabilities', capability, repository_id, analysis_run_id)}
<p class="muted">{escape(capability['description'])}</p>
{render_candidate_edit_form('candidate-capabilities', capability, repository_id, analysis_run_id)}
{render_candidate_relink_form('candidate-capabilities', capability, repository_id, analysis_run_id, 'target_ability_id', 'Target ability ID')}
{render_candidate_merge_form('candidate-capabilities', capability, repository_id, analysis_run_id, 'target_capability_id', 'Merge into capability ID')}
{render_sources(capability['source_refs'])}
<h3>Features</h3>
<ul>{features or '<li class="muted">No feature candidates.</li>'}</ul>
@@ -631,11 +777,13 @@ def render_candidate_feature(
return f"""
<li>
{escape(feature["name"])}
<span class="pill">ID {feature["id"]}</span>
<span class="pill">{escape(feature["status"])}</span>
<span class="pill">{escape(feature["type"])}</span>
<span class="source">{escape(feature["location"])}</span>
{render_candidate_reject_form('candidate-features', feature, repository_id, analysis_run_id)}
{render_candidate_relink_form('candidate-features', feature, repository_id, analysis_run_id, 'target_capability_id', 'Target capability ID')}
{render_candidate_merge_form('candidate-features', feature, repository_id, analysis_run_id, 'target_feature_id', 'Merge into feature ID')}
</li>
"""
@@ -648,11 +796,13 @@ def render_candidate_evidence(
return f"""
<li>
{escape(evidence["type"])}
<span class="pill">ID {evidence["id"]}</span>
<span class="pill">{escape(evidence["status"])}</span>
<span class="pill">{escape(evidence["strength"])}</span>
<span class="source">{escape(evidence["reference"])}</span>
{render_candidate_reject_form('candidate-evidence', evidence, repository_id, analysis_run_id)}
{render_candidate_relink_form('candidate-evidence', evidence, repository_id, analysis_run_id, 'target_capability_id', 'Target capability ID')}
{render_candidate_merge_form('candidate-evidence', evidence, repository_id, analysis_run_id, 'target_evidence_id', 'Merge into evidence ID')}
</li>
"""
@@ -698,6 +848,28 @@ def render_candidate_relink_form(
"""
def render_candidate_merge_form(
collection: str,
candidate: dict,
repository_id: int,
analysis_run_id: int,
field_name: str,
label: str,
) -> str:
if candidate["status"] != "candidate":
return ""
action = (
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
f"/{collection}/{candidate['id']}/merge"
)
return f"""
<form style="display:inline-grid; grid-template-columns: 140px auto; gap: 6px; align-items: end;" method="post" action="{action}">
<label>{label}<input name="{field_name}" type="number" min="1" required></label>
<button class="secondary" type="submit">Merge</button>
</form>
"""
def render_ability_map(ability_map: dict) -> str:
abilities = ability_map.get("abilities", [])
if not abilities:

View File

@@ -480,6 +480,154 @@ def test_relink_candidate_feature_and_evidence_to_another_capability(tmp_path):
}
def test_merge_candidate_ability_moves_capabilities_to_target(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "README.md").write_text("# Merge Ability\n", encoding="utf-8")
(source / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.get("/health")\n'
"def health():\n"
" return {}\n",
encoding="utf-8",
)
service = make_service(tmp_path)
repository = service.register_repository(name="Merge Ability", url=str(source))
summary = service.analyze_repository(repository.id)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
source_ability = graph.abilities[0]
with service.store.connect() as connection:
cursor = connection.execute(
"""
INSERT INTO candidate_abilities
(repository_id, analysis_run_id, name, description, confidence)
VALUES (?, ?, ?, ?, ?)
""",
(
repository.id,
summary.analysis_run.id,
"Merged Operational Ability",
"Preferred duplicate ability.",
0.83,
),
)
target_ability_id = int(cursor.lastrowid)
graph = service.merge_candidate_ability(
repository.id,
summary.analysis_run.id,
source_ability.id,
target_ability_id=target_ability_id,
)
ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id)
merged_source = [ability for ability in graph.abilities if ability.id == source_ability.id][0]
target = [ability for ability in graph.abilities if ability.id == target_ability_id][0]
assert merged_source.status == "merged"
assert target.capabilities
assert [ability.name for ability in ability_map.abilities] == [
"Merged Operational Ability"
]
assert ability_map.abilities[0].capabilities
def test_merge_candidate_capability_moves_children_to_target(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "README.md").write_text("# Merge Capability\n", encoding="utf-8")
(source / "requirements.txt").write_text("fastapi\n", encoding="utf-8")
(source / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.get("/health")\n'
"def health():\n"
" return {}\n",
encoding="utf-8",
)
service = make_service(tmp_path)
repository = service.register_repository(name="Merge Capability", url=str(source))
summary = service.analyze_repository(repository.id)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
source_capability = graph.abilities[0].capabilities[0]
target_capability = graph.abilities[0].capabilities[1]
graph = service.merge_candidate_capability(
repository.id,
summary.analysis_run.id,
source_capability.id,
target_capability_id=target_capability.id,
)
ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id)
merged_source = [
capability
for ability in graph.abilities
for capability in ability.capabilities
if capability.id == source_capability.id
][0]
target = [
capability
for ability in graph.abilities
for capability in ability.capabilities
if capability.id == target_capability.id
][0]
assert merged_source.status == "merged"
assert target.features
assert [capability.name for capability in ability_map.abilities[0].capabilities] == [
target_capability.name
]
assert ability_map.abilities[0].capabilities[0].features
def test_merge_candidate_feature_and_evidence_omits_duplicate_leaves(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "README.md").write_text("# Merge Leaves\n", encoding="utf-8")
(source / "tests").mkdir()
(source / "tests" / "test_health.py").write_text(
"def test_health(): pass\n",
encoding="utf-8",
)
(source / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.get("/health")\n'
"def health():\n"
" return {}\n"
'@app.get("/ready")\n'
"def ready():\n"
" return {}\n",
encoding="utf-8",
)
service = make_service(tmp_path)
repository = service.register_repository(name="Merge Leaves", url=str(source))
summary = service.analyze_repository(repository.id)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
capability = graph.abilities[0].capabilities[0]
service.merge_candidate_feature(
repository.id,
summary.analysis_run.id,
capability.features[1].id,
target_feature_id=capability.features[0].id,
)
service.merge_candidate_evidence(
repository.id,
summary.analysis_run.id,
capability.evidence[1].id,
target_evidence_id=capability.evidence[0].id,
)
ability_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id)
approved_capability = ability_map.abilities[0].capabilities[0]
assert len(approved_capability.features) == len(capability.features) - 1
assert len(approved_capability.evidence) == len(capability.evidence) - 1
def test_analyze_repository_failure_is_recorded(tmp_path):
service = make_service(tmp_path)
repository = service.register_repository(

View File

@@ -271,6 +271,7 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
run_detail = client.get(run_path)
assert run_detail.status_code == 200
assert "Candidate Graph" in run_detail.text
assert "ID " in run_detail.text
approve_response = client.post(
f"{run_path}/candidate-graph/approve",
@@ -282,6 +283,10 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
assert approved_detail.status_code == 200
assert "Approved Ability Map" in approved_detail.text
assert "Review UI Repo Repository Usefulness" in approved_detail.text
search_response = client.get("/ui/search", params={"q": "repository"})
assert search_response.status_code == 200
assert "UI Repo" in search_response.text
finally:
app.dependency_overrides.clear()
@@ -443,3 +448,90 @@ def test_api_relinks_candidate_feature_and_evidence(tmp_path):
assert relinked_capabilities[1]["evidence"][0]["id"] == evidence_id
finally:
app.dependency_overrides.clear()
def test_api_merges_candidate_capability_feature_and_evidence(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "README.md").write_text("# API Merge\n", encoding="utf-8")
(source / "requirements.txt").write_text("fastapi\n", encoding="utf-8")
(source / "tests").mkdir()
(source / "tests" / "test_status.py").write_text(
"def test_status(): pass\n",
encoding="utf-8",
)
(source / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.get("/status")\n'
"def status():\n"
" return {}\n"
'@app.get("/ready")\n'
"def ready():\n"
" return {}\n",
encoding="utf-8",
)
def override_settings():
return Settings(
database_path=str(tmp_path / "api-merge.sqlite3"),
checkout_root=str(tmp_path / "api-merge-checkouts"),
)
app.dependency_overrides[get_settings] = override_settings
client = TestClient(app)
try:
repository_response = client.post(
"/repos",
json={"name": "API Merge", "url": str(source)},
)
repository_id = repository_response.json()["id"]
run_response = client.post(f"/repos/{repository_id}/analysis-runs", json={})
run_id = run_response.json()["analysis_run"]["id"]
graph_response = client.get(
f"/repos/{repository_id}/analysis-runs/{run_id}/candidate-graph"
)
capabilities = graph_response.json()["abilities"][0]["capabilities"]
source_capability = capabilities[0]
target_capability = capabilities[1]
feature_response = client.post(
f"/repos/{repository_id}/analysis-runs/{run_id}"
f"/candidate-features/{source_capability['features'][1]['id']}/merge",
json={
"target_feature_id": source_capability["features"][0]["id"],
"notes": "Duplicate route",
},
)
assert feature_response.status_code == 200
assert (
feature_response.json()["abilities"][0]["capabilities"][0]["features"][1][
"status"
]
== "merged"
)
evidence_response = client.post(
f"/repos/{repository_id}/analysis-runs/{run_id}"
f"/candidate-evidence/{source_capability['evidence'][1]['id']}/merge",
json={
"target_evidence_id": source_capability["evidence"][0]["id"],
"notes": "Duplicate evidence",
},
)
assert evidence_response.status_code == 200
capability_response = client.post(
f"/repos/{repository_id}/analysis-runs/{run_id}"
f"/candidate-capabilities/{source_capability['id']}/merge",
json={
"target_capability_id": target_capability["id"],
"notes": "Duplicate capability",
},
)
assert capability_response.status_code == 200
capabilities = capability_response.json()["abilities"][0]["capabilities"]
assert capabilities[0]["status"] == "merged"
assert capabilities[1]["features"]
finally:
app.dependency_overrides.clear()