Files
artifact-store/tests/integration/test_cli_commands.py

378 lines
12 KiB
Python

"""CLI command tests (ARTIFACT-STORE-WP-0001-T014)."""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any
import pytest
from sqlalchemy import create_engine, insert, inspect, select
from typer.testing import CliRunner
from artifactstore.cli import app as cli_app
from artifactstore.db.schema import metadata, retention_classes, storage_locations
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
REPO_ROOT = Path(__file__).resolve().parents[2]
@pytest.fixture
def runner() -> CliRunner:
return CliRunner()
@pytest.fixture
def env_db(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
db_path = tmp_path / "cli-test.db"
storage_root = tmp_path / "storage"
storage_root.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("ARTIFACTSTORE_DATABASE_URL", f"sqlite+aiosqlite:///{db_path}")
monkeypatch.setenv("ARTIFACTSTORE_STORAGE_LOCAL_ROOT", str(storage_root))
return db_path
def test_cli_version_prints_version(runner: CliRunner) -> None:
result = runner.invoke(cli_app, ["version"])
assert result.exit_code == 0
assert result.output.strip()
def test_cli_migrate_creates_schema(
runner: CliRunner,
env_db: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.chdir(REPO_ROOT)
result = runner.invoke(cli_app, ["migrate"])
assert result.exit_code == 0, result.output
assert "ok" in result.output
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
names = set(inspect(sync_engine).get_table_names())
sync_engine.dispose()
assert {"events", "retention_classes", "alembic_version"}.issubset(names)
def test_cli_replay_against_empty_log(
runner: CliRunner,
env_db: Path,
) -> None:
# Pre-create schema (without using migrate command to keep the test
# focused on the replay surface).
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
metadata.create_all(sync_engine)
with sync_engine.begin() as conn:
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
sync_engine.dispose()
result = runner.invoke(cli_app, ["replay"])
assert result.exit_code == 0, result.output
assert "replayed up to sequence 0" in result.output
def test_cli_health_reports_ok(
runner: CliRunner,
env_db: Path,
) -> None:
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
metadata.create_all(sync_engine)
sync_engine.dispose()
result = runner.invoke(cli_app, ["health"])
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["status"] == "ok"
assert payload["db"]["healthy"] is True
assert payload["backend"]["healthy"] is True
def test_cli_push_uses_http_api(
runner: CliRunner,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
source = tmp_path / "source"
source.mkdir()
(source / "a.txt").write_text("alpha", encoding="utf-8")
calls: list[tuple[str, str, dict[str, Any]]] = []
multipart_calls: list[tuple[str, dict[str, str], bytes]] = []
def fake_http_json(
method: str,
base_url: str,
path: str,
token: str,
payload: dict[str, Any],
) -> dict[str, Any]:
calls.append((method, path, payload))
assert base_url == "http://api.test"
assert token == "secret"
if path == "/packages":
return {"id": "pkg-1"}
if path == "/packages/pkg-1/finalize":
return {"manifest_digest": "blake3:abc"}
raise AssertionError(f"unexpected JSON request: {method} {path}")
def fake_http_multipart(
base_url: str,
path: str,
token: str,
*,
fields: dict[str, str],
file_field: str,
file_name: str,
file_content_type: str,
file_bytes: bytes,
) -> dict[str, Any]:
assert base_url == "http://api.test"
assert token == "secret"
assert path == "/packages/pkg-1/files"
assert file_field == "file"
assert file_name == "a.txt"
assert file_content_type == "text/plain"
multipart_calls.append((path, fields, file_bytes))
return {"id": "file-1"}
monkeypatch.setattr("artifactstore.cli._http_json", fake_http_json)
monkeypatch.setattr("artifactstore.cli._http_multipart", fake_http_multipart)
result = runner.invoke(
cli_app,
[
"push",
str(source),
"--producer",
"prod",
"--subject",
"sub",
"--api-url",
"http://api.test",
"--token",
"secret",
],
)
assert result.exit_code == 0, result.output
assert json.loads(result.output) == {
"package_id": "pkg-1",
"manifest_digest": "blake3:abc",
"files": 1,
}
assert calls[0][1] == "/packages"
assert calls[1][1] == "/packages/pkg-1/finalize"
assert multipart_calls == [
(
"/packages/pkg-1/files",
{"relative_path": "a.txt", "media_type": "text/plain"},
b"alpha",
)
]
def test_cli_manifest_fetches_json_projection(
runner: CliRunner,
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_http_bytes(
method: str,
base_url: str,
path: str,
token: str,
*,
body: bytes | None = None,
headers: dict[str, str] | None = None,
) -> bytes:
assert method == "GET"
assert base_url == "http://api.test"
assert path == "/packages/pkg-1/manifest.json"
assert token == "secret"
assert body is None
assert headers == {"Accept": "application/json"}
return b'{"manifest_version":1}'
monkeypatch.setattr("artifactstore.cli._http_bytes", fake_http_bytes)
result = runner.invoke(
cli_app,
[
"manifest",
"pkg-1",
"--api-url",
"http://api.test",
"--token",
"secret",
],
)
assert result.exit_code == 0, result.output
assert json.loads(result.output) == {"manifest_version": 1}
def test_cli_retention_sweep_marks_expired_package(
runner: CliRunner,
env_db: Path,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
metadata.create_all(sync_engine)
with sync_engine.begin() as conn:
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
sync_engine.dispose()
retention_config = tmp_path / "retention.toml"
retention_config.write_text(
'[retention_classes.transient]\ndefault_duration_seconds = 0\n',
encoding="utf-8",
)
monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config))
async def create_expired_package() -> str:
from artifactstore.app import build_registry
from artifactstore.config import get_settings
registry = build_registry(get_settings())
try:
package_id = await registry.create_package(
name="expired",
producer="tests",
subject="cli-sweep",
retention_class="transient",
actor="ops",
)
finally:
await registry.dispose()
return str(package_id)
package_id = asyncio.run(create_expired_package())
result = runner.invoke(cli_app, ["retention", "sweep"])
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload == {"marked_package_ids": [package_id], "marked_count": 1}
def test_cli_retention_gc_collects_eligible_package(
runner: CliRunner,
env_db: Path,
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
metadata.create_all(sync_engine)
with sync_engine.begin() as conn:
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
sync_engine.dispose()
retention_config = tmp_path / "retention.toml"
retention_config.write_text(
'[retention_classes.transient]\ndefault_duration_seconds = 0\n',
encoding="utf-8",
)
monkeypatch.setenv("ARTIFACTSTORE_RETENTION_CONFIG_PATH", str(retention_config))
async def create_expired_file() -> None:
from collections.abc import AsyncIterator
from datetime import UTC, datetime, timedelta
from artifactstore.app import build_registry
from artifactstore.config import get_settings
async def stream() -> AsyncIterator[bytes]:
yield b"collect-me"
registry = build_registry(get_settings())
try:
package_id = await registry.create_package(
name="collect",
producer="tests",
subject="cli-gc",
retention_class="transient",
actor="ops",
)
await registry.ingest_file(
package_id,
relative_path="collect.bin",
media_type="application/octet-stream",
stream=stream(),
actor="ops",
)
await registry.finalize_package(package_id, actor="ops")
await registry.sweep_deletion_eligibility(
now=datetime.now(UTC) + timedelta(seconds=1)
)
finally:
await registry.dispose()
asyncio.run(create_expired_file())
result = runner.invoke(cli_app, ["retention", "gc"])
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["released_location_count"] == 1
assert payload["delete_attempted_object_count"] == 1
assert payload["deleted_object_count"] == 1
assert payload["results"][0]["object_deleted"] is True
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
with sync_engine.connect() as conn:
status = conn.execute(select(storage_locations.c.status)).scalar_one()
sync_engine.dispose()
assert status == "deleted"
def test_cli_storage_verify_marks_local_location_verified(
runner: CliRunner,
env_db: Path,
) -> None:
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
metadata.create_all(sync_engine)
with sync_engine.begin() as conn:
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
sync_engine.dispose()
async def create_stored_file() -> None:
from collections.abc import AsyncIterator
from artifactstore.app import build_registry
from artifactstore.config import get_settings
async def stream() -> AsyncIterator[bytes]:
yield b"verify-me"
registry = build_registry(get_settings())
try:
package_id = await registry.create_package(
name="verify",
producer="tests",
subject="storage",
retention_class="raw-evidence",
actor="ops",
)
await registry.ingest_file(
package_id,
relative_path="verify.txt",
media_type="text/plain",
stream=stream(),
actor="ops",
)
finally:
await registry.dispose()
asyncio.run(create_stored_file())
result = runner.invoke(cli_app, ["storage", "verify", "--backend", "local"])
assert result.exit_code == 0, result.output
payload = json.loads(result.output)
assert payload["verified_count"] == 1
assert payload["failed_count"] == 0
assert payload["results"][0]["verified"] is True
sync_engine = create_engine(f"sqlite:///{env_db}", future=True)
with sync_engine.connect() as conn:
status = conn.execute(select(storage_locations.c.status)).scalar_one()
sync_engine.dispose()
assert status == "verified"