Implement comparable LTV engine and close WP-0005

This commit is contained in:
codex
2026-07-02 22:50:16 +02:00
parent 656bbb81a5
commit 386c8a46fe
13 changed files with 1060 additions and 68 deletions

View File

@@ -9,6 +9,7 @@ from .economics import build_liquidity_summary, build_snapshot
from .load import (
default_data_dir,
latest_period,
load_ltv_scenarios,
load_budget,
load_expense_records,
load_market_signals,
@@ -88,12 +89,19 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
market_raw = load_market_signals(root)
usage_records = load_usage_records(root)
usage_summary = build_usage_summary(usage_records, target_period)
ltv_scenarios = load_ltv_scenarios(root)
cost_floor = build_cost_floor(snapshot, models)
value_range = build_value_range_view(value_range_raw, snapshot, product, models)
market_price = build_market_price_view(market_raw)
cost_allocation = build_cost_allocation(snapshot, usage_records)
ai_cost_per_member = usage_summary["cost_per_active_user_eur"]
simulations = build_pricing_simulations(snapshot, models, ai_cost_per_member)
simulations = build_pricing_simulations(
snapshot,
models,
ai_cost_per_member,
usage_records=usage_records,
scenario_catalog=ltv_scenarios,
)
boundary_validation = build_boundary_validation(snapshot, models, usage_records)
credit_wallets = load_credit_wallets(root)
credit_summary = build_credit_summary(

View File

@@ -115,6 +115,10 @@ def load_market_signals(data_dir: Path | None = None) -> dict:
return _read_json((data_dir or default_data_dir()) / "market_signals.json")
def load_ltv_scenarios(data_dir: Path | None = None) -> dict:
return _read_json((data_dir or default_data_dir()) / "ltv_scenarios.json")
def load_membership(data_dir: Path | None = None) -> list[MembershipRecord]:
raw = _read_json((data_dir or default_data_dir()) / "membership.json")
return [

View File

@@ -0,0 +1,210 @@
from __future__ import annotations
from decimal import Decimal
from typing import Any
from ._repo_root import ensure_repo_root_on_syspath
from .boundary import build_boundary_policy
from .models import EconomicsSnapshot, PricingModel
ensure_repo_root_on_syspath()
from adaptive_pricing_core.boundary_engine import PricingConfiguration # noqa: E402
from adaptive_pricing_core.comparable_ltv import ( # noqa: E402
ComparableCustomerProfile,
LTVPolicy,
SensitivityCase,
compare_pricing_configurations,
)
def _serialize(value: Any) -> Any:
if isinstance(value, Decimal):
return str(value)
if hasattr(value, "__dataclass_fields__"):
return {key: _serialize(getattr(value, key)) for key in value.__dataclass_fields__}
if isinstance(value, tuple):
return [_serialize(item) for item in value]
if isinstance(value, list):
return [_serialize(item) for item in value]
if isinstance(value, dict):
return {key: _serialize(item) for key, item in value.items()}
return value
def _decimal(value: Decimal | str | int | float | None) -> Decimal:
if value in (None, ""):
return Decimal("0")
return Decimal(str(value))
def _usage_component(model: PricingModel):
return next((component for component in model.charge_components if component.kind == "usage"), None)
def _included_units(model: PricingModel, members_per_customer: int) -> Decimal | None:
usage = _usage_component(model)
if not usage or usage.included_units is None:
return None
return usage.included_units * Decimal(members_per_customer)
def _usage_unit_price(model: PricingModel) -> Decimal | None:
usage = _usage_component(model)
if not usage or usage.unit_price is None:
return None
return usage.unit_price
def _usage_unit_cost(records: list[dict[str, Any]], period: str) -> Decimal:
period_rows = [row for row in records if row.get("period") == period]
total_units = sum(_decimal(row.get("tokens")) for row in period_rows)
total_cost = sum(_decimal(row.get("cost_eur")) for row in period_rows)
if total_units <= Decimal("0"):
return Decimal("0")
return total_cost / total_units
def _payment_fee_rate(snapshot: EconomicsSnapshot) -> Decimal:
if snapshot.monthly_revenue <= Decimal("0"):
return Decimal("0")
return (snapshot.monthly_payment_processing_cost / snapshot.monthly_revenue) * Decimal("100")
def _profile(raw: dict[str, Any]) -> ComparableCustomerProfile:
return ComparableCustomerProfile(
id=raw["id"],
name=raw["name"],
segment=raw["segment"],
eligible_model_ids=tuple(raw.get("eligible_model_ids", [])),
members_per_customer=int(raw.get("members_per_customer", 1)),
expected_monthly_usage_units=_decimal(raw.get("expected_monthly_usage_units")),
usage_variance_pct=_decimal(raw.get("usage_variance_pct")),
monthly_churn_pct=_decimal(raw.get("monthly_churn_pct")),
monthly_default_pct=_decimal(raw.get("monthly_default_pct")),
monthly_support_cost=_decimal(raw.get("monthly_support_cost")),
monthly_risk_cost=_decimal(raw.get("monthly_risk_cost")),
acquisition_cost=_decimal(raw.get("acquisition_cost")),
upfront_investment_cost=_decimal(raw.get("upfront_investment_cost")),
allocated_fixed_cost=_decimal(raw["allocated_fixed_cost"]) if raw.get("allocated_fixed_cost") else None,
notes=raw.get("notes", ""),
)
def _sensitivity_case(raw: dict[str, Any]) -> SensitivityCase:
return SensitivityCase(
id=raw["id"],
name=raw["name"],
usage_multiplier=_decimal(raw.get("usage_multiplier", "1")),
usage_variance_delta_pct=_decimal(raw.get("usage_variance_delta_pct")),
monthly_churn_delta_pct=_decimal(raw.get("monthly_churn_delta_pct")),
monthly_default_delta_pct=_decimal(raw.get("monthly_default_delta_pct")),
monthly_support_cost_delta=_decimal(raw.get("monthly_support_cost_delta")),
monthly_risk_cost_delta=_decimal(raw.get("monthly_risk_cost_delta")),
)
def _ltv_policy(raw: dict[str, Any]) -> LTVPolicy:
return LTVPolicy(
horizon_months=int(raw.get("horizon_months", 24)),
monthly_discount_rate_pct=_decimal(raw.get("monthly_discount_rate_pct", "1.0")),
required_improvement_factor=_decimal(raw.get("required_improvement_factor", "1.05")),
)
def _configuration(
model: PricingModel,
profile: ComparableCustomerProfile,
snapshot: EconomicsSnapshot,
usage_unit_cost: Decimal,
) -> PricingConfiguration:
members_per_customer = max(profile.members_per_customer, 1)
per_member_fixed_cost = (
snapshot.monthly_infrastructure_cost / snapshot.active_members
if snapshot.active_members
else snapshot.monthly_infrastructure_cost
)
allocated_fixed_cost = (
profile.allocated_fixed_cost
if profile.allocated_fixed_cost is not None
else per_member_fixed_cost * Decimal(members_per_customer)
)
return PricingConfiguration(
model=model,
segment=profile.segment,
expected_usage_units=profile.expected_monthly_usage_units,
expected_usage_variance_pct=profile.usage_variance_pct,
allocated_fixed_cost=allocated_fixed_cost,
unit_cost=usage_unit_cost,
support_cost=profile.monthly_support_cost,
risk_cost=profile.monthly_risk_cost,
payment_fee_rate_pct=_payment_fee_rate(snapshot),
access_fee_amount=model.access_fee_amount * Decimal(members_per_customer),
included_units=_included_units(model, members_per_customer),
usage_unit_price=_usage_unit_price(model),
)
def build_ltv_simulations(
snapshot: EconomicsSnapshot,
models: list[PricingModel],
usage_records: list[dict[str, Any]],
scenario_catalog: dict[str, Any],
) -> dict[str, Any]:
policy = _ltv_policy(scenario_catalog)
boundary_policy = build_boundary_policy(snapshot)
sensitivity_cases = tuple(_sensitivity_case(item) for item in scenario_catalog.get("sensitivity_cases", []))
observed_usage_unit_cost = _usage_unit_cost(usage_records, snapshot.period)
profile_results = []
for raw_profile in scenario_catalog.get("profiles", []):
profile = _profile(raw_profile)
configurations = [
_configuration(model, profile, snapshot, observed_usage_unit_cost)
for model in models
if model.status in ("active", "candidate")
]
profile_results.append(
compare_pricing_configurations(
configurations,
profile,
boundary_policy,
policy,
sensitivity_cases=sensitivity_cases,
)
)
primary = profile_results[0] if profile_results else None
primary_scenarios = list(primary.comparisons) if primary else []
active_model = next((model for model in models if model.status == "active"), None)
best_margin = max(primary_scenarios, key=lambda item: item.base_monthly_margin, default=None)
best_ltv = max(
primary_scenarios,
key=lambda item: item.average_comparable_customer_lifetime_value,
default=None,
)
return _serialize({
"period": snapshot.period,
"currency": snapshot.currency,
"required_improvement_factor": policy.required_improvement_factor,
"horizon_months": policy.horizon_months,
"monthly_discount_rate_pct": policy.monthly_discount_rate_pct,
"active_scenario_id": active_model.id if active_model else None,
"best_margin_scenario_id": best_margin.model_id if best_margin else None,
"best_ltv_scenario_id": best_ltv.model_id if best_ltv else None,
"reference_model_id": primary.reference_model_id if primary else None,
"primary_profile_id": primary.profile.id if primary else None,
"scenarios": primary_scenarios,
"profile_comparisons": profile_results,
"calibration": {
"observed_usage_unit_cost": observed_usage_unit_cost,
"observed_payment_fee_rate_pct": _payment_fee_rate(snapshot),
"profile_count": len(profile_results),
},
"notes": [
scenario_catalog.get("notes", ""),
"Primary scenarios expose the first configured comparable-customer profile for backward-compatible UI consumers.",
"Profile comparisons compare candidate models using discounted seller LTV rather than only current-period gross margin.",
],
})

View File

@@ -30,7 +30,7 @@ def build_pricing_recommendations(
if ai_spend > Decimal("0") and cost_per_member > Decimal("0"):
ai_ratio = (ai_spend / cost_per_member) * Decimal("100")
if ai_ratio > Decimal("15"):
best = simulations.get("best_margin_scenario_id")
best = simulations.get("best_ltv_scenario_id") or simulations.get("best_margin_scenario_id")
recommendations.append(
{
"id": "usage-pricing-signal",
@@ -65,4 +65,4 @@ def build_pricing_recommendations(
}
)
return recommendations
return recommendations

View File

@@ -1,70 +1,64 @@
from __future__ import annotations
from decimal import Decimal, ROUND_HALF_UP
from decimal import Decimal
from typing import Any
from .ltv import build_ltv_simulations
from .models import EconomicsSnapshot, PricingModel
TWOPLACES = Decimal("0.01")
OVERAGE_RATE = Decimal("0.002") # EUR per token above allowance (observatory estimate)
def _money(value: Decimal) -> Decimal:
return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP)
def _simulate_model(
model: PricingModel,
snapshot: EconomicsSnapshot,
ai_cost_per_member: Decimal,
included_tokens: int = 100_000,
actual_tokens: int = 120_000,
) -> dict:
members = snapshot.active_members or 1
subscription_revenue = model.access_fee_amount * members
overage_revenue = Decimal("0")
if model.model_type == "hybrid_subscription_usage" and actual_tokens > included_tokens:
overage_tokens = actual_tokens - included_tokens
overage_revenue = OVERAGE_RATE * overage_tokens * members
gross_revenue = subscription_revenue + overage_revenue
platform_cost = snapshot.monthly_total_platform_cost + (ai_cost_per_member * members)
margin = gross_revenue - platform_cost
margin_pct = (margin / gross_revenue * Decimal("100")) if gross_revenue else Decimal("0")
def _fallback_catalog(models: list[PricingModel]) -> dict[str, Any]:
return {
"model_id": model.id,
"model_name": model.name,
"model_type": model.model_type,
"status": model.status,
"access_fee_eur": model.access_fee_amount,
"projected_revenue_eur": _money(gross_revenue),
"projected_overage_eur": _money(overage_revenue),
"projected_platform_cost_eur": _money(platform_cost),
"projected_margin_eur": _money(margin),
"projected_margin_pct": _money(margin_pct),
"assumed_tokens_per_member": actual_tokens,
"included_tokens": included_tokens if model.model_type != "flat_subscription" else None,
"version": 1,
"currency": "EUR",
"horizon_months": 24,
"monthly_discount_rate_pct": "1.0",
"required_improvement_factor": "1.05",
"profiles": [
{
"id": "observatory-default",
"name": "Observatory default",
"segment": "coulomb-social-members",
"eligible_model_ids": [model.id for model in models if model.status in ("active", "candidate")],
"members_per_customer": 1,
"expected_monthly_usage_units": "120000",
"usage_variance_pct": "25",
"monthly_churn_pct": "5.0",
"monthly_default_pct": "1.0",
"monthly_support_cost": "0.00",
"monthly_risk_cost": "0.00",
"acquisition_cost": "0.00",
"upfront_investment_cost": "0.00",
"notes": "Fallback scenario when no explicit LTV scenario catalog is provided."
}
],
"notes": "Fallback scenario catalog generated inside observatory.simulator.",
}
def _fallback_usage_records(snapshot: EconomicsSnapshot, ai_cost_per_member: Decimal) -> list[dict[str, Any]]:
return [
{
"id": "fallback-usage",
"period": snapshot.period,
"member_id": "fallback",
"tokens": 120000,
"cost_eur": ai_cost_per_member,
"source": "fallback",
}
]
def build_pricing_simulations(
snapshot: EconomicsSnapshot,
models: list[PricingModel],
ai_cost_per_member: Decimal,
) -> dict:
scenarios = [
_simulate_model(model, snapshot, ai_cost_per_member)
for model in models
if model.status in ("active", "candidate")
]
active = next((item for item in scenarios if item["status"] == "active"), scenarios[0])
best_margin = max(scenarios, key=lambda item: item["projected_margin_eur"])
return {
"period": snapshot.period,
"currency": snapshot.currency,
"active_scenario_id": active["model_id"],
"best_margin_scenario_id": best_margin["model_id"],
"scenarios": scenarios,
"notes": "Projections hold member count and infrastructure cost constant; overage uses observatory token estimate.",
}
usage_records: list[dict[str, Any]] | None = None,
scenario_catalog: dict[str, Any] | None = None,
) -> dict[str, Any]:
return build_ltv_simulations(
snapshot,
models,
usage_records or _fallback_usage_records(snapshot, ai_cost_per_member),
scenario_catalog or _fallback_catalog(models),
)