Add curated daily triage digest

This commit is contained in:
2026-05-19 19:09:21 +02:00
parent 3110399b11
commit 6cb0718e90
2 changed files with 294 additions and 0 deletions

View File

@@ -8,6 +8,7 @@ Supported queries:
- next_steps: GET {STATE_HUB_URL}/state/next_steps
- workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index
- hub_inbox: GET {STATE_HUB_URL}/messages/?to_agent=hub&unread_only=true
- daily_triage_digest: curated scalar JSON digest for daily WSJF triage
No caching — state hub data is live operational state and must not be stale
within a single workflow run.
@@ -16,6 +17,7 @@ Config: STATE_HUB_URL env var (default: http://127.0.0.1:8000).
from __future__ import annotations
import json
import os
from typing import Any
@@ -25,6 +27,8 @@ from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, Cont
# Fallback base URL, matching the documented default of the STATE_HUB_URL
# environment variable.
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
# NOTE(review): presumably the per-request timeout for State Hub calls; the
# code that consumes it is not visible in this section — confirm.
_TIMEOUT_SECONDS = 10.0
# Workstream statuses that make a workstream eligible for the triage digest
# (checked in _open_workstream_digest).
_OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"}
# Task statuses treated as "open" when picking representative tasks
# (checked in _representative_tasks).
_OPEN_TASK_STATUSES = {"todo", "in_progress", "blocked"}
def _base_url() -> str:
@@ -64,7 +68,182 @@ class StateHubContextResolver(ContextResolver):
"unread_only": params.get("unread_only", True),
}
return _fetch_json("/messages/", query_params)
if query == "daily_triage_digest":
return _daily_triage_digest(params)
return {}
# Register the resolver class in the shared registry under the "state-hub" key.
CONTEXT_RESOLVER_REGISTRY["state-hub"] = StateHubContextResolver
def _daily_triage_digest(params: dict[str, Any]) -> str:
    """Return a compact JSON string safe to inject into an instruction prompt.

    This is intentionally a scalar string rather than raw State Hub objects.
    It limits fields to operational identifiers, counts, status, priority, and
    short titles. That keeps the ActivityDefinition's trusted field small while
    leaving an explicit `deterministic_scoring` extension point for future
    code-driven WSJF selection of especially critical/high-gain candidates.

    Recognised ``params`` keys (all optional):
        refresh: forwarded to the workplan-index request (default ``False``).
        to_agent: inbox recipient filter (default ``"hub"``).
        unread_only: inbox unread filter (default ``True``).
        max_workstreams: cap on open workstreams in the digest (default 12).
        max_next_steps: cap on next-step entries in the digest (default 8).

    Returns:
        A JSON document serialised with sorted keys and no extra whitespace,
        or the literal string ``"{}"`` when the summary response is not a
        JSON object.

    Raises:
        TypeError, ValueError: if ``max_workstreams`` or ``max_next_steps``
            cannot be converted with ``int()``.
    """
    summary = _fetch_json("/state/summary")
    if not isinstance(summary, dict):
        return "{}"

    # Parse the limits before issuing the remaining requests so a bad
    # parameter fails fast, and clamp at zero: a negative value used as a
    # slice bound would drop items from the end instead of limiting the count.
    max_workstreams = max(0, int(params.get("max_workstreams", 12)))
    max_next_steps = max(0, int(params.get("max_next_steps", 8)))

    workplan_index = _fetch_json(
        "/workstreams/workplan-index",
        {"refresh": params.get("refresh", False)},
    )
    if not isinstance(workplan_index, dict):
        workplan_index = {}

    next_steps = _fetch_json("/state/next_steps")
    if not isinstance(next_steps, list):
        next_steps = []

    inbox = _fetch_json(
        "/messages/",
        {
            "to_agent": params.get("to_agent", "hub"),
            "unread_only": params.get("unread_only", True),
        },
    )
    if not isinstance(inbox, list):
        inbox = []

    open_workstreams = _open_workstream_digest(summary, workplan_index, max_workstreams)
    digest = {
        "generated_at": summary.get("generated_at"),
        "totals": summary.get("totals", {}),
        "open_workstreams": open_workstreams,
        # Non-object entries are skipped: the _safe_* helpers call .get() on
        # each item and would raise on anything that is not a dict.
        "next_steps": [
            _safe_next_step(item)
            for item in next_steps[:max_next_steps]
            if isinstance(item, dict)
        ],
        "inbox": {
            "unread_count": len(inbox),
            "samples": [
                _safe_inbox_item(item)
                for item in inbox[:3]
                if isinstance(item, dict)
            ],
        },
        "deterministic_scoring": {
            "mode": "candidate_digest_only",
            "future_mode": "code_score_high_gain_high_effort_candidates",
            "candidate_fields": [
                "planning_priority",
                "status",
                "open_task_counts",
                "needs_human_count",
                "blocked_task_count",
                "workplan_health_labels",
            ],
        },
    }
    return json.dumps(digest, sort_keys=True, separators=(",", ":"))
def _open_workstream_digest(
    summary: dict[str, Any],
    workplan_index: dict[str, Any],
    max_workstreams: int,
) -> list[dict[str, Any]]:
    """Build the ranked list of open-workstream candidates for the digest.

    Walks ``summary["topics"][*]["workstreams"]``, keeps the workstreams whose
    status is in ``_OPEN_WORKSTREAM_STATUSES``, enriches each one with its
    detail record, its task counts and its workplan-index entry, then sorts
    the result with ``_candidate_sort_key``.

    Args:
        summary: parsed ``/state/summary`` response.
        workplan_index: parsed workplan-index response; its ``"workstreams"``
            mapping is looked up by workstream id.
        max_workstreams: maximum number of candidates to return; values below
            zero are treated as zero.

    Returns:
        At most ``max_workstreams`` candidate dicts, best-ranked first.
    """
    index = workplan_index.get("workstreams")
    if not isinstance(index, dict):
        index = {}

    candidates: list[dict[str, Any]] = []
    # `or []` tolerates an explicit null for "topics"/"workstreams", which a
    # plain .get(key, []) default would pass straight to the for-loop.
    for topic in summary.get("topics") or []:
        if not isinstance(topic, dict):
            continue
        domain = topic.get("domain_slug") or topic.get("slug")
        for workstream in topic.get("workstreams") or []:
            if not isinstance(workstream, dict):
                continue
            if workstream.get("status") not in _OPEN_WORKSTREAM_STATUSES:
                continue

            workstream_id = workstream.get("id")
            detail: Any = {}
            tasks: Any = []
            if workstream_id:
                # Both lookups are keyed by the id; without one, a tasks query
                # would carry workstream_id=None and could return unrelated
                # tasks, so neither request is sent.
                detail = _fetch_json(f"/workstreams/{workstream_id}")
                tasks = _fetch_json(
                    "/tasks/",
                    {"workstream_id": workstream_id, "limit": 200},
                )
            if not isinstance(detail, dict):
                detail = {}
            if not isinstance(tasks, list):
                tasks = []

            indexed = index.get(workstream_id)
            if not isinstance(indexed, dict):
                indexed = {}

            candidates.append({
                "id": workstream_id,
                "slug": workstream.get("slug"),
                "title": _short_text(workstream.get("title", ""), 120),
                "domain": domain,
                "repo_slug": indexed.get("repo_slug"),
                "status": workstream.get("status"),
                "owner": workstream.get("owner"),
                "planning_priority": detail.get("planning_priority"),
                "planning_order": detail.get("planning_order"),
                "file": indexed.get("relative_path"),
                "needs_review": bool(indexed.get("needs_review", False)),
                "health_labels": indexed.get("health_labels", []),
                "open_task_counts": _task_counts(tasks),
                "representative_next_tasks": _representative_tasks(tasks, 3),
            })

    candidates.sort(key=_candidate_sort_key)
    # Clamp so a negative limit yields nothing rather than "all but the last n".
    return candidates[: max(0, max_workstreams)]
def _task_counts(tasks: list[dict[str, Any]]) -> dict[str, int]:
counts = {"todo": 0, "in_progress": 0, "blocked": 0, "needs_human": 0}
for task in tasks:
status = task.get("status")
if status in counts:
counts[status] += 1
if task.get("needs_human"):
counts["needs_human"] += 1
counts["open_total"] = counts["todo"] + counts["in_progress"] + counts["blocked"]
return counts
def _representative_tasks(tasks: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]:
    """Pick up to *limit* open tasks as a compact preview of a workstream.

    Open tasks (status in ``_OPEN_TASK_STATUSES``) are ordered by priority
    rank, then by ``created_at`` ascending, and reduced to a handful of
    scalar fields.

    Args:
        tasks: task records; entries that are not dicts are ignored.
        limit: maximum number of tasks to return; values below zero are
            treated as zero.

    Returns:
        A list of dicts with ``id``, ``title`` (shortened to 100 characters),
        ``status``, ``priority`` and ``needs_human``.
    """

    def sort_key(task: dict[str, Any]) -> tuple[int, str]:
        # `.get("created_at", "")` would still return None for an explicit
        # null, and None cannot be ordered against str when priorities tie;
        # normalise to a string first.
        return _priority_rank(task.get("priority")), str(task.get("created_at") or "")

    open_tasks = sorted(
        (
            task
            for task in tasks
            if isinstance(task, dict) and task.get("status") in _OPEN_TASK_STATUSES
        ),
        key=sort_key,
    )
    return [
        {
            "id": task.get("id"),
            "title": _short_text(task.get("title", ""), 100),
            "status": task.get("status"),
            "priority": task.get("priority"),
            "needs_human": bool(task.get("needs_human", False)),
        }
        # Clamp so a negative limit yields nothing instead of slicing from the end.
        for task in open_tasks[: max(0, limit)]
    ]
def _safe_next_step(item: dict[str, Any]) -> dict[str, Any]:
    """Reduce a next-step record to identifiers and shortened titles.

    Only the listed keys are copied; both title fields are passed through
    ``_short_text`` with a 120-character limit.
    """
    step: dict[str, Any] = {
        key: item.get(key)
        for key in ("type", "domain", "workstream_id", "workstream_slug")
    }
    step["workstream_title"] = _short_text(item.get("workstream_title", ""), 120)
    step["task_id"] = item.get("task_id")
    step["task_title"] = _short_text(item.get("task_title", ""), 120)
    return step
def _safe_inbox_item(item: dict[str, Any]) -> dict[str, Any]:
    """Reduce an inbox message to its id, sender, headline and timestamp.

    The headline is the message's ``subject``, falling back to ``summary``
    and then to an empty string, shortened to 120 characters.
    """
    headline = item.get("subject") or item.get("summary") or ""
    return dict(
        id=item.get("id"),
        from_agent=item.get("from_agent"),
        subject=_short_text(headline, 120),
        created_at=item.get("created_at"),
    )
def _candidate_sort_key(candidate: dict[str, Any]) -> tuple[int, int, int, int]:
    """Return the ordering key for a workstream candidate.

    Ascending sort on this key puts candidates with a better planning
    priority first, then ``active`` ones ahead of any other status, then
    those with more ``needs_human`` tasks, then those with more ``blocked``
    tasks.
    """
    task_counts = candidate.get("open_task_counts", {})
    priority_rank = _priority_rank(candidate.get("planning_priority"))
    inactive_flag = int(candidate.get("status") != "active")
    # Counts are negated so that larger counts sort earlier.
    waiting_on_human = int(task_counts.get("needs_human", 0))
    blocked = int(task_counts.get("blocked", 0))
    return priority_rank, inactive_flag, -waiting_on_human, -blocked
def _priority_rank(priority: Any) -> int:
return {"high": 0, "medium": 1, "low": 2}.get(str(priority or "").lower(), 3)
def _short_text(value: Any, limit: int) -> str:
text = " ".join(str(value or "").split())
if len(text) <= limit:
return text
return text[: limit - 1].rstrip() + ""