IB-WP-0014: archive-list, restore, retention annotation, docs (T03-T05)

Round out IB-WP-0014 with the remaining archive operations and docs.

- restore_archive() and `infospace-bench restore <pkg> --target <dir>` round-trip
  a finalized package's bytes back to disk. Refuses to overwrite a non-empty
  target unless --force. --from <infospace-root> resolves the store location.
- archive-list CLI with --with-retention flag; annotate_retention() opens the
  per-infospace registry and joins each record with its current retention
  state (effective class, expires, holds, eligibility).
- docs/archive-integration.md covers when to archive, the include set,
  retention classes, storage layout, credentials policy, and the explicit
  non-goal that S3/git backends live in artifact-store.
- SCOPE.md cross-links the new doc.
- Workplan flipped to status: done. Full pytest suite: 72 passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-17 11:46:23 +02:00
parent e343443d77
commit ddefd69f71
8 changed files with 636 additions and 2 deletions

View File

@@ -1,4 +1,11 @@
from .archive import ArchiveRecord, archive_infospace, list_archives
from .archive import (
ArchiveRecord,
RestoredArchive,
annotate_retention,
archive_infospace,
list_archives,
restore_archive,
)
from .errors import InfospaceError
from .evaluation import EntityEvaluation, EvaluationSnapshot, MetricValue, ScoreEntry
from .engine import (
@@ -43,6 +50,7 @@ from .workflow import load_workflows, plan_workflow, run_workflow
__all__ = [
"ArchiveRecord",
"DisciplineBinding",
"RestoredArchive",
"EntityEvaluation",
"EvaluationSnapshot",
"Infospace",
@@ -61,6 +69,7 @@ __all__ = [
"TopicConfig",
"ViabilityThreshold",
"add_artifact",
"annotate_retention",
"append_to_history",
"archive_infospace",
"create_infospace",
@@ -79,6 +88,7 @@ __all__ = [
"read_snapshot",
"record_check_results",
"register_artifact",
"restore_archive",
"load_workflows",
"plan_workflow",
"run_workflow",

View File

@@ -13,6 +13,7 @@ from __future__ import annotations
import asyncio
import fnmatch
import json
import mimetypes
import os
from collections.abc import AsyncIterator, Iterable
@@ -20,6 +21,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import UUID
import yaml
from sqlalchemy import insert, select
@@ -142,6 +144,77 @@ def archive_infospace(
)
@dataclass(frozen=True)
class RestoredArchive:
"""Result of :func:`restore_archive`."""
package_id: str
manifest_digest: str
target: str
file_count: int
restored_paths: list[str]
def to_dict(self) -> dict[str, Any]:
return {
"package_id": self.package_id,
"manifest_digest": self.manifest_digest,
"target": self.target,
"file_count": self.file_count,
"restored_paths": list(self.restored_paths),
}
def restore_archive(
package_id: str,
*,
target: str | Path,
store_root: str | Path | None = None,
source_infospace: str | Path | None = None,
registry: Registry | None = None,
force: bool = False,
) -> RestoredArchive:
"""Re-materialize an archived infospace package into ``target``.
Exactly one of ``store_root``, ``source_infospace``, or ``registry`` must
locate the artifact-store. ``source_infospace`` is a convenience that
resolves to ``<source_infospace>/output/archives/.store/``.
"""
return asyncio.run(
_restore_archive_async(
package_id=package_id,
target=Path(target),
store_root=Path(store_root) if store_root else None,
source_infospace=Path(source_infospace) if source_infospace else None,
registry=registry,
force=force,
)
)
def annotate_retention(
archives: Iterable[ArchiveRecord],
*,
store_root: str | Path | None = None,
source_infospace: str | Path | None = None,
registry: Registry | None = None,
) -> list[dict[str, Any]]:
"""Pair each record with its current retention state if reachable.
Returns a list of ``{"archive": ArchiveRecord.to_dict(), "retention": {...}|None}``
entries. Records whose registry cannot be opened get ``retention: None``.
"""
return asyncio.run(
_annotate_retention_async(
tuple(archives),
store_root=Path(store_root) if store_root else None,
source_infospace=Path(source_infospace) if source_infospace else None,
registry=registry,
)
)
def list_archives(root: str | Path) -> list[ArchiveRecord]:
"""Return the recorded archive entries for an infospace."""
path = Path(root) / ARCHIVE_INDEX_PATH
@@ -323,3 +396,151 @@ async def _build_local_registry(store_root: Path) -> Registry:
backend = LocalBackend(backend_root, backend_id="local")
dataplane = InProcessDataPlane(backend)
return Registry(engine, dataplane, RegistryViewWriter())
def _resolve_store_root(
*,
store_root: Path | None,
source_infospace: Path | None,
) -> Path | None:
if store_root is not None and source_infospace is not None:
raise InfospaceError(
"ambiguous_archive_store",
"Pass at most one of store_root or source_infospace",
{},
)
if store_root is not None:
return store_root
if source_infospace is not None:
return source_infospace / ARCHIVE_STORE_DIR
return None
async def _restore_archive_async(
*,
package_id: str,
target: Path,
store_root: Path | None,
source_infospace: Path | None,
registry: Registry | None,
force: bool,
) -> RestoredArchive:
owned_registry = registry is None
if owned_registry:
resolved_store = _resolve_store_root(
store_root=store_root,
source_infospace=source_infospace,
)
if resolved_store is None:
raise InfospaceError(
"missing_archive_store",
"restore_archive needs registry, store_root, or source_infospace",
{},
)
if not resolved_store.exists():
raise InfospaceError(
"missing_archive_store",
f"Archive store does not exist: {resolved_store}",
{"store_root": str(resolved_store)},
)
registry = await _build_local_registry(resolved_store)
try:
assert registry is not None
pkg_uuid = UUID(package_id)
pkg = await registry.get_package(pkg_uuid)
if pkg.manifest_digest_hex is None:
raise InfospaceError(
"unfinalized_package",
f"Package is not finalized: {package_id}",
{"package_id": package_id, "status": pkg.status},
)
manifest_bytes = await registry.get_manifest_bytes(pkg_uuid, format="json")
manifest = json.loads(manifest_bytes.decode("utf-8"))
target.mkdir(parents=True, exist_ok=True)
if not force and any(target.iterdir()):
raise InfospaceError(
"restore_target_not_empty",
f"Refusing to restore into non-empty directory: {target}",
{"target": str(target)},
)
restored: list[str] = []
for entry in manifest.get("files", []):
rel = str(entry["relative_path"])
file_id = UUID(str(entry["id"]))
dest = (target / rel).resolve()
target_resolved = target.resolve()
if target_resolved not in dest.parents and dest != target_resolved:
raise InfospaceError(
"unsafe_restore_path",
f"Manifest path escapes target: {rel}",
{"target": str(target), "relative_path": rel},
)
dest.parent.mkdir(parents=True, exist_ok=True)
stream = await registry.get_file(file_id)
with dest.open("wb") as fh:
async for chunk in stream:
fh.write(chunk)
restored.append(rel)
finally:
if owned_registry and registry is not None:
await registry.dispose()
return RestoredArchive(
package_id=package_id,
manifest_digest=f"blake3:{pkg.manifest_digest_hex}",
target=str(target),
file_count=len(restored),
restored_paths=restored,
)
async def _annotate_retention_async(
archives: tuple[ArchiveRecord, ...],
*,
store_root: Path | None,
source_infospace: Path | None,
registry: Registry | None,
) -> list[dict[str, Any]]:
if not archives:
return []
owned_registry = registry is None
used_store_root: Path | None = None
if owned_registry:
used_store_root = _resolve_store_root(
store_root=store_root,
source_infospace=source_infospace,
)
if used_store_root is None or not used_store_root.exists():
return [{"archive": rec.to_dict(), "retention": None} for rec in archives]
registry = await _build_local_registry(used_store_root)
try:
assert registry is not None
results: list[dict[str, Any]] = []
for rec in archives:
retention: dict[str, Any] | None
try:
state = await registry.get_retention_state(UUID(rec.package_id))
retention = {
"current_expires_at": (
state.current_expires_at.isoformat()
if state.current_expires_at
else None
),
"effective_class": state.effective_class,
"active_hold_id": (
str(state.active_hold_id) if state.active_hold_id else None
),
"eligible_for_deletion": state.eligible_for_deletion,
}
except Exception as exc:
retention = {"error": f"{type(exc).__name__}: {exc}"}
results.append({"archive": rec.to_dict(), "retention": retention})
return results
finally:
if owned_registry and registry is not None:
await registry.dispose()

View File

@@ -6,6 +6,12 @@ import json
import sys
from pathlib import Path
from .archive import (
annotate_retention,
archive_infospace,
list_archives,
restore_archive,
)
from .checks import run_collection_checks
from .engine import engine_capability_contract, plan_asset_sync, sync_assets
from .errors import InfospaceError
@@ -195,6 +201,74 @@ def build_parser() -> argparse.ArgumentParser:
generate_from_source.add_argument("--max-chunks", type=int, default=0)
generate_from_source.add_argument("--apply", action="store_true")
archive = sub.add_parser(
"archive",
help="Archive an infospace into artifact-store (durable, content-addressed)",
)
archive.add_argument("root")
archive.add_argument(
"--retention-class",
default="release-evidence",
help="artifact-store retention class id (default: release-evidence)",
)
archive.add_argument(
"--include",
action="append",
default=[],
help="Relative path to include (repeatable). Default: infospace.yaml, artifacts/, workflows/, output/, reports/, exports/",
)
archive.add_argument(
"--exclude",
action="append",
default=[],
help="Relative path or glob to exclude (repeatable)",
)
archive.add_argument("--note", default="", help="Free-text note for the archive record")
archive.add_argument(
"--store-root",
default="",
help="Override the artifact-store location (default: <root>/output/archives/.store)",
)
archive_list = sub.add_parser(
"archive-list",
help="List recorded archives for an infospace",
)
archive_list.add_argument("root")
archive_list.add_argument(
"--with-retention",
action="store_true",
help="Annotate each archive with its current retention state",
)
archive_list.add_argument(
"--store-root",
default="",
help="Override the artifact-store location for retention lookups",
)
restore = sub.add_parser(
"restore",
help="Restore an archived infospace package into a target directory",
)
restore.add_argument("package_id")
restore.add_argument("--target", required=True, help="Directory to restore into")
restore.add_argument(
"--from",
dest="from_root",
default="",
help="Source infospace whose archive store holds the package",
)
restore.add_argument(
"--store-root",
default="",
help="Direct path to the artifact-store location",
)
restore.add_argument(
"--force",
action="store_true",
help="Overwrite into a non-empty target directory",
)
engine = sub.add_parser("engine", help="Inspect and sync engine boundary state")
engine_sub = engine.add_subparsers(dest="engine_command", required=True)
@@ -423,6 +497,36 @@ def main(argv: list[str] | None = None) -> int:
_write_json(plan_generation(infospace.root, stage=args.stage))
else:
parser.error(f"Unhandled generate command: {args.generate_command}")
elif args.command == "archive":
record = archive_infospace(
Path(args.root),
retention_class=args.retention_class,
include=args.include or None,
exclude=args.exclude or None,
note=args.note,
store_root=args.store_root or None,
)
_write_json(record.to_dict())
elif args.command == "archive-list":
archives = list_archives(Path(args.root))
if args.with_retention:
payload = annotate_retention(
archives,
store_root=args.store_root or None,
source_infospace=Path(args.root) if not args.store_root else None,
)
_write_json({"archives": payload})
else:
_write_json({"archives": [rec.to_dict() for rec in archives]})
elif args.command == "restore":
result = restore_archive(
args.package_id,
target=Path(args.target),
store_root=args.store_root or None,
source_infospace=Path(args.from_root) if args.from_root else None,
force=args.force,
)
_write_json(result.to_dict())
elif args.command == "engine":
if args.engine_command == "inspect":
_write_json(