diff --git a/src/activity_core/models.py b/src/activity_core/models.py index ced06ca..89c2525 100644 --- a/src/activity_core/models.py +++ b/src/activity_core/models.py @@ -120,7 +120,10 @@ class InstructionDef(BaseModel): class ContextSource(BaseModel): """One external data source that the workflow queries to build the context snapshot.""" - name: str = Field(description="Logical name; referenced as 'context.' in templates.") + name: str = Field( + default="", + description="Logical name; referenced as 'context.' in templates.", + ) type: str = Field(description="Source adapter type: 'repo-scoping' | 'state-hub' | etc.") query: str = Field(default="", description="Named query to execute against the source.") params: dict[str, Any] = Field(default_factory=dict) diff --git a/src/activity_core/sync_activity_definitions.py b/src/activity_core/sync_activity_definitions.py index 70f184b..eb6978c 100644 --- a/src/activity_core/sync_activity_definitions.py +++ b/src/activity_core/sync_activity_definitions.py @@ -18,7 +18,7 @@ import logging import os import uuid -from sqlalchemy import select, text +from sqlalchemy import update from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine @@ -28,6 +28,21 @@ from activity_core.orm import ActivityDefinition as ActivityDefinitionRow logger = logging.getLogger(__name__) TEMPORAL_HOST = os.environ.get("TEMPORAL_HOST", "localhost:7233") +ACTIVITY_DEFINITION_ID_NAMESPACE = uuid.uuid5( + uuid.NAMESPACE_URL, + "activity-core:activity-definition", +) + + +def _definition_uuid(raw_id: str) -> uuid.UUID: + """Return the DB UUID for a file-authored ActivityDefinition id.""" + try: + return uuid.UUID(raw_id) + except ValueError: + return uuid.uuid5( + ACTIVITY_DEFINITION_ID_NAMESPACE, + raw_id, + ) async def sync(session_factory: async_sessionmaker[AsyncSession]) -> int: @@ -43,11 +58,12 @@ async def sync(session_factory: async_sessionmaker[AsyncSession]) -> int: async with session_factory() as session: async with session.begin(): for d in defs: - file_ids.add(d.id) + definition_id = _definition_uuid(d.id) + file_ids.add(str(definition_id)) stmt = ( pg_insert(ActivityDefinitionRow) .values( - id=uuid.UUID(d.id), + id=definition_id, name=d.name, enabled=d.enabled, trigger_type=d.trigger_config["trigger_type"], @@ -80,14 +96,13 @@ async def sync(session_factory: async_sessionmaker[AsyncSession]) -> int: if file_ids: id_list = [uuid.UUID(i) for i in file_ids] await session.execute( - text( - "UPDATE activity_definitions SET enabled = false" - " WHERE id NOT IN :ids" - ).bindparams(ids=tuple(id_list)) + update(ActivityDefinitionRow) + .where(ActivityDefinitionRow.id.not_in(id_list)) + .values(enabled=False) ) else: await session.execute( - text("UPDATE activity_definitions SET enabled = false") + update(ActivityDefinitionRow).values(enabled=False) ) logger.info("sync_activity_definitions: upserted %d definitions", upserted) diff --git a/src/activity_core/worker.py b/src/activity_core/worker.py index 37976f7..d1cad29 100644 --- a/src/activity_core/worker.py +++ b/src/activity_core/worker.py @@ -34,10 +34,12 @@ from temporalio.worker import Worker from activity_core.activities import ( emit_tasks, + evaluate_instructions, evaluate_rules, init_session_factory, load_activity_definition, log_run, + persist_instruction_reports, persist_task_instance, resolve_context, ) @@ -93,7 +95,15 @@ async def run() -> None: client, task_queue=ORCHESTRATOR_TASK_QUEUE, workflows=[RunActivityWorkflow], - activities=[load_activity_definition, resolve_context, log_run, evaluate_rules, emit_tasks], + activities=[ + load_activity_definition, + resolve_context, + log_run, + evaluate_rules, + evaluate_instructions, + persist_instruction_reports, + emit_tasks, + ], ) task_worker = Worker( diff --git a/tests/test_sync_activity_definitions.py b/tests/test_sync_activity_definitions.py new file mode 100644 index 0000000..e6ee0bd --- /dev/null +++ b/tests/test_sync_activity_definitions.py @@ -0,0 +1,43 @@ +import uuid + +from activity_core.models import ActivityDefinition +from activity_core.sync_activity_definitions import _definition_uuid + + +def test_definition_uuid_preserves_uuid_ids() -> None: + raw_id = "6fca51fa-387a-4fd0-bc4e-d62c29eb859a" + + assert _definition_uuid(raw_id) == uuid.UUID(raw_id) + + +def test_definition_uuid_maps_slug_ids_stably() -> None: + first = _definition_uuid("weekly-sbom-staleness") + second = _definition_uuid("weekly-sbom-staleness") + + assert first == second + assert first.version == 5 + + +def test_activity_definition_accepts_adr_style_context_source_without_name() -> None: + defn = ActivityDefinition.model_validate( + { + "id": "6fca51fa-387a-4fd0-bc4e-d62c29eb859a", + "name": "Daily State Hub WSJF Triage", + "enabled": False, + "trigger_config": { + "trigger_type": "cron", + "cron_expression": "20 7 * * *", + "timezone": "Europe/Berlin", + "misfire_policy": "skip", + }, + "context_sources": [ + { + "type": "state-hub", + "query": "daily_triage_digest", + "bind_to": "context.daily_triage_digest", + } + ], + } + ) + + assert defn.context_sources[0].name == ""