Files
activity-core/src/activity_core/models.py
tegwick 6f9132314f Add project scaffold: contracts, schemas, docker-compose, workplans
Phase 0 contracts (event envelope, ActivityDefinition, idempotency doc,
naming conventions) and Phase 1 Temporal cluster setup (docker-compose.dev.yml,
Temporal dynamic config) are complete. Includes Pydantic models, JSON schemas,
wiki architecture docs, and ADR-001 workplan files for both workstreams.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-04 22:45:40 +01:00

159 lines
5.9 KiB
Python

"""
Core domain models for activity-core.
T01: EventEnvelope — standard envelope for all inbound and outbound events.
T02: ActivityDefinition — versioned definition of a trigger + context resolver + task templates.
"""
from __future__ import annotations
from typing import Annotated, Any, Literal, Union
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, Field
# ── T01: Event Envelope ────────────────────────────────────────────────────────
class EventEnvelope(BaseModel):
    """Standard internal event envelope. Every event, whether time-fired or
    broker-delivered, is normalised into this shape before processing."""

    # NOTE: the docstring above is kept verbatim on purpose -- pydantic
    # surfaces a model's docstring as the description of its JSON schema.

    # --- Identity and routing metadata (all required, no defaults) ---------
    event_id: str = Field(
        description=(
            "Stable unique ID. Used for deduplication: "
            "if an event with this ID has already been processed, "
            "the router skips it."
        ),
    )
    type: str = Field(
        description="Dot-namespaced event type, e.g. 'user.created'.",
    )
    source: str = Field(
        description="Originating service or component, e.g. 'user-service'.",
    )
    # Typed as a plain ``datetime``; the UTC expectation lives only in the
    # description text and is not validated by this model.
    occurred_at: datetime = Field(
        description="When the event occurred (UTC).",
    )
    subject: str = Field(
        description="Primary resource affected, e.g. 'user/123'.",
    )
    trace_id: str = Field(
        description="Distributed tracing correlation ID.",
    )

    # --- Optional fields with defaults --------------------------------------
    schema_version: str = Field(
        description="Schema version string for forward-compatibility.",
        default="1.0",
    )
    # ``default_factory`` gives every envelope its own fresh dict.
    payload: dict[str, Any] = Field(
        description="Event-specific data; structure varies by event type.",
        default_factory=dict,
    )
# ── T02: ActivityDefinition ────────────────────────────────────────────────────
class CronTriggerConfig(BaseModel):
    # Trigger settings for schedule-driven activities.
    # (Documented with comments rather than a docstring: pydantic would copy
    # a docstring into the generated JSON schema as a new description.)

    # Fixed tag identifying this variant; always the literal "cron".
    trigger_type: Literal["cron"] = "cron"

    # Required: the schedule itself.
    cron_expression: str = Field(
        description="Standard 5-field cron expression, e.g. '0 9 * * 1-5'.",
    )

    timezone: str = Field(
        description="IANA timezone name, e.g. 'Europe/Berlin'.",
        default="UTC",
    )

    # ``ge=0`` rejects negative jitter at validation time.
    jitter_seconds: int = Field(
        description="Maximum random delay (seconds) added to each trigger to spread load.",
        default=0,
        ge=0,
    )

    # What to do about runs that were missed; defaults to dropping them.
    misfire_policy: Literal["skip", "catchup", "compress"] = Field(
        description=(
            "skip: ignore any missed runs. catchup: replay missed runs "
            "up to a bounded limit. compress: run once covering the full "
            "missed window."
        ),
        default="skip",
    )
class EventTriggerConfig(BaseModel):
    # Trigger settings for event-driven activities.
    # (Documented with comments rather than a docstring: pydantic would copy
    # a docstring into the generated JSON schema as a new description.)

    # Fixed tag identifying this variant; always the literal "event".
    trigger_type: Literal["event"] = "event"

    # Required: the event type this activity listens for.
    event_type: str = Field(
        description=(
            "Matches EventEnvelope.type. "
            "The router fires this activity when an event with this type "
            "is received."
        ),
    )

    # Free-form filter mapping; empty by default, with a fresh dict per instance.
    filters: dict[str, Any] = Field(
        description=(
            "Optional predicate filters applied to EventEnvelope.payload before "
            "routing. All filters must match for the activity to fire."
        ),
        default_factory=dict,
    )
# Discriminated union of the two trigger models: the literal ``trigger_type``
# field ("cron" or "event") tells pydantic which model to validate against.
TriggerConfig = Annotated[
    Union[CronTriggerConfig, EventTriggerConfig], Field(discriminator="trigger_type")
]
class ContextSource(BaseModel):
    """Describes one external data source that the workflow queries to build
    the context snapshot passed to evaluate_templates."""

    # NOTE: the docstring above is kept verbatim on purpose -- pydantic
    # surfaces a model's docstring as the description of its JSON schema.

    name: str = Field(
        description="Logical name; referenced as 'context.<name>' in task templates.",
    )

    # NOTE(review): declared as a free-form ``str``; the three adapter names
    # listed in the description are not enforced by this model.
    type: str = Field(
        description="Source adapter type: 'db_query' | 'http_get' | 'static'.",
    )

    # Adapter-specific settings; empty by default, fresh dict per instance.
    config: dict[str, Any] = Field(
        description="Source-specific configuration (SQL, URL, static value, etc.).",
        default_factory=dict,
    )
class TaskTemplate(BaseModel):
    """Template for one task instance produced by RunActivityWorkflow.
    evaluate_templates() expands each template against the context snapshot
    to produce a concrete TaskInstance."""

    # NOTE: the docstring above is kept verbatim on purpose -- pydantic
    # surfaces a model's docstring as the description of its JSON schema.

    task_type: str = Field(
        description="Maps to a registered TaskExecutorWorkflow type, e.g. 'send_email'.",
    )

    # NOTE(review): described as a *Python expression*. This model only stores
    # the string; if the evaluator runs it through eval() and definitions can
    # come from untrusted authors, that is a code-execution vector -- confirm
    # how evaluate_templates() handles it.
    condition: str | None = Field(
        description=(
            "Optional Python expression evaluated against the context "
            "snapshot. Task is skipped if the expression is falsy. "
            "Example: \"context['user']['is_active'] == True\""
        ),
        default=None,
    )

    # Parameter mapping; empty by default, fresh dict per instance.
    params_template: dict[str, Any] = Field(
        description=(
            "Parameter template. "
            "String values starting with '{context.' are interpolated "
            "from the context snapshot at evaluation time."
        ),
        default_factory=dict,
    )
class ActivityDefinition(BaseModel):
    """Versioned definition of a single activity: its trigger, context resolution
    strategy, and the task templates it can spawn."""

    # NOTE: the docstring above is kept verbatim on purpose -- pydantic
    # surfaces a model's docstring as the description of its JSON schema.

    # --- Identity -----------------------------------------------------------
    id: UUID = Field(
        description=(
            "Stable UUID. "
            "Used as the Temporal Schedule ID prefix (f'activity-schedule-{id}') "
            "and as the workflow ID component."
        ),
    )
    name: str = Field(description="Human-readable name.")
    enabled: bool = Field(
        description=(
            "When False the corresponding Temporal Schedule is paused and event "
            "routing is suppressed."
        ),
        default=True,
    )

    # --- Behaviour ----------------------------------------------------------
    # Required; either a cron or an event trigger model (see TriggerConfig).
    trigger_config: TriggerConfig = Field(
        description="Cron or event trigger configuration.",
    )
    # Both lists default to empty, with a fresh list per instance.
    context_sources: list[ContextSource] = Field(default_factory=list)
    task_templates: list[TaskTemplate] = Field(default_factory=list)

    # NOTE(review): uses the same literal set as CronTriggerConfig.misfire_policy.
    # The description says the two "should match", but no validator visible in
    # this class enforces that -- callers must keep them in sync themselves.
    dedupe_key_strategy: Literal["skip", "catchup", "compress"] = Field(
        description=(
            "How to handle duplicate or missed trigger events. Should match "
            "CronTriggerConfig.misfire_policy for cron activities."
        ),
        default="skip",
    )

    # --- Versioning ---------------------------------------------------------
    # ``ge=1`` rejects zero and negative versions at validation time.
    version: int = Field(
        description=(
            "Incremented on breaking schema changes. "
            "Stored in activity_runs for audit purposes."
        ),
        default=1,
        ge=1,
    )