Files
activity-core/src/activity_core/models.py
tegwick a83b117f60 feat(ACTIVITY-WP-0014): explicit run-miss recovery policies (T02, T04)
Set Temporal catchup_window on cron schedules so a fire missed during a
worker/Temporal outage is no longer silently dropped. Redefine misfire_policy
into three explicit modes — skip, catchup_all, catchup_latest — mapping to
(catchup_window, overlap) pairs; legacy catchup/compress aliased. Add
catchup_window_seconds override. Remove the ad-hoc upsert-time 1h backfill in
favour of native catchup. Apply catchup_latest to daily-statehub-wsjf-triage in
the Railiance runtime manifest and document run-miss policies in the runbook.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 14:15:45 +02:00

188 lines
8.0 KiB
Python

"""
Core domain models for activity-core.
"""
from __future__ import annotations
import json
from typing import Annotated, Any, Literal, Union
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, Field
# ── EventEnvelope (T40) ───────────────────────────────────────────────────────
class EventEnvelope(BaseModel):
    """Canonical internal representation of an event.

    Inbound events from every channel (NATS, webhook, cron) are converted
    into this shape before any further processing.
    """

    id: str = Field(description="UUID v4 — stable unique ID for deduplication.")
    type: str = Field(description="Dot-namespaced event type, e.g. 'org.repo.registered'.")
    version: str = Field("1.0", description="Schema version string.")
    timestamp: datetime = Field(description="When the event occurred (UTC).")
    publisher: str = Field(description="Originating service, e.g. 'the-custodian/state-hub'.")
    attributes: dict[str, Any] = Field(
        description="Event-specific attributes; structure varies by event type.",
        default_factory=dict,
    )

    @classmethod
    def from_nats_message(cls, msg: Any) -> "EventEnvelope":
        """Build an envelope from a NATS JetStream message.

        ``msg.data`` is decoded to text, parsed as JSON, and the resulting
        object is validated against this model.
        """
        return cls.model_validate(json.loads(msg.data.decode()))

    @classmethod
    def from_webhook_payload(cls, source: str, payload: dict) -> "EventEnvelope":
        """Validate an already-normalised webhook payload as an envelope.

        The payload must already carry the envelope fields; it is passed to
        model validation unchanged.
        """
        # NOTE(review): `source` is accepted but not consulted here — confirm
        # whether it is meant to be recorded (e.g. as the publisher).
        envelope = cls.model_validate(payload)
        return envelope
# ── Trigger configs ───────────────────────────────────────────────────────────
class CronTriggerConfig(BaseModel):
    """Trigger configuration selected by ``trigger_type == "cron"``."""

    trigger_type: Literal["cron"] = "cron"
    cron_expression: str = Field(description="Standard 5-field cron expression, e.g. '0 9 * * 1-5'.")
    timezone: str = Field("UTC", description="IANA timezone name.")
    jitter_seconds: int = Field(0, ge=0)

    # Run-miss recovery (ACTIVITY-WP-0014): what to do about a fire that was
    # missed because the worker / Temporal was unavailable at trigger time.
    #   skip           - fire on schedule or not at all; missed fires are lost
    #   catchup_all    - replay every fire missed during the outage window
    #   catchup_latest - replay only the newest missed fire; never accumulate
    # The legacy spellings "catchup" (= catchup_all) and "compress"
    # (= catchup_latest) are still accepted as values; they are not rewritten
    # by this model.
    misfire_policy: Literal["skip", "catchup_all", "catchup_latest", "catchup", "compress"] = "skip"

    # Optional override for how far back Temporal recovers missed fires after
    # an outage. None means "use the default for the selected policy".
    catchup_window_seconds: int | None = Field(None, ge=0)
class EventTriggerConfig(BaseModel):
    """Trigger configuration selected by ``trigger_type == "event"``."""

    trigger_type: Literal["event"] = "event"
    event_type: str = Field(description="Matches EventEnvelope.type. Router fires this activity on match.")
    filters: dict[str, Any] = Field(
        description="All filters must match EventEnvelope.attributes for routing.",
        default_factory=dict,
    )
class ScheduledTriggerConfig(BaseModel):
    """Single-shot trigger: fires exactly once, at a given UTC datetime in the future."""

    trigger_type: Literal["scheduled"] = "scheduled"
    at: datetime = Field(description="UTC datetime when the workflow should be triggered.")
    timezone: str = Field("UTC", description="IANA timezone name (informational).")
# Tagged union of all trigger configurations; the "trigger_type" field of the
# input selects which concrete model is used for validation.
_TriggerUnion = Union[CronTriggerConfig, EventTriggerConfig, ScheduledTriggerConfig]
TriggerConfig = Annotated[_TriggerUnion, Field(discriminator="trigger_type")]
# ── Rules and instructions (T34) ──────────────────────────────────────────────
class ActionDef(BaseModel):
    """Action attached to a rule: which task template to use and with what settings."""

    task_template: str = Field(description="Path to task template .md, relative to repo root.")
    target_repo: str | None = Field(
        None,
        description="Attribute-access expression or literal repo slug.",
    )
    priority: str = "medium"
    labels: list[str] = Field(default_factory=list)
    due_in_days: int | None = None
class RuleDef(BaseModel):
    """A single rule: an optional per-item expansion, a condition, and an action."""

    id: str
    for_each: str | None = Field(
        None,
        description="Optional event/context path to a list for per-item rule expansion.",
    )
    bind_as: str = Field(
        "item",
        description="Context key used for each item when for_each is set.",
    )
    condition: str = Field(
        "",
        description="Rule DSL expression; empty string means always true.",
    )
    action: ActionDef
class InstructionDef(BaseModel):
    """An LLM instruction: prompt template, model settings, and output handling."""

    id: str
    condition: str = Field(
        "",
        description="Optional pre-filter using Rule DSL; empty means always execute.",
    )
    trusted_fields: list[str] = Field(
        description="Allowlist of event/context fields that may appear in the prompt template.",
    )

    # Model selection and optional tuning parameters.
    model: str = Field(description="LLM model identifier, e.g. 'claude-sonnet-4-6'.")
    temperature: float | None = None
    max_tokens: int | None = None
    max_depth: int | None = None
    model_params: dict[str, Any] = Field(default_factory=dict)

    # Prompt and output handling.
    prompt: str = Field(description="Prompt template with {field.path} placeholders.")
    output_schema: str = Field(description="Path to JSON Schema file for output validation.")
    review_required: bool = False
    report_sinks: list[dict[str, Any]] = Field(default_factory=list)
# ── Context sources ───────────────────────────────────────────────────────────
class ContextSource(BaseModel):
    """An external data source queried by the workflow while assembling the context snapshot."""

    name: str = Field(
        "",
        description="Logical name; referenced as 'context.<name>' in templates.",
    )
    type: str = Field(description="Source adapter type: 'repo-scoping' | 'state-hub' | etc.")
    query: str = Field("", description="Named query to execute against the source.")
    params: dict[str, Any] = Field(default_factory=dict)
    bind_to: str = Field("", description="Context key to bind the result to.")
    required: bool = Field(
        False,
        description="When true, resolver failures fail the activity run instead of binding {}.",
    )
# ── Task templates (legacy) ───────────────────────────────────────────────────
class TaskTemplate(BaseModel):
    """Legacy task template; has no effect once ActivityDefinition.rules is non-empty."""

    task_type: str
    condition: str | None = Field(default=None)
    params_template: dict[str, Any] = Field(default_factory=dict)
# ── ActivityDefinition ────────────────────────────────────────────────────────
class ActivityDefinition(BaseModel):
    """A versioned activity: its trigger, its context sources, and its rules/instructions."""

    id: UUID
    name: str
    enabled: bool = Field(default=True)
    trigger_config: TriggerConfig
    context_sources: list[ContextSource] = Field(default_factory=list)

    # Rule/instruction pipeline (T34).
    rules: list[RuleDef] = Field(default_factory=list)
    instructions: list[InstructionDef] = Field(default_factory=list)

    # Legacy path: not used when `rules` is non-empty.
    task_templates: list[TaskTemplate] = Field(default_factory=list)

    # NOTE(review): these values mirror the legacy misfire policy names rather
    # than anything dedupe-specific — confirm this is intentional.
    dedupe_key_strategy: Literal["skip", "catchup", "compress"] = "skip"
    version: int = Field(1, ge=1)
    status: str = "active"