Implement governance workflows and close WP-0008

This commit is contained in:
codex
2026-07-03 01:27:37 +02:00
parent a76e57ba89
commit a9a55e19f1
11 changed files with 1177 additions and 74 deletions

View File

@@ -0,0 +1,24 @@
{
"policy_id": "coulomb-governance-v1",
"max_self_serve_discount_pct": "10",
"max_customer_visible_price_increase_pct": "15",
"max_active_experiments": 2,
"max_concurrent_candidate_rollouts": 1,
"require_approval_for_candidate_rollout": true,
"require_approval_for_approximate_provider_mapping": true,
"block_unsupported_provider_artifacts": true,
"drift_blocks_execution": true,
"require_approval_for_price_change": true,
"require_customer_notice_for_price_increase": true,
"customer_notice_days": 30,
"grandfather_existing_customers": true,
"customer_visible_tuning_enabled": false,
"customer_visible_tuning_requires_active_model": true,
"communication_owner_role": "operator",
"default_approver_role": "operator",
"metadata": {
"active_experiment_count": 0,
"candidate_rollout_count": 0,
"policy_scope": "coulomb-social-mvp"
}
}

View File

@@ -9,6 +9,7 @@ from .economics import build_liquidity_summary, build_snapshot
from .load import (
default_data_dir,
latest_period,
load_governance_policy,
load_ltv_scenarios,
load_budget,
load_expense_records,
@@ -24,6 +25,7 @@ from .load import (
from .allocation import build_cost_allocation
from .boundary import build_boundary_validation
from .credits import build_credit_summary, load_credit_wallets
from .governance import build_governance_policy, build_governance_surfaces
from .membership_analytics import build_membership_analytics
from .pricing_context import build_cost_floor, build_market_price_view, build_value_range_view
from .publication import build_stripe_publication_preview
@@ -92,8 +94,10 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
market_raw = load_market_signals(root)
usage_records = load_usage_records(root)
usage_summary = build_usage_summary(usage_records, target_period)
governance_policy_raw = load_governance_policy(root)
ltv_scenarios = load_ltv_scenarios(root)
tuning_requests = load_tuning_requests(root)
governance_policy = build_governance_policy(governance_policy_raw)
cost_floor = build_cost_floor(snapshot, models)
value_range = build_value_range_view(value_range_raw, snapshot, product, models)
market_price = build_market_price_view(market_raw)
@@ -114,6 +118,12 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
tuning_requests,
)
boundary_validation = build_boundary_validation(snapshot, models, usage_records)
provider_publication = build_stripe_publication_preview(
product,
models,
root,
model_id=product.active_pricing_model_id,
)
credit_wallets = load_credit_wallets(root)
credit_summary = build_credit_summary(
credit_wallets,
@@ -121,13 +131,25 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
target_period,
)
recommendations = build_pricing_recommendations(
cost_floor, value_range, market_price, simulations, usage_summary
cost_floor,
value_range,
market_price,
simulations,
usage_summary,
boundary_validation=boundary_validation,
customer_tuning=customer_tuning,
provider_publication=provider_publication,
governance_policy=governance_policy_raw,
product=product,
)
provider_publication = build_stripe_publication_preview(
governance = build_governance_surfaces(
root,
product,
models,
root,
model_id=product.active_pricing_model_id,
cost_floor,
customer_tuning,
provider_publication,
governance_policy,
)
return _serialize(
@@ -156,6 +178,7 @@ def build_dashboard_payload(data_dir: Path | None = None, period: str | None = N
"boundary_validation": boundary_validation,
"credit_wallets": credit_summary,
"recommendations": recommendations,
"governance": governance,
"provider_publication": provider_publication,
"infrastructure": {
"domains": _load_json_catalog(root, "domains.json"),

View File

@@ -0,0 +1,772 @@
from __future__ import annotations
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
from typing import Any
from ._repo_root import ensure_repo_root_on_syspath
from .publication import default_stripe_state_path, load_stripe_publication_state
ensure_repo_root_on_syspath()
from adaptive_pricing_core.governance import ( # noqa: E402
ApprovalRequirement,
GovernanceAssessment,
GovernancePolicy,
GovernanceRisk,
HealthCheck,
SafeTuningContract,
SafeTuningExample,
SafeTuningParameter,
SellerRecommendation,
SupportingObservation,
governance_policy_from_dict,
)
TWOPLACES = Decimal("0.01")
def _serialize(value: Any) -> Any:
if isinstance(value, Decimal):
return str(value)
if hasattr(value, "__dataclass_fields__"):
return {key: _serialize(getattr(value, key)) for key in value.__dataclass_fields__}
if isinstance(value, tuple):
return [_serialize(item) for item in value]
if isinstance(value, list):
return [_serialize(item) for item in value]
if isinstance(value, dict):
return {key: _serialize(item) for key, item in value.items()}
return value
def _money(value: Decimal) -> Decimal:
return value.quantize(TWOPLACES, rounding=ROUND_HALF_UP)
def _decimal(value: Decimal | str | int | float | None) -> Decimal:
if value in (None, ""):
return Decimal("0")
return Decimal(str(value))
def build_governance_policy(raw: dict[str, Any]) -> GovernancePolicy:
return governance_policy_from_dict(raw)
def _publication_assessment(
product: Any,
provider_publication: dict[str, Any],
policy: GovernancePolicy,
) -> GovernanceAssessment:
artifact_counts = provider_publication.get("artifact_counts", {})
approximate = int(artifact_counts.get("approximate", 0))
unsupported = int(artifact_counts.get("unsupported", 0))
drift_count = len(provider_publication.get("plan", {}).get("drift", []))
model_id = provider_publication.get("model_id")
approvals: list[ApprovalRequirement] = []
risks: list[GovernanceRisk] = []
observations = [
SupportingObservation(
id="provider-artifact-counts",
title="Provider mapping counts",
summary=(
f"{artifact_counts.get('exact', 0)} exact, "
f"{approximate} approximate, {unsupported} unsupported artifacts."
),
source_ref="provider_publication.artifact_counts",
),
SupportingObservation(
id="provider-drift-count",
title="Provider drift findings",
summary=f"{drift_count} provider drift findings in the current publication preview.",
source_ref="provider_publication.plan.drift",
),
]
if model_id != getattr(product, "active_pricing_model_id", None):
approvals.append(
ApprovalRequirement(
id="candidate-rollout-approval",
title="Candidate model rollout approval",
approver_role=policy.default_approver_role,
reason="The provider publication target is not the currently active product pricing model.",
)
)
risks.append(
GovernanceRisk(
id="candidate-rollout",
severity="medium",
summary="The publication target is a candidate model rather than the active production model.",
mitigation="Keep the change in shadow state or route through explicit rollout approval.",
)
)
if approximate > 0:
risks.append(
GovernanceRisk(
id="approximate-provider-mapping",
severity="medium",
summary="Some provider mappings are approximate rather than fully executable in Stripe.",
mitigation="Supplement Stripe publication with operational contract logic or human review.",
)
)
if policy.require_approval_for_approximate_provider_mapping:
approvals.append(
ApprovalRequirement(
id="approximate-provider-approval",
title="Approximate provider mapping approval",
approver_role=policy.default_approver_role,
reason="Customer-visible rollout would rely on approximate Stripe mappings.",
)
)
if unsupported > 0:
risks.append(
GovernanceRisk(
id="unsupported-provider-artifacts",
severity="high",
summary="Some pricing artifacts cannot be executed in Stripe by the current publisher.",
mitigation="Block rollout until the unsupported artifacts are removed or implemented.",
)
)
if drift_count > 0:
risks.append(
GovernanceRisk(
id="provider-drift",
severity="high",
summary="The provider shadow state differs from the desired pricing definition.",
mitigation="Reconcile drift before rollout or retire the unmanaged artifact state.",
)
)
if unsupported > 0 and policy.block_unsupported_provider_artifacts:
return GovernanceAssessment(
decision="blocked",
summary="Blocked: unsupported Stripe mappings remain in the provider publication plan.",
approvals=tuple(approvals),
risks=tuple(risks),
supporting_observations=tuple(observations),
notes=(
"Shadow-state publication may still proceed, but customer-visible execution should remain blocked.",
),
)
if drift_count > 0 and policy.drift_blocks_execution:
return GovernanceAssessment(
decision="blocked",
summary="Blocked: provider drift must be reconciled before execution.",
approvals=tuple(approvals),
risks=tuple(risks),
supporting_observations=tuple(observations),
)
if approvals:
return GovernanceAssessment(
decision="approval_required",
summary="Approval required before customer-visible execution.",
approvals=tuple(approvals),
risks=tuple(risks),
supporting_observations=tuple(observations),
)
return GovernanceAssessment(
decision="proceed",
summary="Provider publication is execution-ready under the current governance policy.",
approvals=(),
risks=tuple(risks),
supporting_observations=tuple(observations),
)
def _risk_from_boundary_result(result: dict[str, Any]) -> GovernanceRisk:
severity = "high" if result["status"] == "fail" else "medium"
return GovernanceRisk(
id=result["id"],
severity=severity,
summary=result["summary"],
mitigation=result.get("suggested_action") or result["reason"],
)
def build_pricing_recommendation_workflow(
cost_floor: dict[str, Any],
value_range: dict[str, Any],
market_price: dict[str, Any],
simulations: dict[str, Any],
usage_summary: dict[str, Any],
*,
boundary_validation: dict[str, Any] | None = None,
customer_tuning: dict[str, Any] | None = None,
provider_publication: dict[str, Any] | None = None,
governance_policy: GovernancePolicy | None = None,
product: Any | None = None,
) -> list[dict[str, Any]]:
policy = governance_policy or GovernancePolicy(policy_id="default-governance-policy")
publication_assessment = _publication_assessment(
product,
provider_publication or {},
policy,
) if provider_publication and product is not None else GovernanceAssessment(
decision="proceed",
summary="No provider publication assessment was supplied.",
approvals=(),
risks=(),
supporting_observations=(),
)
recommendations: list[SellerRecommendation] = []
margin_pct = _decimal(cost_floor.get("gross_margin_pct"))
active_price = _decimal(value_range.get("current_price_eur"))
cost_per_member = _decimal(cost_floor.get("cost_per_member"))
ai_spend = _decimal(usage_summary.get("total_ai_spend_eur"))
if margin_pct < Decimal("10"):
approvals = []
risks = [
GovernanceRisk(
id="customer-communication",
severity="medium",
summary="Changing the membership price affects existing customer expectations.",
mitigation=(
f"Route communication through {policy.communication_owner_role} and honor "
f"{policy.customer_notice_days}-day notice if price increases are tested."
),
)
]
if policy.require_approval_for_price_change:
approvals.append(
ApprovalRequirement(
id="price-change-approval",
title="Price change approval",
approver_role=policy.default_approver_role,
reason="The recommendation would change customer-visible pricing.",
)
)
recommendations.append(
SellerRecommendation(
id="margin-pressure",
recommendation_type="model_change",
priority="high",
title="Margin below 10%",
rationale=f"Gross margin is {margin_pct}% at the current price.",
suggested_action="Review infrastructure cost or test a higher access fee within value-range bands.",
confidence=Decimal("0.92"),
governance=GovernanceAssessment(
decision="approval_required" if approvals else "proceed",
summary="Approval required before a customer-visible price change." if approvals else "Margin remediation can proceed to the next workflow stage.",
approvals=tuple(approvals),
risks=tuple(risks),
supporting_observations=(
SupportingObservation(
id="gross-margin",
title="Current gross margin",
summary=f"Observed gross margin is {margin_pct}%.",
source_ref="cost_floor.gross_margin_pct",
value=str(margin_pct),
),
SupportingObservation(
id="cost-per-member",
title="Observed cost per member",
summary=f"Current cost per member is {cost_per_member} EUR.",
source_ref="cost_floor.cost_per_member",
value=str(cost_per_member),
),
),
),
risks=tuple(risks),
supporting_observations=(
SupportingObservation(
id="active-price",
title="Current list price",
summary=f"Current list price is {active_price} EUR.",
source_ref="value_range.current_price_eur",
value=str(active_price),
),
),
related_model_ids=((product.active_pricing_model_id,) if product is not None else ()),
)
)
if ai_spend > Decimal("0") and cost_per_member > Decimal("0"):
ai_ratio = _money((ai_spend / cost_per_member) * Decimal("100"))
if ai_ratio > Decimal("15"):
best = simulations.get("best_ltv_scenario_id") or simulations.get("best_margin_scenario_id")
recommendations.append(
SellerRecommendation(
id="usage-pricing-signal",
recommendation_type="simulation",
priority="medium",
title="AI cost becoming material",
rationale=f"AI spend is {ai_ratio}% of current cost per member.",
suggested_action=f"Evaluate hybrid model '{best}' in the simulator before customer-visible changes.",
confidence=Decimal("0.78"),
governance=GovernanceAssessment(
decision="proceed",
summary="Simulation work can proceed without approval.",
approvals=(),
risks=(
GovernanceRisk(
id="usage-forecast-uncertainty",
severity="medium",
summary="Usage-cost signals come from a small current sample.",
mitigation="Keep the next step at simulation or controlled pilot scope until more usage data is available.",
),
),
supporting_observations=(
SupportingObservation(
id="ai-ratio",
title="AI cost ratio",
summary=f"AI cost represents {ai_ratio}% of current cost per member.",
source_ref="usage.total_ai_spend_eur",
value=str(ai_ratio),
),
),
),
risks=(
GovernanceRisk(
id="pilot-scope",
severity="low",
summary="Hybrid pricing adds operational complexity before rollout automation is mature.",
mitigation="Restrict the recommendation to simulation or small pilot scope.",
),
),
supporting_observations=(
SupportingObservation(
id="best-ltv-scenario",
title="Best LTV scenario",
summary=f"Current simulator best LTV scenario is {best}.",
source_ref="pricing_simulations.best_ltv_scenario_id",
value=str(best),
),
),
related_model_ids=(str(best),) if best else (),
)
)
accepted_tuning_ids = tuple((customer_tuning or {}).get("accepted_request_ids", []))
if accepted_tuning_ids:
request = next(
(
item
for item in (customer_tuning or {}).get("requests", [])
if item.get("id") == accepted_tuning_ids[0]
),
None,
)
outcome = request.get("result", {}) if request else {}
approvals = list(publication_assessment.approvals)
risks = list(publication_assessment.risks)
if not policy.customer_visible_tuning_enabled:
approvals.append(
ApprovalRequirement(
id="customer-visible-tuning-disabled",
title="Customer-visible tuning enablement approval",
approver_role=policy.default_approver_role,
reason="The governance policy still treats customer-visible tuning as pilot-only.",
)
)
recommendations.append(
SellerRecommendation(
id="pilot-tuning-offer",
recommendation_type="model_change",
priority="medium",
title="Pilot a seller-safe tuned hybrid offer",
rationale=(
f"Request '{accepted_tuning_ids[0]}' produced an accepted tuned configuration with "
f"LTV {outcome.get('average_comparable_customer_lifetime_value')} EUR."
),
suggested_action="Use the accepted tuning result as a controlled seller-assisted offer, not a self-serve rollout.",
confidence=Decimal("0.81"),
governance=GovernanceAssessment(
decision="approval_required",
summary="Approval required before exposing tuned pricing to customers.",
approvals=tuple(approvals),
risks=tuple(risks),
supporting_observations=(
SupportingObservation(
id="accepted-tuning-request",
title="Accepted tuning request",
summary=outcome.get("explanation", "A tuned configuration passed the current solver and LTV checks."),
source_ref=f"customer_tuning.requests[{accepted_tuning_ids[0]}]",
),
),
notes=("The current policy exposes tuned pricing only through seller-assisted workflows.",),
),
risks=tuple(risks),
supporting_observations=(
SupportingObservation(
id="tuning-request-id",
title="Accepted tuning request id",
summary=f"Accepted request: {accepted_tuning_ids[0]}.",
source_ref="customer_tuning.accepted_request_ids",
),
),
related_model_ids=(request.get("model_id"),) if request else (),
related_profile_ids=(request.get("profile_id"),) if request else (),
)
)
if market_price.get("market_high_eur") and active_price < _decimal(market_price.get("market_high_eur")):
headroom = _decimal(value_range.get("aggregate_high_eur")) - active_price
if headroom > Decimal("5"):
active_experiment_count = int(policy.metadata.get("active_experiment_count", 0))
experiment_decision = "proceed" if active_experiment_count < policy.max_active_experiments else "blocked"
approvals = ()
risks = ()
if experiment_decision == "blocked":
risks = (
GovernanceRisk(
id="experiment-capacity",
severity="medium",
summary="The configured experiment capacity is exhausted.",
mitigation="Close or review existing experiments before starting another one.",
),
)
recommendations.append(
SellerRecommendation(
id="value-headroom",
recommendation_type="research",
priority="low",
title="Value headroom above list price",
rationale=f"Aggregate value band high is {value_range.get('aggregate_high_eur')} EUR vs {active_price} EUR list.",
suggested_action="Run a staged price experiment within the solo-builder segment band.",
confidence=Decimal("0.63"),
governance=GovernanceAssessment(
decision=experiment_decision,
summary=(
"Experiment capacity is available."
if experiment_decision == "proceed"
else "Blocked until experiment capacity is freed."
),
approvals=approvals,
risks=risks,
supporting_observations=(
SupportingObservation(
id="experiment-capacity",
title="Experiment capacity",
summary=f"{active_experiment_count} active experiments vs cap {policy.max_active_experiments}.",
source_ref="governance_policy.metadata.active_experiment_count",
),
),
),
risks=risks,
supporting_observations=(
SupportingObservation(
id="value-headroom-evidence",
title="Value headroom estimate",
summary=f"Observed value headroom is {headroom} EUR.",
source_ref="value_range.aggregate_high_eur",
value=str(headroom),
),
),
)
)
if provider_publication and product is not None and publication_assessment.decision != "proceed":
recommendations.append(
SellerRecommendation(
id="execution-governance-gate",
recommendation_type="execution",
priority="medium",
title="Keep provider execution behind governance gates",
rationale=publication_assessment.summary,
suggested_action="Resolve publication blockers or route the rollout through explicit approval before customer-visible execution.",
confidence=Decimal("0.88"),
governance=publication_assessment,
risks=publication_assessment.risks,
supporting_observations=publication_assessment.supporting_observations,
related_model_ids=(provider_publication.get("model_id"),),
notes=publication_assessment.notes,
)
)
if not recommendations:
recommendations.append(
SellerRecommendation(
id="hold-course",
recommendation_type="research",
priority="low",
title="Hold current pricing",
rationale="No urgent margin, usage, or competitive signals currently justify a governed change workflow.",
suggested_action="Continue observatory tracking and re-run after the next ledger period.",
confidence=Decimal("0.71"),
governance=GovernanceAssessment(
decision="proceed",
summary="No immediate governed pricing action is required.",
approvals=(),
risks=(),
supporting_observations=(),
),
risks=(),
supporting_observations=(),
)
)
return _serialize(recommendations)
def build_safe_tuning_contracts(
models: list[Any],
customer_tuning: dict[str, Any],
policy: GovernancePolicy,
active_model_id: str,
) -> list[dict[str, Any]]:
tradeoff_lexicon = {
"lower_included_usage": "Lower included usage can unlock lower variable pricing while protecting seller economics.",
"higher_included_usage": "Higher included usage increases predictability but may require stronger commitment or a higher usage price.",
"lower_usage_price": "Lower usage price is only offered when the solver can preserve seller-side LTV.",
"higher_usage_price": "Higher usage price may be the trade-off for keeping monthly access fees and flexibility unchanged.",
"longer_contract_duration": "Longer commitment can support better unit economics.",
"minimum_monthly_turnover": "Minimum turnover protects the seller against under-utilization risk.",
"prepayment": "Prepayment reduces default risk and can support better pricing.",
"guaranteed_platform_fee": "A guaranteed fee protects base platform economics.",
"customer_funded_onboarding": "Customer-funded onboarding offsets seller setup effort.",
"reduced_cancellation_flexibility": "Reduced cancellation flexibility supports stronger lifetime value assumptions.",
}
requests_by_model: dict[str, list[dict[str, Any]]] = {}
for item in customer_tuning.get("requests", []):
if item.get("model_id"):
requests_by_model.setdefault(item["model_id"], []).append(item)
contracts: list[SafeTuningContract] = []
for model in models:
tunables = [parameter for parameter in model.tunable_parameters if parameter.parameter_class == "customer_tunable"]
if not tunables:
continue
examples: list[SafeTuningExample] = []
for item in requests_by_model.get(model.id, []):
outcome = item.get("result", {})
decision = outcome.get("decision", item.get("decision", "rejected"))
customer_visible = (
policy.customer_visible_tuning_enabled
and (not policy.customer_visible_tuning_requires_active_model or model.id == active_model_id)
and decision == "accepted"
)
examples.append(
SafeTuningExample(
id=item["id"],
title=item["name"],
outcome=decision,
summary=outcome.get("explanation", "No tuning explanation available."),
customer_message=(
"This configuration is available through a seller-assisted pilot."
if decision == "accepted"
else "This configuration is outside the current safe self-serve range."
),
visible_to_customer=customer_visible,
tradeoffs=tuple(outcome.get("tradeoffs", [])),
)
)
contracts.append(
SafeTuningContract(
model_id=model.id,
model_name=model.name,
mode=(
"customer_visible"
if policy.customer_visible_tuning_enabled and model.id == active_model_id
else "pilot_only"
),
customer_visible=(
policy.customer_visible_tuning_enabled
and (not policy.customer_visible_tuning_requires_active_model or model.id == active_model_id)
),
tunable_parameters=tuple(
SafeTuningParameter(
key=parameter.key,
label=parameter.key.replace("_", " ").title(),
description=parameter.description,
data_type=parameter.data_type,
default_value=parameter.default_value,
min_value=str(parameter.min_value) if parameter.min_value is not None else None,
max_value=str(parameter.max_value) if parameter.max_value is not None else None,
)
for parameter in tunables
),
tradeoff_lexicon=tradeoff_lexicon,
examples=tuple(examples),
notes=(
"Customer-visible tuning remains policy-governed even when the solver can find a safe configuration.",
"Examples describe current pilot outcomes and should not be treated as automatically executable offers.",
),
)
)
return _serialize(contracts)
def build_governance_health_checks(
cost_floor: dict[str, Any],
customer_tuning: dict[str, Any],
provider_publication: dict[str, Any],
policy: GovernancePolicy,
) -> list[dict[str, Any]]:
checks: list[HealthCheck] = []
margin_pct = _decimal(cost_floor.get("gross_margin_pct"))
if margin_pct < Decimal("0"):
checks.append(
HealthCheck(
id="margin-health",
title="Margin health",
status="fail",
summary="Current gross margin is negative.",
value=str(margin_pct),
threshold="0",
suggested_action="Do not execute customer-visible pricing changes without correcting the current economics gap.",
)
)
elif margin_pct < Decimal("10"):
checks.append(
HealthCheck(
id="margin-health",
title="Margin health",
status="warn",
summary="Current gross margin is below the 10% operating comfort threshold.",
value=str(margin_pct),
threshold="10",
suggested_action="Prioritize simulation and model-change recommendations before expansion.",
)
)
else:
checks.append(
HealthCheck(
id="margin-health",
title="Margin health",
status="pass",
summary="Current gross margin is within the configured comfort threshold.",
value=str(margin_pct),
threshold="10",
)
)
artifact_counts = provider_publication.get("artifact_counts", {})
unsupported = int(artifact_counts.get("unsupported", 0))
drift_count = len(provider_publication.get("plan", {}).get("drift", []))
approximate = int(artifact_counts.get("approximate", 0))
if unsupported > 0 or drift_count > 0:
checks.append(
HealthCheck(
id="provider-execution-health",
title="Provider execution health",
status="fail",
summary="Stripe execution preview is not rollout-ready under the current governance policy.",
value=f"{unsupported} unsupported / {drift_count} drift",
threshold="0 unsupported / 0 drift",
suggested_action="Resolve unsupported mappings or drift before attempting execution.",
)
)
elif approximate > 0:
checks.append(
HealthCheck(
id="provider-execution-health",
title="Provider execution health",
status="warn",
summary="Stripe execution preview depends on approximate mappings.",
value=str(approximate),
threshold="0",
suggested_action="Keep rollout gated behind approval and supplemental operational controls.",
)
)
else:
checks.append(
HealthCheck(
id="provider-execution-health",
title="Provider execution health",
status="pass",
summary="Stripe execution preview is exact and drift-free.",
)
)
accepted_count = len(customer_tuning.get("accepted_request_ids", []))
checks.append(
HealthCheck(
id="tuning-pilot-health",
title="Tuning pilot health",
status="pass" if accepted_count > 0 else "warn",
summary=(
"At least one tuned offer currently passes the solver and governance pilot checks."
if accepted_count > 0
else "No tuned pilot request currently passes the solver."
),
value=str(accepted_count),
threshold=">=1 accepted pilot",
suggested_action=None if accepted_count > 0 else "Revise the pilot requests or relax no-longer-needed constraints after review.",
)
)
active_experiment_count = int(policy.metadata.get("active_experiment_count", 0))
checks.append(
HealthCheck(
id="experiment-capacity",
title="Experiment capacity",
status="pass" if active_experiment_count < policy.max_active_experiments else "warn",
summary=f"{active_experiment_count} active experiments vs cap {policy.max_active_experiments}.",
value=str(active_experiment_count),
threshold=str(policy.max_active_experiments),
suggested_action=(
None
if active_experiment_count < policy.max_active_experiments
else "Close or review an active experiment before starting another governed experiment."
),
)
)
return _serialize(checks)
def build_governance_surfaces(
data_dir: Path,
product: Any,
models: list[Any],
cost_floor: dict[str, Any],
customer_tuning: dict[str, Any],
provider_publication: dict[str, Any],
policy: GovernancePolicy,
) -> dict[str, Any]:
state = load_stripe_publication_state(default_stripe_state_path(data_dir))
publication_assessment = _publication_assessment(product, provider_publication, policy)
audit_surface = {
"provider": "stripe",
"active_revision_id": state.active_revision_id,
"active_model_id": state.active_model_id,
"revision_count": len(state.revisions),
"recent_revisions": [
{
"revision_id": revision.revision_id,
"model_id": revision.model_id,
"summary": revision.summary,
"operation_count": len(revision.operations),
"replaced_revision_id": revision.replaced_revision_id,
}
for revision in state.revisions[-5:]
],
}
return _serialize(
{
"policy": policy,
"publication_assessment": publication_assessment,
"health_checks": build_governance_health_checks(
cost_floor,
customer_tuning,
provider_publication,
policy,
),
"safe_tuning_contracts": build_safe_tuning_contracts(
models,
customer_tuning,
policy,
product.active_pricing_model_id,
),
"audit_surface": audit_surface,
"notes": [
"Governance surfaces are policy-driven and machine-readable so both humans and agents can reason about pricing changes.",
"Execution recommendations remain distinct from shadow-state publication and from customer-visible rollout approval.",
],
}
)

View File

@@ -119,6 +119,13 @@ def load_ltv_scenarios(data_dir: Path | None = None) -> dict:
return _read_json((data_dir or default_data_dir()) / "ltv_scenarios.json")
def load_governance_policy(data_dir: Path | None = None) -> dict:
path = (data_dir or default_data_dir()) / "governance_policy.json"
if not path.exists():
return {}
return _read_json(path)
def load_tuning_requests(data_dir: Path | None = None) -> dict:
path = (data_dir or default_data_dir()) / "tuning_requests.json"
if not path.exists():

View File

@@ -1,68 +1,33 @@
from __future__ import annotations
from decimal import Decimal
from typing import Any
from .governance import build_pricing_recommendation_workflow, build_governance_policy
def build_pricing_recommendations(
cost_floor: dict,
value_range: dict,
market_price: dict,
simulations: dict,
usage_summary: dict,
) -> list[dict]:
recommendations: list[dict] = []
margin_pct = Decimal(str(cost_floor.get("gross_margin_pct", "0")))
ai_spend = Decimal(str(usage_summary.get("total_ai_spend_eur", "0")))
active_price = Decimal(str(value_range.get("current_price_eur", "0")))
cost_per_member = Decimal(str(cost_floor.get("cost_per_member", "0")))
if margin_pct < Decimal("10"):
recommendations.append(
{
"id": "margin-pressure",
"priority": "high",
"title": "Margin below 10%",
"rationale": f"Gross margin is {margin_pct}% at current pricing.",
"suggested_action": "Review infrastructure cost or test a higher access fee within value-range bands.",
}
)
if ai_spend > Decimal("0") and cost_per_member > Decimal("0"):
ai_ratio = (ai_spend / cost_per_member) * Decimal("100")
if ai_ratio > Decimal("15"):
best = simulations.get("best_ltv_scenario_id") or simulations.get("best_margin_scenario_id")
recommendations.append(
{
"id": "usage-pricing-signal",
"priority": "medium",
"title": "AI cost becoming material",
"rationale": f"AI spend is {ai_ratio:.1f}% of cost-per-member this period.",
"suggested_action": f"Evaluate hybrid model '{best}' in the pricing simulator before customer-visible credits.",
}
)
if market_price.get("market_high_eur") and active_price < Decimal(str(market_price["market_high_eur"])):
headroom = Decimal(str(value_range.get("aggregate_high_eur", active_price))) - active_price
if headroom > Decimal("5"):
recommendations.append(
{
"id": "value-headroom",
"priority": "low",
"title": "Value headroom above list price",
"rationale": f"Aggregate value band high is €{value_range['aggregate_high_eur']} vs €{active_price} list.",
"suggested_action": "Run a staged price experiment within the solo-builder segment band.",
}
)
if not recommendations:
recommendations.append(
{
"id": "hold-course",
"priority": "low",
"title": "Hold current pricing",
"rationale": "No urgent margin, usage, or competitive signals detected.",
"suggested_action": "Continue observatory tracking; re-run after next ledger period.",
}
)
return recommendations
cost_floor: dict[str, Any],
value_range: dict[str, Any],
market_price: dict[str, Any],
simulations: dict[str, Any],
usage_summary: dict[str, Any],
*,
boundary_validation: dict[str, Any] | None = None,
customer_tuning: dict[str, Any] | None = None,
provider_publication: dict[str, Any] | None = None,
governance_policy: dict[str, Any] | None = None,
product: Any | None = None,
) -> list[dict[str, Any]]:
policy = build_governance_policy(governance_policy or {})
return build_pricing_recommendation_workflow(
cost_floor,
value_range,
market_price,
simulations,
usage_summary,
boundary_validation=boundary_validation,
customer_tuning=customer_tuning or {},
provider_publication=provider_publication or {},
governance_policy=policy,
product=product,
)

View File

@@ -33,6 +33,9 @@ def test_dashboard_payload_contains_live_ledger_totals() -> None:
assert payload["provider_publication"]["provider"] == "stripe"
assert payload["provider_publication"]["model_id"] == "flat-899-eur-monthly"
assert payload["provider_publication"]["plan"]["summary"].startswith("stripe:")
assert payload["governance"]["policy"]["policy_id"] == "coulomb-governance-v1"
assert payload["governance"]["publication_assessment"]["decision"] == "approval_required"
assert payload["governance"]["safe_tuning_contracts"]
assert len(payload["boundary_validation"]["model_results"]) == 3
assert payload["boundary_validation"]["policy"]["target_margin_pct"] == "15"
assert any(
@@ -40,6 +43,7 @@ def test_dashboard_payload_contains_live_ledger_totals() -> None:
for result in payload["boundary_validation"]["model_results"]
)
assert payload["recommendations"]
assert all("confidence" in item and "governance" in item for item in payload["recommendations"])
def test_payload_json_is_valid() -> None:

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from pathlib import Path
from observatory.api import build_dashboard_payload
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
def test_governance_payload_contains_policy_health_and_audit_surfaces() -> None:
payload = build_dashboard_payload(DATA_DIR, "2026-06")
governance = payload["governance"]
assert governance["policy"]["policy_id"] == "coulomb-governance-v1"
assert governance["publication_assessment"]["decision"] == "approval_required"
assert governance["audit_surface"]["provider"] == "stripe"
assert governance["audit_surface"]["revision_count"] == 0
assert any(
check["id"] == "provider-execution-health" and check["status"] == "warn"
for check in governance["health_checks"]
)
def test_safe_tuning_contract_stays_pilot_only_and_hides_customer_visibility() -> None:
payload = build_dashboard_payload(DATA_DIR, "2026-06")
contract = next(
item
for item in payload["governance"]["safe_tuning_contracts"]
if item["model_id"] == "membership-plus-overage"
)
assert contract["mode"] == "pilot_only"
assert contract["customer_visible"] is False
assert any(example["outcome"] == "accepted" for example in contract["examples"])
assert all(example["visible_to_customer"] is False for example in contract["examples"])
def test_recommendations_include_governed_execution_gate() -> None:
payload = build_dashboard_payload(DATA_DIR, "2026-06")
execution_gate = next(
item for item in payload["recommendations"] if item["id"] == "execution-governance-gate"
)
assert execution_gate["recommendation_type"] == "execution"
assert execution_gate["governance"]["decision"] == "approval_required"
assert execution_gate["confidence"] == "0.88"
assert execution_gate["risks"]
assert execution_gate["supporting_observations"]