generated from coulomb/repo-seed
98 lines
2.8 KiB
Python
98 lines
2.8 KiB
Python
import uuid
|
|
|
|
from activity_core.definition_parser import scan_and_parse
|
|
from activity_core.models import ActivityDefinition
|
|
from activity_core.sync_activity_definitions import _definition_uuid
|
|
|
|
|
|
def test_definition_uuid_preserves_uuid_ids() -> None:
|
|
raw_id = "6fca51fa-387a-4fd0-bc4e-d62c29eb859a"
|
|
|
|
assert _definition_uuid(raw_id) == uuid.UUID(raw_id)
|
|
|
|
|
|
def test_definition_uuid_maps_slug_ids_stably() -> None:
|
|
first = _definition_uuid("weekly-sbom-staleness")
|
|
second = _definition_uuid("weekly-sbom-staleness")
|
|
|
|
assert first == second
|
|
assert first.version == 5
|
|
|
|
|
|
def test_activity_definition_accepts_adr_style_context_source_without_name() -> None:
|
|
defn = ActivityDefinition.model_validate(
|
|
{
|
|
"id": "6fca51fa-387a-4fd0-bc4e-d62c29eb859a",
|
|
"name": "Daily State Hub WSJF Triage",
|
|
"enabled": False,
|
|
"trigger_config": {
|
|
"trigger_type": "cron",
|
|
"cron_expression": "20 7 * * *",
|
|
"timezone": "Europe/Berlin",
|
|
"misfire_policy": "skip",
|
|
},
|
|
"context_sources": [
|
|
{
|
|
"type": "state-hub",
|
|
"query": "daily_triage_digest",
|
|
"bind_to": "context.daily_triage_digest",
|
|
}
|
|
],
|
|
}
|
|
)
|
|
|
|
assert defn.context_sources[0].name == ""
|
|
|
|
|
|
def test_scan_and_parse_reads_external_activity_definition_dirs(
|
|
tmp_path,
|
|
monkeypatch,
|
|
) -> None:
|
|
repo_root = tmp_path / "activity-core"
|
|
external_root = tmp_path / "the-custodian"
|
|
definitions_dir = external_root / "activity-definitions"
|
|
repo_root.mkdir()
|
|
definitions_dir.mkdir(parents=True)
|
|
(definitions_dir / "ops-service-inventory-probes.md").write_text(
|
|
"""---
|
|
id: "40d15a87-7ff6-4d8e-992c-37df15f95110"
|
|
name: "Ops Service Inventory Probes"
|
|
enabled: false
|
|
owner: custodian
|
|
governance: custodian
|
|
status: proposed
|
|
trigger:
|
|
type: cron
|
|
cron_expression: "15 * * * *"
|
|
timezone: Europe/Berlin
|
|
misfire_policy: skip
|
|
context_sources:
|
|
- type: ops-inventory
|
|
query: probe_services
|
|
bind_to: context.ops_probe
|
|
params:
|
|
inventory_path: /tmp/service-inventory.yml
|
|
evidence_sinks:
|
|
- type: state-hub-progress
|
|
event_type: ops_inventory_probe
|
|
---
|
|
|
|
# Ops Service Inventory Probes
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
monkeypatch.chdir(repo_root)
|
|
monkeypatch.setenv("ACTIVITY_DEFINITION_DIRS", str(external_root))
|
|
|
|
definitions = scan_and_parse()
|
|
|
|
assert len(definitions) == 1
|
|
definition = definitions[0]
|
|
assert definition.name == "Ops Service Inventory Probes"
|
|
assert definition.enabled is False
|
|
assert definition.context_sources[0]["type"] == "ops-inventory"
|
|
assert definition.context_sources[0]["params"]["evidence_sinks"][0]["type"] == (
|
|
"state-hub-progress"
|
|
)
|