feat(ACTIVITY-WP-0016-T03): resilient per-item report recovery with quarantine lane

When the whole-document parse and its single retry both fail, report instructions now
run _resilient_report before falling back to the total-loss path. A brace/quote-aware scanner
(_extract_object_spans) recovers each recommendation object whether pretty-printed
across many lines or NDJSON one-per-line; a truncated tail gets a best-effort
_try_repair; _partition_items validates each recovered object against the T02 item
schema. Valid items survive (output_validated=True, partial=True), malformed/
over-maxItems items are quarantined with provenance (index, error, raw, reason),
capped at 20. Error locality now matches the unit of work: one bad item costs one
item, not the whole report.

Verified against the real 06-26 shape: 7 valid recommendations + a truncated tail
now recovers all 7 and quarantines the broken tail (previously the whole run was
discarded). Happy-path maxItems top-N enforcement is deferred to T04 (count caps).
Full suite: 215 passed, 1 skipped.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-26 17:56:28 +02:00
parent b41b6034ee
commit a70c00a789
2 changed files with 305 additions and 0 deletions

View File

@@ -178,6 +178,12 @@ def _execute(
"error=%s, raw_output_preview=%r", "error=%s, raw_output_preview=%r",
instr.id, prompt_hash, error, preview, instr.id, prompt_hash, error, preview,
) )
# Posture B (WP-0016-T03): try to recover a partial-but-usable
# report from individually-parseable items before declaring total
# loss. One bad item should cost one item, not the whole report.
recovered = _resilient_report(instr, raw_output, error, prompt_hash)
if recovered is not None:
return recovered
failure_report = _invalid_output_report(instr, error, raw_output) failure_report = _invalid_output_report(instr, error, raw_output)
if failure_report is not None: if failure_report is not None:
return InstructionResult( return InstructionResult(
@@ -279,6 +285,233 @@ def _invalid_output_report(
return report return report
# ---------------------------------------------------------------------------
# Resilient report recovery (ACTIVITY-WP-0016-T03)
#
# Posture B — verify & mitigate at the producer→consumer boundary. When the
# whole-document parse/validate fails, recover individually-parseable
# recommendation objects, validate each against the item schema, keep the valid
# ones, and quarantine the malformed/over-limit ones with provenance. One bad
# item costs one item, not the whole report (error locality == unit of work).
# ---------------------------------------------------------------------------
# Maximum number of quarantine records embedded in a recovered report under
# "quarantined_items"; the total is still reported via "quarantined_count".
_QUARANTINE_LIMIT = 20
# Maximum number of characters of raw text that _snippet keeps.
_SNIPPET_LIMIT = 200
# Matches a `"summary": "<text>"` pair in raw output. Group 1 is the string
# body with its backslash escapes still encoded (they are matched, not decoded).
_SUMMARY_RE = re.compile(r'"summary"\s*:\s*"((?:[^"\\]|\\.)*)"')
def _snippet(value: Any) -> str:
    """Return a bounded text preview of *value* for quarantine records.

    Strings are used as-is; anything else is rendered with ``json.dumps``
    (objects JSON cannot encode fall back to ``str``). The result is cut to
    ``_SNIPPET_LIMIT`` characters.
    """
    if isinstance(value, str):
        rendered = value
    else:
        rendered = json.dumps(value, default=str)
    return rendered[:_SNIPPET_LIMIT]
def _report_contract(instr: Any) -> tuple[dict[str, Any] | None, int | None]:
    """Return ``(item_schema, max_items)`` for the ``recommendations`` list.

    The output schema named by ``instr.output_schema`` is loaded and its
    ``properties.recommendations`` node inspected. Either element of the
    result is ``None`` when the corresponding keyword (``items`` /
    ``maxItems``) is absent or has the wrong type. ``(None, None)`` is
    returned when the schema cannot be loaded, is not a mapping, or has no
    ``recommendations`` mapping.
    """
    nothing: tuple[None, None] = (None, None)
    try:
        loaded = _load_output_schema(getattr(instr, "output_schema", ""))
    except (OSError, json.JSONDecodeError, TypeError):
        # Unreadable or unparsable schema: no per-item contract is available.
        return nothing
    if not isinstance(loaded, dict):
        return nothing

    properties = loaded.get("properties") or {}
    rec_schema = properties.get("recommendations")
    if not isinstance(rec_schema, dict):
        return nothing

    per_item = rec_schema.get("items")
    limit = rec_schema.get("maxItems")
    return (
        per_item if isinstance(per_item, dict) else None,
        limit if isinstance(limit, int) else None,
    )
def _extract_object_spans(raw: str) -> list[tuple[str, bool]]:
    """Return ``(span, complete)`` for each object in the recommendations array.

    The scan starts at the first ``[`` following the first occurrence of
    ``"recommendations"`` and stops at the first ``]`` seen outside an object.
    Braces are counted only outside JSON strings (escape-aware), so it makes
    no difference whether objects are pretty-printed over many lines or
    written one per line. An object whose closing brace is never found is
    returned as the remainder of ``raw`` with ``complete=False`` and ends the
    scan. Returns an empty list when no array can be located.
    """
    key_pos = raw.find('"recommendations"')
    if key_pos < 0:
        return []
    array_open = raw.find("[", key_pos)
    if array_open < 0:
        return []

    total = len(raw)

    def object_end(begin: int) -> int:
        """Return the index of the brace closing the object at *begin*, or -1."""
        nesting = 0
        inside_string = False
        escaped = False
        for pos in range(begin, total):
            char = raw[pos]
            if inside_string:
                if escaped:
                    escaped = False
                elif char == "\\":
                    escaped = True
                elif char == '"':
                    inside_string = False
                continue
            if char == '"':
                inside_string = True
            elif char == "{":
                nesting += 1
            elif char == "}":
                nesting -= 1
                if nesting == 0:
                    return pos
        return -1

    found: list[tuple[str, bool]] = []
    cursor = array_open + 1
    while cursor < total:
        char = raw[cursor]
        if char == "]":
            break  # end of the recommendations array
        if char != "{":
            cursor += 1  # separators / whitespace between objects
            continue
        end = object_end(cursor)
        if end < 0:
            found.append((raw[cursor:], False))  # truncated tail
            break
        found.append((raw[cursor:end + 1], True))
        cursor = end + 1
    return found
def _try_repair(span: str) -> str:
    """Best-effort close of a truncated JSON object.

    Scans *span* while tracking string state and a stack of open ``{`` / ``[``
    delimiters, then appends whatever is needed to close it:

    * an unterminated string gets a closing quote (a dangling backslash at the
      cut point is dropped first, otherwise it would escape that quote);
    * outside a string, trailing whitespace and a trailing comma are removed;
    * open containers are closed innermost-first, so ``{"a": [{"b": 1``
      becomes ``{"a": [{"b": 1}]}``.

    The result is not guaranteed to be valid JSON (e.g. a span cut right after
    a key or a colon stays invalid); callers must still parse it defensively.

    Args:
        span: Text of a JSON object that may have been cut off.

    Returns:
        The span with closing delimiters appended.
    """
    closers: list[str] = []  # pending closing delimiters, innermost last
    in_str = False
    esc = False
    for c in span:
        if in_str:
            if esc:
                esc = False
            elif c == "\\":
                esc = True
            elif c == '"':
                in_str = False
        elif c == '"':
            in_str = True
        elif c == "{":
            closers.append("}")
        elif c == "[":
            closers.append("]")
        elif c in "}]" and closers and closers[-1] == c:
            closers.pop()
        # A closer that does not match the innermost opener is ignored: the
        # text is already malformed and the later parse will reject it.

    if in_str:
        # Cut inside a string: keep its content verbatim, but a lone trailing
        # backslash would turn the quote we add into an escaped character.
        repaired = (span[:-1] if esc else span) + '"'
    else:
        repaired = span.rstrip().rstrip(",")
    return repaired + "".join(reversed(closers))
def _recover_recommendations(
    raw: str,
) -> tuple[str | None, list[dict[str, Any]], list[dict[str, Any]]]:
    """Recover ``(summary, items, quarantined)`` from a failed report payload.

    The summary is the first ``"summary"`` string found by ``_SUMMARY_RE``,
    JSON-decoded when possible and otherwise returned with its escapes intact;
    it is ``None`` when no match exists. Every span produced by
    ``_extract_object_spans`` is parsed on its own; an incomplete span that
    fails to parse gets one repair attempt via ``_try_repair``. Spans that
    still cannot be parsed, or that do not decode to a JSON object, are
    recorded in the quarantine list together with their span index.
    """
    recovered_summary: str | None = None
    match = _SUMMARY_RE.search(raw)
    if match is not None:
        escaped = match.group(1)
        try:
            recovered_summary = json.loads(f'"{escaped}"')
        except json.JSONDecodeError:
            # e.g. a raw control character inside the string: keep it as-is.
            recovered_summary = escaped

    kept: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    for position, (text, is_complete) in enumerate(_extract_object_spans(raw)):
        try:
            candidate: Any = json.loads(text)
        except json.JSONDecodeError as parse_error:
            candidate = None
            if not is_complete:
                try:
                    candidate = json.loads(_try_repair(text))
                except json.JSONDecodeError:
                    pass
            if candidate is None:
                # Report the original parse error, not the repair attempt's.
                rejected.append(
                    {
                        "index": position,
                        "error": str(parse_error),
                        "raw": _snippet(text),
                        "reason": "unparseable" if is_complete else "truncated",
                    }
                )
                continue
        if not isinstance(candidate, dict):
            rejected.append(
                {
                    "index": position,
                    "error": "item is not a JSON object",
                    "raw": _snippet(text),
                }
            )
            continue
        kept.append(candidate)
    return recovered_summary, kept, rejected
def _partition_items(
    items: list[dict[str, Any]],
    item_schema: dict[str, Any] | None,
    max_items: int | None,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split items into ``(valid, quarantined)``: schema-invalid, then over-limit.

    Each item is checked with ``_validate_schema_node`` when *item_schema* is
    truthy; without a schema every item is accepted. If more than *max_items*
    items pass, the surplus (in input order) is quarantined with
    ``reason="over_limit"``.

    Every quarantine record carries the item's position in *items* as
    ``index`` — including over-limit records, so a dropped item can always be
    traced back to its place in the recovered list.

    Args:
        items: Recovered recommendation objects, in output order.
        item_schema: Schema node for a single item, or ``None`` to skip checks.
        max_items: Upper bound on kept items, or ``None`` for no bound.

    Returns:
        ``(valid, quarantined)`` where ``valid`` preserves input order.
    """
    accepted: list[tuple[int, dict[str, Any]]] = []
    quarantined: list[dict[str, Any]] = []
    for index, item in enumerate(items):
        error = (
            _validate_schema_node(item, item_schema, f"recommendations[{index}]")
            if item_schema
            else None
        )
        if error:
            quarantined.append({"index": index, "error": error, "raw": _snippet(item)})
        else:
            # Remember the original position so it survives the limit cut.
            accepted.append((index, item))

    if max_items is not None and len(accepted) > max_items:
        for index, item in accepted[max_items:]:
            quarantined.append(
                {
                    "index": index,
                    "error": f"exceeds maxItems={max_items}",
                    "raw": _snippet(item),
                    "reason": "over_limit",
                }
            )
        accepted = accepted[:max_items]
    return [item for _, item in accepted], quarantined
def _resilient_report(
    instr: Any,
    raw_output: Any,
    original_error: str,
    prompt_hash: str | None,
) -> InstructionResult | None:
    """Build a partial report from output that failed whole-document validation.

    Returns ``None`` — so the caller can fall back to the total-loss
    diagnostic artifact (``_invalid_output_report``) — when the instruction
    has no report sinks, the raw output is not a string, no item could be
    recovered, or no recovered item survives partitioning.

    Otherwise the result carries the surviving recommendations, is flagged
    ``partial`` and ``review_required``, and embeds at most
    ``_QUARANTINE_LIMIT`` quarantine records alongside the full quarantine
    count and the original validation error.
    """
    has_sinks = bool(getattr(instr, "report_sinks", None))
    if not (has_sinks and isinstance(raw_output, str)):
        return None

    item_schema, max_items = _report_contract(instr)
    summary, items, parse_rejects = _recover_recommendations(raw_output)
    if not items:
        return None

    valid, schema_rejects = _partition_items(items, item_schema, max_items)
    if not valid:
        return None
    rejected = parse_rejects + schema_rejects

    fallback_summary = (
        f"Partial daily triage: recovered {len(valid)} recommendation(s) "
        "after the full report failed validation."
    )
    report: dict[str, Any] = {
        "summary": summary or fallback_summary,
        "recommendations": valid,
        "status": "partial",
        "partial": True,
        "quarantined_count": len(rejected),
        "quarantined_items": rejected[:_QUARANTINE_LIMIT],
        "recovery_note": f"original validation error: {original_error}",
    }

    logger.warning(
        "instruction_output_recovered: instruction=%r, kept=%d, quarantined=%d",
        getattr(instr, "id", None),
        len(valid),
        len(rejected),
    )
    condition = getattr(instr, "condition", "") or None
    return InstructionResult(
        tasks=[],
        report=report,
        prompt_hash=prompt_hash,
        model=getattr(instr, "model", None),
        output_validated=True,
        review_required=True,
        condition_matched=condition,
        validation_error=None,
    )
def _execution_failure_report(instr: Any, error: str) -> dict[str, Any] | None: def _execution_failure_report(instr: Any, error: str) -> dict[str, Any] | None:
"""Build a durable diagnostic report when a report instruction cannot run.""" """Build a durable diagnostic report when a report instruction cannot run."""
if not getattr(instr, "report_sinks", None): if not getattr(instr, "report_sinks", None):

View File

@@ -403,6 +403,78 @@ def test_execute_instruction_with_audit_rejects_invalid_report_schema():
assert llm.call_count == 2 assert llm.call_count == 2
# ── WP-0016-T03 resilient report recovery ─────────────────────────────────────
def _valid_rec(rank: int) -> dict[str, Any]:
    """Build a recommendation fixture whose fields are derived from *rank*."""
    return dict(
        rank=rank,
        candidate=f"WS-{rank}",
        action="work-next",
        why=f"reason {rank}",
        wsjf={"score": 5.0},
    )
def _pretty_triage_with_truncated_tail(num_valid: int) -> str:
    """Render a triage payload: *num_valid* good items, then a cut-off one."""
    rendered = [" " + json.dumps(_valid_rec(rank)) for rank in range(1, num_valid + 1)]
    head = '{\n "summary": "Daily triage.",\n "recommendations": [\n'
    # Trailing object is cut off mid-string — the whole document is invalid JSON,
    # reproducing the 2026-06-26 failure shape (valid prefix, broken tail).
    tail = (
        ',\n {\n "rank": '
        + str(num_valid + 1)
        + ',\n "candidate": "WS-X",\n "action": "work-'
    )
    return head + ",\n".join(rendered) + tail
def test_resilient_report_recovers_valid_prefix_and_quarantines_truncated_tail():
    """Seven valid items plus a truncated tail yield a partial report of seven."""
    payload = _pretty_triage_with_truncated_tail(7)
    # The same broken payload is served for the first attempt and the retry.
    counting_llm = _CountingLLM([payload, payload])
    instruction = _instr(
        id="daily-triage-report",
        prompt="Report.",
        trusted_fields=[],
        output_schema="schemas/daily-triage-report.json",
        report_sinks=[{"type": "working-memory"}],
    )

    outcome = execute_instruction_with_audit(instruction, _Event(), {}, counting_llm)

    assert outcome.output_validated is True
    assert outcome.review_required is True
    assert outcome.report is not None
    report = outcome.report
    assert report["partial"] is True
    assert len(report["recommendations"]) == 7
    assert report["summary"] == "Daily triage."
    assert report["quarantined_count"] >= 1
    # The broken tail is dropped — either as an unparseable/truncated span or,
    # if _try_repair salvages its structure, as a schema-invalid item. Either way
    # it carries a diagnostic error and never pollutes the surviving report.
    first_quarantined = report["quarantined_items"][0]
    assert first_quarantined["error"]
def test_resilient_report_quarantines_one_bad_item_among_valid():
    """A single item missing ``rank`` is quarantined; its neighbours survive."""
    missing_rank = {"candidate": "WS-2", "action": "x", "why": "no rank"}
    payload = json.dumps(
        {
            "summary": "Triage.",
            "recommendations": [_valid_rec(1), missing_rank, _valid_rec(3)],
        }
    )
    # The same payload is served for the first attempt and the retry.
    counting_llm = _CountingLLM([payload, payload])
    instruction = _instr(
        id="daily-triage-report",
        prompt="Report.",
        trusted_fields=[],
        output_schema="schemas/daily-triage-report.json",
        report_sinks=[{"type": "working-memory"}],
    )

    outcome = execute_instruction_with_audit(instruction, _Event(), {}, counting_llm)

    assert outcome.output_validated is True
    report = outcome.report
    assert report["partial"] is True
    assert len(report["recommendations"]) == 2
    assert report["quarantined_count"] == 1
    assert "rank" in report["quarantined_items"][0]["error"]
def test_execute_instruction_with_audit_preserves_invalid_report_with_sinks( def test_execute_instruction_with_audit_preserves_invalid_report_with_sinks(
tmp_path, tmp_path,
monkeypatch, monkeypatch,