"""repo-anchored classification spine (STATE-WP-0065 P1) Adds repo classification columns, replaces coordination domains with 14 market domains, backfills classifications, re-anchors workplans on repo_id, and renames workstreams → workplans. Revision ID: d8e9f0a1b2c3 Revises: c7d8e9f0a1b2 Create Date: 2026-06-22 """ from __future__ import annotations import sys from pathlib import Path import sqlalchemy as sa from alembic import op from sqlalchemy.dialects.postgresql import ARRAY # Allow importing migration constants from scripts/. _REPO_ROOT = Path(__file__).resolve().parents[2] if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) from scripts.spine_migration_data import ( # noqa: E402 FALLBACK_REPO_SLUG, MARKET_DOMAINS, MARKET_TO_OLD_DOMAIN, OLD_COORDINATION_DOMAINS, OLD_DOMAIN_TO_MARKET, REPO_DISPOSITIONS, derive_classification, market_domain_uuid, migration_provenance, old_domain_uuid, ) revision = "d8e9f0a1b2c3" down_revision = "c7d8e9f0a1b2" branch_labels = None depends_on = None # Tables whose workstream_id column becomes workplan_id. 
# Each entry is (table, column, flag).  The column is renamed by replacing
# "workstream" with "workplan"; the flag is unpacked but not consulted anywhere
# in this migration.
_WORKPLAN_FK_TABLES: list[tuple[str, str, bool]] = [
    ("tasks", "workstream_id", True),
    ("decisions", "workstream_id", False),
    ("progress_events", "workstream_id", False),
    ("token_events", "workstream_id", False),
    ("contributions", "related_workstream_id", False),
    ("extension_points", "workstream_id", False),
    ("technical_debt", "workstream_id", False),
    ("capability_requests", "requesting_workstream_id", False),
    ("capability_requests", "fulfilling_workstream_id", False),
    ("workplan_launch_requests", "workstream_id", True),
]

# (table, column) pairs pointing at ``domains`` that upgrade() re-points to a
# market domain.  downgrade() must restore exactly the same set, otherwise
# deleting the market rows leaves dangling references.  ``managed_repos`` is
# not listed because upgrade() rewrites it in _backfill_repo_classifications().
_DOMAIN_FK_COLUMNS: tuple[tuple[str, str], ...] = (
    ("topics", "domain_id"),
    ("domain_goals", "domain_id"),
    ("capability_catalog", "domain_id"),
    ("technical_debt", "domain_id"),
    ("extension_points", "domain_id"),
    ("capability_requests", "requesting_domain_id"),
    ("capability_requests", "fulfilling_domain_id"),
)

# Array-typed classification columns on managed_repos; each is also the key
# looked up in the dict returned by derive_classification().
_CLASSIFICATION_ARRAY_COLUMNS: tuple[str, ...] = (
    "secondary_domains",
    "capability_tags",
    "business_stake",
    "business_mechanics",
)

# Suffixes of the ``ix_workstreams_<suffix>`` indexes renamed with the table.
_WORKPLAN_TABLE_INDEX_SUFFIXES: tuple[str, ...] = (
    "repo_id",
    "execution_state",
    "launch_mode",
    "concurrency_mode",
    "queue_rank",
    "execution_group",
    "scheduled_for",
)

# (old name, new name) for indexes on the dependency table, upgrade direction.
_DEPENDENCY_INDEX_RENAMES: tuple[tuple[str, str], ...] = (
    (
        "ix_workstream_dependencies_from_workstream_id",
        "ix_workplan_dependencies_from_workplan_id",
    ),
    (
        "ix_workstream_dependencies_to_workstream_id",
        "ix_workplan_dependencies_to_workplan_id",
    ),
    (
        "ix_workstream_dependencies_to_task_id",
        "ix_workplan_dependencies_to_task_id",
    ),
    (
        "ix_workstream_dependencies_relationship_type",
        "ix_workplan_dependencies_relationship_type",
    ),
    ("uq_ws_dep_workstream_target", "uq_wp_dep_workplan_target"),
    ("uq_ws_dep_task_target", "uq_wp_dep_task_target"),
)

_INSERT_DOMAIN_SQL = """
    INSERT INTO domains (id, slug, name, status, created_at, updated_at)
    VALUES (:id, :slug, :name, 'active', now(), now())
    ON CONFLICT (slug) DO NOTHING
"""

_ARCHIVE_REPO_SQL = "UPDATE managed_repos SET status = 'archived' WHERE slug = :slug"


def _pg_array_literal(values: list[str]) -> str:
    """Render *values* as a PostgreSQL ``text[]`` array constructor.

    Single quotes inside a value are doubled.  An empty (or falsy) list yields
    ``ARRAY[]::text[]`` so the literal always carries an explicit type.
    """
    if not values:
        return "ARRAY[]::text[]"
    quoted = ", ".join("'" + value.replace("'", "''") + "'" for value in values)
    return f"ARRAY[{quoted}]::text[]"


def _rename_index_if_exists(old_name: str, new_name: str) -> None:
    """Rename an index, doing nothing when *old_name* does not exist."""
    op.execute(f'ALTER INDEX IF EXISTS "{old_name}" RENAME TO "{new_name}"')


def _insert_domains(pairs, id_for_slug) -> None:
    """Insert one ``domains`` row per ``(slug, name)`` pair.

    *id_for_slug* is called with the slug to obtain the row id.  Rows whose
    slug already exists are left untouched (``ON CONFLICT (slug) DO NOTHING``).
    """
    conn = op.get_bind()
    statement = sa.text(_INSERT_DOMAIN_SQL)
    for slug, name in pairs:
        conn.execute(statement, {"id": id_for_slug(slug), "slug": slug, "name": name})


def _insert_market_domains() -> None:
    """Insert the rows listed in ``MARKET_DOMAINS`` into ``domains``."""
    _insert_domains(MARKET_DOMAINS, market_domain_uuid)


def _backfill_repo_classifications() -> None:
    """Classify every managed repo that currently has a domain.

    For each repo joined to its domain, ``derive_classification`` is called
    with the repo slug and the domain slug.  The result supplies ``category``,
    the new ``domain`` slug and the optional array fields; provenance fields
    come from ``migration_provenance()`` unless the classification overrides
    ``classified_by``.
    """
    conn = op.get_bind()
    provenance = migration_provenance()
    rows = conn.execute(
        sa.text(
            """
            SELECT mr.slug, d.slug AS old_domain_slug
            FROM managed_repos mr
            JOIN domains d ON d.id = mr.domain_id
            """
        )
    ).fetchall()

    for repo_slug, old_domain_slug in rows:
        classification = derive_classification(repo_slug, old_domain_slug)
        # Array values are inlined as SQL literals (quotes doubled by
        # _pg_array_literal) rather than bound as parameters.
        array_assignments = ",\n                    ".join(
            f"{column} = {_pg_array_literal(classification.get(column) or [])}"
            for column in _CLASSIFICATION_ARRAY_COLUMNS
        )
        conn.execute(
            sa.text(
                f"""
                UPDATE managed_repos
                SET category = :category,
                    domain_id = :domain_id,
                    {array_assignments},
                    classified_at = :classified_at,
                    classified_by = :classified_by,
                    standard_version = :standard_version
                WHERE slug = :slug
                """
            ),
            {
                "category": classification["category"],
                "domain_id": market_domain_uuid(classification["domain"]),
                "classified_at": provenance["classified_at"],
                "classified_by": classification.get(
                    "classified_by", provenance["classified_by"]
                ),
                "standard_version": provenance["standard_version"],
                "slug": repo_slug,
            },
        )


def _market_slug_for_old_domain(old_slug: str) -> str:
    """Return the market slug for *old_slug*, defaulting to ``"infotech"``."""
    return OLD_DOMAIN_TO_MARKET.get(old_slug, "infotech")


def _update_domain_fks_to_market() -> None:
    """Re-point every column in ``_DOMAIN_FK_COLUMNS`` at a market domain.

    Each domain whose slug is not a market slug is mapped through
    ``_market_slug_for_old_domain``.  A final catch-all pass sends anything
    still referencing a non-market domain to ``infotech``.
    """
    conn = op.get_bind()
    market_slugs = [slug for slug, _ in MARKET_DOMAINS]

    old_rows = conn.execute(
        sa.text("SELECT slug FROM domains WHERE slug NOT IN :slugs").bindparams(
            sa.bindparam("slugs", expanding=True)
        ),
        {"slugs": market_slugs},
    ).fetchall()

    for (old_slug,) in old_rows:
        market_id = market_domain_uuid(_market_slug_for_old_domain(old_slug))
        for table, column in _DOMAIN_FK_COLUMNS:
            conn.execute(
                sa.text(
                    f"""
                    UPDATE {table} tgt
                    SET {column} = :market_id
                    FROM domains old_d
                    WHERE tgt.{column} = old_d.id AND old_d.slug = :old_slug
                    """
                ),
                {"market_id": market_id, "old_slug": old_slug},
            )

    # Catch-all: anything still on a non-market domain -> infotech.
    infotech_id = market_domain_uuid("infotech")
    for table, column in _DOMAIN_FK_COLUMNS:
        conn.execute(
            sa.text(
                f"""
                UPDATE {table} tgt
                SET {column} = :infotech_id
                FROM domains d
                WHERE tgt.{column} = d.id AND d.slug NOT IN :market_slugs
                """
            ).bindparams(sa.bindparam("market_slugs", expanding=True)),
            {"infotech_id": infotech_id, "market_slugs": market_slugs},
        )


def _apply_repo_dispositions() -> None:
    """Apply ``REPO_DISPOSITIONS`` to workstreams and managed repos.

    * ``relink_to`` / ``collapse_into``: move workstreams from the keyed repo
      to ``target_slug``; archive the keyed repo only if ``archive`` is truthy.
    * ``archive``: move workstreams to ``FALLBACK_REPO_SLUG`` and archive the
      keyed repo.

    Any other action value is ignored.
    """
    conn = op.get_bind()
    relink = sa.text(
        """
        UPDATE workstreams ws
        SET repo_id = target.id
        FROM managed_repos source, managed_repos target
        WHERE ws.repo_id = source.id
          AND source.slug = :source_slug
          AND target.slug = :target_slug
        """
    )
    relink_to_fallback = sa.text(
        """
        UPDATE workstreams ws
        SET repo_id = fallback.id
        FROM managed_repos phantom
        JOIN managed_repos fallback ON fallback.slug = :fallback_slug
        WHERE ws.repo_id = phantom.id AND phantom.slug = :slug
        """
    )
    archive = sa.text(_ARCHIVE_REPO_SQL)

    for slug, disposition in REPO_DISPOSITIONS.items():
        action = disposition["action"]
        if action in ("relink_to", "collapse_into"):
            conn.execute(
                relink,
                {"source_slug": slug, "target_slug": disposition["target_slug"]},
            )
            if disposition.get("archive"):
                conn.execute(archive, {"slug": slug})
        elif action == "archive":
            conn.execute(
                relink_to_fallback,
                {"slug": slug, "fallback_slug": FALLBACK_REPO_SLUG},
            )
            conn.execute(archive, {"slug": slug})


def _backfill_workstream_repo_ids() -> None:
    """Fill ``workstreams.repo_id`` where it is NULL, in three passes.

    1. Via the workstream's topic: the earliest-created active repo in the
       topic's domain.
    2. Slug heuristics: exact, dash-to-underscore, or substring match against
       active repo slugs, preferring an exact match and then the oldest repo.
    3. Fallback: the repo whose slug is ``FALLBACK_REPO_SLUG``.
    """
    conn = op.get_bind()

    # topic -> domain -> first active repo (by created_at)
    conn.execute(
        sa.text(
            """
            UPDATE workstreams ws
            SET repo_id = sub.repo_id
            FROM (
                SELECT DISTINCT ON (ws.id) ws.id AS ws_id, mr.id AS repo_id
                FROM workstreams ws
                JOIN topics t ON t.id = ws.topic_id
                JOIN managed_repos mr ON mr.domain_id = t.domain_id
                WHERE ws.repo_id IS NULL AND mr.status = 'active'
                ORDER BY ws.id, mr.created_at
            ) sub
            WHERE ws.id = sub.ws_id
            """
        )
    )

    # slug-match heuristics (correlated subquery — LATERAL cannot reference
    # outer ws in WHERE)
    conn.execute(
        sa.text(
            """
            UPDATE workstreams ws
            SET repo_id = (
                SELECT mr.id
                FROM managed_repos mr
                WHERE mr.status = 'active'
                  AND (
                    LOWER(ws.slug) = LOWER(mr.slug)
                    OR LOWER(REPLACE(ws.slug, '-', '_')) = LOWER(mr.slug)
                    OR LOWER(ws.slug) LIKE '%' || LOWER(mr.slug) || '%'
                    OR LOWER(mr.slug) LIKE '%' || LOWER(REPLACE(ws.slug, '-', '_')) || '%'
                  )
                ORDER BY
                    CASE WHEN LOWER(ws.slug) = LOWER(mr.slug) THEN 0 ELSE 1 END,
                    mr.created_at
                LIMIT 1
            )
            WHERE ws.repo_id IS NULL
              AND EXISTS (
                SELECT 1
                FROM managed_repos mr
                WHERE mr.status = 'active'
                  AND (
                    LOWER(ws.slug) = LOWER(mr.slug)
                    OR LOWER(REPLACE(ws.slug, '-', '_')) = LOWER(mr.slug)
                    OR LOWER(ws.slug) LIKE '%' || LOWER(mr.slug) || '%'
                    OR LOWER(mr.slug) LIKE '%' || LOWER(REPLACE(ws.slug, '-', '_')) || '%'
                  )
              )
            """
        )
    )

    # fallback: the repo identified by FALLBACK_REPO_SLUG
    conn.execute(
        sa.text(
            """
            UPDATE workstreams ws
            SET repo_id = mr.id
            FROM managed_repos mr
            WHERE ws.repo_id IS NULL AND mr.slug = :fallback_slug
            """
        ),
        {"fallback_slug": FALLBACK_REPO_SLUG},
    )


def _delete_old_coordination_domains() -> None:
    """Delete every ``domains`` row whose slug is not a market slug."""
    market_slugs = [slug for slug, _ in MARKET_DOMAINS]
    op.get_bind().execute(
        sa.text("DELETE FROM domains WHERE slug NOT IN :slugs").bindparams(
            sa.bindparam("slugs", expanding=True)
        ),
        {"slugs": market_slugs},
    )


def _rename_workstream_fk_columns() -> None:
    """Rename each ``*workstream*`` column in ``_WORKPLAN_FK_TABLES``."""
    for table, column, _required in _WORKPLAN_FK_TABLES:
        op.alter_column(
            table, column, new_column_name=column.replace("workstream", "workplan")
        )


def _rename_workstream_indexes_on_column(table: str, old_col: str) -> None:
    """Rename ``ix_<table>_<old_col>`` to its ``workplan`` name, if it exists."""
    new_col = old_col.replace("workstream", "workplan")
    _rename_index_if_exists(f"ix_{table}_{old_col}", f"ix_{table}_{new_col}")


def _rename_workplan_indexes_back(table: str, workstream_col: str) -> None:
    """Inverse of ``_rename_workstream_indexes_on_column`` for downgrade."""
    workplan_col = workstream_col.replace("workstream", "workplan")
    _rename_index_if_exists(
        f"ix_{table}_{workplan_col}", f"ix_{table}_{workstream_col}"
    )


def upgrade() -> None:
    """Apply the classification spine and the workstream -> workplan rename."""
    # (a) classification columns on managed_repos
    op.add_column(
        "managed_repos", sa.Column("category", sa.String(length=50), nullable=True)
    )
    for column in _CLASSIFICATION_ARRAY_COLUMNS:
        op.add_column(
            "managed_repos", sa.Column(column, ARRAY(sa.Text()), nullable=True)
        )
    op.add_column("managed_repos", sa.Column("classified_at", sa.Date(), nullable=True))
    op.add_column(
        "managed_repos",
        sa.Column("classified_by", sa.String(length=50), nullable=True),
    )
    op.add_column(
        "managed_repos",
        sa.Column("standard_version", sa.String(length=20), nullable=True),
    )

    # (b) insert market domains (old coordination domains remain for now)
    _insert_market_domains()

    # (c) backfill classification
    _backfill_repo_classifications()

    # (d)(e)(f) point FKs at market domains
    _update_domain_fks_to_market()

    # (g) backfill workstreams.repo_id
    _backfill_workstream_repo_ids()

    # (h) discrepancy resolution
    _apply_repo_dispositions()

    # (i) topic_id nullable
    op.alter_column("workstreams", "topic_id", nullable=True)

    # (j) repo_id NOT NULL (orphans were assigned the fallback repo in (g))
    op.alter_column("workstreams", "repo_id", nullable=False)

    # (k) rename workstreams -> workplans
    op.rename_table("workstreams", "workplans")
    for suffix in _WORKPLAN_TABLE_INDEX_SUFFIXES:
        _rename_index_if_exists(f"ix_workstreams_{suffix}", f"ix_workplans_{suffix}")

    # (l) workstream_id -> workplan_id on dependent tables
    for table, column, _ in _WORKPLAN_FK_TABLES:
        _rename_workstream_indexes_on_column(table, column)
    _rename_workstream_fk_columns()

    # update decision check constraint name
    op.drop_constraint("ck_decisions_topic_or_workstream", "decisions", type_="check")
    op.create_check_constraint(
        "ck_decisions_topic_or_workplan",
        "decisions",
        "topic_id IS NOT NULL OR workplan_id IS NOT NULL",
    )

    # (m) workstream_dependencies -> workplan_dependencies
    op.rename_table("workstream_dependencies", "workplan_dependencies")
    op.alter_column(
        "workplan_dependencies",
        "from_workstream_id",
        new_column_name="from_workplan_id",
    )
    op.alter_column(
        "workplan_dependencies",
        "to_workstream_id",
        new_column_name="to_workplan_id",
    )
    for old_name, new_name in _DEPENDENCY_INDEX_RENAMES:
        _rename_index_if_exists(old_name, new_name)
    op.drop_constraint(
        "ck_ws_dep_exactly_one_target", "workplan_dependencies", type_="check"
    )
    op.create_check_constraint(
        "ck_wp_dep_exactly_one_target",
        "workplan_dependencies",
        "(to_workplan_id IS NOT NULL AND to_task_id IS NULL) "
        "OR (to_workplan_id IS NULL AND to_task_id IS NOT NULL)",
    )

    # (n) remove old coordination domain rows
    _delete_old_coordination_domains()


def _insert_old_coordination_domains() -> None:
    """Insert the rows listed in ``OLD_COORDINATION_DOMAINS`` into ``domains``."""
    _insert_domains(OLD_COORDINATION_DOMAINS, old_domain_uuid)


def _restore_domain_fks_to_coordination() -> None:
    """Point domain references back at the legacy coordination domains.

    For each ``market_slug -> old_slug`` entry in ``MARKET_TO_OLD_DOMAIN``,
    every row referencing the market domain is re-pointed at the old one.
    This covers ``managed_repos`` plus every column upgrade() re-pointed
    (``_DOMAIN_FK_COLUMNS``), so the market rows can be deleted afterwards.

    NOTE(review): rows on a market domain with no entry in
    ``MARKET_TO_OLD_DOMAIN`` are left as-is — confirm the mapping covers every
    market slug that can be in use.
    """
    conn = op.get_bind()
    targets = (("managed_repos", "domain_id"), *_DOMAIN_FK_COLUMNS)
    for market_slug, old_slug in MARKET_TO_OLD_DOMAIN.items():
        params = {"old_id": old_domain_uuid(old_slug), "market_slug": market_slug}
        for table, column in targets:
            conn.execute(
                sa.text(
                    f"""
                    UPDATE {table}
                    SET {column} = :old_id
                    FROM domains market_d
                    WHERE {table}.{column} = market_d.id
                      AND market_d.slug = :market_slug
                    """
                ),
                params,
            )


def downgrade() -> None:
    """Reverse upgrade(): restore legacy domains, names and nullability.

    Classification data is dropped with its columns.  Un-archiving repos named
    in ``REPO_DISPOSITIONS`` is best-effort: every keyed slug is set to
    ``active``.
    """
    # Re-insert legacy coordination domains before deleting market rows.
    _insert_old_coordination_domains()
    _restore_domain_fks_to_coordination()

    market_slugs = [slug for slug, _ in MARKET_DOMAINS]
    conn = op.get_bind()
    conn.execute(
        sa.text("DELETE FROM domains WHERE slug IN :slugs").bindparams(
            sa.bindparam("slugs", expanding=True)
        ),
        {"slugs": market_slugs},
    )

    # workplan_dependencies -> workstream_dependencies
    op.drop_constraint(
        "ck_wp_dep_exactly_one_target", "workplan_dependencies", type_="check"
    )
    # Created while the column is still named to_workplan_id; the rename below
    # is applied afterwards.
    op.create_check_constraint(
        "ck_ws_dep_exactly_one_target",
        "workplan_dependencies",
        "(to_workplan_id IS NOT NULL AND to_task_id IS NULL) "
        "OR (to_workplan_id IS NULL AND to_task_id IS NOT NULL)",
    )
    op.alter_column(
        "workplan_dependencies",
        "to_workplan_id",
        new_column_name="to_workstream_id",
    )
    op.alter_column(
        "workplan_dependencies",
        "from_workplan_id",
        new_column_name="from_workstream_id",
    )
    op.rename_table("workplan_dependencies", "workstream_dependencies")
    for workstream_name, workplan_name in _DEPENDENCY_INDEX_RENAMES:
        _rename_index_if_exists(workplan_name, workstream_name)

    op.drop_constraint("ck_decisions_topic_or_workplan", "decisions", type_="check")

    for table, column, _ in reversed(_WORKPLAN_FK_TABLES):
        op.alter_column(
            table,
            column.replace("workstream", "workplan"),
            new_column_name=column,
        )
        _rename_workplan_indexes_back(table, column)

    op.rename_table("workplans", "workstreams")
    for suffix in _WORKPLAN_TABLE_INDEX_SUFFIXES:
        _rename_index_if_exists(f"ix_workplans_{suffix}", f"ix_workstreams_{suffix}")

    op.alter_column("workstreams", "repo_id", nullable=True)
    op.alter_column("workstreams", "topic_id", nullable=False)

    # Un-archive disposition repos (best-effort)
    unarchive = sa.text("UPDATE managed_repos SET status = 'active' WHERE slug = :slug")
    for slug in REPO_DISPOSITIONS:
        conn.execute(unarchive, {"slug": slug})

    for column in (
        "standard_version",
        "classified_by",
        "classified_at",
        *reversed(_CLASSIFICATION_ARRAY_COLUMNS),
        "category",
    ):
        op.drop_column("managed_repos", column)

    op.create_check_constraint(
        "ck_decisions_topic_or_workstream",
        "decisions",
        "topic_id IS NOT NULL OR workstream_id IS NOT NULL",
    )