Add hourly RecentlyOnScope batch endpoint

This commit is contained in:
2026-05-22 16:14:08 +02:00
parent c95d29023d
commit cea59d7f13
5 changed files with 297 additions and 0 deletions

View File

@@ -79,6 +79,7 @@ app.add_middleware(
)
app.include_router(domains.router)
app.include_router(recently_on_scope.hourly_router)
app.include_router(recently_on_scope.router)
app.include_router(repos.router)
app.include_router(topics.router)

View File

@@ -8,17 +8,38 @@ from api.database import get_session
from api.schemas.recently_on_scope import (
RecentlyOnScopeGenerate,
RecentlyOnScopeGeneratedReport,
RecentlyOnScopeHourlyGenerate,
RecentlyOnScopeHourlyRun,
RecentlyOnScopeReportMetadata,
)
from api.services.markitect_templates import MarkitectRenderError, MarkitectUnavailable
from api.services.recently_on_scope import (
generate_report,
generate_hourly_reports,
list_reports,
read_report,
resolve_window,
)
router = APIRouter(prefix="/domains/{slug}/recently-on-scope", tags=["recently-on-scope"])
hourly_router = APIRouter(prefix="/recently-on-scope", tags=["recently-on-scope"])
@hourly_router.post("/hourly", response_model=RecentlyOnScopeHourlyRun, status_code=status.HTTP_201_CREATED)
async def generate_hourly_recently_on_scope(
body: RecentlyOnScopeHourlyGenerate,
session: AsyncSession = Depends(get_session),
) -> RecentlyOnScopeHourlyRun:
try:
window = resolve_window(body.range, body.since, body.until)
return await generate_hourly_reports(
session,
window,
active_only=body.active_only,
include_attention=body.include_attention,
)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
@router.post("/", response_model=RecentlyOnScopeGeneratedReport, status_code=status.HTTP_201_CREATED)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import uuid
from datetime import datetime
from pydantic import BaseModel, Field
@@ -33,3 +34,32 @@ class RecentlyOnScopeReportMetadata(BaseModel):
class RecentlyOnScopeGeneratedReport(RecentlyOnScopeReportMetadata):
markdown: str
class RecentlyOnScopeHourlyGenerate(RecentlyOnScopeGenerate):
active_only: bool = True
include_attention: bool = False
class RecentlyOnScopeSkippedDomain(BaseModel):
domain_slug: str
reason: str
source_counts: RecentlyOnScopeSourceCounts
class RecentlyOnScopeFailedDomain(BaseModel):
domain_slug: str
error: str
class RecentlyOnScopeHourlyRun(BaseModel):
range: str
since: datetime
until: datetime
generated_at: datetime
active_only: bool
include_attention: bool
generated: list[RecentlyOnScopeReportMetadata] = Field(default_factory=list)
skipped: list[RecentlyOnScopeSkippedDomain] = Field(default_factory=list)
failed: list[RecentlyOnScopeFailedDomain] = Field(default_factory=list)
progress_event_id: uuid.UUID | None = None

View File

@@ -22,8 +22,11 @@ from api.models.task import Task, TaskStatus
from api.models.topic import Topic
from api.models.workstream import Workstream
from api.schemas.recently_on_scope import (
RecentlyOnScopeFailedDomain,
RecentlyOnScopeHourlyRun,
RecentlyOnScopeReportMetadata,
RecentlyOnScopeSourceCounts,
RecentlyOnScopeSkippedDomain,
)
from api.services.markitect_templates import inspect_markdown_template, render_markdown_template
@@ -157,6 +160,68 @@ async def generate_report(
window: DigestWindow,
) -> tuple[RecentlyOnScopeReportMetadata, str]:
data = await collect_domain_activity(session, domain_slug, window)
return _render_report_from_data(data, window)
async def generate_hourly_reports(
session: AsyncSession,
window: DigestWindow,
*,
active_only: bool = True,
include_attention: bool = False,
) -> RecentlyOnScopeHourlyRun:
generated: list[RecentlyOnScopeReportMetadata] = []
skipped: list[RecentlyOnScopeSkippedDomain] = []
failed: list[RecentlyOnScopeFailedDomain] = []
domains = await _list_domains(session, active_only=active_only)
for domain in domains:
try:
data = await collect_domain_activity(session, domain.slug, window)
counts = RecentlyOnScopeSourceCounts(**data["source_counts"])
if not _has_qualifying_activity(counts, include_attention=include_attention):
skipped.append(
RecentlyOnScopeSkippedDomain(
domain_slug=domain.slug,
reason="no qualifying activity in window",
source_counts=counts,
)
)
continue
metadata, _markdown = _render_report_from_data(data, window)
generated.append(metadata)
except Exception as exc: # pragma: no cover - exercised via router tests
failed.append(RecentlyOnScopeFailedDomain(domain_slug=domain.slug, error=str(exc)))
generated_at = datetime.now(tz=UTC)
progress_event_id = await _log_hourly_progress(
session,
window,
generated_at=generated_at,
active_only=active_only,
include_attention=include_attention,
generated=generated,
skipped=skipped,
failed=failed,
)
return RecentlyOnScopeHourlyRun(
range=window.range,
since=window.since,
until=window.until,
generated_at=generated_at,
active_only=active_only,
include_attention=include_attention,
generated=generated,
skipped=skipped,
failed=failed,
progress_event_id=progress_event_id,
)
def _render_report_from_data(
data: dict[str, Any],
window: DigestWindow,
) -> tuple[RecentlyOnScopeReportMetadata, str]:
tmpl = template_path()
inspect_markdown_template(tmpl)
markdown = render_markdown_template(tmpl, data)
@@ -207,6 +272,65 @@ def metadata_from_report(path: Path) -> RecentlyOnScopeReportMetadata | None:
return None
async def _list_domains(session: AsyncSession, *, active_only: bool) -> list[Domain]:
stmt = select(Domain).order_by(Domain.slug)
if active_only:
stmt = stmt.where(Domain.status == "active")
result = await session.execute(stmt)
return list(result.scalars().all())
def _has_qualifying_activity(
counts: RecentlyOnScopeSourceCounts,
*,
include_attention: bool,
) -> bool:
if (
counts.progress_events
or counts.decisions
or counts.workstreams
or counts.tasks
):
return True
return include_attention and counts.attention_items > 0
async def _log_hourly_progress(
session: AsyncSession,
window: DigestWindow,
*,
generated_at: datetime,
active_only: bool,
include_attention: bool,
generated: list[RecentlyOnScopeReportMetadata],
skipped: list[RecentlyOnScopeSkippedDomain],
failed: list[RecentlyOnScopeFailedDomain],
) -> uuid.UUID:
event = ProgressEvent(
event_type="recently_on_scope_hourly",
summary=(
"RecentlyOnScope hourly batch completed: "
f"{len(generated)} generated, {len(skipped)} skipped, {len(failed)} failed"
),
detail={
"range": window.range,
"since": _iso(window.since),
"until": _iso(window.until),
"generated_at": _iso(generated_at),
"active_only": active_only,
"include_attention": include_attention,
"generated": [item.model_dump(mode="json") for item in generated],
"skipped": [item.model_dump(mode="json") for item in skipped],
"failed": [item.model_dump(mode="json") for item in failed],
},
author="state-hub",
)
session.add(event)
await session.commit()
await session.refresh(event)
return event.id
async def _get_domain(domain_slug: str, session: AsyncSession) -> Domain:
result = await session.execute(select(Domain).where(Domain.slug == domain_slug))
domain = result.scalar_one_or_none()

View File

@@ -171,3 +171,124 @@ class TestRecentlyOnScopeRoutes:
response = await client.get("/domains/digest/recently-on-scope/20260522T120000Z--1h")
assert response.status_code == 404
async def test_hourly_batch_generates_only_domains_with_activity(
self,
client,
tmp_path,
monkeypatch,
fake_markitect,
):
monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path))
await _create_domain(client)
topic = await _create_topic(client)
workstream = await _create_workstream(client, topic["id"])
task = await _create_task(client, workstream["id"], title="Batch source")
progress = await client.post(
"/progress/",
json={
"topic_id": topic["id"],
"workstream_id": workstream["id"],
"task_id": task["id"],
"event_type": "note",
"summary": "Batch source changed",
},
)
assert progress.status_code == 201, progress.text
await _create_domain(client, slug="quiet", name="Quiet Domain")
response = await client.post("/recently-on-scope/hourly", json={"range": "1d"})
assert response.status_code == 201, response.text
body = response.json()
assert [item["domain_slug"] for item in body["generated"]] == ["digest"]
assert [item["domain_slug"] for item in body["skipped"]] == ["quiet"]
assert body["failed"] == []
assert body["generated"][0]["source_counts"]["progress_events"] == 1
assert body["progress_event_id"] is not None
events = await client.get("/progress/", params={"event_type": "recently_on_scope_hourly"})
assert events.status_code == 200, events.text
assert events.json()[0]["detail"]["generated"][0]["domain_slug"] == "digest"
async def test_hourly_batch_reports_no_active_domains(
self,
client,
tmp_path,
monkeypatch,
fake_markitect,
):
monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path))
await _create_domain(client, slug="quiet", name="Quiet Domain")
response = await client.post("/recently-on-scope/hourly", json={"range": "1d"})
assert response.status_code == 201, response.text
body = response.json()
assert body["generated"] == []
assert [item["domain_slug"] for item in body["skipped"]] == ["quiet"]
assert body["failed"] == []
async def test_hourly_batch_is_idempotent_for_exact_window(
self,
client,
tmp_path,
monkeypatch,
fake_markitect,
):
monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path))
await _create_domain(client)
topic = await _create_topic(client)
workstream = await _create_workstream(client, topic["id"])
await _create_task(client, workstream["id"], title="Idempotent source")
payload = {
"range": "1h",
"since": "2000-01-01T00:00:00Z",
"until": "2100-01-01T00:00:00Z",
}
first = await client.post("/recently-on-scope/hourly", json=payload)
second = await client.post("/recently-on-scope/hourly", json=payload)
assert first.status_code == 201, first.text
assert second.status_code == 201, second.text
assert first.json()["generated"][0]["id"] == "20000101T000000Z--21000101T000000Z"
listed = await client.get("/domains/digest/recently-on-scope/")
assert listed.status_code == 200
assert [item["id"] for item in listed.json()] == ["20000101T000000Z--21000101T000000Z"]
async def test_hourly_batch_continues_after_domain_failure(
self,
client,
tmp_path,
monkeypatch,
fake_markitect,
):
monkeypatch.setattr(ros.settings, "state_hub_report_dir", str(tmp_path))
original_render = ros._render_report_from_data
def flaky_render(data, window):
if data["domain"]["slug"] == "broken":
raise RuntimeError("render failed")
return original_render(data, window)
monkeypatch.setattr(ros, "_render_report_from_data", flaky_render)
await _create_domain(client, slug="broken", name="Broken Domain")
broken_topic = await _create_topic(client, domain_slug="broken", slug="broken-topic")
broken_workstream = await _create_workstream(client, broken_topic["id"], slug="broken-ws")
await _create_task(client, broken_workstream["id"], title="Broken source")
await _create_domain(client, slug="good", name="Good Domain")
good_topic = await _create_topic(client, domain_slug="good", slug="good-topic")
good_workstream = await _create_workstream(client, good_topic["id"], slug="good-ws")
await _create_task(client, good_workstream["id"], title="Good source")
response = await client.post("/recently-on-scope/hourly", json={"range": "1d"})
assert response.status_code == 201, response.text
body = response.json()
assert [item["domain_slug"] for item in body["generated"]] == ["good"]
assert [item["domain_slug"] for item in body["failed"]] == ["broken"]
assert body["failed"][0]["error"] == "render failed"