From 176867cbe30dea725b5782eb6d29e8a6ff0a9361 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 14 May 2026 23:02:33 +0200 Subject: [PATCH] feat(WP-0003b): parser, workflow wiring, triggers, webhooks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T44: ActivityDefinition markdown file parser (definition_parser.py) - Scans activity-definitions/*.md and ACTIVITY_DEFINITION_DIRS paths - Parses YAML frontmatter + fenced rule/instruction blocks - Raises ParseError on any malformed file — never silently skips T45: ActivityDefinition sync command - Migration 0006: adds rules_json/instructions_json JSONB columns - sync_activity_definitions.py + make sync-activity-definitions - Called at worker startup before schedule sync T46: Rule/instruction pipeline wired into RunActivityWorkflow - New evaluate_rules and emit_tasks Temporal activities - Workflow passes event_envelope_json to enable rule evaluation - EventRouter now passes full envelope JSON as 4th workflow arg - IssueSink.emit() writes task_spawn_log rows per task T47: ScheduledTriggerConfig model (one-off future datetime trigger) T48: One-off Temporal Schedule support - Fixed timezone_name → time_zone_name (was causing all schedule tests to fail) - Added ScheduleCalendarSpec-based one-off schedule with remaining_actions=1 - cancel_scheduled() for admin cancellation - Fixed backfill() call to use *args unpacking (not list wrapper) - Fixed ScheduleAlreadyRunningError catch in upsert_schedule - sync_schedules now handles ScheduledTriggerConfig definitions T49: Webhook receiver - POST /webhooks/gitea — HMAC-SHA256 via X-Gitea-Signature-256 - POST /webhooks/github — HMAC-SHA256 via X-Hub-Signature-256 - Normalisers: repo.created, push, issue.closed → EventEnvelope - Publishes to NATS activity.{type} subject after registry validation - Mounted in api.py at /webhooks prefix T50: Gitea event type definitions - gitea.repo.created.md, gitea.push.md, gitea.issue.closed.md - Each includes 
normaliser field mapping in Consumer Notes Tests: 18 passed, 1 skipped (integration). Fixed embedded Temporal server visibility latency in test_upsert_schedule_creates_schedule. Co-Authored-By: Claude Sonnet 4.6 --- Makefile | 3 + event-types/gitea.issue.closed.md | 69 +++++ event-types/gitea.push.md | 81 ++++++ event-types/gitea.repo.created.md | 69 +++++ ...es_instructions_to_activity_definitions.py | 31 +++ src/activity_core/activities.py | 106 +++++++- src/activity_core/api.py | 2 + src/activity_core/definition_parser.py | 192 ++++++++++++++ src/activity_core/event_router.py | 7 +- src/activity_core/models.py | 10 +- src/activity_core/orm.py | 2 + src/activity_core/schedule_manager.py | 104 ++++++-- .../sync_activity_definitions.py | 113 ++++++++ src/activity_core/sync_schedules.py | 8 +- src/activity_core/webhook_receiver.py | 246 ++++++++++++++++++ src/activity_core/worker.py | 15 +- src/activity_core/workflows.py | 91 ++++--- tests/test_schedule_lifecycle.py | 12 +- 18 files changed, 1106 insertions(+), 55 deletions(-) create mode 100644 event-types/gitea.issue.closed.md create mode 100644 event-types/gitea.push.md create mode 100644 event-types/gitea.repo.created.md create mode 100644 migrations/versions/0006_add_rules_instructions_to_activity_definitions.py create mode 100644 src/activity_core/definition_parser.py create mode 100644 src/activity_core/sync_activity_definitions.py create mode 100644 src/activity_core/webhook_receiver.py diff --git a/Makefile b/Makefile index 3ee7247..f32880a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ .PHONY: sync-event-types sync-activity-definitions test +sync-activity-definitions: + uv run python -m activity_core.sync_activity_definitions + sync-event-types: uv run python -c " import asyncio, os diff --git a/event-types/gitea.issue.closed.md b/event-types/gitea.issue.closed.md new file mode 100644 index 0000000..04a426e --- /dev/null +++ b/event-types/gitea.issue.closed.md @@ -0,0 +1,69 @@ +--- +type_id: 
gitea.issue.closed +version: "1.0" +publisher: gitea/webhook +governance: publisher-declared +status: active +--- + +# gitea.issue.closed + +## Intent + +Fired when an issue is closed in a Gitea repository. Received via the +`issues` webhook event (action=closed). + +## When Published + +- Gitea webhook fires `issues` event with `action: closed` +- The activity-core webhook receiver normalises it and publishes to NATS + +## Attributes + +| Name | Type | Required | Description | +|------|------|----------|-------------| +| repo_full_name | string | yes | Full repository name (owner/repo) | +| issue_number | integer | yes | Issue number in the repository | +| issue_title | string | yes | Title of the closed issue | +| closer | string | yes | Username of the user who closed the issue | +| closed_at | datetime | yes | ISO-8601 timestamp when the issue was closed | + +## Example Payload + +```json +{ + "id": "c3d4e5f6-...", + "type": "gitea.issue.closed", + "version": "1.0", + "timestamp": "2026-05-14T14:00:00Z", + "publisher": "gitea/webhook", + "attributes": { + "repo_full_name": "the-custodian/activity-core", + "issue_number": 42, + "issue_title": "Implement webhook receiver", + "closer": "bernd", + "closed_at": "2026-05-14T14:00:00Z" + } +} +``` + +## Consumer Notes + +**Normaliser mapping** (raw Gitea webhook → EventEnvelope attribute): + +| Gitea field | EventEnvelope attribute | +|-------------|------------------------| +| `repository.full_name` | `repo_full_name` | +| `issue.number` | `issue_number` | +| `issue.title` | `issue_title` | +| `sender.login` | `closer` | +| (reception time) | `closed_at` | + +Only fired when `action = "closed"`. Other issue actions (opened, edited, +assigned, etc.) are not normalised and return HTTP 200 with `status: ignored`. 
+ +## Debugging + +- Check NATS subject `activity.gitea.issue.closed` +- Webhook signature validated via `X-Gitea-Signature-256` and `WEBHOOK_SECRET_GITEA` +- Registry lookup: `get_event_type("gitea.issue.closed")` diff --git a/event-types/gitea.push.md b/event-types/gitea.push.md new file mode 100644 index 0000000..999d5d9 --- /dev/null +++ b/event-types/gitea.push.md @@ -0,0 +1,81 @@ +--- +type_id: gitea.push +version: "1.0" +publisher: gitea/webhook +governance: publisher-declared +status: active +--- + +# gitea.push + +## Intent + +Fired when commits are pushed to a Gitea repository branch. Received via +the `push` webhook event. Also emitted for equivalent GitHub push events. + +## When Published + +- Gitea webhook fires `push` event on any branch push +- GitHub webhook fires `push` event (normalised to same envelope) +- The activity-core webhook receiver normalises it and publishes to NATS + +## Attributes + +| Name | Type | Required | Description | +|------|------|----------|-------------| +| repo_full_name | string | yes | Full repository name (owner/repo) | +| branch | string | yes | Branch name (ref with `refs/heads/` stripped) | +| pusher | string | yes | Username of the user who pushed | +| commits_count | integer | yes | Number of commits in the push | +| compare_url | string | yes | URL comparing before..after for this push | +| pushed_at | datetime | yes | ISO-8601 timestamp when the push was received | + +## Example Payload + +```json +{ + "id": "b2c3d4e5-...", + "type": "gitea.push", + "version": "1.0", + "timestamp": "2026-05-14T11:30:00Z", + "publisher": "gitea/webhook", + "attributes": { + "repo_full_name": "the-custodian/activity-core", + "branch": "main", + "pusher": "bernd", + "commits_count": 3, + "compare_url": "https://gitea.example.com/the-custodian/activity-core/compare/abc...def", + "pushed_at": "2026-05-14T11:30:00Z" + } +} +``` + +## Consumer Notes + +**Normaliser mapping — Gitea** (raw webhook → EventEnvelope attribute): + +| Gitea 
field | EventEnvelope attribute | +|-------------|------------------------| +| `repository.full_name` | `repo_full_name` | +| `ref` (stripped of `refs/heads/`) | `branch` | +| `pusher.login` | `pusher` | +| `len(commits)` | `commits_count` | +| `compare` | `compare_url` | +| (reception time) | `pushed_at` | + +**Normaliser mapping — GitHub** (same envelope, different source fields): + +| GitHub field | EventEnvelope attribute | +|--------------|------------------------| +| `repository.full_name` | `repo_full_name` | +| `ref` (stripped) | `branch` | +| `pusher.name` | `pusher` | +| `len(commits)` | `commits_count` | +| `compare` | `compare_url` | + +## Debugging + +- Check NATS subject `activity.gitea.push` +- Gitea: signature via `X-Gitea-Signature-256` and `WEBHOOK_SECRET_GITEA` +- GitHub: signature via `X-Hub-Signature-256` and `WEBHOOK_SECRET_GITHUB` +- Registry lookup: `get_event_type("gitea.push")` diff --git a/event-types/gitea.repo.created.md b/event-types/gitea.repo.created.md new file mode 100644 index 0000000..37df98e --- /dev/null +++ b/event-types/gitea.repo.created.md @@ -0,0 +1,69 @@ +--- +type_id: gitea.repo.created +version: "1.0" +publisher: gitea/webhook +governance: publisher-declared +status: active +--- + +# gitea.repo.created + +## Intent + +Fired when a new repository is created in the Gitea instance. Received via +the `repository` webhook event (action=created). 
+ +## When Published + +- Gitea webhook fires `repository` event with `action: created` +- The activity-core webhook receiver normalises it and publishes to NATS + +## Attributes + +| Name | Type | Required | Description | +|------|------|----------|-------------| +| repo_full_name | string | yes | Full repository name (owner/repo) | +| repo_slug | string | yes | Repository short name (last segment of repo_full_name) | +| owner | string | yes | Gitea username of the repository owner | +| html_url | string | yes | Browser URL of the repository | +| created_at | datetime | yes | ISO-8601 timestamp when the repository was created | + +## Example Payload + +```json +{ + "id": "a1b2c3d4-...", + "type": "gitea.repo.created", + "version": "1.0", + "timestamp": "2026-05-14T10:00:00Z", + "publisher": "gitea/webhook", + "attributes": { + "repo_full_name": "the-custodian/activity-core", + "repo_slug": "activity-core", + "owner": "the-custodian", + "html_url": "https://gitea.example.com/the-custodian/activity-core", + "created_at": "2026-05-14T10:00:00Z" + } +} +``` + +## Consumer Notes + +**Normaliser mapping** (raw Gitea webhook → EventEnvelope attribute): + +| Gitea field | EventEnvelope attribute | +|-------------|------------------------| +| `repository.full_name` | `repo_full_name` | +| `repository.name` | `repo_slug` | +| `repository.owner.login` | `owner` | +| `repository.html_url` | `html_url` | +| `repository.created` | `created_at` | + +Only fired when `action = "created"`. Other repository actions (deleted, forked, +starred) are not normalised and return HTTP 200 with `status: ignored`. 
+ +## Debugging + +- Check NATS subject `activity.gitea.repo.created` +- Webhook signature validated via `X-Gitea-Signature-256` and `WEBHOOK_SECRET_GITEA` +- Registry lookup: `get_event_type("gitea.repo.created")` diff --git a/migrations/versions/0006_add_rules_instructions_to_activity_definitions.py b/migrations/versions/0006_add_rules_instructions_to_activity_definitions.py new file mode 100644 index 0000000..1e3aabc --- /dev/null +++ b/migrations/versions/0006_add_rules_instructions_to_activity_definitions.py @@ -0,0 +1,31 @@ +"""Add rules_json and instructions_json to activity_definitions. + +Revision ID: 0006 +Revises: 0005 +""" +from __future__ import annotations + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB + +revision = "0006" +down_revision = "0005" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "activity_definitions", + sa.Column("rules_json", JSONB, nullable=False, server_default="[]"), + ) + op.add_column( + "activity_definitions", + sa.Column("instructions_json", JSONB, nullable=False, server_default="[]"), + ) + + +def downgrade() -> None: + op.drop_column("activity_definitions", "instructions_json") + op.drop_column("activity_definitions", "rules_json") diff --git a/src/activity_core/activities.py b/src/activity_core/activities.py index de9dfb5..11e31de 100644 --- a/src/activity_core/activities.py +++ b/src/activity_core/activities.py @@ -21,8 +21,10 @@ from temporalio import activity from temporalio.exceptions import ApplicationError from activity_core.db import make_engine +from activity_core.issue_sink import get_issue_sink from activity_core.orm import ActivityDefinition as ActivityDefinitionRow -from activity_core.orm import ActivityRun, TaskInstance +from activity_core.orm import ActivityRun, TaskInstance, TaskSpawnLog +from activity_core.rules import evaluate_condition _session_factory: async_sessionmaker[AsyncSession] | None = None @@ -79,6 +81,8 
@@ async def load_activity_definition(activity_id: str) -> dict: "trigger_config": row.trigger_config, "context_sources": row.context_sources, "task_templates": row.task_templates, + "rules": row.rules_json, + "instructions": row.instructions_json, "dedupe_key_strategy": row.dedupe_key_strategy, "version": row.version, } @@ -200,3 +204,103 @@ async def persist_task_instance(task_payload: dict) -> str: await session.execute(stmt) return str(task_id) + + +@activity.defn +async def evaluate_rules(payload: dict) -> list[dict]: + """Evaluate each rule condition against the event and context. + + Returns the list of matching rule dicts (those whose condition is True). + Rules that raise UnsafeExpression or any other error are skipped and logged. + + Expected keys in payload: + rules list[dict] — RuleDef serialised dicts + event dict — EventEnvelope attributes (or empty for cron) + context dict — context snapshot from resolve_context + """ + from activity_core.rules.evaluator import UnsafeExpression + + rules = payload.get("rules", []) + event_attrs = payload.get("event", {}) + context = payload.get("context", {}) + + # Build a simple object whose attributes mirror event fields for the evaluator. + class _Env: + def __init__(self, attrs: dict) -> None: + self.attributes = _DictObj(attrs) + + class _DictObj: + def __init__(self, d: dict) -> None: + self.__dict__.update(d) + + event_obj = _Env(event_attrs) + + matched: list[dict] = [] + for rule in rules: + condition = rule.get("condition", "") + try: + if evaluate_condition(condition, event_obj, context): + matched.append(rule) + except UnsafeExpression as exc: + activity.logger.warning("rule %r unsafe expression — skipping: %s", rule.get("id"), exc) + except Exception as exc: + activity.logger.warning("rule %r eval error — skipping: %s", rule.get("id"), exc) + + return matched + + +@activity.defn +async def emit_tasks(payload: dict) -> list[str]: + """Emit TaskSpecs to IssueSink and write task_spawn_log rows. 
+ + Returns list of external task ref IDs. + + Expected keys in payload: + task_specs list[dict] — from evaluate_rules matched actions + activity_id str — UUID of the ActivityDefinition + triggering_event_id str — event ID or workflow ID for cron + run_id str — UUID of the ActivityRun + """ + from activity_core.rules.models import TaskSpec + + task_specs_raw = payload.get("task_specs", []) + activity_id = payload.get("activity_id", "") + triggering_event_id = payload.get("triggering_event_id", "") + + sink = get_issue_sink() + Session = _get_session_factory() + + refs: list[str] = [] + async with Session() as session: + async with session.begin(): + for spec_dict in task_specs_raw: + spec = TaskSpec( + title=spec_dict.get("title", ""), + description=spec_dict.get("description", ""), + target_repo=spec_dict.get("target_repo"), + priority=spec_dict.get("priority", "medium"), + labels=spec_dict.get("labels", []), + due_in_days=spec_dict.get("due_in_days"), + source_type=spec_dict.get("source_type", "rule"), + source_id=spec_dict.get("source_id", ""), + triggering_event_id=triggering_event_id, + activity_definition_id=activity_id, + ) + try: + ref = sink.emit(spec) + refs.append(ref.external_id) + + log_row = TaskSpawnLog( + activity_def_id=uuid.UUID(activity_id), + source_type=spec.source_type, + source_id=spec.source_id, + source_version="1", + triggering_event_id=triggering_event_id, + task_ref=ref.external_id, + condition_matched=spec_dict.get("condition"), + ) + session.add(log_row) + except Exception as exc: + activity.logger.warning("emit_tasks: sink.emit failed — %s", exc) + + return refs diff --git a/src/activity_core/api.py b/src/activity_core/api.py index 722ab3f..e88cbb8 100644 --- a/src/activity_core/api.py +++ b/src/activity_core/api.py @@ -38,6 +38,7 @@ from temporalio.client import Client from activity_core.models import ActivityDefinition, CronTriggerConfig from activity_core.orm import ActivityDefinition as ActivityDefinitionRow, EventType as 
EventTypeRow from activity_core.schedule_manager import delete_schedule, upsert_schedule +from activity_core.webhook_receiver import router as webhook_router TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") @@ -72,6 +73,7 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg] app = FastAPI(title="activity-core API", lifespan=lifespan) +app.include_router(webhook_router) def _get_db() -> async_sessionmaker[AsyncSession]: diff --git a/src/activity_core/definition_parser.py b/src/activity_core/definition_parser.py new file mode 100644 index 0000000..086fc90 --- /dev/null +++ b/src/activity_core/definition_parser.py @@ -0,0 +1,192 @@ +"""ActivityDefinition markdown file parser (T44). + +Scans activity-definitions/*.md in the local repo and any directories +listed in ACTIVITY_DEFINITION_DIRS (colon-separated). Returns parsed +ActivityDefinitionDef objects; raises ParseError on malformed input. + +Never silently ignores a broken definition file. 
+""" + +from __future__ import annotations + +import os +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml + + +class ParseError(Exception): + """Raised when a definition file cannot be parsed.""" + + def __init__(self, file: Path, line: int | None, message: str) -> None: + self.file = file + self.line = line + self.message = message + loc = f"{file}:{line}" if line else str(file) + super().__init__(f"{loc}: {message}") + + +@dataclass +class ActivityDefinitionDef: + """Parsed in-memory representation of an activity-definitions/*.md file.""" + + id: str + name: str + enabled: bool + trigger_config: dict[str, Any] + context_sources: list[dict[str, Any]] + rules: list[dict[str, Any]] + instructions: list[dict[str, Any]] + governance: str + owner: str + status: str + source_file: Path + + +_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL) +_FENCED_BLOCK_RE = re.compile(r"^```(\w+)\s*\n(.*?)\n```", re.DOTALL | re.MULTILINE) + + +def _scan_dirs() -> list[Path]: + dirs: list[Path] = [] + default_dir = Path("activity-definitions") + if default_dir.is_dir(): + dirs.append(default_dir) + extra = os.environ.get("ACTIVITY_DEFINITION_DIRS", "") + for part in extra.split(":"): + part = part.strip() + if not part: + continue + p = Path(part) / "activity-definitions" + if p.is_dir(): + dirs.append(p) + return dirs + + +def _parse_trigger(trigger_dict: dict, file: Path) -> dict[str, Any]: + """Normalise the trigger section into a trigger_config dict.""" + trigger_type = trigger_dict.get("type") + if trigger_type == "cron": + return { + "trigger_type": "cron", + "cron_expression": trigger_dict.get("cron_expression", ""), + "timezone": trigger_dict.get("timezone", "UTC"), + "jitter_seconds": trigger_dict.get("jitter_seconds", 0), + "misfire_policy": trigger_dict.get("misfire_policy", "skip"), + } + elif trigger_type == "event": + return { + "trigger_type": "event", + "event_type": 
trigger_dict.get("event_type", ""), + "filters": trigger_dict.get("filters", {}), + } + elif trigger_type == "scheduled": + at = trigger_dict.get("at") + if at is None: + raise ParseError(file, None, "trigger.at is required for type=scheduled") + at_str = at.isoformat() if hasattr(at, "isoformat") else str(at) + return { + "trigger_type": "scheduled", + "at": at_str, + "timezone": trigger_dict.get("timezone", "UTC"), + } + else: + raise ParseError(file, None, f"unknown trigger type {trigger_type!r}") + + +def parse_file(path: Path) -> ActivityDefinitionDef: + """Parse one ActivityDefinition markdown file. + + Raises ParseError on malformed or missing required fields. + """ + try: + text = path.read_text(encoding="utf-8") + except OSError as exc: + raise ParseError(path, None, f"cannot read file: {exc}") from exc + + fm_match = _FRONTMATTER_RE.match(text) + if not fm_match: + raise ParseError(path, 1, "missing or malformed YAML frontmatter") + + try: + fm = yaml.safe_load(fm_match.group(1)) + except yaml.YAMLError as exc: + raise ParseError(path, None, f"YAML parse error in frontmatter: {exc}") from exc + + if not isinstance(fm, dict): + raise ParseError(path, 1, "frontmatter must be a YAML mapping") + + for req in ("id", "name", "trigger"): + if req not in fm: + raise ParseError(path, None, f"missing required frontmatter field: {req!r}") + + trigger_section = fm["trigger"] + if not isinstance(trigger_section, dict): + raise ParseError(path, None, "trigger must be a YAML mapping") + + trigger_config = _parse_trigger(trigger_section, path) + + raw_sources = fm.get("context_sources", []) or [] + context_sources: list[dict[str, Any]] = [] + for cs in raw_sources: + if not isinstance(cs, dict): + raise ParseError(path, None, "each context_source must be a mapping") + context_sources.append(cs) + + # Parse fenced rule/instruction blocks from the body + body = text[fm_match.end():] + rules: list[dict[str, Any]] = [] + instructions: list[dict[str, Any]] = [] + + for 
block_match in _FENCED_BLOCK_RE.finditer(body): + lang = block_match.group(1).strip() + block_body = block_match.group(2) + try: + block_data = yaml.safe_load(block_body) + except yaml.YAMLError as exc: + raise ParseError(path, None, f"YAML parse error in {lang!r} block: {exc}") from exc + + if lang == "rule": + if not isinstance(block_data, dict): + raise ParseError(path, None, "rule block must be a YAML mapping") + if "id" not in block_data: + raise ParseError(path, None, "rule block missing required field 'id'") + if "action" not in block_data: + raise ParseError(path, None, f"rule {block_data['id']!r} missing 'action'") + rules.append(block_data) + elif lang == "instruction": + if not isinstance(block_data, dict): + raise ParseError(path, None, "instruction block must be a YAML mapping") + if "id" not in block_data: + raise ParseError(path, None, "instruction block missing required field 'id'") + instructions.append(block_data) + + return ActivityDefinitionDef( + id=str(fm["id"]), + name=str(fm["name"]), + enabled=bool(fm.get("enabled", True)), + trigger_config=trigger_config, + context_sources=context_sources, + rules=rules, + instructions=instructions, + governance=str(fm.get("governance", "publisher-declared")), + owner=str(fm.get("owner", "")), + status=str(fm.get("status", "active")), + source_file=path, + ) + + +def scan_and_parse() -> list[ActivityDefinitionDef]: + """Scan all definition directories and parse all .md files. + + Raises ParseError on any broken file — never silently skips. 
+ """ + dirs = _scan_dirs() + defs: list[ActivityDefinitionDef] = [] + for d in dirs: + for path in sorted(d.glob("*.md")): + defs.append(parse_file(path)) + return defs diff --git a/src/activity_core/event_router.py b/src/activity_core/event_router.py index 1414ffc..b54aac7 100644 --- a/src/activity_core/event_router.py +++ b/src/activity_core/event_router.py @@ -129,7 +129,12 @@ class EventRouter: try: await self._temporal.start_workflow( "RunActivityWorkflow", - args=[activity_id, envelope.id, envelope.timestamp.isoformat()], + args=[ + activity_id, + envelope.id, + envelope.timestamp.isoformat(), + envelope.model_dump_json(), + ], id=workflow_id, task_queue=_ORCHESTRATOR_TASK_QUEUE, id_conflict_policy=WorkflowIDConflictPolicy.FAIL, diff --git a/src/activity_core/models.py b/src/activity_core/models.py index 3949bad..dc29e9f 100644 --- a/src/activity_core/models.py +++ b/src/activity_core/models.py @@ -63,8 +63,16 @@ class EventTriggerConfig(BaseModel): ) +class ScheduledTriggerConfig(BaseModel): + """One-off future trigger that fires once at a specified UTC datetime.""" + + trigger_type: Literal["scheduled"] = "scheduled" + at: datetime = Field(description="UTC datetime when the workflow should be triggered.") + timezone: str = Field(default="UTC", description="IANA timezone name (informational).") + + TriggerConfig = Annotated[ - Union[CronTriggerConfig, EventTriggerConfig], + Union[CronTriggerConfig, EventTriggerConfig, ScheduledTriggerConfig], Field(discriminator="trigger_type"), ] diff --git a/src/activity_core/orm.py b/src/activity_core/orm.py index 5c93c6b..6818a51 100644 --- a/src/activity_core/orm.py +++ b/src/activity_core/orm.py @@ -36,6 +36,8 @@ class ActivityDefinition(Base): trigger_config: Mapped[dict] = mapped_column(JSONB, nullable=False) context_sources: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) task_templates: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) + rules_json: Mapped[list] = 
mapped_column(JSONB, nullable=False, default=list) + instructions_json: Mapped[list] = mapped_column(JSONB, nullable=False, default=list) dedupe_key_strategy: Mapped[str] = mapped_column( Text, nullable=False, default="skip" ) diff --git a/src/activity_core/schedule_manager.py b/src/activity_core/schedule_manager.py index 3267a6e..4692680 100644 --- a/src/activity_core/schedule_manager.py +++ b/src/activity_core/schedule_manager.py @@ -16,10 +16,13 @@ from temporalio.client import ( Client, Schedule, ScheduleActionStartWorkflow, + ScheduleAlreadyRunningError, ScheduleBackfill, + ScheduleCalendarSpec, ScheduleHandle, ScheduleOverlapPolicy, SchedulePolicy, + ScheduleRange, ScheduleSpec, ScheduleState, ScheduleUpdate, @@ -27,7 +30,7 @@ from temporalio.client import ( ) from temporalio.service import RPCError -from activity_core.models import ActivityDefinition, CronTriggerConfig +from activity_core.models import ActivityDefinition, CronTriggerConfig, ScheduledTriggerConfig _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" @@ -68,7 +71,7 @@ def _build_schedule(defn: ActivityDefinition) -> Schedule: spec = ScheduleSpec( cron_expressions=[cfg.cron_expression], - timezone_name=cfg.timezone, + time_zone_name=cfg.timezone, jitter=timedelta(seconds=cfg.jitter_seconds) if cfg.jitter_seconds else None, ) @@ -78,19 +81,90 @@ def _build_schedule(defn: ActivityDefinition) -> Schedule: return Schedule(action=action, spec=spec, policy=policy, state=state) -async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleHandle: - """Create or update a Temporal Schedule for a cron ActivityDefinition. +def _onetime_schedule_id(activity_id: str | UUID) -> str: + return f"activity-schedule-{activity_id}-once" - - Only operates on definitions with trigger_type='cron'. + +def _build_onetime_schedule(defn: ActivityDefinition) -> tuple[str, Schedule]: + """Build a one-off Temporal Schedule that fires once at defn.trigger_config.at. + + Returns (schedule_id, Schedule). 
+    Uses ScheduleState(remaining_actions=1) so the schedule self-disarms after firing.
+    """
+    assert isinstance(defn.trigger_config, ScheduledTriggerConfig)
+    cfg: ScheduledTriggerConfig = defn.trigger_config
+    at = cfg.at.astimezone(timezone.utc) if cfg.at.tzinfo is not None else cfg.at
+
+    action = ScheduleActionStartWorkflow(
+        "RunActivityWorkflow",
+        args=[str(defn.id), SCHEDULED_TRIGGER_KEY, cfg.at.isoformat(), None],
+        id=f"activity-{defn.id}:once",
+        task_queue=_ORCHESTRATOR_TASK_QUEUE,
+    )
+
+    # Calendar spec pinned to the exact minute; with remaining_actions=1 it fires once.
+    # A tz-aware `at` is matched in UTC (same instant); a naive one in cfg.timezone.
+    spec = ScheduleSpec(
+        calendars=[
+            ScheduleCalendarSpec(
+                second=[ScheduleRange(0)],
+                minute=[ScheduleRange(at.minute)],
+                hour=[ScheduleRange(at.hour)],
+                day_of_month=[ScheduleRange(at.day)],
+                month=[ScheduleRange(at.month)],
+                year=[ScheduleRange(at.year)],
+            )
+        ],
+        time_zone_name="UTC" if cfg.at.tzinfo is not None else cfg.timezone,
+    )
+
+    state = ScheduleState(
+        limited_actions=True,
+        remaining_actions=1,
+        paused=not defn.enabled,
+    )
+
+    sid = _onetime_schedule_id(defn.id)
+    return sid, Schedule(action=action, spec=spec, state=state)
+
+
+async def cancel_scheduled(client: Client, activity_id: str | UUID) -> None:
+    """Delete the one-off Temporal Schedule for a ScheduledTriggerConfig definition.
+
+    No-op if the schedule does not exist.
+    """
+    handle = client.get_schedule_handle(_onetime_schedule_id(activity_id))
+    try:
+        await handle.delete()
+    except RPCError:
+        pass
+
+
+async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleHandle:
+    """Create or update a Temporal Schedule for a cron or scheduled ActivityDefinition.
+
+    - For cron: creates/updates the recurring schedule.
+    - For scheduled: creates a one-off schedule (remaining_actions=1).
     - If enabled=False the schedule is created paused.
-    - For misfire_policy='catchup', triggers a backfill covering the last hour
-      after each upsert to replay any recently missed fires.
+    - For cron with misfire_policy='catchup', triggers a backfill covering the
+      last hour after each upsert to replay any recently missed fires.
 
     Returns the ScheduleHandle for the created/updated schedule.
     """
+    if isinstance(defn.trigger_config, ScheduledTriggerConfig):
+        sid, sched = _build_onetime_schedule(defn)
+        try:
+            handle = await client.create_schedule(sid, sched)
+        except (RPCError, ScheduleAlreadyRunningError):
+            handle = client.get_schedule_handle(sid)
+            async def _updater_once(inp: ScheduleUpdateInput) -> ScheduleUpdate:  # noqa: ARG001
+                return ScheduleUpdate(schedule=sched)
+            await handle.update(_updater_once)
+        return handle
+
     if not isinstance(defn.trigger_config, CronTriggerConfig):
         raise ValueError(
-            f"upsert_schedule requires trigger_type='cron', "
+            f"upsert_schedule requires trigger_type='cron' or 'scheduled', "
             f"got {defn.trigger_config.trigger_type!r}"
         )
 
@@ -99,7 +173,7 @@ async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleH
 
     try:
         handle = await client.create_schedule(sid, sched)
-    except RPCError:
+    except (RPCError, ScheduleAlreadyRunningError):
         # Schedule already exists — update it in place.
         handle = client.get_schedule_handle(sid)
 
@@ -121,13 +195,11 @@ async def upsert_schedule(client: Client, defn: ActivityDefinition) -> ScheduleH
         now = datetime.now(tz=timezone.utc)
         backfill_start = now - timedelta(hours=1)
         await handle.backfill(
-            [
-                ScheduleBackfill(
-                    start_at=backfill_start,
-                    end_at=now,
-                    overlap=ScheduleOverlapPolicy.BUFFER_ALL,
-                )
-            ]
+            ScheduleBackfill(
+                start_at=backfill_start,
+                end_at=now,
+                overlap=ScheduleOverlapPolicy.BUFFER_ALL,
+            )
         )
 
     return handle
diff --git a/src/activity_core/sync_activity_definitions.py b/src/activity_core/sync_activity_definitions.py
new file mode 100644
index 0000000..70f184b
--- /dev/null
+++ b/src/activity_core/sync_activity_definitions.py
@@ -0,0 +1,113 @@
+"""ActivityDefinition file → DB sync (T45).
+
+Reads activity-definitions/*.md, upserts rows into activity_definitions.
+Definitions absent from the filesystem are soft-deleted (enabled = false).
+
+Run:
+    make sync-activity-definitions
+    ACTCORE_DB_URL=... uv run python -m activity_core.sync_activity_definitions
+
+Also called from worker.py at startup.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import os
+import uuid
+
+from sqlalchemy import bindparam, select, text
+from sqlalchemy.dialects.postgresql import insert as pg_insert
+from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
+
+from activity_core.definition_parser import scan_and_parse
+from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
+
+logger = logging.getLogger(__name__)
+
+TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233")
+
+
+async def sync(session_factory: async_sessionmaker[AsyncSession]) -> int:
+    """Parse all definition files and upsert into the DB.
+
+    Definitions absent from the filesystem (all rows, if none parse) get enabled=false.
+    Returns count of upserted definitions.
+    """
+    defs = scan_and_parse()
+    file_ids: set[str] = set()
+
+    upserted = 0
+    async with session_factory() as session:
+        async with session.begin():
+            for d in defs:
+                file_ids.add(d.id)
+                stmt = (
+                    pg_insert(ActivityDefinitionRow)
+                    .values(
+                        id=uuid.UUID(d.id),
+                        name=d.name,
+                        enabled=d.enabled,
+                        trigger_type=d.trigger_config["trigger_type"],
+                        trigger_config=d.trigger_config,
+                        context_sources=d.context_sources,
+                        task_templates=[],
+                        rules_json=d.rules,
+                        instructions_json=d.instructions,
+                        dedupe_key_strategy="skip",
+                        version=1,
+                    )
+                    .on_conflict_do_update(
+                        index_elements=["id"],
+                        set_={
+                            "name": d.name,
+                            "enabled": d.enabled,
+                            "trigger_type": d.trigger_config["trigger_type"],
+                            "trigger_config": d.trigger_config,
+                            "context_sources": d.context_sources,
+                            "rules_json": d.rules,
+                            "instructions_json": d.instructions,
+                        },
+                    )
+                )
+                await session.execute(stmt)
+                upserted += 1
+                logger.info("upserted activity definition %s (%s)", d.id, d.name)
+
+            # Soft-delete (enabled=false) rows whose definition file no longer exists.
+            if file_ids:
+                id_list = [uuid.UUID(i) for i in file_ids]
+                await session.execute(
+                    text(
+                        "UPDATE activity_definitions SET enabled = false"
+                        " WHERE id NOT IN :ids"
+                    ).bindparams(bindparam("ids", value=id_list, expanding=True))
+                )
+            else:
+                await session.execute(
+                    text("UPDATE activity_definitions SET enabled = false")
+                )
+
+    logger.info("sync_activity_definitions: upserted %d definitions", upserted)
+    return upserted
+
+
+async def main() -> None:
+    logging.basicConfig(level=logging.INFO)
+    db_url = os.environ.get("ACTCORE_DB_URL")
+    if not db_url:
+        raise RuntimeError("ACTCORE_DB_URL is required")
+
+    engine = create_async_engine(db_url)
+    factory = async_sessionmaker(engine, expire_on_commit=False)
+    try:
+        n = await sync(factory)
+        print(f"Synced {n} activity definitions")
+    finally:
+        await engine.dispose()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/activity_core/sync_schedules.py b/src/activity_core/sync_schedules.py
index d9f33ad..f97b781
100644
--- a/src/activity_core/sync_schedules.py
+++ b/src/activity_core/sync_schedules.py
@@ -20,7 +20,7 @@ from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
 from temporalio.client import Client
 
-from activity_core.models import ActivityDefinition, CronTriggerConfig
+from activity_core.models import ActivityDefinition, CronTriggerConfig, ScheduledTriggerConfig
 from activity_core.orm import ActivityDefinition as ActivityDefinitionRow
 from activity_core.schedule_manager import delete_schedule, list_schedules, upsert_schedule
 
@@ -63,7 +63,7 @@ async def sync(client: Client, db_url: str) -> None:
         rows = (
             await session.scalars(
                 select(ActivityDefinitionRow).where(
-                    ActivityDefinitionRow.trigger_type == "cron"
+                    ActivityDefinitionRow.trigger_type.in_(["cron", "scheduled"])
                 )
             )
         ).all()
@@ -76,8 +76,8 @@ async def sync(client: Client, db_url: str) -> None:
 
     for row in rows:
         defn = _row_to_domain(row)
-        if not isinstance(defn.trigger_config, CronTriggerConfig):
-            continue  # should not happen given the WHERE clause, but guard anyway
+        if not isinstance(defn.trigger_config, (CronTriggerConfig, ScheduledTriggerConfig)):
+            continue  # should not happen given the WHERE clause, but guard anyway
         db_activity_ids.add(str(defn.id))
 
diff --git a/src/activity_core/webhook_receiver.py b/src/activity_core/webhook_receiver.py
new file mode 100644
index 0000000..88608cf
--- /dev/null
+++ b/src/activity_core/webhook_receiver.py
@@ -0,0 +1,246 @@
+"""Webhook receiver for Gitea and GitHub events (T49).
+
+Mounted at /webhooks/{source} in api.py.
+
+Validates HMAC signatures, normalises payloads to EventEnvelope, validates
+against the event type registry, and publishes to NATS subject activity.{type}.
+
+Config:
+    WEBHOOK_SECRET_GITEA  — shared secret for Gitea HMAC-SHA256
+    WEBHOOK_SECRET_GITHUB — shared secret for GitHub HMAC-SHA256
+    NATS_URL              — NATS server URL (default: nats://localhost:4222)
+"""
+
+from __future__ import annotations
+
+import hashlib
+import hmac
+import json
+import logging
+import os
+import uuid
+from datetime import datetime, timezone
+from typing import Any
+
+from fastapi import APIRouter, Header, HTTPException, Request
+
+from activity_core.models import EventEnvelope
+
+logger = logging.getLogger(__name__)
+
+WEBHOOK_SECRET_GITEA = os.environ.get("WEBHOOK_SECRET_GITEA", "")
+WEBHOOK_SECRET_GITHUB = os.environ.get("WEBHOOK_SECRET_GITHUB", "")
+NATS_URL = os.environ.get("NATS_URL", "nats://localhost:4222")
+
+router = APIRouter(prefix="/webhooks", tags=["webhooks"])
+
+
+def _verify_hmac(body: bytes, signature: str, secret: str) -> bool:
+    """Return True if *signature* is HMAC-SHA256(body, secret).
+
+    Both the ``sha256=<hex>`` form and a bare hex digest are accepted.
+    An empty secret or an empty signature never verifies.
+    """
+    if not secret or not signature:
+        return False
+    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
+    provided = signature.removeprefix("sha256=")
+    # Compare as bytes: compare_digest raises on non-ASCII str, and the header
+    # value is attacker-controlled.
+    return hmac.compare_digest(expected.encode(), provided.encode())
+
+
+# ── Gitea normalisers ─────────────────────────────────────────────────────────
+
+def _gitea_repo_created(payload: dict, now: datetime) -> EventEnvelope | None:
+    """Gitea repository event (action=created) → gitea.repo.created.
+
+    Returns None for any other action so the caller can report "ignored".
+    """
+    if payload.get("action") != "created":
+        return None
+    repo = payload.get("repository") or payload
+    name = repo.get("full_name", "")
+    # split()[-1] already yields the whole string when there is no "/".
+    slug = repo.get("name", name.split("/")[-1])
+    return EventEnvelope(
+        id=str(uuid.uuid4()),
+        type="gitea.repo.created",
+        timestamp=now,
+        publisher="gitea/webhook",
+        attributes={
+            "repo_full_name": name,
+            "repo_slug": slug,
+            "owner": (repo.get("owner") or {}).get("login", ""),
+            "html_url": repo.get("html_url", ""),
+            # Gitea's repository object exposes "created_at"; "created" is kept
+            # as a fallback for callers that relied on the previous key.
+            "created_at": repo.get("created_at") or repo.get("created") or now.isoformat(),
+        },
+    )
+
+
+def _push_envelope(
+    payload: dict, now: datetime, *, publisher: str, pusher_key: str
+) -> EventEnvelope:
+    """Build the gitea.push envelope shared by the Gitea and GitHub push normalisers."""
+    repo = payload.get("repository") or {}
+    ref = payload.get("ref") or ""
+    commits = payload.get("commits") or []
+    return EventEnvelope(
+        id=str(uuid.uuid4()),
+        type="gitea.push",
+        timestamp=now,
+        publisher=publisher,
+        attributes={
+            "repo_full_name": repo.get("full_name", ""),
+            # removeprefix leaves tag refs and other non-branch refs untouched.
+            "branch": ref.removeprefix("refs/heads/"),
+            "pusher": (payload.get("pusher") or {}).get(pusher_key, ""),
+            "commits_count": len(commits),
+            "compare_url": payload.get("compare", ""),
+            "pushed_at": now.isoformat(),
+        },
+    )
+
+
+def _gitea_push(payload: dict, now: datetime) -> EventEnvelope | None:
+    """Gitea push event → gitea.push."""
+    return _push_envelope(payload, now, publisher="gitea/webhook", pusher_key="login")
+
+
+def _gitea_issue_closed(payload: dict, now: datetime) -> EventEnvelope | None:
+    """Gitea issues event (action=closed) → gitea.issue.closed.
+
+    Returns None for any other action so the caller can report "ignored".
+    """
+    if payload.get("action") != "closed":
+        return None
+    issue = payload.get("issue") or {}
+    repo = payload.get("repository") or {}
+    return EventEnvelope(
+        id=str(uuid.uuid4()),
+        type="gitea.issue.closed",
+        timestamp=now,
+        publisher="gitea/webhook",
+        attributes={
+            "repo_full_name": repo.get("full_name", ""),
+            "issue_number": issue.get("number", 0),
+            "issue_title": issue.get("title", ""),
+            "closer": (payload.get("sender") or {}).get("login", ""),
+            "closed_at": now.isoformat(),
+        },
+    )
+
+
+# Keyed by the lower-cased X-Gitea-Event header value.
+_GITEA_NORMALISERS: dict[str, Any] = {
+    "repository": _gitea_repo_created,
+    "push": _gitea_push,
+    "issues": _gitea_issue_closed,
+}
+
+# ── GitHub normalisers ────────────────────────────────────────────────────────
+
+def _github_push(payload: dict, now: datetime) -> EventEnvelope | None:
+    """GitHub push event → gitea.push (reuse same event type)."""
+    return _push_envelope(payload, now, publisher="github/webhook", pusher_key="name")
+
+
+# Keyed by the lower-cased X-GitHub-Event header value.
+_GITHUB_NORMALISERS: dict[str, Any] = {
+    "push": _github_push,
+}
+
+
+async def _publish_to_nats(envelope: EventEnvelope) -> None:
+    """Publish the normalised envelope to NATS subject ``activity.{envelope.type}``.
+
+    A connection is opened per call and always closed, including when publish
+    or flush fails. Any failure is logged and re-raised to the caller.
+    """
+    subject = f"activity.{envelope.type}"
+    try:
+        import nats
+        nc = await nats.connect(NATS_URL)
+        try:
+            await nc.publish(subject, envelope.model_dump_json().encode())
+            await nc.flush()
+        finally:
+            # Without this the connection leaked whenever publish/flush raised.
+            await nc.close()
+    except Exception as exc:
+        logger.error("failed to publish %r to NATS: %s", envelope.id, exc)
+        raise
+    logger.info("published %r to NATS subject %r", envelope.id, subject)
+
+
+# ── Route handlers ────────────────────────────────────────────────────────────
+
+def _parse_payload(body: bytes) -> dict:
+    """Decode a webhook body; anything other than a JSON object is a 400."""
+    try:
+        payload = json.loads(body)
+    except ValueError:
+        # JSONDecodeError and UnicodeDecodeError (undecodable bytes) are both ValueError.
+        raise HTTPException(status_code=400, detail="invalid JSON body") from None
+    if not isinstance(payload, dict):
+        # The normalisers call payload.get(); a list or scalar would otherwise surface as a 500.
+        raise HTTPException(status_code=400, detail="invalid JSON body")
+    return payload
+
+
+async def _dispatch(
+    payload: dict, event_type: str, normalisers: dict[str, Any], source: str
+) -> dict[str, str]:
+    """Normalise one webhook event, check it against the registry and publish it.
+
+    Raises HTTPException(422) for an unsupported header event type or an event
+    type the registry does not allow. Returns an "ignored" status when the
+    normaliser declines the payload.
+    """
+    normaliser = normalisers.get(event_type)
+    if normaliser is None:
+        raise HTTPException(
+            status_code=422,
+            detail=f"unsupported {source} event type: {event_type!r}",
+        )
+
+    envelope = normaliser(payload, datetime.now(tz=timezone.utc))
+    if envelope is None:
+        return {"status": "ignored", "reason": "action not mapped"}
+
+    from activity_core.event_type_registry import is_event_type_allowed
+    if not is_event_type_allowed(envelope.type):
+        raise HTTPException(
+            status_code=422,
+            detail=f"event type {envelope.type!r} not registered or not accepted",
+        )
+
+    await _publish_to_nats(envelope)
+    return {"status": "accepted", "event_id": envelope.id}
+
+
+@router.post("/gitea")
+async def receive_gitea(
+    request: Request,
+    x_gitea_event: str = Header(default=""),
+    x_gitea_signature_256: str = Header(default="", alias="X-Gitea-Signature-256"),
+    x_gitea_signature: str = Header(default="", alias="X-Gitea-Signature"),
+) -> dict[str, str]:
+    """Receive a Gitea webhook: verify, normalise, and publish it.
+
+    The signature is taken from X-Gitea-Signature-256 when present, otherwise
+    from X-Gitea-Signature (the bare hex digest described in the Gitea webhook
+    documentation).
+    """
+    body = await request.body()
+
+    # NOTE(review): with WEBHOOK_SECRET_GITEA unset the request is accepted
+    # without any signature check — confirm this is intended outside development.
+    if WEBHOOK_SECRET_GITEA:
+        signature = x_gitea_signature_256 or x_gitea_signature
+        if not _verify_hmac(body, signature, WEBHOOK_SECRET_GITEA):
+            raise HTTPException(status_code=401, detail="invalid signature")
+
+    payload = _parse_payload(body)
+    return await _dispatch(payload, x_gitea_event.lower(), _GITEA_NORMALISERS, "Gitea")
+
+
+@router.post("/github")
+async def receive_github(
+    request: Request,
+    x_github_event: str = Header(default="", alias="X-GitHub-Event"),
+    x_hub_signature_256: str = Header(default="", alias="X-Hub-Signature-256"),
+) -> dict[str, str]:
+    """Receive a GitHub webhook: verify, normalise, and publish it."""
+    body = await request.body()
+
+    # NOTE(review): with WEBHOOK_SECRET_GITHUB unset the request is accepted
+    # without any signature check — confirm this is intended outside development.
+    if WEBHOOK_SECRET_GITHUB:
+        if not _verify_hmac(body, x_hub_signature_256, WEBHOOK_SECRET_GITHUB):
+            raise HTTPException(status_code=401, detail="invalid signature")
+
+    payload = _parse_payload(body)
+    return await _dispatch(payload, x_github_event.lower(), _GITHUB_NORMALISERS, "GitHub")
diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py
index 461b325..58a62fd 100644
--- a/src/activity_core/worker.py
+++ b/src/activity_core/worker.py
@@ -32,12 +32,17 @@ from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
 from temporalio.worker import Worker
 
 from activity_core.activities import (
+    emit_tasks,
+    evaluate_rules,
     init_session_factory,
     load_activity_definition,
     log_run,
     persist_task_instance,
     resolve_context,
 )
+from activity_core.db import make_engine
+from 
sqlalchemy.ext.asyncio import async_sessionmaker
+from activity_core.sync_activity_definitions import sync as sync_activity_defs
 from activity_core.sync_schedules import sync as sync_schedules
 from activity_core.workflows import RunActivityWorkflow, TaskExecutorWorkflow
 
@@ -68,6 +73,14 @@ async def run() -> None:
         TEMPORAL_HOST, namespace=TEMPORAL_NAMESPACE, runtime=runtime
     )
 
+    # T45: Sync ActivityDefinition files into DB before schedule sync.
+    logger.info("Syncing ActivityDefinition files...")
+    try:
+        # This engine exists only for the one-shot sync; dispose it afterwards
+        # so its connection pool is not kept open for the worker's lifetime.
+        # NOTE(review): assumes make_engine returns an AsyncEngine (it is handed
+        # to async_sessionmaker) — confirm against activity_core.db.
+        sync_engine = make_engine(db_url)
+        try:
+            session_factory = async_sessionmaker(sync_engine, expire_on_commit=False)
+            await sync_activity_defs(session_factory)
+        finally:
+            await sync_engine.dispose()
+    except Exception:
+        # Best effort by design: a failed sync must not stop the worker.
+        logger.exception("activity definition sync failed — continuing worker startup")
+
     # T23: Sync Temporal Schedules with the DB before workers start accepting tasks.
     logger.info("Syncing Temporal Schedules with ActivityDefinition DB...")
     try:
@@ -79,7 +92,7 @@ async def run() -> None:
         client,
         task_queue=ORCHESTRATOR_TASK_QUEUE,
         workflows=[RunActivityWorkflow],
-        activities=[load_activity_definition, resolve_context, log_run],
+        activities=[load_activity_definition, resolve_context, log_run, evaluate_rules, emit_tasks],
     )
 
     task_worker = Worker(
diff --git a/src/activity_core/workflows.py b/src/activity_core/workflows.py
index 5514885..0983b2c 100644
--- a/src/activity_core/workflows.py
+++ b/src/activity_core/workflows.py
@@ -19,12 +19,13 @@ from temporalio.common import RetryPolicy, SearchAttributeKey, TypedSearchAttrib
 
 with workflow.unsafe.imports_passed_through():
     from activity_core.activities import (
+        emit_tasks,
+        evaluate_rules,
         load_activity_definition,
         log_run,
         persist_task_instance,
         resolve_context,
     )
-    from activity_core.template_engine import evaluate_templates
     from activity_core.schedule_manager import SCHEDULED_TRIGGER_KEY
 
 # T32: Custom search attributes for Temporal visibility (must be registered in Temporal first).
@@ -50,9 +51,9 @@ class RunActivityWorkflow:
 
     Sequence:
       1. 
load_activity_definition(activity_id) → defn dict
       2. resolve_context(defn.context_sources) → context snapshot
-      3. evaluate_templates(templates, context) → task specs (pure, no activity)
-      4. log_run(...) → run_id
-      5. start_child_workflow per task spec (fire-and-forget, detached)
+      3. evaluate_rules(rules, event, context) → matching rules → TaskSpec dicts
+      4. emit_tasks(task_specs) → TaskRef list via IssueSink
+      5. log_run(...) → activity_runs row
     """
 
     @workflow.run
@@ -61,13 +62,14 @@ class RunActivityWorkflow:
         activity_id: str,
         trigger_key: str,
         scheduled_for: str | None = None,
+        event_envelope_json: str | None = None,
     ) -> dict:
         """
         Args:
-            activity_id:   UUID of the ActivityDefinition row.
-            trigger_key:   ISO-8601 datetime (cron) or event_id (event trigger).
-                           Used as the idempotency key component.
-            scheduled_for: ISO-8601 string of the nominal scheduled time (cron only).
+            activity_id:         UUID of the ActivityDefinition row.
+            trigger_key:         event_id (event trigger) or "scheduled" (cron).
+            scheduled_for:       ISO-8601 nominal scheduled time (cron only).
+            event_envelope_json: JSON-serialised EventEnvelope (event trigger only).
 
         Returns:
             {"run_id": str, "tasks_spawned": int}
@@ -98,21 +100,63 @@ class RunActivityWorkflow:
             retry_policy=_RETRY_POLICY,
         )
 
-        # ── 3. Evaluate templates (pure — no activity) ────────────────────────
-        task_specs: list[dict] = evaluate_templates(
-            defn["task_templates"], context_snapshot
+        # ── 3. Evaluate rules ─────────────────────────────────────────────────
+        import json as _json
+        event_attrs: dict = {}
+        if event_envelope_json:
+            try:
+                # "or {}" also covers an envelope whose attributes are null.
+                event_attrs = _json.loads(event_envelope_json).get("attributes") or {}
+            except (ValueError, AttributeError):
+                # Malformed JSON or a non-object envelope: rules are still
+                # evaluated against an empty event, but the problem is logged
+                # instead of being swallowed silently.
+                workflow.logger.warning(
+                    "ignoring malformed event_envelope_json for activity %s",
+                    activity_id,
+                )
+
+        matched_rules: list[dict] = await workflow.execute_activity(
+            evaluate_rules,
+            {
+                "rules": defn.get("rules", []),
+                "event": event_attrs,
+                "context": context_snapshot,
+            },
+            start_to_close_timeout=_ACTIVITY_TIMEOUT,
+            retry_policy=_RETRY_POLICY,
         )
 
-        # ── 4. Log the run ────────────────────────────────────────────────────
-        # run_id is derived deterministically so log_run retries are idempotent.
-        # For schedule-fired runs the trigger_key is the sentinel "scheduled";
-        # each fire has a unique workflow_id (embeds ${firstScheduledTime}), so
-        # we use the workflow_id as the dedup key instead.
+        # Convert matched rules to TaskSpec dicts for emission.
+        # The title falls back to the rule id when the action has no task_template.
+        task_spec_dicts: list[dict] = []
+        for rule in matched_rules:
+            action = rule.get("action", {})
+            task_spec_dicts.append({
+                "title": action.get("task_template", rule.get("id", "")),
+                "description": "",
+                "target_repo": action.get("target_repo"),
+                "priority": action.get("priority", "medium"),
+                "labels": action.get("labels", []),
+                "due_in_days": action.get("due_in_days"),
+                "source_type": "rule",
+                "source_id": rule.get("id", ""),
+                "condition": rule.get("condition", ""),
+            })
+
+        # ── 4. Emit tasks via IssueSink ───────────────────────────────────────
+        # run_id is a UUIDv5 of the dedup source, so the same trigger always
+        # yields the same id for emit_tasks and log_run. Schedule-fired runs all
+        # share the SCHEDULED_TRIGGER_KEY sentinel as trigger_key, so the
+        # workflow id is used as the dedup source for them instead.
         if trigger_key == SCHEDULED_TRIGGER_KEY:
             dedup_source = workflow.info().workflow_id
         else:
             dedup_source = f"{activity_id}:{trigger_key}"
         run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, dedup_source))
+
+        if task_spec_dicts:
+            await workflow.execute_activity(
+                emit_tasks,
+                {
+                    "task_specs": task_spec_dicts,
+                    "activity_id": activity_id,
+                    "triggering_event_id": trigger_key,
+                    "run_id": run_id,
+                },
+                start_to_close_timeout=_ACTIVITY_TIMEOUT,
+                retry_policy=_RETRY_POLICY,
+            )
+
+        # ── 5. Log the run ────────────────────────────────────────────────────
         await workflow.execute_activity(
             log_run,
             {
@@ -120,25 +164,14 @@ class RunActivityWorkflow:
                 "activity_id": activity_id,
                 "scheduled_for": scheduled_for,
                 "context_snapshot": context_snapshot,
-                "tasks_spawned": len(task_specs),
+                "tasks_spawned": len(task_spec_dicts),
                 "version_used": defn["version"],
             },
             start_to_close_timeout=_ACTIVITY_TIMEOUT,
             retry_policy=_RETRY_POLICY,
         )
 
-        # ── 5. 
Spawn task executor children (fire-and-forget) ───────────────── - for index, spec in enumerate(task_specs): - child_id = f"task-{run_id}:{spec['task_type']}:{index}" - await workflow.start_child_workflow( - TaskExecutorWorkflow, - args=[run_id, spec["task_type"], spec["params"]], - id=child_id, - task_queue=_TASK_QUEUE, - parent_close_policy=workflow.ParentClosePolicy.ABANDON, - ) - - return {"run_id": run_id, "tasks_spawned": len(task_specs)} + return {"run_id": run_id, "tasks_spawned": len(task_spec_dicts)} @workflow.defn diff --git a/tests/test_schedule_lifecycle.py b/tests/test_schedule_lifecycle.py index d30be36..9069635 100644 --- a/tests/test_schedule_lifecycle.py +++ b/tests/test_schedule_lifecycle.py @@ -11,6 +11,7 @@ Run with: from __future__ import annotations +import asyncio import uuid import pytest @@ -61,8 +62,15 @@ async def test_upsert_schedule_creates_schedule(env: WorkflowEnvironment) -> Non await upsert_schedule(env.client, defn) - schedules = await list_schedules(env.client) - ids = [s["schedule_id"] for s in schedules] + # The embedded test server's visibility index is eventually consistent — + # wait briefly for the new schedule to appear in the listing. + ids: list[str] = [] + for _ in range(10): + schedules = await list_schedules(env.client) + ids = [s["schedule_id"] for s in schedules] + if sid in ids: + break + await asyncio.sleep(0.3) assert sid in ids, f"Expected schedule {sid!r} in {ids}" # Cleanup