feat(activities): resolve_context stub + evaluate_templates — T15/T16

activities.py — resolve_context (T15):
  - dispatches on source.type: 'static' returns config["value"]
  - 'http_get' / 'db_query' raise ApplicationError(non_retryable=True)
  - unknown types raise ApplicationError(non_retryable=True)

template_engine.py — evaluate_templates (T16, pure function):
  - evaluates optional condition expressions against context snapshot
    (restricted eval, no builtins)
  - interpolates {context.<name>.<key>} placeholders via str.format_map
  - returns list[{task_type, params}] with falsy-condition rows omitted

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-26 22:06:09 +00:00
parent 5e4dc6c946
commit bac3efee89
3 changed files with 130 additions and 5 deletions

View File

@@ -83,11 +83,35 @@ async def load_activity_definition(activity_id: str) -> dict:
@activity.defn
async def resolve_context(context_sources: list[dict]) -> dict:
"""Fetch and merge all context sources into a single snapshot dict.
"""Resolve each context source and merge into a snapshot dict.
Implemented in T15.
Returns: {source.name: resolved_value, ...}
Supported source types:
static — returns config["value"] directly
http_get — not yet implemented
db_query — not yet implemented
"""
raise NotImplementedError("T15")
snapshot: dict = {}
for source in context_sources:
name = source["name"]
source_type = source["type"]
config = source.get("config", {})
if source_type == "static":
snapshot[name] = config.get("value")
elif source_type in ("http_get", "db_query"):
raise ApplicationError(
f"Context source type {source_type!r} is not yet implemented",
non_retryable=True,
)
else:
raise ApplicationError(
f"Unknown context source type {source_type!r}",
non_retryable=True,
)
return snapshot
@activity.defn