Files
adaptive-pricing/projects/coulomb-pricing/observatory/load.py
tegwick ea2c2c6403 Fix cumulative platform cost Stripe double-counting
Split infrastructure vs payment-processing costs. Liquidity burn now
uses infrastructure cash out only (€1,155.20 cumulative) because Stripe
fees are already deducted from net member payments. Total platform cost
(€1,158.24) remains visible for gross-margin economics.
2026-06-22 01:51:53 +02:00

147 lines
4.6 KiB
Python

from __future__ import annotations
import json
from decimal import Decimal
from pathlib import Path
_ZERO = Decimal("0")
from .models import (
Budget,
CostEntry,
MembershipRecord,
MonthlyPlatformCost,
PricingModel,
Product,
RevenueEntry,
)
def _money(value: str | int | float) -> Decimal:
return Decimal(str(value))
def _read_json(path: Path) -> dict:
return json.loads(path.read_text(encoding="utf-8"))
def default_data_dir() -> Path:
return Path(__file__).resolve().parent.parent / "data"
def load_product(data_dir: Path | None = None) -> Product:
raw = _read_json((data_dir or default_data_dir()) / "product.json")
return Product(
id=raw["id"],
name=raw["name"],
lifecycle_phase=raw["lifecycle_phase"],
currency=raw["currency"],
description=raw["description"],
active_pricing_model_id=raw["active_pricing_model_id"],
)
def load_budget(data_dir: Path | None = None) -> Budget:
raw = _read_json((data_dir or default_data_dir()) / "budget.json")
return Budget(
currency=raw["currency"],
initial_budget=_money(raw["initial_budget"]),
started=raw["started"],
)
def load_pricing_models(data_dir: Path | None = None) -> list[PricingModel]:
raw = _read_json((data_dir or default_data_dir()) / "pricing-models.json")
return [
PricingModel(
id=item["id"],
name=item["name"],
model_type=item["model_type"],
lifecycle_phase=item["lifecycle_phase"],
currency=item["currency"],
access_fee_amount=_money(item["access_fee_amount"]),
access_fee_cadence=item["access_fee_cadence"],
status=item["status"],
)
for item in raw["models"]
]
def _parse_cost_entries(items: list[dict]) -> list[CostEntry]:
return [
CostEntry(
id=item["id"],
name=item["name"],
category=item["category"],
amount=_money(item["amount"]),
currency=item["currency"],
cadence=item["cadence"],
allocation=item["allocation"],
)
for item in items
]
def load_cost_rate_card(data_dir: Path | None = None) -> tuple[list[CostEntry], dict[str, Decimal]]:
raw = _read_json((data_dir or default_data_dir()) / "costs.json")
items = raw.get("rate_card", raw.get("entries", []))
fx = {pair: _money(rate) for pair, rate in raw.get("fx_rates", {}).items()}
return _parse_cost_entries(items), fx
def load_monthly_platform_costs(data_dir: Path | None = None) -> list[MonthlyPlatformCost]:
raw = _read_json((data_dir or default_data_dir()) / "costs.json")
rows: list[MonthlyPlatformCost] = []
for item in raw.get("monthly_history", []):
if "infrastructure_cost" in item:
infrastructure = _money(item["infrastructure_cost"])
processing = _money(item.get("payment_processing_cost", "0"))
else:
# Legacy single platform_cost field treated as infrastructure only.
infrastructure = _money(item["platform_cost"])
processing = _ZERO
rows.append(
MonthlyPlatformCost(
period=item["period"],
infrastructure_cost=infrastructure,
payment_processing_cost=processing,
active_members=item["active_members"],
gross_revenue=_money(item["gross_revenue"]),
)
)
return rows
def load_revenue(data_dir: Path | None = None) -> list[RevenueEntry]:
raw = _read_json((data_dir or default_data_dir()) / "revenue.json")
return [
RevenueEntry(
id=item["id"],
period=item["period"],
gross_amount=_money(item["gross_amount"]),
fees_amount=_money(item["fees_amount"]),
refunds_amount=_money(item.get("refunds_amount", "0")),
net_amount=_money(item["net_amount"]),
currency=item["currency"],
source=item["source"],
)
for item in raw["entries"]
]
def load_membership(data_dir: Path | None = None) -> list[MembershipRecord]:
raw = _read_json((data_dir or default_data_dir()) / "membership.json")
return [
MembershipRecord(
id=item["id"],
status=item["status"],
joined_at=item["joined_at"],
plan_id=item["plan_id"],
churned_at=item.get("churned_at"),
)
for item in raw["members"]
]
def latest_period(monthly_costs: list[MonthlyPlatformCost]) -> str:
return max(item.period for item in monthly_costs)