"""Archive integration with artifact-store (IB-WP-0014). The live infospace stays in a local working folder. This module bundles a finalized snapshot of the infospace (or a curated slice) into a content- addressed artifact-store package, finalizes it, and records the returned package id and manifest digest under ``output/archives/index.yaml``. Storage backend choice (local FS in artifact-store v0.1, S3 in WP-0004) is delegated to artifact-store - it is not re-implemented here. """ from __future__ import annotations import asyncio import fnmatch import mimetypes import os from collections.abc import AsyncIterator, Iterable from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any import yaml from sqlalchemy import insert, select from sqlalchemy.ext.asyncio import create_async_engine from artifactstore.dataplane import InProcessDataPlane from artifactstore.db.schema import metadata as artifactstore_metadata from artifactstore.db.schema import retention_classes as retention_classes_table from artifactstore.db.seed import RETENTION_CLASS_SEEDS from artifactstore.events import RegistryViewWriter from artifactstore.registry import Registry from artifactstore.storage import LocalBackend from .errors import InfospaceError from .lifecycle import load_infospace ARCHIVE_INDEX_PATH = "output/archives/index.yaml" ARCHIVE_STORE_DIR = "output/archives/.store" ARCHIVE_DB_NAME = "registry.sqlite" ARCHIVE_BACKEND_DIR = "storage" DEFAULT_INCLUDE: tuple[str, ...] = ( "infospace.yaml", "artifacts", "workflows", "output", "reports", "exports", ) DEFAULT_RETENTION_CLASS = "release-evidence" PRODUCER = "infospace-bench" DEFAULT_ACTOR = "infospace-bench" @dataclass(frozen=True) class ArchiveRecord: """One row of ``output/archives/index.yaml``.""" package_id: str manifest_digest: str retention_class: str created_at: str included_paths: list[str] file_count: int note: str = "" producer: str = PRODUCER subject: str = "" store_root: str | None = None metadata: dict[str, Any] = field(default_factory=dict) def to_dict(self) -> dict[str, Any]: out: dict[str, Any] = { "package_id": self.package_id, "manifest_digest": self.manifest_digest, "retention_class": self.retention_class, "created_at": self.created_at, "included_paths": list(self.included_paths), "file_count": self.file_count, "note": self.note, "producer": self.producer, "subject": self.subject, } if self.store_root is not None: out["store_root"] = self.store_root if self.metadata: out["metadata"] = dict(self.metadata) return out @classmethod def from_dict(cls, data: dict[str, Any]) -> "ArchiveRecord": return cls( package_id=str(data["package_id"]), manifest_digest=str(data["manifest_digest"]), retention_class=str(data["retention_class"]), created_at=str(data["created_at"]), included_paths=list(data.get("included_paths", [])), file_count=int(data.get("file_count", 0)), note=str(data.get("note", "")), producer=str(data.get("producer", PRODUCER)), subject=str(data.get("subject", "")), store_root=( str(data["store_root"]) if data.get("store_root") is not None else None ), metadata=dict(data.get("metadata", {})), ) def archive_infospace( root: str | Path, *, retention_class: str = DEFAULT_RETENTION_CLASS, include: Iterable[str] | None = None, exclude: Iterable[str] | None = None, note: str = "", registry: Registry | None = None, store_root: str | Path | None = None, actor: str = DEFAULT_ACTOR, ) -> ArchiveRecord: """Bundle the infospace at ``root`` into an artifact-store package. Returns the new :class:`ArchiveRecord` and appends it to the infospace's ``output/archives/index.yaml``. When ``registry`` is None, a self-contained SQLite + local-FS registry is built under ``store_root`` (default: ``/output/archives/.store/``). """ include_tuple = tuple(include) if include else DEFAULT_INCLUDE exclude_tuple = tuple(exclude or ()) return asyncio.run( _archive_infospace_async( Path(root), retention_class=retention_class, include=include_tuple, exclude=exclude_tuple, note=note, registry=registry, store_root=Path(store_root) if store_root else None, actor=actor, ) ) def list_archives(root: str | Path) -> list[ArchiveRecord]: """Return the recorded archive entries for an infospace.""" path = Path(root) / ARCHIVE_INDEX_PATH if not path.exists(): return [] raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {} if not isinstance(raw, dict): raise InfospaceError( "invalid_archive_index", f"Archive index must be a mapping: {path}", {"path": str(path)}, ) items = raw.get("archives", []) if not isinstance(items, list): raise InfospaceError( "invalid_archive_index", f"Archive index 'archives' must be a list: {path}", {"path": str(path)}, ) return [ArchiveRecord.from_dict(item) for item in items] async def _archive_infospace_async( root: Path, *, retention_class: str, include: tuple[str, ...], exclude: tuple[str, ...], note: str, registry: Registry | None, store_root: Path | None, actor: str, ) -> ArchiveRecord: infospace = load_infospace(root) subject = infospace.config.slug auto_exclude = (ARCHIVE_STORE_DIR, ARCHIVE_INDEX_PATH) effective_exclude = exclude + auto_exclude files = _collect_files(root, include=include, exclude=effective_exclude) if not files: raise InfospaceError( "empty_archive", "No files matched the include set for archiving", {"root": str(root), "include": list(include)}, ) owned_registry = registry is None effective_store_root: Path | None = None if owned_registry: effective_store_root = store_root or (root / ARCHIVE_STORE_DIR) registry = await _build_local_registry(effective_store_root) try: assert registry is not None package_id = await registry.create_package( name=f"infospace {subject}", producer=PRODUCER, subject=subject, retention_class=retention_class, actor=actor, metadata={ "infospace_slug": subject, "infospace_name": infospace.config.name, "topic_domain": infospace.config.topic.domain, "included_paths": list(include), "note": note, }, ) for relative_path, abs_path in files: media = ( mimetypes.guess_type(abs_path.name)[0] or "application/octet-stream" ) await registry.ingest_file( package_id, relative_path=relative_path, media_type=media, stream=_file_stream(abs_path), actor=actor, ) manifest_addr = await registry.finalize_package(package_id, actor=actor) pkg = await registry.get_package(package_id) finally: if owned_registry and registry is not None: await registry.dispose() finalized_at = pkg.finalized_at or datetime.now(timezone.utc) record = ArchiveRecord( package_id=str(package_id), manifest_digest=str(manifest_addr), retention_class=retention_class, created_at=finalized_at.isoformat(), included_paths=list(include), file_count=len(files), note=note, producer=PRODUCER, subject=subject, store_root=str(effective_store_root) if effective_store_root else None, ) _append_index(root, record) return record def _collect_files( root: Path, *, include: tuple[str, ...], exclude: tuple[str, ...], ) -> list[tuple[str, Path]]: seen: dict[str, Path] = {} for pattern in include: target = root / pattern if target.is_file(): rel = pattern.replace(os.sep, "/") if not _is_excluded(rel, exclude): seen.setdefault(rel, target) elif target.is_dir(): for path in target.rglob("*"): if not path.is_file(): continue rel = str(path.relative_to(root)).replace(os.sep, "/") if _is_excluded(rel, exclude): continue seen.setdefault(rel, path) return sorted(seen.items()) def _is_excluded(rel_path: str, exclude: tuple[str, ...]) -> bool: for pattern in exclude: cleaned = pattern.rstrip("/") if rel_path == cleaned or rel_path.startswith(cleaned + "/"): return True if fnmatch.fnmatch(rel_path, pattern): return True return False async def _file_stream( path: Path, chunk_size: int = 1024 * 1024, ) -> AsyncIterator[bytes]: with path.open("rb") as fh: while True: chunk = fh.read(chunk_size) if not chunk: break yield chunk def _append_index(root: Path, record: ArchiveRecord) -> None: path = root / ARCHIVE_INDEX_PATH path.parent.mkdir(parents=True, exist_ok=True) existing: list[dict[str, Any]] = [] if path.exists(): raw = yaml.safe_load(path.read_text(encoding="utf-8")) or {} if isinstance(raw, dict): items = raw.get("archives", []) if isinstance(items, list): existing = list(items) existing.append(record.to_dict()) path.write_text( yaml.safe_dump({"archives": existing}, sort_keys=False), encoding="utf-8", ) async def _build_local_registry(store_root: Path) -> Registry: store_root.mkdir(parents=True, exist_ok=True) db_path = store_root / ARCHIVE_DB_NAME backend_root = store_root / ARCHIVE_BACKEND_DIR engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) async with engine.begin() as conn: await conn.run_sync(artifactstore_metadata.create_all) seeded = ( await conn.execute(select(retention_classes_table).limit(1)) ).first() if seeded is None: for seed in RETENTION_CLASS_SEEDS: await conn.execute(insert(retention_classes_table).values(**seed)) backend = LocalBackend(backend_root, backend_id="local") dataplane = InProcessDataPlane(backend) return Registry(engine, dataplane, RegistryViewWriter())