Files
state-hub/api/services/recently_on_scope.py

508 lines
17 KiB
Python

from __future__ import annotations
import os
import re
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
import yaml
from fastapi import HTTPException
from sqlalchemy import false, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from api.config import settings
from api.models.decision import Decision
from api.models.domain import Domain
from api.models.managed_repo import ManagedRepo
from api.models.progress_event import ProgressEvent
from api.models.task import Task, TaskStatus
from api.models.topic import Topic
from api.models.workstream import Workstream
from api.schemas.recently_on_scope import (
RecentlyOnScopeReportMetadata,
RecentlyOnScopeSourceCounts,
)
from api.services.markitect_templates import inspect_markdown_template, render_markdown_template
# Relative window durations accepted by parse_duration: a positive integer with
# no leading zero followed by a unit letter: m (minutes), h (hours) or d (days).
_DURATION_RE = re.compile(r"^(?P<count>[1-9][0-9]*)(?P<unit>[mhd])$")
# Report ids (report file stems) accepted by read_report. Two shapes, matching
# report_id_for: "<until stamp>--<range>" for relative windows and
# "<since stamp>--<until stamp>" for exact windows; stamps are "%Y%m%dT%H%M%SZ".
# NOTE(review): the range part here also allows "0m" / leading zeros, which
# parse_duration rejects, so the two patterns are not perfectly aligned.
_REPORT_ID_RE = re.compile(r"^[0-9]{8}T[0-9]{6}Z--(?:[0-9]+[mhd]|[0-9]{8}T[0-9]{6}Z)$")
# Digest markdown template; template_path() resolves it against the process CWD
# when it is relative.
_TEMPLATE_PATH = Path("templates/recently-on-scope/domain-digest.md")
# Emitted into the report data as "template_version".
_TEMPLATE_VERSION = "1"
@dataclass(frozen=True)
class DigestWindow:
    """Immutable description of the time span a digest report covers.

    Both bounds are inclusive (see ``_between`` / ``_in_window``). Instances
    built by ``resolve_window`` carry UTC-aware bounds.
    """

    # Relative duration text the window was derived from, e.g. "1h".
    range: str
    # Inclusive lower bound of the window.
    since: datetime
    # Inclusive upper bound of the window.
    until: datetime
    # True when the caller pinned ``since`` and/or ``until`` explicitly; this
    # switches report ids to the "<since>--<until>" form.
    exact: bool = False
def parse_duration(value: str) -> timedelta:
    """Convert a compact duration such as ``15m``, ``6h`` or ``1d`` to a timedelta.

    Surrounding whitespace is ignored. Raises ``ValueError`` for anything that
    is not a positive integer followed by ``m``, ``h`` or ``d``.
    """
    parsed = _DURATION_RE.fullmatch(value.strip())
    if not parsed:
        raise ValueError("range must use a duration such as 15m, 1h, 6h, or 1d")
    amount = int(parsed.group("count"))
    # Anything that is not minutes or hours must be "d" (the regex allows only m/h/d).
    keyword = {"m": "minutes", "h": "hours"}.get(parsed.group("unit"), "days")
    return timedelta(**{keyword: amount})
def resolve_window(
    range_value: str = "1h",
    since: datetime | None = None,
    until: datetime | None = None,
    *,
    now: datetime | None = None,
) -> DigestWindow:
    """Resolve the digest window from a relative range and optional explicit bounds.

    Args:
        range_value: Duration text such as "15m", "1h" or "1d". It is always
            validated; it only sets the start when ``since`` is not given.
            Surrounding whitespace is ignored.
        since: Explicit inclusive start; naive values are treated as UTC.
        until: Explicit inclusive end; defaults to ``now`` (or the current time).
        now: Reference "current time", injectable for tests.

    Returns:
        A UTC-normalised DigestWindow. ``exact`` is True when ``since`` or
        ``until`` was supplied.

    Raises:
        ValueError: if ``range_value`` is malformed or the start is not before the end.
    """
    # parse_duration tolerates surrounding whitespace, so normalise the stored
    # range too. Otherwise " 1h" would leak into report ids / file names (via
    # report_id_for) that _REPORT_ID_RE, and therefore read_report, reject.
    range_value = range_value.strip()
    duration = parse_duration(range_value)
    if until is not None:
        end = _as_utc(until)
    elif now is not None:
        end = _as_utc(now)
    else:
        end = _as_utc(datetime.now(tz=UTC))
    start = _as_utc(since) if since is not None else end - duration
    if start >= end:
        raise ValueError("since must be earlier than until")
    return DigestWindow(
        range=range_value,
        since=start,
        until=end,
        exact=since is not None or until is not None,
    )
def report_root() -> Path:
    """Return the absolute directory under which reports are stored.

    ``settings.state_hub_report_dir`` is user-expanded (``~``). When it is still
    relative, it is anchored at the current working directory.
    """
    configured = Path(settings.state_hub_report_dir).expanduser()
    return configured if configured.is_absolute() else Path.cwd() / configured
def template_path() -> Path:
    """Return the digest template location, anchoring a relative path at the CWD."""
    if _TEMPLATE_PATH.is_absolute():
        return _TEMPLATE_PATH
    return Path.cwd() / _TEMPLATE_PATH
def report_id_for(window: DigestWindow) -> str:
    """Build the report id (file stem) for ``window``.

    Exact windows use ``<since stamp>--<until stamp>``. Relative windows use
    ``<until stamp>--<range>``.
    """
    end_stamp = _stamp(window.until)
    if not window.exact:
        return f"{end_stamp}--{window.range}"
    return f"{_stamp(window.since)}--{end_stamp}"
def report_path_for(domain_slug: str, window: DigestWindow) -> Path:
    """Return ``<report_root>/<domain_slug>/<report id>.md`` for ``window``."""
    root = report_root()
    return root.joinpath(domain_slug, report_id_for(window) + ".md")
async def collect_domain_activity(
    session: AsyncSession,
    domain_slug: str,
    window: DigestWindow,
) -> dict[str, Any]:
    """Gather everything the domain digest template needs for ``window``.

    The domain is loaded first; ``_get_domain`` raises a 404 HTTPException when
    it is unknown. The function then walks the domain's topic -> workstream ->
    task hierarchy, fetches decisions and progress events inside the window, and
    lists the domain's active repositories.

    Workstreams and tasks are included only when created or updated inside the
    window. Attention items are every ``needs_human`` task that is not done or
    cancelled, regardless of the window.

    Returns a plain dict holding the raw item lists, ``source_counts`` and the
    pre-rendered ``*_section`` markdown strings.
    """
    domain = await _get_domain(domain_slug, session)

    # Each level of the hierarchy is scoped by the ids of the level above it.
    topics = await _list_topics(domain.id, session)
    topic_ids = [topic.id for topic in topics]
    workstreams = await _list_workstreams(topic_ids, session)
    workstream_ids = [ws.id for ws in workstreams]
    tasks = await _list_tasks(workstream_ids, session)
    task_ids = [t.id for t in tasks]
    decisions = await _list_recent_decisions(topic_ids, workstream_ids, window, session)
    progress_events = await _list_recent_progress(
        topic_ids,
        workstream_ids,
        task_ids,
        [d.id for d in decisions],
        window,
        session,
    )
    repos = await _list_repos(domain.id, session)

    def touched_in_window(row: Any) -> bool:
        # A row counts as recent if either its creation or its last update falls in the window.
        return _in_window(row.created_at, window) or _in_window(row.updated_at, window)

    closed_statuses = {TaskStatus.done.value, TaskStatus.cancelled.value}
    recent_workstreams = [ws for ws in workstreams if touched_in_window(ws)]
    recent_tasks = [t for t in tasks if touched_in_window(t)]
    attention_tasks = [
        t for t in tasks
        if t.needs_human and _enum_value(t.status) not in closed_statuses
    ]

    counts = {
        "progress_events": len(progress_events),
        "decisions": len(decisions),
        "workstreams": len(recent_workstreams),
        "tasks": len(recent_tasks),
        "repos": len(repos),
        "attention_items": len(attention_tasks),
    }
    data: dict[str, Any] = {
        "domain": _domain_data(domain),
        "window": {
            "range": window.range,
            "since": _iso(window.since),
            "until": _iso(window.until),
        },
        "generated_at": _iso(datetime.now(tz=UTC)),
        "template_version": _TEMPLATE_VERSION,
        "source_counts": counts,
        "progress_events": [_progress_data(e) for e in progress_events],
        "decisions": [_decision_data(d) for d in decisions],
        "workstreams": [_workstream_data(ws) for ws in recent_workstreams],
        "tasks": [_task_data(t) for t in recent_tasks],
        "repos": [_repo_data(r) for r in repos],
        "attention_items": [_attention_task_data(t) for t in attention_tasks],
    }
    # Section text is derived from the serialized lists above, so it is added last.
    data.update(_section_text(data))
    return data
async def generate_report(
    session: AsyncSession,
    domain_slug: str,
    window: DigestWindow,
) -> tuple[RecentlyOnScopeReportMetadata, str]:
    """Collect activity, render the digest template, persist it and describe it.

    Returns the report metadata together with the rendered markdown.
    """
    payload = await collect_domain_activity(session, domain_slug, window)
    template = template_path()
    # Inspect the template before rendering; the helper's exact checks live in markitect_templates.
    inspect_markdown_template(template)
    rendered = render_markdown_template(template, payload)
    destination = report_path_for(payload["domain"]["slug"], window)
    _write_report(destination, rendered)
    metadata = _metadata_from_data(report_id_for(window), destination, payload, window)
    return metadata, rendered
def list_reports(domain_slug: str) -> list[RecentlyOnScopeReportMetadata]:
    """List metadata for a domain's stored reports, in reverse file-name order.

    Files whose frontmatter cannot be turned into metadata are skipped. A
    missing domain directory yields an empty list.
    """
    directory = report_root() / domain_slug
    if not directory.is_dir():
        return []
    candidates = sorted(directory.glob("*.md"), reverse=True)
    parsed = (metadata_from_report(candidate) for candidate in candidates)
    return [meta for meta in parsed if meta is not None]
def read_report(domain_slug: str, report_id: str) -> str:
    """Return the markdown text of a stored report.

    Both arguments are joined into a filesystem path below ``report_root()``.
    Each must therefore be a single, well-formed path component so that no
    value can reach files outside the domain's report directory.

    Raises:
        FileNotFoundError: if ``report_id`` does not match the report id
            pattern, ``domain_slug`` is empty, ``.``/``..`` or contains a path
            separator, or the report file does not exist.
    """
    if _REPORT_ID_RE.fullmatch(report_id) is None:
        raise FileNotFoundError(report_id)
    # Path(x).name == x only when x has no separators and is not absolute;
    # "." and ".." pass that test but would still move out of the domain dir.
    if not domain_slug or domain_slug in {".", ".."} or Path(domain_slug).name != domain_slug:
        raise FileNotFoundError(report_id)
    path = report_root() / domain_slug / f"{report_id}.md"
    if not path.is_file():
        raise FileNotFoundError(report_id)
    return path.read_text(encoding="utf-8")
def metadata_from_report(path: Path) -> RecentlyOnScopeReportMetadata | None:
    """Build report metadata from the YAML frontmatter of a saved report file.

    The report id is the file stem. Returns ``None`` for files that are not
    usable reports: text that is not UTF-8, missing or non-mapping
    frontmatter, invalid YAML, or missing/malformed fields. One damaged file
    therefore does not abort ``list_reports``.
    """
    try:
        frontmatter = _frontmatter(path.read_text(encoding="utf-8"))
    except (UnicodeDecodeError, yaml.YAMLError):
        # Undecodable bytes or broken YAML: treat like a file without frontmatter.
        return None
    if not frontmatter:
        return None
    source_counts = frontmatter.get("source_counts") or {}
    try:
        return RecentlyOnScopeReportMetadata(
            id=path.stem,
            domain_slug=str(frontmatter["domain_slug"]),
            range=str(frontmatter["range"]),
            since=_as_utc(frontmatter["since"]),
            until=_as_utc(frontmatter["until"]),
            generated_at=_as_utc(frontmatter["generated_at"]),
            path=str(path),
            source_counts=RecentlyOnScopeSourceCounts(**source_counts),
        )
    except (AttributeError, KeyError, TypeError, ValueError):
        # AttributeError: _as_utc received a YAML scalar that is neither a str
        # nor a datetime (e.g. a bare date or an int), which has no .tzinfo.
        return None
async def _get_domain(domain_slug: str, session: AsyncSession) -> Domain:
    """Fetch the domain with ``domain_slug`` or raise a 404 HTTPException."""
    stmt = select(Domain).where(Domain.slug == domain_slug)
    found = (await session.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail=f"Domain '{domain_slug}' not found")
async def _list_topics(domain_id: uuid.UUID, session: AsyncSession) -> list[Topic]:
    """Return the domain's topics ordered by slug."""
    stmt = select(Topic).where(Topic.domain_id == domain_id).order_by(Topic.slug)
    rows = await session.execute(stmt)
    return [*rows.scalars().all()]
async def _list_workstreams(topic_ids: list[uuid.UUID], session: AsyncSession) -> list[Workstream]:
    """Return workstreams under ``topic_ids``, most recently updated first.

    An empty ``topic_ids`` matches nothing (see ``_in``).
    """
    newest_first = (Workstream.updated_at.desc(), Workstream.created_at.desc())
    stmt = select(Workstream).where(_in(Workstream.topic_id, topic_ids)).order_by(*newest_first)
    rows = await session.execute(stmt)
    return [*rows.scalars().all()]
async def _list_tasks(workstream_ids: list[uuid.UUID], session: AsyncSession) -> list[Task]:
    """Return tasks under ``workstream_ids``, most recently updated first.

    An empty ``workstream_ids`` matches nothing (see ``_in``).
    """
    newest_first = (Task.updated_at.desc(), Task.created_at.desc())
    stmt = select(Task).where(_in(Task.workstream_id, workstream_ids)).order_by(*newest_first)
    rows = await session.execute(stmt)
    return [*rows.scalars().all()]
async def _list_recent_decisions(
    topic_ids: list[uuid.UUID],
    workstream_ids: list[uuid.UUID],
    window: DigestWindow,
    session: AsyncSession,
) -> list[Decision]:
    """Return in-scope decisions created, updated or decided inside ``window``.

    A decision is in scope when it is attached to one of ``topic_ids`` or one
    of ``workstream_ids``. Results are ordered most recently updated first.
    """
    in_scope = or_(_in(Decision.topic_id, topic_ids), _in(Decision.workstream_id, workstream_ids))
    timestamps = (Decision.created_at, Decision.updated_at, Decision.decided_at)
    touched = or_(*[_between(column, window) for column in timestamps])
    stmt = (
        select(Decision)
        .where(in_scope)
        .where(touched)
        .order_by(Decision.updated_at.desc(), Decision.created_at.desc())
    )
    rows = await session.execute(stmt)
    return [*rows.scalars().all()]
async def _list_recent_progress(
    topic_ids: list[uuid.UUID],
    workstream_ids: list[uuid.UUID],
    task_ids: list[uuid.UUID],
    decision_ids: list[uuid.UUID],
    window: DigestWindow,
    session: AsyncSession,
) -> list[ProgressEvent]:
    """Return progress events created inside ``window`` that touch any given id.

    An event matches when its topic, workstream, task or decision reference
    is in the corresponding id list. Results are newest first.
    """
    linked = or_(
        _in(ProgressEvent.topic_id, topic_ids),
        _in(ProgressEvent.workstream_id, workstream_ids),
        _in(ProgressEvent.task_id, task_ids),
        _in(ProgressEvent.decision_id, decision_ids),
    )
    stmt = (
        select(ProgressEvent)
        .where(_between(ProgressEvent.created_at, window))
        .where(linked)
        .order_by(ProgressEvent.created_at.desc())
    )
    rows = await session.execute(stmt)
    return [*rows.scalars().all()]
async def _list_repos(domain_id: uuid.UUID, session: AsyncSession) -> list[ManagedRepo]:
    """Return the domain's repositories whose status is "active", ordered by slug."""
    stmt = (
        select(ManagedRepo)
        .where(ManagedRepo.domain_id == domain_id)
        .where(ManagedRepo.status == "active")
        .order_by(ManagedRepo.slug)
    )
    rows = await session.execute(stmt)
    return [*rows.scalars().all()]
def _in(column, values: list[uuid.UUID]):
return column.in_(values) if values else false()
def _between(column, window: DigestWindow):
return column.is_not(None) & (column >= window.since) & (column <= window.until)
def _in_window(value: datetime | None, window: DigestWindow) -> bool:
    """Return True when ``value`` (naive treated as UTC) lies inside the inclusive window."""
    return value is not None and window.since <= _as_utc(value) <= window.until
def _metadata_from_data(
    report_id: str,
    path: Path,
    data: dict[str, Any],
    window: DigestWindow,
) -> RecentlyOnScopeReportMetadata:
    """Describe a freshly generated report from its collected data and window."""
    generated = _as_utc(data["generated_at"])
    counts = RecentlyOnScopeSourceCounts(**data["source_counts"])
    return RecentlyOnScopeReportMetadata(
        id=report_id,
        domain_slug=data["domain"]["slug"],
        range=window.range,
        since=window.since,
        until=window.until,
        generated_at=generated,
        path=str(path),
        source_counts=counts,
    )
def _write_report(path: Path, markdown: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_name(f".{path.name}.tmp")
tmp.write_text(markdown, encoding="utf-8")
os.replace(tmp, path)
def _frontmatter(text: str) -> dict[str, Any] | None:
    """Parse a leading ``---``-delimited YAML block into a dict.

    Returns None when the text does not open with ``---\\n``, has no closing
    ``\\n---``, or the YAML is not a mapping. An empty block yields ``{}``.
    ``yaml.safe_load`` errors propagate to the caller.
    """
    opener = "---\n"
    if not text.startswith(opener):
        return None
    closing = text.find("\n---", len(opener))
    if closing < 0:
        return None
    parsed = yaml.safe_load(text[len(opener):closing].strip()) or {}
    if not isinstance(parsed, dict):
        return None
    return parsed
def _section_text(data: dict[str, Any]) -> dict[str, str]:
    """Render each pre-formatted markdown section from the serialized item lists."""
    # (output key, renderer, source key), in the order the sections are emitted.
    renderers = (
        ("progress_section", _progress_section, "progress_events"),
        ("decisions_section", _decisions_section, "decisions"),
        ("workstreams_section", _workstreams_section, "workstreams"),
        ("tasks_section", _tasks_section, "tasks"),
        ("attention_section", _attention_section, "attention_items"),
        ("repos_section", _repos_section, "repos"),
    )
    return {out_key: render(data[src_key]) for out_key, render, src_key in renderers}
def _progress_section(items: list[dict[str, Any]]) -> str:
if not items:
return "_No progress events in this window._"
return "\n".join(
f"- `{item['created_at']}` `{item['event_type']}` {item['summary']}"
+ (f" - {item['author']}" if item.get("author") else "")
for item in items
)
def _decisions_section(items: list[dict[str, Any]]) -> str:
if not items:
return "_No decisions changed in this window._"
return "\n".join(
f"- `{item['updated_at']}` **{item['title']}** - {item['status']}"
+ (f"; decided by {item['decided_by']}" if item.get("decided_by") else "")
for item in items
)
def _workstreams_section(items: list[dict[str, Any]]) -> str:
if not items:
return "_No workstreams changed in this window._"
return "\n".join(
f"- `{item['updated_at']}` **{item['title']}** (`{item['slug']}`) - {item['status']}"
for item in items
)
def _tasks_section(items: list[dict[str, Any]]) -> str:
if not items:
return "_No tasks changed in this window._"
return "\n".join(
f"- `{item['updated_at']}` **{item['title']}** - {item['status']} / {item['priority']}"
for item in items
)
def _attention_section(items: list[dict[str, Any]]) -> str:
if not items:
return "_No open human-intervention items for this domain._"
return "\n".join(
f"- **{item['title']}** - {item.get('intervention_note') or item.get('blocking_reason') or 'needs human attention'}"
for item in items
)
def _repos_section(items: list[dict[str, Any]]) -> str:
if not items:
return "_No active repositories are registered for this domain._"
return "\n".join(
f"- `{item['slug']}` - {item['name']}" + (f" ({item['remote_url']})" if item.get("remote_url") else "")
for item in items
)
def _domain_data(domain: Domain) -> dict[str, Any]:
return {
"id": str(domain.id),
"slug": domain.slug,
"name": domain.name,
"description": domain.description or "",
}
def _progress_data(event: ProgressEvent) -> dict[str, Any]:
    """Serialize a progress event for the template; unset link ids become None."""

    def optional_id(value: Any) -> str | None:
        return str(value) if value else None

    return {
        "id": str(event.id),
        "created_at": _iso(event.created_at),
        "event_type": event.event_type,
        "summary": event.summary,
        "author": event.author,
        "workstream_id": optional_id(event.workstream_id),
        "task_id": optional_id(event.task_id),
        "decision_id": optional_id(event.decision_id),
    }
def _decision_data(decision: Decision) -> dict[str, Any]:
    """Serialize a decision for the template; a missing ``decided_at`` becomes ""."""
    return dict(
        id=str(decision.id),
        title=decision.title,
        status=_enum_value(decision.status),
        decision_type=_enum_value(decision.decision_type),
        decided_by=decision.decided_by,
        decided_at="" if not decision.decided_at else _iso(decision.decided_at),
        created_at=_iso(decision.created_at),
        updated_at=_iso(decision.updated_at),
    )
def _workstream_data(workstream: Workstream) -> dict[str, Any]:
    """Serialize a workstream for the template; a missing owner becomes ""."""
    return dict(
        id=str(workstream.id),
        slug=workstream.slug,
        title=workstream.title,
        status=workstream.status,
        owner=workstream.owner or "",
        created_at=_iso(workstream.created_at),
        updated_at=_iso(workstream.updated_at),
    )
def _task_data(task: Task) -> dict[str, Any]:
    """Serialize a task for the template.

    Enum fields are unwrapped to their values, and missing optional text
    becomes "".
    """
    return dict(
        id=str(task.id),
        workstream_id=str(task.workstream_id),
        title=task.title,
        status=_enum_value(task.status),
        priority=_enum_value(task.priority),
        assignee=task.assignee or "",
        blocking_reason=task.blocking_reason or "",
        created_at=_iso(task.created_at),
        updated_at=_iso(task.updated_at),
    )
def _attention_task_data(task: Task) -> dict[str, Any]:
    """Serialize a task needing human attention: ``_task_data`` plus its intervention note."""
    return {**_task_data(task), "intervention_note": task.intervention_note or ""}
def _repo_data(repo: ManagedRepo) -> dict[str, Any]:
return {
"id": str(repo.id),
"slug": repo.slug,
"name": repo.name,
"remote_url": repo.remote_url or "",
"local_path": repo.local_path or "",
}
def _enum_value(value: Any) -> Any:
return getattr(value, "value", value)
def _as_utc(value: datetime | str) -> datetime:
if isinstance(value, str):
value = datetime.fromisoformat(value.replace("Z", "+00:00"))
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
def _iso(value: datetime) -> str:
    """Format ``value`` as an ISO-8601 UTC string using the ``Z`` suffix."""
    utc_text = _as_utc(value).isoformat()
    return utc_text.replace("+00:00", "Z")
def _stamp(value: datetime) -> str:
    """Format ``value`` as a compact UTC timestamp (``YYYYMMDDTHHMMSSZ``) for report ids."""
    return f"{_as_utc(value):%Y%m%dT%H%M%SZ}"