activity-core/workplans/custodian-WP-0003a-event-bridge-model-rules-registry.md
tegwick c3a256509b feat(event-bridge): WP-0003a — domain model, rules module, event type registry
Implements phases 7–8 of the Event Bridge architecture (custodian-WP-0003a).

Domain model (T34, T40):
- Added RuleDef, InstructionDef, ActionDef to models.py
- Updated ActivityDefinition with rules/instructions fields (task_templates deprecated)
- Formalized EventEnvelope: id, type, version, timestamp, publisher, attributes
- Added from_nats_message() and from_webhook_payload() classmethods

Rules module (T35, T36, T37):
- src/activity_core/rules/ skeleton with boundary enforcement
- evaluate_condition() — sandboxed AST walker, whitelisted nodes only, never exec()
- execute_instruction() — LLM task generation with trusted_fields injection guard
- tests/rules/test_boundary.py verifies no cross-boundary imports

Infrastructure (T38, T39):
- Alembic migrations 0004 (task_spawn_log) and 0005 (event_types)
- IssueSink ABC + IssueCoreRestSink (REST) + NullSink (testing)
- TaskSpawnLog and EventType ORM models

Event type registry (T41, T42, T43):
- event_type_registry.py: file scanner, parser, DB sync, in-process lookup
- ACTIVITY_CURATOR_GATE env var (disabled|required) + approve endpoint
- Three org event type definitions: org.repo.registered, org.workstream.completed,
  org.activity.run.completed

All 10 tests pass. Boundary test confirms rules/ isolation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 22:01:15 +02:00


id: custodian-WP-0003a
type: workplan
domain: custodian
repo: activity-core
status: done
state_hub_workstream_id: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
split_from: custodian-WP-0003
split_part: 1 of 3
created: 2026-05-14

Tasks:

| id | title | status | priority | state_hub_task_id |
|---|---|---|---|---|
| T34 | Refactor ActivityDefinition model — add rules/instructions fields | done | high | ca1bf66f-7094-459c-9abf-5f5c6414c91a |
| T35 | Create src/activity_core/rules/ module skeleton | done | high | 54c25eae-4fad-42d8-bb15-9c1e7532425e |
| T36 | Implement RuleEvaluator (sandboxed AST walker) | done | high | a1ed0d8b-df59-4af1-82a1-d01628919689 |
| T37 | Implement InstructionExecutor | done | high | cdd349f1-b7ad-4a3a-afa0-ae671a7addb8 |
| T38 | Alembic migration — add task_spawn_log table | done | high | 4cd9833c-fbc4-4b10-b6f2-85c028c8c557 |
| T39 | Implement IssueSink adapter interface and IssueCoreRestSink | done | high | 38177fcf-c468-4424-9938-1b01038a386b |
| T40 | Formalize EventEnvelope model | done | high | 65d77939-6972-4450-993e-23ccb25d9454 |
| T41 | Event type registry — file scanner, parser, DB model, sync command | done | high | 7a182265-4013-4272-8540-cfb4e2079eb3 |
| T42 | Curator gate configuration | done | medium | 229d99ca-2d09-4c96-b3d2-da8b2d14c5b7 |
| T43 | Write first event type definitions | done | medium | 78b9d642-17b1-46c7-8e5f-c0a948821993 |

activity-core WP-0003a — Domain Model, Rules Infrastructure & Event Type Registry

Split from: custodian-WP-0003 (part 1 of 3)
Hub workstream: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5
Next part: custodian-WP-0003b (parser, workflow wiring, triggers, webhooks)
Architecture: ACT-ADR-001, ACT-ADR-002, ACT-ADR-003

Purpose

Phases 7 and 8 of the Event Bridge implementation: establish the domain model with rules/instructions fields, build the sandboxed rule evaluator, implement the instruction executor with prompt injection protection, add the spawn audit log, implement the IssueSink adapter, formalize EventEnvelope, build the event type registry, and write the first event type definition files.

This part produces the foundational building blocks that WP-0003b and WP-0003c depend on.

Build Order

Phase 7 (parallel tracks):
  T34 (model) → T35 (rules/ skeleton) → T36, T37 (parallel)
  T38 (migration — independent)
  T39 (IssueSink — independent)
  T40 (EventEnvelope — independent)

Phase 8 (needs T40):
  T40 → T41 (registry) → T42, T43 (parallel)
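The dependency arrows above can be checked mechanically. The following is a minimal sketch using the standard-library graphlib module. The DEPENDS_ON mapping only transcribes the arrows listed above, and grouping tasks into "waves" is an illustration rather than part of the workplan.

```python
from graphlib import TopologicalSorter

# Each task maps to the tasks it waits for, transcribed from the Build Order.
DEPENDS_ON: dict[str, set[str]] = {
    "T35": {"T34"},
    "T36": {"T35"},
    "T37": {"T35"},
    "T38": set(),  # migration, independent
    "T39": set(),  # IssueSink, independent
    "T41": {"T40"},
    "T42": {"T41"},
    "T43": {"T41"},
}


def parallel_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks so every prerequisite finishes in an earlier wave."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the plan has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves
```

Calling parallel_waves(DEPENDS_ON) yields three waves:

1. T34, T38, T39 and T40.
2. T35 and T41.
3. T36, T37, T42 and T43.

T34 and T40 never appear as keys; graphlib adds them as nodes because they are listed as predecessors.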

Phase 7 — Domain Model & Rules Infrastructure

T34: Refactor ActivityDefinition model — add rules/instructions fields

src/activity_core/models.py

Add ActionDef, RuleDef and InstructionDef dataclasses and update ActivityDefinition:

from dataclasses import dataclass, field

@dataclass
class ActionDef:
    task_template: str        # path relative to repo root
    target_repo: str | None   # attribute-access expression or literal
    priority: str = "medium"
    labels: list[str] = field(default_factory=list)
    due_in_days: int | None = None

@dataclass
class RuleDef:
    id: str
    condition: str            # expression string; empty = always true
    action: ActionDef

@dataclass
class InstructionDef:
    id: str
    condition: str            # optional pre-filter (Rule DSL)
    trusted_fields: list[str]
    model: str
    prompt: str
    output_schema: str        # path to JSON Schema file
    review_required: bool = False

ActivityDefinition gains rules: list[RuleDef] and instructions: list[InstructionDef]. The existing task_templates field is deprecated and ignored if rules is non-empty.
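The deprecation rule can be expressed directly on the model. This is a minimal sketch under a few assumptions:

- Only the three fields named above are shown; the model's other existing fields are omitted.
- The RuleDef stand-in is trimmed and drops its ActionDef.
- The DeprecationWarning and the legacy_task_templates() helper name are hypothetical.

```python
import warnings
from dataclasses import dataclass, field


@dataclass
class RuleDef:  # trimmed stand-in; the real RuleDef also carries an ActionDef
    id: str
    condition: str = ""


@dataclass
class ActivityDefinition:
    id: str  # hypothetical: the model's other existing fields are omitted
    rules: list[RuleDef] = field(default_factory=list)
    instructions: list = field(default_factory=list)  # list[InstructionDef]
    task_templates: list[str] = field(default_factory=list)  # deprecated

    def __post_init__(self) -> None:
        if self.task_templates:
            warnings.warn(
                "ActivityDefinition.task_templates is deprecated; use rules",
                DeprecationWarning,
                stacklevel=2,
            )

    def legacy_task_templates(self) -> list[str]:
        """Templates still honoured: none once any rule is defined."""
        return [] if self.rules else list(self.task_templates)
```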


T35: Create src/activity_core/rules/ module skeleton

src/activity_core/rules/
  __init__.py        # exports: evaluate_condition, execute_instruction
  models.py          # RuleDef, InstructionDef, TaskSpec, TaskRef (no upstream imports)
  evaluator.py       # RuleEvaluator stub
  executor.py        # InstructionExecutor stub

Boundary contract: nothing inside rules/ may import from temporalio, sqlalchemy, fastapi, or any activity_core.* module outside rules/. A CI test (tests/rules/test_boundary.py) verifies this by inspecting imports.
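A boundary test of this kind can parse each file with the standard ast module instead of importing it, so a forbidden import is caught even if the dependency is not installed. This is a minimal sketch of what tests/rules/test_boundary.py might contain. The helper names are hypothetical, and it assumes pytest runs from the repository root.

```python
import ast
from pathlib import Path

FORBIDDEN_ROOTS = {"temporalio", "sqlalchemy", "fastapi"}


def imported_modules(source: str) -> set[str]:
    """Collect the module names a source file imports."""
    found: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            found.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            if node.level == 0:
                found.add(node.module or "")
            elif node.level > 1:
                # "from .. import x" climbs out of rules/ (it has no subpackages).
                found.add("." * node.level + (node.module or ""))
    return found


def is_violation(module: str) -> bool:
    if module.startswith("."):
        return True
    root = module.split(".")[0]
    if root in FORBIDDEN_ROOTS:
        return True
    inside_rules = module == "activity_core.rules" or module.startswith("activity_core.rules.")
    return root == "activity_core" and not inside_rules


def boundary_violations(rules_dir: Path) -> list[tuple[str, str]]:
    return [
        (path.name, module)
        for path in sorted(rules_dir.rglob("*.py"))
        for module in sorted(imported_modules(path.read_text(encoding="utf-8")))
        if is_violation(module)
    ]


def test_rules_has_no_upstream_imports() -> None:
    assert boundary_violations(Path("src/activity_core/rules")) == []
```

The check is deliberately conservative and has these limits:

- "from activity_core import rules" is flagged, because the sketch sees only the module "activity_core".
- Single-dot relative imports stay inside rules/ and are allowed.
- Imports made dynamically at runtime (for example via importlib) are not detected.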


T36: Implement RuleEvaluator (sandboxed AST walker)

src/activity_core/rules/evaluator.py

def evaluate_condition(
    expr: str,
    event: EventEnvelope,
    context: dict,
) -> bool:
    """
    Evaluates a Rule condition expression safely.
    Raises UnsafeExpression at parse time if any non-whitelisted AST node
    is found. Never calls exec() or eval().
    """

Whitelisted AST node types: Expression, BoolOp, And, Or, UnaryOp, Not, Compare, Eq, NotEq, Lt, LtE, Gt, GtE, In, NotIn, Name, Attribute, Constant, Call (only len), List, Tuple. IfExp is explicitly forbidden and raises UnsafeExpression, as does any other node not on this list.

Attribute resolution: event.attributes.repo_slug walks the EventEnvelope object, and context.repo_profile.tags walks the context dict. Missing attributes resolve to None rather than raising, so rules can use is None checks on optional fields without crashing.

Empty expression string → returns True.
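The evaluator described above can be sketched as a two-pass walker. The first pass rejects any non-whitelisted node before anything is evaluated; the second pass interprets the tree without calling eval(). The sketch makes several assumptions beyond the spec, each also marked in comments:

- EventEnvelope is a minimal stand-in for the T40 model.
- ast.Load is added to the whitelist, because Python attaches it to every Name and Attribute node.
- ast.Is and ast.IsNot are added, because the is None checks mentioned above need them and the list does not name them.
- Attribute names starting with an underscore are rejected.
- A comparison that raises TypeError (such as None > 3) evaluates to False.
- len(None) evaluates to 0.

```python
import ast
import operator
from dataclasses import dataclass, field
from typing import Any


class UnsafeExpression(ValueError):
    """Raised at parse time for any construct outside the whitelist."""


# The spec's whitelist plus two assumptions: ast.Load (the context node every
# Name/Attribute carries) and ast.Is/ast.IsNot (for `is None` checks).
ALLOWED = (
    ast.Expression, ast.BoolOp, ast.And, ast.Or, ast.UnaryOp, ast.Not,
    ast.Compare, ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
    ast.In, ast.NotIn, ast.Is, ast.IsNot, ast.Name, ast.Attribute,
    ast.Constant, ast.Call, ast.List, ast.Tuple, ast.Load,
)
OPS = {
    ast.Eq: operator.eq, ast.NotEq: operator.ne, ast.Lt: operator.lt,
    ast.LtE: operator.le, ast.Gt: operator.gt, ast.GtE: operator.ge,
    ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b,
    ast.Is: operator.is_, ast.IsNot: operator.is_not,
}


@dataclass
class EventEnvelope:  # minimal stand-in for the T40 model
    type: str
    attributes: dict[str, Any] = field(default_factory=dict)


def _check(tree: ast.AST) -> None:
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED):
            raise UnsafeExpression(f"forbidden node: {type(node).__name__}")
        if isinstance(node, ast.Attribute) and node.attr.startswith("_"):
            raise UnsafeExpression(f"private attribute: {node.attr}")
        if isinstance(node, ast.Call) and not (
            isinstance(node.func, ast.Name) and node.func.id == "len"
            and len(node.args) == 1 and not node.keywords
        ):
            raise UnsafeExpression("only len(x) calls are allowed")


def _eval(node: ast.AST, names: dict[str, Any]) -> Any:
    if isinstance(node, ast.Expression):
        return _eval(node.body, names)
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return names.get(node.id)
    if isinstance(node, ast.Attribute):
        # Dicts are walked by key, objects by attribute; missing values are None.
        obj = _eval(node.value, names)
        return obj.get(node.attr) if isinstance(obj, dict) else getattr(obj, node.attr, None)
    if isinstance(node, (ast.List, ast.Tuple)):
        return [_eval(elt, names) for elt in node.elts]
    if isinstance(node, ast.UnaryOp):  # Not is the only whitelisted unary operator
        return not _eval(node.operand, names)
    if isinstance(node, ast.BoolOp):
        values = (_eval(v, names) for v in node.values)
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.Compare):
        left = _eval(node.left, names)
        for op, comparator in zip(node.ops, node.comparators):
            right = _eval(comparator, names)
            try:
                if not OPS[type(op)](left, right):
                    return False
            except TypeError:  # assumption: e.g. None > 3 on an absent field is False
                return False
            left = right
        return True
    if isinstance(node, ast.Call):  # _check guarantees this is len(x)
        arg = _eval(node.args[0], names)
        return 0 if arg is None else len(arg)
    raise UnsafeExpression(f"unsupported node: {type(node).__name__}")


def evaluate_condition(expr: str, event: EventEnvelope, context: dict) -> bool:
    if not expr.strip():
        return True  # an empty condition always matches
    tree = ast.parse(expr.strip(), mode="eval")
    _check(tree)  # reject before evaluating anything
    return bool(_eval(tree, {"event": event, "context": context}))
```

Keeping validation (_check) separate from interpretation (_eval) means an unsafe expression is rejected as a whole, with no partial evaluation.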


T37: Implement InstructionExecutor

src/activity_core/rules/executor.py

def execute_instruction(
    instr: InstructionDef,
    event: EventEnvelope,
    context: dict,
    llm_client,           # injected; implements llm-connect interface
) -> list[TaskSpec]:
    """
    Evaluates an Instruction. Returns [] on any failure; never raises.
    """

Steps:

  1. If instr.condition is non-empty, call evaluate_condition — return [] if false.
  2. Render prompt: scan for {field.path} placeholders and resolve each against the trusted_fields allowlist. If any placeholder is not listed, raise UntrustedFieldError; execute_instruction catches it, logs it, and returns []. Fields of type object in the event schema are always rejected, even if listed.
  3. Call llm_client.complete(prompt, model=instr.model) with structured output.
  4. Validate response against instr.output_schema (JSON Schema). On failure: append schema error to prompt, retry once.
  5. If second attempt fails: log instruction_output_error, return [].
  6. Return validated list[TaskSpec].

T38: Alembic migration — add task_spawn_log table

New migration file in alembic/versions/. Table definition:

CREATE TABLE task_spawn_log (
    id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    activity_def_id     UUID NOT NULL REFERENCES activity_definitions(id),
    source_type         VARCHAR(20) NOT NULL,  -- 'rule' | 'instruction'
    source_id           TEXT NOT NULL,
    source_version      TEXT NOT NULL,
    triggering_event_id TEXT NOT NULL,
    task_ref            TEXT,                  -- issue-core external ref
    condition_matched   TEXT,
    prompt_hash         CHAR(64),              -- SHA-256, nullable
    model               TEXT,
    output_validated    BOOLEAN,
    review_required     BOOLEAN,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_spawn_log_event ON task_spawn_log(triggering_event_id);
CREATE INDEX idx_spawn_log_def   ON task_spawn_log(activity_def_id);

T39: Implement IssueSink adapter interface and IssueCoreRestSink

src/activity_core/issue_sink.py

from abc import ABC, abstractmethod

from activity_core.rules.models import TaskRef, TaskSpec

class IssueSink(ABC):
    @abstractmethod
    def emit(self, task_spec: TaskSpec) -> TaskRef: ...

class IssueCoreRestSink(IssueSink):
    """POSTs to issue-core REST API. Config: ISSUE_CORE_URL env var."""

class NullSink(IssueSink):
    """Discards tasks and returns synthetic TaskRefs. For testing."""

IssueCoreRestSink.emit():

  1. POST {ISSUE_CORE_URL}/issues/ with task_spec serialised as issue fields.
  2. Parse response → TaskRef(external_id, backend_url).
  3. Write task_spawn_log row (via injected db session).
  4. Return TaskRef.

Active sink is selected by ISSUE_SINK_TYPE env var: rest (default) | null.


Phase 8 — Event Type Registry

T40: Formalize EventEnvelope model

Update src/activity_core/models.py:

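from dataclasses import dataclass
from datetime import datetime
from typing import Any

import nats.aio.msg

@dataclass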
class EventEnvelope:
    id: str                        # UUID v4
    type: str                      # e.g. "org.repo.registered"
    version: str                   # e.g. "1.0"
    timestamp: datetime
    publisher: str                 # e.g. "the-custodian/state-hub"
    attributes: dict[str, Any]    # typed per event type schema

    @classmethod
    def from_nats_message(cls, msg: nats.aio.msg.Msg) -> "EventEnvelope": ...

    @classmethod
    def from_webhook_payload(cls, source: str, payload: dict) -> "EventEnvelope": ...

Update event_router.py to produce compliant envelopes. Existing NATS message format may require a migration shim if the current format omits id, version, or publisher.


T41: Event type registry — file scanner, parser, DB model, sync command

File scanner: glob event-types/*.md; parse YAML frontmatter block. Attribute schema: parse the ## Attributes markdown table into a dict of {name: {type, required, description}}. Types map to Python annotations for validation.
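The scanner and table parser need only the standard library if YAML parsing is delegated (for example to PyYAML's yaml.safe_load). This is a minimal sketch with these assumptions:

- The Attributes table uses the column order | Name | Type | Required | Description |.
- "yes", "true" and "required" all count as required.
- Files are keyed by file stem. In practice the type_id probably comes from the frontmatter.
- The function names are hypothetical.

```python
import uuid
from datetime import datetime
from pathlib import Path

# Schema type names used in T43, mapped to Python annotations for validation.
TYPE_MAP: dict[str, object] = {
    "string": str,
    "string[]": list[str],
    "integer": int,
    "uuid": uuid.UUID,
    "datetime": datetime,
}


def split_frontmatter(text: str) -> tuple[str, str]:
    """Return (frontmatter, body); the frontmatter is raw YAML text."""
    if text.startswith("---\n"):
        end = text.find("\n---", 4)
        if end != -1:
            return text[4:end], text[end + 4:]
    return "", text


def parse_attributes(body: str) -> dict[str, dict]:
    """Parse the ## Attributes table into {name: {type, required, description}}."""
    lines = body.splitlines()
    start = next((i for i, line in enumerate(lines) if line.strip() == "## Attributes"), None)
    if start is None:
        return {}
    schema: dict[str, dict] = {}
    for line in lines[start + 1:]:
        row = line.strip()
        if row.startswith("## "):
            break  # the next section starts; the table is over
        if not row.startswith("|"):
            continue
        cells = [cell.strip().strip("`") for cell in row.strip("|").split("|")]
        if len(cells) < 4 or cells[0].lower() == "name" or set(cells[0]) <= set("-: "):
            continue  # header or separator row
        name, type_name, required, description = cells[:4]
        schema[name] = {
            "type": type_name,
            "required": required.lower() in ("yes", "true", "required"),
            "description": description,
        }
    return schema


def scan(directory: Path) -> dict[str, tuple[str, dict[str, dict]]]:
    """Map each event-types/*.md file (keyed by file stem) to (frontmatter, schema)."""
    result = {}
    for path in sorted(directory.glob("*.md")):
        frontmatter, body = split_frontmatter(path.read_text(encoding="utf-8"))
        result[path.stem] = (frontmatter, parse_attributes(body))
    return result
```

The parsed schema stays a plain JSON-compatible dict, so it can go straight into the attribute_schema JSONB column; TYPE_MAP is consulted only when validating.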

DB model (event_types table — new Alembic migration):

CREATE TABLE event_types (
    type_id         TEXT PRIMARY KEY,   -- e.g. "org.repo.registered"
    version         TEXT NOT NULL,
    publisher       TEXT NOT NULL,
    governance      TEXT NOT NULL DEFAULT 'publisher-declared',
    status          TEXT NOT NULL DEFAULT 'active',
    attribute_schema JSONB NOT NULL,
    raw_md          TEXT NOT NULL,
    synced_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

Sync command: make sync-event-types (also called at worker startup).

Registry lookup: get_event_type(type_id: str) -> EventTypeDef | None — used by Event Router and webhook receiver to validate incoming events.


T42: Curator gate configuration

ACTIVITY_CURATOR_GATE env var: disabled (default) | required.

  • disabled: all event types with status IN ('active', 'pending') are accepted by the Event Router. pending types are logged as a warning.
  • required: only status = 'active' types are accepted. Events whose type has status pending are discarded with a curator_gate_rejected log entry.

Curator approval endpoint: POST /event-types/{type_id}/approve sets status = 'active'. Requires admin auth (same as existing API auth).


T43: Write first event type definitions

Three files under event-types/:

  • org.repo.registered.md — publisher: state-hub; attrs: repo_slug (string, required), domain (string, required), tags (string[], optional), registered_at (datetime, required).
  • org.workstream.completed.md — publisher: state-hub; attrs: workstream_id (uuid, required), workstream_slug (string, required), domain (string, required), completed_at (datetime, required).
  • org.activity.run.completed.md — publisher: activity-core; attrs: activity_definition_id (uuid, required), run_id (uuid, required), tasks_spawned (integer, required), completed_at (datetime, required).

Each file follows the full ACT-ADR-002 structure: Intent, When Published, Attributes table, Example Payload, Consumer Notes, Debugging.
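For completion criterion 5, it helps to have the expected sync result written down. The sketch below transcribes the three definitions above into the {name: {type, required}} shape that the T41 parser produces, with descriptions omitted. The helper that checks an Example Payload section for missing required attributes is hypothetical.

```python
# Attribute schemas as listed for T43 (descriptions omitted).
ORG_EVENT_TYPES: dict[str, dict] = {
    "org.repo.registered": {
        "publisher": "state-hub",
        "attributes": {
            "repo_slug": {"type": "string", "required": True},
            "domain": {"type": "string", "required": True},
            "tags": {"type": "string[]", "required": False},
            "registered_at": {"type": "datetime", "required": True},
        },
    },
    "org.workstream.completed": {
        "publisher": "state-hub",
        "attributes": {
            "workstream_id": {"type": "uuid", "required": True},
            "workstream_slug": {"type": "string", "required": True},
            "domain": {"type": "string", "required": True},
            "completed_at": {"type": "datetime", "required": True},
        },
    },
    "org.activity.run.completed": {
        "publisher": "activity-core",
        "attributes": {
            "activity_definition_id": {"type": "uuid", "required": True},
            "run_id": {"type": "uuid", "required": True},
            "tasks_spawned": {"type": "integer", "required": True},
            "completed_at": {"type": "datetime", "required": True},
        },
    },
}


def required_attributes(type_id: str) -> set[str]:
    attrs = ORG_EVENT_TYPES[type_id]["attributes"]
    return {name for name, spec in attrs.items() if spec["required"]}


def missing_from_example(type_id: str, example_payload: dict) -> set[str]:
    """Required attributes that an Example Payload section fails to include."""
    return required_attributes(type_id) - set(example_payload)
```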


Completion Criteria for This Part

  1. src/activity_core/rules/ module importable with no upstream deps (test_boundary.py passes)
  2. evaluate_condition handles all whitelisted operators and rejects all forbidden constructs
  3. IssueSink ABC and IssueCoreRestSink / NullSink implemented
  4. Alembic migrations for task_spawn_log and event_types tables apply cleanly
  5. make sync-event-types loads all three org event type definitions
  6. EventEnvelope model updated and from_nats_message works with existing event_router

New Files Produced

| Path | Task |
|---|---|
| src/activity_core/rules/__init__.py | T35 |
| src/activity_core/rules/models.py | T35 |
| src/activity_core/rules/evaluator.py | T36 |
| src/activity_core/rules/executor.py | T37 |
| src/activity_core/issue_sink.py | T39 |
| event-types/org.repo.registered.md | T43 |
| event-types/org.workstream.completed.md | T43 |
| event-types/org.activity.run.completed.md | T43 |

Modified Files

| Path | Task | Change |
|---|---|---|
| src/activity_core/models.py | T34, T40 | ActionDef, RuleDef, InstructionDef, EventEnvelope update |
| src/activity_core/event_router.py | T40 | Produce compliant EventEnvelope |
| alembic/versions/ | T38, T41 | Two new migrations (0004 task_spawn_log, 0005 event_types) |
| src/activity_core/api.py | T42 | Curator approve endpoint |
| src/activity_core/worker.py | T41 | sync-event-types at startup |
| Makefile | T41 | sync-event-types target |

Change History

  • v1.0 (2026-05-14): Split from custodian-WP-0003 (phases 7–8).