Files
state-hub/api/routers/capability_requests.py

503 lines
18 KiB
Python

import re
import uuid
from datetime import datetime, timezone
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.database import get_session
from api.flow_defs import assertion_result_to_dict, evaluate_transition, flow_result_to_dict
from api.models.agent_message import AgentMessage
from api.models.capability_catalog import CapabilityCatalog
from api.models.capability_request import CapabilityRequest
from api.models.domain import Domain
from api.models.managed_repo import ManagedRepo
from api.models.task import Task
from api.schemas.capability_request import (
CapabilityRequestAccept,
CapabilityRequestCreate,
CapabilityRequestDispute,
CapabilityRequestPatch,
CapabilityRequestRead,
CapabilityRequestReroute,
CapabilityRequestStatusPatch,
)
from hub_core.routers.capabilities import (
create_capability_catalog_router,
create_capability_request_read_router,
)
router = create_capability_catalog_router(
get_session,
domain_model=Domain,
repo_model=ManagedRepo,
catalog_model=CapabilityCatalog,
)
router.include_router(
create_capability_request_read_router(
get_session,
domain_model=Domain,
request_model=CapabilityRequest,
request_read_schema=CapabilityRequestRead,
)
)
# ---------------------------------------------------------------------------
# Capability Request endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-requests/", response_model=CapabilityRequestRead, status_code=status.HTTP_201_CREATED)
async def create_request(
body: CapabilityRequestCreate,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
req_domain = await _resolve_domain(body.requesting_domain, session)
# Route to provider
fulfilling_domain_id, catalog_entry_id, routing_note = await _route_capability(
session, body.capability_type, body.title, body.description or ""
)
req = CapabilityRequest(
title=body.title,
description=body.description,
capability_type=body.capability_type,
priority=body.priority,
requesting_domain_id=req_domain.id,
requesting_agent=body.requesting_agent,
requesting_workstream_id=body.requesting_workstream_id,
blocking_task_id=body.blocking_task_id,
fulfilling_domain_id=fulfilling_domain_id,
catalog_entry_id=catalog_entry_id,
routing_note=routing_note,
)
session.add(req)
await session.flush() # get req.id before creating notification
# Auto-notify
if fulfilling_domain_id:
ful_domain = await session.get(Domain, fulfilling_domain_id)
to_agent = ful_domain.slug if ful_domain else "broadcast"
else:
to_agent = "broadcast"
_add_notification(
session,
from_agent="system",
to_agent=to_agent,
subject=f"[capability-request] {body.title}",
body=(
f"New capability request from **{body.requesting_agent}** "
f"({body.requesting_domain}):\n\n"
f"**Type:** {body.capability_type}\n"
f"**Priority:** {body.priority}\n\n"
f"{body.description or '(no description)'}"
),
)
await session.commit()
await session.refresh(req)
return req
@router.post("/capability-requests/{request_id}/accept", response_model=CapabilityRequestRead)
async def accept_request(
request_id: uuid.UUID,
body: CapabilityRequestAccept,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
req = await _get_request_or_404(request_id, session)
_check_transition(req.status, "accepted")
now = datetime.now(tz=timezone.utc)
req.status = "accepted"
req.fulfilling_agent = body.fulfilling_agent
req.fulfilling_workstream_id = body.fulfilling_workstream_id
req.accepted_at = now
# If no fulfilling domain was set by routing, infer from the accepting agent's context
# (The agent can also PATCH it later if needed)
_add_notification(
session,
from_agent=body.fulfilling_agent,
to_agent=req.requesting_agent,
subject=f"[capability-accepted] {req.title}",
body=f"Your capability request **{req.title}** has been accepted by **{body.fulfilling_agent}**.",
)
await session.commit()
await session.refresh(req)
return req
@router.patch("/capability-requests/{request_id}/status", response_model=CapabilityRequestRead)
async def patch_request_status(
request_id: uuid.UUID,
body: CapabilityRequestStatusPatch,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
req = await _get_request_or_404(request_id, session)
_check_transition(req.status, body.status)
req.status = body.status
if body.note:
req.resolution_note = body.note
now = datetime.now(tz=timezone.utc)
# Status-specific side effects
if body.status == "completed":
req.completed_at = now
# Auto-unblock the blocking task
if req.blocking_task_id:
task = await session.get(Task, req.blocking_task_id)
if task and task.status == "wait":
task.status = "todo"
task.blocking_reason = None
_add_notification(
session,
from_agent="system",
to_agent=req.requesting_agent,
subject=f"[capability-completed] {req.title}",
body=(
f"Capability request **{req.title}** has been completed.\n\n"
f"{body.note or ''}"
),
)
elif body.status == "ready_for_review":
_add_notification(
session,
from_agent=req.fulfilling_agent or "system",
to_agent=req.requesting_agent,
subject=f"[capability-ready] {req.title} -- please review",
body=(
f"Capability **{req.title}** is ready for your review and optimization.\n\n"
f"{body.note or ''}"
),
)
elif body.status == "rejected":
_add_notification(
session,
from_agent=req.fulfilling_agent or "system",
to_agent=req.requesting_agent,
subject=f"[capability-rejected] {req.title}",
body=(
f"Capability request **{req.title}** has been rejected.\n\n"
f"**Reason:** {body.note or '(no reason given)'}"
),
)
elif body.status == "in_progress":
_add_notification(
session,
from_agent=req.fulfilling_agent or "system",
to_agent=req.requesting_agent,
subject=f"[capability-in-progress] {req.title}",
body=f"Work on capability **{req.title}** is now in progress.",
)
await session.commit()
await session.refresh(req)
return req
@router.patch("/capability-requests/{request_id}", response_model=CapabilityRequestRead)
async def patch_request(
request_id: uuid.UUID,
body: CapabilityRequestPatch,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
"""Correct mutable metadata: catalog_entry_id (re-derives fulfilling domain),
priority, blocking_task_id, fulfilling_workstream_id.
Only fields present in the request body (non-None) are updated.
"""
req = await _get_request_or_404(request_id, session)
corrections: list[str] = []
if body.catalog_entry_id is not None:
old_entry_id = req.catalog_entry_id
entry = await session.get(CapabilityCatalog, body.catalog_entry_id)
if entry is None:
raise HTTPException(status_code=404, detail=f"Catalog entry '{body.catalog_entry_id}' not found")
req.catalog_entry_id = entry.id
# Re-derive fulfilling domain from catalog entry
old_domain_id = req.fulfilling_domain_id
req.fulfilling_domain_id = entry.domain_id
corrections.append(
f"catalog_entry: {old_entry_id}{entry.id} ({entry.title}); "
f"fulfilling_domain re-derived → {entry.domain_id}"
)
if body.priority is not None:
req.priority = body.priority
corrections.append(f"priority → {body.priority}")
if body.blocking_task_id is not None:
req.blocking_task_id = body.blocking_task_id
corrections.append(f"blocking_task_id → {body.blocking_task_id}")
if body.fulfilling_workstream_id is not None:
req.fulfilling_workstream_id = body.fulfilling_workstream_id
corrections.append(f"fulfilling_workstream_id → {body.fulfilling_workstream_id}")
if not corrections:
return req # no-op
correction_note = "hub correction: " + "; ".join(corrections)
req.routing_note = (req.routing_note + "\n" + correction_note) if req.routing_note else correction_note
await session.commit()
await session.refresh(req)
return req
# ---------------------------------------------------------------------------
# Dispute endpoints
# ---------------------------------------------------------------------------
@router.post("/capability-requests/{request_id}/dispute", response_model=CapabilityRequestRead)
async def dispute_request(
request_id: uuid.UUID,
body: CapabilityRequestDispute,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
"""Flag a routing decision as incorrect. Transitions to routing_disputed."""
req = await _get_request_or_404(request_id, session)
_check_transition(req.status, "routing_disputed")
now = datetime.now(tz=timezone.utc)
req.status = "routing_disputed"
req.dispute_reason = body.reason
req.disputed_by = body.disputed_by
req.dispute_suggested_domain = body.suggested_domain
req.disputed_at = now
dispute_entry = (
f"disputed by {body.disputed_by}: {body.reason}"
+ (f" (suggested: {body.suggested_domain})" if body.suggested_domain else "")
)
req.routing_note = (req.routing_note + "\n" + dispute_entry) if req.routing_note else dispute_entry
# Notify custodian
_add_notification(
session,
from_agent=body.disputed_by,
to_agent="custodian",
subject=f"[routing-disputed] {req.title}",
body=(
f"**{body.disputed_by}** has disputed the routing of capability request "
f"**{req.title}**.\n\n"
f"**Reason:** {body.reason}\n"
+ (f"**Suggested domain:** {body.suggested_domain}\n" if body.suggested_domain else "")
+ f"\nCurrently routed to: {req.fulfilling_domain_slug or 'unrouted'}"
),
)
# Notify current fulfilling domain
if req.fulfilling_domain_slug:
_add_notification(
session,
from_agent=body.disputed_by,
to_agent=req.fulfilling_domain_slug,
subject=f"[routing-disputed] {req.title}",
body=(
f"The capability request **{req.title}** routed to your domain has been disputed "
f"by **{body.disputed_by}**.\n\n"
f"**Reason:** {body.reason}\n"
+ (f"**Suggested domain:** {body.suggested_domain}" if body.suggested_domain else "")
),
)
await session.commit()
await session.refresh(req)
return req
@router.post("/capability-requests/{request_id}/reroute", response_model=CapabilityRequestRead)
async def reroute_request(
request_id: uuid.UUID,
body: CapabilityRequestReroute,
session: AsyncSession = Depends(get_session),
) -> CapabilityRequest:
"""Re-route a disputed request to a new domain. Resets to requested."""
req = await _get_request_or_404(request_id, session)
if req.status != "routing_disputed":
raise HTTPException(
status_code=422,
detail=f"Cannot reroute from status '{req.status}'. Only 'routing_disputed' requests can be rerouted.",
)
if body.catalog_entry_id is None and body.domain is None:
raise HTTPException(status_code=422, detail="Either catalog_entry_id or domain must be provided.")
if body.catalog_entry_id is not None:
entry = await session.get(CapabilityCatalog, body.catalog_entry_id)
if entry is None:
raise HTTPException(status_code=404, detail=f"Catalog entry '{body.catalog_entry_id}' not found")
req.catalog_entry_id = entry.id
req.fulfilling_domain_id = entry.domain_id
new_domain_slug = (await session.get(Domain, entry.domain_id)).slug if entry.domain_id else "unknown"
else:
new_domain = await _resolve_domain(body.domain, session)
req.fulfilling_domain_id = new_domain.id
new_domain_slug = new_domain.slug
old_domain = req.dispute_suggested_domain or "unknown"
# Clear dispute fields
req.dispute_reason = None
req.disputed_by = None
req.dispute_suggested_domain = None
req.disputed_at = None
req.status = "requested"
reroute_entry = f"re-routed by {body.rerouted_by}{new_domain_slug}: {body.note}"
req.routing_note = (req.routing_note + "\n" + reroute_entry) if req.routing_note else reroute_entry
# Notify requester
_add_notification(
session,
from_agent=body.rerouted_by,
to_agent=req.requesting_agent,
subject=f"[re-routed] {req.title}",
body=(
f"Capability request **{req.title}** has been re-routed to **{new_domain_slug}**.\n\n"
f"**Note:** {body.note}"
),
)
# Notify new fulfilling domain
_add_notification(
session,
from_agent=body.rerouted_by,
to_agent=new_domain_slug,
subject=f"[capability-request] {req.title}",
body=(
f"Capability request **{req.title}** has been re-routed to your domain.\n\n"
f"**From:** {req.requesting_agent} ({req.requesting_domain_slug})\n"
f"**Type:** {req.capability_type}\n"
f"**Priority:** {req.priority}\n\n"
f"{req.description or '(no description)'}"
),
)
await session.commit()
await session.refresh(req)
return req
# ---------------------------------------------------------------------------
# Routing algorithm
# ---------------------------------------------------------------------------
async def _route_capability(
session: AsyncSession, capability_type: str, title: str, description: str
) -> tuple[uuid.UUID | None, uuid.UUID | None, str]:
"""Find the best-matching catalog entry for a capability request.
Returns (domain_id, catalog_entry_id, routing_note).
Uses word-boundary matching on (title + description) combined to avoid
false positives from substring matches (e.g. 'postgres' inside 'postgresql',
'ha' inside 'has').
"""
q = select(CapabilityCatalog).where(
CapabilityCatalog.capability_type == capability_type,
CapabilityCatalog.status == "active",
)
entries = list((await session.execute(q)).scalars().all())
if not entries:
return None, None, f"no active catalog entries for type '{capability_type}' — broadcast"
if len(entries) == 1:
e = entries[0]
return e.domain_id, e.id, f"single match: '{e.title}' (domain={e.domain_id})"
# Score by word-boundary keyword overlap against title + description combined
combined = f"{title} {description or ''}".lower()
scored: list[tuple[int, CapabilityCatalog]] = []
for entry in entries:
keywords = [kw for kw in (entry.keywords or []) if len(kw) >= 3]
score = sum(
1 for kw in keywords
if re.search(r'\b' + re.escape(kw.lower()) + r'\b', combined)
)
scored.append((score, entry))
scored.sort(key=lambda x: -x[0])
best_score, best = scored[0]
if best_score == 0:
return None, None, (
f"no keyword overlap for type '{capability_type}' among "
f"{len(entries)} entries — broadcast"
)
if len(scored) >= 2 and scored[1][0] == best_score:
return None, None, (
f"ambiguous routing: '{scored[0][1].title}' and '{scored[1][1].title}' "
f"both scored {best_score} — broadcast"
)
return best.domain_id, best.id, (
f"matched '{best.title}' (score={best_score}, "
f"keywords matched from: {title!r})"
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _add_notification(
session: AsyncSession,
from_agent: str,
to_agent: str,
subject: str,
body: str,
) -> None:
"""Create an AgentMessage notification in the current session (no commit)."""
msg = AgentMessage(
from_agent=from_agent,
to_agent=to_agent,
subject=subject,
body=body,
)
session.add(msg)
async def _resolve_domain(slug: str, session: AsyncSession) -> Domain:
result = await session.execute(select(Domain).where(Domain.slug == slug))
domain = result.scalar_one_or_none()
if domain is None:
raise HTTPException(status_code=404, detail=f"Domain '{slug}' not found")
return domain
async def _get_request_or_404(request_id: uuid.UUID, session: AsyncSession) -> CapabilityRequest:
req = await session.get(CapabilityRequest, request_id)
if req is None:
raise HTTPException(status_code=404, detail=f"Capability request '{request_id}' not found")
return req
def _check_transition(current: str, target: str) -> None:
can_reach, failures, flow_result = evaluate_transition(
"capability_request",
current,
target,
)
if not can_reach:
raise HTTPException(
status_code=422,
detail={
"message": f"Cannot transition from '{current}' to '{target}'.",
"current_workstation": current,
"target_workstation": target,
"blocking_assertions": [
assertion_result_to_dict(item) for item in failures
],
"flow_result": flow_result_to_dict(flow_result),
},
)