UI can now build an approved profile by hand

This commit is contained in:
2026-04-26 02:26:34 +02:00
parent 6da0e8966b
commit 6b7c6443ae
7 changed files with 794 additions and 1 deletions

View File

@@ -1196,6 +1196,35 @@ class RegistryStore:
f"ability {ability_id} was not found for repository {repository_id}"
)
def update_ability(
self,
repository_id: int,
ability_id: int,
*,
name: str | None = None,
description: str | None = None,
confidence: float | None = None,
) -> None:
self._update_approved_row(
table="approved_abilities",
label="ability",
repository_id=repository_id,
row_id=ability_id,
values={
"name": name,
"description": description,
"confidence": confidence,
},
)
def delete_ability(self, repository_id: int, ability_id: int) -> None:
self._delete_approved_row(
table="approved_abilities",
label="ability",
repository_id=repository_id,
row_id=ability_id,
)
def create_capability(
self,
repository_id: int,
@@ -1240,6 +1269,39 @@ class RegistryStore:
f"capability {capability_id} was not found for repository {repository_id}"
)
def update_capability(
self,
repository_id: int,
capability_id: int,
*,
name: str | None = None,
description: str | None = None,
inputs: list[str] | None = None,
outputs: list[str] | None = None,
confidence: float | None = None,
) -> None:
self._update_approved_row(
table="approved_capabilities",
label="capability",
repository_id=repository_id,
row_id=capability_id,
values={
"name": name,
"description": description,
"inputs": json.dumps(inputs) if inputs is not None else None,
"outputs": json.dumps(outputs) if outputs is not None else None,
"confidence": confidence,
},
)
def delete_capability(self, repository_id: int, capability_id: int) -> None:
self._delete_approved_row(
table="approved_capabilities",
label="capability",
repository_id=repository_id,
row_id=capability_id,
)
def create_feature(
self,
repository_id: int,
@@ -1270,6 +1332,37 @@ class RegistryStore:
)
return int(cursor.lastrowid)
def update_feature(
self,
repository_id: int,
feature_id: int,
*,
name: str | None = None,
type: str | None = None,
location: str | None = None,
confidence: float | None = None,
) -> None:
self._update_approved_row(
table="approved_features",
label="feature",
repository_id=repository_id,
row_id=feature_id,
values={
"name": name,
"type": type,
"location": location,
"confidence": confidence,
},
)
def delete_feature(self, repository_id: int, feature_id: int) -> None:
self._delete_approved_row(
table="approved_features",
label="feature",
repository_id=repository_id,
row_id=feature_id,
)
def create_evidence(
self,
repository_id: int,
@@ -1298,6 +1391,35 @@ class RegistryStore:
)
return int(cursor.lastrowid)
def update_evidence(
self,
repository_id: int,
evidence_id: int,
*,
type: str | None = None,
reference: str | None = None,
strength: str | None = None,
) -> None:
self._update_approved_row(
table="approved_evidence",
label="evidence",
repository_id=repository_id,
row_id=evidence_id,
values={
"type": type,
"reference": reference,
"strength": strength,
},
)
def delete_evidence(self, repository_id: int, evidence_id: int) -> None:
self._delete_approved_row(
table="approved_evidence",
label="evidence",
repository_id=repository_id,
row_id=evidence_id,
)
def get_ability_map(self, repository_id: int) -> RepositoryAbilityMap:
repository = self.get_repository(repository_id)
with self.connect() as connection:
@@ -1715,6 +1837,81 @@ class RegistryStore:
],
)
def _update_approved_row(
self,
*,
table: str,
label: str,
repository_id: int,
row_id: int,
values: dict[str, str | float | None],
) -> None:
assignments: list[str] = []
params: list[str | float | int] = []
for column, value in values.items():
if value is None:
continue
assignments.append(f"{column} = ?")
params.append(value)
if not assignments:
self._ensure_approved_row(
table=table,
label=label,
repository_id=repository_id,
row_id=row_id,
)
return
params.extend([row_id, repository_id])
with self.connect() as connection:
cursor = connection.execute(
f"""
UPDATE {table}
SET {", ".join(assignments)}
WHERE id = ? AND repository_id = ?
""",
params,
)
if cursor.rowcount == 0:
raise NotFoundError(
f"{label} {row_id} was not found for repository {repository_id}"
)
def _delete_approved_row(
self,
*,
table: str,
label: str,
repository_id: int,
row_id: int,
) -> None:
with self.connect() as connection:
cursor = connection.execute(
f"DELETE FROM {table} WHERE id = ? AND repository_id = ?",
(row_id, repository_id),
)
if cursor.rowcount == 0:
raise NotFoundError(
f"{label} {row_id} was not found for repository {repository_id}"
)
def _ensure_approved_row(
self,
*,
table: str,
label: str,
repository_id: int,
row_id: int,
) -> None:
with self.connect() as connection:
row = connection.execute(
f"SELECT id FROM {table} WHERE id = ? AND repository_id = ?",
(row_id, repository_id),
).fetchone()
if row is None:
raise NotFoundError(
f"{label} {row_id} was not found for repository {repository_id}"
)
def _source_refs_to_json(self, source_refs: list[SourceReference]) -> str:
return json.dumps(
[