"""Scoping, proposed-not-applied output, and the active-pattern registry (PRD §6.4 FR-X2/FR-X3/FR-X4; T04). * **Scope (FR-X2):** a pattern lands in a target environment only if the target's repo/domain/flavor are within the pattern's :class:`Scope` (an empty scope list means "unrestricted on that axis"). * **Proposed, not applied (FR-X3):** rendered artifacts are written under a ``proposals/`` tree mirroring the target path — a reviewable diff a human applies, never auto-written into the live file. Re-running upserts each pattern's block in place (idempotent), so proposals don't accumulate duplicates. * **Active-pattern registry (FR-X4):** a JSON record of which pattern (and version) is proposed/active in which (repo, flavor) environment. """ from __future__ import annotations import json import os from dataclasses import dataclass from datetime import datetime, timezone from ..curate.schema import SolutionPattern from .base import upsert_block from .registry import get_distributor def _now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") @dataclass(frozen=True) class Target: """An environment a pattern could be distributed to.""" repo: str domain: str = "" flavor: str = "claude" def applies(pattern: SolutionPattern, target: Target) -> bool: """True if ``target`` is within the pattern's scope (empty axis == any).""" sc = pattern.scope if sc.repos and target.repo not in sc.repos: return False if sc.domains and target.domain and target.domain not in sc.domains: return False if sc.flavors and target.flavor not in sc.flavors: return False return True def is_distributable(pattern: SolutionPattern) -> bool: return pattern.status == "approved" and pattern.distribution_ready class ActiveRegistry: """JSON record of patterns proposed/active per (repo, flavor) — FR-X4.""" def __init__(self, path: str) -> None: self.path = path self._entries: dict[str, dict] = {} if os.path.exists(path): with open(path, encoding="utf-8") as fh: for e in json.load(fh): self._entries[self._key(e["pattern_id"], e["repo"], e["flavor"])] = e @staticmethod def _key(pid: str, repo: str, flavor: str) -> str: return f"{pid}|{repo}|{flavor}" def record(self, pid: str, repo: str, flavor: str, version: str, status: str = "proposed") -> None: self._entries[self._key(pid, repo, flavor)] = { "pattern_id": pid, "repo": repo, "flavor": flavor, "version": version, "status": status, "updated_at": _now(), } def entries(self) -> list[dict]: return [self._entries[k] for k in sorted(self._entries)] def save(self) -> None: os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True) with open(self.path, "w", encoding="utf-8") as fh: json.dump(self.entries(), fh, indent=2, sort_keys=True) fh.write("\n") @dataclass class ProposalResult: proposals: list = None # (repo, flavor, pattern_id, proposal_path) files_written: list = None # absolute proposal paths skipped_not_distributable: list = None # pattern ids def __post_init__(self): self.proposals = self.proposals or [] self.files_written = self.files_written or [] self.skipped_not_distributable = self.skipped_not_distributable or [] def propose(patterns: list[SolutionPattern], targets: list[Target], out_dir: str, registry: ActiveRegistry) -> ProposalResult: """Render in-scope, distributable patterns into per-target proposal files.""" result = ProposalResult() pending: dict[str, str] = {} # proposal path -> accumulated content for p in patterns: if not is_distributable(p): result.skipped_not_distributable.append(p.id) continue for t in targets: dist = get_distributor(t.flavor) if dist is None or not applies(p, t): continue art = dist.render(p) path = os.path.join(out_dir, t.repo, art.target_path) if path not in pending: pending[path] = _read(path) pending[path] = upsert_block(pending[path], p.id, art.content) registry.record(p.id, t.repo, t.flavor, p.version) result.proposals.append((t.repo, t.flavor, p.id, path)) for path, content in pending.items(): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8") as fh: fh.write(content if content.endswith("\n") else content + "\n") result.files_written.append(path) registry.save() return result def _read(path: str) -> str: if os.path.exists(path): with open(path, encoding="utf-8") as fh: return fh.read() return ""