Combined approved and candidate view with actions

This commit is contained in:
2026-04-29 13:19:58 +02:00
parent 8bd22dab1b
commit 142812e7f2
5 changed files with 942 additions and 43 deletions

View File

@@ -348,23 +348,15 @@ class RegistryService:
ability for ability in graph.abilities if ability.status == "candidate"
]
for ability in pending_abilities:
approved_ability_id = self.store.create_ability(
repository_id,
name=ability.name,
description=ability.description,
confidence=ability.confidence,
)
approved_ability_id = self._ensure_approved_ability(repository_id, ability)
for capability in ability.capabilities:
if capability.status != "candidate":
continue
approved_capability_id = self.store.create_capability(
approved_capability_id = self._ensure_approved_capability(
repository_id,
approved_ability_id,
name=capability.name,
description=capability.description,
inputs=capability.inputs,
outputs=capability.outputs,
confidence=capability.confidence,
ability.name,
capability,
)
for feature in capability.features:
if feature.status != "candidate":
@@ -405,6 +397,129 @@ class RegistryService:
self.store.update_repository_status(repository_id, "indexed")
return self.store.get_ability_map(repository_id)
def accept_candidate_ability(
self,
repository_id: int,
analysis_run_id: int,
candidate_ability_id: int,
*,
notes: str = "",
) -> RepositoryAbilityMap:
graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
ability = next(
(
item
for item in graph.abilities
if item.id == candidate_ability_id and item.status == "candidate"
),
None,
)
if ability is None:
raise ValueError(f"candidate ability {candidate_ability_id} is not pending")
approved_ability_id = self._ensure_approved_ability(repository_id, ability)
for capability in ability.capabilities:
if capability.status == "candidate":
self._create_approved_capability_subtree(
repository_id,
approved_ability_id,
capability,
)
self.store.mark_candidate_ability_status(
repository_id,
analysis_run_id,
candidate_ability_id,
"approved",
)
self._record_candidate_acceptance(
repository_id,
analysis_run_id,
"accept_candidate_ability",
notes or f"Accepted candidate ability: {ability.name}",
)
return self.store.get_ability_map(repository_id)
def accept_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
*,
notes: str = "",
) -> RepositoryAbilityMap:
graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
parent_ability, capability = self._candidate_capability_with_parent(
graph,
candidate_capability_id,
)
if capability.status != "candidate":
raise ValueError(
f"candidate capability {candidate_capability_id} is not pending"
)
approved_ability_id = self._ensure_approved_ability(repository_id, parent_ability)
self._create_approved_capability_subtree(
repository_id,
approved_ability_id,
capability,
)
self.store.mark_candidate_capability_status(
repository_id,
analysis_run_id,
candidate_capability_id,
"approved",
)
self._record_candidate_acceptance(
repository_id,
analysis_run_id,
"accept_candidate_capability",
notes or f"Accepted candidate capability: {capability.name}",
)
return self.store.get_ability_map(repository_id)
def accept_candidate_feature(
self,
repository_id: int,
analysis_run_id: int,
candidate_feature_id: int,
*,
notes: str = "",
) -> RepositoryAbilityMap:
graph = self.store.get_candidate_graph(repository_id, analysis_run_id)
parent_ability, parent_capability, feature = self._candidate_feature_with_parent(
graph,
candidate_feature_id,
)
if feature.status != "candidate":
raise ValueError(f"candidate feature {candidate_feature_id} is not pending")
approved_ability_id = self._ensure_approved_ability(repository_id, parent_ability)
approved_capability_id = self._ensure_approved_capability(
repository_id,
approved_ability_id,
parent_ability.name,
parent_capability,
)
self.store.create_feature(
repository_id,
approved_capability_id,
name=feature.name,
type=feature.type,
location=feature.location,
confidence=feature.confidence,
source_refs=feature.source_refs,
)
self.store.mark_candidate_feature_status(
repository_id,
analysis_run_id,
candidate_feature_id,
"approved",
)
self._record_candidate_acceptance(
repository_id,
analysis_run_id,
"accept_candidate_feature",
notes or f"Accepted candidate feature: {feature.name}",
)
return self.store.get_ability_map(repository_id)
def diff_analysis_runs(
self,
repository_id: int,
@@ -467,6 +582,124 @@ class RegistryService:
self.store.update_repository_status(repository_id, "indexed")
return self.store.get_ability_map(repository_id)
def _create_approved_capability_subtree(
self,
repository_id: int,
approved_ability_id: int,
capability: CandidateCapability,
) -> int:
approved_capability_id = self.store.create_capability(
repository_id,
approved_ability_id,
name=capability.name,
description=capability.description,
inputs=capability.inputs,
outputs=capability.outputs,
confidence=capability.confidence,
)
for feature in capability.features:
if feature.status != "candidate":
continue
self.store.create_feature(
repository_id,
approved_capability_id,
name=feature.name,
type=feature.type,
location=feature.location,
confidence=feature.confidence,
source_refs=feature.source_refs,
)
for evidence in capability.evidence:
if evidence.status != "candidate":
continue
self.store.create_evidence(
repository_id,
approved_capability_id,
type=evidence.type,
reference=evidence.reference,
strength=evidence.strength,
source_refs=evidence.source_refs,
)
return approved_capability_id
def _ensure_approved_ability(
self,
repository_id: int,
candidate_ability: CandidateAbility,
) -> int:
ability_map = self.store.get_ability_map(repository_id)
for ability in ability_map.abilities:
if ability.name == candidate_ability.name:
return ability.id
return self.store.create_ability(
repository_id,
name=candidate_ability.name,
description=candidate_ability.description,
confidence=candidate_ability.confidence,
)
def _ensure_approved_capability(
self,
repository_id: int,
approved_ability_id: int,
approved_ability_name: str,
candidate_capability: CandidateCapability,
) -> int:
ability_map = self.store.get_ability_map(repository_id)
for ability in ability_map.abilities:
if ability.name != approved_ability_name:
continue
for capability in ability.capabilities:
if capability.name == candidate_capability.name:
return capability.id
return self.store.create_capability(
repository_id,
approved_ability_id,
name=candidate_capability.name,
description=candidate_capability.description,
inputs=candidate_capability.inputs,
outputs=candidate_capability.outputs,
confidence=candidate_capability.confidence,
)
def _candidate_capability_with_parent(
self,
graph: CandidateGraph,
candidate_capability_id: int,
) -> tuple[CandidateAbility, CandidateCapability]:
for ability in graph.abilities:
for capability in ability.capabilities:
if capability.id == candidate_capability_id:
return ability, capability
raise ValueError(f"candidate capability {candidate_capability_id} was not found")
def _candidate_feature_with_parent(
self,
graph: CandidateGraph,
candidate_feature_id: int,
) -> tuple[CandidateAbility, CandidateCapability, CandidateFeature]:
for ability in graph.abilities:
for capability in ability.capabilities:
for feature in capability.features:
if feature.id == candidate_feature_id:
return ability, capability, feature
raise ValueError(f"candidate feature {candidate_feature_id} was not found")
def _record_candidate_acceptance(
self,
repository_id: int,
analysis_run_id: int,
action: str,
notes: str,
) -> None:
self.store.create_review_decision(
repository_id,
analysis_run_id,
action=action,
notes=notes,
)
self.store.update_repository_status(repository_id, "indexed")
def reject_candidate_ability(
self,
repository_id: int,

View File

@@ -504,6 +504,131 @@ class RegistryStore:
(status, repository_id, analysis_run_id),
)
def mark_candidate_ability_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_ability_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_abilities
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_ability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate ability "
f"{candidate_ability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
capability_rows = connection.execute(
"""
SELECT id FROM candidate_capabilities
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_ability_id, repository_id, analysis_run_id),
).fetchall()
capability_ids = [row["id"] for row in capability_rows]
connection.execute(
"""
UPDATE candidate_capabilities
SET status = ?
WHERE ability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_ability_id, repository_id, analysis_run_id),
)
for capability_id in capability_ids:
self._mark_candidate_children_status(
connection,
repository_id,
analysis_run_id,
capability_id,
status,
)
def mark_candidate_capability_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_capability_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate capability "
f"{candidate_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
self._mark_candidate_children_status(
connection,
repository_id,
analysis_run_id,
candidate_capability_id,
status,
)
def mark_candidate_feature_status(
self,
repository_id: int,
analysis_run_id: int,
candidate_feature_id: int,
status: str,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_features
SET status = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_feature_id, repository_id, analysis_run_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate feature "
f"{candidate_feature_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def _mark_candidate_children_status(
self,
connection: sqlite3.Connection,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
status: str,
) -> None:
connection.execute(
"""
UPDATE candidate_features
SET status = ?
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_capability_id, repository_id, analysis_run_id),
)
connection.execute(
"""
UPDATE candidate_evidence
SET status = ?
WHERE capability_id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(status, candidate_capability_id, repository_id, analysis_run_id),
)
def reject_candidate_ability(
self,
repository_id: int,

View File

@@ -101,6 +101,14 @@ def page(title: str, body: str) -> HTMLResponse:
padding: 9px 10px;
font: inherit;
}}
select {{
width: 100%;
border: 1px solid var(--line);
border-radius: 6px;
padding: 9px 10px;
background: white;
font: inherit;
}}
label {{ display: grid; gap: 5px; color: var(--muted); font-size: 12px; font-weight: 600; }}
label.checkbox {{
display: flex;
@@ -534,7 +542,10 @@ def repository_detail(
{render_graph_counts(
asdict(ability_map),
facts_count=None,
base_href=f"/ui/repos/{repository_id}/elements?scope=approved",
base_href=(
f"/ui/repos/{repository_id}/elements?scope=all"
f"&entry_filter=approved"
),
)}
{render_approved_registry_actions(repository_id, asdict(ability_map))}
<h2>Latest Candidate Graph</h2>
@@ -903,7 +914,8 @@ def analysis_run_detail(
asdict(candidate_graph),
facts_count=len(facts),
base_href=(
f"/ui/repos/{repository_id}/elements?scope=candidate"
f"/ui/repos/{repository_id}/elements?scope=all"
f"&entry_filter=candidate"
f"&analysis_run_id={analysis_run_id}"
),
facts_href=(
@@ -986,38 +998,69 @@ def create_expectation_gap_from_form(
@router.get("/ui/repos/{repository_id}/elements")
def repository_element_listing(
repository_id: int,
scope: str = Query("approved"),
scope: str = Query("all"),
type: str = Query("abilities"),
q: str = Query(""),
class_filter: str = Query(""),
entry_filter: str = Query(""),
candidate_status_filter: str = Query("active"),
analysis_run_id: int | None = Query(default=None),
service: RegistryService = Depends(get_service),
) -> HTMLResponse:
repository = service.get_repository(repository_id)
if scope in {"approved", "candidate"} and not entry_filter:
entry_filter = scope
title = element_listing_title(repository.name, scope, type)
if scope == "approved":
graph = asdict(service.ability_map(repository_id))
elements = graph_element_rows(graph, type)
elif scope == "candidate":
if scope in {"all", "approved", "candidate"}:
elements = graph_element_rows(
asdict(service.ability_map(repository_id)),
type,
entry_state="approved",
)
candidate_elements: list[dict] = []
if analysis_run_id is None:
runs = service.list_analysis_runs(repository_id)
latest = latest_completed_candidate_graph(service, repository_id, runs)
if latest is None:
elements = []
else:
if latest is not None:
analysis_run_id, candidate_graph = latest
elements = graph_element_rows(asdict(candidate_graph), type)
candidate_elements = graph_element_rows(
asdict(candidate_graph),
type,
entry_state="candidate",
)
else:
candidate_graph = service.candidate_graph(repository_id, analysis_run_id)
elements = graph_element_rows(asdict(candidate_graph), type)
candidate_elements = graph_element_rows(
asdict(candidate_graph),
type,
entry_state="candidate",
)
elements.extend(candidate_elements)
elif scope == "facts":
facts = service.list_observed_facts(repository_id, analysis_run_id)
elements = fact_element_rows(facts)
else:
elements = []
filtered = filter_element_rows(elements, q, class_filter)
rows = render_element_rows(filtered)
entry_scoped_elements = filter_element_rows(
elements,
"",
"",
entry_filter,
candidate_status_filter,
)
filtered = filter_element_rows(
entry_scoped_elements,
q,
class_filter,
candidate_status_filter="all",
)
rows = render_element_rows(
filtered,
repository_id=repository_id,
analysis_run_id=analysis_run_id,
)
filter_action = f"/ui/repos/{repository_id}/elements"
listing_scope = "facts" if scope == "facts" else "all"
body = f"""
<div class="actions">
@@ -1026,24 +1069,26 @@ def repository_element_listing(
</div>
<section class="panel" style="margin-bottom:18px">
<form class="stack" method="get" action="{filter_action}">
<input type="hidden" name="scope" value="{escape(scope)}">
<input type="hidden" name="scope" value="{escape(listing_scope)}">
<input type="hidden" name="type" value="{escape(type)}">
{render_optional_hidden("analysis_run_id", analysis_run_id)}
<div class="grid">
<label>Search <input name="q" value="{escape(q)}" placeholder="Name, parent, source, or class"></label>
<label>Class <input name="class_filter" value="{escape(class_filter)}" list="element-classes" placeholder="Any class"></label>
{render_entry_filter(entry_filter) if scope != "facts" else ""}
{render_candidate_status_filter(candidate_status_filter) if scope != "facts" else ""}
</div>
{render_class_datalist(elements)}
{render_class_datalist(entry_scoped_elements)}
<div class="actions">
<button type="submit">Filter</button>
<a class="button secondary" href="{filter_action}?scope={escape(scope)}&type={escape(type)}{render_analysis_run_query_suffix(analysis_run_id)}">Clear</a>
<span class="muted">{len(filtered)} of {len(elements)} shown</span>
<a class="button secondary" href="{filter_action}?scope={escape(listing_scope)}&type={escape(type)}{render_analysis_run_query_suffix(analysis_run_id)}">Clear</a>
<span class="muted">{len(filtered)} of {len(entry_scoped_elements)} shown</span>
</div>
</form>
</section>
<section class="panel">
<table>
<thead><tr><th>Class</th><th>Name</th><th>Parent</th><th>Source</th></tr></thead>
<thead><tr><th>Entry</th><th>Class</th><th>Name</th><th>Parent</th><th>Source</th><th>Actions</th></tr></thead>
<tbody>{rows}</tbody>
</table>
</section>
@@ -1155,6 +1200,25 @@ def reject_candidate_ability_from_form(
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-abilities/{candidate_ability_id}/accept"
)
def accept_candidate_ability_from_form(
repository_id: int,
analysis_run_id: int,
candidate_ability_id: int,
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.accept_candidate_ability(
repository_id,
analysis_run_id,
candidate_ability_id,
notes="Accepted from web UI",
)
return RedirectResponse(f"/ui/repos/{repository_id}", status_code=303)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-capabilities/{candidate_capability_id}/reject"
@@ -1177,6 +1241,25 @@ def reject_candidate_capability_from_form(
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-capabilities/{candidate_capability_id}/accept"
)
def accept_candidate_capability_from_form(
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.accept_candidate_capability(
repository_id,
analysis_run_id,
candidate_capability_id,
notes="Accepted from web UI",
)
return RedirectResponse(f"/ui/repos/{repository_id}", status_code=303)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-features/{candidate_feature_id}/reject"
@@ -1199,6 +1282,25 @@ def reject_candidate_feature_from_form(
)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-features/{candidate_feature_id}/accept"
)
def accept_candidate_feature_from_form(
repository_id: int,
analysis_run_id: int,
candidate_feature_id: int,
service: RegistryService = Depends(get_service),
) -> RedirectResponse:
service.accept_candidate_feature(
repository_id,
analysis_run_id,
candidate_feature_id,
notes="Accepted from web UI",
)
return RedirectResponse(f"/ui/repos/{repository_id}", status_code=303)
@router.post(
"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
"/candidate-evidence/{candidate_evidence_id}/reject"
@@ -1752,7 +1854,8 @@ def render_latest_candidate_counts(
facts_count=facts_count,
label_prefix="candidate",
base_href=(
f"/ui/repos/{repository_id}/elements?scope=candidate"
f"/ui/repos/{repository_id}/elements?scope=all"
f"&entry_filter=candidate"
f"&analysis_run_id={analysis_run_id}"
),
facts_href=(
@@ -1832,13 +1935,18 @@ def render_approved_registry_actions(repository_id: int, ability_map: dict) -> s
<a class="button secondary" href="/ui/search?q={quote_plus(search_query)}">Search Profile</a>
<a class="button secondary" href="/ui/discovery">Discovery</a>
<a class="button secondary" href="/ui/repos/{repository_id}/export">Export</a>
<a class="button secondary" href="/ui/repos/{repository_id}/elements?scope=approved&type=abilities">Approved Elements</a>
<a class="button secondary" href="/ui/repos/{repository_id}/elements?scope=all&entry_filter=approved&type=abilities">Elements</a>
</div>
</div>
"""
def graph_element_rows(graph: dict, item_type: str) -> list[dict]:
def graph_element_rows(
graph: dict,
item_type: str,
*,
entry_state: str = "",
) -> list[dict]:
rows: list[dict] = []
for ability in graph.get("abilities", []):
if item_type == "abilities":
@@ -1848,6 +1956,12 @@ def graph_element_rows(graph: dict, item_type: str) -> list[dict]:
ability["name"],
"",
ability.get("source_refs", []),
item_id=ability.get("id"),
item_kind="abilities",
description=ability.get("description", ""),
confidence=ability.get("confidence", 1.0),
status=ability.get("status", ""),
entry_state=entry_state,
)
)
for capability in ability.get("capabilities", []):
@@ -1858,6 +1972,14 @@ def graph_element_rows(graph: dict, item_type: str) -> list[dict]:
capability["name"],
ability["name"],
capability.get("source_refs", []),
item_id=capability.get("id"),
item_kind="capabilities",
description=capability.get("description", ""),
confidence=capability.get("confidence", 1.0),
inputs=capability.get("inputs", []),
outputs=capability.get("outputs", []),
status=capability.get("status", ""),
entry_state=entry_state,
)
)
for feature in capability.get("features", []):
@@ -1868,6 +1990,12 @@ def graph_element_rows(graph: dict, item_type: str) -> list[dict]:
feature["name"],
capability["name"],
feature.get("source_refs", []),
item_id=feature.get("id"),
item_kind="features",
confidence=feature.get("confidence", 1.0),
location=feature.get("location", ""),
status=feature.get("status", ""),
entry_state=entry_state,
)
)
return rows
@@ -1883,6 +2011,7 @@ def element_listing_title(repository_name: str, scope: str, item_type: str) -> s
if scope == "facts":
return f"{repository_name} · Observed Facts"
scope_labels = {
"all": "Registry",
"approved": "Approved",
"candidate": "Candidate",
}
@@ -1915,12 +2044,14 @@ def element_row(
name: str,
parent: str,
source_refs: list[dict],
**metadata,
) -> dict:
return {
"primary_class": primary_class,
"name": name,
"parent": parent,
"source_refs": source_refs,
**metadata,
}
@@ -1928,11 +2059,19 @@ def filter_element_rows(
rows: list[dict],
query: str,
class_filter: str,
entry_filter: str = "",
candidate_status_filter: str = "",
) -> list[dict]:
query = query.strip().lower()
class_filter = class_filter.strip().lower()
entry_filter = entry_filter.strip().lower()
candidate_status_filter = candidate_status_filter.strip().lower()
filtered = []
for row in rows:
if entry_filter and row.get("entry_state", "").lower() != entry_filter:
continue
if not candidate_status_matches(row, candidate_status_filter):
continue
row_class = str(row["primary_class"]).lower()
if class_filter and class_filter not in row_class:
continue
@@ -1941,6 +2080,8 @@ def filter_element_rows(
str(row["primary_class"]),
str(row["name"]),
str(row["parent"]),
str(row.get("entry_state", "")),
str(row.get("status", "")),
source_refs_text(row["source_refs"]),
]
).lower()
@@ -1950,23 +2091,232 @@ def filter_element_rows(
return filtered
def render_element_rows(rows: list[dict]) -> str:
def candidate_status_matches(row: dict, candidate_status_filter: str) -> bool:
if row.get("entry_state") != "candidate":
return True
status = str(row.get("status") or "candidate").lower()
if candidate_status_filter == "all":
return True
if candidate_status_filter in {"", "active"}:
return status != "rejected"
return status == candidate_status_filter
def render_element_rows(
rows: list[dict],
repository_id: int,
analysis_run_id: int | None,
) -> str:
if not rows:
return '<tr><td colspan="4" class="muted">No matching elements.</td></tr>'
return "\n".join(render_element_row(row) for row in rows)
return '<tr><td colspan="6" class="muted">No matching elements.</td></tr>'
return "\n".join(
render_element_row(row, repository_id, analysis_run_id)
for row in rows
)
def render_element_row(row: dict) -> str:
def render_element_row(
row: dict,
repository_id: int,
analysis_run_id: int | None,
) -> str:
return f"""
<tr>
<td>{render_entry_badge(row)}</td>
<td><span class="pill">{escape(str(row["primary_class"]))}</span></td>
<td>{escape(str(row["name"]))}</td>
<td>{escape(str(row["parent"]))}</td>
<td>{render_sources(row["source_refs"])}</td>
<td>{render_element_actions(row, repository_id, analysis_run_id)}</td>
</tr>
"""
def render_element_actions(
row: dict,
repository_id: int,
analysis_run_id: int | None,
) -> str:
item_id = row.get("item_id")
item_kind = row.get("item_kind")
if not item_id or not item_kind:
return ""
if row.get("entry_state") == "approved":
return render_approved_element_actions(row, repository_id)
if (
row.get("entry_state") == "candidate"
and analysis_run_id is not None
and row.get("status", "candidate") in {"candidate", "rejected"}
):
return render_candidate_element_actions(row, repository_id, analysis_run_id)
if row.get("entry_state") == "candidate" and row.get("status") == "approved":
return '<span class="muted">Accepted</span>'
return ""
def render_entry_badge(row: dict) -> str:
entry_state = row.get("entry_state")
if not entry_state:
return '<span class="pill">fact</span>'
status = row.get("status")
label = str(entry_state)
if entry_state == "candidate" and status and status != "candidate":
label = f"candidate: {status}"
return f'<span class="pill">{escape(label)}</span>'
def render_entry_filter(entry_filter: str) -> str:
options = [
("", "Approved and candidate"),
("approved", "Approved only"),
("candidate", "Candidate only"),
]
rendered_options = "".join(
f'<option value="{escape(value)}"{" selected" if entry_filter == value else ""}>{escape(label)}</option>'
for value, label in options
)
return f"""
<label>Entry
<select name="entry_filter">
{rendered_options}
</select>
</label>
"""
def render_approved_element_actions(row: dict, repository_id: int) -> str:
item_id = row["item_id"]
item_kind = row["item_kind"]
edit_action = f"/ui/repos/{repository_id}/{item_kind}/{item_id}/edit"
delete_action = f"/ui/repos/{repository_id}/{item_kind}/{item_id}/delete"
hidden_fields = render_element_hidden_fields(row)
edit_fields = render_element_edit_fields(row)
return f"""
<form class="stack" method="post" action="{edit_action}">
{hidden_fields}
{edit_fields}
<button class="secondary" type="submit">Save</button>
</form>
<form method="post" action="{delete_action}">
<button class="secondary" type="submit">Delete</button>
</form>
"""
def render_candidate_element_actions(
row: dict,
repository_id: int,
analysis_run_id: int,
) -> str:
item_id = row["item_id"]
item_kind = row["item_kind"]
collection = f"candidate-{item_kind}"
reject_action = (
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
f"/{collection}/{item_id}/reject"
)
accept_action = (
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
f"/{collection}/{item_id}/accept"
)
status = row.get("status", "candidate")
edit = ""
if item_kind in {"abilities", "capabilities"}:
edit_action = (
f"/ui/repos/{repository_id}/analysis-runs/{analysis_run_id}"
f"/{collection}/{item_id}/edit"
)
edit = f"""
<form class="stack" method="post" action="{edit_action}">
{render_candidate_hidden_fields(row)}
{render_candidate_edit_fields(row)}
<button class="secondary" type="submit">Save</button>
</form>
"""
remove = (
f"""
<form method="post" action="{reject_action}">
<button class="secondary" type="submit">Remove</button>
</form>
"""
if status == "candidate"
else ""
)
return f"""
{edit}
<form method="post" action="{accept_action}">
<button type="submit">Accept</button>
</form>
{remove}
"""
def render_candidate_status_filter(candidate_status_filter: str) -> str:
options = [
("active", "Hide rejected"),
("candidate", "Pending only"),
("approved", "Accepted only"),
("rejected", "Rejected only"),
("all", "Include rejected"),
]
rendered_options = "".join(
f'<option value="{escape(value)}"{" selected" if candidate_status_filter == value else ""}>{escape(label)}</option>'
for value, label in options
)
return f"""
<label>Candidate status
<select name="candidate_status_filter">
{rendered_options}
</select>
</label>
"""
def render_element_edit_fields(row: dict) -> str:
item_kind = row["item_kind"]
name = escape(str(row["name"]))
if item_kind == "features":
feature_type = escape(str(row["primary_class"]))
location = escape(str(row.get("location", "")))
return f"""
<label>Name <input name="name" value="{name}" required></label>
<label>Type <input name="type" value="{feature_type}" required></label>
<label>Location <input name="location" value="{location}"></label>
"""
return f'<label>Name <input name="name" value="{name}" required></label>'
def render_element_hidden_fields(row: dict) -> str:
item_kind = row["item_kind"]
confidence = float(row.get("confidence", 1.0))
fields = [f'<input type="hidden" name="confidence" value="{confidence:.2f}">']
if item_kind in {"abilities", "capabilities"}:
fields.append(
f'<input type="hidden" name="description" value="{escape(str(row.get("description", "")))}">'
)
if item_kind == "capabilities":
inputs = escape(", ".join(row.get("inputs", [])))
outputs = escape(", ".join(row.get("outputs", [])))
fields.append(
f'<input type="hidden" name="inputs" value="{inputs}">'
)
fields.append(
f'<input type="hidden" name="outputs" value="{outputs}">'
)
return "".join(fields)
def render_candidate_edit_fields(row: dict) -> str:
return f'<label>Name <input name="name" value="{escape(str(row["name"]))}" required></label>'
def render_candidate_hidden_fields(row: dict) -> str:
return (
f'<input type="hidden" name="description" value="{escape(str(row.get("description", "")))}">'
f'<input type="hidden" name="confidence" value="{float(row.get("confidence", 1.0)):.2f}">'
)
def render_class_datalist(rows: list[dict]) -> str:
classes = sorted({str(row["primary_class"]) for row in rows if row["primary_class"]})
options = "".join(

View File

@@ -804,6 +804,59 @@ def test_approve_candidate_graph_publishes_ability_map_once(tmp_path):
assert decisions[0].notes == "Looks good for the first pass."
def test_accept_candidate_feature_promotes_parent_context_once(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "README.md").write_text(
"# Feature Accept\nReports health over HTTP.\n",
encoding="utf-8",
)
(source / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.get("/health")\n'
"def health():\n"
" return {}\n",
encoding="utf-8",
)
service = make_service(tmp_path)
repository = service.register_repository(name="Feature Accept", url=str(source))
summary = service.analyze_repository(repository.id)
graph = service.candidate_graph(repository.id, summary.analysis_run.id)
candidate_feature = graph.abilities[0].capabilities[0].features[0]
ability_map = service.accept_candidate_feature(
repository.id,
summary.analysis_run.id,
candidate_feature.id,
)
graph_after_feature_accept = service.candidate_graph(
repository.id,
summary.analysis_run.id,
)
assert len(ability_map.abilities) == 1
assert ability_map.abilities[0].capabilities[0].features[0].name == "GET /health"
assert graph_after_feature_accept.abilities[0].capabilities[0].features[0].status == (
"approved"
)
final_map = service.approve_candidate_graph(repository.id, summary.analysis_run.id)
assert len(final_map.abilities) == 1
interface_capabilities = [
capability
for capability in final_map.abilities[0].capabilities
if capability.name == "Expose Repository Interface"
]
assert len(interface_capabilities) == 1
assert len(interface_capabilities[0].features) == 1
decisions = service.list_review_decisions(repository.id, summary.analysis_run.id)
assert {decision.action for decision in decisions} >= {
"accept_candidate_feature",
"approve_candidate_graph",
}
def test_analysis_run_diff_keeps_approved_map_stable_until_change_approval(tmp_path):
source = tmp_path / "repo"
source.mkdir()

View File

@@ -1194,6 +1194,17 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
assert "Use OpenRouter Models" in run_detail.text
assert "Expected from provider docs." in run_detail.text
pending_candidate_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={
"scope": "candidate",
"analysis_run_id": first_run_id,
"type": "features",
},
)
assert pending_candidate_listing.status_code == 200
assert "Accept" in pending_candidate_listing.text
approve_response = client.post(
f"{run_path}/candidate-graph/approve",
follow_redirects=False,
@@ -1215,14 +1226,14 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
assert "Search Profile" in approved_detail.text
assert "Discovery" in approved_detail.text
assert "Export" in approved_detail.text
assert "Approved Elements" in approved_detail.text
assert "Elements" in approved_detail.text
assert "q=Report+Service+Status" in approved_detail.text
assert (
f"/ui/repos/{repository_id}/elements?scope=approved&amp;type=abilities"
f"/ui/repos/{repository_id}/elements?scope=all&amp;entry_filter=approved&amp;type=abilities"
in approved_detail.text
)
assert (
f"/ui/repos/{repository_id}/elements?scope=candidate&amp;analysis_run_id={first_run_id}&amp;type=features"
f"/ui/repos/{repository_id}/elements?scope=all&amp;entry_filter=candidate&amp;analysis_run_id={first_run_id}&amp;type=features"
in approved_detail.text
)
assert (
@@ -1237,11 +1248,49 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
approved_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={"scope": "approved", "type": "capabilities"},
params={"scope": "all", "entry_filter": "approved", "type": "capabilities"},
)
assert approved_listing.status_code == 200
assert "Approved Capabilities" in approved_listing.text
assert "Registry Capabilities" in approved_listing.text
assert "Entry" in approved_listing.text
assert "Approved only" in approved_listing.text
assert "Expose Repository Interface" in approved_listing.text
assert "Save" in approved_listing.text
assert "Delete" in approved_listing.text
combined_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={
"scope": "all",
"analysis_run_id": first_run_id,
"type": "features",
},
)
assert combined_listing.status_code == 200
assert "Registry Features" in combined_listing.text
assert "Approved and candidate" in combined_listing.text
assert ">approved<" in combined_listing.text
assert ">candidate: approved<" in combined_listing.text
approved_map = client.get(f"/repos/{repository_id}/ability-map").json()
approved_capability = approved_map["abilities"][0]["capabilities"][0]
tune_response = client.post(
f"/ui/repos/{repository_id}/capabilities/{approved_capability['id']}/edit",
data={
"name": "Expose Tuned Repository Interface",
"description": approved_capability["description"],
"inputs": ", ".join(approved_capability["inputs"]),
"outputs": ", ".join(approved_capability["outputs"]),
"confidence": str(approved_capability["confidence"]),
},
follow_redirects=False,
)
assert tune_response.status_code == 303
tuned_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={"scope": "all", "entry_filter": "approved", "type": "capabilities"},
)
assert "Expose Tuned Repository Interface" in tuned_listing.text
candidate_listing = client.get(
f"/ui/repos/{repository_id}/elements",
@@ -1359,6 +1408,95 @@ def test_ui_register_analyze_and_approve_loop(tmp_path):
)
assert filtered_search_response.status_code == 200
assert "UI Repo" in filtered_search_response.text
final_map = client.get(f"/repos/{repository_id}/ability-map").json()
feature_id = final_map["abilities"][0]["capabilities"][0]["features"][0]["id"]
delete_feature_response = client.post(
f"/ui/repos/{repository_id}/features/{feature_id}/delete",
follow_redirects=False,
)
assert delete_feature_response.status_code == 303
deleted_feature_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={"scope": "approved", "type": "features"},
)
assert f"/ui/repos/{repository_id}/features/{feature_id}/delete" not in (
deleted_feature_listing.text
)
finally:
app.dependency_overrides.clear()
def test_ui_element_listing_hides_rejected_candidates_by_default(tmp_path):
source = tmp_path / "repo"
source.mkdir()
(source / "requirements.txt").write_text("fastapi\n", encoding="utf-8")
(source / "app.py").write_text(
"from fastapi import FastAPI\n"
"app = FastAPI()\n"
'@app.get("/status")\n'
"def status():\n"
" return {}\n",
encoding="utf-8",
)
def override_settings():
return Settings(
database_path=str(tmp_path / "ui-rejected.sqlite3"),
checkout_root=str(tmp_path / "ui-rejected-checkouts"),
)
app.dependency_overrides[get_settings] = override_settings
client = TestClient(app)
try:
repository_response = client.post(
"/repos",
json={"name": "Rejected UI", "url": str(source)},
)
repository_id = repository_response.json()["id"]
run_response = client.post(f"/repos/{repository_id}/analysis-runs", json={})
run_id = run_response.json()["analysis_run"]["id"]
candidate_graph = client.get(
f"/repos/{repository_id}/analysis-runs/{run_id}/candidate-graph"
).json()
candidate_feature = candidate_graph["abilities"][0]["capabilities"][0][
"features"
][0]
candidate_feature_id = candidate_feature["id"]
candidate_feature_name = candidate_feature["name"]
reject_response = client.post(
f"/ui/repos/{repository_id}/analysis-runs/{run_id}"
f"/candidate-features/{candidate_feature_id}/reject",
follow_redirects=False,
)
assert reject_response.status_code == 303
default_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={
"scope": "all",
"analysis_run_id": run_id,
"type": "features",
},
)
assert default_listing.status_code == 200
assert "Hide rejected" in default_listing.text
assert candidate_feature_name not in default_listing.text
rejected_listing = client.get(
f"/ui/repos/{repository_id}/elements",
params={
"scope": "all",
"analysis_run_id": run_id,
"type": "features",
"candidate_status_filter": "all",
},
)
assert rejected_listing.status_code == 200
assert candidate_feature_name in rejected_listing.text
assert "candidate: rejected" in rejected_listing.text
assert "Accept" in rejected_listing.text
finally:
app.dependency_overrides.clear()