activity-core/workplans/custodian-WP-0003b-event-bridge-parser-workflow-triggers.md

id: custodian-WP-0003b
type: workplan
domain: custodian
repo: activity-core
status: done
state_hub_workstream_id: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
split_from: custodian-WP-0003
split_part: 2 of 3
depends_on: custodian-WP-0003a
created: 2026-05-14
tasks:

| id | title | status | priority | state_hub_task_id |
|----|-------|--------|----------|-------------------|
| T44 | ActivityDefinition file parser | done | high | a86dcffe-789a-482e-bc5f-a5ac9db9608e |
| T45 | ActivityDefinition sync command | done | medium | a714c511-9f93-4594-b745-9330bc645384 |
| T46 | Wire rule/instruction evaluator into RunActivityWorkflow | done | high | 615dec10-7e63-46b3-80ad-83ff2e51d6ba |
| T47 | Add ScheduledTriggerConfig (one-off future datetime) | done | medium | 1e06cb43-d0e3-4af5-9216-0877cac2082d |
| T48 | Implement one-off Temporal Schedule | done | medium | 01b507e3-6420-422f-83ed-90c7b4d49bb7 |
| T49 | Implement webhook receiver | done | medium | 673ca7b2-49cc-4314-8be2-9848f686da37 |
| T50 | Write Gitea webhook event type definitions | done | medium | 7ba289e9-fef6-469a-9504-c3664227a2f7 |

activity-core WP-0003b — Parser, Workflow Wiring, Triggers & Webhooks

Split from: custodian-WP-0003 (part 2 of 3)
Hub workstream: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
Depends on: custodian-WP-0003a (model, rules, registry)
Next part: custodian-WP-0003c (context adapters, integration)
Architecture: ACT-ADR-001, ACT-ADR-002, ACT-ADR-003

Purpose

Phases 9 and 10 of the Event Bridge implementation. With the rules module, IssueSink, and event type registry in place (WP-0003a), this part wires everything into the workflow: markdown-format ActivityDefinition parser, sync command, rule/instruction pipeline in RunActivityWorkflow, one-off scheduled trigger type, and Gitea webhook receiver.

Prerequisites

All tasks from custodian-WP-0003a must be done:

  • src/activity_core/rules/ module with evaluate_condition and execute_instruction
  • IssueSink ABC + NullSink + IssueCoreRestSink
  • EventEnvelope model finalized
  • event_types table and sync-event-types command working

Build Order

Phase 9:
  T44 (parser) → T45 (sync) → T46 (wire workflow)
  T46 also needs: T36 (RuleEvaluator), T37 (InstructionExecutor), T39 (IssueSink) from 0003a

Phase 10 (parallel with phase 9):
  T47 (model) → T48 (Temporal schedule)
  T49, T50 (parallel — need T40, T41 from 0003a)

Phase 9 — ActivityDefinition as Markdown

T44: ActivityDefinition file parser

src/activity_core/definition_parser.py

Scans activity-definitions/*.md in the local repo and in any registered domain repos listed in the ACTIVITY_DEFINITION_DIRS env var (colon-separated paths).
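The scan described above could look roughly like the sketch below. The function name definition_files and the injectable env argument are hypothetical. It also assumes each ACTIVITY_DEFINITION_DIRS entry points directly at a directory of definition files; the source does not say whether entries are repo roots instead.

```python
from __future__ import annotations

import os
from pathlib import Path
from typing import Mapping


def definition_files(local_repo: Path, env: Mapping[str, str] | None = None) -> list[Path]:
    """Collect *.md definition files from the local repo and from every
    directory named in ACTIVITY_DEFINITION_DIRS (colon-separated)."""
    env = os.environ if env is None else env
    roots = [local_repo / "activity-definitions"]
    # Empty segments (e.g. a trailing colon) are ignored.
    roots += [Path(p) for p in env.get("ACTIVITY_DEFINITION_DIRS", "").split(":") if p]

    files: list[Path] = []
    for root in roots:
        # Assumption: a missing directory is skipped rather than treated as an error.
        if root.is_dir():
            files.extend(sorted(root.glob("*.md")))
    return files
```

Sorting within each directory keeps the sync order deterministic, which makes parse errors easier to reproduce.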

Parses:

  • YAML frontmatter → trigger config, context_sources, governance, owner, status.
  • Fenced ```rule blocks → RuleDef list (YAML body).
  • Fenced ```instruction blocks → InstructionDef list (YAML body).

Returns ActivityDefinitionDef. Raises ParseError(file, line, message) on malformed input — never silently ignores a broken definition.
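A minimal sketch of the splitting step, assuming the parser first separates frontmatter from fenced blocks and leaves YAML decoding to a later pass. RawBlock and split_definition are hypothetical names; only ParseError(file, line, message) comes from the description above. The fence marker is built at runtime so the example can sit inside a fenced block itself.

```python
from __future__ import annotations

import re
from dataclasses import dataclass

FENCE = "`" * 3  # the three-backtick fence marker
FENCE_OPEN = re.compile("^" + re.escape(FENCE) + r"(rule|instruction)\s*$")


class ParseError(Exception):
    def __init__(self, file: str, line: int, message: str):
        super().__init__(f"{file}:{line}: {message}")
        self.file, self.line, self.message = file, line, message


@dataclass
class RawBlock:
    kind: str   # "rule" or "instruction"
    line: int   # 1-based line number of the opening fence
    body: str   # YAML body, not yet decoded


def split_definition(file: str, text: str) -> tuple[str, list[RawBlock]]:
    """Return (frontmatter_yaml, blocks); raise ParseError on malformed input."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        raise ParseError(file, 1, "missing YAML frontmatter")
    end = next((i for i in range(1, len(lines)) if lines[i].strip() == "---"), None)
    if end is None:
        raise ParseError(file, 1, "unterminated YAML frontmatter")
    frontmatter = "\n".join(lines[1:end])

    blocks: list[RawBlock] = []
    i = end + 1
    while i < len(lines):
        match = FENCE_OPEN.match(lines[i])
        if not match:
            i += 1
            continue
        start = i
        i += 1
        # Scan forward to the closing fence of this block.
        while i < len(lines) and lines[i].strip() != FENCE:
            i += 1
        if i == len(lines):
            raise ParseError(file, start + 1, f"unterminated {match.group(1)} block")
        blocks.append(RawBlock(match.group(1), start + 1, "\n".join(lines[start + 1:i])))
        i += 1
    return frontmatter, blocks
```

Recording the opening-fence line on each block lets a later YAML error be reported against the right position in the file.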


T45: ActivityDefinition sync command

make sync-activity-definitions (also in worker.py startup sequence, before the Temporal run loop).

Extend the activity_definitions table: add rules_json (JSONB) and instructions_json (JSONB) columns. On sync, parse all definition files and upsert rows. Definitions absent from the filesystem are set to status = 'inactive' (a soft delete: Temporal Schedules for those definitions are paused, not deleted).

Alembic migration for new columns included in this task.
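The sync decision can be expressed as a pure function, which keeps it testable without a database. This is a sketch only: plan_sync and its return shape are hypothetical, and the real command would turn the result into SQL upserts and schedule pauses.

```python
from __future__ import annotations


def plan_sync(parsed_ids: set[str], db_status: dict[str, str]) -> dict[str, list[str]]:
    """Decide what a sync should do, given the IDs parsed from disk and the
    current id -> status map read from the activity_definitions table."""
    return {
        # Every definition found on disk is upserted (insert or update).
        "upsert": sorted(parsed_ids),
        # Rows that vanished from disk are soft-deleted; their schedules get
        # paused. Rows that are already inactive are left alone.
        "deactivate": sorted(
            def_id
            for def_id, status in db_status.items()
            if def_id not in parsed_ids and status != "inactive"
        ),
    }
```

For example, with a.md and b.md on disk and rows b, c (active) and d (inactive) in the table, only c is deactivated.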


T46: Wire rule/instruction evaluator into RunActivityWorkflow

Replace RunActivityWorkflow's current unconditional task-spawning logic:

load_activity_definition()
  → resolve_context()          [calls each context_source adapter]
  → evaluate_rules()           [RuleEvaluator — all rules, all-match]
  → execute_instructions()     [InstructionExecutor — all passing pre-filters]
  → emit_tasks()               [IssueSink.emit() for each TaskSpec]
  → log_run()                  [activity_runs row + task_spawn_log rows]

A rule that raises UnsafeExpression or any other error is skipped and logged as rule_eval_error; other rules in the same run still execute. The run is not failed by a single bad rule.
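The error-isolation behaviour can be sketched as follows. This is not the RuleEvaluator from WP-0003a: rules are modelled here as plain (id, predicate) pairs, UnsafeExpression is a local stand-in, and "all-match" is read as "evaluate every rule and collect all that match".

```python
from __future__ import annotations

import logging
from typing import Any, Callable

log = logging.getLogger("activity_core.rules")


class UnsafeExpression(Exception):
    """Local stand-in for the exception raised by the rules module."""


Rule = tuple[str, Callable[[dict[str, Any]], bool]]  # (rule id, predicate)


def evaluate_rules(rules: list[Rule], context: dict[str, Any]) -> tuple[list[str], list[str]]:
    """Return (matched rule ids, errored rule ids). A failing rule is logged
    and skipped; it never aborts evaluation of the remaining rules."""
    matched: list[str] = []
    errored: list[str] = []
    for rule_id, predicate in rules:
        try:
            if predicate(context):
                matched.append(rule_id)
        except Exception as exc:  # UnsafeExpression or any other error
            log.warning("rule_eval_error rule=%s error=%r", rule_id, exc)
            errored.append(rule_id)
    return matched, errored
```

Returning the errored ids alongside the matches lets log_run() record them without the run itself being marked as failed.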


Phase 10 — One-off Scheduled Trigger and Webhook Receiver

T47: Add ScheduledTriggerConfig

src/activity_core/models.py

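from dataclasses import dataclass
from datetime import datetime

@dataclass
class ScheduledTriggerConfig:
    at: datetime                   # UTC; warns if in the past
    timezone: str = "UTC"

# CronTriggerConfig and EventTriggerConfig are defined elsewhere in this module (not shown).
TriggerConfig = CronTriggerConfig | EventTriggerConfig | ScheduledTriggerConfig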

Add to ActivityDefinition frontmatter schema:

trigger:
  type: scheduled
  at: "2026-09-01T09:00:00Z"
  timezone: "Europe/Berlin"
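
One way the frontmatter mapping above might be turned into a ScheduledTriggerConfig, including the past-time warning. parse_scheduled_trigger and its now parameter are hypothetical, and treating a naive timestamp as UTC is an assumption, not something the schema states.

```python
from __future__ import annotations

import warnings
from dataclasses import dataclass
from datetime import datetime, timezone

UTC = timezone.utc


@dataclass
class ScheduledTriggerConfig:
    at: datetime
    timezone: str = "UTC"


def parse_scheduled_trigger(raw: dict, now: datetime | None = None) -> ScheduledTriggerConfig:
    """Build a ScheduledTriggerConfig from the frontmatter `trigger` mapping."""
    if raw.get("type") != "scheduled":
        raise ValueError(f"expected trigger type 'scheduled', got {raw.get('type')!r}")
    # fromisoformat() rejects a trailing "Z" before Python 3.11, so normalise it.
    at = datetime.fromisoformat(str(raw["at"]).replace("Z", "+00:00"))
    if at.tzinfo is None:
        at = at.replace(tzinfo=UTC)  # assumption: naive values are UTC
    at = at.astimezone(UTC)
    now = now or datetime.now(UTC)
    if at <= now:
        # Warn rather than raise, matching the "warns if in the past" comment.
        warnings.warn(f"scheduled trigger time {at.isoformat()} is in the past")
    return ScheduledTriggerConfig(at=at, timezone=raw.get("timezone", "UTC"))
```

Passing now explicitly keeps the past-time check deterministic in tests.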

T48: Implement one-off Temporal Schedule

src/activity_core/schedule_manager.py

async def upsert_schedule(client, defn: ActivityDefinition):
    if isinstance(defn.trigger_config, ScheduledTriggerConfig):
        # Create Schedule with remaining_actions=1
        # Schedule ID: activity-schedule-{defn.id}-once
        # start_at = defn.trigger_config.at
        ...

sync_schedules.py: detect ScheduledTriggerConfig, use one-off path. Add cancel_scheduled(client, activity_id) for admin cancellation of pending one-off definitions.
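
The values that feed the one-off path can be derived without touching Temporal. The sketch below deliberately stops short of the SDK call: OneOffSchedulePlan and plan_one_off are hypothetical helpers, and only the ID pattern and remaining_actions=1 come from the description above.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class OneOffSchedulePlan:
    schedule_id: str
    start_at: datetime
    remaining_actions: int = 1  # the schedule is exhausted after a single run


def plan_one_off(defn_id: str, at: datetime) -> OneOffSchedulePlan:
    """Derive the schedule ID and start time for a one-off definition."""
    if at.tzinfo is None:
        # Assumption: refuse naive datetimes instead of guessing a zone.
        raise ValueError("trigger time must be timezone-aware")
    return OneOffSchedulePlan(
        schedule_id=f"activity-schedule-{defn_id}-once",
        start_at=at.astimezone(timezone.utc),
    )
```

Because the ID is a pure function of the definition ID, cancel_scheduled(client, activity_id) can locate the pending schedule without a lookup table.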


T49: Implement webhook receiver

src/activity_core/webhook_receiver.py

FastAPI APIRouter mounted at /webhooks/{source} in api.py.

POST /webhooks/gitea   → validate X-Gitea-Signature (HMAC-SHA256)
POST /webhooks/github  → validate X-Hub-Signature-256 (HMAC-SHA256)
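
Both checks reduce to an HMAC-SHA256 over the raw request body; the two sources differ only in header format. A minimal sketch using the standard library follows (the function names are hypothetical). Gitea sends the bare hex digest, while GitHub prefixes it with sha256=.

```python
import hashlib
import hmac


def _hex_hmac(secret: str, body: bytes) -> bytes:
    """Hex-encoded HMAC-SHA256 of the raw request body, as ASCII bytes."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest().encode()


def valid_gitea_signature(secret: str, body: bytes, header: str) -> bool:
    """X-Gitea-Signature carries the bare hex digest."""
    return hmac.compare_digest(_hex_hmac(secret, body), header.strip().lower().encode())


def valid_github_signature(secret: str, body: bytes, header: str) -> bool:
    """X-Hub-Signature-256 carries 'sha256=' followed by the hex digest."""
    prefix = "sha256="
    if not header.startswith(prefix):
        return False
    candidate = header[len(prefix):].strip().lower().encode()
    return hmac.compare_digest(_hex_hmac(secret, body), candidate)
```

The digest must be computed over the bytes exactly as received, before any JSON decoding, and compare_digest is used so the comparison runs in constant time.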

Per-source normalisers convert raw webhook payload to EventEnvelope:

  • Gitea repository event → gitea.repo.created
  • Gitea push event → gitea.push
  • Gitea issues event (action=closed) → gitea.issue.closed

After normalisation, validate the type against the event type registry and publish to the NATS subject activity.events. Reject unknown event types with HTTP 422, not 500: an unknown type is not a server error.
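
The type-mapping and rejection logic might be sketched as below. Several details are assumptions rather than statements from this workplan: the event name is taken from Gitea's X-Gitea-Event header, the repository event is matched on action == "created", and 202 is used as the success status. Both function names are hypothetical.

```python
from __future__ import annotations

from typing import Any


def gitea_event_type(event: str, payload: dict[str, Any]) -> str | None:
    """Map the X-Gitea-Event header value plus payload to a registry type.
    Returns None when no normaliser applies."""
    action = payload.get("action")
    if event == "repository" and action == "created":  # assumed action filter
        return "gitea.repo.created"
    if event == "push":
        return "gitea.push"
    if event == "issues" and action == "closed":
        return "gitea.issue.closed"
    return None


def response_status(event_type: str | None, registry: set[str]) -> int:
    """422 for an unmapped or unregistered type; otherwise accepted."""
    if event_type is None or event_type not in registry:
        return 422
    return 202  # assumption: the success code is not specified in the source
```

Checking the registry after mapping means a normaliser can exist in code while its event type is still rejected until the matching definition file has been synced.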

Config: WEBHOOK_SECRET_GITEA and WEBHOOK_SECRET_GITHUB env vars.


T50: Write Gitea webhook event type definitions

Three files under event-types/:

  • gitea.repo.created.md — publisher: gitea/webhook; attrs: repo_full_name (string), repo_slug (string, derived from repo name), owner (string), html_url (string), created_at (datetime).
  • gitea.push.md — publisher: gitea/webhook; attrs: repo_full_name (string), branch (string), pusher (string), commits_count (integer), compare_url (string), pushed_at (datetime).
  • gitea.issue.closed.md — publisher: gitea/webhook; attrs: repo_full_name (string), issue_number (integer), issue_title (string), closer (string), closed_at (datetime).

Each includes a "Normaliser mapping" subsection in Consumer Notes showing the raw Gitea webhook field → EventEnvelope attribute mapping.


Completion Criteria for This Part

  1. make sync-activity-definitions loads definitions from activity-definitions/ and upserts them (with rules_json, instructions_json) cleanly
  2. RunActivityWorkflow uses the rule/instruction pipeline end-to-end (no unconditional task spawning)
  3. One-off scheduled trigger type creates a Temporal Schedule with remaining_actions=1
  4. POST /webhooks/gitea validates HMAC, normalises payload, publishes to NATS
  5. Three Gitea event type definitions committed and loaded by sync-event-types

New Files Produced

| Path | Task |
|------|------|
| src/activity_core/definition_parser.py | T44 |
| src/activity_core/webhook_receiver.py | T49 |
| event-types/gitea.repo.created.md | T50 |
| event-types/gitea.push.md | T50 |
| event-types/gitea.issue.closed.md | T50 |

Modified Files

| Path | Task | Change |
|------|------|--------|
| src/activity_core/models.py | T47 | ScheduledTriggerConfig, TriggerConfig union |
| src/activity_core/workflows.py | T46 | Rule/instruction pipeline replaces unconditional spawn |
| src/activity_core/activities.py | T46 | resolve_context calls adapter chain |
| src/activity_core/schedule_manager.py | T48 | One-off schedule path |
| src/activity_core/sync_schedules.py | T48 | Detect ScheduledTriggerConfig |
| src/activity_core/api.py | T49 | Mount webhook router |
| src/activity_core/worker.py | T45 | sync-activity-definitions at startup |
| alembic/versions/ | T45 | Migration: rules_json, instructions_json columns |
| Makefile | T45 | sync-activity-definitions target |

Change History

  • v1.0 (2026-05-14): Split from custodian-WP-0003 (phases 9–10).