Complete Economic Observatory MVP (ADAPTIVE-WP-0002)

Add file-based Bubble, Stripe, and OpenRouter importers; usage attribution,
cost allocation, pricing simulator, credit wallets, and recommendations in the
dashboard API. Document whynot-design UI workflow and archive the finished
workplan with all ten tasks marked done.
This commit is contained in:
2026-06-22 23:23:31 +02:00
parent 04ee6d2421
commit 0a38def5a5
26 changed files with 871 additions and 111 deletions

View File

@@ -1,3 +1,3 @@
"""Coulomb Social Economic Observatory — Sprint 1 foundations."""
"""Coulomb Social Economic Observatory — MVP (ledger, API, importers, simulator)."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from decimal import Decimal
from .models import EconomicsSnapshot
from .usage import build_usage_summary
def build_cost_allocation(
snapshot: EconomicsSnapshot,
usage_records: list[dict],
) -> dict:
usage = build_usage_summary(usage_records, snapshot.period)
variable_ai = usage["total_ai_spend_eur"]
fixed = snapshot.monthly_infrastructure_cost
variable_processing = snapshot.monthly_payment_processing_cost
total = fixed + variable_processing + variable_ai
contribution = snapshot.monthly_revenue - total
return {
"period": snapshot.period,
"currency": snapshot.currency,
"fixed_costs_eur": fixed,
"variable_processing_eur": variable_processing,
"variable_ai_eur": variable_ai,
"total_platform_cost_eur": total,
"cost_floor_eur": snapshot.cost_per_member,
"contribution_margin_eur": contribution,
"contribution_margin_pct": snapshot.gross_margin_pct,
"cost_per_member_eur": snapshot.cost_per_member,
"active_members": snapshot.active_members,
}

View File

@@ -19,7 +19,13 @@ from .load import (
load_product,
load_value_range,
)
from .allocation import build_cost_allocation
from .credits import build_credit_summary, load_credit_wallets
from .membership_analytics import build_membership_analytics
from .pricing_context import build_cost_floor, build_market_price_view, build_value_range_view
from .recommendations import build_pricing_recommendations
from .simulator import build_pricing_simulations
from .usage import build_usage_summary, load_usage_records
def _serialize(value: Any) -> Any:
@@ -77,6 +83,23 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
value_range_raw = load_value_range(root)
market_raw = load_market_signals(root)
usage_records = load_usage_records(root)
usage_summary = build_usage_summary(usage_records, target_period)
cost_floor = build_cost_floor(snapshot, models)
value_range = build_value_range_view(value_range_raw, snapshot, product, models)
market_price = build_market_price_view(market_raw)
cost_allocation = build_cost_allocation(snapshot, usage_records)
ai_cost_per_member = usage_summary["cost_per_active_user_eur"]
simulations = build_pricing_simulations(snapshot, models, ai_cost_per_member)
credit_wallets = load_credit_wallets(root)
credit_summary = build_credit_summary(
credit_wallets,
{key: value for key, value in usage_summary["by_member"].items()},
target_period,
)
recommendations = build_pricing_recommendations(
cost_floor, value_range, market_price, simulations, usage_summary
)
return _serialize(
{
@@ -91,9 +114,17 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
"members": members,
"payments": payments,
"expense_record_count": len(expenses),
"cost_floor": build_cost_floor(snapshot, models),
"value_range": build_value_range_view(value_range_raw, snapshot, product, models),
"market_price": build_market_price_view(market_raw),
"membership_analytics": build_membership_analytics(
members, target_period, [row["period"] for row in history]
),
"cost_floor": cost_floor,
"value_range": value_range,
"market_price": market_price,
"usage": usage_summary,
"cost_allocation": cost_allocation,
"pricing_simulations": simulations,
"credit_wallets": credit_summary,
"recommendations": recommendations,
"infrastructure": {
"domains": _load_json_catalog(root, "domains.json"),
"virtual_servers": _load_json_catalog(root, "virtual_servers.json"),

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
from typing import Any
from .load import _read_json, default_data_dir
TWOPLACES = Decimal("0.01")
def _money(value: Decimal) -> Decimal:
return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP)
def load_credit_wallets(data_dir=None) -> dict[str, Any]:
root = data_dir or default_data_dir()
path = Path(root) / "credit_wallets.json"
if not path.exists():
return {"version": 1, "currency": "EUR", "wallets": []}
return _read_json(path)
def build_credit_summary(raw: dict, usage_by_member: dict[str, Decimal], period: str) -> dict:
wallets = []
for item in raw.get("wallets", []):
member_id = item["member_id"]
allowance = Decimal(str(item.get("monthly_allowance_eur", "0")))
used = usage_by_member.get(member_id, Decimal(str(item.get("used_eur", "0"))))
remaining = max(Decimal("0"), allowance - used)
wallets.append(
{
"member_id": member_id,
"period": period,
"monthly_allowance_eur": _money(allowance),
"used_eur": _money(used),
"remaining_eur": _money(remaining),
"overage_eur": _money(max(Decimal("0"), used - allowance)),
}
)
return {
"period": period,
"currency": raw.get("currency", "EUR"),
"wallet_count": len(wallets),
"wallets": wallets,
"notes": "Observatory-only credit accounting; no customer billing in MVP.",
}

View File

@@ -0,0 +1 @@
"""File-based importers for Bubble, Stripe, and OpenRouter exports."""

View File

@@ -0,0 +1,13 @@
from __future__ import annotations
import json
from pathlib import Path
def read_export(path: Path) -> dict:
return json.loads(path.read_text(encoding="utf-8"))
def write_registry(path: Path, payload: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
import argparse
from pathlib import Path
from . import _io
BUBBLE_STATUS = {"Active": "active", "Cancelled": "churned", "Paused": "paused"}
def import_membership(export: dict, plan_id: str = "flat-899-eur-monthly") -> dict:
members = []
for index, user in enumerate(export.get("users", []), start=1):
bubble_id = user.get("bubble_id") or user.get("_id") or f"bubble-{index}"
username = user.get("username") or user.get("email") or bubble_id
status = BUBBLE_STATUS.get(user.get("status", "Active"), "active")
members.append(
{
"id": f"member-{username}",
"username": username,
"external_id": bubble_id,
"status": status,
"joined_at": user.get("created") or user.get("joined_at"),
"plan_id": user.get("plan") or plan_id,
"source": "bubble",
"churned_at": user.get("cancelled_at") if status == "churned" else None,
}
)
return {
"version": 1,
"snapshot_date": export.get("exported_at", export.get("snapshot_date")),
"members": members,
"note": "Imported from Bubble export",
}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Import Bubble membership export")
parser.add_argument("--input", type=Path, required=True, help="Bubble JSON export")
parser.add_argument(
"--output",
type=Path,
default=Path(__file__).resolve().parent.parent.parent / "data" / "membership.json",
)
parser.add_argument("--plan-id", default="flat-899-eur-monthly")
args = parser.parse_args(argv)
payload = import_membership(_io.read_export(args.input), args.plan_id)
_io.write_registry(args.output, payload)
print(f"Wrote {len(payload['members'])} members → {args.output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,57 @@
from __future__ import annotations
import argparse
from decimal import Decimal
from pathlib import Path
from . import _io
def _money(value: str | int | float) -> str:
return f"{Decimal(str(value)):.2f}"
def import_usage(export: dict, fx_usd_eur: str = "0.92") -> dict:
rate = Decimal(str(fx_usd_eur))
records = []
for index, row in enumerate(export.get("usage", export.get("records", [])), start=1):
cost_usd = Decimal(str(row.get("cost_usd") or row.get("cost", "0")))
cost_eur = (cost_usd * rate).quantize(Decimal("0.01"))
records.append(
{
"id": row.get("id") or f"usage-{row['period']}-{index}",
"period": row["period"],
"member_id": row.get("member_id") or row.get("user_id"),
"model": row["model"],
"tokens": row.get("tokens", 0),
"cost_usd": _money(cost_usd),
"cost_eur": _money(cost_eur),
"source": "openrouter",
}
)
return {
"version": 1,
"fx_usd_eur": str(rate),
"records": sorted(records, key=lambda item: (item["period"], item["id"])),
}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Import OpenRouter usage export")
parser.add_argument("--input", type=Path, required=True, help="OpenRouter JSON export")
parser.add_argument(
"--output",
type=Path,
default=Path(__file__).resolve().parent.parent.parent / "data" / "usage_records.json",
)
parser.add_argument("--fx-usd-eur", default="0.92")
args = parser.parse_args(argv)
payload = import_usage(_io.read_export(args.input), args.fx_usd_eur)
_io.write_registry(args.output, payload)
print(f"Wrote {len(payload['records'])} usage records → {args.output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
import argparse
from decimal import Decimal
from pathlib import Path
from . import _io
def _money(value: str | int | float) -> str:
return f"{Decimal(str(value)):.2f}"
def import_payments(export: dict) -> dict:
records = []
for index, charge in enumerate(export.get("charges", export.get("records", [])), start=1):
period = charge["period"]
records.append(
{
"id": charge.get("id") or f"pay-{period}-{index}",
"period": period,
"gross_amount": _money(charge.get("gross") or charge["gross_amount"]),
"fees_amount": _money(charge.get("fee") or charge.get("fees_amount", "0")),
"refunds_amount": _money(charge.get("refunds_amount", "0")),
"net_amount": _money(charge.get("net") or charge["net_amount"]),
"currency": charge.get("currency", "EUR"),
"source": "stripe",
"member_count": charge.get("member_count", 1),
"member_username": charge.get("customer") or charge.get("member_username"),
"product": charge.get("product"),
"payout_account": charge.get("payout_account"),
}
)
return {"version": 2, "records": sorted(records, key=lambda item: item["period"])}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Import Stripe charge export")
parser.add_argument("--input", type=Path, required=True, help="Stripe JSON export")
parser.add_argument(
"--output",
type=Path,
default=Path(__file__).resolve().parent.parent.parent / "data" / "payment_records.json",
)
args = parser.parse_args(argv)
payload = import_payments(_io.read_export(args.input))
_io.write_registry(args.output, payload)
print(f"Wrote {len(payload['records'])} payment records → {args.output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from .models import MembershipRecord
def _period_prefix(date: str | None) -> str | None:
if not date or len(date) < 7:
return None
return date[:7]
def build_membership_analytics(
members: list[MembershipRecord],
period: str,
history: list[str],
) -> dict:
active = sum(1 for member in members if member.status == "active")
new_members = sum(1 for member in members if _period_prefix(member.joined_at) == period)
churned = sum(1 for member in members if _period_prefix(member.churned_at) == period)
prior_period = None
sorted_periods = sorted(history)
if period in sorted_periods:
index = sorted_periods.index(period)
if index > 0:
prior_period = sorted_periods[index - 1]
prior_active = active
if prior_period:
prior_active = sum(
1
for member in members
if member.status == "active" and _period_prefix(member.joined_at) <= prior_period
)
growth_rate = None
if prior_active:
growth_rate = round(((active - prior_active) / prior_active) * 100, 1)
return {
"period": period,
"total_members": len(members),
"active_members": active,
"new_members": new_members,
"churned_members": churned,
"growth_rate_pct": growth_rate,
"snapshot_source": "membership.json",
}

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
from decimal import Decimal
def build_pricing_recommendations(
cost_floor: dict,
value_range: dict,
market_price: dict,
simulations: dict,
usage_summary: dict,
) -> list[dict]:
recommendations: list[dict] = []
margin_pct = Decimal(str(cost_floor.get("gross_margin_pct", "0")))
ai_spend = Decimal(str(usage_summary.get("total_ai_spend_eur", "0")))
active_price = Decimal(str(value_range.get("current_price_eur", "0")))
cost_per_member = Decimal(str(cost_floor.get("cost_per_member", "0")))
if margin_pct < Decimal("10"):
recommendations.append(
{
"id": "margin-pressure",
"priority": "high",
"title": "Margin below 10%",
"rationale": f"Gross margin is {margin_pct}% at current pricing.",
"suggested_action": "Review infrastructure cost or test a higher access fee within value-range bands.",
}
)
if ai_spend > Decimal("0") and cost_per_member > Decimal("0"):
ai_ratio = (ai_spend / cost_per_member) * Decimal("100")
if ai_ratio > Decimal("15"):
best = simulations.get("best_margin_scenario_id")
recommendations.append(
{
"id": "usage-pricing-signal",
"priority": "medium",
"title": "AI cost becoming material",
"rationale": f"AI spend is {ai_ratio:.1f}% of cost-per-member this period.",
"suggested_action": f"Evaluate hybrid model '{best}' in the pricing simulator before customer-visible credits.",
}
)
if market_price.get("market_high_eur") and active_price < Decimal(str(market_price["market_high_eur"])):
headroom = Decimal(str(value_range.get("aggregate_high_eur", active_price))) - active_price
if headroom > Decimal("5"):
recommendations.append(
{
"id": "value-headroom",
"priority": "low",
"title": "Value headroom above list price",
"rationale": f"Aggregate value band high is €{value_range['aggregate_high_eur']} vs €{active_price} list.",
"suggested_action": "Run a staged price experiment within the solo-builder segment band.",
}
)
if not recommendations:
recommendations.append(
{
"id": "hold-course",
"priority": "low",
"title": "Hold current pricing",
"rationale": "No urgent margin, usage, or competitive signals detected.",
"suggested_action": "Continue observatory tracking; re-run after next ledger period.",
}
)
return recommendations

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from decimal import Decimal, ROUND_HALF_UP
from .models import EconomicsSnapshot, PricingModel
TWOPLACES = Decimal("0.01")
OVERAGE_RATE = Decimal("0.002") # EUR per token above allowance (observatory estimate)
def _money(value: Decimal) -> Decimal:
return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP)
def _simulate_model(
model: PricingModel,
snapshot: EconomicsSnapshot,
ai_cost_per_member: Decimal,
included_tokens: int = 100_000,
actual_tokens: int = 120_000,
) -> dict:
members = snapshot.active_members or 1
subscription_revenue = model.access_fee_amount * members
overage_revenue = Decimal("0")
if model.model_type == "hybrid_subscription_usage" and actual_tokens > included_tokens:
overage_tokens = actual_tokens - included_tokens
overage_revenue = OVERAGE_RATE * overage_tokens * members
gross_revenue = subscription_revenue + overage_revenue
platform_cost = snapshot.monthly_total_platform_cost + (ai_cost_per_member * members)
margin = gross_revenue - platform_cost
margin_pct = (margin / gross_revenue * Decimal("100")) if gross_revenue else Decimal("0")
return {
"model_id": model.id,
"model_name": model.name,
"model_type": model.model_type,
"status": model.status,
"access_fee_eur": model.access_fee_amount,
"projected_revenue_eur": _money(gross_revenue),
"projected_overage_eur": _money(overage_revenue),
"projected_platform_cost_eur": _money(platform_cost),
"projected_margin_eur": _money(margin),
"projected_margin_pct": _money(margin_pct),
"assumed_tokens_per_member": actual_tokens,
"included_tokens": included_tokens if model.model_type != "flat_subscription" else None,
}
def build_pricing_simulations(
snapshot: EconomicsSnapshot,
models: list[PricingModel],
ai_cost_per_member: Decimal,
) -> dict:
scenarios = [
_simulate_model(model, snapshot, ai_cost_per_member)
for model in models
if model.status in ("active", "candidate")
]
active = next((item for item in scenarios if item["status"] == "active"), scenarios[0])
best_margin = max(scenarios, key=lambda item: item["projected_margin_eur"])
return {
"period": snapshot.period,
"currency": snapshot.currency,
"active_scenario_id": active["model_id"],
"best_margin_scenario_id": best_margin["model_id"],
"scenarios": scenarios,
"notes": "Projections hold member count and infrastructure cost constant; overage uses observatory token estimate.",
}

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from decimal import Decimal, ROUND_HALF_UP
from typing import Any
TWOPLACES = Decimal("0.01")
def _money(value: Decimal) -> Decimal:
return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP)
def load_usage_records(data_dir) -> list[dict[str, Any]]:
from pathlib import Path
from .load import _read_json, default_data_dir
root = data_dir or default_data_dir()
path = Path(root) / "usage_records.json"
if not path.exists():
return []
raw = _read_json(path)
return list(raw.get("records", []))
def build_usage_summary(records: list[dict[str, Any]], period: str) -> dict:
period_rows = [row for row in records if row.get("period") == period]
by_member: dict[str, Decimal] = {}
by_model: dict[str, Decimal] = {}
total = Decimal("0")
for row in period_rows:
cost = Decimal(str(row.get("cost_eur", "0")))
total += cost
member_id = row.get("member_id") or "unknown"
model = row.get("model") or "unknown"
by_member[member_id] = by_member.get(member_id, Decimal("0")) + cost
by_model[model] = by_model.get(model, Decimal("0")) + cost
active_members = len(by_member) or 1
return {
"period": period,
"total_ai_spend_eur": _money(total),
"cost_per_active_user_eur": _money(total / active_members),
"by_member": {key: _money(value) for key, value in sorted(by_member.items())},
"by_model": {key: _money(value) for key, value in sorted(by_model.items())},
"record_count": len(period_rows),
}