feat(ACTIVITY-WP-0016-T04): producer trust-boundary guardrails + ADR-004

Add ADR-004 documenting the producer trust boundary: untrusted producers (LLM,
agent, human; erroneous and malicious), the trust-but-handle vs verify-and-mitigate
postures, error-locality and quarantine-with-provenance principles, and the concrete
activity-core mechanisms.

Implement producer-agnostic guardrails in executor.py, applied uniformly on the
happy path and the recovery path via _partition_items: structural-type -> schema ->
structural caps (_MAX_DEPTH, _MAX_STRING_LEN) -> reference allow-list -> count cap.
Each quarantine carries a reason. Closes the happy-path maxItems count cap deferred
from T03 (valid 9-item report keeps 7, quarantines 2). Reference allow-list reads
context["known_candidates"] via _allow_list_from_context; inert until a resolver
populates it. SCOPE.md updated (executor bullet + ADR list); no INTENT drift.

New tests: happy-path count cap, oversized-string guardrail, allow-list rejection.
Full suite: 218 passed, 1 skipped.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-26 18:10:17 +02:00
parent c5440e8429
commit 9be4ddbdb7
5 changed files with 373 additions and 12 deletions

View File

@@ -160,15 +160,20 @@ def _execute(
prompt_hash = hashlib.sha256(rendered.encode()).hexdigest()
llm_config = _llm_run_config(instr)
# Reference allow-list (WP-0016-T04): if a context resolver supplied the set
# of known candidate ids, recommendations pointing at anything else are
# quarantined. Absent (None) today → the check is inert until wired.
allow_list = _allow_list_from_context(context)
# Step 3 — call LLM
raw_output = llm_client.complete(rendered, model=instr.model, config=llm_config)
# Step 4 — validate and optionally retry
task_specs, report, error = _validate_output(raw_output, instr)
task_specs, report, error = _validate_output(raw_output, instr, allow_list)
if error:
retry_prompt = rendered + f"\n\nPrevious output was invalid: {error}\nPlease fix."
raw_output = llm_client.complete(retry_prompt, model=instr.model, config=llm_config)
task_specs, report, error = _validate_output(raw_output, instr)
task_specs, report, error = _validate_output(raw_output, instr, allow_list)
if error:
# Truncate to keep log volume bounded but long enough to see the
# actual JSON shape mismatch (typical reports are <2KB).
@@ -181,7 +186,9 @@ def _execute(
# Posture B (WP-0016-T03): try to recover a partial-but-usable
# report from individually-parseable items before declaring total
# loss. One bad item should cost one item, not the whole report.
recovered = _resilient_report(instr, raw_output, error, prompt_hash)
recovered = _resilient_report(
instr, raw_output, error, prompt_hash, allow_list,
)
if recovered is not None:
return recovered
failure_report = _invalid_output_report(instr, error, raw_output)
@@ -297,6 +304,12 @@ def _invalid_output_report(
_QUARANTINE_LIMIT = 20
_SNIPPET_LIMIT = 200
# Producer guardrails (ACTIVITY-WP-0016-T04): structural bounds applied to every
# recommendation regardless of producer (LLM, agent, or human). These are
# verify-and-mitigate limits — an offending item is quarantined, never allowed to
# fail the whole report or flow unbounded into a downstream consumer.
# Longest string, measured with len() (characters), tolerated inside an item.
_MAX_STRING_LEN = 4000
# Deepest nesting tolerated in an item; the item itself counts as depth 1 and
# every enclosing dict/list level below it adds one.
_MAX_DEPTH = 8
_SUMMARY_RE = re.compile(r'"summary"\s*:\s*"((?:[^"\\]|\\.)*)"')
@@ -305,6 +318,51 @@ def _snippet(value: Any) -> str:
return text[:_SNIPPET_LIMIT]
def _json_depth(value: Any, depth: int = 1) -> int:
    """Measure how deeply *value* nests dicts and lists.

    A scalar (or an empty container) sits at ``depth``; each non-empty dict or
    list level adds one. The walk stops descending as soon as ``depth`` is
    already past ``_MAX_DEPTH``, so for over-deep input the result is only
    guaranteed to be greater than ``_MAX_DEPTH``, not the exact depth.

    Args:
        value: The decoded JSON-like value to inspect.
        depth: Depth assigned to ``value`` itself (1 for the root).

    Returns:
        The greatest depth reached while walking ``value``.
    """
    # Bail out early: past the cap the precise figure no longer matters.
    if depth > _MAX_DEPTH:
        return depth

    if isinstance(value, dict):
        children = value.values()
    elif isinstance(value, list):
        children = value
    else:
        return depth

    # Every child is at least depth + 1, so seeding with `depth` only matters
    # for an empty container.
    deepest = depth
    for child in children:
        deepest = max(deepest, _json_depth(child, depth + 1))
    return deepest
def _has_oversized_string(value: Any) -> bool:
    """Report whether any string inside *value* exceeds ``_MAX_STRING_LEN``.

    Dicts and lists are walked recursively. For dicts both the values and the
    keys are inspected: a key is producer-controlled text just like a value,
    so leaving it unchecked would let an arbitrarily long string slip past
    the guardrail.

    Args:
        value: The decoded JSON-like value to inspect.

    Returns:
        True if at least one string (value or dict key) is longer than
        ``_MAX_STRING_LEN`` characters, otherwise False.
    """
    if isinstance(value, str):
        return len(value) > _MAX_STRING_LEN
    if isinstance(value, dict):
        return any(
            (isinstance(key, str) and len(key) > _MAX_STRING_LEN)
            or _has_oversized_string(child)
            for key, child in value.items()
        )
    if isinstance(value, list):
        return any(_has_oversized_string(child) for child in value)
    # Numbers, booleans and None carry no string payload.
    return False
def _item_structure_error(item: Any) -> str | None:
    """Apply the producer-agnostic structural caps to a single item.

    The nesting-depth cap is evaluated first; the string-length cap is only
    consulted when the depth is acceptable.

    Args:
        item: The decoded recommendation item to screen.

    Returns:
        A human-readable reason when a cap is violated, or None when the item
        is within both limits.
    """
    if _json_depth(item) > _MAX_DEPTH:
        problem = f"exceeds max nesting depth {_MAX_DEPTH}"
    elif _has_oversized_string(item):
        problem = f"contains a string longer than {_MAX_STRING_LEN} chars"
    else:
        problem = None
    return problem
def _allow_list_from_context(context: dict | None) -> set[str] | None:
"""Build the recommendation-candidate allow-list from resolved context.
Looks for `context["known_candidates"]` (a list/set of valid candidate ids).
Returns None when absent so the allow-list check stays inert until a context
resolver populates it — the guardrail capability ships now; activation is a
one-line resolver change.
"""
if not isinstance(context, dict):
return None
known = context.get("known_candidates")
if isinstance(known, (list, set, tuple)):
return {str(item) for item in known}
return None
def _report_contract(instr: Any) -> tuple[dict[str, Any] | None, int | None]:
"""Extract (item_schema, max_items) for the recommendations list, if any."""
try:
@@ -440,20 +498,53 @@ def _partition_items(
items: list[dict[str, Any]],
item_schema: dict[str, Any] | None,
max_items: int | None,
*,
run_schema: bool = True,
allow_list: set[str] | None = None,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""Split items into (valid, quarantined): schema-invalid then over-limit."""
"""Screen items into (valid, quarantined).
Applied uniformly to recovered items (run_schema=True) and to already
schema-valid happy-path items (run_schema=False). Order of checks: structural
type → schema → producer guardrails (depth/length) → reference allow-list →
count cap. The first failing check quarantines the item with provenance.
"""
valid: list[dict[str, Any]] = []
quarantined: list[dict[str, Any]] = []
for index, item in enumerate(items):
error = (
if not isinstance(item, dict):
quarantined.append(
{"index": index, "error": "item is not a JSON object",
"raw": _snippet(item), "reason": "malformed"}
)
continue
schema_error = (
_validate_schema_node(item, item_schema, f"recommendations[{index}]")
if item_schema
if (run_schema and item_schema)
else None
)
if error:
quarantined.append({"index": index, "error": error, "raw": _snippet(item)})
else:
valid.append(item)
if schema_error:
quarantined.append(
{"index": index, "error": schema_error, "raw": _snippet(item),
"reason": "schema"}
)
continue
structure_error = _item_structure_error(item)
if structure_error:
quarantined.append(
{"index": index, "error": structure_error, "raw": _snippet(item),
"reason": "guardrail"}
)
continue
if allow_list is not None:
candidate = item.get("candidate")
if not isinstance(candidate, str) or candidate not in allow_list:
quarantined.append(
{"index": index, "error": f"candidate {candidate!r} not in allow-list",
"raw": _snippet(item), "reason": "allow_list"}
)
continue
valid.append(item)
if max_items is not None and len(valid) > max_items:
for item in valid[max_items:]:
quarantined.append(
@@ -469,6 +560,7 @@ def _resilient_report(
raw_output: Any,
original_error: str,
prompt_hash: str | None,
allow_list: set[str] | None = None,
) -> InstructionResult | None:
"""Recover a partial-but-usable report from output that failed validation.
@@ -481,7 +573,9 @@ def _resilient_report(
summary, items, quarantined = _recover_recommendations(raw_output)
if not items:
return None
valid, item_quarantine = _partition_items(items, item_schema, max_items)
valid, item_quarantine = _partition_items(
items, item_schema, max_items, allow_list=allow_list,
)
quarantined.extend(item_quarantine)
if not valid:
return None
@@ -528,6 +622,7 @@ def _execution_failure_report(instr: Any, error: str) -> dict[str, Any] | None:
def _validate_output(
raw_output: Any,
instr: Any,
allow_list: set[str] | None = None,
) -> tuple[list[TaskSpec], dict[str, Any] | None, str | None]:
"""Parse raw LLM output into TaskSpecs and optional report payload.
@@ -582,6 +677,28 @@ def _validate_output(
source_type="instruction",
source_id=instr.id,
))
# Happy-path producer guardrails (WP-0016-T04): the whole document already
# passed schema validation, so recommendations are schema-valid; still apply
# the count cap, structural caps, and reference allow-list, quarantining any
# offenders rather than emitting them. Report shape only changes when an item
# is actually quarantined.
if isinstance(report, dict) and isinstance(report.get("recommendations"), list):
item_schema, max_items = _report_contract(instr)
kept, quarantined = _partition_items(
report["recommendations"], item_schema, max_items,
run_schema=False, allow_list=allow_list,
)
if quarantined:
report = {
**report,
"recommendations": kept,
"status": "partial",
"partial": True,
"quarantined_count": len(quarantined),
"quarantined_items": quarantined[:_QUARANTINE_LIMIT],
}
return specs, report, None
except (json.JSONDecodeError, AttributeError, KeyError, TypeError) as exc:
return [], None, str(exc)