Files
adaptive-pricing/adaptive_pricing_core/customer_tuning.py

512 lines
19 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, replace
from decimal import Decimal, ROUND_HALF_UP
from typing import Literal
from .boundary_engine import (
BoundaryPolicy,
CommitmentTerms,
ConstraintResult,
PricingConfiguration,
ValidationResult,
)
from .comparable_ltv import (
ComparableCustomerProfile,
ComparableLTVEstimate,
LTVPolicy,
estimate_comparable_customer_ltv,
required_improvement_threshold,
select_reference_estimate,
)
SolverPreference = Literal["lower_usage_price", "seller_ltv"]
ApprovalMode = Literal["self_serve_only", "allow_approval"]
TuningDecision = Literal["accepted", "requires_approval", "rejected"]
TWOPLACES = Decimal("0.01")
def _money(value: Decimal) -> Decimal:
return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP)
def _usage_component(configuration: PricingConfiguration):
return next(
(component for component in configuration.model.charge_components if component.kind == "usage"),
None,
)
def _default_usage_unit_price(configuration: PricingConfiguration) -> Decimal:
usage_component = _usage_component(configuration)
if configuration.usage_unit_price is not None:
return configuration.usage_unit_price
if usage_component and usage_component.unit_price is not None:
return usage_component.unit_price
for parameter in configuration.model.tunable_parameters:
if parameter.key == "overage_unit_price" and parameter.default_value not in (None, ""):
return Decimal(str(parameter.default_value))
return Decimal("0")
def _percent_delta(candidate: Decimal, reference: Decimal) -> Decimal | None:
if reference == Decimal("0"):
return None
return _money(((candidate - reference) / abs(reference)) * Decimal("100"))
@dataclass(frozen=True)
class UsagePriceSearchPolicy:
min_usage_unit_price: Decimal | None = None
max_usage_unit_price: Decimal | None = None
usage_unit_price_step: Decimal = Decimal("0.0001")
max_usage_price_multiplier: Decimal = Decimal("4")
@dataclass(frozen=True)
class CustomerTuningRequest:
included_units: Decimal | None = None
contract_duration_months: int | None = None
minimum_monthly_turnover: Decimal = Decimal("0")
prepaid_amount: Decimal = Decimal("0")
guaranteed_platform_fee: Decimal = Decimal("0")
customer_funded_onboarding: Decimal = Decimal("0")
reduced_cancellation_flexibility: bool | None = None
preference: SolverPreference = "lower_usage_price"
approval_mode: ApprovalMode = "self_serve_only"
@dataclass(frozen=True)
class CustomerTuningOutcome:
model_id: str
model_name: str
decision: TuningDecision
valid: bool
requires_approval: bool
preference: SolverPreference
approval_mode: ApprovalMode
request: CustomerTuningRequest
solved_configuration: dict[str, object]
solved_usage_unit_price: Decimal
reference_model_id: str | None
reference_model_name: str | None
reference_ltv: Decimal | None
required_improvement_threshold: Decimal | None
average_comparable_customer_lifetime_value: Decimal
improvement_vs_reference_pct: Decimal | None
passes_required_improvement: bool
evaluated_candidates: int
tradeoffs: tuple[str, ...]
binding_constraints: tuple[ConstraintResult, ...]
validation: ValidationResult
explanation: str
@dataclass(frozen=True)
class _CandidateAssessment:
configuration: PricingConfiguration
estimate: ComparableLTVEstimate
decision: TuningDecision
passes_required_improvement: bool
improvement_vs_reference_pct: Decimal | None
def _price_range(
configuration: PricingConfiguration,
search_policy: UsagePriceSearchPolicy,
) -> tuple[Decimal, ...]:
step = search_policy.usage_unit_price_step
if step <= Decimal("0"):
raise ValueError("usage_unit_price_step must be positive")
default_usage_price = _default_usage_unit_price(configuration)
min_usage_price = search_policy.min_usage_unit_price
if min_usage_price is None:
min_usage_price = max(configuration.unit_cost, default_usage_price / Decimal("10"), step)
max_usage_price = search_policy.max_usage_unit_price
if max_usage_price is None:
base = default_usage_price if default_usage_price > Decimal("0") else step
max_usage_price = max(min_usage_price, base * search_policy.max_usage_price_multiplier)
if max_usage_price < min_usage_price:
max_usage_price = min_usage_price
values: list[Decimal] = []
current = min_usage_price
while current <= max_usage_price:
values.append(current)
current += step
if not values or values[-1] != max_usage_price:
values.append(max_usage_price)
return tuple(dict.fromkeys(values))
def _resolved_search_policy(
configuration: PricingConfiguration,
request: CustomerTuningRequest,
search_policy: UsagePriceSearchPolicy | None,
) -> UsagePriceSearchPolicy:
policy = search_policy or UsagePriceSearchPolicy()
if request.preference != "lower_usage_price" or policy.max_usage_unit_price is not None:
return policy
return replace(
policy,
max_usage_unit_price=_default_usage_unit_price(configuration),
)
def _commitment_terms(
base_terms: CommitmentTerms,
request: CustomerTuningRequest,
) -> CommitmentTerms:
return replace(
base_terms,
contract_duration_months=(
request.contract_duration_months
if request.contract_duration_months is not None
else base_terms.contract_duration_months
),
minimum_monthly_turnover=request.minimum_monthly_turnover,
prepaid_amount=request.prepaid_amount,
guaranteed_platform_fee=request.guaranteed_platform_fee,
customer_funded_onboarding=request.customer_funded_onboarding,
reduced_cancellation_flexibility=(
request.reduced_cancellation_flexibility
if request.reduced_cancellation_flexibility is not None
else base_terms.reduced_cancellation_flexibility
),
)
def _candidate_configuration(
base_configuration: PricingConfiguration,
request: CustomerTuningRequest,
usage_unit_price: Decimal,
) -> PricingConfiguration:
return replace(
base_configuration,
included_units=(
request.included_units
if request.included_units is not None
else base_configuration.included_units
),
usage_unit_price=usage_unit_price,
commitment_terms=_commitment_terms(base_configuration.commitment_terms, request),
)
def _candidate_decision(
validation: ValidationResult,
passes_required_improvement: bool,
approval_mode: ApprovalMode,
) -> TuningDecision:
if not validation.valid or not passes_required_improvement:
return "rejected"
if validation.requires_approval:
return "requires_approval" if approval_mode == "allow_approval" else "rejected"
return "accepted"
def _headroom_by_constraint(
configuration: PricingConfiguration,
validation: ValidationResult,
) -> dict[str, Decimal]:
metrics = validation.metrics
policy = validation.policy
return {
"usage-variance-limit": policy.max_expected_usage_variance_pct - configuration.expected_usage_variance_pct,
"payment-fee-limit": policy.max_payment_fee_pct - metrics.payment_fee_pct,
"cost-floor-coverage": metrics.monthly_margin,
"minimum-margin": metrics.margin_pct - policy.minimum_margin_pct,
"target-margin-approval": metrics.margin_pct - policy.target_margin_pct,
"discount-exposure-limit": policy.max_discount_pct - metrics.concession_pct,
"discount-approval-threshold": policy.approval_discount_pct - metrics.concession_pct,
}
def _binding_constraints(
configuration: PricingConfiguration,
validation: ValidationResult,
) -> tuple[ConstraintResult, ...]:
flagged = tuple(result for result in validation.constraints if result.status != "pass")
if flagged:
return flagged
headroom = _headroom_by_constraint(configuration, validation)
ordered_ids = [
constraint_id
for constraint_id, _headroom in sorted(headroom.items(), key=lambda item: item[1])
if constraint_id in {result.id for result in validation.constraints}
]
selected_ids = ordered_ids[:2]
if not selected_ids:
return ()
return tuple(
result for result in validation.constraints if result.id in selected_ids
)
def _tradeoffs(
base_configuration: PricingConfiguration,
candidate_configuration: PricingConfiguration,
validation: ValidationResult,
) -> tuple[str, ...]:
tradeoffs: list[str] = []
if (
base_configuration.included_units is not None
and candidate_configuration.included_units is not None
and candidate_configuration.included_units < base_configuration.included_units
):
tradeoffs.append("lower_included_usage")
if (
base_configuration.included_units is not None
and candidate_configuration.included_units is not None
and candidate_configuration.included_units > base_configuration.included_units
):
tradeoffs.append("higher_included_usage")
if (
base_configuration.usage_unit_price is not None
and candidate_configuration.usage_unit_price is not None
and candidate_configuration.usage_unit_price < base_configuration.usage_unit_price
):
tradeoffs.append("lower_usage_price")
if (
base_configuration.usage_unit_price is not None
and candidate_configuration.usage_unit_price is not None
and candidate_configuration.usage_unit_price > base_configuration.usage_unit_price
):
tradeoffs.append("higher_usage_price")
baseline_duration = base_configuration.commitment_terms.contract_duration_months or 0
candidate_duration = validation.metrics.contract_duration_months
if candidate_duration > baseline_duration:
tradeoffs.append("longer_contract_duration")
if validation.metrics.minimum_monthly_turnover > Decimal("0"):
tradeoffs.append("minimum_monthly_turnover")
if validation.metrics.prepaid_amount > Decimal("0"):
tradeoffs.append("prepayment")
if validation.metrics.guaranteed_platform_fee > Decimal("0"):
tradeoffs.append("guaranteed_platform_fee")
if validation.metrics.customer_funded_onboarding > Decimal("0"):
tradeoffs.append("customer_funded_onboarding")
if validation.metrics.reduced_cancellation_flexibility:
tradeoffs.append("reduced_cancellation_flexibility")
for signal in validation.metrics.meaningful_commitment_signals:
if signal not in tradeoffs:
tradeoffs.append(signal)
return tuple(tradeoffs)
def _explanation(
assessment: _CandidateAssessment,
request: CustomerTuningRequest,
reference_estimate: ComparableLTVEstimate | None,
threshold: Decimal | None,
tradeoffs: tuple[str, ...],
binding_constraints: tuple[ConstraintResult, ...],
) -> str:
validation = assessment.estimate.validation
metrics = validation.metrics
if assessment.decision in {"accepted", "requires_approval"}:
outcome = (
"Accepted self-serve tuning"
if assessment.decision == "accepted"
else "Requires seller approval"
)
parts = [
f"{outcome} at {metrics.usage_unit_price} {metrics.currency} usage price.",
(
f"Comparable-customer LTV {assessment.estimate.average_comparable_customer_lifetime_value} "
f"{metrics.currency}"
),
]
if reference_estimate is not None and threshold is not None:
parts.append(
f"clears threshold {threshold} {metrics.currency} versus {reference_estimate.model_name}."
)
if tradeoffs:
parts.append("Trade-offs: " + ", ".join(tradeoffs) + ".")
return " ".join(parts)
failed_constraints = [result.title for result in binding_constraints if result.status == "fail"]
review_constraints = [result.title for result in binding_constraints if result.status == "review"]
parts = ["Rejected tuning request."]
if not assessment.passes_required_improvement and reference_estimate is not None and threshold is not None:
parts.append(
(
f"LTV {assessment.estimate.average_comparable_customer_lifetime_value} {metrics.currency} "
f"misses threshold {threshold} {metrics.currency} versus {reference_estimate.model_name}."
)
)
if failed_constraints:
parts.append("Hard blockers: " + ", ".join(failed_constraints) + ".")
if review_constraints and request.approval_mode == "self_serve_only":
parts.append("Self-serve blockers: " + ", ".join(review_constraints) + ".")
if tradeoffs:
parts.append("Attempted trade-offs: " + ", ".join(tradeoffs) + ".")
return " ".join(parts)
def _acceptable_candidates(
candidates: tuple[_CandidateAssessment, ...],
) -> tuple[_CandidateAssessment, ...]:
return tuple(candidate for candidate in candidates if candidate.decision in {"accepted", "requires_approval"})
def _candidate_sort_key(
candidate: _CandidateAssessment,
preference: SolverPreference,
) -> tuple[Decimal, Decimal]:
usage_price = candidate.estimate.validation.metrics.usage_unit_price
ltv = candidate.estimate.average_comparable_customer_lifetime_value
if preference == "lower_usage_price":
return (usage_price, -ltv)
return (-ltv, usage_price)
def _fallback_sort_key(
candidate: _CandidateAssessment,
preference: SolverPreference,
) -> tuple[int, int, int, Decimal, Decimal]:
usage_price = candidate.estimate.validation.metrics.usage_unit_price
ltv = candidate.estimate.average_comparable_customer_lifetime_value
return (
0 if candidate.passes_required_improvement else 1,
0 if candidate.estimate.validation.valid else 1,
0 if not candidate.estimate.validation.requires_approval else 1,
usage_price if preference == "lower_usage_price" else -ltv,
-ltv if preference == "lower_usage_price" else usage_price,
)
def _select_candidate(
candidates: tuple[_CandidateAssessment, ...],
preference: SolverPreference,
) -> _CandidateAssessment:
acceptable = _acceptable_candidates(candidates)
if acceptable:
return min(acceptable, key=lambda candidate: _candidate_sort_key(candidate, preference))
return min(candidates, key=lambda candidate: _fallback_sort_key(candidate, preference))
def solve_customer_tuning(
base_configuration: PricingConfiguration,
reference_configurations: list[PricingConfiguration],
profile: ComparableCustomerProfile,
boundary_policy: BoundaryPolicy,
ltv_policy: LTVPolicy,
request: CustomerTuningRequest,
search_policy: UsagePriceSearchPolicy | None = None,
) -> CustomerTuningOutcome:
if _usage_component(base_configuration) is None:
raise ValueError("customer tuning prototype currently requires a usage-priced model")
reference_estimates = [
estimate_comparable_customer_ltv(configuration, profile, boundary_policy, ltv_policy)
for configuration in reference_configurations
]
reference_estimate = select_reference_estimate(reference_estimates, profile.eligible_model_ids)
threshold = (
required_improvement_threshold(
reference_estimate.average_comparable_customer_lifetime_value,
ltv_policy.required_improvement_factor,
)
if reference_estimate is not None
else None
)
candidates: list[_CandidateAssessment] = []
for usage_unit_price in _price_range(
base_configuration,
_resolved_search_policy(base_configuration, request, search_policy),
):
configuration = _candidate_configuration(base_configuration, request, usage_unit_price)
estimate = estimate_comparable_customer_ltv(
configuration,
profile,
boundary_policy,
ltv_policy,
)
passes_required_improvement = (
True
if threshold is None
else estimate.average_comparable_customer_lifetime_value >= threshold
)
decision = _candidate_decision(
estimate.validation,
passes_required_improvement,
request.approval_mode,
)
candidates.append(
_CandidateAssessment(
configuration=configuration,
estimate=estimate,
decision=decision,
passes_required_improvement=passes_required_improvement,
improvement_vs_reference_pct=(
_percent_delta(
estimate.average_comparable_customer_lifetime_value,
reference_estimate.average_comparable_customer_lifetime_value,
)
if reference_estimate is not None
else None
),
)
)
if not candidates:
raise ValueError("customer tuning search produced no candidates")
selected = _select_candidate(tuple(candidates), request.preference)
binding_constraints = _binding_constraints(selected.configuration, selected.estimate.validation)
tradeoffs = _tradeoffs(
base_configuration,
selected.configuration,
selected.estimate.validation,
)
explanation = _explanation(
selected,
request,
reference_estimate,
threshold,
tradeoffs,
binding_constraints,
)
return CustomerTuningOutcome(
model_id=base_configuration.model.id,
model_name=base_configuration.model.name,
decision=selected.decision,
valid=selected.estimate.validation.valid,
requires_approval=selected.estimate.validation.requires_approval,
preference=request.preference,
approval_mode=request.approval_mode,
request=request,
solved_configuration=selected.estimate.validation.configuration,
solved_usage_unit_price=selected.estimate.validation.metrics.usage_unit_price,
reference_model_id=reference_estimate.model_id if reference_estimate else None,
reference_model_name=reference_estimate.model_name if reference_estimate else None,
reference_ltv=(
reference_estimate.average_comparable_customer_lifetime_value
if reference_estimate is not None
else None
),
required_improvement_threshold=threshold,
average_comparable_customer_lifetime_value=(
selected.estimate.average_comparable_customer_lifetime_value
),
improvement_vs_reference_pct=selected.improvement_vs_reference_pct,
passes_required_improvement=selected.passes_required_improvement,
evaluated_candidates=len(candidates),
tradeoffs=tradeoffs,
binding_constraints=binding_constraints,
validation=selected.estimate.validation,
explanation=explanation,
)