From 42e373aba1980cf1cd8599b6b2e48928758a690a Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 5 Jun 2026 19:27:03 +0200 Subject: [PATCH] Harden WSJF triage report recovery --- docs/runbook.md | 8 ++- scripts/verify_daily_triage.py | 2 +- src/activity_core/activities.py | 1 + src/activity_core/report_sinks.py | 10 +++ src/activity_core/rules/executor.py | 66 ++++++++++++++++++- tests/rules/test_executor.py | 57 +++++++++++++++- tests/test_daily_triage_verifier.py | 7 ++ tests/test_instruction_evaluation.py | 57 ++++++++++++++++ tests/test_report_sinks.py | 10 +++ ...-0006-post-triage-operational-hardening.md | 13 +++- 10 files changed, 223 insertions(+), 8 deletions(-) diff --git a/docs/runbook.md b/docs/runbook.md index 384f87e..49b2894 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -169,7 +169,7 @@ TEMPORAL_HOST=localhost:7233 \ STATE_HUB_URL=http://127.0.0.1:8000 \ uv run python scripts/verify_daily_triage.py \ --activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \ - --working-memory-dir /home/worsch/the-custodian/working-memory \ + --working-memory-dir /home/worsch/the-custodian/memory/working \ --live ``` @@ -182,9 +182,9 @@ The verification is complete when all of these agree: - `activity_runs` has a row for the daily triage ActivityDefinition with today's `scheduled_for` or `fired_at` date. - State Hub `/progress/` contains a `daily_triage` event whose detail includes - the same `activity_core_run_id`. + the same `activity_core_run_id` and its `output_validated` flag. - The working-memory sink wrote `daily-triage-YYYY-MM-DD-.md` and its - frontmatter contains the same `activity_core_run_id`. + frontmatter contains the same `activity_core_run_id` and validation metadata. - The ActivityDefinition's instruction model, token budget, and sink timeouts fit under `ACTIVITY_TIMEOUT_SECONDS` (default 900 seconds). Temporal retries each activity up to 10 attempts, so a slow LLM or sink failure should show as @@ -280,6 +280,8 @@ Leave a State Hub progress note, but do not page, when: - A planned outage caused one skipped run and the schedule is healthy again. - A sink idempotency check reports `exists` for the expected run id. +- An instruction report has `output_validated=false` but still emitted a + validation-failure note preserving partial model output for review. - The report completed but calibration feedback says the recommendations were noisy, too long, or under-sensitive. diff --git a/scripts/verify_daily_triage.py b/scripts/verify_daily_triage.py index a76e5f0..df27743 100644 --- a/scripts/verify_daily_triage.py +++ b/scripts/verify_daily_triage.py @@ -24,7 +24,7 @@ DEFAULT_PROGRESS_EVENT_TYPE = "daily_triage" DEFAULT_TEMPORAL_HOST = "localhost:7233" DEFAULT_TEMPORAL_NAMESPACE = "default" DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" -DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/working-memory" +DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/memory/working" def parse_args(argv: list[str] | None = None) -> argparse.Namespace: diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index 3502061..a0ae1bb 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -328,6 +328,7 @@ async def evaluate_instructions(payload: dict) -> dict: "model": result.model, "output_validated": result.output_validated, "review_required": result.review_required, + "validation_error": result.validation_error, }) for spec in result.tasks: task_specs.append({ diff --git a/src/activity_core/report_sinks.py b/src/activity_core/report_sinks.py index 326f42c..f445fb7 100644 --- a/src/activity_core/report_sinks.py +++ b/src/activity_core/report_sinks.py @@ -126,6 +126,9 @@ def _post_state_hub_progress( "activity_core_run_id": run_id, "instruction_id": instruction_id, "scheduled_for": payload.get("scheduled_for"), + "output_validated": report_entry.get("output_validated"), + "review_required": report_entry.get("review_required"), + "validation_error": report_entry.get("validation_error"), "report": report, }, } @@ -179,6 +182,7 @@ def _render_markdown( report = report_entry.get("report") or {} instruction_id = report_entry.get("instruction_id", "instruction") summary = report.get("summary", "") + validation_error = report_entry.get("validation_error") lines = [ "---", "type: working-memory", @@ -187,6 +191,10 @@ def _render_markdown( f"activity_core_run_id: {payload.get('run_id')}", f"instruction_id: {instruction_id}", f"scheduled_for: {payload.get('scheduled_for')}", + f"output_validated: {str(bool(report_entry.get('output_validated'))).lower()}", + f"review_required: {str(bool(report_entry.get('review_required'))).lower()}", + f"model: {report_entry.get('model') or ''}", + f"prompt_hash: {report_entry.get('prompt_hash') or ''}", f"created: {datetime.now(tz=timezone.utc).isoformat()}", "---", "", @@ -195,6 +203,8 @@ def _render_markdown( ] if summary: lines.extend([summary, ""]) + if validation_error: + lines.extend(["Validation error:", "", f"`{validation_error}`", ""]) lines.extend([ "```json", json.dumps(report, indent=2, sort_keys=True), diff --git a/src/activity_core/rules/executor.py b/src/activity_core/rules/executor.py index 159bde1..aa2c9ff 100644 --- a/src/activity_core/rules/executor.py +++ b/src/activity_core/rules/executor.py @@ -39,6 +39,7 @@ class InstructionResult: output_validated: bool = False review_required: bool = False condition_matched: str | None = None + validation_error: str | None = None def _resolve_path(obj: Any, path: str) -> Any: @@ -164,7 +165,19 @@ def _execute( "error=%s, raw_output_preview=%r", instr.id, prompt_hash, error, preview, ) - return _empty_result(instr, prompt_hash=prompt_hash) + failure_report = _invalid_output_report(instr, error, raw_output) + if failure_report is not None: + return InstructionResult( + tasks=[], + report=failure_report, + prompt_hash=prompt_hash, + model=instr.model, + output_validated=False, + review_required=True, + condition_matched=instr.condition or None, + validation_error=error, + ) + return _empty_result(instr, prompt_hash=prompt_hash, validation_error=error) return InstructionResult( tasks=task_specs, @@ -193,7 +206,11 @@ def _llm_run_config(instr: Any) -> dict[str, Any]: return config -def _empty_result(instr: Any, prompt_hash: str | None = None) -> InstructionResult: +def _empty_result( + instr: Any, + prompt_hash: str | None = None, + validation_error: str | None = None, +) -> InstructionResult: return InstructionResult( tasks=[], prompt_hash=prompt_hash, @@ -201,9 +218,54 @@ def _empty_result(instr: Any, prompt_hash: str | None = None) -> InstructionResu output_validated=False, review_required=bool(getattr(instr, "review_required", False)), condition_matched=getattr(instr, "condition", "") or None, + validation_error=validation_error, ) +def _invalid_output_report( + instr: Any, + validation_error: str, + raw_output: Any, +) -> dict[str, Any] | None: + """Build a durable diagnostic report for invalid report-sink output. + + Task-only instructions keep the legacy empty-result behavior. Instructions + with report sinks should leave operators a bounded artifact that preserves + the partial model output without marking it as schema-valid. + """ + if not getattr(instr, "report_sinks", None): + return None + + partial_output: Any + raw_preview: str | None = None + if isinstance(raw_output, str): + try: + partial_output = json.loads(raw_output) + except json.JSONDecodeError: + partial_output = None + raw_preview = raw_output[:4000] + else: + partial_output = raw_output + + report: dict[str, Any] = { + "summary": ( + f"Instruction {instr.id} produced output that failed validation; " + "partial output was preserved for operator review." + ), + "status": "validation_failed", + "validation_error": validation_error, + } + if isinstance(partial_output, dict): + if isinstance(partial_output.get("summary"), str): + report["partial_summary"] = partial_output["summary"] + report["partial_report"] = partial_output + elif isinstance(partial_output, list): + report["partial_report"] = partial_output + elif raw_preview is not None: + report["raw_output_preview"] = raw_preview + return report + + def _validate_output( raw_output: Any, instr: Any, diff --git a/tests/rules/test_executor.py b/tests/rules/test_executor.py index aa73b7d..28fc499 100644 --- a/tests/rules/test_executor.py +++ b/tests/rules/test_executor.py @@ -4,7 +4,8 @@ Covers: - UntrustedFieldError raised when prompt references untrusted field - Object-type attribute rejected even when listed in trusted_fields - Injection fixture: untrusted field raises UntrustedFieldError before rendering -- Schema validation: NullLLM returning invalid JSON → retry → second invalid → [] +- Schema validation: invalid JSON retries once; report-sink instructions preserve + a validation-failure artifact after the second invalid output. - review_required flag: present on InstructionDef model """ @@ -98,6 +99,7 @@ def _instr( max_tokens: int | None = None, max_depth: int | None = None, model_params: dict[str, Any] | None = None, + report_sinks: list[dict[str, Any]] | None = None, ) -> SimpleNamespace: return SimpleNamespace( id=id, @@ -111,6 +113,7 @@ def _instr( model_params=model_params or {}, output_schema=output_schema, review_required=review_required, + report_sinks=report_sinks or [], ) @@ -353,6 +356,58 @@ def test_execute_instruction_with_audit_rejects_invalid_report_schema(): assert llm.call_count == 2 +def test_execute_instruction_with_audit_preserves_invalid_report_with_sinks( + tmp_path, + monkeypatch, +): + schema_dir = tmp_path / "schemas" + schema_dir.mkdir() + schema_path = schema_dir / "daily-triage-report.json" + schema_path.write_text( + json.dumps({ + "type": "object", + "required": ["summary", "recommendations"], + "properties": { + "summary": {"type": "string"}, + "recommendations": { + "type": "array", + "items": { + "type": "object", + "required": ["action"], + }, + }, + }, + }), + encoding="utf-8", + ) + monkeypatch.chdir(tmp_path) + + report_data = { + "summary": "Generated partial triage.", + "recommendations": [{"rank": 1, "candidate": "CUST-WP-0045"}], + } + llm = _CountingLLM([json.dumps(report_data), json.dumps(report_data)]) + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + output_schema="schemas/daily-triage-report.json", + report_sinks=[{"type": "working-memory", "path": "/tmp"}], + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.tasks == [] + assert result.output_validated is False + assert result.review_required is True + assert result.validation_error == "$.recommendations[0]: missing required property 'action'" + assert result.report is not None + assert result.report["status"] == "validation_failed" + assert result.report["partial_summary"] == "Generated partial triage." + assert result.report["partial_report"] == report_data + assert llm.call_count == 2 + + def test_execute_instruction_with_audit_accepts_report_and_tasks_envelope(): envelope = { "report": {"summary": "Review needed."}, diff --git a/tests/test_daily_triage_verifier.py b/tests/test_daily_triage_verifier.py index 2fb7b42..878d922 100644 --- a/tests/test_daily_triage_verifier.py +++ b/tests/test_daily_triage_verifier.py @@ -54,3 +54,10 @@ def test_daily_triage_verifier_dry_run_names_all_operator_checks() -> None: assert "where id = '00000000-0000-0000-0000-000000000123'" in timeout_check["sql"] assert timeout_check["activity_timeout_seconds"] == 900 assert timeout_check["retry_attempts"] == 10 + + +def test_daily_triage_verifier_default_working_memory_dir() -> None: + script = _load_script() + args = script.parse_args([]) + + assert args.working_memory_dir == "/home/worsch/the-custodian/memory/working" diff --git a/tests/test_instruction_evaluation.py b/tests/test_instruction_evaluation.py index a32cf52..4dee59c 100644 --- a/tests/test_instruction_evaluation.py +++ b/tests/test_instruction_evaluation.py @@ -98,6 +98,63 @@ async def test_evaluate_instructions_returns_report_payload(monkeypatch) -> None assert report["prompt_hash"] is not None +@pytest.mark.asyncio +async def test_evaluate_instructions_returns_invalid_report_for_report_sinks( + monkeypatch, + tmp_path, +) -> None: + schema_dir = tmp_path / "schemas" + schema_dir.mkdir() + (schema_dir / "daily-triage-report.json").write_text( + json.dumps({ + "type": "object", + "required": ["summary", "recommendations"], + "properties": { + "summary": {"type": "string"}, + "recommendations": { + "type": "array", + "items": { + "type": "object", + "required": ["wsjf"], + }, + }, + }, + }), + encoding="utf-8", + ) + monkeypatch.chdir(tmp_path) + llm = FakeLLMClient(json.dumps({ + "summary": "Partial triage.", + "recommendations": [{"rank": 1, "candidate": "CUST-WP-0045"}], + })) + monkeypatch.setattr(activities, "get_llm_client", lambda: llm) + + result = await activities.evaluate_instructions({ + "instructions": [ + { + "id": "daily-triage-report", + "trusted_fields": [], + "model": "test-model", + "prompt": "Run report.", + "output_schema": "schemas/daily-triage-report.json", + "review_required": False, + "report_sinks": [{"type": "working-memory", "path": "/tmp"}], + } + ], + "event": {}, + "context": {}, + }) + + assert result["task_specs"] == [] + assert len(result["reports"]) == 1 + report = result["reports"][0] + assert report["output_validated"] is False + assert report["review_required"] is True + assert report["validation_error"] == "$.recommendations[0]: missing required property 'wsjf'" + assert report["report"]["status"] == "validation_failed" + assert report["report"]["partial_summary"] == "Partial triage." + + @pytest.mark.asyncio async def test_evaluate_instructions_without_llm_client_returns_no_tasks(monkeypatch) -> None: class RaisingClient: diff --git a/tests/test_report_sinks.py b/tests/test_report_sinks.py index 7d5831b..9ed446c 100644 --- a/tests/test_report_sinks.py +++ b/tests/test_report_sinks.py @@ -32,6 +32,11 @@ def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]: "recommendations": [{"candidate": "CUST-WP-0045"}], }, "sinks": sinks, + "prompt_hash": "abc123", + "model": "test-model", + "output_validated": True, + "review_required": False, + "validation_error": None, } ], } @@ -54,6 +59,9 @@ def test_working_memory_sink_writes_idempotently(tmp_path) -> None: note = tmp_path / "daily-triage-2026-05-19-12345678.md" text = note.read_text(encoding="utf-8") assert "activity_core_run_id: 12345678-aaaa-bbbb-cccc-123456789abc" in text + assert "output_validated: true" in text + assert "review_required: false" in text + assert "model: test-model" in text assert "State Hub has loose ends." in text @@ -103,6 +111,8 @@ def test_state_hub_progress_sink_posts(monkeypatch) -> None: assert posts[0]["url"] == "http://state-hub.test/progress/" assert posts[0]["json"]["workstream_id"] == "workstream-1" assert posts[0]["json"]["detail"]["activity_core_run_id"] == payload_run_id() + assert posts[0]["json"]["detail"]["output_validated"] is True + assert posts[0]["json"]["detail"]["review_required"] is False def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None: diff --git a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md index d89c0b6..18a08ba 100644 --- a/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md +++ b/workplans/ACTIVITY-WP-0006-post-triage-operational-hardening.md @@ -8,7 +8,7 @@ status: active owner: codex topic_slug: custodian created: "2026-06-03" -updated: "2026-06-04" +updated: "2026-06-05" state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733" --- @@ -84,6 +84,11 @@ live modes, plus `tests/test_daily_triage_verifier.py`. `docs/runbook.md` now covers Temporal schedule/workflow checks, `activity_runs`, State Hub progress, working-memory notes, missed-run `skip` behavior, and LLM timeout budget. +2026-06-05: Follow-up hardening after the scheduled WSJF triage ran but emitted +no report because the live schema required `wsjf` fields and the stale DB prompt +did not request them. The verifier default and runbook now point at the live +working-memory sink path, `/home/worsch/the-custodian/memory/working`. + ## Three-Run Calibration Feedback ```task @@ -166,6 +171,12 @@ failure behavior, LLM timeout/retry behavior, and page/note/next-session classification. Task emission sink failures now raise from `emit_tasks`, making them visible to Temporal retries instead of warning-only logs. +2026-06-05: Added instruction-output robustness for report-sink instructions: +after retry exhaustion, schema-invalid model output now produces a durable +validation-failure report containing bounded partial output instead of a silent +empty result. Report sinks include validation metadata in working-memory +frontmatter and State Hub progress detail. + ## Issue-Core Emission Boundary Verification ```task