import uuid

from activity_core.definition_parser import scan_and_parse
from activity_core.models import ActivityDefinition
from activity_core.sync_activity_definitions import _definition_uuid


def test_definition_uuid_preserves_uuid_ids() -> None:
    """An id that is already a UUID string maps to the equal ``uuid.UUID``."""
    canonical = "6fca51fa-387a-4fd0-bc4e-d62c29eb859a"
    resolved = _definition_uuid(canonical)
    assert resolved == uuid.UUID(canonical)


def test_definition_uuid_maps_slug_ids_stably() -> None:
    """The same slug id yields the same version-5 UUID on repeated calls."""
    slug = "weekly-sbom-staleness"
    first_uuid = _definition_uuid(slug)
    repeat_uuid = _definition_uuid(slug)
    assert first_uuid == repeat_uuid
    assert first_uuid.version == 5


def test_activity_definition_accepts_adr_style_context_source_without_name() -> None:
    """A context source without a ``name`` key validates; its name is ``""``."""
    trigger_config = {
        "trigger_type": "cron",
        "cron_expression": "20 7 * * *",
        "timezone": "Europe/Berlin",
        "misfire_policy": "skip",
    }
    # Deliberately carries no "name" entry.
    nameless_source = {
        "type": "state-hub",
        "query": "daily_triage_digest",
        "bind_to": "context.daily_triage_digest",
    }
    payload = {
        "id": "6fca51fa-387a-4fd0-bc4e-d62c29eb859a",
        "name": "Daily State Hub WSJF Triage",
        "enabled": False,
        "trigger_config": trigger_config,
        "context_sources": [nameless_source],
    }

    validated = ActivityDefinition.model_validate(payload)

    assert validated.context_sources[0].name == ""


def test_scan_and_parse_reads_external_activity_definition_dirs(
    tmp_path,
    monkeypatch,
) -> None:
    """``scan_and_parse`` returns a definition stored outside the working dir.

    The definition file lives in ``<external root>/activity-definitions`` and
    the external root is passed via the ``ACTIVITY_DEFINITION_DIRS``
    environment variable, while the current directory is a separate, empty
    directory.
    """
    repo_root = tmp_path / "activity-core"
    external_root = tmp_path / "the-custodian"
    definitions_dir = external_root / "activity-definitions"
    repo_root.mkdir()
    definitions_dir.mkdir(parents=True)

    # NOTE: evidence_sinks is nested under the context source's params, which
    # is where the final assertion of this test reads it back from.
    definition_markdown = """---
id: "40d15a87-7ff6-4d8e-992c-37df15f95110"
name: "Ops Service Inventory Probes"
enabled: false
owner: custodian
governance: custodian
status: proposed
trigger:
  type: cron
  cron_expression: "15 * * * *"
  timezone: Europe/Berlin
  misfire_policy: skip
context_sources:
  - type: ops-inventory
    query: probe_services
    bind_to: context.ops_probe
    params:
      inventory_path: /tmp/service-inventory.yml
      evidence_sinks:
        - type: state-hub-progress
          event_type: ops_inventory_probe
---

# Ops Service Inventory Probes
"""
    definition_file = definitions_dir / "ops-service-inventory-probes.md"
    definition_file.write_text(definition_markdown, encoding="utf-8")

    # Work from the empty directory; the external root is advertised only
    # through the environment variable.
    monkeypatch.chdir(repo_root)
    monkeypatch.setenv("ACTIVITY_DEFINITION_DIRS", str(external_root))

    parsed = scan_and_parse()

    assert len(parsed) == 1
    only_definition = parsed[0]
    assert only_definition.name == "Ops Service Inventory Probes"
    assert only_definition.enabled is False

    first_source = only_definition.context_sources[0]
    assert first_source["type"] == "ops-inventory"
    assert first_source["params"]["evidence_sinks"][0]["type"] == (
        "state-hub-progress"
    )