Files
state-hub/scripts/pull_image.py
tegwick 0ea2788943 Add state-hub v0.1 — local-first state service for the Custodian
Implements the first live layer of the Custodian cognitive infrastructure:
PostgreSQL schema, FastAPI REST API, FastMCP stdio server, and Observable
Framework telemetry dashboard.

- state-hub/: full stack (docker-compose, FastAPI, Alembic, MCP server, dashboard)
- 5 DB tables: topics, workstreams, tasks, decisions, progress_events
- 11 MCP tools + 5 resources registered in .mcp.json
- Observable dashboard: Overview, Workstreams, Decisions, Progress pages
- CLAUDE.md: session protocol (get_state_summary / add_progress_event ritual)
- ~/.claude/CLAUDE.md: global cross-project reference to the hub
- scripts/pull_image.py: WSL2 TLS-resilient Docker image downloader

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-24 17:47:49 +01:00

186 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""
Pull a Docker Hub image via the registry v2 API using Python's SSL (OpenSSL)
and bundle it into a tarball that can be imported with `docker load`.
Bypasses Docker's Go TLS client entirely.

Usage:   python pull_image.py <image:tag> [output.tar]
Example: python pull_image.py postgres:16-alpine postgres.tar
"""
import hashlib
import http.client
import json
import os
import sys
import tarfile
import tempfile
import time
import urllib.error
import urllib.request
def get_token(repo: str) -> str:
    """Request a pull-scoped bearer token for ``repo`` from auth.docker.io.

    No credentials are sent, so the token is an anonymous one.

    Args:
        repo: Repository path placed in the token scope, e.g.
            ``library/postgres``.

    Returns:
        The ``token`` field of the JSON response.
    """
    token_url = (
        "https://auth.docker.io/token"
        f"?service=registry.docker.io&scope=repository:{repo}:pull"
    )
    with urllib.request.urlopen(token_url, timeout=30) as response:
        payload = json.load(response)
    return payload["token"]
class _StripAuthOnRedirect(urllib.request.HTTPRedirectHandler):
    """Redirect handler that drops credentials when following a redirect.

    The follow-up request is built from scratch, so the only header carried
    over from the original request is ``Range`` (needed for chunked
    downloads). ``Authorization`` and every other header are left behind.
    """

    # Headers copied from the original request onto the redirected one.
    _FORWARDED_HEADERS = ("Range",)

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Return a fresh request for ``newurl`` with only forwarded headers."""
        redirected = urllib.request.Request(newurl)
        for name in self._FORWARDED_HEADERS:
            # Request stores header names capitalized, so look them up that way.
            value = req.get_header(name.capitalize())
            if value:
                redirected.add_header(name, value)
        return redirected
def _opener():
    """Build a urllib opener whose redirects go through _StripAuthOnRedirect."""
    redirect_handler = _StripAuthOnRedirect()
    return urllib.request.build_opener(redirect_handler)
def registry_get(url: str, token: str, headers: dict | None = None) -> bytes:
    """Fetch ``url`` with Bearer authentication and return the response body.

    Redirects are followed by the opener from ``_opener()``, which does not
    forward the Authorization header (for S3/CDN blobs).

    Args:
        url: URL to GET.
        token: Bearer token sent in the Authorization header.
        headers: Optional extra request headers; an entry with the same name
            overrides the default Authorization header.

    Returns:
        The full response body.
    """
    request_headers = {"Authorization": f"Bearer {token}"}
    if headers:
        request_headers.update(headers)
    request = urllib.request.Request(url, headers=request_headers)
    with _opener().open(request, timeout=60) as response:
        body = response.read()
    return body
# Base URL of the Docker Hub registry v2 API.
_REGISTRY_URL = "https://registry-1.docker.io/v2"

# Media types offered when requesting a manifest. Registries negotiate on the
# Accept header, so both the Docker and OCI flavours of single-image manifests
# and of multi-arch indexes are listed; otherwise OCI-published images can be
# refused.
_MANIFEST_ACCEPT = ",".join(
    (
        "application/vnd.docker.distribution.manifest.v2+json",
        "application/vnd.oci.image.manifest.v1+json",
        "application/vnd.docker.distribution.manifest.list.v2+json",
        "application/vnd.oci.image.index.v1+json",
    )
)

# Size of each Range request, so a corrupted transfer only loses one chunk.
_CHUNK_SIZE = 2 * 1024 * 1024

# Upper bound on attempts for a single chunk before the download is abandoned.
_MAX_CHUNK_ATTEMPTS = 20


def _split_image_ref(image: str) -> tuple[str, str]:
    """Split ``name[:tag]`` into (repository, tag) with Docker Hub defaults."""
    if ":" in image:
        repo_name, tag = image.rsplit(":", 1)
    else:
        repo_name, tag = image, "latest"
    if "/" not in repo_name:
        # Official images live under the implicit "library/" namespace.
        repo_name = f"library/{repo_name}"
    return repo_name, tag


def _verify_sha256(digest: str, actual_hex: str, what: str) -> None:
    """Raise RuntimeError if ``actual_hex`` does not match a sha256 digest."""
    algorithm, _, expected_hex = digest.partition(":")
    # Digests using another algorithm are not checked.
    if algorithm == "sha256" and actual_hex != expected_hex:
        raise RuntimeError(
            f"Digest mismatch for {what}: expected {expected_hex}, got {actual_hex}"
        )


def _fetch_image_manifest(repo_name: str, reference: str, token: str) -> dict:
    """Fetch the image manifest, resolving a multi-arch index to linux/amd64."""

    def fetch(ref: str) -> dict:
        return json.loads(
            registry_get(
                f"{_REGISTRY_URL}/{repo_name}/manifests/{ref}",
                token,
                headers={"Accept": _MANIFEST_ACCEPT},
            )
        )

    manifest = fetch(reference)
    if "manifests" in manifest:
        print("Manifest list detected — selecting linux/amd64 ...")
        for entry in manifest["manifests"]:
            platform = entry.get("platform", {})
            if platform.get("os") == "linux" and platform.get("architecture") == "amd64":
                manifest = fetch(entry["digest"])
                break
        else:
            raise RuntimeError(
                f"No linux/amd64 manifest found for {repo_name}:{reference}"
            )
    if "config" not in manifest or "layers" not in manifest:
        raise RuntimeError(
            f"Unsupported manifest format for {repo_name}:{reference} "
            f"(mediaType={manifest.get('mediaType')!r})"
        )
    return manifest


def _download_blob(repo_name: str, digest: str, size: int, token: str, dest_path: str) -> str:
    """Download a blob to ``dest_path`` in Range-request chunks.

    Each chunk is retried up to ``_MAX_CHUNK_ATTEMPTS`` times. A 401 response
    triggers a token refresh, since bearer tokens can expire during a long
    download. Other 4xx responses (except 408/429) are raised immediately
    because retrying cannot help. The finished file is checked against
    ``digest``.

    Returns:
        The bearer token to use for subsequent requests (refreshed if needed).
    """
    blob_url = f"{_REGISTRY_URL}/{repo_name}/blobs/{digest}"
    sha256 = hashlib.sha256()
    downloaded = 0
    with open(dest_path, "wb") as out:
        while downloaded < size:
            end = min(downloaded + _CHUNK_SIZE, size) - 1
            refresh_token = False
            error = None
            for _attempt in range(_MAX_CHUNK_ATTEMPTS):
                try:
                    if refresh_token:
                        token = get_token(repo_name)
                        refresh_token = False
                    request = urllib.request.Request(
                        blob_url,
                        headers={
                            "Authorization": f"Bearer {token}",
                            "Range": f"bytes={downloaded}-{end}",
                        },
                    )
                    with _opener().open(request, timeout=60) as resp:
                        data = resp.read()
                    if not data:
                        # An empty body would never advance the download.
                        raise OSError("empty response body")
                    break
                except urllib.error.HTTPError as exc:
                    if exc.code == 401:
                        refresh_token = True
                    elif exc.code < 500 and exc.code not in (408, 429):
                        raise
                    error = exc
                except (OSError, http.client.HTTPException) as exc:
                    # Covers URLError, timeouts, TLS errors and truncated reads.
                    error = exc
                print(f"\r retry at {downloaded//1024//1024}MB ({error})...", end="", flush=True)
                time.sleep(1)
            else:
                raise RuntimeError(
                    f"Giving up on {digest} at byte {downloaded} "
                    f"after {_MAX_CHUNK_ATTEMPTS} attempts"
                ) from error
            out.write(data)
            sha256.update(data)
            downloaded += len(data)
            pct = downloaded * 100 // size if size else 0
            print(f"\r {downloaded//1024//1024}MB / {size//1024//1024}MB ({pct}%)", end="", flush=True)
    print()
    _verify_sha256(digest, sha256.hexdigest(), f"layer {digest}")
    return token


def pull_image(image: str, output_tar: str) -> None:
    """Download a Docker Hub image and write it as a ``docker load`` tarball.

    Fetches the manifest for ``image`` (choosing linux/amd64 from a multi-arch
    index), downloads the config blob and every layer into a temporary
    directory, verifies their sha256 digests, and bundles them with a
    ``manifest.json`` into ``output_tar``.

    Args:
        image: Image reference ``name[:tag]``; the tag defaults to ``latest``
            and names without a ``/`` are looked up under ``library/``.
        output_tar: Path of the tar archive to create.

    Raises:
        RuntimeError: If no linux/amd64 manifest exists, the manifest has no
            config/layers, a digest does not match, or a chunk keeps failing.
        urllib.error.HTTPError: On a non-retryable HTTP error from the registry.
    """
    repo_name, tag = _split_image_ref(image)
    print(f"Authenticating for {repo_name}:{tag} ...")
    token = get_token(repo_name)

    print("Fetching manifest ...")
    manifest = _fetch_image_manifest(repo_name, tag, token)
    config_digest = manifest["config"]["digest"]
    layers = manifest["layers"]

    with tempfile.TemporaryDirectory() as tmpdir:
        print("Downloading config ...")
        config_data = registry_get(
            f"{_REGISTRY_URL}/{repo_name}/blobs/{config_digest}",
            token,
        )
        _verify_sha256(config_digest, hashlib.sha256(config_data).hexdigest(), "config")
        config_filename = config_digest.replace("sha256:", "") + ".json"
        with open(os.path.join(tmpdir, config_filename), "wb") as f:
            f.write(config_data)

        layer_paths = []
        for i, layer in enumerate(layers):
            digest = layer["digest"]
            size = layer["size"]
            short = digest[7:19]
            print(f"Downloading layer {i+1}/{len(layers)} ({short}..., {size//1024//1024}MB) ...")
            layer_dir = os.path.join(tmpdir, f"layer_{i}")
            os.makedirs(layer_dir)
            # Keep the returned token: it may have been refreshed mid-download.
            token = _download_blob(
                repo_name, digest, size, token, os.path.join(layer_dir, "layer.tar")
            )
            with open(os.path.join(layer_dir, "VERSION"), "w") as f:
                f.write("1.0")
            with open(os.path.join(layer_dir, "json"), "w") as f:
                json.dump({"id": digest.replace("sha256:", "")}, f)
            layer_paths.append(f"layer_{i}/layer.tar")

        manifest_json = [
            {
                "Config": config_filename,
                # Strip only a leading "library/" so official images get short tags.
                "RepoTags": [f"{repo_name.removeprefix('library/')}:{tag}"],
                "Layers": layer_paths,
            }
        ]
        with open(os.path.join(tmpdir, "manifest.json"), "w") as f:
            json.dump(manifest_json, f)

        print(f"Building {output_tar} ...")
        with tarfile.open(output_tar, "w") as tar:
            for name in (config_filename, "manifest.json"):
                tar.add(os.path.join(tmpdir, name), arcname=name)
            for i, _layer in enumerate(layers):
                for fname in ("layer.tar", "VERSION", "json"):
                    tar.add(
                        os.path.join(tmpdir, f"layer_{i}", fname),
                        arcname=f"layer_{i}/{fname}",
                    )
    print(f"Done. Load with: docker load -i {output_tar}")
if __name__ == "__main__":
    # Command-line entry: <image:tag> is required, the output path is optional.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: pull_image.py <image:tag> [output.tar]")
        sys.exit(1)
    image = cli_args[0]
    if len(cli_args) > 1:
        output = cli_args[1]
    else:
        # Derive a filesystem-safe default name from the image reference.
        output = image.translate(str.maketrans(":/", "__")) + ".tar"
    pull_image(image, output)