from __future__ import annotations from dataclasses import dataclass, replace from decimal import Decimal, ROUND_HALF_UP from typing import Literal from .boundary_engine import ( BoundaryPolicy, CommitmentTerms, ConstraintResult, PricingConfiguration, ValidationResult, ) from .comparable_ltv import ( ComparableCustomerProfile, ComparableLTVEstimate, LTVPolicy, estimate_comparable_customer_ltv, required_improvement_threshold, select_reference_estimate, ) SolverPreference = Literal["lower_usage_price", "seller_ltv"] ApprovalMode = Literal["self_serve_only", "allow_approval"] TuningDecision = Literal["accepted", "requires_approval", "rejected"] TWOPLACES = Decimal("0.01") def _money(value: Decimal) -> Decimal: return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP) def _usage_component(configuration: PricingConfiguration): return next( (component for component in configuration.model.charge_components if component.kind == "usage"), None, ) def _default_usage_unit_price(configuration: PricingConfiguration) -> Decimal: usage_component = _usage_component(configuration) if configuration.usage_unit_price is not None: return configuration.usage_unit_price if usage_component and usage_component.unit_price is not None: return usage_component.unit_price for parameter in configuration.model.tunable_parameters: if parameter.key == "overage_unit_price" and parameter.default_value not in (None, ""): return Decimal(str(parameter.default_value)) return Decimal("0") def _percent_delta(candidate: Decimal, reference: Decimal) -> Decimal | None: if reference == Decimal("0"): return None return _money(((candidate - reference) / abs(reference)) * Decimal("100")) @dataclass(frozen=True) class UsagePriceSearchPolicy: min_usage_unit_price: Decimal | None = None max_usage_unit_price: Decimal | None = None usage_unit_price_step: Decimal = Decimal("0.0001") max_usage_price_multiplier: Decimal = Decimal("4") @dataclass(frozen=True) class CustomerTuningRequest: included_units: Decimal | None = None contract_duration_months: int | None = None minimum_monthly_turnover: Decimal = Decimal("0") prepaid_amount: Decimal = Decimal("0") guaranteed_platform_fee: Decimal = Decimal("0") customer_funded_onboarding: Decimal = Decimal("0") reduced_cancellation_flexibility: bool | None = None preference: SolverPreference = "lower_usage_price" approval_mode: ApprovalMode = "self_serve_only" @dataclass(frozen=True) class CustomerTuningOutcome: model_id: str model_name: str decision: TuningDecision valid: bool requires_approval: bool preference: SolverPreference approval_mode: ApprovalMode request: CustomerTuningRequest solved_configuration: dict[str, object] solved_usage_unit_price: Decimal reference_model_id: str | None reference_model_name: str | None reference_ltv: Decimal | None required_improvement_threshold: Decimal | None average_comparable_customer_lifetime_value: Decimal improvement_vs_reference_pct: Decimal | None passes_required_improvement: bool evaluated_candidates: int tradeoffs: tuple[str, ...] binding_constraints: tuple[ConstraintResult, ...] validation: ValidationResult explanation: str @dataclass(frozen=True) class _CandidateAssessment: configuration: PricingConfiguration estimate: ComparableLTVEstimate decision: TuningDecision passes_required_improvement: bool improvement_vs_reference_pct: Decimal | None def _price_range( configuration: PricingConfiguration, search_policy: UsagePriceSearchPolicy, ) -> tuple[Decimal, ...]: step = search_policy.usage_unit_price_step if step <= Decimal("0"): raise ValueError("usage_unit_price_step must be positive") default_usage_price = _default_usage_unit_price(configuration) min_usage_price = search_policy.min_usage_unit_price if min_usage_price is None: min_usage_price = max(configuration.unit_cost, default_usage_price / Decimal("10"), step) max_usage_price = search_policy.max_usage_unit_price if max_usage_price is None: base = default_usage_price if default_usage_price > Decimal("0") else step max_usage_price = max(min_usage_price, base * search_policy.max_usage_price_multiplier) if max_usage_price < min_usage_price: max_usage_price = min_usage_price values: list[Decimal] = [] current = min_usage_price while current <= max_usage_price: values.append(current) current += step if not values or values[-1] != max_usage_price: values.append(max_usage_price) return tuple(dict.fromkeys(values)) def _resolved_search_policy( configuration: PricingConfiguration, request: CustomerTuningRequest, search_policy: UsagePriceSearchPolicy | None, ) -> UsagePriceSearchPolicy: policy = search_policy or UsagePriceSearchPolicy() if request.preference != "lower_usage_price" or policy.max_usage_unit_price is not None: return policy return replace( policy, max_usage_unit_price=_default_usage_unit_price(configuration), ) def _commitment_terms( base_terms: CommitmentTerms, request: CustomerTuningRequest, ) -> CommitmentTerms: return replace( base_terms, contract_duration_months=( request.contract_duration_months if request.contract_duration_months is not None else base_terms.contract_duration_months ), minimum_monthly_turnover=request.minimum_monthly_turnover, prepaid_amount=request.prepaid_amount, guaranteed_platform_fee=request.guaranteed_platform_fee, customer_funded_onboarding=request.customer_funded_onboarding, reduced_cancellation_flexibility=( request.reduced_cancellation_flexibility if request.reduced_cancellation_flexibility is not None else base_terms.reduced_cancellation_flexibility ), ) def _candidate_configuration( base_configuration: PricingConfiguration, request: CustomerTuningRequest, usage_unit_price: Decimal, ) -> PricingConfiguration: return replace( base_configuration, included_units=( request.included_units if request.included_units is not None else base_configuration.included_units ), usage_unit_price=usage_unit_price, commitment_terms=_commitment_terms(base_configuration.commitment_terms, request), ) def _candidate_decision( validation: ValidationResult, passes_required_improvement: bool, approval_mode: ApprovalMode, ) -> TuningDecision: if not validation.valid or not passes_required_improvement: return "rejected" if validation.requires_approval: return "requires_approval" if approval_mode == "allow_approval" else "rejected" return "accepted" def _headroom_by_constraint( configuration: PricingConfiguration, validation: ValidationResult, ) -> dict[str, Decimal]: metrics = validation.metrics policy = validation.policy return { "usage-variance-limit": policy.max_expected_usage_variance_pct - configuration.expected_usage_variance_pct, "payment-fee-limit": policy.max_payment_fee_pct - metrics.payment_fee_pct, "cost-floor-coverage": metrics.monthly_margin, "minimum-margin": metrics.margin_pct - policy.minimum_margin_pct, "target-margin-approval": metrics.margin_pct - policy.target_margin_pct, "discount-exposure-limit": policy.max_discount_pct - metrics.concession_pct, "discount-approval-threshold": policy.approval_discount_pct - metrics.concession_pct, } def _binding_constraints( configuration: PricingConfiguration, validation: ValidationResult, ) -> tuple[ConstraintResult, ...]: flagged = tuple(result for result in validation.constraints if result.status != "pass") if flagged: return flagged headroom = _headroom_by_constraint(configuration, validation) ordered_ids = [ constraint_id for constraint_id, _headroom in sorted(headroom.items(), key=lambda item: item[1]) if constraint_id in {result.id for result in validation.constraints} ] selected_ids = ordered_ids[:2] if not selected_ids: return () return tuple( result for result in validation.constraints if result.id in selected_ids ) def _tradeoffs( base_configuration: PricingConfiguration, candidate_configuration: PricingConfiguration, validation: ValidationResult, ) -> tuple[str, ...]: tradeoffs: list[str] = [] if ( base_configuration.included_units is not None and candidate_configuration.included_units is not None and candidate_configuration.included_units < base_configuration.included_units ): tradeoffs.append("lower_included_usage") if ( base_configuration.included_units is not None and candidate_configuration.included_units is not None and candidate_configuration.included_units > base_configuration.included_units ): tradeoffs.append("higher_included_usage") if ( base_configuration.usage_unit_price is not None and candidate_configuration.usage_unit_price is not None and candidate_configuration.usage_unit_price < base_configuration.usage_unit_price ): tradeoffs.append("lower_usage_price") if ( base_configuration.usage_unit_price is not None and candidate_configuration.usage_unit_price is not None and candidate_configuration.usage_unit_price > base_configuration.usage_unit_price ): tradeoffs.append("higher_usage_price") baseline_duration = base_configuration.commitment_terms.contract_duration_months or 0 candidate_duration = validation.metrics.contract_duration_months if candidate_duration > baseline_duration: tradeoffs.append("longer_contract_duration") if validation.metrics.minimum_monthly_turnover > Decimal("0"): tradeoffs.append("minimum_monthly_turnover") if validation.metrics.prepaid_amount > Decimal("0"): tradeoffs.append("prepayment") if validation.metrics.guaranteed_platform_fee > Decimal("0"): tradeoffs.append("guaranteed_platform_fee") if validation.metrics.customer_funded_onboarding > Decimal("0"): tradeoffs.append("customer_funded_onboarding") if validation.metrics.reduced_cancellation_flexibility: tradeoffs.append("reduced_cancellation_flexibility") for signal in validation.metrics.meaningful_commitment_signals: if signal not in tradeoffs: tradeoffs.append(signal) return tuple(tradeoffs) def _explanation( assessment: _CandidateAssessment, request: CustomerTuningRequest, reference_estimate: ComparableLTVEstimate | None, threshold: Decimal | None, tradeoffs: tuple[str, ...], binding_constraints: tuple[ConstraintResult, ...], ) -> str: validation = assessment.estimate.validation metrics = validation.metrics if assessment.decision in {"accepted", "requires_approval"}: outcome = ( "Accepted self-serve tuning" if assessment.decision == "accepted" else "Requires seller approval" ) parts = [ f"{outcome} at {metrics.usage_unit_price} {metrics.currency} usage price.", ( f"Comparable-customer LTV {assessment.estimate.average_comparable_customer_lifetime_value} " f"{metrics.currency}" ), ] if reference_estimate is not None and threshold is not None: parts.append( f"clears threshold {threshold} {metrics.currency} versus {reference_estimate.model_name}." ) if tradeoffs: parts.append("Trade-offs: " + ", ".join(tradeoffs) + ".") return " ".join(parts) failed_constraints = [result.title for result in binding_constraints if result.status == "fail"] review_constraints = [result.title for result in binding_constraints if result.status == "review"] parts = ["Rejected tuning request."] if not assessment.passes_required_improvement and reference_estimate is not None and threshold is not None: parts.append( ( f"LTV {assessment.estimate.average_comparable_customer_lifetime_value} {metrics.currency} " f"misses threshold {threshold} {metrics.currency} versus {reference_estimate.model_name}." ) ) if failed_constraints: parts.append("Hard blockers: " + ", ".join(failed_constraints) + ".") if review_constraints and request.approval_mode == "self_serve_only": parts.append("Self-serve blockers: " + ", ".join(review_constraints) + ".") if tradeoffs: parts.append("Attempted trade-offs: " + ", ".join(tradeoffs) + ".") return " ".join(parts) def _acceptable_candidates( candidates: tuple[_CandidateAssessment, ...], ) -> tuple[_CandidateAssessment, ...]: return tuple(candidate for candidate in candidates if candidate.decision in {"accepted", "requires_approval"}) def _candidate_sort_key( candidate: _CandidateAssessment, preference: SolverPreference, ) -> tuple[Decimal, Decimal]: usage_price = candidate.estimate.validation.metrics.usage_unit_price ltv = candidate.estimate.average_comparable_customer_lifetime_value if preference == "lower_usage_price": return (usage_price, -ltv) return (-ltv, usage_price) def _fallback_sort_key( candidate: _CandidateAssessment, preference: SolverPreference, ) -> tuple[int, int, int, Decimal, Decimal]: usage_price = candidate.estimate.validation.metrics.usage_unit_price ltv = candidate.estimate.average_comparable_customer_lifetime_value return ( 0 if candidate.passes_required_improvement else 1, 0 if candidate.estimate.validation.valid else 1, 0 if not candidate.estimate.validation.requires_approval else 1, usage_price if preference == "lower_usage_price" else -ltv, -ltv if preference == "lower_usage_price" else usage_price, ) def _select_candidate( candidates: tuple[_CandidateAssessment, ...], preference: SolverPreference, ) -> _CandidateAssessment: acceptable = _acceptable_candidates(candidates) if acceptable: return min(acceptable, key=lambda candidate: _candidate_sort_key(candidate, preference)) return min(candidates, key=lambda candidate: _fallback_sort_key(candidate, preference)) def solve_customer_tuning( base_configuration: PricingConfiguration, reference_configurations: list[PricingConfiguration], profile: ComparableCustomerProfile, boundary_policy: BoundaryPolicy, ltv_policy: LTVPolicy, request: CustomerTuningRequest, search_policy: UsagePriceSearchPolicy | None = None, ) -> CustomerTuningOutcome: if _usage_component(base_configuration) is None: raise ValueError("customer tuning prototype currently requires a usage-priced model") reference_estimates = [ estimate_comparable_customer_ltv(configuration, profile, boundary_policy, ltv_policy) for configuration in reference_configurations ] reference_estimate = select_reference_estimate(reference_estimates, profile.eligible_model_ids) threshold = ( required_improvement_threshold( reference_estimate.average_comparable_customer_lifetime_value, ltv_policy.required_improvement_factor, ) if reference_estimate is not None else None ) candidates: list[_CandidateAssessment] = [] for usage_unit_price in _price_range( base_configuration, _resolved_search_policy(base_configuration, request, search_policy), ): configuration = _candidate_configuration(base_configuration, request, usage_unit_price) estimate = estimate_comparable_customer_ltv( configuration, profile, boundary_policy, ltv_policy, ) passes_required_improvement = ( True if threshold is None else estimate.average_comparable_customer_lifetime_value >= threshold ) decision = _candidate_decision( estimate.validation, passes_required_improvement, request.approval_mode, ) candidates.append( _CandidateAssessment( configuration=configuration, estimate=estimate, decision=decision, passes_required_improvement=passes_required_improvement, improvement_vs_reference_pct=( _percent_delta( estimate.average_comparable_customer_lifetime_value, reference_estimate.average_comparable_customer_lifetime_value, ) if reference_estimate is not None else None ), ) ) if not candidates: raise ValueError("customer tuning search produced no candidates") selected = _select_candidate(tuple(candidates), request.preference) binding_constraints = _binding_constraints(selected.configuration, selected.estimate.validation) tradeoffs = _tradeoffs( base_configuration, selected.configuration, selected.estimate.validation, ) explanation = _explanation( selected, request, reference_estimate, threshold, tradeoffs, binding_constraints, ) return CustomerTuningOutcome( model_id=base_configuration.model.id, model_name=base_configuration.model.name, decision=selected.decision, valid=selected.estimate.validation.valid, requires_approval=selected.estimate.validation.requires_approval, preference=request.preference, approval_mode=request.approval_mode, request=request, solved_configuration=selected.estimate.validation.configuration, solved_usage_unit_price=selected.estimate.validation.metrics.usage_unit_price, reference_model_id=reference_estimate.model_id if reference_estimate else None, reference_model_name=reference_estimate.model_name if reference_estimate else None, reference_ltv=( reference_estimate.average_comparable_customer_lifetime_value if reference_estimate is not None else None ), required_improvement_threshold=threshold, average_comparable_customer_lifetime_value=( selected.estimate.average_comparable_customer_lifetime_value ), improvement_vs_reference_pct=selected.improvement_vs_reference_pct, passes_required_improvement=selected.passes_required_improvement, evaluated_candidates=len(candidates), tradeoffs=tradeoffs, binding_constraints=binding_constraints, validation=selected.estimate.validation, explanation=explanation, )