Harden coding retro resolver selection

This commit is contained in:
2026-06-18 15:13:08 +02:00
parent 206bb336d2
commit 23e2316dff
3 changed files with 148 additions and 4 deletions

View File

@@ -219,11 +219,13 @@ def _coding_retro(params: dict[str, Any]) -> dict[str, Any]:
"""
event_type = str(params.get("event_type") or "coding_retro")
limit = _bounded_int(params.get("limit", 100), default=100, minimum=1, maximum=500)
items = _fetch_json("/progress/", {"limit": limit})
query_params = {"event_type": event_type, "limit": limit}
items = _fetch_json("/progress/", query_params)
if not isinstance(items, list):
return _empty_coding_retro(event_type)
item = _latest_progress_item(items, event_type)
window_days = _optional_int(params.get("window_days"))
item = _latest_progress_item(items, event_type, window_days)
if item is None:
return _empty_coding_retro(event_type)
@@ -256,12 +258,18 @@ def _empty_coding_retro(event_type: str) -> dict[str, Any]:
def _latest_progress_item(
items: list[Any],
event_type: str,
window_days: int | None = None,
) -> dict[str, Any] | None:
newest: dict[str, Any] | None = None
newest_key: tuple[datetime, int] | None = None
for index, item in enumerate(items):
if not isinstance(item, dict) or item.get("event_type") != event_type:
continue
if window_days is not None and not _progress_matches_window_days(
item,
window_days,
):
continue
key = (_parse_progress_timestamp(item.get("created_at")), index)
if newest_key is None or key > newest_key:
newest = item
@@ -295,6 +303,56 @@ def _progress_detail(item: dict[str, Any]) -> dict[str, Any]:
return {}
def _progress_matches_window_days(item: dict[str, Any], window_days: int) -> bool:
detail = _progress_detail(item)
return _progress_window_days(detail) == window_days
def _progress_window_days(detail: dict[str, Any]) -> int | None:
window = detail.get("window")
if isinstance(window, dict):
direct = _optional_int(window.get("days") or window.get("window_days"))
if direct is not None:
return direct
ranged = _window_days_from_range(
window.get("since") or window.get("window_start"),
window.get("until") or window.get("window_end"),
)
if ranged is not None:
return ranged
direct = _optional_int(detail.get("days") or detail.get("window_days"))
if direct is not None:
return direct
return _window_days_from_range(
detail.get("since") or detail.get("window_start"),
detail.get("until") or detail.get("window_end"),
)
def _window_days_from_range(start: Any, end: Any) -> int | None:
start_ts = _parse_optional_timestamp(start)
end_ts = _parse_optional_timestamp(end)
if start_ts is None or end_ts is None or end_ts < start_ts:
return None
seconds = (end_ts - start_ts).total_seconds()
if seconds <= 0:
return None
return max(1, round(seconds / 86400))
def _parse_optional_timestamp(value: Any) -> datetime | None:
if not isinstance(value, str) or not value:
return None
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _normalise_coding_retro_suggestions(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
@@ -374,6 +432,13 @@ def _bounded_int(value: Any, *, default: int, minimum: int, maximum: int) -> int
return max(minimum, min(maximum, number))
def _optional_int(value: Any) -> int | None:
try:
return int(value)
except (TypeError, ValueError):
return None
def _clean_scalar(value: Any) -> str:
return " ".join(str(value or "").split())