diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index de20c82..9c6957f 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -27,6 +27,7 @@ from activity_core.orm import ActivityRun, TaskInstance, TaskSpawnLog from activity_core.rules import evaluate_condition from activity_core.llm_client import get_llm_client from activity_core.models import InstructionDef +from activity_core.report_sinks import persist_reports from activity_core.rules.executor import execute_instruction_with_audit @@ -313,6 +314,7 @@ async def evaluate_instructions(payload: dict) -> dict: reports.append({ "instruction_id": instruction.id, "report": result.report, + "sinks": instruction.report_sinks, "condition": result.condition_matched, "prompt_hash": result.prompt_hash, "model": result.model, @@ -339,6 +341,12 @@ async def evaluate_instructions(payload: dict) -> dict: return {"task_specs": task_specs, "reports": reports} +@activity.defn +async def persist_instruction_reports(payload: dict) -> list[dict]: + """Persist report payloads to deterministic configured sinks.""" + return persist_reports(payload) + + @activity.defn async def emit_tasks(payload: dict) -> list[str]: """Emit TaskSpecs to IssueSink and write task_spawn_log rows. diff --git a/src/activity_core/models.py b/src/activity_core/models.py index dc29e9f..ced06ca 100644 --- a/src/activity_core/models.py +++ b/src/activity_core/models.py @@ -112,6 +112,7 @@ class InstructionDef(BaseModel): prompt: str = Field(description="Prompt template with {field.path} placeholders.") output_schema: str = Field(description="Path to JSON Schema file for output validation.") review_required: bool = Field(default=False) + report_sinks: list[dict[str, Any]] = Field(default_factory=list) # ── Context sources ─────────────────────────────────────────────────────────── diff --git a/src/activity_core/report_sinks.py b/src/activity_core/report_sinks.py new file mode 100644 index 0000000..326f42c --- /dev/null +++ b/src/activity_core/report_sinks.py @@ -0,0 +1,225 @@ +"""Deterministic sinks for instruction report payloads.""" + +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from zoneinfo import ZoneInfo + +import httpx + +_DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" +_THE_CUSTODIAN_ROOT = Path("/home/worsch/the-custodian") +_FORBIDDEN_CUSTODIAN_ROOTS = ( + _THE_CUSTODIAN_ROOT / "canon", + _THE_CUSTODIAN_ROOT / "workplans", +) + + +def persist_reports(payload: dict[str, Any]) -> list[dict[str, Any]]: + """Persist instruction report payloads to configured sinks. + + Raises RuntimeError if any configured sink fails. Successful sinks are + idempotent by run_id/date, so Temporal retries can safely replay this + activity after a partial failure. + """ + results: list[dict[str, Any]] = [] + for report_entry in payload.get("reports", []): + for sink in report_entry.get("sinks", []): + sink_type = sink.get("type") + try: + if sink_type == "working-memory": + results.append(_write_working_memory(payload, report_entry, sink)) + elif sink_type == "state-hub-progress": + results.append(_post_state_hub_progress(payload, report_entry, sink)) + else: + results.append({ + "type": sink_type or "unknown", + "status": "skipped", + "reason": "unknown sink type", + }) + except Exception as exc: + results.append({ + "type": sink_type or "unknown", + "status": "error", + "error": str(exc), + }) + + errors = [result for result in results if result.get("status") == "error"] + if errors: + raise RuntimeError(f"report sink failure: {errors!r}") + return results + + +def _write_working_memory( + payload: dict[str, Any], + report_entry: dict[str, Any], + sink: dict[str, Any], +) -> dict[str, Any]: + directory = Path(sink.get("path", "")).expanduser() + if not directory: + raise ValueError("working-memory sink requires path") + + run_id = payload["run_id"] + local_date = _local_date(payload.get("scheduled_for"), sink.get("timezone", "UTC")) + instruction_id = report_entry.get("instruction_id", "instruction") + filename_template = sink.get( + "filename_template", + "daily-triage-{date}-{run_id_short}.md", + ) + filename = filename_template.format( + date=local_date, + run_id=run_id, + run_id_short=run_id[:8], + instruction_id=instruction_id, + ) + target = (directory / filename).resolve() + _assert_allowed_output_path(target) + + if target.exists(): + text = target.read_text(encoding="utf-8") + if f"activity_core_run_id: {run_id}" in text: + return { + "type": "working-memory", + "status": "exists", + "path": str(target), + } + raise FileExistsError(f"refusing to overwrite existing report note: {target}") + + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(_render_markdown(payload, report_entry, local_date), encoding="utf-8") + return { + "type": "working-memory", + "status": "written", + "path": str(target), + } + + +def _post_state_hub_progress( + payload: dict[str, Any], + report_entry: dict[str, Any], + sink: dict[str, Any], +) -> dict[str, Any]: + base_url = sink.get("state_hub_url") or os.environ.get("STATE_HUB_URL", _DEFAULT_STATE_HUB_URL) + base_url = str(base_url).rstrip("/") + run_id = payload["run_id"] + instruction_id = report_entry.get("instruction_id", "") + event_type = sink.get("event_type", "daily_triage") + + if _progress_exists(base_url, run_id, instruction_id, event_type): + return { + "type": "state-hub-progress", + "status": "exists", + "event_type": event_type, + } + + report = report_entry.get("report") or {} + body: dict[str, Any] = { + "event_type": event_type, + "author": sink.get("author", "activity-core"), + "summary": report.get("summary", f"Activity report from {instruction_id}"), + "detail": { + "activity_id": payload.get("activity_id"), + "activity_core_run_id": run_id, + "instruction_id": instruction_id, + "scheduled_for": payload.get("scheduled_for"), + "report": report, + }, + } + for key in ("topic_id", "workstream_id", "task_id", "decision_id"): + if sink.get(key): + body[key] = sink[key] + + resp = httpx.post( + f"{base_url}/progress/", + json=body, + timeout=float(sink.get("timeout_seconds", 10.0)), + ) + resp.raise_for_status() + data = resp.json() + return { + "type": "state-hub-progress", + "status": "posted", + "event_type": event_type, + "progress_id": data.get("id"), + } + + +def _progress_exists( + base_url: str, + run_id: str, + instruction_id: str, + event_type: str, +) -> bool: + resp = httpx.get( + f"{base_url}/progress/", + params={"limit": 100}, + timeout=10.0, + ) + resp.raise_for_status() + for item in resp.json(): + detail = item.get("detail") or {} + if ( + item.get("event_type") == event_type + and detail.get("activity_core_run_id") == run_id + and detail.get("instruction_id") == instruction_id + ): + return True + return False + + +def _render_markdown( + payload: dict[str, Any], + report_entry: dict[str, Any], + local_date: str, +) -> str: + report = report_entry.get("report") or {} + instruction_id = report_entry.get("instruction_id", "instruction") + summary = report.get("summary", "") + lines = [ + "---", + "type: working-memory", + "source: activity-core", + f"activity_id: {payload.get('activity_id')}", + f"activity_core_run_id: {payload.get('run_id')}", + f"instruction_id: {instruction_id}", + f"scheduled_for: {payload.get('scheduled_for')}", + f"created: {datetime.now(tz=timezone.utc).isoformat()}", + "---", + "", + f"# Daily State Hub WSJF Triage - {local_date}", + "", + ] + if summary: + lines.extend([summary, ""]) + lines.extend([ + "```json", + json.dumps(report, indent=2, sort_keys=True), + "```", + "", + ]) + return "\n".join(lines) + + +def _local_date(scheduled_for: str | None, timezone_name: str) -> str: + tz = ZoneInfo(timezone_name) + if scheduled_for: + raw = scheduled_for.replace("Z", "+00:00") + dt = datetime.fromisoformat(raw) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = datetime.now(tz=timezone.utc) + return dt.astimezone(tz).date().isoformat() + + +def _assert_allowed_output_path(path: Path) -> None: + for forbidden in _FORBIDDEN_CUSTODIAN_ROOTS: + try: + path.relative_to(forbidden) + except ValueError: + continue + raise ValueError(f"refusing to write report into canonical path: {path}") diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py index e56efe6..c2f1ee4 100644 --- a/src/activity_core/workflows.py +++ b/src/activity_core/workflows.py @@ -24,6 +24,7 @@ with workflow.unsafe.imports_passed_through(): evaluate_instructions, load_activity_definition, log_run, + persist_instruction_reports, persist_task_instance, resolve_context, ) @@ -137,6 +138,7 @@ class RunActivityWorkflow: "condition": rule.get("condition", ""), }) + report_dicts: list[dict] = [] if defn.get("instructions"): instruction_result: dict = await workflow.execute_activity( evaluate_instructions, @@ -149,14 +151,29 @@ class RunActivityWorkflow: retry_policy=_RETRY_POLICY, ) task_spec_dicts.extend(instruction_result.get("task_specs", [])) + report_dicts.extend(instruction_result.get("reports", [])) - # ── 4. Emit tasks via IssueSink ─────────────────────────────────────── if trigger_key == SCHEDULED_TRIGGER_KEY: dedup_source = workflow.info().workflow_id else: dedup_source = f"{activity_id}:{trigger_key}" run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source)) + # ── 4. Persist reports and emit tasks ──────────────────────────────── + if report_dicts: + await workflow.execute_activity( + persist_instruction_reports, + { + "reports": report_dicts, + "activity_id": activity_id, + "run_id": run_id, + "scheduled_for": scheduled_for, + "version_used": defn["version"], + }, + start_to_close_timeout=_ACTIVITY_TIMEOUT, + retry_policy=_RETRY_POLICY, + ) + if task_spec_dicts: await workflow.execute_activity( emit_tasks, diff --git a/tests/test_report_sinks.py b/tests/test_report_sinks.py new file mode 100644 index 0000000..7d5831b --- /dev/null +++ b/tests/test_report_sinks.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from typing import Any + +import httpx +import pytest + +from activity_core.report_sinks import persist_reports + + +class DummyResponse: + def __init__(self, payload: Any) -> None: + self.payload = payload + + def raise_for_status(self) -> None: + return None + + def json(self) -> Any: + return self.payload + + +def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]: + return { + "activity_id": "activity-1", + "run_id": "12345678-aaaa-bbbb-cccc-123456789abc", + "scheduled_for": "2026-05-19T05:20:00+00:00", + "reports": [ + { + "instruction_id": "daily-triage-report", + "report": { + "summary": "State Hub has loose ends.", + "recommendations": [{"candidate": "CUST-WP-0045"}], + }, + "sinks": sinks, + } + ], + } + + +def test_working_memory_sink_writes_idempotently(tmp_path) -> None: + payload = _payload([ + { + "type": "working-memory", + "path": str(tmp_path), + "timezone": "Europe/Berlin", + } + ]) + + first = persist_reports(payload) + second = persist_reports(payload) + + assert first[0]["status"] == "written" + assert second[0]["status"] == "exists" + note = tmp_path / "daily-triage-2026-05-19-12345678.md" + text = note.read_text(encoding="utf-8") + assert "activity_core_run_id: 12345678-aaaa-bbbb-cccc-123456789abc" in text + assert "State Hub has loose ends." in text + + +def test_working_memory_sink_refuses_canonical_custodian_path() -> None: + payload = _payload([ + { + "type": "working-memory", + "path": "/home/worsch/the-custodian/workplans", + } + ]) + + with pytest.raises(RuntimeError, match="refusing to write report"): + persist_reports(payload) + + +def test_state_hub_progress_sink_posts(monkeypatch) -> None: + posts: list[dict[str, Any]] = [] + + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + assert url == "http://state-hub.test/progress/" + return DummyResponse([]) + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + posts.append({"url": url, **kwargs}) + return DummyResponse({"id": "progress-1"}) + + monkeypatch.setattr(httpx, "get", fake_get) + monkeypatch.setattr(httpx, "post", fake_post) + + result = persist_reports(_payload([ + { + "type": "state-hub-progress", + "state_hub_url": "http://state-hub.test", + "event_type": "daily_triage", + "workstream_id": "workstream-1", + } + ])) + + assert result == [ + { + "type": "state-hub-progress", + "status": "posted", + "event_type": "daily_triage", + "progress_id": "progress-1", + } + ] + assert posts[0]["url"] == "http://state-hub.test/progress/" + assert posts[0]["json"]["workstream_id"] == "workstream-1" + assert posts[0]["json"]["detail"]["activity_core_run_id"] == payload_run_id() + + +def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None: + def fake_get(url: str, **kwargs: Any) -> DummyResponse: + return DummyResponse([ + { + "event_type": "daily_triage", + "detail": { + "activity_core_run_id": payload_run_id(), + "instruction_id": "daily-triage-report", + }, + } + ]) + + def fake_post(url: str, **kwargs: Any) -> DummyResponse: + raise AssertionError("post should not be called") + + monkeypatch.setattr(httpx, "get", fake_get) + monkeypatch.setattr(httpx, "post", fake_post) + + result = persist_reports(_payload([ + { + "type": "state-hub-progress", + "state_hub_url": "http://state-hub.test", + "event_type": "daily_triage", + } + ])) + + assert result[0]["status"] == "exists" + + +def payload_run_id() -> str: + return "12345678-aaaa-bbbb-cccc-123456789abc"