"""Archive integration with artifact-store (IB-WP-0014). The live infospace stays in a local working folder. This module bundles a finalized snapshot of the infospace (or a curated slice) into a content- addressed artifact-store package, finalizes it, and records the returned package id and manifest digest under ``output/archives/index.yaml``. Storage backend choice (local FS in artifact-store v0.1, S3 in WP-0004) is delegated to artifact-store - it is not re-implemented here. """ from __future__ import annotations import asyncio import fnmatch import json import mimetypes import os from collections.abc import AsyncIterator, Iterable from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any from uuid import UUID import yaml from sqlalchemy import insert, select from sqlalchemy.ext.asyncio import create_async_engine from artifactstore.dataplane import InProcessDataPlane from artifactstore.db.schema import metadata as artifactstore_metadata from artifactstore.db.schema import retention_classes as retention_classes_table from artifactstore.db.seed import RETENTION_CLASS_SEEDS from artifactstore.events import RegistryViewWriter from artifactstore.registry import Registry from artifactstore.storage import LocalBackend from .errors import InfospaceError from .lifecycle import load_infospace ARCHIVE_INDEX_PATH = "output/archives/index.yaml" ARCHIVE_STORE_DIR = "output/archives/.store" ARCHIVE_DB_NAME = "registry.sqlite" ARCHIVE_BACKEND_DIR = "storage" DEFAULT_INCLUDE: tuple[str, ...] = ( "infospace.yaml", "artifacts", "workflows", "output", "reports", "exports", ) DEFAULT_RETENTION_CLASS = "release-evidence" PRODUCER = "infospace-bench" DEFAULT_ACTOR = "infospace-bench" @dataclass(frozen=True) class ArchiveRecord: """One row of ``output/archives/index.yaml``.""" package_id: str manifest_digest: str retention_class: str created_at: str included_paths: list[str] file_count: int note: str = "" producer: str = PRODUCER subject: str = "" store_root: str | None = None metadata: dict[str, Any] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: out: dict[str, Any] = { "package_id": self.package_id, "manifest_digest": self.manifest_digest, "retention_class": self.retention_class, "created_at": self.created_at, "included_paths": list(self.included_paths), "file_count": self.file_count, "note": self.note, "producer": self.producer, "subject": self.subject, } if self.store_root is not None: out["store_root"] = self.store_root if self.metadata: out["metadata"] = dict(self.metadata) return out @classmethod def from_dict(cls, data: dict[str, Any]) -> "ArchiveRecord": return cls( package_id=str(data["package_id"]), manifest_digest=str(data["manifest_digest"]), retention_class=str(data["retention_class"]), created_at=str(data["created_at"]), included_paths=list(data.get("included_paths", [])), file_count=int(data.get("file_count", 0)), note=str(data.get("note", "")), producer=str(data.get("producer", PRODUCER)), subject=str(data.get("subject", "")), store_root=( str(data["store_root"]) if data.get("store_root") is not None else None ), metadata=dict(data.get("metadata", {})), ) def archive_infospace( root: str | Path, *, retention_class: str = DEFAULT_RETENTION_CLASS, include: Iterable[str] | None = None, exclude: Iterable[str] | None = None, note: str = "", registry: Registry | None = None, store_root: str | Path | None = None, actor: str = DEFAULT_ACTOR, ) -> ArchiveRecord: """Bundle the infospace at ``root`` into an artifact-store package. Returns the new :class:`ArchiveRecord` and appends it to the infospace's ``output/archives/index.yaml``. When ``registry`` is None, a self-contained SQLite + local-FS registry is built under ``store_root`` (default: ``/output/archives/.store/``). """ include_tuple = tuple(include) if include else DEFAULT_INCLUDE exclude_tuple = tuple(exclude or ()) return asyncio.run( _archive_infospace_async( Path(root), retention_class=retention_class, include=include_tuple, exclude=exclude_tuple, note=note, registry=registry, store_root=Path(store_root) if store_root else None, actor=actor, ) ) @dataclass(frozen=True) class RestoredArchive: """Result of :func:`restore_archive`.""" package_id: str manifest_digest: str target: str file_count: int restored_paths: list[str] def to_dict(self) -> dict[str, Any]: return { "package_id": self.package_id, "manifest_digest": self.manifest_digest, "target": self.target, "file_count": self.file_count, "restored_paths": list(self.restored_paths), } def restore_archive( package_id: str, *, target: str | Path, store_root: str | Path | None = None, source_infospace: str | Path | None = None, registry: Registry | None = None, force: bool = False, ) -> RestoredArchive: """Re-materialize an archived infospace package into ``target``. Exactly one of ``store_root``, ``source_infospace``, or ``registry`` must locate the artifact-store. ``source_infospace`` is a convenience that resolves to ``/output/archives/.store/``. """ return asyncio.run( _restore_archive_async( package_id=package_id, target=Path(target), store_root=Path(store_root) if store_root else None, source_infospace=Path(source_infospace) if source_infospace else None, registry=registry, force=force, ) ) def annotate_retention( archives: Iterable[ArchiveRecord], *, store_root: str | Path | None = None, source_infospace: str | Path | None = None, registry: Registry | None = None, ) -> list[dict[str, Any]]: """Pair each record with its current retention state if reachable. Returns a list of ``{"archive": ArchiveRecord.to_dict(), "retention": {...}|None}`` entries. Records whose registry cannot be opened get ``retention: None``. """ return asyncio.run( _annotate_retention_async( tuple(archives), store_root=Path(store_root) if store_root else None, source_infospace=Path(source_infospace) if source_infospace else None, registry=registry, ) ) def list_archives(root: str | Path) -> list[ArchiveRecord]: """Return the recorded archive entries for an infospace.""" path = Path(root) / ARCHIVE_INDEX_PATH if not path.exists(): return [] raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {} if not isinstance(raw, dict): raise InfospaceError( "invalid_archive_index", f"Archive index must be a mapping: {path}", {"path": str(path)}, ) items = raw.get("archives", []) if not isinstance(items, list): raise InfospaceError( "invalid_archive_index", f"Archive index 'archives' must be a list: {path}", {"path": str(path)}, ) return [ArchiveRecord.from_dict(item) for item in items] async def _archive_infospace_async( root: Path, *, retention_class: str, include: tuple[str, ...], exclude: tuple[str, ...], note: str, registry: Registry | None, store_root: Path | None, actor: str, ) -> ArchiveRecord: infospace = load_infospace(root) subject = infospace.config.slug auto_exclude = (ARCHIVE_STORE_DIR, ARCHIVE_INDEX_PATH) effective_exclude = exclude + auto_exclude files = _collect_files(root, include=include, exclude=effective_exclude) if not files: raise InfospaceError( "empty_archive", "No files matched the include set for archiving", {"root": str(root), "include": list(include)}, ) owned_registry = registry is None effective_store_root: Path | None = None if owned_registry: effective_store_root = store_root or (root / ARCHIVE_STORE_DIR) registry = await _build_local_registry(effective_store_root) try: assert registry is not None package_id = await registry.create_package( name=f"infospace {subject}", producer=PRODUCER, subject=subject, retention_class=retention_class, actor=actor, metadata={ "infospace_slug": subject, "infospace_name": infospace.config.name, "topic_domain": infospace.config.topic.domain, "included_paths": list(include), "note": note, }, ) for relative_path, abs_path in files: media = ( mimetypes.guess_type(abs_path.name)[0] or "application/octet-stream" ) await registry.ingest_file( package_id, relative_path=relative_path, media_type=media, stream=_file_stream(abs_path), actor=actor, ) manifest_addr = await registry.finalize_package(package_id, actor=actor) pkg = await registry.get_package(package_id) finally: if owned_registry and registry is not None: await registry.dispose() finalized_at = pkg.finalized_at or datetime.now(timezone.utc) record = ArchiveRecord( package_id=str(package_id), manifest_digest=str(manifest_addr), retention_class=retention_class, created_at=finalized_at.isoformat(), included_paths=list(include), file_count=len(files), note=note, producer=PRODUCER, subject=subject, store_root=str(effective_store_root) if effective_store_root else None, ) _append_index(root, record) return record def _collect_files( root: Path, *, include: tuple[str, ...], exclude: tuple[str, ...], ) -> list[tuple[str, Path]]: seen: dict[str, Path] = {} for pattern in include: target = root / pattern if target.is_file(): rel = pattern.replace(os.sep, "/") if not _is_excluded(rel, exclude): seen.setdefault(rel, target) elif target.is_dir(): for path in target.rglob("*"): if not path.is_file(): continue rel = str(path.relative_to(root)).replace(os.sep, "/") if _is_excluded(rel, exclude): continue seen.setdefault(rel, path) return sorted(seen.items()) def _is_excluded(rel_path: str, exclude: tuple[str, ...]) -> bool: for pattern in exclude: cleaned = pattern.rstrip("/") if rel_path == cleaned or rel_path.startswith(cleaned + "/"): return True if fnmatch.fnmatch(rel_path, pattern): return True return False async def _file_stream( path: Path, chunk_size: int = 1024 * 1024, ) -> AsyncIterator[bytes]: with path.open("rb") as fh: while True: chunk = fh.read(chunk_size) if not chunk: break yield chunk def _append_index(root: Path, record: ArchiveRecord) -> None: path = root / ARCHIVE_INDEX_PATH path.parent.mkdir(parents=True, exist_ok=True) existing: list[dict[str, Any]] = [] if path.exists(): raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {} if isinstance(raw, dict): items = raw.get("archives", []) if isinstance(items, list): existing = list(items) existing.append(record.to_dict()) path.write_text( yaml.safe_dump({"archives": existing}, sort_keys=False), encoding="utf-8", ) async def _build_local_registry(store_root: Path) -> Registry: store_root.mkdir(parents=True, exist_ok=True) db_path = store_root / ARCHIVE_DB_NAME backend_root = store_root / ARCHIVE_BACKEND_DIR engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) async with engine.begin() as conn: await conn.run_sync(artifactstore_metadata.create_all) seeded = ( await conn.execute(select(retention_classes_table).limit(1)) ).first() if seeded is None: for seed in RETENTION_CLASS_SEEDS: await conn.execute(insert(retention_classes_table).values(**seed)) backend = LocalBackend(backend_root, backend_id="local") dataplane = InProcessDataPlane(backend) return Registry(engine, dataplane, RegistryViewWriter()) def _resolve_store_root( *, store_root: Path | None, source_infospace: Path | None, ) -> Path | None: if store_root is not None and source_infospace is not None: raise InfospaceError( "ambiguous_archive_store", "Pass at most one of store_root or source_infospace", {}, ) if store_root is not None: return store_root if source_infospace is not None: return source_infospace / ARCHIVE_STORE_DIR return None async def _restore_archive_async( *, package_id: str, target: Path, store_root: Path | None, source_infospace: Path | None, registry: Registry | None, force: bool, ) -> RestoredArchive: owned_registry = registry is None if owned_registry: resolved_store = _resolve_store_root( store_root=store_root, source_infospace=source_infospace, ) if resolved_store is None: raise InfospaceError( "missing_archive_store", "restore_archive needs registry, store_root, or source_infospace", {}, ) if not resolved_store.exists(): raise InfospaceError( "missing_archive_store", f"Archive store does not exist: {resolved_store}", {"store_root": str(resolved_store)}, ) registry = await _build_local_registry(resolved_store) try: assert registry is not None pkg_uuid = UUID(package_id) pkg = await registry.get_package(pkg_uuid) if pkg.manifest_digest_hex is None: raise InfospaceError( "unfinalized_package", f"Package is not finalized: {package_id}", {"package_id": package_id, "status": pkg.status}, ) manifest_bytes = await registry.get_manifest_bytes(pkg_uuid, format="json") manifest = json.loads(manifest_bytes.decode("utf-8")) target.mkdir(parents=True, exist_ok=True) if not force and any(target.iterdir()): raise InfospaceError( "restore_target_not_empty", f"Refusing to restore into non-empty directory: {target}", {"target": str(target)}, ) restored: list[str] = [] for entry in manifest.get("files", []): rel = str(entry["relative_path"]) file_id = UUID(str(entry["id"])) dest = (target / rel).resolve() target_resolved = target.resolve() if target_resolved not in dest.parents and dest != target_resolved: raise InfospaceError( "unsafe_restore_path", f"Manifest path escapes target: {rel}", {"target": str(target), "relative_path": rel}, ) dest.parent.mkdir(parents=True, exist_ok=True) stream = await registry.get_file(file_id) with dest.open("wb") as fh: async for chunk in stream: fh.write(chunk) restored.append(rel) finally: if owned_registry and registry is not None: await registry.dispose() return RestoredArchive( package_id=package_id, manifest_digest=f"blake3:{pkg.manifest_digest_hex}", target=str(target), file_count=len(restored), restored_paths=restored, ) async def _annotate_retention_async( archives: tuple[ArchiveRecord, ...], *, store_root: Path | None, source_infospace: Path | None, registry: Registry | None, ) -> list[dict[str, Any]]: if not archives: return [] owned_registry = registry is None used_store_root: Path | None = None if owned_registry: used_store_root = _resolve_store_root( store_root=store_root, source_infospace=source_infospace, ) if used_store_root is None or not used_store_root.exists(): return [{"archive": rec.to_dict(), "retention": None} for rec in archives] registry = await _build_local_registry(used_store_root) try: assert registry is not None results: list[dict[str, Any]] = [] for rec in archives: retention: dict[str, Any] | None try: state = await registry.get_retention_state(UUID(rec.package_id)) retention = { "current_expires_at": ( state.current_expires_at.isoformat() if state.current_expires_at else None ), "effective_class": state.effective_class, "active_hold_id": ( str(state.active_hold_id) if state.active_hold_id else None ), "eligible_for_deletion": state.eligible_for_deletion, } except Exception as exc: retention = {"error": f"{type(exc).__name__}: {exc}"} results.append({"archive": rec.to_dict(), "retention": retention}) return results finally: if owned_registry and registry is not None: await registry.dispose()