feat(capability-requests): add routing_note, PATCH endpoint, word-boundary fix, and ops-bridge tunnel targets

- Add `routing_note` column (migration l9g0h1i2j3k4) to persist why a request was routed to a given domain
- Fix substring-match bug in `_route_capability`: use `\b` word-boundary regex so 'postgres' no longer matches inside 'postgresql'
- Include `title` in keyword scoring for better routing accuracy
- Return `routing_note` string from `_route_capability` and store it on the request
- Add `PATCH /capability-requests/{id}` endpoint + `CapabilityRequestPatch` schema to correct mutable metadata (catalog_entry_id, priority, blocking_task_id, fulfilling_workstream_id)
- Add `patch_capability_request` MCP tool wrapping the new endpoint
- Add 105 lines of routing tests (word-boundary, title-match, multi-entry scoring, broadcast fallback)
- Add `tunnels-up`, `tunnels-status`, `tunnels-check` Makefile targets for ops-bridge managed tunnels

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-20 03:47:54 +01:00
parent 101c953e69
commit 62d407cae7
7 changed files with 282 additions and 18 deletions

View File

@@ -65,6 +65,7 @@ class CapabilityRequest(Base, TimestampMixin):
)
resolution_note: Mapped[str | None] = mapped_column(Text, nullable=True)
routing_note: Mapped[str | None] = mapped_column(Text, nullable=True)
accepted_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)

View File

@@ -1,3 +1,4 @@
import re
import uuid
from datetime import datetime, timezone
@@ -16,6 +17,7 @@ from api.schemas.capability_request import (
CatalogRead,
CapabilityRequestAccept,
CapabilityRequestCreate,
CapabilityRequestPatch,
CapabilityRequestRead,
CapabilityRequestStatusPatch,
)
@@ -100,8 +102,8 @@ async def create_request(
req_domain = await _resolve_domain(body.requesting_domain, session)
# Route to provider
fulfilling_domain_id, catalog_entry_id = await _route_capability(
session, body.capability_type, body.description or ""
fulfilling_domain_id, catalog_entry_id, routing_note = await _route_capability(
session, body.capability_type, body.title, body.description or ""
)
req = CapabilityRequest(
@@ -115,6 +117,7 @@ async def create_request(
blocking_task_id=body.blocking_task_id,
fulfilling_domain_id=fulfilling_domain_id,
catalog_entry_id=catalog_entry_id,
routing_note=routing_note,
)
session.add(req)
await session.flush() # get req.id before creating notification
@@ -277,16 +280,70 @@ async def patch_request_status(
return req
@router.patch("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
async def patch_request(
request_id: uuid.UUID,
body: CapabilityRequestPatch,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
"""Correct mutable metadata: catalog_entry_id (re-derives fulfilling domain),
priority, blocking_task_id, fulfilling_workstream_id.
Only fields present in the request body (non-None) are updated.
"""
req = await _get_request_or_404(request_id, session)
corrections: list[str] = []
if body.catalog_entry_id is not None:
old_entry_id = req.catalog_entry_id
entry = await session.get(CapabilityCatalog, body.catalog_entry_id)
if entry is None:
raise HTTPException(status_code=404, detail=f"Catalog entry '{body.catalog_entry_id}' not found")
req.catalog_entry_id = entry.id
# Re-derive fulfilling domain from catalog entry
old_domain_id = req.fulfilling_domain_id
req.fulfilling_domain_id = entry.domain_id
corrections.append(
f"catalog_entry: {old_entry_id}{entry.id} ({entry.title}); "
f"fulfilling_domain re-derived → {entry.domain_id}"
)
if body.priority is not None:
req.priority = body.priority
corrections.append(f"priority → {body.priority}")
if body.blocking_task_id is not None:
req.blocking_task_id = body.blocking_task_id
corrections.append(f"blocking_task_id → {body.blocking_task_id}")
if body.fulfilling_workstream_id is not None:
req.fulfilling_workstream_id = body.fulfilling_workstream_id
corrections.append(f"fulfilling_workstream_id → {body.fulfilling_workstream_id}")
if not corrections:
return req # no-op
correction_note = "hub correction: " + "; ".join(corrections)
req.routing_note = (req.routing_note + "\n" + correction_note) if req.routing_note else correction_note
await session.commit()
await session.refresh(req)
return req
# ---------------------------------------------------------------------------
# Routing algorithm
# ---------------------------------------------------------------------------
async def _route_capability(
session: AsyncSession, capability_type: str, description: str
) -> tuple[uuid.UUID | None, uuid.UUID | None]:
"""Find the best-matching domain for a capability request.
session: AsyncSession, capability_type: str, title: str, description: str
) -> tuple[uuid.UUID | None, uuid.UUID | None, str]:
"""Find the best-matching catalog entry for a capability request.
Returns (domain_id, catalog_entry_id) or (None, None) for broadcast.
Returns (domain_id, catalog_entry_id, routing_note).
Uses word-boundary matching on (title + description) combined to avoid
false positives from substring matches (e.g. 'postgres' inside 'postgresql',
'ha' inside 'has').
"""
q = select(CapabilityCatalog).where(
CapabilityCatalog.capability_type == capability_type,
@@ -294,20 +351,41 @@ async def _route_capability(
)
entries = list((await session.execute(q)).scalars().all())
if not entries:
return None, None, f"no active catalog entries for type '{capability_type}' — broadcast"
if len(entries) == 1:
return entries[0].domain_id, entries[0].id
e = entries[0]
return e.domain_id, e.id, f"single match: '{e.title}' (domain={e.domain_id})"
if len(entries) > 1 and description:
desc_lower = description.lower()
scored: list[tuple[int, CapabilityCatalog]] = []
for entry in entries:
score = sum(1 for kw in (entry.keywords or []) if kw.lower() in desc_lower)
scored.append((score, entry))
scored.sort(key=lambda x: -x[0])
if scored[0][0] > 0 and (len(scored) < 2 or scored[0][0] > scored[1][0]):
return scored[0][1].domain_id, scored[0][1].id
# Score by word-boundary keyword overlap against title + description combined
combined = f"{title} {description or ''}".lower()
scored: list[tuple[int, CapabilityCatalog]] = []
for entry in entries:
keywords = [kw for kw in (entry.keywords or []) if len(kw) >= 3]
score = sum(
1 for kw in keywords
if re.search(r'\b' + re.escape(kw.lower()) + r'\b', combined)
)
scored.append((score, entry))
scored.sort(key=lambda x: -x[0])
return None, None
best_score, best = scored[0]
if best_score == 0:
return None, None, (
f"no keyword overlap for type '{capability_type}' among "
f"{len(entries)} entries — broadcast"
)
if len(scored) >= 2 and scored[1][0] == best_score:
return None, None, (
f"ambiguous routing: '{scored[0][1].title}' and '{scored[1][1].title}' "
f"both scored {best_score} — broadcast"
)
return best.domain_id, best.id, (
f"matched '{best.title}' (score={best_score}, "
f"keywords matched from: {title!r})"
)
# ---------------------------------------------------------------------------

View File

@@ -55,6 +55,13 @@ class CapabilityRequestStatusPatch(BaseModel):
note: str | None = None
class CapabilityRequestPatch(BaseModel):
catalog_entry_id: uuid.UUID | None = None
priority: str | None = None
blocking_task_id: uuid.UUID | None = None
fulfilling_workstream_id: uuid.UUID | None = None
class CapabilityRequestRead(BaseModel):
model_config = ConfigDict(from_attributes=True)
@@ -73,6 +80,7 @@ class CapabilityRequestRead(BaseModel):
blocking_task_id: uuid.UUID | None = None
catalog_entry_id: uuid.UUID | None = None
resolution_note: str | None = None
routing_note: str | None = None
accepted_at: datetime | None = None
completed_at: datetime | None = None
created_at: datetime