Files
adaptive-pricing/projects/coulomb-pricing/observatory/importers/stripe.py
tegwick 0a38def5a5 Complete Economic Observatory MVP (ADAPTIVE-WP-0002)
Add file-based Bubble, Stripe, and OpenRouter importers; usage attribution,
cost allocation, pricing simulator, credit wallets, and recommendations in the
dashboard API. Document whynot-design UI workflow and archive the finished
workplan with all ten tasks marked done.
2026-06-22 23:23:31 +02:00

54 lines
2.0 KiB
Python

from __future__ import annotations
import argparse
from decimal import Decimal
from pathlib import Path
from . import _io
def _money(value: str | int | float) -> str:
return f"{Decimal(str(value)):.2f}"
def import_payments(export: dict) -> dict:
records = []
for index, charge in enumerate(export.get("charges", export.get("records", [])), start=1):
period = charge["period"]
records.append(
{
"id": charge.get("id") or f"pay-{period}-{index}",
"period": period,
"gross_amount": _money(charge.get("gross") or charge["gross_amount"]),
"fees_amount": _money(charge.get("fee") or charge.get("fees_amount", "0")),
"refunds_amount": _money(charge.get("refunds_amount", "0")),
"net_amount": _money(charge.get("net") or charge["net_amount"]),
"currency": charge.get("currency", "EUR"),
"source": "stripe",
"member_count": charge.get("member_count", 1),
"member_username": charge.get("customer") or charge.get("member_username"),
"product": charge.get("product"),
"payout_account": charge.get("payout_account"),
}
)
return {"version": 2, "records": sorted(records, key=lambda item: item["period"])}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Import Stripe charge export")
parser.add_argument("--input", type=Path, required=True, help="Stripe JSON export")
parser.add_argument(
"--output",
type=Path,
default=Path(__file__).resolve().parent.parent.parent / "data" / "payment_records.json",
)
args = parser.parse_args(argv)
payload = import_payments(_io.read_export(args.input))
_io.write_registry(args.output, payload)
print(f"Wrote {len(payload['records'])} payment records → {args.output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())