Implement Stripe publication layer and close WP-0007

This commit is contained in:
codex
2026-07-03 01:08:29 +02:00
parent 124ad48720
commit a76e57ba89
11 changed files with 1508 additions and 9 deletions

View File

@@ -13,6 +13,7 @@ pricing to payment-provider execution.
| [INTENT.md](INTENT.md) | Project purpose, problem space, lifecycle model |
| [docs/ProductRequirementsDocument.md](docs/ProductRequirementsDocument.md) | Generic product requirements |
| [docs/ImplementationRoadmap.md](docs/ImplementationRoadmap.md) | Milestone-based implementation path from observatory MVP to adaptive engine |
| [docs/StripePublication.md](docs/StripePublication.md) | Provider-neutral publication model and Stripe shadow-publication flow |
| [AGENTS.md](AGENTS.md) | Agent instructions, dev workflow, State Hub integration |
| [workplans/](workplans/) | Active workstreams and tasks |
| [projects/coulomb-pricing/](projects/coulomb-pricing/) | Coulomb Social MVP deployment material |

View File

@@ -0,0 +1,696 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Any, Literal
from .pricing_models import PricingModel
PublicationArtifactKind = Literal["product", "meter", "price", "commitment", "configuration"]
ArtifactMappingStatus = Literal["exact", "approximate", "unsupported"]
PublicationOperationKind = Literal["create", "update", "noop", "retire", "rollback"]
DriftSeverity = Literal["info", "warn", "error"]
def _serialize_value(value: Any) -> Any:
if isinstance(value, Decimal):
return str(value)
if hasattr(value, "__dataclass_fields__"):
return {
key: _serialize_value(getattr(value, key))
for key in value.__dataclass_fields__
}
if isinstance(value, tuple):
return [_serialize_value(item) for item in value]
if isinstance(value, list):
return [_serialize_value(item) for item in value]
if isinstance(value, dict):
return {key: _serialize_value(item) for key, item in value.items()}
return value
def _payload_signature(payload: Any) -> str:
return json.dumps(_serialize_value(payload), sort_keys=True)
@dataclass(frozen=True)
class CatalogProduct:
id: str
name: str
description: str
currency: str
lifecycle_phase: str
active_pricing_model_id: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class PublishableProduct:
key: str
product_id: str
name: str
description: str
currency: str
lifecycle_phase: str
active: bool
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class PublishableMeter:
key: str
meter_id: str
name: str
event_name: str
unit: str
aggregation: str = "sum"
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class PublishablePrice:
key: str
price_id: str
product_key: str
component_id: str
component_kind: str
label: str
currency: str
billing_treatment: str
cadence: str | None = None
amount: Decimal | None = None
unit_price: Decimal | None = None
included_units: Decimal | None = None
meter_key: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class PublishableCommitment:
key: str
commitment_id: str
kind: str
value: str
unit: str | None = None
description: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class PublishableConfiguration:
key: str
configuration_id: str
product_key: str
model_id: str
model_name: str
segment: str | None
price_keys: tuple[str, ...]
commitment_keys: tuple[str, ...]
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class PublicationBundle:
bundle_id: str
model_id: str
model_name: str
product: PublishableProduct
meters: tuple[PublishableMeter, ...]
prices: tuple[PublishablePrice, ...]
commitments: tuple[PublishableCommitment, ...]
configurations: tuple[PublishableConfiguration, ...]
provider_hints: dict[str, Any] = field(default_factory=dict)
notes: tuple[str, ...] = ()
@dataclass(frozen=True)
class ProviderMappedArtifact:
provider: str
source_key: str
source_kind: PublicationArtifactKind
provider_id: str
provider_object_type: str
mapping_status: ArtifactMappingStatus
payload: dict[str, Any]
metadata: dict[str, Any] = field(default_factory=dict)
notes: tuple[str, ...] = ()
@dataclass(frozen=True)
class ProviderPublicationPackage:
provider: str
bundle_id: str
model_id: str
model_name: str
artifacts: tuple[ProviderMappedArtifact, ...]
notes: tuple[str, ...] = ()
@dataclass(frozen=True)
class DriftFinding:
provider_id: str
provider_object_type: str
severity: DriftSeverity
summary: str
expected: dict[str, Any] = field(default_factory=dict)
actual: dict[str, Any] = field(default_factory=dict)
suggested_action: str | None = None
@dataclass(frozen=True)
class PublicationOperation:
kind: PublicationOperationKind
provider_id: str
provider_object_type: str
source_key: str | None
source_kind: PublicationArtifactKind | None
mapping_status: ArtifactMappingStatus | None
summary: str
desired_payload: dict[str, Any] = field(default_factory=dict)
current_payload: dict[str, Any] = field(default_factory=dict)
desired_metadata: dict[str, Any] = field(default_factory=dict)
current_metadata: dict[str, Any] = field(default_factory=dict)
desired_notes: tuple[str, ...] = ()
current_notes: tuple[str, ...] = ()
@dataclass(frozen=True)
class PublishedProviderArtifact:
provider: str
source_key: str
source_kind: PublicationArtifactKind
provider_id: str
provider_object_type: str
mapping_status: ArtifactMappingStatus
payload: dict[str, Any]
metadata: dict[str, Any] = field(default_factory=dict)
notes: tuple[str, ...] = ()
@dataclass(frozen=True)
class PublicationRevision:
revision_id: str
model_id: str
model_name: str
summary: str
operations: tuple[PublicationOperation, ...]
snapshot: tuple[PublishedProviderArtifact, ...]
replaced_revision_id: str | None = None
@dataclass(frozen=True)
class ProviderPublicationState:
provider: str
active_revision_id: str | None = None
active_model_id: str | None = None
artifacts: tuple[PublishedProviderArtifact, ...] = ()
revisions: tuple[PublicationRevision, ...] = ()
@dataclass(frozen=True)
class PublicationPlan:
provider: str
bundle_id: str
model_id: str
model_name: str
operations: tuple[PublicationOperation, ...]
drift: tuple[DriftFinding, ...]
unsupported_artifacts: tuple[ProviderMappedArtifact, ...]
summary: str
@dataclass(frozen=True)
class PublicationApplyResult:
plan: PublicationPlan
revision: PublicationRevision
state: ProviderPublicationState
summary: str
def build_publication_bundle(
product: CatalogProduct,
model: PricingModel,
*,
configuration_id: str | None = None,
segment: str | None = None,
) -> PublicationBundle:
product_key = f"product:{product.id}"
publishable_product = PublishableProduct(
key=product_key,
product_id=product.id,
name=product.name,
description=product.description,
currency=product.currency,
lifecycle_phase=product.lifecycle_phase,
active=model.status != "retired",
metadata={
**product.metadata,
"adaptive_pricing_model_id": model.id,
"adaptive_pricing_model_status": model.status,
},
)
meters: list[PublishableMeter] = []
prices: list[PublishablePrice] = []
for component in model.charge_components:
meter_key: str | None = None
if component.kind == "usage" and component.meter:
meter_key = f"meter:{model.id}:{component.id}"
meters.append(
PublishableMeter(
key=meter_key,
meter_id=component.meter,
name=component.label or component.meter,
event_name=component.meter,
unit=component.unit or "usage_unit",
metadata={
"adaptive_pricing_model_id": model.id,
"source_component_id": component.id,
},
)
)
prices.append(
PublishablePrice(
key=f"price:{model.id}:{component.id}",
price_id=f"{model.id}:{component.id}",
product_key=product_key,
component_id=component.id,
component_kind=component.kind,
label=component.label or component.id,
currency=model.currency,
billing_treatment=component.billing_treatment or "recurring",
cadence=component.cadence or (
model.access_fee_cadence if component.kind in {"access", "support", "discount", "risk_adjustment"} else None
),
amount=component.amount,
unit_price=component.unit_price,
included_units=component.included_units,
meter_key=meter_key,
metadata={
**component.metadata,
"adaptive_pricing_model_id": model.id,
"source_component_id": component.id,
"source_component_kind": component.kind,
},
)
)
commitments = tuple(
PublishableCommitment(
key=f"commitment:{model.id}:{commitment.id}",
commitment_id=commitment.id,
kind=commitment.kind,
value=commitment.value,
unit=commitment.unit,
description=commitment.description,
metadata={"adaptive_pricing_model_id": model.id},
)
for commitment in model.commitments
)
configuration = PublishableConfiguration(
key=f"configuration:{configuration_id or model.id}",
configuration_id=configuration_id or model.id,
product_key=product_key,
model_id=model.id,
model_name=model.name,
segment=segment,
price_keys=tuple(price.key for price in prices),
commitment_keys=tuple(commitment.key for commitment in commitments),
metadata={
"adaptive_pricing_model_id": model.id,
"lifecycle_phase": model.lifecycle_phase,
},
)
notes = (
"Publication bundles preserve the internal pricing model as the source of truth.",
"Provider mappings may mark artifacts exact, approximate, or unsupported without mutating the bundle.",
)
return PublicationBundle(
bundle_id=f"bundle:{model.id}",
model_id=model.id,
model_name=model.name,
product=publishable_product,
meters=tuple(meters),
prices=tuple(prices),
commitments=commitments,
configurations=(configuration,),
provider_hints=model.provider_hints,
notes=notes,
)
def _published_artifact(mapped: ProviderMappedArtifact) -> PublishedProviderArtifact:
return PublishedProviderArtifact(
provider=mapped.provider,
source_key=mapped.source_key,
source_kind=mapped.source_kind,
provider_id=mapped.provider_id,
provider_object_type=mapped.provider_object_type,
mapping_status=mapped.mapping_status,
payload=mapped.payload,
metadata=mapped.metadata,
notes=mapped.notes,
)
def _artifact_changed(
desired: ProviderMappedArtifact,
current: PublishedProviderArtifact,
) -> bool:
return any(
[
desired.mapping_status != current.mapping_status,
desired.provider_object_type != current.provider_object_type,
_payload_signature(desired.payload) != _payload_signature(current.payload),
_payload_signature(desired.metadata) != _payload_signature(current.metadata),
desired.notes != current.notes,
]
)
def _diff_summary(
desired: ProviderMappedArtifact,
current: PublishedProviderArtifact,
) -> DriftFinding:
return DriftFinding(
provider_id=desired.provider_id,
provider_object_type=desired.provider_object_type,
severity="warn",
summary="Provider shadow state differs from desired pricing definition.",
expected={
"payload": desired.payload,
"metadata": desired.metadata,
"mapping_status": desired.mapping_status,
"notes": list(desired.notes),
},
actual={
"payload": current.payload,
"metadata": current.metadata,
"mapping_status": current.mapping_status,
"notes": list(current.notes),
},
suggested_action="Publish the desired definition again or reconcile the provider-side drift.",
)
def plan_publication(
package: ProviderPublicationPackage,
current_state: ProviderPublicationState | None = None,
) -> PublicationPlan:
current_state = current_state or ProviderPublicationState(provider=package.provider)
current_index = {artifact.provider_id: artifact for artifact in current_state.artifacts}
desired_publishable = tuple(
artifact for artifact in package.artifacts if artifact.mapping_status != "unsupported"
)
unsupported = tuple(
artifact for artifact in package.artifacts if artifact.mapping_status == "unsupported"
)
operations: list[PublicationOperation] = []
drift: list[DriftFinding] = []
desired_ids = {artifact.provider_id for artifact in desired_publishable}
for artifact in desired_publishable:
current = current_index.get(artifact.provider_id)
if current is None:
operations.append(
PublicationOperation(
kind="create",
provider_id=artifact.provider_id,
provider_object_type=artifact.provider_object_type,
source_key=artifact.source_key,
source_kind=artifact.source_kind,
mapping_status=artifact.mapping_status,
summary="Create provider artifact from canonical pricing definition.",
desired_payload=artifact.payload,
desired_metadata=artifact.metadata,
desired_notes=artifact.notes,
)
)
continue
if _artifact_changed(artifact, current):
operations.append(
PublicationOperation(
kind="update",
provider_id=artifact.provider_id,
provider_object_type=artifact.provider_object_type,
source_key=artifact.source_key,
source_kind=artifact.source_kind,
mapping_status=artifact.mapping_status,
summary="Update provider artifact to match canonical pricing definition.",
desired_payload=artifact.payload,
current_payload=current.payload,
desired_metadata=artifact.metadata,
current_metadata=current.metadata,
desired_notes=artifact.notes,
current_notes=current.notes,
)
)
drift.append(_diff_summary(artifact, current))
continue
operations.append(
PublicationOperation(
kind="noop",
provider_id=artifact.provider_id,
provider_object_type=artifact.provider_object_type,
source_key=artifact.source_key,
source_kind=artifact.source_kind,
mapping_status=artifact.mapping_status,
summary="Provider artifact already matches canonical pricing definition.",
desired_payload=artifact.payload,
current_payload=current.payload,
desired_metadata=artifact.metadata,
current_metadata=current.metadata,
desired_notes=artifact.notes,
current_notes=current.notes,
)
)
for artifact in current_state.artifacts:
if artifact.provider_id in desired_ids:
continue
operations.append(
PublicationOperation(
kind="retire",
provider_id=artifact.provider_id,
provider_object_type=artifact.provider_object_type,
source_key=artifact.source_key,
source_kind=artifact.source_kind,
mapping_status=artifact.mapping_status,
summary="Retire managed provider artifact no longer present in the desired pricing definition.",
current_payload=artifact.payload,
current_metadata=artifact.metadata,
current_notes=artifact.notes,
)
)
drift.append(
DriftFinding(
provider_id=artifact.provider_id,
provider_object_type=artifact.provider_object_type,
severity="warn",
summary="Managed provider artifact exists in shadow state but not in the desired pricing definition.",
actual={
"payload": artifact.payload,
"metadata": artifact.metadata,
"mapping_status": artifact.mapping_status,
"notes": list(artifact.notes),
},
suggested_action="Retire the artifact or republish the desired model.",
)
)
summary = (
f"{package.provider}: {sum(op.kind == 'create' for op in operations)} create, "
f"{sum(op.kind == 'update' for op in operations)} update, "
f"{sum(op.kind == 'retire' for op in operations)} retire, "
f"{sum(op.kind == 'noop' for op in operations)} noop, "
f"{len(unsupported)} unsupported."
)
return PublicationPlan(
provider=package.provider,
bundle_id=package.bundle_id,
model_id=package.model_id,
model_name=package.model_name,
operations=tuple(operations),
drift=tuple(drift),
unsupported_artifacts=unsupported,
summary=summary,
)
def _next_revision_id(state: ProviderPublicationState) -> str:
return f"{state.provider}-rev-{len(state.revisions) + 1:04d}"
def _next_active_model_id(artifacts: list[PublishedProviderArtifact]) -> str | None:
product = next((artifact for artifact in artifacts if artifact.source_kind == "product"), None)
if product is None:
return None
if product.metadata.get("model_id"):
return str(product.metadata["model_id"])
payload_metadata = product.payload.get("metadata", {})
if payload_metadata.get("adaptive_pricing_model_id"):
return str(payload_metadata["adaptive_pricing_model_id"])
return None
def apply_publication(
package: ProviderPublicationPackage,
current_state: ProviderPublicationState | None = None,
) -> PublicationApplyResult:
current_state = current_state or ProviderPublicationState(provider=package.provider)
plan = plan_publication(package, current_state)
desired_index = {
artifact.provider_id: artifact
for artifact in package.artifacts
if artifact.mapping_status != "unsupported"
}
artifact_index = {artifact.provider_id: artifact for artifact in current_state.artifacts}
for operation in plan.operations:
if operation.kind in {"create", "update", "noop"}:
artifact_index[operation.provider_id] = _published_artifact(
desired_index[operation.provider_id]
)
elif operation.kind == "retire":
artifact_index.pop(operation.provider_id, None)
snapshot = tuple(sorted(artifact_index.values(), key=lambda artifact: artifact.provider_id))
revision = PublicationRevision(
revision_id=_next_revision_id(current_state),
model_id=package.model_id,
model_name=package.model_name,
summary=plan.summary,
operations=plan.operations,
snapshot=snapshot,
replaced_revision_id=current_state.active_revision_id,
)
state = ProviderPublicationState(
provider=package.provider,
active_revision_id=revision.revision_id,
active_model_id=_next_active_model_id(list(snapshot)),
artifacts=snapshot,
revisions=current_state.revisions + (revision,),
)
return PublicationApplyResult(
plan=plan,
revision=revision,
state=state,
summary=plan.summary,
)
def rollback_publication(
state: ProviderPublicationState,
revision_id: str,
) -> PublicationApplyResult:
revision = next((item for item in state.revisions if item.revision_id == revision_id), None)
if revision is None:
raise ValueError(f"unknown revision_id '{revision_id}'")
rollback_operation = PublicationOperation(
kind="rollback",
provider_id=revision.revision_id,
provider_object_type="revision",
source_key=None,
source_kind=None,
mapping_status=None,
summary=f"Rollback provider shadow state to {revision.revision_id}.",
)
new_revision = PublicationRevision(
revision_id=_next_revision_id(state),
model_id=revision.model_id,
model_name=revision.model_name,
summary=f"Rolled back provider shadow state to {revision.revision_id}.",
operations=(rollback_operation,),
snapshot=revision.snapshot,
replaced_revision_id=state.active_revision_id,
)
new_state = ProviderPublicationState(
provider=state.provider,
active_revision_id=new_revision.revision_id,
active_model_id=revision.model_id,
artifacts=revision.snapshot,
revisions=state.revisions + (new_revision,),
)
return PublicationApplyResult(
plan=PublicationPlan(
provider=state.provider,
bundle_id=f"rollback:{revision.revision_id}",
model_id=revision.model_id,
model_name=revision.model_name,
operations=(rollback_operation,),
drift=(),
unsupported_artifacts=(),
summary=new_revision.summary,
),
revision=new_revision,
state=new_state,
summary=new_revision.summary,
)
def provider_state_to_dict(state: ProviderPublicationState) -> dict[str, Any]:
return _serialize_value(state)
def _operation_from_dict(raw: dict[str, Any]) -> PublicationOperation:
return PublicationOperation(
kind=raw["kind"],
provider_id=raw["provider_id"],
provider_object_type=raw["provider_object_type"],
source_key=raw.get("source_key"),
source_kind=raw.get("source_kind"),
mapping_status=raw.get("mapping_status"),
summary=raw["summary"],
desired_payload=dict(raw.get("desired_payload", {})),
current_payload=dict(raw.get("current_payload", {})),
desired_metadata=dict(raw.get("desired_metadata", {})),
current_metadata=dict(raw.get("current_metadata", {})),
desired_notes=tuple(raw.get("desired_notes", [])),
current_notes=tuple(raw.get("current_notes", [])),
)
def _artifact_from_dict(raw: dict[str, Any]) -> PublishedProviderArtifact:
return PublishedProviderArtifact(
provider=raw["provider"],
source_key=raw["source_key"],
source_kind=raw["source_kind"],
provider_id=raw["provider_id"],
provider_object_type=raw["provider_object_type"],
mapping_status=raw["mapping_status"],
payload=dict(raw.get("payload", {})),
metadata=dict(raw.get("metadata", {})),
notes=tuple(raw.get("notes", [])),
)
def _revision_from_dict(raw: dict[str, Any]) -> PublicationRevision:
return PublicationRevision(
revision_id=raw["revision_id"],
model_id=raw["model_id"],
model_name=raw["model_name"],
summary=raw["summary"],
operations=tuple(_operation_from_dict(item) for item in raw.get("operations", [])),
snapshot=tuple(_artifact_from_dict(item) for item in raw.get("snapshot", [])),
replaced_revision_id=raw.get("replaced_revision_id"),
)
def provider_state_from_dict(raw: dict[str, Any]) -> ProviderPublicationState:
return ProviderPublicationState(
provider=raw["provider"],
active_revision_id=raw.get("active_revision_id"),
active_model_id=raw.get("active_model_id"),
artifacts=tuple(_artifact_from_dict(item) for item in raw.get("artifacts", [])),
revisions=tuple(_revision_from_dict(item) for item in raw.get("revisions", [])),
)

View File

@@ -0,0 +1,330 @@
from __future__ import annotations
from typing import Any
from .provider_publication import (
ProviderMappedArtifact,
ProviderPublicationPackage,
PublicationBundle,
PublishableCommitment,
PublishableConfiguration,
PublishableMeter,
PublishablePrice,
)
def _lookup_key(*parts: str) -> str:
return "--".join(
part.replace(":", "-").replace("_", "-")
for part in parts
if part
)
def _stripe_interval(value: str | None) -> str | None:
if value is None:
return None
normalized = value.lower()
if normalized in {"monthly", "month"}:
return "month"
if normalized in {"yearly", "annual", "year"}:
return "year"
return normalized
def _stripe_hints(bundle: PublicationBundle) -> dict[str, Any]:
return dict(bundle.provider_hints.get("stripe", {}))
def _product_provider_id(bundle: PublicationBundle) -> str:
hints = _stripe_hints(bundle)
return hints.get("product_lookup_key") or _lookup_key("product", bundle.product.product_id)
def _meter_provider_id(bundle: PublicationBundle, meter: PublishableMeter) -> str:
hints = _stripe_hints(bundle)
if hints.get("meter_name") and len(bundle.meters) == 1:
return str(hints["meter_name"])
return _lookup_key("meter", bundle.model_id, meter.meter_id)
def _product_artifact(bundle: PublicationBundle) -> ProviderMappedArtifact:
hints = _stripe_hints(bundle)
return ProviderMappedArtifact(
provider="stripe",
source_key=bundle.product.key,
source_kind="product",
provider_id=_product_provider_id(bundle),
provider_object_type="product",
mapping_status="exact",
payload={
"lookup_key": _product_provider_id(bundle),
"name": bundle.product.name,
"description": bundle.product.description,
"active": bundle.product.active,
"metadata": {
**bundle.product.metadata,
"collection_method": hints.get("collection_method", "charge_automatically"),
"source_of_truth": "adaptive-pricing",
},
},
metadata={"model_id": bundle.model_id},
notes=("Stripe product mapping is direct for catalog identity and metadata.",),
)
def _meter_artifact(bundle: PublicationBundle, meter: PublishableMeter) -> ProviderMappedArtifact:
provider_id = _meter_provider_id(bundle, meter)
return ProviderMappedArtifact(
provider="stripe",
source_key=meter.key,
source_kind="meter",
provider_id=provider_id,
provider_object_type="billing_meter",
mapping_status="exact",
payload={
"lookup_key": provider_id,
"display_name": meter.name,
"event_name": meter.event_name,
"default_aggregation": {"formula": meter.aggregation},
"unit_label": meter.unit,
"metadata": {
**meter.metadata,
"source_of_truth": "adaptive-pricing",
},
},
metadata={"model_id": bundle.model_id},
notes=("Stripe meter mapping is direct for metered usage identifiers.",),
)
def _fixed_price_payload(bundle: PublicationBundle, price: PublishablePrice) -> dict[str, Any]:
payload = {
"lookup_key": _lookup_key("price", bundle.model_id, price.component_id),
"product": _product_provider_id(bundle),
"currency": price.currency.lower(),
"nickname": price.label,
"unit_amount_decimal": str(price.amount or "0"),
"metadata": {
**price.metadata,
"source_of_truth": "adaptive-pricing",
},
}
if price.billing_treatment != "one_time" and price.cadence:
payload["recurring"] = {"interval": _stripe_interval(price.cadence)}
return payload
def _discount_artifact(bundle: PublicationBundle, price: PublishablePrice) -> ProviderMappedArtifact:
provider_id = _lookup_key("coupon", bundle.model_id, price.component_id)
return ProviderMappedArtifact(
provider="stripe",
source_key=price.key,
source_kind="price",
provider_id=provider_id,
provider_object_type="coupon",
mapping_status="approximate",
payload={
"lookup_key": provider_id,
"amount_off_decimal": str(abs(price.amount or 0)),
"currency": price.currency.lower(),
"name": price.label,
"metadata": {
**price.metadata,
"source_of_truth": "adaptive-pricing",
},
},
metadata={"model_id": bundle.model_id},
notes=(
"Stripe coupons approximate discount components because attachment to subscriptions and eligibility rules lives outside the price object.",
),
)
def _usage_price_artifact(
bundle: PublicationBundle,
price: PublishablePrice,
) -> ProviderMappedArtifact:
provider_id = _lookup_key("price", bundle.model_id, price.component_id)
if price.meter_key is None or price.unit_price is None:
return ProviderMappedArtifact(
provider="stripe",
source_key=price.key,
source_kind="price",
provider_id=provider_id,
provider_object_type="price",
mapping_status="unsupported",
payload={
"lookup_key": provider_id,
"reason": "usage component lacks a billable per-unit price or mapped meter",
},
metadata={"model_id": bundle.model_id},
notes=(
"Stripe publication cannot create a metered price without both a meter and a per-unit charge.",
),
)
meter_provider_id = _lookup_key("meter", bundle.model_id, price.component_id)
if len(bundle.meters) == 1:
meter_provider_id = _meter_provider_id(bundle, bundle.meters[0])
status = "approximate" if price.included_units not in (None, 0) else "exact"
notes = [
"Stripe metered price mapping is direct for per-unit overage billing.",
]
if status == "approximate":
notes.append(
"Included usage allowance requires supplemental credits, invoice adjustments, or custom entitlement logic outside the Stripe price object."
)
return ProviderMappedArtifact(
provider="stripe",
source_key=price.key,
source_kind="price",
provider_id=provider_id,
provider_object_type="price",
mapping_status=status,
payload={
"lookup_key": provider_id,
"product": _product_provider_id(bundle),
"currency": price.currency.lower(),
"nickname": price.label,
"billing_scheme": "per_unit",
"unit_amount_decimal": str(price.unit_price),
"recurring": {
"interval": _stripe_interval(price.cadence) or "month",
"usage_type": "metered",
},
"meter": meter_provider_id,
"metadata": {
**price.metadata,
"included_units": str(price.included_units) if price.included_units is not None else None,
"source_of_truth": "adaptive-pricing",
},
},
metadata={"model_id": bundle.model_id},
notes=tuple(notes),
)
def _price_artifact(bundle: PublicationBundle, price: PublishablePrice) -> ProviderMappedArtifact:
provider_id = _lookup_key("price", bundle.model_id, price.component_id)
if price.component_kind == "usage":
return _usage_price_artifact(bundle, price)
if price.component_kind == "discount":
return _discount_artifact(bundle, price)
return ProviderMappedArtifact(
provider="stripe",
source_key=price.key,
source_kind="price",
provider_id=provider_id,
provider_object_type="price",
mapping_status="exact",
payload=_fixed_price_payload(bundle, price),
metadata={"model_id": bundle.model_id},
notes=("Stripe price mapping is direct for fixed recurring or one-time charges.",),
)
def _commitment_artifact(
bundle: PublicationBundle,
commitment: PublishableCommitment,
) -> ProviderMappedArtifact:
provider_id = _lookup_key("commitment", bundle.model_id, commitment.commitment_id)
if commitment.kind == "contract_duration":
return ProviderMappedArtifact(
provider="stripe",
source_key=commitment.key,
source_kind="commitment",
provider_id=provider_id,
provider_object_type="metadata_binding",
mapping_status="approximate",
payload={
"lookup_key": provider_id,
"metadata": {
**commitment.metadata,
"contract_duration_value": commitment.value,
"contract_duration_unit": commitment.unit,
},
},
metadata={"model_id": bundle.model_id},
notes=(
"Stripe can store contract duration metadata, but enforcement still relies on subscription schedule policy or external contract workflow.",
),
)
return ProviderMappedArtifact(
provider="stripe",
source_key=commitment.key,
source_kind="commitment",
provider_id=provider_id,
provider_object_type="metadata_binding",
mapping_status="unsupported",
payload={
"lookup_key": provider_id,
"kind": commitment.kind,
"value": commitment.value,
"unit": commitment.unit,
},
metadata={"model_id": bundle.model_id},
notes=(
"Stripe metadata alone cannot enforce this commitment semantics; it remains an internal pricing and contract artifact.",
),
)
def _configuration_artifact(
bundle: PublicationBundle,
configuration: PublishableConfiguration,
) -> ProviderMappedArtifact:
provider_id = _lookup_key("configuration", configuration.configuration_id)
return ProviderMappedArtifact(
provider="stripe",
source_key=configuration.key,
source_kind="configuration",
provider_id=provider_id,
provider_object_type="metadata_binding",
mapping_status="approximate",
payload={
"lookup_key": provider_id,
"metadata": {
**configuration.metadata,
"configuration_id": configuration.configuration_id,
"segment": configuration.segment,
"price_keys": list(configuration.price_keys),
"commitment_keys": list(configuration.commitment_keys),
},
},
metadata={"model_id": bundle.model_id},
notes=(
"Customer or default pricing configurations can be recorded in Stripe metadata, but they are not first-class Stripe catalog objects.",
),
)
def map_bundle_to_stripe(bundle: PublicationBundle) -> ProviderPublicationPackage:
artifacts: list[ProviderMappedArtifact] = [_product_artifact(bundle)]
artifacts.extend(_meter_artifact(bundle, meter) for meter in bundle.meters)
artifacts.extend(_price_artifact(bundle, price) for price in bundle.prices)
artifacts.extend(_commitment_artifact(bundle, commitment) for commitment in bundle.commitments)
artifacts.extend(
_configuration_artifact(bundle, configuration)
for configuration in bundle.configurations
)
notes = [
"Stripe remains an execution backend; adaptive-pricing stays the source of truth.",
"Exact mappings create publishable Stripe shadow artifacts; approximate mappings require supplemental operational logic.",
]
if _stripe_hints(bundle).get("metered_usage_strategy") == "future_adapter":
notes.append(
"The pricing model declares a future metered-usage strategy hint, so Stripe publication keeps the usage allowance semantics descriptive rather than executable."
)
return ProviderPublicationPackage(
provider="stripe",
bundle_id=bundle.bundle_id,
model_id=bundle.model_id,
model_name=bundle.model_name,
artifacts=tuple(artifacts),
notes=tuple(notes),
)

114
docs/StripePublication.md Normal file
View File

@@ -0,0 +1,114 @@
# Stripe Publication
Status: MVP for `ADAPTIVE-WP-0007`.
## Purpose
This milestone adds the first outbound execution layer for pricing models.
The implementation keeps `adaptive-pricing` as the source of truth and treats
Stripe as an execution backend. In this repository, publication targets a
file-backed Stripe shadow state rather than the live Stripe API.
## Core Modules
- `adaptive_pricing_core/provider_publication.py`
- `adaptive_pricing_core/stripe_provider.py`
The provider-publication core defines:
- provider-neutral publishable artifacts
- publication plans and operations
- drift findings
- revisioned shadow state
- rollback mechanics
The Stripe mapper translates publishable artifacts into Stripe-oriented objects
and marks each mapping as:
- `exact`
- `approximate`
- `unsupported`
## Publishable Artifact Model
Current provider-neutral artifacts:
- product
- meter
- price
- commitment
- configuration
Current Stripe-oriented object types:
- `product`
- `billing_meter`
- `price`
- `coupon`
- `metadata_binding`
`metadata_binding` is used for execution-adjacent information that Stripe can
store as metadata but does not treat as a first-class pricing object.
## Mapping Semantics
Current exact mappings:
- catalog product identity and metadata
- fixed recurring and one-time prices
- metered usage prices without bundled allowance semantics
- Stripe meter definitions
Current approximate mappings:
- metered prices that also imply included usage
- discount components mapped as coupon-like artifacts
- contract-duration commitments carried as metadata or schedule-adjacent data
- configuration artifacts carried as metadata
Current unsupported mappings:
- included-usage-only components without a billable per-unit overage price
- commitment semantics such as prepayment or minimum turnover when Stripe alone
cannot enforce them
## Coulomb Adapter
Project adapter:
- `projects/coulomb-pricing/observatory/publication.py`
- `projects/coulomb-pricing/observatory/publish.py`
Default local shadow-state path:
- `projects/coulomb-pricing/data/provider_state/stripe-publication.json`
Preview:
```bash
cd projects/coulomb-pricing
python3 -m observatory.publish --model-id flat-899-eur-monthly
```
Apply to the local shadow state:
```bash
cd projects/coulomb-pricing
python3 -m observatory.publish --model-id flat-899-eur-monthly --apply
```
Rollback:
```bash
cd projects/coulomb-pricing
python3 -m observatory.publish --rollback stripe-rev-0001
```
## Current Scope Limit
This milestone does not call the live Stripe API.
It establishes the internal publication model, Stripe object mapping,
idempotent shadow-state synchronization, drift detection, and rollback path so
live API execution can be layered on without making Stripe the source of truth.

View File

@@ -61,3 +61,16 @@ python3 -m observatory.importers.openrouter --input data/imports/openrouter-expo
Sample exports live under `data/imports/`. Live API sync can replace these
file-based importers in a follow-on workplan.
### Stripe Publication (shadow state)
```bash
cd projects/coulomb-pricing
python3 -m observatory.publish --model-id flat-899-eur-monthly
python3 -m observatory.publish --model-id flat-899-eur-monthly --apply
python3 -m observatory.publish --rollback stripe-rev-0001
```
These commands preview, apply, and roll back the local Stripe shadow state used
by the provider-publication MVP. The live Stripe API is still outside this
milestone.

View File

@@ -26,6 +26,7 @@ from .boundary import build_boundary_validation
from .credits import build_credit_summary, load_credit_wallets
from .membership_analytics import build_membership_analytics
from .pricing_context import build_cost_floor, build_market_price_view, build_value_range_view
from .publication import build_stripe_publication_preview
from .recommendations import build_pricing_recommendations
from .simulator import build_pricing_simulations
from .tuning import build_customer_tuning_pilot
@@ -122,6 +123,12 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
recommendations = build_pricing_recommendations(
cost_floor, value_range, market_price, simulations, usage_summary
)
provider_publication = build_stripe_publication_preview(
product,
models,
root,
model_id=product.active_pricing_model_id,
)
return _serialize(
{
@@ -149,6 +156,7 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
"boundary_validation": boundary_validation,
"credit_wallets": credit_summary,
"recommendations": recommendations,
"provider_publication": provider_publication,
"infrastructure": {
"domains": _load_json_catalog(root, "domains.json"),
"virtual_servers": _load_json_catalog(root, "virtual_servers.json"),

View File

@@ -0,0 +1,159 @@
from __future__ import annotations
import json
from decimal import Decimal
from pathlib import Path
from typing import Any
from ._repo_root import ensure_repo_root_on_syspath
from .models import PricingModel, Product
ensure_repo_root_on_syspath()
from adaptive_pricing_core.provider_publication import ( # noqa: E402
CatalogProduct,
ProviderPublicationState,
apply_publication,
build_publication_bundle,
plan_publication,
provider_state_from_dict,
provider_state_to_dict,
rollback_publication,
)
from adaptive_pricing_core.stripe_provider import map_bundle_to_stripe # noqa: E402
def _serialize(value: Any) -> Any:
if isinstance(value, Decimal):
return str(value)
if hasattr(value, "__dataclass_fields__"):
return {key: _serialize(getattr(value, key)) for key in value.__dataclass_fields__}
if isinstance(value, tuple):
return [_serialize(item) for item in value]
if isinstance(value, list):
return [_serialize(item) for item in value]
if isinstance(value, dict):
return {key: _serialize(item) for key, item in value.items()}
return value
def default_stripe_state_path(data_dir: Path) -> Path:
return data_dir / "provider_state" / "stripe-publication.json"
def load_stripe_publication_state(path: Path) -> ProviderPublicationState:
if not path.exists():
return ProviderPublicationState(provider="stripe")
return provider_state_from_dict(json.loads(path.read_text(encoding="utf-8")))
def save_stripe_publication_state(path: Path, state: ProviderPublicationState) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
json.dumps(provider_state_to_dict(state), indent=2),
encoding="utf-8",
)
def _catalog_product(product: Product) -> CatalogProduct:
return CatalogProduct(
id=product.id,
name=product.name,
description=product.description,
currency=product.currency,
lifecycle_phase=product.lifecycle_phase,
active_pricing_model_id=product.active_pricing_model_id,
metadata={"product_channel": "membership"},
)
def _target_model(
models: list[PricingModel],
product: Product,
model_id: str | None = None,
) -> PricingModel:
requested_id = model_id or product.active_pricing_model_id
return next(item for item in models if item.id == requested_id)
def build_stripe_publication_preview(
product: Product,
models: list[PricingModel],
data_dir: Path,
*,
model_id: str | None = None,
state_path: Path | None = None,
) -> dict[str, Any]:
model = _target_model(models, product, model_id)
bundle = build_publication_bundle(_catalog_product(product), model)
package = map_bundle_to_stripe(bundle)
state = load_stripe_publication_state(state_path or default_stripe_state_path(data_dir))
plan = plan_publication(package, state)
return _serialize(
{
"provider": "stripe",
"state_path": str(state_path or default_stripe_state_path(data_dir)),
"model_id": model.id,
"model_name": model.name,
"current_state": {
"active_revision_id": state.active_revision_id,
"active_model_id": state.active_model_id,
"artifact_count": len(state.artifacts),
"revision_count": len(state.revisions),
},
"artifact_counts": {
"exact": sum(item.mapping_status == "exact" for item in package.artifacts),
"approximate": sum(item.mapping_status == "approximate" for item in package.artifacts),
"unsupported": sum(item.mapping_status == "unsupported" for item in package.artifacts),
},
"plan": plan,
"notes": package.notes,
}
)
def publish_to_stripe_shadow(
product: Product,
models: list[PricingModel],
data_dir: Path,
*,
model_id: str | None = None,
state_path: Path | None = None,
) -> dict[str, Any]:
path = state_path or default_stripe_state_path(data_dir)
model = _target_model(models, product, model_id)
bundle = build_publication_bundle(_catalog_product(product), model)
package = map_bundle_to_stripe(bundle)
current_state = load_stripe_publication_state(path)
result = apply_publication(package, current_state)
save_stripe_publication_state(path, result.state)
return _serialize(
{
"provider": "stripe",
"state_path": str(path),
"model_id": model.id,
"model_name": model.name,
"result": result,
}
)
def rollback_stripe_shadow(
data_dir: Path,
revision_id: str,
*,
state_path: Path | None = None,
) -> dict[str, Any]:
path = state_path or default_stripe_state_path(data_dir)
current_state = load_stripe_publication_state(path)
result = rollback_publication(current_state, revision_id)
save_stripe_publication_state(path, result.state)
return _serialize(
{
"provider": "stripe",
"state_path": str(path),
"result": result,
}
)

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from .load import default_data_dir, load_pricing_models, load_product
from .publication import (
build_stripe_publication_preview,
publish_to_stripe_shadow,
rollback_stripe_shadow,
)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Preview or apply Stripe publication for pricing models")
parser.add_argument("--data-dir", type=Path, default=default_data_dir())
parser.add_argument("--model-id", help="Pricing model id to preview or publish")
parser.add_argument("--provider", default="stripe", choices=["stripe"])
parser.add_argument("--state-path", type=Path, help="Override provider shadow-state path")
parser.add_argument("--apply", action="store_true", help="Apply the publication plan to the local Stripe shadow state")
parser.add_argument("--rollback", help="Rollback the local Stripe shadow state to a prior revision id")
args = parser.parse_args(argv)
product = load_product(args.data_dir)
models = load_pricing_models(args.data_dir)
if args.rollback:
payload = rollback_stripe_shadow(args.data_dir, args.rollback, state_path=args.state_path)
elif args.apply:
payload = publish_to_stripe_shadow(
product,
models,
args.data_dir,
model_id=args.model_id,
state_path=args.state_path,
)
else:
payload = build_stripe_publication_preview(
product,
models,
args.data_dir,
model_id=args.model_id,
state_path=args.state_path,
)
print(json.dumps(payload, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -30,6 +30,9 @@ def test_dashboard_payload_contains_live_ledger_totals() -> None:
assert payload["pricing_simulations"]["reference_model_id"] is not None
assert payload["customer_tuning"]["request_count"] == 2
assert payload["customer_tuning"]["accepted_request_ids"] == ["small-team-lower-usage-price"]
assert payload["provider_publication"]["provider"] == "stripe"
assert payload["provider_publication"]["model_id"] == "flat-899-eur-monthly"
assert payload["provider_publication"]["plan"]["summary"].startswith("stripe:")
assert len(payload["boundary_validation"]["model_results"]) == 3
assert payload["boundary_validation"]["policy"]["target_margin_pct"] == "15"
assert any(

View File

@@ -0,0 +1,124 @@
from __future__ import annotations
import json
from pathlib import Path
from observatory.load import load_pricing_models, load_product
from observatory.publication import (
build_stripe_publication_preview,
publish_to_stripe_shadow,
rollback_stripe_shadow,
)
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
def _catalog():
return load_product(DATA_DIR), load_pricing_models(DATA_DIR)
def test_overage_preview_includes_meter_and_approximate_mapping(tmp_path: Path) -> None:
product, models = _catalog()
preview = build_stripe_publication_preview(
product,
models,
DATA_DIR,
model_id="membership-plus-overage",
state_path=tmp_path / "stripe-state.json",
)
assert preview["artifact_counts"]["exact"] >= 3
assert preview["artifact_counts"]["approximate"] >= 1
assert preview["artifact_counts"]["unsupported"] == 0
assert any(
operation["provider_object_type"] == "billing_meter"
for operation in preview["plan"]["operations"]
)
def test_credit_allowance_preview_marks_unsupported_usage_mapping(tmp_path: Path) -> None:
product, models = _catalog()
preview = build_stripe_publication_preview(
product,
models,
DATA_DIR,
model_id="membership-plus-credits",
state_path=tmp_path / "stripe-state.json",
)
unsupported = {
artifact["source_key"]
for artifact in preview["plan"]["unsupported_artifacts"]
}
assert "price:membership-plus-credits:ai-credit-allowance" in unsupported
assert preview["artifact_counts"]["unsupported"] >= 1
def test_publication_is_idempotent_and_detects_drift(tmp_path: Path) -> None:
product, models = _catalog()
state_path = tmp_path / "stripe-state.json"
publish_to_stripe_shadow(
product,
models,
DATA_DIR,
model_id="flat-899-eur-monthly",
state_path=state_path,
)
preview = build_stripe_publication_preview(
product,
models,
DATA_DIR,
model_id="flat-899-eur-monthly",
state_path=state_path,
)
assert {operation["kind"] for operation in preview["plan"]["operations"]} == {"noop"}
raw_state = json.loads(state_path.read_text(encoding="utf-8"))
price_artifact = next(
artifact
for artifact in raw_state["artifacts"]
if artifact["provider_id"] == "price--flat-899-eur-monthly--membership-access"
)
price_artifact["payload"]["unit_amount_decimal"] = "9.49"
state_path.write_text(json.dumps(raw_state, indent=2), encoding="utf-8")
drifted = build_stripe_publication_preview(
product,
models,
DATA_DIR,
model_id="flat-899-eur-monthly",
state_path=state_path,
)
assert drifted["plan"]["drift"]
assert any(operation["kind"] == "update" for operation in drifted["plan"]["operations"])
def test_publication_rollback_restores_prior_revision(tmp_path: Path) -> None:
product, models = _catalog()
state_path = tmp_path / "stripe-state.json"
first = publish_to_stripe_shadow(
product,
models,
DATA_DIR,
model_id="flat-899-eur-monthly",
state_path=state_path,
)
publish_to_stripe_shadow(
product,
models,
DATA_DIR,
model_id="membership-plus-overage",
state_path=state_path,
)
rollback = rollback_stripe_shadow(
DATA_DIR,
first["result"]["revision"]["revision_id"],
state_path=state_path,
)
assert rollback["result"]["state"]["active_model_id"] == "flat-899-eur-monthly"
assert rollback["result"]["revision"]["summary"].startswith("Rolled back")

View File

@@ -4,11 +4,11 @@ type: workplan
title: "Provider abstraction and Stripe publication"
domain: financials
repo: adaptive-pricing
status: backlog
status: finished
owner: codex
topic_slug: helix-forge
created: "2026-07-02"
updated: "2026-07-02"
updated: "2026-07-03"
state_hub_workstream_id: "3e348457-0ef7-4b5c-b4c5-9668e03b63ac"
---
@@ -21,7 +21,7 @@ published to Stripe without making Stripe the source of truth.
```task
id: ADAPTIVE-WP-0007-T01
status: todo
status: done
priority: high
state_hub_task_id: "b4eb157d-54a8-4518-b99c-16355c3e22cf"
```
@@ -34,7 +34,7 @@ configurations.
```task
id: ADAPTIVE-WP-0007-T02
status: todo
status: done
priority: high
state_hub_task_id: "73fd1384-0b9e-48ea-851b-4045928e9c58"
```
@@ -46,7 +46,7 @@ where the mapping is exact, approximate, or unsupported.
```task
id: ADAPTIVE-WP-0007-T03
status: todo
status: done
priority: high
state_hub_task_id: "da137f4b-0b2a-49fb-adf2-7133cb908af0"
```
@@ -58,7 +58,7 @@ provider state remains auditable and synchronized with internal definitions.
```task
id: ADAPTIVE-WP-0007-T04
status: todo
status: done
priority: medium
state_hub_task_id: "3e3e2fc3-f38f-4869-890a-94bffac97439"
```
@@ -70,7 +70,7 @@ state with Stripe artifacts and to unwind failed publish attempts safely.
```task
id: ADAPTIVE-WP-0007-T05
status: todo
status: done
priority: medium
state_hub_task_id: "1dd8f49a-87fb-43ea-9c65-b6aa909e4a56"
```
@@ -79,4 +79,3 @@ Exit when the repo can publish a validated pricing model to Stripe in a
repeatable, explainable, and reversible way.
Dependencies: `ADAPTIVE-WP-0003`, `ADAPTIVE-WP-0004`, `ADAPTIVE-WP-0005`.