8.8 KiB
id, type, domain, repo, status, state_hub_workstream_id, split_from, split_part, depends_on, tasks, created
| id | type | domain | repo | status | state_hub_workstream_id | split_from | split_part | depends_on | tasks | created | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| custodian-WP-0003b | workplan | custodian | activity-core | done | b4eb45a9-69e3-4ab0-b00c-67a53c3117c5 | custodian-WP-0003 | 2 of 3 |
|
|
2026-05-14 |
activity-core WP-0003b — Parser, Workflow Wiring, Triggers & Webhooks
Split from: custodian-WP-0003 (part 2 of 3)
Hub workstream: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
Depends on: custodian-WP-0003a (model, rules, registry)
Next part: custodian-WP-0003c (context adapters, integration)
Architecture: ACT-ADR-001, ACT-ADR-002, ACT-ADR-003
Purpose
Phases 9 and 10 of the Event Bridge implementation. With the rules module, IssueSink, and event type registry in place (WP-0003a), this part wires everything into the workflow: markdown-format ActivityDefinition parser, sync command, rule/instruction pipeline in RunActivityWorkflow, one-off scheduled trigger type, and Gitea webhook receiver.
Prerequisites
All tasks from custodian-WP-0003a must be done:
src/activity_core/rules/module withevaluate_conditionandexecute_instructionIssueSinkABC +NullSink+IssueCoreRestSinkEventEnvelopemodel finalizedevent_typestable andsync-event-typescommand working
Build Order
Phase 9:
T44 (parser) → T45 (sync) → T46 (wire workflow)
T46 also needs: T36 (RuleEvaluator), T37 (InstructionExecutor), T39 (IssueSink) from 0003a
Phase 10 (parallel with phase 9):
T47 (model) → T48 (Temporal schedule)
T49, T50 (parallel — need T40, T41 from 0003a)
Phase 9 — ActivityDefinition as Markdown
T44: ActivityDefinition file parser
src/activity_core/definition_parser.py
Scans activity-definitions/*.md in the local repo and any registered domain
repos listed in ACTIVITY_DEFINITION_DIRS env var (colon-separated paths).
Parses:
- YAML frontmatter → trigger config, context_sources, governance, owner, status.
- Fenced
```ruleblocks →RuleDeflist (YAML body). - Fenced
```instructionblocks →InstructionDeflist (YAML body).
Returns ActivityDefinitionDef. Raises ParseError(file, line, message) on
malformed input — never silently ignores a broken definition.
T45: ActivityDefinition sync command
make sync-activity-definitions (also in worker.py startup sequence, before
the Temporal run loop).
Extend activity_definitions table: add rules_json (JSONB) and
instructions_json (JSONB) columns. On sync: parse all definition files,
upsert rows. Definitions absent from filesystem set status = 'inactive'
(soft delete — Temporal Schedules for those definitions are paused, not deleted).
Alembic migration for new columns included in this task.
T46: Wire rule/instruction evaluator into RunActivityWorkflow
Replace RunActivityWorkflow's current unconditional task-spawning logic:
load_activity_definition()
→ resolve_context() [calls each context_source adapter]
→ evaluate_rules() [RuleEvaluator — all rules, all-match]
→ execute_instructions() [InstructionExecutor — all passing pre-filters]
→ emit_tasks() [IssueSink.emit() for each TaskSpec]
→ log_run() [activity_runs row + task_spawn_log rows]
A rule that raises UnsafeExpression or any other error is skipped and logged
as rule_eval_error; other rules in the same run still execute. The run is not
failed by a single bad rule.
Phase 10 — One-off Scheduled Trigger and Webhook Receiver
T47: Add ScheduledTriggerConfig
src/activity_core/models.py
@dataclass
class ScheduledTriggerConfig:
at: datetime # UTC; warns if in the past
timezone: str = "UTC"
TriggerConfig = CronTriggerConfig | EventTriggerConfig | ScheduledTriggerConfig
Add to ActivityDefinition frontmatter schema:
trigger:
type: scheduled
at: "2026-09-01T09:00:00Z"
timezone: "Europe/Berlin"
T48: Implement one-off Temporal Schedule
src/activity_core/schedule_manager.py
async def upsert_schedule(client, defn: ActivityDefinition):
if isinstance(defn.trigger_config, ScheduledTriggerConfig):
# Create Schedule with remaining_actions=1
# Schedule ID: activity-schedule-{defn.id}-once
# start_at = defn.trigger_config.at
...
sync_schedules.py: detect ScheduledTriggerConfig, use one-off path.
Add cancel_scheduled(client, activity_id) for admin cancellation of pending
one-off definitions.
T49: Implement webhook receiver
src/activity_core/webhook_receiver.py
FastAPI APIRouter mounted at /webhooks/{source} in api.py.
POST /webhooks/gitea → validate X-Gitea-Signature (HMAC-SHA256)
POST /webhooks/github → validate X-Hub-Signature-256 (HMAC-SHA256)
Per-source normalisers convert raw webhook payload to EventEnvelope:
- Gitea
repositoryevent →gitea.repo.created - Gitea
pushevent →gitea.push - Gitea
issuesevent (action=closed) →gitea.issue.closed
After normalisation: validate type against event type registry; publish to
NATS subject activity.events. Discard unknown event types with HTTP 422
(not 500 — unknown type is not a server error).
Config: WEBHOOK_SECRET_GITEA and WEBHOOK_SECRET_GITHUB env vars.
T50: Write Gitea webhook event type definitions
Three files under event-types/:
gitea.repo.created.md— publisher: gitea/webhook; attrs: repo_full_name (string), repo_slug (string, derived from repo name), owner (string), html_url (string), created_at (datetime).gitea.push.md— publisher: gitea/webhook; attrs: repo_full_name (string), branch (string), pusher (string), commits_count (integer), compare_url (string), pushed_at (datetime).gitea.issue.closed.md— publisher: gitea/webhook; attrs: repo_full_name (string), issue_number (integer), issue_title (string), closer (string), closed_at (datetime).
Each includes a "Normaliser mapping" subsection in Consumer Notes showing the raw Gitea webhook field → EventEnvelope attribute mapping.
Completion Criteria for This Part
make sync-activity-definitionsloads definitions fromactivity-definitions/and upserts them (withrules_json,instructions_json) cleanlyRunActivityWorkflowuses the rule/instruction pipeline end-to-end (no unconditional task spawning)- One-off
scheduledtrigger type creates a Temporal Schedule withremaining_actions=1 POST /webhooks/giteavalidates HMAC, normalises payload, publishes to NATS- Three Gitea event type definitions committed and loaded by
sync-event-types
New Files Produced
| Path | Task |
|---|---|
src/activity_core/definition_parser.py |
T44 |
src/activity_core/webhook_receiver.py |
T49 |
event-types/gitea.repo.created.md |
T50 |
event-types/gitea.push.md |
T50 |
event-types/gitea.issue.closed.md |
T50 |
Modified Files
| Path | Task | Change |
|---|---|---|
src/activity_core/models.py |
T47 | ScheduledTriggerConfig, TriggerConfig union |
src/activity_core/workflows.py |
T46 | Rule/instruction pipeline replaces unconditional spawn |
src/activity_core/activities.py |
T46 | resolve_context calls adapter chain |
src/activity_core/schedule_manager.py |
T48 | One-off schedule path |
src/activity_core/sync_schedules.py |
T48 | Detect ScheduledTriggerConfig |
src/activity_core/api.py |
T49 | Mount webhook router |
src/activity_core/worker.py |
T45 | sync-activity-definitions at startup |
alembic/versions/ |
T45 | Migration: rules_json, instructions_json columns |
Makefile |
T45 | sync-activity-definitions target |
Change History
- v1.0 (2026-05-14): Split from custodian-WP-0003 (phases 9–10).