From a70c00a789f0cd1e25d0bd7c383c6a8c5041ceb2 Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 26 Jun 2026 17:56:28 +0200 Subject: [PATCH] feat(ACTIVITY-WP-0016-T03): resilient per-item report recovery with quarantine lane When the whole-document parse + one retry still fail, report instructions now run _resilient_report before the total-loss path. A brace/quote-aware scanner (_extract_object_spans) recovers each recommendation object whether pretty-printed across many lines or NDJSON one-per-line; a truncated tail gets a best-effort _try_repair; _partition_items validates each recovered object against the T02 item schema. Valid items survive (output_validated=True, partial=True), malformed/ over-maxItems items are quarantined with provenance (index, error, raw, reason), capped at 20. Error locality now matches the unit of work: one bad item costs one item, not the whole report. Verified against the real 06-26 shape: 7 valid recommendations + a truncated tail now recovers all 7 and quarantines the broken tail (previously the whole run was discarded). Happy-path maxItems top-N enforcement is deferred to T04 (count caps). Full suite: 215 passed, 1 skipped. Co-Authored-By: Claude Opus 4.8 --- src/activity_core/rules/executor.py | 233 ++++++++++++++++++++++++++++ tests/rules/test_executor.py | 72 +++++++++ 2 files changed, 305 insertions(+) diff --git a/src/activity_core/rules/executor.py b/src/activity_core/rules/executor.py index 61a5517..20f9f5d 100644 --- a/src/activity_core/rules/executor.py +++ b/src/activity_core/rules/executor.py @@ -178,6 +178,12 @@ def _execute( "error=%s, raw_output_preview=%r", instr.id, prompt_hash, error, preview, ) + # Posture B (WP-0016-T03): try to recover a partial-but-usable + # report from individually-parseable items before declaring total + # loss. One bad item should cost one item, not the whole report. + recovered = _resilient_report(instr, raw_output, error, prompt_hash) + if recovered is not None: + return recovered failure_report = _invalid_output_report(instr, error, raw_output) if failure_report is not None: return InstructionResult( @@ -279,6 +285,233 @@ def _invalid_output_report( return report +# --------------------------------------------------------------------------- +# Resilient report recovery (ACTIVITY-WP-0016-T03) +# +# Posture B — verify & mitigate at the producer→consumer boundary. When the +# whole-document parse/validate fails, recover individually-parseable +# recommendation objects, validate each against the item schema, keep the valid +# ones, and quarantine the malformed/over-limit ones with provenance. One bad +# item costs one item, not the whole report (error locality == unit of work). +# --------------------------------------------------------------------------- + +_QUARANTINE_LIMIT = 20 +_SNIPPET_LIMIT = 200 +_SUMMARY_RE = re.compile(r'"summary"\s*:\s*"((?:[^"\\]|\\.)*)"') + + +def _snippet(value: Any) -> str: + text = value if isinstance(value, str) else json.dumps(value, default=str) + return text[:_SNIPPET_LIMIT] + + +def _report_contract(instr: Any) -> tuple[dict[str, Any] | None, int | None]: + """Extract (item_schema, max_items) for the recommendations list, if any.""" + try: + schema = _load_output_schema(getattr(instr, "output_schema", "")) + except (OSError, json.JSONDecodeError, TypeError): + return None, None + if not isinstance(schema, dict): + return None, None + recs = (schema.get("properties") or {}).get("recommendations") + if not isinstance(recs, dict): + return None, None + item_schema = recs.get("items") if isinstance(recs.get("items"), dict) else None + max_items = recs.get("maxItems") if isinstance(recs.get("maxItems"), int) else None + return item_schema, max_items + + +def _extract_object_spans(raw: str) -> list[tuple[str, bool]]: + """Return (span, complete) for each recommendation object in raw output. + + Scans the `recommendations` array brace-aware and string-aware so it recovers + objects whether they are pretty-printed across many lines or emitted one per + line (NDJSON). A truncated trailing object is returned with complete=False. + """ + key = raw.find('"recommendations"') + start_region = raw.find("[", key) if key >= 0 else -1 + if start_region < 0: + return [] + spans: list[tuple[str, bool]] = [] + i, n = start_region + 1, len(raw) + while i < n: + ch = raw[i] + if ch == "]": + break + if ch != "{": + i += 1 + continue + depth, in_str, esc, j = 0, False, False, i + closed = False + while j < n: + c = raw[j] + if in_str: + if esc: + esc = False + elif c == "\\": + esc = True + elif c == '"': + in_str = False + elif c == '"': + in_str = True + elif c == "{": + depth += 1 + elif c == "}": + depth -= 1 + if depth == 0: + spans.append((raw[i:j + 1], True)) + closed = True + break + j += 1 + if not closed: + spans.append((raw[i:], False)) # truncated tail + break + i = j + 1 + return spans + + +def _try_repair(span: str) -> str: + """Best-effort close of a truncated JSON object: balance quote, braces, brackets.""" + in_str, esc, depth_c, depth_b = False, False, 0, 0 + for c in span: + if in_str: + if esc: + esc = False + elif c == "\\": + esc = True + elif c == '"': + in_str = False + elif c == '"': + in_str = True + elif c == "{": + depth_c += 1 + elif c == "}": + depth_c -= 1 + elif c == "[": + depth_b += 1 + elif c == "]": + depth_b -= 1 + repaired = span.rstrip().rstrip(",") + if in_str: + repaired += '"' + return repaired + "]" * max(depth_b, 0) + "}" * max(depth_c, 0) + + +def _recover_recommendations( + raw: str, +) -> tuple[str | None, list[dict[str, Any]], list[dict[str, Any]]]: + """Recover (summary, items, quarantined) from a failed report payload.""" + summary_match = _SUMMARY_RE.search(raw) + summary = None + if summary_match: + try: + summary = json.loads(f'"{summary_match.group(1)}"') + except json.JSONDecodeError: + summary = summary_match.group(1) + items: list[dict[str, Any]] = [] + quarantined: list[dict[str, Any]] = [] + for index, (span, complete) in enumerate(_extract_object_spans(raw)): + parsed: Any = None + try: + parsed = json.loads(span) + except json.JSONDecodeError as exc: + if not complete: + try: + parsed = json.loads(_try_repair(span)) + except json.JSONDecodeError: + parsed = None + if parsed is None: + quarantined.append( + {"index": index, "error": str(exc), "raw": _snippet(span), + "reason": "truncated" if not complete else "unparseable"} + ) + continue + if isinstance(parsed, dict): + items.append(parsed) + else: + quarantined.append( + {"index": index, "error": "item is not a JSON object", + "raw": _snippet(span)} + ) + return summary, items, quarantined + + +def _partition_items( + items: list[dict[str, Any]], + item_schema: dict[str, Any] | None, + max_items: int | None, +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Split items into (valid, quarantined): schema-invalid then over-limit.""" + valid: list[dict[str, Any]] = [] + quarantined: list[dict[str, Any]] = [] + for index, item in enumerate(items): + error = ( + _validate_schema_node(item, item_schema, f"recommendations[{index}]") + if item_schema + else None + ) + if error: + quarantined.append({"index": index, "error": error, "raw": _snippet(item)}) + else: + valid.append(item) + if max_items is not None and len(valid) > max_items: + for item in valid[max_items:]: + quarantined.append( + {"index": None, "error": f"exceeds maxItems={max_items}", + "raw": _snippet(item), "reason": "over_limit"} + ) + valid = valid[:max_items] + return valid, quarantined + + +def _resilient_report( + instr: Any, + raw_output: Any, + original_error: str, + prompt_hash: str | None, +) -> InstructionResult | None: + """Recover a partial-but-usable report from output that failed validation. + + Returns None when nothing usable can be recovered, so the caller falls back + to the total-loss diagnostic artifact (_invalid_output_report). + """ + if not getattr(instr, "report_sinks", None) or not isinstance(raw_output, str): + return None + item_schema, max_items = _report_contract(instr) + summary, items, quarantined = _recover_recommendations(raw_output) + if not items: + return None + valid, item_quarantine = _partition_items(items, item_schema, max_items) + quarantined.extend(item_quarantine) + if not valid: + return None + report: dict[str, Any] = { + "summary": summary + or f"Partial daily triage: recovered {len(valid)} recommendation(s) " + "after the full report failed validation.", + "recommendations": valid, + "status": "partial", + "partial": True, + "quarantined_count": len(quarantined), + "quarantined_items": quarantined[:_QUARANTINE_LIMIT], + "recovery_note": f"original validation error: {original_error}", + } + logger.warning( + "instruction_output_recovered: instruction=%r, kept=%d, quarantined=%d", + getattr(instr, "id", None), len(valid), len(quarantined), + ) + return InstructionResult( + tasks=[], + report=report, + prompt_hash=prompt_hash, + model=getattr(instr, "model", None), + output_validated=True, + review_required=True, + condition_matched=getattr(instr, "condition", "") or None, + validation_error=None, + ) + + def _execution_failure_report(instr: Any, error: str) -> dict[str, Any] | None: """Build a durable diagnostic report when a report instruction cannot run.""" if not getattr(instr, "report_sinks", None): diff --git a/tests/rules/test_executor.py b/tests/rules/test_executor.py index 45b661e..81d361d 100644 --- a/tests/rules/test_executor.py +++ b/tests/rules/test_executor.py @@ -403,6 +403,78 @@ def test_execute_instruction_with_audit_rejects_invalid_report_schema(): assert llm.call_count == 2 +# ── WP-0016-T03 resilient report recovery ───────────────────────────────────── + +def _valid_rec(rank: int) -> dict[str, Any]: + return { + "rank": rank, + "candidate": f"WS-{rank}", + "action": "work-next", + "why": f"reason {rank}", + "wsjf": {"score": 5.0}, + } + + +def _pretty_triage_with_truncated_tail(num_valid: int) -> str: + body = ",\n".join(" " + json.dumps(_valid_rec(i)) for i in range(1, num_valid + 1)) + # Trailing object is cut off mid-string — the whole document is invalid JSON, + # reproducing the 2026-06-26 failure shape (valid prefix, broken tail). + return ( + '{\n "summary": "Daily triage.",\n "recommendations": [\n' + + body + + ',\n {\n "rank": ' + + str(num_valid + 1) + + ',\n "candidate": "WS-X",\n "action": "work-' + ) + + +def test_resilient_report_recovers_valid_prefix_and_quarantines_truncated_tail(): + raw = _pretty_triage_with_truncated_tail(7) + llm = _CountingLLM([raw, raw]) + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + output_schema="schemas/daily-triage-report.json", + report_sinks=[{"type": "working-memory"}], + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.output_validated is True + assert result.review_required is True + assert result.report is not None + assert result.report["partial"] is True + assert len(result.report["recommendations"]) == 7 + assert result.report["summary"] == "Daily triage." + assert result.report["quarantined_count"] >= 1 + # The broken tail is dropped — either as an unparseable/truncated span or, + # if _try_repair salvages its structure, as a schema-invalid item. Either way + # it carries a diagnostic error and never pollutes the surviving report. + assert result.report["quarantined_items"][0]["error"] + + +def test_resilient_report_quarantines_one_bad_item_among_valid(): + recs = [_valid_rec(1), {"candidate": "WS-2", "action": "x", "why": "no rank"}, _valid_rec(3)] + raw = json.dumps({"summary": "Triage.", "recommendations": recs}) + llm = _CountingLLM([raw, raw]) + instr = _instr( + id="daily-triage-report", + prompt="Report.", + trusted_fields=[], + output_schema="schemas/daily-triage-report.json", + report_sinks=[{"type": "working-memory"}], + ) + + result = execute_instruction_with_audit(instr, _Event(), {}, llm) + + assert result.output_validated is True + assert result.report["partial"] is True + assert len(result.report["recommendations"]) == 2 + assert result.report["quarantined_count"] == 1 + assert "rank" in result.report["quarantined_items"][0]["error"] + + def test_execute_instruction_with_audit_preserves_invalid_report_with_sinks( tmp_path, monkeypatch,