Files
artifact-store/tests/integration/test_http_api.py

241 lines
9.3 KiB
Python

"""HTTP API integration tests for ARTIFACT-STORE-WP-0002."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
import cbor2
from fastapi.testclient import TestClient
from hypothesis import HealthCheck, given
from hypothesis import settings as hypothesis_settings
from hypothesis import strategies as st
from sqlalchemy import create_engine, insert
from artifactstore.api.http import create_app
from artifactstore.config import Settings
from artifactstore.db.schema import metadata, retention_classes
from artifactstore.db.seed import RETENTION_CLASS_SEEDS
from artifactstore.identity import digest_bytes
AUTH = {"Authorization": "Bearer test-token"}
def _settings(root: Path) -> Settings:
db_path = root / "http-api.db"
storage_root = root / "storage"
storage_root.mkdir(parents=True, exist_ok=True)
sync_engine = create_engine(f"sqlite:///{db_path}", future=True)
metadata.create_all(sync_engine)
with sync_engine.begin() as conn:
conn.execute(insert(retention_classes), [dict(s) for s in RETENTION_CLASS_SEEDS])
sync_engine.dispose()
return Settings(
database_url=f"sqlite+aiosqlite:///{db_path}",
storage_local_root=str(storage_root),
log_level="INFO",
auth_tokens="test-token",
)
def _create_package(client: TestClient, *, name: str = "pkg") -> str:
resp = client.post(
"/packages",
headers=AUTH,
json={
"name": name,
"producer": "guide-board",
"subject": "run-42",
"retention_class": "raw-evidence",
"metadata": {"run_id": "r-42", "kind": "integration"},
},
)
assert resp.status_code == 201, resp.text
return str(resp.json()["id"])
def _upload_file(client: TestClient, package_id: str, rel_path: str, data: bytes) -> dict[str, Any]:
resp = client.post(
f"/packages/{package_id}/files",
headers=AUTH,
data={"relative_path": rel_path, "media_type": "application/octet-stream"},
files={"file": (Path(rel_path).name, data, "application/octet-stream")},
)
assert resp.status_code == 201, resp.text
return dict(resp.json())
def test_http_surface_ingest_finalize_download_and_events(tmp_path: Path) -> None:
app = create_app(_settings(tmp_path))
with TestClient(app) as client:
unauth = client.get("/packages")
assert unauth.status_code == 401
assert unauth.headers["content-type"].startswith("application/problem+json")
assert client.get("/openapi.json").status_code == 200
assert client.get("/docs").status_code == 200
assert client.get("/backends", headers=AUTH).json()["backends"][0]["backend_id"] == "local"
assert client.get("/retention-classes", headers=AUTH).json()["retention_classes"]
package_id = _create_package(client)
listing = client.get(
"/packages",
headers=AUTH,
params={
"producer": "guide-board",
"subject": "run-42",
"retention_class": "raw-evidence",
"metadata_key": "run_id",
"metadata_value": "r-42",
},
)
assert listing.status_code == 200
assert [p["id"] for p in listing.json()["packages"]] == [package_id]
data = b"hello artifact-store http api" * 64
file_record = _upload_file(client, package_id, "reports/hello.bin", data)
assert file_record["size_bytes"] == len(data)
assert file_record["digest_primary_hex"] == digest_bytes(data).primary.hex
finalized = client.post(f"/packages/{package_id}/finalize", headers=AUTH)
assert finalized.status_code == 200, finalized.text
assert finalized.json()["status"] == "finalized"
assert finalized.json()["manifest_digest"].startswith("blake3:")
manifest_cbor = client.get(
f"/packages/{package_id}/manifest",
headers={**AUTH, "Accept": "application/cbor"},
)
assert manifest_cbor.status_code == 200
manifest_payload = cbor2.loads(manifest_cbor.content)
assert manifest_payload["manifest_version"] == 1
assert manifest_payload["package"]["id"] == package_id
manifest_json = client.get(f"/packages/{package_id}/manifest.json", headers=AUTH)
assert manifest_json.status_code == 200
assert manifest_json.json()["files"][0]["relative_path"] == "reports/hello.bin"
file_id = file_record["id"]
metadata_resp = client.get(f"/files/{file_id}", headers=AUTH)
assert metadata_resp.status_code == 200
content_address = metadata_resp.json()["content_address"]
download = client.get(f"/files/{file_id}/download", headers=AUTH)
assert download.status_code == 200
assert download.content == data
assert download.headers["etag"] == f'"{content_address}"'
partial = client.get(
f"/files/{file_id}/download",
headers={**AUTH, "Range": "bytes=6-17"},
)
assert partial.status_code == 206
assert partial.headers["content-range"] == f"bytes 6-17/{len(data)}"
assert partial.content == data[6:18]
not_modified = client.get(
f"/files/{file_id}/download",
headers={**AUTH, "If-None-Match": f'"{content_address}"'},
)
assert not_modified.status_code == 304
events_json = client.get(
"/events",
headers={**AUTH, "Accept": "application/json"},
params={"since": 0, "limit": 10, "wait_seconds": 0},
)
assert events_json.status_code == 200
assert [e["event_type"] for e in events_json.json()["events"]] == [
"v1.package.created",
"v1.retention.default_applied",
"v1.file.ingested",
"v1.package.finalized",
]
events_cbor = client.get(
"/events",
headers={**AUTH, "Accept": "application/cbor"},
params={"since": 0, "limit": 10, "wait_seconds": 0},
)
assert events_cbor.status_code == 200
assert cbor2.loads(events_cbor.content)["events"][0]["sequence"] == 1
def test_http_scripted_50_file_package_flow(tmp_path: Path) -> None:
app = create_app(_settings(tmp_path))
with TestClient(app) as client:
package_id = _create_package(client, name="fifty")
uploaded: list[tuple[str, bytes, dict[str, Any]]] = []
for idx in range(50):
rel_path = f"bundle/file-{idx:02d}.bin"
payload = f"payload {idx:02d}:".encode() + bytes([idx]) * (idx + 1)
record = _upload_file(client, package_id, rel_path, payload)
uploaded.append((rel_path, payload, record))
finalized = client.post(f"/packages/{package_id}/finalize", headers=AUTH)
assert finalized.status_code == 200, finalized.text
for rel_path, payload, record in uploaded:
assert record["relative_path"] == rel_path
assert record["digest_primary_hex"] == digest_bytes(payload).primary.hex
downloaded = client.get(f"/files/{record['id']}/download", headers=AUTH)
assert downloaded.status_code == 200
assert downloaded.content == payload
events = client.get(
"/events",
headers={**AUTH, "Accept": "application/json"},
params={"since": 0, "limit": 100, "wait_seconds": 0},
)
assert events.status_code == 200
assert len(events.json()["events"]) == 53
assert events.json()["events"][-1]["event_type"] == "v1.package.finalized"
@given(
data=st.binary(min_size=1, max_size=512),
stem=st.text(alphabet=list("abcdefghijklmnopqrstuvwxyz0123456789_-"), min_size=1, max_size=24),
)
@hypothesis_settings(
max_examples=12,
deadline=None,
suppress_health_check=[HealthCheck.function_scoped_fixture],
)
def test_upload_session_lifecycle_property(data: bytes, stem: str) -> None:
with tempfile.TemporaryDirectory() as tmp:
app = create_app(_settings(Path(tmp)))
with TestClient(app) as client:
package_id = _create_package(client, name="upload-session")
opened = client.post(
"/uploads",
headers=AUTH,
json={"expected_size_bytes": len(data), "media_type": "application/octet-stream"},
)
assert opened.status_code == 201, opened.text
upload_url = opened.json()["content_upload_url"]
patched = client.patch(
upload_url,
headers={**AUTH, "Content-Range": f"bytes 0-{len(data) - 1}/{len(data)}"},
content=data,
)
assert patched.status_code == 200, patched.text
assert patched.json()["received_bytes"] == len(data)
completed = client.post(
f"{upload_url}/complete",
headers=AUTH,
json={
"package_id": package_id,
"relative_path": f"uploads/{stem}.bin",
"media_type": "application/octet-stream",
},
)
assert completed.status_code == 201, completed.text
file_id = completed.json()["id"]
downloaded = client.get(f"/files/{file_id}/download", headers=AUTH)
assert downloaded.status_code == 200
assert downloaded.content == data