from __future__ import annotations

from io import BytesIO

from kontextual_engine import InMemoryBlobStorage, LocalBlobStorage, S3BlobStorage, content_digest


def test_memory_blob_storage_deduplicates_by_digest() -> None:
    """Storing identical bytes twice in memory yields a single shared blob."""
    payload = b"same content"
    store = InMemoryBlobStorage()

    initial = store.put_bytes(payload, media_type="text/plain")
    repeat = store.put_bytes(payload, media_type="text/plain")

    # Only the first write is reported as having created a blob.
    assert initial.created is True
    assert repeat.created is False

    ref = initial.blob.storage_ref
    assert ref == repeat.blob.storage_ref

    # Whole-blob read and chunked read must both reproduce the payload.
    assert store.read_bytes(ref) == payload
    chunks = store.iter_bytes(ref, chunk_size=4)
    assert b"".join(chunks) == payload

    assert store.exists(initial.blob.digest) is True
    assert len(store.iter_blobs()) == 1


def test_local_blob_storage_stores_one_file_for_duplicate_content(tmp_path) -> None:
    """Duplicate content written to local storage ends up as one file on disk."""
    payload = b"local content"
    root = tmp_path / "blobs"
    store = LocalBlobStorage(root)

    initial = store.put_bytes(payload, media_type="text/plain")
    repeat = store.put_bytes(payload, media_type="text/plain")

    assert initial.created is True
    assert repeat.created is False

    ref = initial.blob.storage_ref
    assert ref == repeat.blob.storage_ref

    assert store.read_bytes(ref) == payload
    chunks = store.iter_bytes(ref, chunk_size=5)
    assert b"".join(chunks) == payload

    assert len(store.iter_blobs()) == 1

    # Count regular files anywhere below the storage root.
    file_count = sum(1 for entry in root.rglob("*") if entry.is_file())
    assert file_count == 1


def test_blob_cleanup_dry_run_and_delete(tmp_path) -> None:
    """A dry run reports the unreferenced blob; the real run removes it."""
    store = LocalBlobStorage(tmp_path / "blobs")
    kept = store.put_bytes(b"kept").blob
    orphan = store.put_bytes(b"orphan").blob

    # Both cleanup passes run before any assertion: preview first, then delete.
    preview = store.delete_unreferenced({kept.storage_ref}, dry_run=True)
    outcome = store.delete_unreferenced({kept.storage_ref}, dry_run=False)

    assert preview.deleted_count == 1
    assert preview.reclaimable_bytes == len(b"orphan")
    assert orphan.storage_ref in preview.deleted_storage_refs

    assert outcome.deleted_storage_refs == (orphan.storage_ref,)

    assert store.exists(kept.storage_ref) is True
    assert store.exists(orphan.storage_ref) is False


def test_s3_blob_storage_uses_content_addressed_keys_with_fake_client() -> None:
    """S3 storage deduplicates by digest and uploads the object only once."""
    payload = b"s3 content"
    fake = FakeS3Client()
    store = S3BlobStorage(bucket="test-bucket", prefix="kontextual", client=fake)

    initial = store.put_bytes(payload, media_type="text/plain")
    repeat = store.put_bytes(payload, media_type="text/plain")
    fetched = store.read_bytes(initial.blob.storage_ref)
    listing = store.iter_blobs()

    assert initial.created is True
    assert repeat.created is False
    assert initial.blob.storage_ref == repeat.blob.storage_ref
    assert initial.blob.digest == content_digest(payload)
    assert initial.blob.storage_key.startswith("kontextual/sha256/")
    assert fetched == payload

    chunks = store.iter_bytes(initial.blob.storage_ref, chunk_size=2)
    assert b"".join(chunks) == payload

    listed_refs = [entry.storage_ref for entry in listing]
    assert listed_refs == [initial.blob.storage_ref]

    # The second put_bytes call must not have triggered another upload.
    assert fake.put_count == 1


class FakeS3Client:
    """In-memory stand-in for the S3 client methods these tests rely on.

    Objects are kept in ``objects`` keyed by ``(Bucket, Key)``; each value is
    the keyword-argument dict that was passed to ``put_object``.
    ``put_count`` records how many times ``put_object`` has been called.
    """

    def __init__(self) -> None:
        self.objects: dict[tuple[str, str], dict] = {}
        self.put_count = 0

    def _lookup(self, bucket: str, key: str) -> dict:
        """Return the stored entry or raise ``FakeS3NotFound`` if absent."""
        try:
            return self.objects[(bucket, key)]
        except KeyError as exc:
            raise FakeS3NotFound() from exc

    def head_object(self, *, Bucket: str, Key: str) -> dict:
        """Return length, content type and metadata of a stored object.

        Raises:
            FakeS3NotFound: if no object is stored under ``(Bucket, Key)``.
        """
        entry = self._lookup(Bucket, Key)
        summary = {
            "ContentLength": len(entry["Body"]),
            "ContentType": entry.get("ContentType"),
            "Metadata": entry.get("Metadata", {}),
        }
        return summary

    def put_object(self, **kwargs) -> None:
        """Store ``kwargs`` under ``(Bucket, Key)`` and count the call."""
        # The counter is bumped first, so it also counts calls that then fail
        # on a missing "Bucket" or "Key" argument.
        self.put_count += 1
        slot = (kwargs["Bucket"], kwargs["Key"])
        self.objects[slot] = kwargs

    def get_object(self, *, Bucket: str, Key: str) -> dict:
        """Return the stored body wrapped in a fresh ``BytesIO`` stream.

        Raises:
            FakeS3NotFound: if no object is stored under ``(Bucket, Key)``.
        """
        entry = self._lookup(Bucket, Key)
        return {"Body": BytesIO(entry["Body"])}

    def list_objects_v2(self, *, Bucket: str, Prefix: str, ContinuationToken: str | None = None) -> dict:
        """List stored objects in ``Bucket`` whose key starts with ``Prefix``.

        ``ContinuationToken`` is accepted but ignored: the full listing is
        always returned in one response with ``IsTruncated`` set to False.
        """
        matches = []
        for (bucket_name, object_key), entry in sorted(self.objects.items()):
            if bucket_name != Bucket:
                continue
            if not object_key.startswith(Prefix):
                continue
            matches.append({"Key": object_key, "Size": len(entry["Body"])})
        return {"Contents": matches, "IsTruncated": False}

    def delete_object(self, *, Bucket: str, Key: str) -> None:
        """Remove the object if present; a missing object is not an error."""
        self.objects.pop((Bucket, Key), None)


class FakeS3NotFound(Exception):
    """Error raised by ``FakeS3Client`` when a requested object is missing."""

    # NOTE(review): this looks like the error-response shape of a real S3
    # client error; confirm against how S3BlobStorage inspects exceptions.
    response = {
        "Error": {"Code": "NoSuchKey"},
        "ResponseMetadata": {"HTTPStatusCode": 404},
    }