diff --git a/state-hub/Makefile b/state-hub/Makefile
index 058a10d..1bd7a55 100644
--- a/state-hub/Makefile
+++ b/state-hub/Makefile
@@ -1,4 +1,4 @@
-.PHONY: install install-cli db db-tools migrate seed api dashboard check test clean register-project validate-adr add-domain rename-domain add-repo list-repos register-path cleanup-stale tunnel tunnel-daemon tunnel-loop tunnel-status tunnel-stop install-hooks install-hooks-all gitea-inventory
+.PHONY: install install-cli db db-tools migrate seed api dashboard check test clean register-project validate-adr add-domain rename-domain add-repo list-repos register-path cleanup-stale tunnel tunnel-daemon tunnel-loop tunnel-status tunnel-stop tunnels-up tunnels-status tunnels-check install-hooks install-hooks-all gitea-inventory
 
 COMPOSE = docker compose -f infra/docker-compose.yml --env-file .env
 
@@ -103,6 +103,19 @@ tunnel-stop:
 	@pkill -f "autossh.*$(TUNNEL_PORT)" 2>/dev/null && echo "autossh stopped" || true
 	@pkill -f "ssh.*-R $(TUNNEL_PORT)" 2>/dev/null && echo "ssh loop stopped" || true
 
+## ops-bridge managed tunnels (preferred over tunnel-*/tunnel-daemon)
+## Requires the ops-bridge `bridge` executable on PATH (e.g. ~/.local/bin/bridge)
+tunnels-up:
+	bridge up
+
+tunnels-status:
+	bridge status
+
+## End-to-end check: verifies SSH process alive + remote port listening on COULOMBCORE.
+## Exits non-zero if any tunnel is not fully operational.
+tunnels-check:
+	bridge check
+
 ## Start (or restart) the full backend — db + migrate + uvicorn.
 ## Stops uvicorn on :8000 if already running, then starts fresh.
 api: db
diff --git a/state-hub/api/models/capability_request.py b/state-hub/api/models/capability_request.py
index c890d3e..7051390 100644
--- a/state-hub/api/models/capability_request.py
+++ b/state-hub/api/models/capability_request.py
@@ -65,6 +65,7 @@ class CapabilityRequest(Base, TimestampMixin):
     )
 
     resolution_note: Mapped[str | None] = mapped_column(Text, nullable=True)
+    routing_note: Mapped[str | None] = mapped_column(Text, nullable=True)
     accepted_at: Mapped[datetime | None] = mapped_column(
         DateTime(timezone=True), nullable=True
     )
diff --git a/state-hub/api/routers/capability_requests.py b/state-hub/api/routers/capability_requests.py
index fda2b3d..b46f313 100644
--- a/state-hub/api/routers/capability_requests.py
+++ b/state-hub/api/routers/capability_requests.py
@@ -1,3 +1,4 @@
+import re
 import uuid
 from datetime import datetime, timezone
 
@@ -16,6 +17,7 @@ from api.schemas.capability_request import (
     CatalogRead,
     CapabilityRequestAccept,
     CapabilityRequestCreate,
+    CapabilityRequestPatch,
     CapabilityRequestRead,
     CapabilityRequestStatusPatch,
 )
@@ -100,8 +102,8 @@ async def create_request(
     req_domain = await _resolve_domain(body.requesting_domain, session)
 
     # Route to provider
-    fulfilling_domain_id, catalog_entry_id = await _route_capability(
-        session, body.capability_type, body.description or ""
+    fulfilling_domain_id, catalog_entry_id, routing_note = await _route_capability(
+        session, body.capability_type, body.title, body.description or ""
     )
 
     req = CapabilityRequest(
@@ -115,6 +117,7 @@ async def create_request(
         blocking_task_id=body.blocking_task_id,
         fulfilling_domain_id=fulfilling_domain_id,
         catalog_entry_id=catalog_entry_id,
+        routing_note=routing_note,
     )
     session.add(req)
     await session.flush()  # get req.id before creating notification
@@ -277,16 +280,70 @@ async def patch_request_status(
     return req
 
 
+@router.patch("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
+async def patch_request(
+    request_id: uuid.UUID,
+    body: CapabilityRequestPatch,
+    session: AsyncSession = Depends(get_session),
+) -> CapabilityRequest:
+    """Correct mutable metadata: catalog_entry_id (re-derives fulfilling domain),
+    priority, blocking_task_id, fulfilling_workstream_id.
+    Only fields present in the request body (non-None) are updated.
+    """
+    req = await _get_request_or_404(request_id, session)
+
+    corrections: list[str] = []
+
+    if body.catalog_entry_id is not None:
+        old_entry_id = req.catalog_entry_id
+        entry = await session.get(CapabilityCatalog, body.catalog_entry_id)
+        if entry is None:
+            raise HTTPException(status_code=404, detail=f"Catalog entry '{body.catalog_entry_id}' not found")
+        req.catalog_entry_id = entry.id
+        # Re-derive fulfilling domain from catalog entry; keep the old id for the audit note
+        old_domain_id = req.fulfilling_domain_id
+        req.fulfilling_domain_id = entry.domain_id
+        corrections.append(
+            f"catalog_entry: {old_entry_id} → {entry.id} ({entry.title}); "
+            f"fulfilling_domain re-derived: {old_domain_id} → {entry.domain_id}"
+        )
+
+    if body.priority is not None:
+        req.priority = body.priority
+        corrections.append(f"priority → {body.priority}")
+
+    if body.blocking_task_id is not None:
+        req.blocking_task_id = body.blocking_task_id
+        corrections.append(f"blocking_task_id → {body.blocking_task_id}")
+
+    if body.fulfilling_workstream_id is not None:
+        req.fulfilling_workstream_id = body.fulfilling_workstream_id
+        corrections.append(f"fulfilling_workstream_id → {body.fulfilling_workstream_id}")
+
+    if not corrections:
+        return req  # no-op
+
+    correction_note = "hub correction: " + "; ".join(corrections)
+    req.routing_note = (req.routing_note + "\n" + correction_note) if req.routing_note else correction_note
+
+    await session.commit()
+    await session.refresh(req)
+    return req
+
+
 # ---------------------------------------------------------------------------
 # Routing algorithm
 # ---------------------------------------------------------------------------
 
 async def _route_capability(
-    session: AsyncSession, capability_type: str, description: str
-) -> tuple[uuid.UUID | None, uuid.UUID | None]:
-    """Find the best-matching domain for a capability request.
+    session: AsyncSession, capability_type: str, title: str, description: str
+) -> tuple[uuid.UUID | None, uuid.UUID | None, str]:
+    """Find the best-matching catalog entry for a capability request.
 
-    Returns (domain_id, catalog_entry_id) or (None, None) for broadcast.
+    Returns (domain_id, catalog_entry_id, routing_note); both ids are None for broadcast.
+    Uses word-boundary matching on (title + description) combined to avoid
+    false positives from substring matches (e.g. 'postgres' inside 'postgresql',
+    'ha' inside 'has').
     """
     q = select(CapabilityCatalog).where(
         CapabilityCatalog.capability_type == capability_type,
@@ -294,20 +351,41 @@ async def _route_capability(
     )
     entries = list((await session.execute(q)).scalars().all())
 
+    if not entries:
+        return None, None, f"no active catalog entries for type '{capability_type}' — broadcast"
+
     if len(entries) == 1:
-        return entries[0].domain_id, entries[0].id
+        e = entries[0]
+        return e.domain_id, e.id, f"single match: '{e.title}' (domain={e.domain_id})"
 
-    if len(entries) > 1 and description:
-        desc_lower = description.lower()
-        scored: list[tuple[int, CapabilityCatalog]] = []
-        for entry in entries:
-            score = sum(1 for kw in (entry.keywords or []) if kw.lower() in desc_lower)
-            scored.append((score, entry))
-        scored.sort(key=lambda x: -x[0])
-        if scored[0][0] > 0 and (len(scored) < 2 or scored[0][0] > scored[1][0]):
-            return scored[0][1].domain_id, scored[0][1].id
+    # Score by word-boundary keyword overlap against title + description combined
+    combined = f"{title} {description or ''}".lower()
+    scored: list[tuple[int, CapabilityCatalog]] = []
+    for entry in entries:
+        keywords = [kw for kw in (entry.keywords or []) if len(kw) >= 3]
+        score = sum(
+            1 for kw in keywords
+            if re.search(r'\b' + re.escape(kw.lower()) + r'\b', combined)
+        )
+        scored.append((score, entry))
+    scored.sort(key=lambda x: -x[0])
 
-    return None, None
+    best_score, best = scored[0]
+    if best_score == 0:
+        return None, None, (
+            f"no keyword overlap for type '{capability_type}' among "
+            f"{len(entries)} entries — broadcast"
+        )
+    if len(scored) >= 2 and scored[1][0] == best_score:
+        return None, None, (
+            f"ambiguous routing: '{scored[0][1].title}' and '{scored[1][1].title}' "
+            f"both scored {best_score} — broadcast"
+        )
+
+    return best.domain_id, best.id, (
+        f"matched '{best.title}' (score={best_score}, "
+        f"keywords matched in title+description; title={title!r})"
+    )
 
 
 # ---------------------------------------------------------------------------
diff --git a/state-hub/api/schemas/capability_request.py b/state-hub/api/schemas/capability_request.py
index e1f01f1..6c0c8e4 100644
--- a/state-hub/api/schemas/capability_request.py
+++ b/state-hub/api/schemas/capability_request.py
@@ -55,6 +55,13 @@ class CapabilityRequestStatusPatch(BaseModel):
     note: str | None = None
 
 
+class CapabilityRequestPatch(BaseModel):
+    catalog_entry_id: uuid.UUID | None = None
+    priority: str | None = None
+    blocking_task_id: uuid.UUID | None = None
+    fulfilling_workstream_id: uuid.UUID | None = None
+
+
 class CapabilityRequestRead(BaseModel):
     model_config = ConfigDict(from_attributes=True)
 
@@ -73,6 +80,7 @@ class CapabilityRequestRead(BaseModel):
     blocking_task_id: uuid.UUID | None = None
     catalog_entry_id: uuid.UUID | None = None
     resolution_note: str | None = None
+    routing_note: str | None = None
     accepted_at: datetime | None = None
     completed_at: datetime | None = None
     created_at: datetime
diff --git a/state-hub/mcp_server/server.py b/state-hub/mcp_server/server.py
index a1f707d..c33fa56 100644
--- a/state-hub/mcp_server/server.py
+++ b/state-hub/mcp_server/server.py
@@ -1881,6 +1881,46 @@ def update_capability_request_status(
     return json.dumps(result, indent=2)
 
 
+@mcp.tool()
+def patch_capability_request(
+    request_id: str,
+    catalog_entry_id: Optional[str] = None,
+    priority: Optional[str] = None,
+    blocking_task_id: Optional[str] = None,
+    fulfilling_workstream_id: Optional[str] = None,
+) -> str:
+    """Correct mutable metadata on a capability request.
+
+    Correcting catalog_entry_id automatically re-derives the fulfilling domain.
+    Use this when the hub mis-routed a request (wrong catalog entry or domain).
+    Only provided (non-None) fields are updated.
+
+    Args:
+        request_id: UUID of the capability request to patch.
+        catalog_entry_id: Correct catalog entry UUID. Re-derives fulfilling domain.
+        priority: New priority (low/medium/high/critical).
+        blocking_task_id: UUID of the task this request unblocks on completion.
+        fulfilling_workstream_id: UUID of the workstream delivering this capability.
+
+    Returns:
+        JSON string of the PATCH response, or of {"error": "..."} when no fields are given.
+    """
+    body: dict = {}
+    if catalog_entry_id is not None:
+        body["catalog_entry_id"] = catalog_entry_id
+    if priority is not None:
+        body["priority"] = priority
+    if blocking_task_id is not None:
+        body["blocking_task_id"] = blocking_task_id
+    if fulfilling_workstream_id is not None:
+        body["fulfilling_workstream_id"] = fulfilling_workstream_id
+
+    if not body:
+        return json.dumps({"error": "no fields provided to patch"}, indent=2)
+
+    return json.dumps(_patch(f"/capability-requests/{request_id}", body), indent=2)
+
+
 @mcp.tool()
 def list_capability_requests(
     domain: str | None = None,
diff --git a/state-hub/migrations/versions/l9g0h1i2j3k4_capability_request_routing_note.py b/state-hub/migrations/versions/l9g0h1i2j3k4_capability_request_routing_note.py
new file mode 100644
index 0000000..5d9b8f9
--- /dev/null
+++ b/state-hub/migrations/versions/l9g0h1i2j3k4_capability_request_routing_note.py
@@ -0,0 +1,19 @@
+"""Add routing_note to capability_requests
+
+Revision ID: l9g0h1i2j3k4
+Revises: k8f9a0b1c2d3
+Create Date: 2026-03-20
+"""
+from alembic import op
+import sqlalchemy as sa
+
+revision = 'l9g0h1i2j3k4'
+down_revision = 'k8f9a0b1c2d3'
+branch_labels = None
+depends_on = None
+
+def upgrade() -> None:
+    op.add_column('capability_requests', sa.Column('routing_note', sa.Text(), nullable=True))
+
+def downgrade() -> None:
+    op.drop_column('capability_requests', 'routing_note')
diff --git a/state-hub/tests/test_capability_requests.py b/state-hub/tests/test_capability_requests.py
index a908b87..ad756e9 100644
--- a/state-hub/tests/test_capability_requests.py
+++ b/state-hub/tests/test_capability_requests.py
@@ -335,3 +335,108 @@ class TestCapabilityRequestLifecycle:
             "status": "requested",
         })
         assert r.status_code == 422
+
+
+class TestCapabilityRequestRouting:
+    async def test_word_boundary_avoids_substring_false_positive(self, client):
+        """'postgres' keyword must not match inside 'postgresql'."""
+        await _setup_two_domains(client)
+        # Register two entries: PostgreSQL HA (keywords: postgresql, postgres, ha) and K3s provisioning (keywords: k3s, cluster, k8s)
+        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
+                                title="PostgreSQL HA", keywords=["postgresql", "postgres", "ha"])
+        await _register_catalog(client, domain="custodian", cap_type="infrastructure",
+                                title="K3s provisioning", keywords=["k3s", "cluster", "k8s"])
+
+        # Description mentions k8s and cluster but NOT standalone "postgres" or "ha"
+        req = await _create_request(
+            client,
+            title="K3s cluster access",
+            description="Need k8s foundations. cluster must be up. kubeconfig required.",
+            cap_type="infrastructure",
+        )
+        # k3s entry should win (k3s + cluster + k8s match), not postgres entry
+        assert req["catalog_entry_id"] is not None
+        # Routing note should mention k3s
+        assert req["routing_note"] is not None
+        assert "K3s" in req["routing_note"] or "k3s" in req["routing_note"].lower()
+
+    async def test_title_included_in_routing(self, client):
+        """Title keywords should contribute to routing score."""
+        await _setup_two_domains(client)
+        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
+                                title="K3s cluster", keywords=["k3s", "cluster", "kubernetes"])
+        await _register_catalog(client, domain="custodian", cap_type="infrastructure",
+                                title="Postgres DB", keywords=["postgresql", "postgres", "database"])
+
+        # Title contains "k3s" but description is generic
+        req = await _create_request(
+            client,
+            title="k3s cluster access needed",
+            description="Need access to proceed with deployment.",
+            cap_type="infrastructure",
+        )
+        assert req["routing_note"] is not None
+        assert req["catalog_entry_id"] is not None
+        # Should match k3s (title has "k3s" and "cluster")
+        # Verify via routing note
+        assert "K3s" in req["routing_note"] or "k3s" in req["routing_note"].lower()
+
+    async def test_ambiguous_routing_broadcasts(self, client):
+        """Tied scores should broadcast (no fulfilling domain)."""
+        await _setup_two_domains(client)
+        await _register_catalog(client, domain="railiance", cap_type="infrastructure",
+                                title="Entry A", keywords=["cluster"])
+        await _register_catalog(client, domain="custodian", cap_type="infrastructure",
+                                title="Entry B", keywords=["cluster"])
+        req = await _create_request(
+            client,
+            title="cluster needed",
+            description="cluster access",
+            cap_type="infrastructure",
+        )
+        assert req["fulfilling_domain_slug"] is None
+        assert "ambiguous" in req["routing_note"]
+
+    async def test_patch_corrects_catalog_entry_and_reroutes(self, client):
+        """PATCH /capability-requests/{id} corrects catalog_entry_id and re-derives domain."""
+        req_d, ful_d = await _setup_two_domains(client)
+        # Register wrong entry (postgres) and correct entry (k3s)
+        wrong = await _register_catalog(client, domain="railiance", cap_type="infrastructure",
+                                        title="PostgreSQL HA", keywords=["postgresql"])
+        correct = await _register_catalog(client, domain="custodian", cap_type="infrastructure",
+                                          title="K3s cluster", keywords=["k3s"])
+
+        # Create request — description matches both "postgresql" and "k3s", so auto-routing cannot be relied on (a tie broadcasts)
+        req = await _create_request(
+            client,
+            description="Need postgresql and k3s access",
+            cap_type="infrastructure",
+        )
+
+        # Patch to correct catalog entry
+        r = await client.patch(f"/capability-requests/{req['id']}", json={
+            "catalog_entry_id": correct["id"],
+        })
+        assert r.status_code == 200
+        data = r.json()
+        assert data["catalog_entry_id"] == correct["id"]
+        assert data["fulfilling_domain_slug"] == "custodian"
+        assert "hub correction" in data["routing_note"]
+        assert "K3s" in data["routing_note"]
+
+    async def test_patch_unknown_catalog_entry_404(self, client):
+        await _setup_two_domains(client)
+        req = await _create_request(client, cap_type="security")
+        import uuid as _uuid
+        r = await client.patch(f"/capability-requests/{req['id']}", json={
+            "catalog_entry_id": str(_uuid.uuid4()),
+        })
+        assert r.status_code == 404
+
+    async def test_patch_priority(self, client):
+        await _setup_two_domains(client)
+        req = await _create_request(client, cap_type="security")
+        r = await client.patch(f"/capability-requests/{req['id']}", json={"priority": "critical"})
+        assert r.status_code == 200
+        assert r.json()["priority"] == "critical"
+        assert "priority" in r.json()["routing_note"]