Files
state-hub/tests/test_reconciliation.py

126 lines
3.5 KiB
Python

from __future__ import annotations
from api.services.reconciliation import (
ReconciliationClass,
classify_task_status_change,
classify_workstream_status_change,
)
def test_workstream_open_transition_writes_through():
result = classify_workstream_status_change(
current_status="active",
target_status="backlog",
file_backed=True,
)
assert result.reconciliation_class == ReconciliationClass.WRITE_THROUGH
assert "frontmatter" in result.reason
def test_workstream_unfiled_transition_is_deferred():
result = classify_workstream_status_change(
current_status="active",
target_status="backlog",
file_backed=False,
)
assert result.reconciliation_class == ReconciliationClass.DEFERRED
assert "not backed" in result.reason
def test_workstream_finish_requires_terminal_tasks():
result = classify_workstream_status_change(
current_status="active",
target_status="finished",
file_backed=True,
tasks_terminal=False,
)
assert result.reconciliation_class == ReconciliationClass.HUMAN_CONFIRMATION
assert "open work" in result.reason
def test_workstream_finish_with_terminal_tasks_writes_through():
result = classify_workstream_status_change(
current_status="active",
target_status="finished",
file_backed=True,
tasks_terminal=True,
)
assert result.reconciliation_class == ReconciliationClass.WRITE_THROUGH
assert "finished" in result.follow_up
def test_archived_workstream_file_requires_confirmation():
result = classify_workstream_status_change(
current_status="finished",
target_status="active",
file_backed=True,
archived_file=True,
)
assert result.reconciliation_class == ReconciliationClass.HUMAN_CONFIRMATION
assert "archived" in result.reason
def test_task_status_update_writes_through_to_task_block():
result = classify_task_status_change(
current_status="todo",
target_status="progress",
file_backed=True,
task_linked=True,
)
assert result.reconciliation_class == ReconciliationClass.WRITE_THROUGH
assert "task block" in result.reason
def test_task_without_file_is_deferred():
result = classify_task_status_change(
current_status="todo",
target_status="progress",
file_backed=False,
task_linked=True,
)
assert result.reconciliation_class == ReconciliationClass.DEFERRED
assert "without a local workplan file" in result.reason
def test_unlinked_task_is_deferred_until_link_repaired():
result = classify_task_status_change(
current_status="todo",
target_status="progress",
file_backed=True,
task_linked=False,
)
assert result.reconciliation_class == ReconciliationClass.DEFERRED
assert "not linked" in result.reason
def test_wait_task_requires_blocking_reason():
result = classify_task_status_change(
current_status="todo",
target_status="wait",
file_backed=True,
task_linked=True,
)
assert result.reconciliation_class == ReconciliationClass.HUMAN_CONFIRMATION
assert "wait condition" in result.reason
def test_wait_task_with_reason_writes_through():
result = classify_task_status_change(
current_status="todo",
target_status="wait",
file_backed=True,
task_linked=True,
blocking_reason="Waiting on dependency",
)
assert result.reconciliation_class == ReconciliationClass.WRITE_THROUGH