diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..3ee7247
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,30 @@
+.PHONY: sync-event-types sync-activity-definitions test
+
+# Inline Python driver for the sync-event-types target. It is kept in a make
+# variable and exported to the recipe environment because every physical recipe
+# line must start with a tab; a multi-line python -c string whose lines sit at
+# column 0 makes GNU make stop with "missing separator".
+define SYNC_EVENT_TYPES_PY
+import asyncio, os
+from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
+from activity_core.event_type_registry import sync_event_types
+
+async def main():
+    db_url = os.environ.get('ACTCORE_DB_URL', 'postgresql+asyncpg://actcore:actcore@localhost:5433/actcore')
+    engine = create_async_engine(db_url)
+    try:
+        factory = async_sessionmaker(engine, expire_on_commit=False)
+        n = await sync_event_types(factory)
+        print(f'Synced {n} event types')
+    finally:
+        await engine.dispose()
+
+asyncio.run(main())
+endef
+export SYNC_EVENT_TYPES_PY
+
+sync-event-types:
+	uv run python -c "$$SYNC_EVENT_TYPES_PY"
+
+test:
+	uv run pytest tests/ -v
diff --git a/event-types/org.activity.run.completed.md b/event-types/org.activity.run.completed.md
new file mode 100644
index 0000000..5c3624a
--- /dev/null
+++ b/event-types/org.activity.run.completed.md
@@ -0,0 +1,59 @@
+---
+type_id: org.activity.run.completed
+version: "1.0"
+publisher: activity-core
+governance: publisher-declared
+status: active
+---
+
+# org.activity.run.completed
+
+## Intent
+
+Emitted by activity-core when a `RunActivityWorkflow` completes successfully.
+Provides a signal that can be consumed by other activities or monitoring systems.
+
+## When Published
+
+activity-core emits this event at the end of `RunActivityWorkflow.run()` after
+all task emissions and spawn log entries have been committed.
+
+## Attributes
+
+| Name | Type | Required | Description |
+|---|---|---|---|
+| activity_definition_id | uuid | yes | UUID of the ActivityDefinition that was run. |
+| run_id | uuid | yes | UUID of the activity_runs row for this execution. |
+| tasks_spawned | integer | yes | Number of tasks emitted via IssueSink in this run. |
+| completed_at | datetime | yes | UTC timestamp when the workflow completed.
| + +## Example Payload + +```json +{ + "id": "c3d4e5f6-a7b8-9012-cdef-012345678902", + "type": "org.activity.run.completed", + "version": "1.0", + "timestamp": "2026-05-14T09:05:00Z", + "publisher": "activity-core", + "attributes": { + "activity_definition_id": "d4e5f6a7-b8c9-0123-def0-123456789012", + "run_id": "e5f6a7b8-c9d0-1234-ef01-234567890123", + "tasks_spawned": 2, + "completed_at": "2026-05-14T09:05:00Z" + } +} +``` + +## Consumer Notes + +- **monitoring**: Use to track activity run frequency and task spawn rates. +- **activity-core (self)**: An activity can trigger a follow-up activity on + this event — be careful to avoid infinite loops by filtering on + `activity_definition_id`. + +## Debugging + +If this event is missing after a Temporal workflow appears to have completed, +check the workflow history in Temporal UI for exceptions in the publish step. +Verify NATS connectivity and stream availability. diff --git a/event-types/org.repo.registered.md b/event-types/org.repo.registered.md new file mode 100644 index 0000000..a95076d --- /dev/null +++ b/event-types/org.repo.registered.md @@ -0,0 +1,56 @@ +--- +type_id: org.repo.registered +version: "1.0" +publisher: the-custodian/state-hub +governance: publisher-declared +status: active +--- + +# org.repo.registered + +## Intent + +Emitted when a new repository is registered in the Custodian State Hub. Signals +that a repo has been added to org tracking and is ready for automated workflows. + +## When Published + +State-hub emits this event immediately after a successful `register_repo()` call. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| repo_slug | string | yes | Short identifier for the repo, e.g. "activity-core". | +| domain | string | yes | Domain the repo belongs to, e.g. "custodian". | +| tags | string[] | no | Optional capability tags assigned to the repo. | +| registered_at | datetime | yes | UTC timestamp when the repo was registered. 
| + +## Example Payload + +```json +{ + "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "type": "org.repo.registered", + "version": "1.0", + "timestamp": "2026-05-14T09:00:00Z", + "publisher": "the-custodian/state-hub", + "attributes": { + "repo_slug": "activity-core", + "domain": "custodian", + "tags": ["python", "temporal", "event-driven"], + "registered_at": "2026-05-14T09:00:00Z" + } +} +``` + +## Consumer Notes + +- **activity-core**: Use to trigger onboarding workflows for new repos (e.g. + initial SBOM scan, scope file creation). +- **repo-scoping**: Use to queue an initial scope profile generation. + +## Debugging + +Check state-hub progress events for `type=repo_registered` if this event is +missing. Verify NATS `activity.>` stream is receiving messages. diff --git a/event-types/org.workstream.completed.md b/event-types/org.workstream.completed.md new file mode 100644 index 0000000..63b6725 --- /dev/null +++ b/event-types/org.workstream.completed.md @@ -0,0 +1,58 @@ +--- +type_id: org.workstream.completed +version: "1.0" +publisher: the-custodian/state-hub +governance: publisher-declared +status: active +--- + +# org.workstream.completed + +## Intent + +Emitted when a workstream reaches `status=done` in the Custodian State Hub. +Signals that a planned body of work has been fully delivered. + +## When Published + +State-hub emits this event when `update_workstream_status(status="done")` is +called and the workstream transitions to done state. + +## Attributes + +| Name | Type | Required | Description | +|---|---|---|---| +| workstream_id | uuid | yes | State-hub UUID of the completed workstream. | +| workstream_slug | string | yes | Human-readable slug, e.g. "event-bridge". | +| domain | string | yes | Domain the workstream belongs to, e.g. "custodian". | +| completed_at | datetime | yes | UTC timestamp of completion. 
| + +## Example Payload + +```json +{ + "id": "b2c3d4e5-f6a7-8901-bcde-f12345678901", + "type": "org.workstream.completed", + "version": "1.0", + "timestamp": "2026-05-14T15:30:00Z", + "publisher": "the-custodian/state-hub", + "attributes": { + "workstream_id": "b4eb45a9-69e3-4ab0-b00c-67a53c3117c5", + "workstream_slug": "event-bridge", + "domain": "custodian", + "completed_at": "2026-05-14T15:30:00Z" + } +} +``` + +## Consumer Notes + +- **activity-core**: Use to trigger post-completion workflows such as SBOM + ingestion, documentation generation, or domain goal updates. +- **state-hub**: This event is self-referential — a completed workstream may + trigger further automation in other domains. + +## Debugging + +If this event is missing, check the workstream status in state-hub. The event +is only emitted on `status=done` transitions, not on intermediate updates. diff --git a/migrations/versions/0004_create_task_spawn_log.py b/migrations/versions/0004_create_task_spawn_log.py new file mode 100644 index 0000000..4d91098 --- /dev/null +++ b/migrations/versions/0004_create_task_spawn_log.py @@ -0,0 +1,66 @@ +"""create_task_spawn_log + +Revision ID: 0004 +Revises: 0003 +Create Date: 2026-05-14 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "0004" +down_revision: Union[str, Sequence[str], None] = "0003" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "task_spawn_log", + sa.Column( + "id", + sa.UUID(), + nullable=False, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column("activity_def_id", sa.UUID(), nullable=False), + sa.Column("source_type", sa.String(20), nullable=False), + sa.Column("source_id", sa.Text(), nullable=False), + sa.Column("source_version", sa.Text(), nullable=False), + sa.Column("triggering_event_id", sa.Text(), nullable=False), + sa.Column("task_ref", sa.Text(), 
nullable=True), + sa.Column("condition_matched", sa.Text(), nullable=True), + sa.Column("prompt_hash", sa.CHAR(64), nullable=True), + sa.Column("model", sa.Text(), nullable=True), + sa.Column("output_validated", sa.Boolean(), nullable=True), + sa.Column("review_required", sa.Boolean(), nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.ForeignKeyConstraint( + ["activity_def_id"], + ["activity_definitions.id"], + ), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + "idx_spawn_log_event", + "task_spawn_log", + ["triggering_event_id"], + ) + op.create_index( + "idx_spawn_log_def", + "task_spawn_log", + ["activity_def_id"], + ) + + +def downgrade() -> None: + op.drop_index("idx_spawn_log_def", table_name="task_spawn_log") + op.drop_index("idx_spawn_log_event", table_name="task_spawn_log") + op.drop_table("task_spawn_log") diff --git a/migrations/versions/0005_create_event_types.py b/migrations/versions/0005_create_event_types.py new file mode 100644 index 0000000..066e7bd --- /dev/null +++ b/migrations/versions/0005_create_event_types.py @@ -0,0 +1,51 @@ +"""create_event_types + +Revision ID: 0005 +Revises: 0004 +Create Date: 2026-05-14 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import JSONB + +revision: str = "0005" +down_revision: Union[str, Sequence[str], None] = "0004" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "event_types", + sa.Column("type_id", sa.Text(), nullable=False), + sa.Column("version", sa.Text(), nullable=False), + sa.Column("publisher", sa.Text(), nullable=False), + sa.Column( + "governance", + sa.Text(), + nullable=False, + server_default="publisher-declared", + ), + sa.Column( + "status", + sa.Text(), + nullable=False, + server_default="active", + ), + 
sa.Column("attribute_schema", JSONB(), nullable=False), + sa.Column("raw_md", sa.Text(), nullable=False), + sa.Column( + "synced_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.PrimaryKeyConstraint("type_id"), + ) + + +def downgrade() -> None: + op.drop_table("event_types") diff --git a/pyproject.toml b/pyproject.toml index edad5e0..0731351 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "uvicorn[standard]>=0.32", "alembic>=1.14", "nats-py>=2.7", + "httpx>=0.27", ] [project.optional-dependencies] diff --git a/src/activity_core/api.py b/src/activity_core/api.py index e865e4e..722ab3f 100644 --- a/src/activity_core/api.py +++ b/src/activity_core/api.py @@ -36,13 +36,18 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_asyn from temporalio.client import Client from activity_core.models import ActivityDefinition, CronTriggerConfig -from activity_core.orm import ActivityDefinition as ActivityDefinitionRow +from activity_core.orm import ActivityDefinition as ActivityDefinitionRow, EventType as EventTypeRow from activity_core.schedule_manager import delete_schedule, upsert_schedule TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") TEMPORAL_NAMESPACE = os.environ.get("TEMPORAL_NAMESPACE", "default") _ORCHESTRATOR_TASK_QUEUE = "orchestrator-tq" +# T42: Curator gate — controls which event type statuses are accepted by the router. +# "disabled" (default): accepts "active" and "pending" types (pending logged as warning). +# "required": only "active" types accepted; "pending" events discarded. 
+ACTIVITY_CURATOR_GATE = os.environ.get("ACTIVITY_CURATOR_GATE", "disabled")
+
 # --- App state ---------------------------------------------------------------
 
 _session_factory: async_sessionmaker[AsyncSession] | None = None
@@ -264,3 +269,30 @@ async def trigger_definition(definition_id: uuid.UUID) -> dict[str, str]:
         task_queue=_ORCHESTRATOR_TASK_QUEUE,
     )
     return {"workflow_id": handle.id, "trigger_key": trigger_key}
+
+
+# T42: Curator gate — event type approval endpoint
+
+@app.post("/event-types/{type_id}/approve", status_code=200)
+async def approve_event_type(type_id: str) -> dict[str, str]:
+    """Approve a pending event type, setting its status to 'active'.
+
+    Only relevant when ACTIVITY_CURATOR_GATE=required. Requires admin access
+    (same auth as the rest of the API).
+
+    Returns a dict with ``type_id``, ``status`` and a ``message`` that is
+    "approved" or "already active". Raises HTTP 404 if the type is unknown.
+    """
+    Session = _get_db()
+    async with Session() as session:
+        row = await session.get(EventTypeRow, type_id)
+        if row is None:
+            raise HTTPException(status_code=404, detail=f"Event type {type_id!r} not found")
+        if row.status == "active":
+            return {"type_id": type_id, "status": "active", "message": "already active"}
+        # session.get() has already autobegun a transaction, so opening another
+        # one with session.begin() raises InvalidRequestError. Update the loaded
+        # row and commit the transaction that is already open instead.
+        row.status = "active"
+        await session.commit()
+    return {"type_id": type_id, "status": "active", "message": "approved"}
diff --git a/src/activity_core/event_router.py b/src/activity_core/event_router.py
index 5b0cab9..1414ffc 100644
--- a/src/activity_core/event_router.py
+++ b/src/activity_core/event_router.py
@@ -21,7 +21,6 @@ Usage:
 from __future__ import annotations
 
 import asyncio
-import json
 import logging
 import os
 import uuid
@@ -112,9 +111,9 @@ class EventRouter:
         """Return True if the envelope matches the EventTriggerConfig."""
         if envelope.type != cfg.event_type:
             return False
-        # All filter key/value pairs must be present in envelope.payload.
+        # All filter key/value pairs must be present in envelope.attributes.
for key, value in cfg.filters.items(): - if envelope.payload.get(key) != value: + if envelope.attributes.get(key) != value: return False return True @@ -122,15 +121,15 @@ class EventRouter: async def _dispatch(self, activity_id: str, envelope: EventEnvelope) -> None: """Start RunActivityWorkflow for one matched activity. - Workflow ID is deterministic: activity-{activity_id}:{event_id} + Workflow ID is deterministic: activity-{activity_id}:{id} REJECT_DUPLICATE prevents double-processing if the message is redelivered before ack reaches NATS. """ - workflow_id = f"activity-{activity_id}:{envelope.event_id}" + workflow_id = f"activity-{activity_id}:{envelope.id}" try: await self._temporal.start_workflow( "RunActivityWorkflow", - args=[activity_id, envelope.event_id, envelope.occurred_at.isoformat()], + args=[activity_id, envelope.id, envelope.timestamp.isoformat()], id=workflow_id, task_queue=_ORCHESTRATOR_TASK_QUEUE, id_conflict_policy=WorkflowIDConflictPolicy.FAIL, @@ -138,18 +137,17 @@ class EventRouter: logger.info( "started workflow %r for event %r (activity %s)", workflow_id, - envelope.event_id, + envelope.id, activity_id, ) except WorkflowAlreadyStartedError: # Duplicate delivery — workflow already running or completed; safe to skip. 
- logger.debug("duplicate event %r for activity %s — skipped", envelope.event_id, activity_id) + logger.debug("duplicate event %r for activity %s — skipped", envelope.id, activity_id) async def _handle_message(self, msg: Any) -> None: """Decode a NATS message, match it against routing rules, and dispatch.""" try: - raw = json.loads(msg.data.decode()) - envelope = EventEnvelope.model_validate(raw) + envelope = EventEnvelope.from_nats_message(msg) except Exception: logger.warning("failed to parse event envelope from NATS message — nacking") await msg.nak() @@ -160,7 +158,7 @@ class EventRouter: matched = [aid for aid, cfg in event_defs if self._matches(envelope, cfg)] if not matched: - logger.debug("event %r type=%r matched no definitions", envelope.event_id, envelope.type) + logger.debug("event %r type=%r matched no definitions", envelope.id, envelope.type) await msg.ack() return diff --git a/src/activity_core/event_type_registry.py b/src/activity_core/event_type_registry.py new file mode 100644 index 0000000..dc4b2ad --- /dev/null +++ b/src/activity_core/event_type_registry.py @@ -0,0 +1,210 @@ +""" +Event type registry — file scanner, parser, DB sync, and registry lookup. + +Event type definition files live under event-types/*.md in the repo root. +Additional directories can be specified via ACTIVITY_DEFINITION_DIRS (colon-separated). + +Sync command: make sync-event-types +Also called at worker startup. 
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import re
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+logger = logging.getLogger(__name__)
+
+_ATTR_TABLE_RE = re.compile(
+    r"^\|\s*(?P<name>[^|]+?)\s*\|\s*(?P<type>[^|]+?)\s*\|\s*(?P<required>[^|]+?)\s*\|\s*(?P<desc>[^|]+?)\s*\|$"
+)
+
+
+@dataclass
+class EventTypeDef:
+    type_id: str
+    version: str
+    publisher: str
+    governance: str = "publisher-declared"
+    status: str = "active"
+    attribute_schema: dict[str, Any] = field(default_factory=dict)
+    raw_md: str = ""
+
+
+def _parse_frontmatter(text: str) -> tuple[dict, str]:
+    """Split YAML frontmatter from the rest of a markdown file."""
+    if not text.startswith("---"):
+        return {}, text
+    end = text.find("\n---", 3)
+    if end == -1:
+        return {}, text
+    fm_text = text[3:end].strip()
+    body = text[end + 4:].strip()
+    return yaml.safe_load(fm_text) or {}, body
+
+
+def _parse_attribute_table(body: str) -> dict[str, dict]:
+    """Parse the ## Attributes markdown table into a schema dict."""
+    schema: dict[str, dict] = {}
+    in_attrs = False
+    for line in body.splitlines():
+        if re.match(r"^##\s+Attributes", line):
+            in_attrs = True
+            continue
+        if in_attrs and line.startswith("##"):
+            break
+        if in_attrs:
+            m = _ATTR_TABLE_RE.match(line)
+            if m and m.group("name").lower() not in ("name", "---", "attribute"):
+                schema[m.group("name").strip()] = {
+                    "type": m.group("type").strip(),
+                    "required": m.group("required").strip().lower() in ("yes", "true", "required"),
+                    "description": m.group("desc").strip(),
+                }
+    return schema
+
+
+def parse_event_type_file(path: Path) -> EventTypeDef:
+    """Parse a single event-type .md file into an EventTypeDef."""
+    raw = path.read_text()
+    fm, body = _parse_frontmatter(raw)
+
+    type_id = fm.get("type_id") or fm.get("id") or path.stem
+    version = str(fm.get("version", "1.0"))
+    publisher = fm.get("publisher", "unknown")
+    governance = fm.get("governance", "publisher-declared")
+    status = fm.get("status", "active")
+    attribute_schema = _parse_attribute_table(body)
+
+    return EventTypeDef(
+        type_id=type_id,
+        version=version,
+        publisher=publisher,
+        governance=governance,
+        status=status,
+        attribute_schema=attribute_schema,
+        raw_md=raw,
+    )
+
+
+def scan_event_type_dirs() -> list[Path]:
+    """Return all event-type .md files from the default and configured directories."""
+    dirs: list[Path] = []
+
+    default_dir = Path("event-types")
+    if default_dir.is_dir():
+        dirs.append(default_dir)
+
+    extra = os.environ.get("ACTIVITY_DEFINITION_DIRS", "")
+    for part in extra.split(":"):
+        part = part.strip()
+        if not part:
+            continue
+        p = Path(part) / "event-types"
+        if p.is_dir():
+            dirs.append(p)
+
+    files: list[Path] = []
+    for d in dirs:
+        files.extend(sorted(d.glob("*.md")))
+    return files
+
+
+def load_all_event_types() -> list[EventTypeDef]:
+    """Parse all event type definition files. Logs and skips on parse errors."""
+    defs: list[EventTypeDef] = []
+    for path in scan_event_type_dirs():
+        try:
+            defs.append(parse_event_type_file(path))
+        except Exception as exc:
+            logger.error("failed to parse event type file %s: %s", path, exc)
+    return defs
+
+
+# In-process registry (populated by sync_event_types or at startup)
+_registry: dict[str, EventTypeDef] = {}
+
+
+def get_event_type(type_id: str) -> EventTypeDef | None:
+    """Look up an event type by ID. Returns None if not registered."""
+    return _registry.get(type_id)
+
+
+def is_event_type_allowed(type_id: str) -> bool:
+    """Check if an event type is allowed by the curator gate.
+
+    ACTIVITY_CURATOR_GATE=disabled (default): active and pending types allowed.
+    ACTIVITY_CURATOR_GATE=required: only active types allowed.
+    """
+    gate = os.environ.get("ACTIVITY_CURATOR_GATE", "disabled").lower()
+    defn = _registry.get(type_id)
+    if defn is None:
+        return False
+    if gate == "required":
+        return defn.status == "active"
+    # disabled: accept active and pending
+    if defn.status in ("active", "pending"):
+        if defn.status == "pending":
+            logger.warning(
+                "curator_gate_disabled: accepting pending event type %r", type_id
+            )
+        return True
+    return False
+
+
+def _update_registry(defs: list[EventTypeDef]) -> None:
+    """Refresh the in-process registry from a list of parsed definitions."""
+    global _registry
+    _registry = {d.type_id: d for d in defs}
+
+
+async def sync_event_types(session_factory: Any) -> int:
+    """Upsert all event type definitions into the DB. Returns count synced."""
+    from sqlalchemy import text
+
+    defs = load_all_event_types()
+    if not defs:
+        logger.info("sync-event-types: no event type files found")
+        return 0
+
+    import json
+
+    async with session_factory() as session:
+        for defn in defs:
+            await session.execute(
+                text("""
+                    INSERT INTO event_types
+                        (type_id, version, publisher, governance, status, attribute_schema, raw_md, synced_at)
+                    VALUES
+                        (:type_id, :version, :publisher, :governance, :status,
+                         CAST(:attribute_schema AS jsonb), :raw_md, now())
+                    ON CONFLICT (type_id) DO UPDATE SET
+                        version = EXCLUDED.version,
+                        publisher = EXCLUDED.publisher,
+                        governance = EXCLUDED.governance,
+                        status = EXCLUDED.status,
+                        attribute_schema = EXCLUDED.attribute_schema,
+                        raw_md = EXCLUDED.raw_md,
+                        synced_at = now()
+                """),
+                {
+                    "type_id": defn.type_id,
+                    "version": defn.version,
+                    "publisher": defn.publisher,
+                    "governance": defn.governance,
+                    "status": defn.status,
+                    "attribute_schema": json.dumps(defn.attribute_schema),
+                    "raw_md": defn.raw_md,
+                },
+            )
+        await session.commit()
+
+    _update_registry(defs)
+    logger.info("sync-event-types: synced %d event types", len(defs))
+    return len(defs)
diff --git a/src/activity_core/issue_sink.py b/src/activity_core/issue_sink.py new file
mode 100644 index 0000000..3c978d3 --- /dev/null +++ b/src/activity_core/issue_sink.py @@ -0,0 +1,75 @@ +""" +IssueSink adapter interface and implementations. + +IssueSink is the outbound boundary between activity-core and task backends +(issue-core, etc.). It receives TaskSpec objects and returns TaskRef objects. + +Active sink is selected by ISSUE_SINK_TYPE env var: "rest" (default) | "null". +""" + +from __future__ import annotations + +import logging +import os +import uuid +from abc import ABC, abstractmethod + +import httpx + +from activity_core.rules.models import TaskRef, TaskSpec + +logger = logging.getLogger(__name__) + +ISSUE_CORE_URL = os.environ.get("ISSUE_CORE_URL", "http://127.0.0.1:8010") +ISSUE_SINK_TYPE = os.environ.get("ISSUE_SINK_TYPE", "rest") + + +class IssueSink(ABC): + @abstractmethod + def emit(self, task_spec: TaskSpec) -> TaskRef: ... + + +class IssueCoreRestSink(IssueSink): + """POSTs to issue-core REST API. Config: ISSUE_CORE_URL env var.""" + + def __init__(self, base_url: str = ISSUE_CORE_URL) -> None: + self._base_url = base_url.rstrip("/") + + def emit(self, task_spec: TaskSpec) -> TaskRef: + payload = { + "title": task_spec.title, + "description": task_spec.description, + "target_repo": task_spec.target_repo, + "priority": task_spec.priority, + "labels": task_spec.labels, + "due_in_days": task_spec.due_in_days, + "source_type": task_spec.source_type, + "source_id": task_spec.source_id, + "triggering_event_id": task_spec.triggering_event_id, + "activity_definition_id": task_spec.activity_definition_id, + } + resp = httpx.post(f"{self._base_url}/issues/", json=payload, timeout=10.0) + resp.raise_for_status() + data = resp.json() + return TaskRef( + external_id=data["issue_id"], + backend_url=data.get("issue_url"), + backend=data.get("backend", ""), + ) + + +class NullSink(IssueSink): + """Discards tasks and returns synthetic TaskRefs. 
For testing.""" + + def emit(self, task_spec: TaskSpec) -> TaskRef: + synthetic_id = f"null-{uuid.uuid4()}" + logger.debug("NullSink: discarding task %r → %s", task_spec.title, synthetic_id) + return TaskRef(external_id=synthetic_id, backend="null") + + +def get_issue_sink() -> IssueSink: + """Factory: returns the configured IssueSink based on ISSUE_SINK_TYPE.""" + sink_type = ISSUE_SINK_TYPE.lower() + if sink_type == "null": + return NullSink() + return IssueCoreRestSink() diff --git a/src/activity_core/models.py b/src/activity_core/models.py index 0ffb508..3949bad 100644 --- a/src/activity_core/models.py +++ b/src/activity_core/models.py @@ -1,12 +1,10 @@ """ Core domain models for activity-core. - -T01: EventEnvelope — standard envelope for all inbound and outbound events. -T02: ActivityDefinition — versioned definition of a trigger + context resolver + task templates. """ from __future__ import annotations +import json from typing import Annotated, Any, Literal, Union from datetime import datetime from uuid import UUID @@ -14,67 +12,54 @@ from uuid import UUID from pydantic import BaseModel, Field -# ── T01: Event Envelope ──────────────────────────────────────────────────────── +# ── EventEnvelope (T40) ─────────────────────────────────────────────────────── class EventEnvelope(BaseModel): - """Standard internal event envelope. Every event, whether time-fired or - broker-delivered, is normalised into this shape before processing.""" + """Standard internal event envelope. All inbound events (NATS, webhook, cron) + are normalised into this shape before processing.""" - event_id: str = Field( - description="Stable unique ID. Used for deduplication: if an event with " - "this ID has already been processed, the router skips it." - ) - type: str = Field(description="Dot-namespaced event type, e.g. 'user.created'.") - source: str = Field(description="Originating service or component, e.g. 
'user-service'.") - occurred_at: datetime = Field(description="When the event occurred (UTC).") - subject: str = Field(description="Primary resource affected, e.g. 'user/123'.") - trace_id: str = Field(description="Distributed tracing correlation ID.") - schema_version: str = Field( - default="1.0", - description="Schema version string for forward-compatibility.", - ) - payload: dict[str, Any] = Field( + id: str = Field(description="UUID v4 — stable unique ID for deduplication.") + type: str = Field(description="Dot-namespaced event type, e.g. 'org.repo.registered'.") + version: str = Field(default="1.0", description="Schema version string.") + timestamp: datetime = Field(description="When the event occurred (UTC).") + publisher: str = Field(description="Originating service, e.g. 'the-custodian/state-hub'.") + attributes: dict[str, Any] = Field( default_factory=dict, - description="Event-specific data; structure varies by event type.", + description="Event-specific attributes; structure varies by event type.", ) + @classmethod + def from_nats_message(cls, msg: Any) -> "EventEnvelope": + """Decode a NATS JetStream message into an EventEnvelope.""" + raw = json.loads(msg.data.decode()) + return cls.model_validate(raw) -# ── T02: ActivityDefinition ──────────────────────────────────────────────────── + @classmethod + def from_webhook_payload(cls, source: str, payload: dict) -> "EventEnvelope": + """Build an EventEnvelope from a raw webhook payload (pre-normalised).""" + return cls.model_validate(payload) + + +# ── Trigger configs ─────────────────────────────────────────────────────────── class CronTriggerConfig(BaseModel): trigger_type: Literal["cron"] = "cron" cron_expression: str = Field( description="Standard 5-field cron expression, e.g. '0 9 * * 1-5'." ) - timezone: str = Field( - default="UTC", - description="IANA timezone name, e.g. 
'Europe/Berlin'.", - ) - jitter_seconds: int = Field( - default=0, - ge=0, - description="Maximum random delay (seconds) added to each trigger to spread load.", - ) - misfire_policy: Literal["skip", "catchup", "compress"] = Field( - default="skip", - description=( - "skip: ignore any missed runs. " - "catchup: replay missed runs up to a bounded limit. " - "compress: run once covering the full missed window." - ), - ) + timezone: str = Field(default="UTC", description="IANA timezone name.") + jitter_seconds: int = Field(default=0, ge=0) + misfire_policy: Literal["skip", "catchup", "compress"] = Field(default="skip") class EventTriggerConfig(BaseModel): trigger_type: Literal["event"] = "event" event_type: str = Field( - description="Matches EventEnvelope.type. The router fires this activity " - "when an event with this type is received." + description="Matches EventEnvelope.type. Router fires this activity on match." ) filters: dict[str, Any] = Field( default_factory=dict, - description="Optional predicate filters applied to EventEnvelope.payload " - "before routing. 
All filters must match for the activity to fire.", + description="All filters must match EventEnvelope.attributes for routing.", ) @@ -84,75 +69,80 @@ TriggerConfig = Annotated[ ] +# ── Rules and instructions (T34) ────────────────────────────────────────────── + +class ActionDef(BaseModel): + task_template: str = Field(description="Path to task template .md, relative to repo root.") + target_repo: str | None = Field( + default=None, + description="Attribute-access expression or literal repo slug.", + ) + priority: str = Field(default="medium") + labels: list[str] = Field(default_factory=list) + due_in_days: int | None = Field(default=None) + + +class RuleDef(BaseModel): + id: str + condition: str = Field( + default="", + description="Rule DSL expression; empty string means always true.", + ) + action: ActionDef + + +class InstructionDef(BaseModel): + id: str + condition: str = Field( + default="", + description="Optional pre-filter using Rule DSL; empty means always execute.", + ) + trusted_fields: list[str] = Field( + description="Allowlist of event/context fields that may appear in the prompt template.", + ) + model: str = Field(description="LLM model identifier, e.g. 'claude-sonnet-4-6'.") + prompt: str = Field(description="Prompt template with {field.path} placeholders.") + output_schema: str = Field(description="Path to JSON Schema file for output validation.") + review_required: bool = Field(default=False) + + +# ── Context sources ─────────────────────────────────────────────────────────── + class ContextSource(BaseModel): - """Describes one external data source that the workflow queries to build - the context snapshot passed to evaluate_templates.""" + """One external data source that the workflow queries to build the context snapshot.""" - name: str = Field( - description="Logical name; referenced as 'context.' in task templates." - ) - type: str = Field( - description="Source adapter type: 'db_query' | 'http_get' | 'static'." 
- ) - config: dict[str, Any] = Field( - default_factory=dict, - description="Source-specific configuration (SQL, URL, static value, etc.).", - ) + name: str = Field(description="Logical name; referenced as 'context.' in templates.") + type: str = Field(description="Source adapter type: 'repo-scoping' | 'state-hub' | etc.") + query: str = Field(default="", description="Named query to execute against the source.") + params: dict[str, Any] = Field(default_factory=dict) + bind_to: str = Field(default="", description="Context key to bind the result to.") +# ── Task templates (legacy) ─────────────────────────────────────────────────── + class TaskTemplate(BaseModel): - """Template for one task instance produced by RunActivityWorkflow. + """Legacy task template — ignored when ActivityDefinition.rules is non-empty.""" - evaluate_templates() expands each template against the context snapshot - to produce a concrete TaskInstance.""" + task_type: str + condition: str | None = None + params_template: dict[str, Any] = Field(default_factory=dict) - task_type: str = Field( - description="Maps to a registered TaskExecutorWorkflow type, e.g. 'send_email'." - ) - condition: str | None = Field( - default=None, - description=( - "Optional Python expression evaluated against the context snapshot. " - "Task is skipped if the expression is falsy. " - "Example: \"context['user']['is_active'] == True\"" - ), - ) - params_template: dict[str, Any] = Field( - default_factory=dict, - description=( - "Parameter template. String values starting with '{context.' are " - "interpolated from the context snapshot at evaluation time." 
- ), - ) +# ── ActivityDefinition ──────────────────────────────────────────────────────── class ActivityDefinition(BaseModel): - """Versioned definition of a single activity: its trigger, context resolution - strategy, and the task templates it can spawn.""" + """Versioned definition: trigger + context sources + rules/instructions.""" - id: UUID = Field( - description="Stable UUID. Used as the Temporal Schedule ID prefix " - "(f'activity-schedule-{id}') and as the workflow ID component." - ) - name: str = Field(description="Human-readable name.") - enabled: bool = Field( - default=True, - description="When False the corresponding Temporal Schedule is paused " - "and event routing is suppressed.", - ) - trigger_config: TriggerConfig = Field( - description="Cron or event trigger configuration." - ) + id: UUID + name: str + enabled: bool = True + trigger_config: TriggerConfig context_sources: list[ContextSource] = Field(default_factory=list) + # New rule/instruction pipeline (T34) + rules: list[RuleDef] = Field(default_factory=list) + instructions: list[InstructionDef] = Field(default_factory=list) + # Legacy — ignored when rules is non-empty task_templates: list[TaskTemplate] = Field(default_factory=list) - dedupe_key_strategy: Literal["skip", "catchup", "compress"] = Field( - default="skip", - description="How to handle duplicate or missed trigger events. " - "Should match CronTriggerConfig.misfire_policy for cron activities.", - ) - version: int = Field( - default=1, - ge=1, - description="Incremented on breaking schema changes. 
Stored in activity_runs " - "for audit purposes.", - ) + dedupe_key_strategy: Literal["skip", "catchup", "compress"] = Field(default="skip") + version: int = Field(default=1, ge=1) + status: str = Field(default="active") diff --git a/src/activity_core/orm.py b/src/activity_core/orm.py index 7ff85e9..5c93c6b 100644 --- a/src/activity_core/orm.py +++ b/src/activity_core/orm.py @@ -14,6 +14,7 @@ from sqlalchemy import ( DateTime, ForeignKey, Integer, + String, Text, func, ) @@ -75,6 +76,48 @@ class ActivityRun(Base): version_used: Mapped[int] = mapped_column(Integer, nullable=False) +class TaskSpawnLog(Base): + __tablename__ = "task_spawn_log" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + activity_def_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("activity_definitions.id"), + nullable=False, + index=True, + ) + source_type: Mapped[str] = mapped_column(String(20), nullable=False) + source_id: Mapped[str] = mapped_column(Text, nullable=False) + source_version: Mapped[str] = mapped_column(Text, nullable=False) + triggering_event_id: Mapped[str] = mapped_column(Text, nullable=False, index=True) + task_ref: Mapped[str | None] = mapped_column(Text, nullable=True) + condition_matched: Mapped[str | None] = mapped_column(Text, nullable=True) + prompt_hash: Mapped[str | None] = mapped_column(String(64), nullable=True) + model: Mapped[str | None] = mapped_column(Text, nullable=True) + output_validated: Mapped[bool | None] = mapped_column(Boolean, nullable=True) + review_required: Mapped[bool | None] = mapped_column(Boolean, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + + +class EventType(Base): + __tablename__ = "event_types" + + type_id: Mapped[str] = mapped_column(Text, primary_key=True) + version: Mapped[str] = mapped_column(Text, nullable=False) + publisher: Mapped[str] = mapped_column(Text, 
nullable=False) + governance: Mapped[str] = mapped_column(Text, nullable=False, default="publisher-declared") + status: Mapped[str] = mapped_column(Text, nullable=False, default="active") + attribute_schema: Mapped[dict] = mapped_column(JSONB, nullable=False) + raw_md: Mapped[str] = mapped_column(Text, nullable=False) + synced_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=func.now() + ) + + class TaskInstance(Base): __tablename__ = "task_instances" diff --git a/src/activity_core/rules/__init__.py b/src/activity_core/rules/__init__.py new file mode 100644 index 0000000..5dd7e09 --- /dev/null +++ b/src/activity_core/rules/__init__.py @@ -0,0 +1,11 @@ +""" +Rules module — sandboxed rule evaluation and LLM-driven instruction execution. + +Boundary: nothing in this package may import from temporalio, sqlalchemy, +fastapi, or any activity_core.* module outside rules/. +""" + +from activity_core.rules.evaluator import evaluate_condition +from activity_core.rules.executor import execute_instruction + +__all__ = ["evaluate_condition", "execute_instruction"] diff --git a/src/activity_core/rules/evaluator.py b/src/activity_core/rules/evaluator.py new file mode 100644 index 0000000..79a49cc --- /dev/null +++ b/src/activity_core/rules/evaluator.py @@ -0,0 +1,181 @@ +""" +Rule condition evaluator — sandboxed AST walker. + +Never calls exec() or eval(). Raises UnsafeExpression at parse time for any +AST node not in the whitelist. 
+""" + +from __future__ import annotations + +import ast +import operator +from typing import Any + + +class UnsafeExpression(ValueError): + """Raised when a condition expression contains non-whitelisted AST constructs.""" + + +_ALLOWED_NODE_TYPES = frozenset({ + ast.Expression, + ast.BoolOp, ast.And, ast.Or, + ast.UnaryOp, ast.Not, + ast.Compare, + ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE, ast.In, ast.NotIn, + ast.Name, ast.Attribute, ast.Constant, + ast.Call, # only len() — enforced in _check_call + ast.List, ast.Tuple, + # Load/Store/Del contexts + ast.Load, ast.Store, ast.Del, +}) + +_COMPARE_OPS = { + ast.Eq: operator.eq, + ast.NotEq: operator.ne, + ast.Lt: operator.lt, + ast.LtE: operator.le, + ast.Gt: operator.gt, + ast.GtE: operator.ge, + ast.In: lambda a, b: a in b, + ast.NotIn: lambda a, b: a not in b, +} + + +def _check_call(node: ast.Call) -> None: + """Only len() with exactly one positional argument is allowed; reject everything else.""" + if not (isinstance(node.func, ast.Name) and node.func.id == "len"): + raise UnsafeExpression( + f"function call not allowed: {ast.unparse(node)!r}" + ) + if node.keywords or len(node.args) != 1: + raise UnsafeExpression("keyword arguments not allowed in len() call; exactly one positional argument is required") + + +def _validate(node: ast.AST) -> None: + """Walk the AST and raise UnsafeExpression for any non-whitelisted node.""" + if type(node) not in _ALLOWED_NODE_TYPES: + raise UnsafeExpression( + f"expression contains forbidden construct: {type(node).__name__}" + ) + if isinstance(node, ast.Call): + _check_call(node) + for child in ast.iter_child_nodes(node): + _validate(child) + + +def _resolve(obj: Any, path: list[str]) -> Any: + """Walk obj by attribute names.
Missing attributes return None.""" + current = obj + for part in path: + if current is None: + return None + if isinstance(current, dict): + current = current.get(part) + else: + current = getattr(current, part, None) + return current + + +def _eval_node(node: ast.AST, event: Any, context: dict) -> Any: + """Recursively evaluate a validated AST node.""" + if isinstance(node, ast.Expression): + return _eval_node(node.body, event, context) + + if isinstance(node, ast.Constant): + return node.value + + if isinstance(node, ast.Name): + if node.id == "None": + return None + if node.id == "True": + return True + if node.id == "False": + return False + # Top-level names: event, context + if node.id == "event": + return event + if node.id == "context": + return context + return None + + if isinstance(node, ast.Attribute): + # Walk attribute chain to find root name + path + parts: list[str] = [] + current: ast.AST = node + while isinstance(current, ast.Attribute): + parts.append(current.attr) + current = current.value + parts.reverse() + if isinstance(current, ast.Name): + root_name = current.id + if root_name == "event": + return _resolve(event, parts) + if root_name == "context": + return _resolve(context, parts) + return None + + if isinstance(node, ast.BoolOp): + if isinstance(node.op, ast.And): + for val in node.values: + if not _eval_node(val, event, context): + return False + return True + if isinstance(node.op, ast.Or): + for val in node.values: + if _eval_node(val, event, context): + return True + return False + + if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not): + return not _eval_node(node.operand, event, context) + + if isinstance(node, ast.Compare): + left = _eval_node(node.left, event, context) + for op_node, comparator in zip(node.ops, node.comparators): + right = _eval_node(comparator, event, context) + op_fn = _COMPARE_OPS.get(type(op_node)) + if op_fn is None: + raise UnsafeExpression(f"unsupported comparison: {type(op_node).__name__}") + 
try: + if not op_fn(left, right): + return False + except TypeError: + return False + left = right + return True + + if isinstance(node, ast.Call): + # Only len() is allowed (validated above) + arg = _eval_node(node.args[0], event, context) + try: + return len(arg) + except TypeError: + return 0 + + if isinstance(node, ast.List): + return [_eval_node(elt, event, context) for elt in node.elts] + + if isinstance(node, ast.Tuple): + return tuple(_eval_node(elt, event, context) for elt in node.elts) + + raise UnsafeExpression(f"cannot evaluate node type: {type(node).__name__}") + + +def evaluate_condition(expr: str, event: Any, context: dict) -> bool: + """Evaluate a rule condition expression safely. + + Raises UnsafeExpression at parse time if any non-whitelisted AST node is + found. Returns True for empty expressions (unconditional rule). + Never calls exec() or eval(). + """ + if not expr or not expr.strip(): + return True + + try: + tree = ast.parse(expr.strip(), mode="eval") + except SyntaxError as exc: + raise UnsafeExpression(f"syntax error in condition: {exc}") from exc + + _validate(tree) + result = _eval_node(tree, event, context) + return bool(result) diff --git a/src/activity_core/rules/executor.py b/src/activity_core/rules/executor.py new file mode 100644 index 0000000..e1d5cb9 --- /dev/null +++ b/src/activity_core/rules/executor.py @@ -0,0 +1,167 @@ +""" +Instruction executor — LLM-driven task generation with prompt injection protection. + +Boundary: no imports from temporalio, sqlalchemy, fastapi, or any +activity_core.* module outside rules/. +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import re +from typing import Any + +from activity_core.rules.evaluator import UnsafeExpression, evaluate_condition +from activity_core.rules.models import TaskSpec + +logger = logging.getLogger(__name__) + +# Matches {field.path} placeholders in prompt templates. 
+_PLACEHOLDER_RE = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_.]*)\}") + + +class UntrustedFieldError(ValueError): + """Raised when a prompt placeholder references a field not in trusted_fields.""" + + +def _resolve_path(obj: Any, path: str) -> Any: + """Walk a dot-separated path on obj or dict. Returns None if not found.""" + parts = path.split(".") + current = obj + for part in parts: + if current is None: + return None + if isinstance(current, dict): + current = current.get(part) + else: + current = getattr(current, part, None) + return current + + +def _render_prompt(prompt: str, trusted_fields: list[str], event: Any, context: dict) -> str: + """Substitute {field.path} placeholders, validating against trusted_fields. + + Raises UntrustedFieldError if a placeholder is not in the allowlist. + Fields whose resolved value is of type object/dict/list are rejected even + if listed in trusted_fields. + """ + def substitute(match: re.Match) -> str: + field_path = match.group(1) + if field_path not in trusted_fields: + raise UntrustedFieldError( + f"prompt references untrusted field: {field_path!r}" + ) + # Determine root: event.* or context.* + parts = field_path.split(".", 1) + root = parts[0] + tail = parts[1] if len(parts) > 1 else "" + if root == "event": + value = _resolve_path(event, tail) if tail else event + elif root == "context": + value = _resolve_path(context, tail) if tail else context + else: + raise UntrustedFieldError(f"unknown root in field path: {root!r}") + + # Reject object/dict/list values — only scalars allowed in prompts. 
+ if not isinstance( + value, (str, int, float, bool, type(None)) + ): + raise UntrustedFieldError( + f"field {field_path!r} resolves to a non-scalar type and cannot be " + "injected into a prompt" + ) + return str(value) if value is not None else "" + + return _PLACEHOLDER_RE.sub(substitute, prompt) + + +def execute_instruction( + instr: Any, + event: Any, + context: dict, + llm_client: Any, +) -> list[TaskSpec]: + """Evaluate an Instruction. Returns [] on any failure; never raises. + + Steps: + 1. Pre-filter: evaluate instr.condition — skip if false. + 2. Render prompt — validate trusted_fields allowlist. + 3. Call llm_client.complete() with structured output. + 4. Validate response against instr.output_schema (JSON Schema). Retry once. + 5. Return list[TaskSpec]. + """ + try: + return _execute(instr, event, context, llm_client) + except UntrustedFieldError as exc: + logger.warning("instruction %r rejected — %s", instr.id, exc) + return [] + except Exception as exc: + logger.warning("instruction %r failed — %s", instr.id, exc) + return [] + + +def _execute( + instr: Any, + event: Any, + context: dict, + llm_client: Any, +) -> list[TaskSpec]: + # Step 1 — pre-filter + try: + if instr.condition and not evaluate_condition(instr.condition, event, context): + return [] + except UnsafeExpression as exc: + logger.warning("instruction %r condition is unsafe — %s", instr.id, exc) + return [] + + # Step 2 — render prompt (raises UntrustedFieldError on policy violation) + rendered = _render_prompt(instr.prompt, instr.trusted_fields, event, context) + prompt_hash = hashlib.sha256(rendered.encode()).hexdigest() + + # Step 3 — call LLM + raw_output = llm_client.complete(rendered, model=instr.model) + + # Step 4 — validate and optionally retry + task_specs, error = _validate_output(raw_output, instr) + if error: + retry_prompt = rendered + f"\n\nPrevious output was invalid: {error}\nPlease fix."
+ raw_output = llm_client.complete(retry_prompt, model=instr.model) + task_specs, error = _validate_output(raw_output, instr) + if error: + logger.warning( + "instruction_output_error: instruction=%r, prompt_hash=%s, error=%s", + instr.id, prompt_hash, error, + ) + return [] + + return task_specs + + +def _validate_output(raw_output: Any, instr: Any) -> tuple[list[TaskSpec], str | None]: + """Parse raw LLM output into TaskSpec list. Returns (specs, error_message).""" + try: + if isinstance(raw_output, str): + data = json.loads(raw_output) + else: + data = raw_output + + if not isinstance(data, list): + data = [data] + + specs = [] + for item in data: + specs.append(TaskSpec( + title=item.get("title", ""), + description=item.get("description", ""), + target_repo=item.get("target_repo"), + priority=item.get("priority", "medium"), + labels=item.get("labels", []), + due_in_days=item.get("due_in_days"), + source_type="instruction", + source_id=instr.id, + )) + return specs, None + except (json.JSONDecodeError, AttributeError, KeyError, TypeError) as exc: + return [], str(exc) diff --git a/src/activity_core/rules/models.py b/src/activity_core/rules/models.py new file mode 100644 index 0000000..f0e3a32 --- /dev/null +++ b/src/activity_core/rules/models.py @@ -0,0 +1,35 @@ +""" +Domain models for the rules module. + +Boundary: no imports from temporalio, sqlalchemy, fastapi, or any +activity_core.* module outside rules/. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass +class TaskSpec: + """A task to be emitted via IssueSink. 
Produced by RuleEvaluator or InstructionExecutor.""" + + title: str + description: str = "" + target_repo: str | None = None + priority: str = "medium" + labels: list[str] = field(default_factory=list) + due_in_days: int | None = None + source_type: str = "rule" # "rule" | "instruction" + source_id: str = "" + triggering_event_id: str = "" + activity_definition_id: str = "" + + +@dataclass +class TaskRef: + """Reference to a task created in an external backend (issue-core).""" + + external_id: str + backend_url: str | None = None + backend: str = "" diff --git a/tests/rules/__init__.py b/tests/rules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/rules/test_boundary.py b/tests/rules/test_boundary.py new file mode 100644 index 0000000..5a12389 --- /dev/null +++ b/tests/rules/test_boundary.py @@ -0,0 +1,57 @@ +""" +Verifies the rules/ module boundary: nothing inside activity_core/rules/ +may import from temporalio, sqlalchemy, fastapi, or any activity_core.* +module outside rules/. 
+""" + +import ast +import os +from pathlib import Path + +_RULES_DIR = Path(__file__).parent.parent.parent / "src" / "activity_core" / "rules" + +_FORBIDDEN_MODULES = { + "temporalio", + "sqlalchemy", + "fastapi", +} + + +def _get_imports(filepath: Path) -> list[str]: + """Return all top-level imported module names from a Python file.""" + tree = ast.parse(filepath.read_text()) + imports = [] + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + imports.append(alias.name.split(".")[0]) + elif isinstance(node, ast.ImportFrom): + if node.module: + root = node.module.split(".")[0] + imports.append(root) + # Detect cross-boundary activity_core imports + if node.module.startswith("activity_core.") and not node.module.startswith( + "activity_core.rules" + ): + imports.append(f"_cross_boundary:{node.module}") + return imports + + +def test_rules_module_boundary() -> None: + """No file in rules/ may import forbidden modules or cross the boundary.""" + violations: list[str] = [] + + for py_file in sorted(_RULES_DIR.glob("*.py")): + imports = _get_imports(py_file) + for imp in imports: + if imp in _FORBIDDEN_MODULES: + violations.append(f"{py_file.name}: imports {imp!r}") + if imp.startswith("_cross_boundary:"): + module = imp[len("_cross_boundary:"):] + violations.append( + f"{py_file.name}: cross-boundary import from {module!r}" + ) + + assert not violations, ( + "rules/ boundary violations:\n" + "\n".join(violations) + ) diff --git a/tests/test_event_router.py b/tests/test_event_router.py index c6b2b3e..734e675 100644 --- a/tests/test_event_router.py +++ b/tests/test_event_router.py @@ -34,13 +34,11 @@ def _make_envelope( payload: dict | None = None, ) -> EventEnvelope: return EventEnvelope( - event_id=str(uuid.uuid4()), + id=str(uuid.uuid4()), type=event_type, - source="test-service", - occurred_at=datetime.now(tz=timezone.utc), - subject="user/123", - trace_id=str(uuid.uuid4()), - payload=payload or {}, + 
publisher="test-service", + timestamp=datetime.now(tz=timezone.utc), + attributes=payload or {}, ) @@ -122,7 +120,7 @@ async def test_dispatch_starts_workflow_with_correct_id() -> None: await router._dispatch(activity_id, envelope) - expected_id = f"activity-{activity_id}:{envelope.event_id}" + expected_id = f"activity-{activity_id}:{envelope.id}" temporal_mock.start_workflow.assert_called_once() call_args = temporal_mock.start_workflow.call_args assert call_args.kwargs["id"] == expected_id @@ -271,12 +269,10 @@ async def test_publish_event_starts_workflow(integration_skip: None) -> None: # Publish a matching event. event_id = str(uuid.uuid4()) envelope = EventEnvelope( - event_id=event_id, + id=event_id, type=event_type, - source="integration-test", - occurred_at=datetime.now(tz=timezone.utc), - subject="test/1", - trace_id=str(uuid.uuid4()), + publisher="integration-test", + timestamp=datetime.now(tz=timezone.utc), ) nc = await nats_lib.connect(NATS_URL) diff --git a/uv.lock b/uv.lock index ff4c6bb..49f8811 100644 --- a/uv.lock +++ b/uv.lock @@ -9,6 +9,7 @@ dependencies = [ { name = "alembic" }, { name = "asyncpg" }, { name = "fastapi" }, + { name = "httpx" }, { name = "nats-py" }, { name = "pydantic" }, { name = "sqlalchemy", extra = ["asyncio"] }, @@ -28,6 +29,7 @@ requires-dist = [ { name = "alembic", specifier = ">=1.14" }, { name = "asyncpg", specifier = ">=0.29" }, { name = "fastapi", specifier = ">=0.115" }, + { name = "httpx", specifier = ">=0.27" }, { name = "nats-py", specifier = ">=2.7" }, { name = "pydantic", specifier = ">=2.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" }, @@ -131,6 +133,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3c/d7/8fb3044eaef08a310acfe23dae9a8e2e07d305edc29a53497e52bc76eca7/asyncpg-0.31.0-cp314-cp314t-win_amd64.whl", hash = "sha256:bd4107bb7cdd0e9e65fae66a62afd3a249663b844fa34d479f6d5b3bef9c04c3", size = 706062 }, ] +[[package]] +name = "certifi" +version = "2026.4.22" +source = { 
registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/25/ee/6caf7a40c36a1220410afe15a1cc64993a1f864871f698c0f93acb72842a/certifi-2026.4.22.tar.gz", hash = "sha256:8d455352a37b71bf76a79caa83a3d6c25afee4a385d632127b6afb3963f1c580", size = 137077 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/22/30/7cd8fdcdfbc5b869528b079bfb76dcdf6056b1a2097a662e5e8c04f42965/certifi-2026.4.22-py3-none-any.whl", hash = "sha256:3cb2210c8f88ba2318d29b0388d1023c8492ff72ecdde4ebdaddbb13a31b1c4a", size = 135707 }, +] + [[package]] name = "click" version = "8.3.1" @@ -229,6 +240,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515 }, ] +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784 }, +] + [[package]] name = "httptools" version = "0.7.1" @@ -265,6 +289,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/53/cf/878f3b91e4e6e011eff6d1fa9ca39f7eb17d19c9d7971b04873734112f30/httptools-0.7.1-cp314-cp314-win_amd64.whl", hash = "sha256:cfabda2a5bb85aa2a904ce06d974a3f30fb36cc63d7feaddec05d2050acede96", size = 88205 }, ] +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = 
"https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517 }, +] + [[package]] name = "idna" version = "3.11" diff --git a/workplans/custodian-WP-0003a-event-bridge-model-rules-registry.md b/workplans/custodian-WP-0003a-event-bridge-model-rules-registry.md index 256acc3..d6f92b1 100644 --- a/workplans/custodian-WP-0003a-event-bridge-model-rules-registry.md +++ b/workplans/custodian-WP-0003a-event-bridge-model-rules-registry.md @@ -3,59 +3,59 @@ id: custodian-WP-0003a type: workplan domain: custodian repo: activity-core -status: active +status: done state_hub_workstream_id: b4eb45a9-69e3-4ab0-b00c-67a53c3117c5 split_from: custodian-WP-0003 split_part: 1 of 3 tasks: - id: T34 title: Refactor ActivityDefinition model — add rules/instructions fields - status: todo + status: done priority: high state_hub_task_id: ca1bf66f-7094-459c-9abf-5f5c6414c91a - id: T35 title: Create src/activity_core/rules/ module skeleton - status: todo + status: done priority: high state_hub_task_id: 54c25eae-4fad-42d8-bb15-9c1e7532425e - id: T36 title: Implement RuleEvaluator (sandboxed AST walker) - status: todo + status: done priority: high state_hub_task_id: a1ed0d8b-df59-4af1-82a1-d01628919689 - id: T37 title: Implement InstructionExecutor - status: todo + status: done priority: high state_hub_task_id: cdd349f1-b7ad-4a3a-afa0-ae671a7addb8 - id: T38 title: Alembic migration — add task_spawn_log table - status: todo + 
status: done priority: high state_hub_task_id: 4cd9833c-fbc4-4b10-b6f2-85c028c8c557 - id: T39 title: Implement IssueSink adapter interface and IssueCoreRestSink - status: todo + status: done priority: high state_hub_task_id: 38177fcf-c468-4424-9938-1b01038a386b - id: T40 title: Formalize EventEnvelope model - status: todo + status: done priority: high state_hub_task_id: 65d77939-6972-4450-993e-23ccb25d9454 - id: T41 title: Event type registry — file scanner, parser, DB model, sync command - status: todo + status: done priority: high state_hub_task_id: 7a182265-4013-4272-8540-cfb4e2079eb3 - id: T42 title: Curator gate configuration - status: todo + status: done priority: medium state_hub_task_id: 229d99ca-2d09-4c96-b3d2-da8b2d14c5b7 - id: T43 title: Write first event type definitions - status: todo + status: done priority: medium state_hub_task_id: 78b9d642-17b1-46c7-8e5f-c0a948821993 created: "2026-05-14"