# Generated from coulomb/repo-seed (574 lines, 21 KiB, Python).
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from kontextual_engine import (
|
|
Actor,
|
|
ActorType,
|
|
AssetRegistryService,
|
|
AssetRepresentation,
|
|
Classification,
|
|
InMemoryAssetRegistryRepository,
|
|
OperationContext,
|
|
RepresentationKind,
|
|
Sensitivity,
|
|
SQLiteAssetRegistryRepository,
|
|
TransformationRunStatus,
|
|
TransformationService,
|
|
ValidationError,
|
|
WorkflowExceptionKind,
|
|
WorkflowExceptionStatus,
|
|
WorkflowInputDefinition,
|
|
WorkflowInputKind,
|
|
WorkflowInvocation,
|
|
WorkflowReviewDecisionType,
|
|
WorkflowReviewStatus,
|
|
WorkflowRunStatus,
|
|
WorkflowService,
|
|
WorkflowStepDefinition,
|
|
WorkflowStepRunStatus,
|
|
WorkflowTemplate,
|
|
)
|
|
|
|
|
|
def test_workflow_template_registration_persists_input_kinds_and_rejects_bad_dependencies() -> None:
    """Register a valid template, then check that broken dependency graphs are refused.

    Three behaviors are asserted:
    * the stored template keeps its declared input kinds, in order, and a
      ``workflow.template.register`` audit event is the first one recorded;
    * a step depending on an unknown step id raises ``ValidationError`` whose
      first diagnostic code is ``workflow.dependency_missing``;
    * two steps depending on each other raise ``ValidationError`` whose first
      diagnostic code is ``workflow.dependency_cycle``.
    """
    repo = InMemoryAssetRegistryRepository()
    service = WorkflowService(repo)
    context = operation_context()

    registered = service.register_template(rich_input_template(), context)

    assert registered.created_by == "user-test"

    stored_template = repo.get_workflow_template("workflow-rich-inputs")
    stored_kinds = [definition.kind for definition in stored_template.inputs]
    assert stored_kinds == [
        WorkflowInputKind.ASSET,
        WorkflowInputKind.COLLECTION,
        WorkflowInputKind.QUERY,
        WorkflowInputKind.SOURCE_EVENT,
        WorkflowInputKind.PAYLOAD,
    ]

    audit_trail = repo.list_audit_events(target="workflow_template:workflow-rich-inputs")
    assert audit_trail[0].operation == "workflow.template.register"

    # Everything is constructed inside the ``raises`` block so the test passes
    # regardless of whether construction or registration does the validating.
    with pytest.raises(ValidationError) as missing_dependency:
        dangling_step = WorkflowStepDefinition(
            step_id="late",
            operation_id="structured_view",
            depends_on=("missing",),
        )
        dangling_template = WorkflowTemplate(
            template_id="workflow-bad",
            name="Bad Workflow",
            steps=(dangling_step,),
        )
        service.register_template(dangling_template, context)
    missing_diagnostics = missing_dependency.value.details["diagnostics"]
    assert missing_diagnostics[0]["code"] == "workflow.dependency_missing"

    with pytest.raises(ValidationError) as cycle:
        step_a = WorkflowStepDefinition(
            step_id="a",
            operation_id="structured_view",
            depends_on=("b",),
        )
        step_b = WorkflowStepDefinition(
            step_id="b",
            operation_id="structured_view",
            depends_on=("a",),
        )
        cyclic_template = WorkflowTemplate(
            template_id="workflow-cycle",
            name="Cycle",
            steps=(step_a, step_b),
        )
        service.register_template(cyclic_template, context)
    cycle_diagnostics = cycle.value.details["diagnostics"]
    assert cycle_diagnostics[0]["code"] == "workflow.dependency_cycle"
|
|
|
|
|
|
def test_workflow_invocation_executes_dependent_transformations_in_order() -> None:
    """Invoke a two-step template and check the second step consumes the first step's output.

    Asserts that both steps complete, that the run's output asset ids are
    ``asset-view-1`` then ``asset-view-2``, that the second transformation run
    lists ``asset-view-1`` as its source, and that the run's audit events are
    queued, started, completed in that order.
    """
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(dependent_transformation_template(), context)

    invocation = WorkflowInvocation(
        template_id="workflow-dependent",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    result = workflow.invoke_template(invocation, context)

    assert result.success is True
    assert result.run.status == WorkflowRunStatus.COMPLETED

    by_step_id = {step_run.step_id: step_run for step_run in result.run.step_runs}
    assert by_step_id["view"].status == WorkflowStepRunStatus.COMPLETED
    assert by_step_id["view_again"].status == WorkflowStepRunStatus.COMPLETED
    assert result.run.output_asset_ids == ("asset-view-1", "asset-view-2")

    first_run = repo.get_transformation_run(by_step_id["view"].transformation_run_id)
    second_run = repo.get_transformation_run(by_step_id["view_again"].transformation_run_id)
    assert first_run.source_asset_ids == ("asset-source",)
    assert second_run.source_asset_ids == ("asset-view-1",)
    assert second_run.status == TransformationRunStatus.COMPLETED

    run_events = repo.list_audit_events(target=f"workflow_run:{result.run.run_id}")
    recorded_operations = [event.operation for event in run_events]
    assert recorded_operations == [
        "workflow.run.queued",
        "workflow.run.started",
        "workflow.run.completed",
    ]
|
|
|
|
|
|
def test_workflow_queue_cancel_resume_and_retry_do_not_require_storage_edits() -> None:
    """Drive queue, cancel, resume and retry purely through the service API.

    A queued run is canceled, and resuming it is expected to fail with the
    ``workflow.run_not_resumable`` diagnostic. A separately invoked run is then
    retried; the original is expected to be marked RETRIED and the retry to be
    attempt 2, linked back to the original, and COMPLETED.
    """
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(single_step_template(), context)

    # All service calls happen first, in this order; the assertions follow.
    queued_invocation = WorkflowInvocation(
        template_id="workflow-single",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    queued = workflow.queue_template(queued_invocation, context)
    canceled = workflow.cancel_run(queued.run.run_id, context, reason="operator pause")
    resumed = workflow.resume_run(canceled.run_id, context)

    direct_invocation = WorkflowInvocation(
        template_id="workflow-single",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    completed = workflow.invoke_template(direct_invocation, context)
    original_run_id = completed.run.run_id
    retry = workflow.retry_run(original_run_id, context)

    assert queued.run.status == WorkflowRunStatus.QUEUED
    assert canceled.status == WorkflowRunStatus.CANCELED
    canceled_step_statuses = {step_run.status for step_run in canceled.step_runs}
    assert canceled_step_statuses == {WorkflowStepRunStatus.CANCELED}
    assert resumed.success is False
    assert resumed.diagnostics[0].code == "workflow.run_not_resumable"

    assert repo.get_workflow_run(completed.run.run_id).status == WorkflowRunStatus.RETRIED
    assert retry.run.retry_of_run_id == completed.run.run_id
    assert retry.run.attempt == 2
    assert retry.run.status == WorkflowRunStatus.COMPLETED
|
|
|
|
|
|
def test_workflow_continue_failure_finishes_partially_completed() -> None:
    """Check that a step with ``failure_behavior="continue"`` does not stop the run.

    The ``markdown`` step is expected to end SKIPPED while the ``view`` step
    completes, leaving the run PARTIALLY_COMPLETED with a single output asset
    and ``transformation.operation_not_executable`` as the first diagnostic.
    """
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(partial_template(), context)

    invocation = WorkflowInvocation(
        template_id="workflow-partial",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    result = workflow.invoke_template(invocation, context)

    by_step_id = {step_run.step_id: step_run for step_run in result.run.step_runs}

    assert result.success is False
    assert result.run.status == WorkflowRunStatus.PARTIALLY_COMPLETED
    assert result.run.output_asset_ids == ("asset-view-success",)
    assert by_step_id["markdown"].status == WorkflowStepRunStatus.SKIPPED
    assert by_step_id["view"].status == WorkflowStepRunStatus.COMPLETED
    assert result.diagnostics[0].code == "transformation.operation_not_executable"

    derived_assets = repo.list_assets(asset_type="derived_artifact")
    assert derived_assets[0].id == "asset-view-success"
|
|
|
|
|
|
def test_sqlite_workflow_templates_and_runs_survive_reinstantiation(tmp_path: Path) -> None:
    """Run a workflow against SQLite, reopen the database, and read everything back.

    A second ``SQLiteAssetRegistryRepository`` opened on the same file must
    return the registered template, the completed run and its step run, the
    run in the per-template listing, and the derived output representation.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(single_step_template(), context)

    invocation = WorkflowInvocation(
        template_id="workflow-single",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    result = workflow.invoke_template(invocation, context)

    # A fresh repository instance over the same file stands in for a restart.
    reloaded = SQLiteAssetRegistryRepository(db_path)
    persisted_run = reloaded.get_workflow_run(result.run.run_id)

    persisted_template = reloaded.get_workflow_template("workflow-single")
    assert persisted_template.template_id == "workflow-single"
    assert persisted_run.status == WorkflowRunStatus.COMPLETED
    assert persisted_run.step_runs[0].status == WorkflowStepRunStatus.COMPLETED

    template_runs = reloaded.list_workflow_runs(template_id="workflow-single")
    assert template_runs[0].run_id == result.run.run_id

    output_representations = reloaded.list_representations(asset_id="asset-single-output")
    assert output_representations[0].kind == RepresentationKind.DERIVED
|
|
|
|
|
|
def test_review_gate_pauses_output_then_continue_completes_and_reconstructs_lineage() -> None:
    """Check that a review gate holds a run in WAITING until a CONTINUE decision is recorded.

    While waiting, the run must expose an open review task and an open
    REVIEW_REQUIRED exception, both visible through the service listings.
    After the CONTINUE decision the run must complete, and reconstruction
    must show one transformation run, lineage to the reviewed output, and the
    review and lineage audit events.
    """
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(review_gate_template(), context)

    waiting = _invoke_review_workflow(workflow, context)
    waiting_run = waiting.run

    assert waiting.success is False
    assert waiting_run.status == WorkflowRunStatus.WAITING
    assert waiting_run.step_runs[0].status == WorkflowStepRunStatus.WAITING
    assert waiting_run.review_tasks[0].status == WorkflowReviewStatus.OPEN
    assert waiting_run.exceptions[0].kind == WorkflowExceptionKind.REVIEW_REQUIRED
    assert waiting_run.exceptions[0].status == WorkflowExceptionStatus.OPEN

    listed_tasks = workflow.list_review_tasks(workflow_run_id=waiting_run.run_id)
    assert listed_tasks == waiting_run.review_tasks
    listed_exceptions = workflow.list_exception_queue(kind=WorkflowExceptionKind.REVIEW_REQUIRED)
    assert listed_exceptions == waiting_run.exceptions

    continued = workflow.record_review_decision(
        waiting_run.run_id,
        waiting_run.review_tasks[0].review_id,
        WorkflowReviewDecisionType.CONTINUE,
        context,
        note="approved for release",
    )
    reconstruction = workflow.reconstruct_run(continued.run.run_id)

    assert continued.success is True
    assert continued.run.status == WorkflowRunStatus.COMPLETED
    assert continued.run.review_tasks[0].status == WorkflowReviewStatus.CONTINUED
    assert continued.run.exceptions[0].status == WorkflowExceptionStatus.RESOLVED
    assert continued.run.output_asset_ids == ("asset-reviewed-output",)
    assert len(reconstruction.transformation_runs) == 1
    assert reconstruction.derived_lineage[0].output_asset_id == "asset-reviewed-output"

    # Other events may be present; only these three are required.
    required_operations = {
        "workflow.review.requested",
        "workflow.review.continue",
        "derived_artifact.lineage.linked",
    }
    recorded_operations = {event.operation for event in reconstruction.audit_events}
    assert required_operations <= recorded_operations
|
|
|
|
|
|
def test_review_decisions_can_reject_correct_retry_and_escalate_runs() -> None:
    """Exercise the REJECT, CORRECT, RETRY and ESCALATE review decisions in turn.

    Each scenario starts a fresh review-gated run and records one decision:
    * REJECT   -> run FAILED, review REJECTED, exception RESOLVED;
    * CORRECT  -> run COMPLETED, review CORRECTED with the correction stored;
    * RETRY    -> original run RETRIED, new run WAITING with an OPEN review;
    * ESCALATE -> run stays WAITING, review and exception ESCALATED, and the
      exception is first in the ESCALATED queue listing.
    """
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(review_gate_template(), context)

    # Scenario 1: reject.
    rejected = _invoke_review_workflow(workflow, context)
    rejected_result = workflow.record_review_decision(
        rejected.run.run_id,
        rejected.run.review_tasks[0].review_id,
        WorkflowReviewDecisionType.REJECT,
        context,
        note="incorrect output",
    )
    rejected_run = rejected_result.run
    assert rejected_run.status == WorkflowRunStatus.FAILED
    assert rejected_run.review_tasks[0].status == WorkflowReviewStatus.REJECTED
    assert rejected_run.exceptions[0].status == WorkflowExceptionStatus.RESOLVED

    # Scenario 2: correct.
    corrected = _invoke_review_workflow(workflow, context)
    corrected_result = workflow.record_review_decision(
        corrected.run.run_id,
        corrected.run.review_tasks[0].review_id,
        WorkflowReviewDecisionType.CORRECT,
        context,
        note="minor metadata correction",
        correction={"label": "approved-after-edit"},
    )
    corrected_run = corrected_result.run
    assert corrected_run.status == WorkflowRunStatus.COMPLETED
    assert corrected_run.review_tasks[0].status == WorkflowReviewStatus.CORRECTED
    assert corrected_run.review_tasks[0].correction == {"label": "approved-after-edit"}

    # Scenario 3: retry.
    retry_requested = _invoke_review_workflow(workflow, context)
    retry_result = workflow.record_review_decision(
        retry_requested.run.run_id,
        retry_requested.run.review_tasks[0].review_id,
        WorkflowReviewDecisionType.RETRY,
        context,
        note="rerun with same inputs",
    )
    superseded_run = repo.get_workflow_run(retry_requested.run.run_id)
    assert superseded_run.status == WorkflowRunStatus.RETRIED
    assert retry_result.run.retry_of_run_id == retry_requested.run.run_id
    assert retry_result.run.status == WorkflowRunStatus.WAITING
    assert retry_result.run.review_tasks[0].status == WorkflowReviewStatus.OPEN

    # Scenario 4: escalate.
    escalated = _invoke_review_workflow(workflow, context)
    escalated_result = workflow.record_review_decision(
        escalated.run.run_id,
        escalated.run.review_tasks[0].review_id,
        WorkflowReviewDecisionType.ESCALATE,
        context,
        note="needs owner decision",
    )
    escalated_run = escalated_result.run
    assert escalated_run.status == WorkflowRunStatus.WAITING
    assert escalated_run.review_tasks[0].status == WorkflowReviewStatus.ESCALATED
    assert escalated_run.exceptions[0].status == WorkflowExceptionStatus.ESCALATED

    escalated_queue = workflow.list_exception_queue(status=WorkflowExceptionStatus.ESCALATED)
    assert escalated_queue[0].exception_id == escalated_result.run.exceptions[0].exception_id
|
|
|
|
|
|
def test_failed_workflow_step_creates_exception_queue_item() -> None:
    """Check that a failing step leaves a FAILED entry in the exception queue.

    The first FAILED queue entry must reference the failed run, the
    ``markdown`` step, and carry the
    ``transformation.operation_not_executable`` diagnostic.
    """
    repo = InMemoryAssetRegistryRepository()
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(markdown_failure_template(), context)

    invocation = WorkflowInvocation(
        template_id="workflow-failure",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    result = workflow.invoke_template(invocation, context)

    failed_queue = workflow.list_exception_queue(kind=WorkflowExceptionKind.FAILED)

    assert result.run.status == WorkflowRunStatus.FAILED
    first_failure = failed_queue[0]
    assert first_failure.workflow_run_id == result.run.run_id
    assert first_failure.step_id == "markdown"
    assert first_failure.diagnostics[0]["code"] == "transformation.operation_not_executable"
|
|
|
|
|
|
def test_sqlite_review_state_survives_reinstantiation(tmp_path: Path) -> None:
    """Check that a run paused at a review gate is still paused after SQLite is reopened.

    A new ``WorkflowService`` over a new repository on the same file must
    reconstruct the run as WAITING with an OPEN review task and a
    REVIEW_REQUIRED exception.
    """
    db_path = tmp_path / "registry.sqlite"
    repo = SQLiteAssetRegistryRepository(db_path)
    registry = AssetRegistryService(repo)
    context = operation_context()
    create_source_asset(registry, context, asset_id="asset-source")
    transformations = TransformationService(repo, asset_service=registry)
    workflow = WorkflowService(repo, transformation_service=transformations)
    workflow.register_template(review_gate_template(), context)

    invocation = WorkflowInvocation(
        template_id="workflow-review",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    waiting = workflow.invoke_template(invocation, context)

    # The reloaded service is built without a transformation service, exactly
    # as in the original test; only reconstruction is exercised on it.
    reopened_repo = SQLiteAssetRegistryRepository(db_path)
    reloaded = WorkflowService(reopened_repo)
    reconstruction = reloaded.reconstruct_run(waiting.run.run_id)
    persisted_run = reconstruction.run

    assert persisted_run.status == WorkflowRunStatus.WAITING
    assert persisted_run.review_tasks[0].status == WorkflowReviewStatus.OPEN
    assert persisted_run.exceptions[0].kind == WorkflowExceptionKind.REVIEW_REQUIRED
|
|
|
|
|
|
def rich_input_template() -> WorkflowTemplate:
    """Build the ``workflow-rich-inputs`` template with one input of each kind.

    Only ``source`` is required; ``bundle``, ``query``, ``event`` and
    ``payload`` are declared with ``required=False``. The single ``view`` step
    runs ``structured_view`` on the ``source`` input.
    """
    input_definitions = (
        WorkflowInputDefinition("source", WorkflowInputKind.ASSET),
        WorkflowInputDefinition("bundle", WorkflowInputKind.COLLECTION, required=False),
        WorkflowInputDefinition("query", WorkflowInputKind.QUERY, required=False),
        WorkflowInputDefinition("event", WorkflowInputKind.SOURCE_EVENT, required=False),
        WorkflowInputDefinition("payload", WorkflowInputKind.PAYLOAD, required=False),
    )
    view_step = WorkflowStepDefinition(
        step_id="view",
        operation_id="structured_view",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-rich-view"},
    )
    return WorkflowTemplate(
        template_id="workflow-rich-inputs",
        name="Rich Inputs",
        inputs=input_definitions,
        steps=(view_step,),
    )
|
|
|
|
|
|
def dependent_transformation_template() -> WorkflowTemplate:
    """Build the ``workflow-dependent`` template: two chained ``structured_view`` steps.

    ``view`` reads the ``source`` input; ``view_again`` depends on ``view``
    and takes that step's output asset ids as its sources.
    """
    source_input = WorkflowInputDefinition("source", WorkflowInputKind.ASSET)
    first_view = WorkflowStepDefinition(
        step_id="view",
        operation_id="structured_view",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-view-1", "title": "First View"},
    )
    second_view = WorkflowStepDefinition(
        step_id="view_again",
        operation_id="structured_view",
        depends_on=("view",),
        inputs={"source_asset_ids": "$steps.view.output_asset_ids"},
        outputs={"asset_id": "asset-view-2", "title": "Second View"},
    )
    return WorkflowTemplate(
        template_id="workflow-dependent",
        name="Dependent Transformations",
        inputs=(source_input,),
        steps=(first_view, second_view),
    )
|
|
|
|
|
|
def single_step_template() -> WorkflowTemplate:
    """Build the ``workflow-single`` template with one ``structured_view`` step.

    The step reads the ``source`` input and declares ``asset-single-output``
    as its output asset id.
    """
    source_input = WorkflowInputDefinition("source", WorkflowInputKind.ASSET)
    view_step = WorkflowStepDefinition(
        step_id="view",
        operation_id="structured_view",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-single-output", "title": "Single Output"},
    )
    return WorkflowTemplate(
        template_id="workflow-single",
        name="Single Step",
        inputs=(source_input,),
        steps=(view_step,),
    )
|
|
|
|
|
|
def partial_template() -> WorkflowTemplate:
    """Build the ``workflow-partial`` template used by the partial-completion test.

    The ``markdown`` step is declared with ``failure_behavior="continue"``;
    the independent ``view`` step follows it. Both read the ``source`` input.
    """
    source_input = WorkflowInputDefinition("source", WorkflowInputKind.ASSET)
    markdown_step = WorkflowStepDefinition(
        step_id="markdown",
        operation_id="markdown_transform",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-markdown-output"},
        failure_behavior="continue",
    )
    view_step = WorkflowStepDefinition(
        step_id="view",
        operation_id="structured_view",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-view-success"},
    )
    return WorkflowTemplate(
        template_id="workflow-partial",
        name="Partial Workflow",
        inputs=(source_input,),
        steps=(markdown_step, view_step),
    )
|
|
|
|
|
|
def review_gate_template() -> WorkflowTemplate:
    """Build the ``workflow-review`` template whose only step carries a review gate.

    The gate is marked required, gives a reason, and names the
    ``content-review`` queue.
    """
    source_input = WorkflowInputDefinition("source", WorkflowInputKind.ASSET)
    gate_settings = {
        "required": True,
        "reason": "sensitive output requires approval",
        "queue": "content-review",
    }
    gated_view = WorkflowStepDefinition(
        step_id="view",
        operation_id="structured_view",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-reviewed-output", "title": "Reviewed Output"},
        review_gate=gate_settings,
    )
    return WorkflowTemplate(
        template_id="workflow-review",
        name="Review Workflow",
        inputs=(source_input,),
        steps=(gated_view,),
    )
|
|
|
|
|
|
def markdown_failure_template() -> WorkflowTemplate:
    """Build the ``workflow-failure`` template with a single ``markdown_transform`` step.

    Used by the failure test, which expects this step to fail with
    ``transformation.operation_not_executable``.
    """
    source_input = WorkflowInputDefinition("source", WorkflowInputKind.ASSET)
    markdown_step = WorkflowStepDefinition(
        step_id="markdown",
        operation_id="markdown_transform",
        inputs={"source_asset_ids": "$inputs.source"},
        outputs={"asset_id": "asset-markdown-failure"},
    )
    return WorkflowTemplate(
        template_id="workflow-failure",
        name="Failure Workflow",
        inputs=(source_input,),
        steps=(markdown_step,),
    )
|
|
|
|
|
|
def _invoke_review_workflow(
    workflow: WorkflowService,
    context: OperationContext,
):
    """Invoke the ``workflow-review`` template against ``asset-source``.

    Returns whatever ``workflow.invoke_template`` returns for that invocation.
    """
    invocation = WorkflowInvocation(
        template_id="workflow-review",
        inputs={"source": {"kind": "asset", "asset_id": "asset-source"}},
    )
    return workflow.invoke_template(invocation, context)
|
|
|
|
|
|
def create_source_asset(
    registry: AssetRegistryService,
    context: OperationContext,
    *,
    asset_id: str,
) -> None:
    """Create an internal ``document`` asset titled "Source" with one markdown representation.

    Args:
        registry: Asset service used to create the asset.
        context: Operation context passed through to ``create_asset``.
        asset_id: Id given to both the asset and its source representation.
    """
    classification = Classification(
        asset_type="document",
        sensitivity=Sensitivity.INTERNAL,
        owner="Platform Knowledge",
    )
    source_representation = AssetRepresentation.from_content(
        asset_id,
        RepresentationKind.SOURCE,
        "text/markdown",
        "# Source\n",
    )
    registry.create_asset(
        "Source",
        classification,
        context,
        asset_id=asset_id,
        representations=[source_representation],
    )
|
|
|
|
|
|
def operation_context() -> OperationContext:
    """Return an operation context for the human actor ``user-test``.

    The actor belongs to the ``engineering`` group and the context uses the
    fixed correlation id ``corr-test``.
    """
    return OperationContext.create(
        Actor.create(
            ActorType.HUMAN,
            actor_id="user-test",
            display_name="Test User",
            groups=["engineering"],
        ),
        correlation_id="corr-test",
    )
|