Harden WSJF triage report recovery

This commit is contained in:
2026-06-05 19:27:03 +02:00
parent 20d4f26166
commit 42e373aba1
10 changed files with 223 additions and 8 deletions

View File

@@ -169,7 +169,7 @@ TEMPORAL_HOST=localhost:7233 \
STATE_HUB_URL=http://127.0.0.1:8000 \ STATE_HUB_URL=http://127.0.0.1:8000 \
uv run python scripts/verify_daily_triage.py \ uv run python scripts/verify_daily_triage.py \
--activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \ --activity-id "$DAILY_TRIAGE_ACTIVITY_ID" \
--working-memory-dir /home/worsch/the-custodian/working-memory \ --working-memory-dir /home/worsch/the-custodian/memory/working \
--live --live
``` ```
@@ -182,9 +182,9 @@ The verification is complete when all of these agree:
- `activity_runs` has a row for the daily triage ActivityDefinition with today's - `activity_runs` has a row for the daily triage ActivityDefinition with today's
`scheduled_for` or `fired_at` date. `scheduled_for` or `fired_at` date.
- State Hub `/progress/` contains a `daily_triage` event whose detail includes - State Hub `/progress/` contains a `daily_triage` event whose detail includes
the same `activity_core_run_id`. the same `activity_core_run_id` and its `output_validated` flag.
- The working-memory sink wrote `daily-triage-YYYY-MM-DD-<run>.md` and its - The working-memory sink wrote `daily-triage-YYYY-MM-DD-<run>.md` and its
frontmatter contains the same `activity_core_run_id`. frontmatter contains the same `activity_core_run_id` and validation metadata.
- The ActivityDefinition's instruction model, token budget, and sink timeouts fit - The ActivityDefinition's instruction model, token budget, and sink timeouts fit
under `ACTIVITY_TIMEOUT_SECONDS` (default 900 seconds). Temporal retries each under `ACTIVITY_TIMEOUT_SECONDS` (default 900 seconds). Temporal retries each
activity up to 10 attempts, so a slow LLM or sink failure should show as activity up to 10 attempts, so a slow LLM or sink failure should show as
@@ -280,6 +280,8 @@ Leave a State Hub progress note, but do not page, when:
- A planned outage caused one skipped run and the schedule is healthy again. - A planned outage caused one skipped run and the schedule is healthy again.
- A sink idempotency check reports `exists` for the expected run id. - A sink idempotency check reports `exists` for the expected run id.
- An instruction report has `output_validated=false` but still emitted a
validation-failure note preserving partial model output for review.
- The report completed but calibration feedback says the recommendations were - The report completed but calibration feedback says the recommendations were
noisy, too long, or under-sensitive. noisy, too long, or under-sensitive.

View File

@@ -24,7 +24,7 @@ DEFAULT_PROGRESS_EVENT_TYPE = "daily_triage"
DEFAULT_TEMPORAL_HOST = "localhost:7233" DEFAULT_TEMPORAL_HOST = "localhost:7233"
DEFAULT_TEMPORAL_NAMESPACE = "default" DEFAULT_TEMPORAL_NAMESPACE = "default"
DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000" DEFAULT_STATE_HUB_URL = "http://127.0.0.1:8000"
DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/working-memory" DEFAULT_WORKING_MEMORY_DIR = "/home/worsch/the-custodian/memory/working"
def parse_args(argv: list[str] | None = None) -> argparse.Namespace: def parse_args(argv: list[str] | None = None) -> argparse.Namespace:

View File

@@ -328,6 +328,7 @@ async def evaluate_instructions(payload: dict) -> dict:
"model": result.model, "model": result.model,
"output_validated": result.output_validated, "output_validated": result.output_validated,
"review_required": result.review_required, "review_required": result.review_required,
"validation_error": result.validation_error,
}) })
for spec in result.tasks: for spec in result.tasks:
task_specs.append({ task_specs.append({

View File

@@ -126,6 +126,9 @@ def _post_state_hub_progress(
"activity_core_run_id": run_id, "activity_core_run_id": run_id,
"instruction_id": instruction_id, "instruction_id": instruction_id,
"scheduled_for": payload.get("scheduled_for"), "scheduled_for": payload.get("scheduled_for"),
"output_validated": report_entry.get("output_validated"),
"review_required": report_entry.get("review_required"),
"validation_error": report_entry.get("validation_error"),
"report": report, "report": report,
}, },
} }
@@ -179,6 +182,7 @@ def _render_markdown(
report = report_entry.get("report") or {} report = report_entry.get("report") or {}
instruction_id = report_entry.get("instruction_id", "instruction") instruction_id = report_entry.get("instruction_id", "instruction")
summary = report.get("summary", "") summary = report.get("summary", "")
validation_error = report_entry.get("validation_error")
lines = [ lines = [
"---", "---",
"type: working-memory", "type: working-memory",
@@ -187,6 +191,10 @@ def _render_markdown(
f"activity_core_run_id: {payload.get('run_id')}", f"activity_core_run_id: {payload.get('run_id')}",
f"instruction_id: {instruction_id}", f"instruction_id: {instruction_id}",
f"scheduled_for: {payload.get('scheduled_for')}", f"scheduled_for: {payload.get('scheduled_for')}",
f"output_validated: {str(bool(report_entry.get('output_validated'))).lower()}",
f"review_required: {str(bool(report_entry.get('review_required'))).lower()}",
f"model: {report_entry.get('model') or ''}",
f"prompt_hash: {report_entry.get('prompt_hash') or ''}",
f"created: {datetime.now(tz=timezone.utc).isoformat()}", f"created: {datetime.now(tz=timezone.utc).isoformat()}",
"---", "---",
"", "",
@@ -195,6 +203,8 @@ def _render_markdown(
] ]
if summary: if summary:
lines.extend([summary, ""]) lines.extend([summary, ""])
if validation_error:
lines.extend(["Validation error:", "", f"`{validation_error}`", ""])
lines.extend([ lines.extend([
"```json", "```json",
json.dumps(report, indent=2, sort_keys=True), json.dumps(report, indent=2, sort_keys=True),

View File

@@ -39,6 +39,7 @@ class InstructionResult:
output_validated: bool = False output_validated: bool = False
review_required: bool = False review_required: bool = False
condition_matched: str | None = None condition_matched: str | None = None
validation_error: str | None = None
def _resolve_path(obj: Any, path: str) -> Any: def _resolve_path(obj: Any, path: str) -> Any:
@@ -164,7 +165,19 @@ def _execute(
"error=%s, raw_output_preview=%r", "error=%s, raw_output_preview=%r",
instr.id, prompt_hash, error, preview, instr.id, prompt_hash, error, preview,
) )
return _empty_result(instr, prompt_hash=prompt_hash) failure_report = _invalid_output_report(instr, error, raw_output)
if failure_report is not None:
return InstructionResult(
tasks=[],
report=failure_report,
prompt_hash=prompt_hash,
model=instr.model,
output_validated=False,
review_required=True,
condition_matched=instr.condition or None,
validation_error=error,
)
return _empty_result(instr, prompt_hash=prompt_hash, validation_error=error)
return InstructionResult( return InstructionResult(
tasks=task_specs, tasks=task_specs,
@@ -193,7 +206,11 @@ def _llm_run_config(instr: Any) -> dict[str, Any]:
return config return config
def _empty_result(instr: Any, prompt_hash: str | None = None) -> InstructionResult: def _empty_result(
instr: Any,
prompt_hash: str | None = None,
validation_error: str | None = None,
) -> InstructionResult:
return InstructionResult( return InstructionResult(
tasks=[], tasks=[],
prompt_hash=prompt_hash, prompt_hash=prompt_hash,
@@ -201,9 +218,54 @@ def _empty_result(instr: Any, prompt_hash: str | None = None) -> InstructionResu
output_validated=False, output_validated=False,
review_required=bool(getattr(instr, "review_required", False)), review_required=bool(getattr(instr, "review_required", False)),
condition_matched=getattr(instr, "condition", "") or None, condition_matched=getattr(instr, "condition", "") or None,
validation_error=validation_error,
) )
def _invalid_output_report(
instr: Any,
validation_error: str,
raw_output: Any,
) -> dict[str, Any] | None:
"""Build a durable diagnostic report for invalid report-sink output.
Task-only instructions keep the legacy empty-result behavior. Instructions
with report sinks should leave operators a bounded artifact that preserves
the partial model output without marking it as schema-valid.
"""
if not getattr(instr, "report_sinks", None):
return None
partial_output: Any
raw_preview: str | None = None
if isinstance(raw_output, str):
try:
partial_output = json.loads(raw_output)
except json.JSONDecodeError:
partial_output = None
raw_preview = raw_output[:4000]
else:
partial_output = raw_output
report: dict[str, Any] = {
"summary": (
f"Instruction {instr.id} produced output that failed validation; "
"partial output was preserved for operator review."
),
"status": "validation_failed",
"validation_error": validation_error,
}
if isinstance(partial_output, dict):
if isinstance(partial_output.get("summary"), str):
report["partial_summary"] = partial_output["summary"]
report["partial_report"] = partial_output
elif isinstance(partial_output, list):
report["partial_report"] = partial_output
elif raw_preview is not None:
report["raw_output_preview"] = raw_preview
return report
def _validate_output( def _validate_output(
raw_output: Any, raw_output: Any,
instr: Any, instr: Any,

View File

@@ -4,7 +4,8 @@ Covers:
- UntrustedFieldError raised when prompt references untrusted field - UntrustedFieldError raised when prompt references untrusted field
- Object-type attribute rejected even when listed in trusted_fields - Object-type attribute rejected even when listed in trusted_fields
- Injection fixture: untrusted field raises UntrustedFieldError before rendering - Injection fixture: untrusted field raises UntrustedFieldError before rendering
- Schema validation: NullLLM returning invalid JSON retry → second invalid → [] - Schema validation: invalid JSON retries once; report-sink instructions preserve
a validation-failure artifact after the second invalid output.
- review_required flag: present on InstructionDef model - review_required flag: present on InstructionDef model
""" """
@@ -98,6 +99,7 @@ def _instr(
max_tokens: int | None = None, max_tokens: int | None = None,
max_depth: int | None = None, max_depth: int | None = None,
model_params: dict[str, Any] | None = None, model_params: dict[str, Any] | None = None,
report_sinks: list[dict[str, Any]] | None = None,
) -> SimpleNamespace: ) -> SimpleNamespace:
return SimpleNamespace( return SimpleNamespace(
id=id, id=id,
@@ -111,6 +113,7 @@ def _instr(
model_params=model_params or {}, model_params=model_params or {},
output_schema=output_schema, output_schema=output_schema,
review_required=review_required, review_required=review_required,
report_sinks=report_sinks or [],
) )
@@ -353,6 +356,58 @@ def test_execute_instruction_with_audit_rejects_invalid_report_schema():
assert llm.call_count == 2 assert llm.call_count == 2
def test_execute_instruction_with_audit_preserves_invalid_report_with_sinks(
tmp_path,
monkeypatch,
):
schema_dir = tmp_path / "schemas"
schema_dir.mkdir()
schema_path = schema_dir / "daily-triage-report.json"
schema_path.write_text(
json.dumps({
"type": "object",
"required": ["summary", "recommendations"],
"properties": {
"summary": {"type": "string"},
"recommendations": {
"type": "array",
"items": {
"type": "object",
"required": ["action"],
},
},
},
}),
encoding="utf-8",
)
monkeypatch.chdir(tmp_path)
report_data = {
"summary": "Generated partial triage.",
"recommendations": [{"rank": 1, "candidate": "CUST-WP-0045"}],
}
llm = _CountingLLM([json.dumps(report_data), json.dumps(report_data)])
instr = _instr(
id="daily-triage-report",
prompt="Report.",
trusted_fields=[],
output_schema="schemas/daily-triage-report.json",
report_sinks=[{"type": "working-memory", "path": "/tmp"}],
)
result = execute_instruction_with_audit(instr, _Event(), {}, llm)
assert result.tasks == []
assert result.output_validated is False
assert result.review_required is True
assert result.validation_error == "$.recommendations[0]: missing required property 'action'"
assert result.report is not None
assert result.report["status"] == "validation_failed"
assert result.report["partial_summary"] == "Generated partial triage."
assert result.report["partial_report"] == report_data
assert llm.call_count == 2
def test_execute_instruction_with_audit_accepts_report_and_tasks_envelope(): def test_execute_instruction_with_audit_accepts_report_and_tasks_envelope():
envelope = { envelope = {
"report": {"summary": "Review needed."}, "report": {"summary": "Review needed."},

View File

@@ -54,3 +54,10 @@ def test_daily_triage_verifier_dry_run_names_all_operator_checks() -> None:
assert "where id = '00000000-0000-0000-0000-000000000123'" in timeout_check["sql"] assert "where id = '00000000-0000-0000-0000-000000000123'" in timeout_check["sql"]
assert timeout_check["activity_timeout_seconds"] == 900 assert timeout_check["activity_timeout_seconds"] == 900
assert timeout_check["retry_attempts"] == 10 assert timeout_check["retry_attempts"] == 10
def test_daily_triage_verifier_default_working_memory_dir() -> None:
script = _load_script()
args = script.parse_args([])
assert args.working_memory_dir == "/home/worsch/the-custodian/memory/working"

View File

@@ -98,6 +98,63 @@ async def test_evaluate_instructions_returns_report_payload(monkeypatch) -> None
assert report["prompt_hash"] is not None assert report["prompt_hash"] is not None
@pytest.mark.asyncio
async def test_evaluate_instructions_returns_invalid_report_for_report_sinks(
monkeypatch,
tmp_path,
) -> None:
schema_dir = tmp_path / "schemas"
schema_dir.mkdir()
(schema_dir / "daily-triage-report.json").write_text(
json.dumps({
"type": "object",
"required": ["summary", "recommendations"],
"properties": {
"summary": {"type": "string"},
"recommendations": {
"type": "array",
"items": {
"type": "object",
"required": ["wsjf"],
},
},
},
}),
encoding="utf-8",
)
monkeypatch.chdir(tmp_path)
llm = FakeLLMClient(json.dumps({
"summary": "Partial triage.",
"recommendations": [{"rank": 1, "candidate": "CUST-WP-0045"}],
}))
monkeypatch.setattr(activities, "get_llm_client", lambda: llm)
result = await activities.evaluate_instructions({
"instructions": [
{
"id": "daily-triage-report",
"trusted_fields": [],
"model": "test-model",
"prompt": "Run report.",
"output_schema": "schemas/daily-triage-report.json",
"review_required": False,
"report_sinks": [{"type": "working-memory", "path": "/tmp"}],
}
],
"event": {},
"context": {},
})
assert result["task_specs"] == []
assert len(result["reports"]) == 1
report = result["reports"][0]
assert report["output_validated"] is False
assert report["review_required"] is True
assert report["validation_error"] == "$.recommendations[0]: missing required property 'wsjf'"
assert report["report"]["status"] == "validation_failed"
assert report["report"]["partial_summary"] == "Partial triage."
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_evaluate_instructions_without_llm_client_returns_no_tasks(monkeypatch) -> None: async def test_evaluate_instructions_without_llm_client_returns_no_tasks(monkeypatch) -> None:
class RaisingClient: class RaisingClient:

View File

@@ -32,6 +32,11 @@ def _payload(sinks: list[dict[str, Any]]) -> dict[str, Any]:
"recommendations": [{"candidate": "CUST-WP-0045"}], "recommendations": [{"candidate": "CUST-WP-0045"}],
}, },
"sinks": sinks, "sinks": sinks,
"prompt_hash": "abc123",
"model": "test-model",
"output_validated": True,
"review_required": False,
"validation_error": None,
} }
], ],
} }
@@ -54,6 +59,9 @@ def test_working_memory_sink_writes_idempotently(tmp_path) -> None:
note = tmp_path / "daily-triage-2026-05-19-12345678.md" note = tmp_path / "daily-triage-2026-05-19-12345678.md"
text = note.read_text(encoding="utf-8") text = note.read_text(encoding="utf-8")
assert "activity_core_run_id: 12345678-aaaa-bbbb-cccc-123456789abc" in text assert "activity_core_run_id: 12345678-aaaa-bbbb-cccc-123456789abc" in text
assert "output_validated: true" in text
assert "review_required: false" in text
assert "model: test-model" in text
assert "State Hub has loose ends." in text assert "State Hub has loose ends." in text
@@ -103,6 +111,8 @@ def test_state_hub_progress_sink_posts(monkeypatch) -> None:
assert posts[0]["url"] == "http://state-hub.test/progress/" assert posts[0]["url"] == "http://state-hub.test/progress/"
assert posts[0]["json"]["workstream_id"] == "workstream-1" assert posts[0]["json"]["workstream_id"] == "workstream-1"
assert posts[0]["json"]["detail"]["activity_core_run_id"] == payload_run_id() assert posts[0]["json"]["detail"]["activity_core_run_id"] == payload_run_id()
assert posts[0]["json"]["detail"]["output_validated"] is True
assert posts[0]["json"]["detail"]["review_required"] is False
def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None: def test_state_hub_progress_sink_is_idempotent(monkeypatch) -> None:

View File

@@ -8,7 +8,7 @@ status: active
owner: codex owner: codex
topic_slug: custodian topic_slug: custodian
created: "2026-06-03" created: "2026-06-03"
updated: "2026-06-04" updated: "2026-06-05"
state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733" state_hub_workstream_id: "5646e13a-13af-4724-bca6-3c0d86f96733"
--- ---
@@ -84,6 +84,11 @@ live modes, plus `tests/test_daily_triage_verifier.py`. `docs/runbook.md` now
covers Temporal schedule/workflow checks, `activity_runs`, State Hub progress, covers Temporal schedule/workflow checks, `activity_runs`, State Hub progress,
working-memory notes, missed-run `skip` behavior, and LLM timeout budget. working-memory notes, missed-run `skip` behavior, and LLM timeout budget.
2026-06-05: Follow-up hardening after the scheduled WSJF triage ran but emitted
no report because the live schema required `wsjf` fields and the stale DB prompt
did not request them. The verifier default and runbook now point at the live
working-memory sink path, `/home/worsch/the-custodian/memory/working`.
## Three-Run Calibration Feedback ## Three-Run Calibration Feedback
```task ```task
@@ -166,6 +171,12 @@ failure behavior, LLM timeout/retry behavior, and page/note/next-session
classification. Task emission sink failures now raise from `emit_tasks`, making classification. Task emission sink failures now raise from `emit_tasks`, making
them visible to Temporal retries instead of warning-only logs. them visible to Temporal retries instead of warning-only logs.
2026-06-05: Added instruction-output robustness for report-sink instructions:
after retry exhaustion, schema-invalid model output now produces a durable
validation-failure report containing bounded partial output instead of a silent
empty result. Report sinks include validation metadata in working-memory
frontmatter and State Hub progress detail.
## Issue-Core Emission Boundary Verification ## Issue-Core Emission Boundary Verification
```task ```task