From 6cb0718e905f6669a979cb3ebf49912ea6eee135 Mon Sep 17 00:00:00 2001 From: tegwick Date: Tue, 19 May 2026 19:09:21 +0200 Subject: [PATCH] Add curated daily triage digest --- .../context_resolvers/state_hub.py | 179 ++++++++++++++++++ tests/test_state_hub_context_resolver.py | 115 +++++++++++ 2 files changed, 294 insertions(+) diff --git a/src/activity_core/context_resolvers/state_hub.py b/src/activity_core/context_resolvers/state_hub.py index 9372d0e..31da486 100644 --- a/src/activity_core/context_resolvers/state_hub.py +++ b/src/activity_core/context_resolvers/state_hub.py @@ -8,6 +8,7 @@ Supported queries: - next_steps: GET {STATE_HUB_URL}/state/next_steps - workplan_index: GET {STATE_HUB_URL}/workstreams/workplan-index - hub_inbox: GET {STATE_HUB_URL}/messages/?to_agent=hub&unread_only=true + - daily_triage_digest: curated scalar JSON digest for daily WSJF triage No caching — state hub data is live operational state and must not be stale within a single workflow run. @@ -16,6 +17,7 @@ Config: STATE_HUB_URL env var (default: http://127.0.0.1:8000). from __future__ import annotations +import json import os from typing import Any @@ -25,6 +27,8 @@ from activity_core.context_resolvers.base import CONTEXT_RESOLVER_REGISTRY, Cont _DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" _TIMEOUT_SECONDS = 10.0 +_OPEN_WORKSTREAM_STATUSES = {"active", "ready", "blocked"} +_OPEN_TASK_STATUSES = {"todo", "in_progress", "blocked"} def _base_url() -> str: @@ -64,7 +68,182 @@ class StateHubContextResolver(ContextResolver): "unread_only": params.get("unread_only", True), } return _fetch_json("/messages/", query_params) + if query == "daily_triage_digest": + return _daily_triage_digest(params) return {} CONTEXT_RESOLVER_REGISTRY["state-hub"] = StateHubContextResolver + + +def _daily_triage_digest(params: dict[str, Any]) -> str: + """Return a compact JSON string safe to inject into an instruction prompt. + + This is intentionally a scalar string rather than raw State Hub objects. + It limits fields to operational identifiers, counts, status, priority, and + short titles. That keeps the ActivityDefinition's trusted field small while + leaving an explicit `deterministic_scoring` extension point for future + code-driven WSJF selection of especially critical/high-gain candidates. + """ + summary = _fetch_json("/state/summary") + if not isinstance(summary, dict): + return "{}" + + workplan_index = _fetch_json( + "/workstreams/workplan-index", + {"refresh": params.get("refresh", False)}, + ) + if not isinstance(workplan_index, dict): + workplan_index = {} + + next_steps = _fetch_json("/state/next_steps") + if not isinstance(next_steps, list): + next_steps = [] + + inbox = _fetch_json( + "/messages/", + { + "to_agent": params.get("to_agent", "hub"), + "unread_only": params.get("unread_only", True), + }, + ) + if not isinstance(inbox, list): + inbox = [] + + max_workstreams = int(params.get("max_workstreams", 12)) + max_next_steps = int(params.get("max_next_steps", 8)) + open_workstreams = _open_workstream_digest(summary, workplan_index, max_workstreams) + digest = { + "generated_at": summary.get("generated_at"), + "totals": summary.get("totals", {}), + "open_workstreams": open_workstreams, + "next_steps": [_safe_next_step(item) for item in next_steps[:max_next_steps]], + "inbox": { + "unread_count": len(inbox), + "samples": [_safe_inbox_item(item) for item in inbox[:3]], + }, + "deterministic_scoring": { + "mode": "candidate_digest_only", + "future_mode": "code_score_high_gain_high_effort_candidates", + "candidate_fields": [ + "planning_priority", + "status", + "open_task_counts", + "needs_human_count", + "blocked_task_count", + "workplan_health_labels", + ], + }, + } + return json.dumps(digest, sort_keys=True, separators=(",", ":")) + + +def _open_workstream_digest( + summary: dict[str, Any], + workplan_index: dict[str, Any], + max_workstreams: int, +) -> list[dict[str, Any]]: + index = workplan_index.get("workstreams") or {} + candidates: list[dict[str, Any]] = [] + for topic in summary.get("topics", []): + domain = topic.get("domain_slug") or topic.get("slug") + for workstream in topic.get("workstreams", []): + if workstream.get("status") not in _OPEN_WORKSTREAM_STATUSES: + continue + workstream_id = workstream.get("id") + detail = _fetch_json(f"/workstreams/{workstream_id}") if workstream_id else {} + tasks = _fetch_json("/tasks/", {"workstream_id": workstream_id, "limit": 200}) + if not isinstance(detail, dict): + detail = {} + if not isinstance(tasks, list): + tasks = [] + counts = _task_counts(tasks) + indexed = index.get(workstream_id, {}) if isinstance(index, dict) else {} + candidates.append({ + "id": workstream_id, + "slug": workstream.get("slug"), + "title": _short_text(workstream.get("title", ""), 120), + "domain": domain, + "repo_slug": indexed.get("repo_slug"), + "status": workstream.get("status"), + "owner": workstream.get("owner"), + "planning_priority": detail.get("planning_priority"), + "planning_order": detail.get("planning_order"), + "file": indexed.get("relative_path"), + "needs_review": bool(indexed.get("needs_review", False)), + "health_labels": indexed.get("health_labels", []), + "open_task_counts": counts, + "representative_next_tasks": _representative_tasks(tasks, 3), + }) + + candidates.sort(key=_candidate_sort_key) + return candidates[:max_workstreams] + + +def _task_counts(tasks: list[dict[str, Any]]) -> dict[str, int]: + counts = {"todo": 0, "in_progress": 0, "blocked": 0, "needs_human": 0} + for task in tasks: + status = task.get("status") + if status in counts: + counts[status] += 1 + if task.get("needs_human"): + counts["needs_human"] += 1 + counts["open_total"] = counts["todo"] + counts["in_progress"] + counts["blocked"] + return counts + + +def _representative_tasks(tasks: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]: + open_tasks = [task for task in tasks if task.get("status") in _OPEN_TASK_STATUSES] + open_tasks.sort(key=lambda task: (_priority_rank(task.get("priority")), task.get("created_at", ""))) + return [ + { + "id": task.get("id"), + "title": _short_text(task.get("title", ""), 100), + "status": task.get("status"), + "priority": task.get("priority"), + "needs_human": bool(task.get("needs_human", False)), + } + for task in open_tasks[:limit] + ] + + +def _safe_next_step(item: dict[str, Any]) -> dict[str, Any]: + return { + "type": item.get("type"), + "domain": item.get("domain"), + "workstream_id": item.get("workstream_id"), + "workstream_slug": item.get("workstream_slug"), + "workstream_title": _short_text(item.get("workstream_title", ""), 120), + "task_id": item.get("task_id"), + "task_title": _short_text(item.get("task_title", ""), 120), + } + + +def _safe_inbox_item(item: dict[str, Any]) -> dict[str, Any]: + return { + "id": item.get("id"), + "from_agent": item.get("from_agent"), + "subject": _short_text(item.get("subject") or item.get("summary") or "", 120), + "created_at": item.get("created_at"), + } + + +def _candidate_sort_key(candidate: dict[str, Any]) -> tuple[int, int, int, int]: + counts = candidate.get("open_task_counts", {}) + return ( + _priority_rank(candidate.get("planning_priority")), + 0 if candidate.get("status") == "active" else 1, + -int(counts.get("needs_human", 0)), + -int(counts.get("blocked", 0)), + ) + + +def _priority_rank(priority: Any) -> int: + return {"high": 0, "medium": 1, "low": 2}.get(str(priority or "").lower(), 3) + + +def _short_text(value: Any, limit: int) -> str: + text = " ".join(str(value or "").split()) + if len(text) <= limit: + return text + return text[: limit - 1].rstrip() + "…" diff --git a/tests/test_state_hub_context_resolver.py b/tests/test_state_hub_context_resolver.py index b5e2d13..4702e97 100644 --- a/tests/test_state_hub_context_resolver.py +++ b/tests/test_state_hub_context_resolver.py @@ -115,3 +115,118 @@ def test_resolver_failure_returns_empty(monkeypatch) -> None: def test_unknown_query_returns_empty() -> None: assert StateHubContextResolver().resolve("unknown", None, {}) == {} + + +def test_daily_triage_digest_is_curated_scalar_json(monkeypatch) -> None: + payloads = { + "/state/summary": { + "generated_at": "2026-05-19T05:20:00Z", + "totals": {"tasks": {"todo": 4, "blocked": 1}}, + "topics": [ + { + "slug": "custodian", + "domain_slug": "custodian", + "workstreams": [ + { + "id": "ws-1", + "slug": "cust-wp-0045", + "title": "Activity-Core Daily Triage Runner Cutover", + "status": "ready", + "owner": "custodian", + }, + { + "id": "ws-closed", + "slug": "closed", + "title": "Closed", + "status": "finished", + "owner": "custodian", + }, + ], + } + ], + }, + "/workstreams/workplan-index": { + "workstreams": { + "ws-1": { + "repo_slug": "the-custodian", + "relative_path": "workplans/CUST-WP-0045.md", + "needs_review": True, + "health_labels": ["needs_review"], + } + } + }, + "/state/next_steps": [ + { + "type": "resolved_decision", + "domain": "custodian", + "workstream_id": "ws-1", + "workstream_slug": "cust-wp-0045", + "workstream_title": "Activity-Core Daily Triage Runner Cutover", + "task_id": "task-1", + "task_title": "T05 - Update ActivityDefinition", + "message": "free text should not be included", + } + ], + "/messages/": [ + { + "id": "msg-1", + "from_agent": "hub", + "subject": "Please review", + "body": "free text should not be included", + "created_at": "2026-05-19T05:00:00Z", + } + ], + "/workstreams/ws-1": { + "planning_priority": "high", + "planning_order": 45, + }, + "/tasks/": [ + { + "id": "task-1", + "title": "T05 - Update ActivityDefinition", + "status": "todo", + "priority": "high", + "needs_human": False, + "description": "free text should not be included", + }, + { + "id": "task-2", + "title": "T06 - Canary Cutover", + "status": "blocked", + "priority": "medium", + "needs_human": True, + }, + ], + } + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + path = url.removeprefix("http://state-hub.test") + return DummyResponse(payloads[path]) + + monkeypatch.setenv("STATE_HUB_URL", "http://state-hub.test") + monkeypatch.setattr(httpx, "get", fake_get) + + raw_digest = StateHubContextResolver().resolve( + "daily_triage_digest", + None, + {"max_workstreams": 4, "max_next_steps": 4}, + ) + + assert isinstance(raw_digest, str) + assert "free text should not be included" not in raw_digest + + import json + digest = json.loads(raw_digest) + assert digest["totals"] == {"tasks": {"todo": 4, "blocked": 1}} + assert digest["open_workstreams"][0]["slug"] == "cust-wp-0045" + assert digest["open_workstreams"][0]["planning_priority"] == "high" + assert digest["open_workstreams"][0]["open_task_counts"] == { + "todo": 1, + "in_progress": 0, + "blocked": 1, + "needs_human": 1, + "open_total": 2, + } + assert digest["deterministic_scoring"]["future_mode"] == ( + "code_score_high_gain_high_effort_candidates" + )