From cea59d7f13024d0015d86e32c1bcbc7bcb4be2fc Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 22 May 2026 16:14:08 +0200 Subject: [PATCH] Add hourly RecentlyOnScope batch endpoint --- api/main.py | 1 + api/routers/recently_on_scope.py | 21 +++++ api/schemas/recently_on_scope.py | 30 ++++++++ api/services/recently_on_scope.py | 124 ++++++++++++++++++++++++++++++ tests/test_recently_on_scope.py | 121 +++++++++++++++++++++++++++++ 5 files changed, 297 insertions(+) diff --git a/api/main.py b/api/main.py index 96d9274..baf698e 100644 --- a/api/main.py +++ b/api/main.py @@ -79,6 +79,7 @@ app.add_middleware( ) app.include_router(domains.router) +app.include_router(recently_on_scope.hourly_router) app.include_router(recently_on_scope.router) app.include_router(repos.router) app.include_router(topics.router) diff --git a/api/routers/recently_on_scope.py b/api/routers/recently_on_scope.py index 5607710..c62e9c6 100644 --- a/api/routers/recently_on_scope.py +++ b/api/routers/recently_on_scope.py @@ -8,17 +8,38 @@ from api.database import get_session from api.schemas.recently_on_scope import ( RecentlyOnScopeGenerate, RecentlyOnScopeGeneratedReport, + RecentlyOnScopeHourlyGenerate, + RecentlyOnScopeHourlyRun, RecentlyOnScopeReportMetadata, ) from api.services.markitect_templates import MarkitectRenderError, MarkitectUnavailable from api.services.recently_on_scope import ( generate_report, + generate_hourly_reports, list_reports, read_report, resolve_window, ) router = APIRouter(prefix="/domains/{slug}/recently-on-scope", tags=["recently-on-scope"]) +hourly_router = APIRouter(prefix="/recently-on-scope", tags=["recently-on-scope"]) + + +@hourly_router.post("/hourly", response_model=RecentlyOnScopeHourlyRun, status_code=status.HTTP_201_CREATED) +async def generate_hourly_recently_on_scope( + body: RecentlyOnScopeHourlyGenerate, + session: AsyncSession = Depends(get_session), +) -> RecentlyOnScopeHourlyRun: + try: + window = resolve_window(body.range, body.since, body.until) + return await generate_hourly_reports( + session, + window, + active_only=body.active_only, + include_attention=body.include_attention, + ) + except ValueError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc @router.post("/", response_model=RecentlyOnScopeGeneratedReport, status_code=status.HTTP_201_CREATED) diff --git a/api/schemas/recently_on_scope.py b/api/schemas/recently_on_scope.py index e030ba0..fb007a5 100644 --- a/api/schemas/recently_on_scope.py +++ b/api/schemas/recently_on_scope.py @@ -1,5 +1,6 @@ from __future__ import annotations +import uuid from datetime import datetime from pydantic import BaseModel, Field @@ -33,3 +34,32 @@ class RecentlyOnScopeReportMetadata(BaseModel): class RecentlyOnScopeGeneratedReport(RecentlyOnScopeReportMetadata): markdown: str + + +class RecentlyOnScopeHourlyGenerate(RecentlyOnScopeGenerate): + active_only: bool = True + include_attention: bool = False + + +class RecentlyOnScopeSkippedDomain(BaseModel): + domain_slug: str + reason: str + source_counts: RecentlyOnScopeSourceCounts + + +class RecentlyOnScopeFailedDomain(BaseModel): + domain_slug: str + error: str + + +class RecentlyOnScopeHourlyRun(BaseModel): + range: str + since: datetime + until: datetime + generated_at: datetime + active_only: bool + include_attention: bool + generated: list[RecentlyOnScopeReportMetadata] = Field(default_factory=list) + skipped: list[RecentlyOnScopeSkippedDomain] = Field(default_factory=list) + failed: list[RecentlyOnScopeFailedDomain] = Field(default_factory=list) + progress_event_id: uuid.UUID | None = None diff --git a/api/services/recently_on_scope.py b/api/services/recently_on_scope.py index 242490a..a86fb60 100644 --- a/api/services/recently_on_scope.py +++ b/api/services/recently_on_scope.py @@ -22,8 +22,11 @@ from api.models.task import Task, TaskStatus from api.models.topic import Topic from api.models.workstream import Workstream from api.schemas.recently_on_scope import ( + RecentlyOnScopeFailedDomain, + RecentlyOnScopeHourlyRun, RecentlyOnScopeReportMetadata, RecentlyOnScopeSourceCounts, + RecentlyOnScopeSkippedDomain, ) from api.services.markitect_templates import inspect_markdown_template, render_markdown_template @@ -157,6 +160,68 @@ async def generate_report( window: DigestWindow, ) -> tuple[RecentlyOnScopeReportMetadata, str]: data = await collect_domain_activity(session, domain_slug, window) + return _render_report_from_data(data, window) + + +async def generate_hourly_reports( + session: AsyncSession, + window: DigestWindow, + *, + active_only: bool = True, + include_attention: bool = False, +) -> RecentlyOnScopeHourlyRun: + generated: list[RecentlyOnScopeReportMetadata] = [] + skipped: list[RecentlyOnScopeSkippedDomain] = [] + failed: list[RecentlyOnScopeFailedDomain] = [] + + domains = await _list_domains(session, active_only=active_only) + for domain in domains: + try: + data = await collect_domain_activity(session, domain.slug, window) + counts = RecentlyOnScopeSourceCounts(**data["source_counts"]) + if not _has_qualifying_activity(counts, include_attention=include_attention): + skipped.append( + RecentlyOnScopeSkippedDomain( + domain_slug=domain.slug, + reason="no qualifying activity in window", + source_counts=counts, + ) + ) + continue + metadata, _markdown = _render_report_from_data(data, window) + generated.append(metadata) + except Exception as exc: # pragma: no cover - exercised via router tests + failed.append(RecentlyOnScopeFailedDomain(domain_slug=domain.slug, error=str(exc))) + + generated_at = datetime.now(tz=UTC) + progress_event_id = await _log_hourly_progress( + session, + window, + generated_at=generated_at, + active_only=active_only, + include_attention=include_attention, + generated=generated, + skipped=skipped, + failed=failed, + ) + return RecentlyOnScopeHourlyRun( + range=window.range, + since=window.since, + until=window.until, + generated_at=generated_at, + active_only=active_only, + include_attention=include_attention, + generated=generated, + skipped=skipped, + failed=failed, + progress_event_id=progress_event_id, + ) + + +def _render_report_from_data( + data: dict[str, Any], + window: DigestWindow, +) -> tuple[RecentlyOnScopeReportMetadata, str]: tmpl = template_path() inspect_markdown_template(tmpl) markdown = render_markdown_template(tmpl, data) @@ -207,6 +272,65 @@ def metadata_from_report(path: Path) -> RecentlyOnScopeReportMetadata | None: return None +async def _list_domains(session: AsyncSession, *, active_only: bool) -> list[Domain]: + stmt = select(Domain).order_by(Domain.slug) + if active_only: + stmt = stmt.where(Domain.status == "active") + result = await session.execute(stmt) + return list(result.scalars().all()) + + +def _has_qualifying_activity( + counts: RecentlyOnScopeSourceCounts, + *, + include_attention: bool, +) -> bool: + if ( + counts.progress_events + or counts.decisions + or counts.workstreams + or counts.tasks + ): + return True + return include_attention and counts.attention_items > 0 + + +async def _log_hourly_progress( + session: AsyncSession, + window: DigestWindow, + *, + generated_at: datetime, + active_only: bool, + include_attention: bool, + generated: list[RecentlyOnScopeReportMetadata], + skipped: list[RecentlyOnScopeSkippedDomain], + failed: list[RecentlyOnScopeFailedDomain], +) -> uuid.UUID: + event = ProgressEvent( + event_type="recently_on_scope_hourly", + summary=( + "RecentlyOnScope hourly batch completed: " + f"{len(generated)} generated, {len(skipped)} skipped, {len(failed)} failed" + ), + detail={ + "range": window.range, + "since": _iso(window.since), + "until": _iso(window.until), + "generated_at": _iso(generated_at), + "active_only": active_only, + "include_attention": include_attention, + "generated": [item.model_dump(mode="json") for item in generated], + "skipped": [item.model_dump(mode="json") for item in skipped], + "failed": [item.model_dump(mode="json") for item in failed], + }, + author="state-hub", + ) + session.add(event) + await session.commit() + await session.refresh(event) + return event.id + + async def _get_domain(domain_slug: str, session: AsyncSession) -> Domain: result = await session.execute(select(Domain).where(Domain.slug == domain_slug)) domain = result.scalar_one_or_none() diff --git a/tests/test_recently_on_scope.py b/tests/test_recently_on_scope.py index fbe609e..ba43e96 100644 --- a/tests/test_recently_on_scope.py +++ b/tests/test_recently_on_scope.py @@ -171,3 +171,124 @@ class TestRecentlyOnScopeRoutes: response = await client.get("/domains/digest/recently-on-scope/20260522T120000Z--1h") assert response.status_code == 404 + + async def test_hourly_batch_generates_only_domains_with_activity( + self, + client, + tmp_path, + monkeypatch, + fake_markitect, + ): + monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path)) + + await _create_domain(client) + topic = await _create_topic(client) + workstream = await _create_workstream(client, topic["id"]) + task = await _create_task(client, workstream["id"], title="Batch source") + progress = await client.post( + "/progress/", + json={ + "topic_id": topic["id"], + "workstream_id": workstream["id"], + "task_id": task["id"], + "event_type": "note", + "summary": "Batch source changed", + }, + ) + assert progress.status_code == 201, progress.text + await _create_domain(client, slug="quiet", name="Quiet Domain") + + response = await client.post("/recently-on-scope/hourly", json={"range": "1d"}) + + assert response.status_code == 201, response.text + body = response.json() + assert [item["domain_slug"] for item in body["generated"]] == ["digest"] + assert [item["domain_slug"] for item in body["skipped"]] == ["quiet"] + assert body["failed"] == [] + assert body["generated"][0]["source_counts"]["progress_events"] == 1 + assert body["progress_event_id"] is not None + + events = await client.get("/progress/", params={"event_type": "recently_on_scope_hourly"}) + assert events.status_code == 200, events.text + assert events.json()[0]["detail"]["generated"][0]["domain_slug"] == "digest" + + async def test_hourly_batch_reports_no_active_domains( + self, + client, + tmp_path, + monkeypatch, + fake_markitect, + ): + monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path)) + await _create_domain(client, slug="quiet", name="Quiet Domain") + + response = await client.post("/recently-on-scope/hourly", json={"range": "1d"}) + + assert response.status_code == 201, response.text + body = response.json() + assert body["generated"] == [] + assert [item["domain_slug"] for item in body["skipped"]] == ["quiet"] + assert body["failed"] == [] + + async def test_hourly_batch_is_idempotent_for_exact_window( + self, + client, + tmp_path, + monkeypatch, + fake_markitect, + ): + monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path)) + await _create_domain(client) + topic = await _create_topic(client) + workstream = await _create_workstream(client, topic["id"]) + await _create_task(client, workstream["id"], title="Idempotent source") + payload = { + "range": "1h", + "since": "2000-01-01T00:00:00Z", + "until": "2100-01-01T00:00:00Z", + } + + first = await client.post("/recently-on-scope/hourly", json=payload) + second = await client.post("/recently-on-scope/hourly", json=payload) + + assert first.status_code == 201, first.text + assert second.status_code == 201, second.text + assert first.json()["generated"][0]["id"] == "20000101T000000Z--21000101T000000Z" + listed = await client.get("/domains/digest/recently-on-scope/") + assert listed.status_code == 200 + assert [item["id"] for item in listed.json()] == ["20000101T000000Z--21000101T000000Z"] + + async def test_hourly_batch_continues_after_domain_failure( + self, + client, + tmp_path, + monkeypatch, + fake_markitect, + ): + monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path)) + original_render = ros._render_report_from_data + + def flaky_render(data, window): + if data["domain"]["slug"] == "broken": + raise RuntimeError("render failed") + return original_render(data, window) + + monkeypatch.setattr(ros, "_render_report_from_data", flaky_render) + + await _create_domain(client, slug="broken", name="Broken Domain") + broken_topic = await _create_topic(client, domain_slug="broken", slug="broken-topic") + broken_workstream = await _create_workstream(client, broken_topic["id"], slug="broken-ws") + await _create_task(client, broken_workstream["id"], title="Broken source") + + await _create_domain(client, slug="good", name="Good Domain") + good_topic = await _create_topic(client, domain_slug="good", slug="good-topic") + good_workstream = await _create_workstream(client, good_topic["id"], slug="good-ws") + await _create_task(client, good_workstream["id"], title="Good source") + + response = await client.post("/recently-on-scope/hourly", json={"range": "1d"}) + + assert response.status_code == 201, response.text + body = response.json() + assert [item["domain_slug"] for item in body["generated"]] == ["good"] + assert [item["domain_slug"] for item in body["failed"]] == ["broken"] + assert body["failed"][0]["error"] == "render failed"