"""CLI command tests (ARTIFACT-STORE-WP-0001-T014).""" from __future__ import annotations import asyncio import json from pathlib import Path from typing import Any import pytest from sqlalchemy import create_engine, insert, inspect, select from typer.testing import CliRunner from artifactstore.cli import app as cli_app from artifactstore.db.schema import metadata, retention_classes, storage_locations from artifactstore.db.seed import RETENTION_CLASS_SEEDS REPO_ROOT = Path(__file__).resolve().parents[2] @pytest.fixture def runner() -> CliRunner: return CliRunner() @pytest.fixture def env_db(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: db_path = tmp_path / "cli-test.db" storage_root = tmp_path / "storage" storage_root.mkdir(parents=True, exist_ok=True) monkeypatch.setenv("ARTIFACTSTORE_DATABASE_URL", f"sqlite+aiosqlite:///{db_path}") monkeypatch.setenv("ARTIFACTSTORE_STORAGE_LOCAL_ROOT", str(storage_root)) return db_path def test_cli_version_prints_version(runner: CliRunner) -> None: result = runner.invoke(cli_app, ["version"]) assert result.exit_code == 0 assert result.output.strip() def test_cli_migrate_creates_schema( runner: CliRunner, env_db: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.chdir(REPO_ROOT) result = runner.invoke(cli_app, ["migrate"]) assert result.exit_code == 0, result.output assert "ok" in result.output sync_engine = create_engine(f"sqlite:///{env_db}", future=True) names = set(inspect(sync_engine).get_table_names()) sync_engine.dispose() assert {"events", "retention_classes", "alembic_version"}.issubset(names) def test_cli_replay_against_empty_log( runner: CliRunner, env_db: Path, ) -> None: # Pre-create schema (without using migrate command to keep the test # focused on the replay surface). sync_engine = create_engine(f"sqlite:///{env_db}", future=True) metadata.create_all(sync_engine) with sync_engine.begin() as conn: conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) sync_engine.dispose() result = runner.invoke(cli_app, ["replay"]) assert result.exit_code == 0, result.output assert "replayed up to sequence 0" in result.output def test_cli_health_reports_ok( runner: CliRunner, env_db: Path, ) -> None: sync_engine = create_engine(f"sqlite:///{env_db}", future=True) metadata.create_all(sync_engine) sync_engine.dispose() result = runner.invoke(cli_app, ["health"]) assert result.exit_code == 0, result.output payload = json.loads(result.output) assert payload["status"] == "ok" assert payload["db"]["healthy"] is True assert payload["backend"]["healthy"] is True def test_cli_push_uses_http_api( runner: CliRunner, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: source = tmp_path / "source" source.mkdir() (source / "a.txt").write_text("alpha", encoding="utf-8") calls: list[tuple[str, str, dict[str, Any]]] = [] multipart_calls: list[tuple[str, dict[str, str], bytes]] = [] def fake_http_json( method: str, base_url: str, path: str, token: str, payload: dict[str, Any], ) -> dict[str, Any]: calls.append((method, path, payload)) assert base_url == "http://api.test" assert token == "secret" if path == "/packages": return {"id": "pkg-1"} if path == "/packages/pkg-1/finalize": return {"manifest_digest": "blake3:abc"} raise AssertionError(f"unexpected JSON request: {method} {path}") def fake_http_multipart( base_url: str, path: str, token: str, *, fields: dict[str, str], file_field: str, file_name: str, file_content_type: str, file_bytes: bytes, ) -> dict[str, Any]: assert base_url == "http://api.test" assert token == "secret" assert path == "/packages/pkg-1/files" assert file_field == "file" assert file_name == "a.txt" assert file_content_type == "text/plain" multipart_calls.append((path, fields, file_bytes)) return {"id": "file-1"} monkeypatch.setattr("artifactstore.cli._http_json", fake_http_json) monkeypatch.setattr("artifactstore.cli._http_multipart", fake_http_multipart) result = runner.invoke( cli_app, [ "push", str(source), "--producer", "prod", "--subject", "sub", "--api-url", "http://api.test", "--token", "secret", ], ) assert result.exit_code == 0, result.output assert json.loads(result.output) == { "package_id": "pkg-1", "manifest_digest": "blake3:abc", "files": 1, } assert calls[0][1] == "/packages" assert calls[1][1] == "/packages/pkg-1/finalize" assert multipart_calls == [ ( "/packages/pkg-1/files", {"relative_path": "a.txt", "media_type": "text/plain"}, b"alpha", ) ] def test_cli_manifest_fetches_json_projection( runner: CliRunner, monkeypatch: pytest.MonkeyPatch, ) -> None: def fake_http_bytes( method: str, base_url: str, path: str, token: str, *, body: bytes | None = None, headers: dict[str, str] | None = None, ) -> bytes: assert method == "GET" assert base_url == "http://api.test" assert path == "/packages/pkg-1/manifest.json" assert token == "secret" assert body is None assert headers == {"Accept": "application/json"} return b'{"manifest_version":1}' monkeypatch.setattr("artifactstore.cli._http_bytes", fake_http_bytes) result = runner.invoke( cli_app, [ "manifest", "pkg-1", "--api-url", "http://api.test", "--token", "secret", ], ) assert result.exit_code == 0, result.output assert json.loads(result.output) == {"manifest_version": 1} def test_cli_retention_sweep_marks_expired_package( runner: CliRunner, env_db: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: sync_engine = create_engine(f"sqlite:///{env_db}", future=True) metadata.create_all(sync_engine) with sync_engine.begin() as conn: conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) sync_engine.dispose() retention_config = tmp_path / "retention.toml" retention_config.write_text( '[retention_classes.transient]\ndefault_duration_seconds = 0\n', encoding="utf-8", ) monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config)) async def create_expired_package() -> str: from artifactstore.app import build_registry from artifactstore.config import get_settings registry = build_registry(get_settings()) try: package_id = await registry.create_package( name="expired", producer="tests", subject="cli-sweep", retention_class="transient", actor="ops", ) finally: await registry.dispose() return str(package_id) package_id = asyncio.run(create_expired_package()) result = runner.invoke(cli_app, ["retention", "sweep"]) assert result.exit_code == 0, result.output payload = json.loads(result.output) assert payload == {"marked_package_ids": [package_id], "marked_count": 1} def test_cli_retention_gc_collects_eligible_package( runner: CliRunner, env_db: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: sync_engine = create_engine(f"sqlite:///{env_db}", future=True) metadata.create_all(sync_engine) with sync_engine.begin() as conn: conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) sync_engine.dispose() retention_config = tmp_path / "retention.toml" retention_config.write_text( '[retention_classes.transient]\ndefault_duration_seconds = 0\n', encoding="utf-8", ) monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config)) async def create_expired_file() -> None: from collections.abc import AsyncIterator from datetime import UTC, datetime, timedelta from artifactstore.app import build_registry from artifactstore.config import get_settings async def stream() -> AsyncIterator[bytes]: yield b"collect-me" registry = build_registry(get_settings()) try: package_id = await registry.create_package( name="collect", producer="tests", subject="cli-gc", retention_class="transient", actor="ops", ) await registry.ingest_file( package_id, relative_path="collect.bin", media_type="application/octet-stream", stream=stream(), actor="ops", ) await registry.finalize_package(package_id, actor="ops") await registry.sweep_deletion_eligibility( now=datetime.now(UTC) + timedelta(seconds=1) ) finally: await registry.dispose() asyncio.run(create_expired_file()) result = runner.invoke(cli_app, ["retention", "gc"]) assert result.exit_code == 0, result.output payload = json.loads(result.output) assert payload["released_location_count"] == 1 assert payload["delete_attempted_object_count"] == 1 assert payload["deleted_object_count"] == 1 assert payload["results"][0]["object_deleted"] is True sync_engine = create_engine(f"sqlite:///{env_db}", future=True) with sync_engine.connect() as conn: status = conn.execute(select(storage_locations.c.status)).scalar_one() sync_engine.dispose() assert status == "deleted" def test_cli_storage_verify_marks_local_location_verified( runner: CliRunner, env_db: Path, ) -> None: sync_engine = create_engine(f"sqlite:///{env_db}", future=True) metadata.create_all(sync_engine) with sync_engine.begin() as conn: conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS]) sync_engine.dispose() async def create_stored_file() -> None: from collections.abc import AsyncIterator from artifactstore.app import build_registry from artifactstore.config import get_settings async def stream() -> AsyncIterator[bytes]: yield b"verify-me" registry = build_registry(get_settings()) try: package_id = await registry.create_package( name="verify", producer="tests", subject="storage", retention_class="raw-evidence", actor="ops", ) await registry.ingest_file( package_id, relative_path="verify.txt", media_type="text/plain", stream=stream(), actor="ops", ) finally: await registry.dispose() asyncio.run(create_stored_file()) result = runner.invoke(cli_app, ["storage", "verify", "--backend", "local"]) assert result.exit_code == 0, result.output payload = json.loads(result.output) assert payload["verified_count"] == 1 assert payload["failed_count"] == 0 assert payload["results"][0]["verified"] is True sync_engine = create_engine(f"sqlite:///{env_db}", future=True) with sync_engine.connect() as conn: status = conn.execute(select(storage_locations.c.status)).scalar_one() sync_engine.dispose() assert status == "verified"