Add instruction report sinks

This commit is contained in:
2026-05-19 18:36:58 +02:00
parent 0dc342eb1b
commit 3110399b11
5 changed files with 390 additions and 1 deletions

View File

@@ -27,6 +27,7 @@ from activity_core.orm import ActivityRun, TaskInstance, TaskSpawnLog
from activity_core.rules import evaluate_condition from activity_core.rules import evaluate_condition
from activity_core.llm_client import get_llm_client from activity_core.llm_client import get_llm_client
from activity_core.models import InstructionDef from activity_core.models import InstructionDef
from activity_core.report_sinks import persist_reports
from activity_core.rules.executor import execute_instruction_with_audit from activity_core.rules.executor import execute_instruction_with_audit
@@ -313,6 +314,7 @@ async def evaluate_instructions(payload: dict) -> dict:
reports.append({ reports.append({
"instruction_id": instruction.id, "instruction_id": instruction.id,
"report": result.report, "report": result.report,
"sinks": instruction.report_sinks,
"condition": result.condition_matched, "condition": result.condition_matched,
"prompt_hash": result.prompt_hash, "prompt_hash": result.prompt_hash,
"model": result.model, "model": result.model,
@@ -339,6 +341,12 @@ async def evaluate_instructions(payload: dict) -> dict:
return {"task_specs": task_specs, "reports": reports} return {"task_specs": task_specs, "reports": reports}
@activity.defn
async def persist_instruction_reports(payload: dict) -> list[dict]:
    """Persist report payloads to deterministic configured sinks.

    ``persist_reports`` is a synchronous function that performs blocking
    file writes and HTTP requests. Calling it directly from this coroutine
    would block the worker's event loop for the duration of those calls, so
    it is executed on a worker thread instead.

    Args:
        payload: Dict forwarded unchanged to ``persist_reports``.

    Returns:
        The per-sink result dicts returned by ``persist_reports``.

    Raises:
        RuntimeError: Propagated from ``persist_reports`` when a sink fails.
    """
    # Local import keeps this block self-contained; asyncio is stdlib.
    import asyncio

    return await asyncio.to_thread(persist_reports, payload)
@activity.defn @activity.defn
async def emit_tasks(payload: dict) -> list[str]: async def emit_tasks(payload: dict) -> list[str]:
"""Emit TaskSpecs to IssueSink and write task_spawn_log rows. """Emit TaskSpecs to IssueSink and write task_spawn_log rows.

View File

@@ -112,6 +112,7 @@ class InstructionDef(BaseModel):
prompt: str = Field(description="Prompt template with {field.path} placeholders.") prompt: str = Field(description="Prompt template with {field.path} placeholders.")
output_schema: str = Field(description="Path to JSON Schema file for output validation.") output_schema: str = Field(description="Path to JSON Schema file for output validation.")
review_required: bool = Field(default=False) review_required: bool = Field(default=False)
report_sinks: list[dict[str, Any]] = Field(default_factory=list)
# ── Context sources ─────────────────────────────────────────────────────────── # ── Context sources ───────────────────────────────────────────────────────────

View File

@@ -0,0 +1,225 @@
"""Deterministic sinks for instruction report payloads."""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import httpx
# Fallback State Hub base URL, used when neither the sink config nor the
# STATE_HUB_URL environment variable provides one.
_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
# Base directory from which the forbidden output roots below are derived.
_THE_CUSTODIAN_ROOT = Path("/home/worsch/the-custodian")
# Directories that _assert_allowed_output_path refuses to write under.
_FORBIDDEN_CUSTODIAN_ROOTS = (
    _THE_CUSTODIAN_ROOT / "canon",
    _THE_CUSTODIAN_ROOT / "workplans",
)
def persist_reports(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Deliver every report in *payload* to each sink configured for it.

    Each entry of ``payload["reports"]`` may carry a ``"sinks"`` list; every
    sink dict is dispatched on its ``"type"``. Sinks with an unrecognised
    type are recorded as ``"skipped"`` rather than treated as failures.

    All sinks are attempted before any failure is reported, so one broken
    sink does not prevent the remaining ones from running.

    Returns:
        One result dict per sink, in processing order.

    Raises:
        RuntimeError: If at least one sink raised; the message contains the
            collected error results.
    """
    outcomes: list[dict[str, Any]] = []

    for entry in payload.get("reports", []):
        for sink in entry.get("sinks", []):
            kind = sink.get("type")
            label = kind or "unknown"
            try:
                if kind == "working-memory":
                    outcome = _write_working_memory(payload, entry, sink)
                elif kind == "state-hub-progress":
                    outcome = _post_state_hub_progress(payload, entry, sink)
                else:
                    outcome = {
                        "type": label,
                        "status": "skipped",
                        "reason": "unknown sink type",
                    }
            except Exception as exc:
                # Record the failure and keep going; it is raised in bulk below.
                outcome = {
                    "type": label,
                    "status": "error",
                    "error": str(exc),
                }
            outcomes.append(outcome)

    failures = [outcome for outcome in outcomes if outcome.get("status") == "error"]
    if failures:
        raise RuntimeError(f"report sink failure: {failures!r}")
    return outcomes
def _write_working_memory(
payload: dict[str, Any],
report_entry: dict[str, Any],
sink: dict[str, Any],
) -> dict[str, Any]:
directory = Path(sink.get("path", "")).expanduser()
if not directory:
raise ValueError("working-memory sink requires path")
run_id = payload["run_id"]
local_date = _local_date(payload.get("scheduled_for"), sink.get("timezone", "UTC"))
instruction_id = report_entry.get("instruction_id", "instruction")
filename_template = sink.get(
"filename_template",
"daily-triage-{date}-{run_id_short}.md",
)
filename = filename_template.format(
date=local_date,
run_id=run_id,
run_id_short=run_id[:8],
instruction_id=instruction_id,
)
target = (directory / filename).resolve()
_assert_allowed_output_path(target)
if target.exists():
text = target.read_text(encoding="utf-8")
if f"activity_core_run_id: {run_id}" in text:
return {
"type": "working-memory",
"status": "exists",
"path": str(target),
}
raise FileExistsError(f"refusing to overwrite existing report note: {target}")
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(_render_markdown(payload, report_entry, local_date), encoding="utf-8")
return {
"type": "working-memory",
"status": "written",
"path": str(target),
}
def _post_state_hub_progress(
    payload: dict[str, Any],
    report_entry: dict[str, Any],
    sink: dict[str, Any],
) -> dict[str, Any]:
    """Post one report to the State Hub ``/progress/`` endpoint.

    The base URL comes from the sink's ``state_hub_url``, then the
    ``STATE_HUB_URL`` environment variable, then the module default. Before
    posting, existing progress entries are checked so that a report already
    recorded for this run and instruction is not posted again.

    Returns:
        A result dict with ``status`` ``"exists"`` or ``"posted"``; a posted
        result also carries the ``progress_id`` from the response body.

    Raises:
        KeyError: If ``payload`` has no ``"run_id"``.
        httpx.HTTPStatusError: If either HTTP call returns an error status.
    """
    configured_url = sink.get("state_hub_url") or os.environ.get(
        "STATE_HUB_URL", _DEFAULT_STATE_HUB_URL
    )
    hub_url = str(configured_url).rstrip("/")
    run_id = payload["run_id"]
    instruction_id = report_entry.get("instruction_id", "")
    event_type = sink.get("event_type", "daily_triage")

    if _progress_exists(hub_url, run_id, instruction_id, event_type):
        return {
            "type": "state-hub-progress",
            "status": "exists",
            "event_type": event_type,
        }

    report = report_entry.get("report") or {}
    detail = {
        "activity_id": payload.get("activity_id"),
        "activity_core_run_id": run_id,
        "instruction_id": instruction_id,
        "scheduled_for": payload.get("scheduled_for"),
        "report": report,
    }
    body: dict[str, Any] = {
        "event_type": event_type,
        "author": sink.get("author", "activity-core"),
        "summary": report.get("summary", f"Activity report from {instruction_id}"),
        "detail": detail,
    }
    # Optional linkage ids are forwarded only when the sink sets them.
    linkage_keys = ("topic_id", "workstream_id", "task_id", "decision_id")
    body.update({key: sink[key] for key in linkage_keys if sink.get(key)})

    response = httpx.post(
        f"{hub_url}/progress/",
        json=body,
        timeout=float(sink.get("timeout_seconds", 10.0)),
    )
    response.raise_for_status()
    created = response.json()
    return {
        "type": "state-hub-progress",
        "status": "posted",
        "event_type": event_type,
        "progress_id": created.get("id"),
    }
def _progress_exists(
    base_url: str,
    run_id: str,
    instruction_id: str,
    event_type: str,
) -> bool:
    """Report whether a matching progress entry is already listed.

    Fetches ``{base_url}/progress/`` with ``limit=100`` and looks for an item
    whose ``event_type``, ``detail.activity_core_run_id`` and
    ``detail.instruction_id`` all match the arguments.

    NOTE(review): only the single page of up to 100 items returned by the
    endpoint is inspected; whether that page is guaranteed to contain the
    entry for a given run depends on the server's ordering — confirm.

    Raises:
        httpx.HTTPStatusError: If the listing request returns an error status.
    """
    response = httpx.get(
        f"{base_url}/progress/",
        params={"limit": 100},
        timeout=10.0,
    )
    response.raise_for_status()

    for entry in response.json():
        detail = entry.get("detail") or {}
        if not entry.get("event_type") == event_type:
            continue
        if not detail.get("activity_core_run_id") == run_id:
            continue
        if detail.get("instruction_id") == instruction_id:
            return True
    return False
def _render_markdown(
payload: dict[str, Any],
report_entry: dict[str, Any],
local_date: str,
) -> str:
report = report_entry.get("report") or {}
instruction_id = report_entry.get("instruction_id", "instruction")
summary = report.get("summary", "")
lines = [
"---",
"type: working-memory",
"source: activity-core",
f"activity_id: {payload.get('activity_id')}",
f"activity_core_run_id: {payload.get('run_id')}",
f"instruction_id: {instruction_id}",
f"scheduled_for: {payload.get('scheduled_for')}",
f"created: {datetime.now(tz=timezone.utc).isoformat()}",
"---",
"",
f"# Daily State Hub WSJF Triage - {local_date}",
"",
]
if summary:
lines.extend([summary, ""])
lines.extend([
"```json",
json.dumps(report, indent=2, sort_keys=True),
"```",
"",
])
return "\n".join(lines)
def _local_date(scheduled_for: str | None, timezone_name: str) -> str:
tz = ZoneInfo(timezone_name)
if scheduled_for:
raw = scheduled_for.replace("Z", "+00:00")
dt = datetime.fromisoformat(raw)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = datetime.now(tz=timezone.utc)
return dt.astimezone(tz).date().isoformat()
def _assert_allowed_output_path(path: Path) -> None:
    """Reject *path* if it is one of, or lies beneath, a forbidden root.

    Raises:
        ValueError: If *path* is relative to any entry of
            ``_FORBIDDEN_CUSTODIAN_ROOTS``.
    """
    if any(path.is_relative_to(root) for root in _FORBIDDEN_CUSTODIAN_ROOTS):
        raise ValueError(f"refusing to write report into canonical path: {path}")

View File

@@ -24,6 +24,7 @@ with workflow.unsafe.imports_passed_through():
evaluate_instructions, evaluate_instructions,
load_activity_definition, load_activity_definition,
log_run, log_run,
persist_instruction_reports,
persist_task_instance, persist_task_instance,
resolve_context, resolve_context,
) )
@@ -137,6 +138,7 @@ class RunActivityWorkflow:
"condition": rule.get("condition", ""), "condition": rule.get("condition", ""),
}) })
report_dicts: list[dict] = []
if defn.get("instructions"): if defn.get("instructions"):
instruction_result: dict = await workflow.execute_activity( instruction_result: dict = await workflow.execute_activity(
evaluate_instructions, evaluate_instructions,
@@ -149,14 +151,29 @@ class RunActivityWorkflow:
retry_policy=_RETRY_POLICY, retry_policy=_RETRY_POLICY,
) )
task_spec_dicts.extend(instruction_result.get("task_specs", [])) task_spec_dicts.extend(instruction_result.get("task_specs", []))
report_dicts.extend(instruction_result.get("reports", []))
# ── 4. Emit tasks via IssueSink ───────────────────────────────────────
if trigger_key == SCHEDULED_TRIGGER_KEY: if trigger_key == SCHEDULED_TRIGGER_KEY:
dedup_source = workflow.info().workflow_id dedup_source = workflow.info().workflow_id
else: else:
dedup_source = f"{activity_id}:{trigger_key}" dedup_source = f"{activity_id}:{trigger_key}"
run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source)) run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source))
# ── 4. Persist reports and emit tasks ────────────────────────────────
if report_dicts:
await workflow.execute_activity(
persist_instruction_reports,
{
"reports": report_dicts,
"activity_id": activity_id,
"run_id": run_id,
"scheduled_for": scheduled_for,
"version_used": defn["version"],
},
start_to_close_timeout=_ACTIVITY_TIMEOUT,
retry_policy=_RETRY_POLICY,
)
if task_spec_dicts: if task_spec_dicts:
await workflow.execute_activity( await workflow.execute_activity(
emit_tasks, emit_tasks,

138
tests/test_report_sinks.py Normal file
View File

@@ -0,0 +1,138 @@
from __future__ import annotations
from typing import Any
import httpx
import pytest
from activity_core.report_sinks import persist_reports
class DummyResponse:
    """Minimal stand-in for an HTTP response, returning a canned JSON body."""

    def __init__(self, payload: Any) -> None:
        # Body handed back verbatim by json().
        self.payload = payload

    def raise_for_status(self) -> None:
        """Do nothing: the fake response never signals an HTTP error."""

    def json(self) -> Any:
        """Return the payload supplied at construction time."""
        body = self.payload
        return body
def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]:
return {
"activity_id": "activity-1",
"run_id": "12345678-aaaa-bbbb-cccc-123456789abc",
"scheduled_for": "2026-05-19T05:20:00+00:00",
"reports": [
{
"instruction_id": "daily-triage-report",
"report": {
"summary": "State Hub has loose ends.",
"recommendations": [{"candidate": "CUST-WP-0045"}],
},
"sinks": sinks,
}
],
}
def test_working_memory_sink_writes_idempotently(tmp_path) -> None:
    """A second run with the same payload reports "exists" and keeps the note."""
    sink = {
        "type": "working-memory",
        "path": str(tmp_path),
        "timezone": "Europe/Berlin",
    }
    payload = _payload([sink])

    first_results = persist_reports(payload)
    second_results = persist_reports(payload)

    assert first_results[0]["status"] == "written"
    assert second_results[0]["status"] == "exists"

    # File name is derived from the local date and the first 8 run-id chars.
    written = (tmp_path / "daily-triage-2026-05-19-12345678.md").read_text(encoding="utf-8")
    assert "activity_core_run_id: 12345678-aaaa-bbbb-cccc-123456789abc" in written
    assert "State Hub has loose ends." in written
def test_working_memory_sink_refuses_canonical_custodian_path() -> None:
    """Targeting a forbidden custodian directory surfaces as a sink failure."""
    forbidden_sink = {
        "type": "working-memory",
        "path": "/home/worsch/the-custodian/workplans",
    }
    payload = _payload([forbidden_sink])

    # The sink's ValueError is collected and re-raised as RuntimeError.
    with pytest.raises(RuntimeError, match="refusing to write report"):
        persist_reports(payload)
def test_state_hub_progress_sink_posts(monkeypatch) -> None:
    """With no existing entry, the sink posts once and returns the new id."""
    captured_posts: list[dict[str, Any]] = []

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        # Empty listing: nothing has been recorded for this run yet.
        assert url == "http://state-hub.test/progress/"
        return DummyResponse([])

    def fake_post(url: str, **kwargs: Any) -> DummyResponse:
        captured_posts.append({"url": url, **kwargs})
        return DummyResponse({"id": "progress-1"})

    monkeypatch.setattr(httpx, "get", fake_get)
    monkeypatch.setattr(httpx, "post", fake_post)

    sink = {
        "type": "state-hub-progress",
        "state_hub_url": "http://state-hub.test",
        "event_type": "daily_triage",
        "workstream_id": "workstream-1",
    }
    result = persist_reports(_payload([sink]))

    expected = [
        {
            "type": "state-hub-progress",
            "status": "posted",
            "event_type": "daily_triage",
            "progress_id": "progress-1",
        }
    ]
    assert result == expected

    sent = captured_posts[0]
    assert sent["url"] == "http://state-hub.test/progress/"
    assert sent["json"]["workstream_id"] == "workstream-1"
    assert sent["json"]["detail"]["activity_core_run_id"] == payload_run_id()
def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None:
    """When a matching entry is already listed, nothing is posted."""
    already_recorded = {
        "event_type": "daily_triage",
        "detail": {
            "activity_core_run_id": payload_run_id(),
            "instruction_id": "daily-triage-report",
        },
    }

    def fake_get(url: str, **kwargs: Any) -> DummyResponse:
        return DummyResponse([already_recorded])

    def fake_post(url: str, **kwargs: Any) -> DummyResponse:
        # Reaching this means the existence check was ignored.
        raise AssertionError("post should not be called")

    monkeypatch.setattr(httpx, "get", fake_get)
    monkeypatch.setattr(httpx, "post", fake_post)

    sink = {
        "type": "state-hub-progress",
        "state_hub_url": "http://state-hub.test",
        "event_type": "daily_triage",
    }
    result = persist_reports(_payload([sink]))

    assert result[0]["status"] == "exists"
def payload_run_id() -> str:
    """Return the fixed run id that the tests in this module compare against."""
    run_id = "12345678-aaaa-bbbb-cccc-123456789abc"
    return run_id