Files
activity-core/tests/test_sync_activity_definitions.py

98 lines
2.8 KiB
Python

import uuid
from activity_core.definition_parser import scan_and_parse
from activity_core.models import ActivityDefinition
from activity_core.sync_activity_definitions import _definition_uuid
def test_definition_uuid_preserves_uuid_ids() -> None:
raw_id = "6fca51fa-387a-4fd0-bc4e-d62c29eb859a"
assert _definition_uuid(raw_id) == uuid.UUID(raw_id)
def test_definition_uuid_maps_slug_ids_stably() -> None:
first = _definition_uuid("weekly-sbom-staleness")
second = _definition_uuid("weekly-sbom-staleness")
assert first == second
assert first.version == 5
def test_activity_definition_accepts_adr_style_context_source_without_name() -> None:
defn = ActivityDefinition.model_validate(
{
"id": "6fca51fa-387a-4fd0-bc4e-d62c29eb859a",
"name": "Daily State Hub WSJF Triage",
"enabled": False,
"trigger_config": {
"trigger_type": "cron",
"cron_expression": "20 7 * * *",
"timezone": "Europe/Berlin",
"misfire_policy": "skip",
},
"context_sources": [
{
"type": "state-hub",
"query": "daily_triage_digest",
"bind_to": "context.daily_triage_digest",
}
],
}
)
assert defn.context_sources[0].name == ""
def test_scan_and_parse_reads_external_activity_definition_dirs(
tmp_path,
monkeypatch,
) -> None:
repo_root = tmp_path / "activity-core"
external_root = tmp_path / "the-custodian"
definitions_dir = external_root / "activity-definitions"
repo_root.mkdir()
definitions_dir.mkdir(parents=True)
(definitions_dir / "ops-service-inventory-probes.md").write_text(
"""---
id: "40d15a87-7ff6-4d8e-992c-37df15f95110"
name: "Ops Service Inventory Probes"
enabled: false
owner: custodian
governance: custodian
status: proposed
trigger:
type: cron
cron_expression: "15 * * * *"
timezone: Europe/Berlin
misfire_policy: skip
context_sources:
- type: ops-inventory
query: probe_services
bind_to: context.ops_probe
params:
inventory_path: /tmp/service-inventory.yml
evidence_sinks:
- type: state-hub-progress
event_type: ops_inventory_probe
---
# Ops Service Inventory Probes
""",
encoding="utf-8",
)
monkeypatch.chdir(repo_root)
monkeypatch.setenv("ACTIVITY_DEFINITION_DIRS", str(external_root))
definitions = scan_and_parse()
assert len(definitions) == 1
definition = definitions[0]
assert definition.name == "Ops Service Inventory Probes"
assert definition.enabled is False
assert definition.context_sources[0]["type"] == "ops-inventory"
assert definition.context_sources[0]["params"]["evidence_sinks"][0]["type"] == (
"state-hub-progress"
)