relinking workflow

This commit is contained in:
2026-04-25 23:44:25 +02:00
parent 71beb0d458
commit 1d6d103bc2
6 changed files with 558 additions and 0 deletions

View File

@@ -576,6 +576,133 @@ class RegistryStore:
f"{repository_id} analysis run {analysis_run_id}"
)
def relink_candidate_capability(
self,
repository_id: int,
analysis_run_id: int,
candidate_capability_id: int,
target_ability_id: int,
) -> None:
self._ensure_candidate_row(
table="candidate_abilities",
label="target candidate ability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_ability_id,
)
with self.connect() as connection:
cursor = connection.execute(
"""
UPDATE candidate_capabilities
SET ability_id = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(
target_ability_id,
candidate_capability_id,
repository_id,
analysis_run_id,
),
)
if cursor.rowcount == 0:
raise NotFoundError(
"candidate capability "
f"{candidate_capability_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def relink_candidate_feature(
self,
repository_id: int,
analysis_run_id: int,
candidate_feature_id: int,
target_capability_id: int,
) -> None:
self._relink_candidate_leaf(
table="candidate_features",
label="candidate feature",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=candidate_feature_id,
target_capability_id=target_capability_id,
)
def relink_candidate_evidence(
self,
repository_id: int,
analysis_run_id: int,
candidate_evidence_id: int,
target_capability_id: int,
) -> None:
self._relink_candidate_leaf(
table="candidate_evidence",
label="candidate evidence",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=candidate_evidence_id,
target_capability_id=target_capability_id,
)
def _ensure_candidate_row(
self,
*,
table: str,
label: str,
repository_id: int,
analysis_run_id: int,
candidate_id: int,
) -> None:
with self.connect() as connection:
row = connection.execute(
f"""
SELECT id FROM {table}
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(candidate_id, repository_id, analysis_run_id),
).fetchone()
if row is None:
raise NotFoundError(
f"{label} {candidate_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def _relink_candidate_leaf(
self,
*,
table: str,
label: str,
repository_id: int,
analysis_run_id: int,
candidate_id: int,
target_capability_id: int,
) -> None:
self._ensure_candidate_row(
table="candidate_capabilities",
label="target candidate capability",
repository_id=repository_id,
analysis_run_id=analysis_run_id,
candidate_id=target_capability_id,
)
with self.connect() as connection:
cursor = connection.execute(
f"""
UPDATE {table}
SET capability_id = ?
WHERE id = ? AND repository_id = ? AND analysis_run_id = ?
""",
(
target_capability_id,
candidate_id,
repository_id,
analysis_run_id,
),
)
if cursor.rowcount == 0:
raise NotFoundError(
f"{label} {candidate_id} was not found for repository "
f"{repository_id} analysis run {analysis_run_id}"
)
def _reject_candidate_leaf(
self,
*,