Files
kaizen-agentic/tests/test_metrics_publish.py
tegwick b48a2102d7 WP-0004: ecosystem integration complete
Add Helix Forge correlation (HELIX_SESSION_UID env, metrics correlate),
artifact-store publish (metrics publish), activity-core ActivityDefinition
references, integration patterns docs, and canon/knowledge design artifacts.
2026-06-16 01:53:01 +02:00

143 lines
4.4 KiB
Python

"""Tests for artifact-store publish integration (WP-0004 Part 3)."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from click.testing import CliRunner
from kaizen_agentic.cli import cli
from kaizen_agentic.integrations.artifact_store import (
PublishResult,
build_optimizer_manifest,
publish_optimizer_evidence,
)
from kaizen_agentic.metrics import OptimizerStore
@pytest.fixture
def project_with_optimizer(tmp_path: Path) -> Path:
    """Seed ``tmp_path`` with optimizer data and return that same path.

    One analysis record and one recommendation entry for the
    ``tdd-workflow`` agent are written through ``OptimizerStore``.
    """
    analysis = {
        "project": "demo",
        "optimized_at": "2026-06-18",
        "agents": [{"agent": "tdd-workflow"}],
    }
    recommendations = [
        {"type": "reliability", "message": "Improve test stability"},
    ]

    optimizer_store = OptimizerStore(tmp_path)
    optimizer_store.write_analysis(analysis)
    optimizer_store.append_recommendations(
        "tdd-workflow",
        recommendations,
        metrics_count=10,
    )
    return tmp_path
def test_build_optimizer_manifest(project_with_optimizer: Path):
    """The manifest built for the seeded project carries the expected fields."""
    built = build_optimizer_manifest(project_with_optimizer)

    expected_fields = {
        "schema": "kaizen-agentic/optimizer-evidence/v1",
        "retention_class": "raw-evidence",
        "retention_days": 180,
    }
    for field_name, expected_value in expected_fields.items():
        assert built[field_name] == expected_value

    assert "tdd-workflow" in built["agents"]
def test_publish_optimizer_evidence_calls_api(project_with_optimizer: Path):
    """Publishing hits ``/packages`` first, a ``/files`` path, then finalize.

    Both HTTP helpers of the artifact-store integration are patched with
    fakes that record ``(method, path)`` pairs, so no network is involved.
    """
    seen: list[tuple[str, str]] = []

    def fake_json(method, base_url, path, token, payload):
        seen.append((method, path))
        if path == "/packages":
            return {"id": "pkg-123"}
        # Anything other than package creation or finalize is unexpected.
        if not path.endswith("/finalize"):
            raise AssertionError(path)
        return {"id": "pkg-123", "manifest_digest": "blake3:deadbeef"}

    def fake_multipart(base_url, path, token, **kwargs):
        seen.append(("POST", path))
        return {"id": "file-1"}

    json_patcher = patch(
        "kaizen_agentic.integrations.artifact_store._http_json",
        side_effect=fake_json,
    )
    multipart_patcher = patch(
        "kaizen_agentic.integrations.artifact_store._http_multipart",
        side_effect=fake_multipart,
    )
    with json_patcher, multipart_patcher:
        outcome = publish_optimizer_evidence(
            project_with_optimizer,
            api_url="http://api.test",
            token="secret",
        )

    assert outcome.package_id == "pkg-123"
    assert outcome.files_uploaded == 2
    assert outcome.retention_class == "raw-evidence"

    assert seen[0] == ("POST", "/packages")
    assert any("/files" in called_path for _, called_path in seen)
    assert seen[-1] == ("POST", "/packages/pkg-123/finalize")
class TestMetricsPublishCli:
    """CLI-level tests for the ``metrics publish`` command."""

    def test_publish_requires_token(self, project_with_optimizer: Path):
        """Without ``--token`` the command exits non-zero and mentions "token"."""
        runner = CliRunner()
        result = runner.invoke(
            cli,
            ["metrics", "publish", "--target", str(project_with_optimizer)],
            # NOTE(review): the live test in this module reads
            # ARTIFACTSTORE_API_TOKEN from the environment; presumably the CLI
            # accepts the same variable as a fallback -- confirm. Mapping it to
            # None removes it for this invocation only, so the outcome does not
            # depend on the developer's shell.
            env={"ARTIFACTSTORE_API_TOKEN": None},
        )
        assert result.exit_code != 0
        assert "token" in result.output.lower()

    def test_publish_success(self, project_with_optimizer: Path):
        """With a token and a patched publisher, the package id is printed."""
        runner = CliRunner()
        canned_result = PublishResult(
            package_id="pkg-99",
            manifest_digest="blake3:abc",
            files_uploaded=2,
            retention_class="raw-evidence",
        )
        # Patch the name as looked up from the CLI module so no HTTP is made.
        with patch(
            "kaizen_agentic.cli.publish_optimizer_evidence",
            return_value=canned_result,
        ):
            result = runner.invoke(
                cli,
                [
                    "metrics",
                    "publish",
                    "--target",
                    str(project_with_optimizer),
                    "--token",
                    "test-token",
                    "--api-url",
                    "http://127.0.0.1:8000",
                ],
            )
        assert result.exit_code == 0
        assert "pkg-99" in result.output
@pytest.mark.integration
def test_publish_against_live_artifact_store(project_with_optimizer: Path):
    """Optional live test — skipped when artifact-store is unreachable.

    Probes ``/health`` on ``http://127.0.0.1:8000`` and reads the token from
    ``ARTIFACTSTORE_API_TOKEN``; if either prerequisite is missing the test
    is skipped rather than failed.
    """
    import os
    import urllib.request

    api_url = "http://127.0.0.1:8000"
    try:
        # Only reachability matters; the context manager closes the response
        # so the probe does not leak an open socket.
        with urllib.request.urlopen(f"{api_url}/health", timeout=2):
            pass
    except OSError:
        # URLError (incl. HTTPError), TimeoutError, socket timeouts and
        # connection resets are all OSError subclasses.
        pytest.skip("artifact-store not reachable")

    token = os.environ.get("ARTIFACTSTORE_API_TOKEN")
    if not token:
        pytest.skip("ARTIFACTSTORE_API_TOKEN not set")

    result = publish_optimizer_evidence(
        project_with_optimizer,
        api_url=api_url,
        token=token,
    )
    assert result.package_id
    assert result.files_uploaded >= 1