#!/usr/bin/env python3
"""Dry-run report for STATE-WP-0065 P1 spine migration.

Prints would-be classification, domain, repo-disposition, and workplan-anchor
changes without applying them. Requires a live PostgreSQL connection (same
DATABASE_URL as the API).
"""

from __future__ import annotations

import asyncio
import sys
from pathlib import Path

# Make the repository root importable so ``api`` and ``scripts`` resolve when
# this file is executed directly as a script.
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))

from sqlalchemy import text  # noqa: E402
from sqlalchemy.ext.asyncio import AsyncSession  # noqa: E402

from api.database import async_session_factory, engine  # noqa: E402
from scripts.spine_migration_data import (  # noqa: E402
    FALLBACK_REPO_SLUG,
    MARKET_DOMAINS,
    OLD_DOMAIN_TO_MARKET,
    REPO_CLASSIFICATIONS,
    REPO_DISPOSITIONS,
    derive_classification,
    market_domain_uuid,
)


def _section(title: str) -> None:
    """Print a blank line followed by *title* framed by two 72-char rules."""
    print()
    print("=" * 72)
    print(title)
    print("=" * 72)


async def _report_domains(session: AsyncSession) -> None:
    """Print the current ``domains`` rows and the market domains to insert.

    Each current slug is shown with its ``OLD_DOMAIN_TO_MARKET`` target; each
    entry of ``MARKET_DOMAINS`` is flagged ``exists`` or ``NEW`` depending on
    whether its slug is already present in the table.
    """
    _section("Domain spine replacement")
    result = await session.execute(
        text("SELECT slug, name FROM domains ORDER BY slug")
    )
    current = result.fetchall()
    current_slugs = {row[0] for row in current}
    print(f"Current domains ({len(current)}):")
    # The name column is selected but not printed in this listing.
    for slug, _name in current:
        # NOTE(review): a current slug that is itself a market domain would
        # also land on this fallback label if it has no mapping entry —
        # confirm that "would delete" is the intended wording for that case.
        mapped = OLD_DOMAIN_TO_MARKET.get(slug, "(no mapping — would delete)")
        print(f" {slug:25} → {mapped}")
    print(f"\nMarket domains to insert ({len(MARKET_DOMAINS)}):")
    for slug, name in MARKET_DOMAINS:
        flag = "exists" if slug in current_slugs else "NEW"
        print(f" [{flag:5}] {slug:20} {name:20} id={market_domain_uuid(slug)}")


async def _report_classifications(session: AsyncSession) -> None:
    """Print the classification each managed repo would receive.

    Repos whose slug is a key of ``REPO_CLASSIFICATIONS`` are counted as
    "fixture"; all others as "derived". The query inner-joins ``domains``, so
    only repos with a matching domain row are listed.
    """
    _section("Repo classification backfill")
    rows = await session.execute(
        text(
            """
            SELECT mr.slug, mr.status, d.slug AS old_domain
            FROM managed_repos mr
            JOIN domains d ON d.id = mr.domain_id
            ORDER BY mr.slug
            """
        )
    )
    from_file = 0
    derived = 0
    for repo_slug, status, old_domain in rows:
        cls = derive_classification(repo_slug, old_domain)
        source = "fixture" if repo_slug in REPO_CLASSIFICATIONS else "derived"
        if source == "fixture":
            from_file += 1
        else:
            derived += 1
        # NOTE(review): the ``:8`` format spec raises TypeError if ``status``
        # is NULL — presumably the column is NOT NULL; verify against schema.
        print(
            f" {repo_slug:30} [{status:8}] "
            f"{old_domain:20} → {cls['category']:12} · {cls['domain']:15} "
            f"({source}, by={cls.get('classified_by', 'migration')})"
        )
    print(f"\nSummary: {from_file} from REPO_CLASSIFICATIONS, {derived} derived")


async def _report_dispositions(session: AsyncSession) -> None:
    """Print each ``REPO_DISPOSITIONS`` entry and whether its repo row exists.

    Prints ``(none)`` and returns early when ``REPO_DISPOSITIONS`` is empty.
    """
    _section("Repo dispositions")
    if not REPO_DISPOSITIONS:
        print(" (none)")
        return
    for slug, disp in REPO_DISPOSITIONS.items():
        repo = await session.execute(
            text("SELECT 1 FROM managed_repos WHERE slug = :slug"),
            {"slug": slug},
        )
        managed = repo.fetchone()
        state = "found" if managed else "MISSING"
        print(f" {slug:25} [{state}] action={disp['action']}")
        if disp.get("target_slug"):
            print(f" target: {disp['target_slug']}")
        if disp.get("archive"):
            print(" would archive phantom/duplicate row")


async def _report_workplan_anchors(session: AsyncSession) -> None:
    """Print every workstream, flagging those whose ``repo_id`` is NULL.

    Ends with the NULL count and, when it is non-zero, the
    ``FALLBACK_REPO_SLUG`` that orphans would fall back to.
    """
    _section("Workplan repo_id backfill (would-be)")
    rows = await session.execute(
        text(
            """
            SELECT ws.slug, ws.repo_id, t.slug AS topic_slug,
                   d.slug AS domain_slug, mr.slug AS current_repo
            FROM workstreams ws
            LEFT JOIN topics t ON t.id = ws.topic_id
            LEFT JOIN domains d ON d.id = t.domain_id
            LEFT JOIN managed_repos mr ON mr.id = ws.repo_id
            ORDER BY ws.slug
            """
        )
    )
    null_count = 0
    for ws_slug, repo_id, topic_slug, domain_slug, current_repo in rows:
        if repo_id is None:
            null_count += 1
            # LEFT JOINs can yield NULL topic/domain slugs; show "-" instead.
            print(
                f" NEEDS ANCHOR {ws_slug:40} topic={topic_slug or '-':20} "
                f"domain={domain_slug or '-'}"
            )
        else:
            print(f" ok {ws_slug:40} repo={current_repo}")
    print(f"\nWorkstreams with NULL repo_id: {null_count}")
    if null_count:
        print(f"Orphans would fall back to: {FALLBACK_REPO_SLUG}")


async def _report_topic_domain_updates(session: AsyncSession) -> None:
    """Print topic and domain_goal counts per old domain that has a mapping.

    Old slugs with zero topics and zero domain_goals are omitted.
    """
    _section("Topic / domain_goal domain_id remapping")
    for old_slug, market_slug in OLD_DOMAIN_TO_MARKET.items():
        topic_count = await session.execute(
            text(
                """
                SELECT COUNT(*)
                FROM topics t
                JOIN domains d ON d.id = t.domain_id
                WHERE d.slug = :old_slug
                """
            ),
            {"old_slug": old_slug},
        )
        goal_count = await session.execute(
            text(
                """
                SELECT COUNT(*)
                FROM domain_goals dg
                JOIN domains d ON d.id = dg.domain_id
                WHERE d.slug = :old_slug
                """
            ),
            {"old_slug": old_slug},
        )
        tc = topic_count.scalar_one()
        gc = goal_count.scalar_one()
        if tc or gc:
            print(f" {old_slug:22} → {market_slug:15} topics={tc} domain_goals={gc}")


async def _report_table_renames(session: AsyncSession) -> None:
    """Print the static list of workstream → workplan schema renames.

    ``session`` is accepted for parity with the other report steps; this step
    issues no queries.
    """
    _section("Schema renames (structural)")
    fk_tables = [
        "tasks.workstream_id",
        "decisions.workstream_id",
        "progress_events.workstream_id",
        "token_events.workstream_id",
        "contributions.related_workstream_id",
        "extension_points.workstream_id",
        "technical_debt.workstream_id",
        "capability_requests.requesting_workstream_id",
        "capability_requests.fulfilling_workstream_id",
        "workplan_launch_requests.workstream_id",
    ]
    for item in fk_tables:
        print(f" {item} → {item.replace('workstream', 'workplan')}")
    print(" workstreams → workplans")
    print(" workstream_dependencies → workplan_dependencies")
    print(" from_workstream_id → from_workplan_id")
    print(" to_workstream_id → to_workplan_id")


async def main() -> None:
    """Run every report step against one session, then dispose the engine.

    The engine is disposed in a ``finally`` clause so pooled connections are
    released even when a report step raises; the exception still propagates.
    """
    print("STATE-WP-0065 P1 — Spine migration dry-run report")
    print("(read-only; no changes applied)")
    try:
        async with async_session_factory() as session:
            await _report_domains(session)
            await _report_classifications(session)
            await _report_dispositions(session)
            await _report_workplan_anchors(session)
            await _report_topic_domain_updates(session)
            await _report_table_renames(session)
    finally:
        # Dispose while the event loop is still running; otherwise a failed
        # step would leave pooled connections open when asyncio.run() exits.
        await engine.dispose()
    print()
    print("Dry-run complete. Review the report before running:")
    print(" alembic upgrade d8e9f0a1b2c3")


if __name__ == "__main__":
    asyncio.run(main())