from __future__ import annotations import imaplib import os from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Iterable, Protocol from .config import AppConfig @dataclass(frozen=True) class MailboxSourceMessage: source_uid: str raw_bytes: bytes raw_message_ref: str imap_uid: str | None = None class MailboxMessageSource(Protocol): def iter_messages( self, *, max_messages: int, since_uid: str | None, full_rescan: bool, include_seen: bool, since: datetime | None, ) -> Iterable[MailboxSourceMessage]: ... class FixtureMailboxSource: def __init__(self, fixture_dir: str | Path) -> None: self.fixture_dir = Path(fixture_dir) def iter_messages( self, *, max_messages: int, since_uid: str | None, full_rescan: bool, include_seen: bool, since: datetime | None, ) -> Iterable[MailboxSourceMessage]: del include_seen, since paths = sorted(self.fixture_dir.glob("*.eml")) emitted = 0 for path in paths: source_uid = path.name if not full_rescan and since_uid and source_uid <= since_uid: continue yield MailboxSourceMessage( source_uid=source_uid, raw_bytes=path.read_bytes(), raw_message_ref=str(path), imap_uid=None, ) emitted += 1 if max_messages and emitted >= max_messages: break class ImapMailboxSource: def __init__(self, config: AppConfig) -> None: self.config = config def iter_messages( self, *, max_messages: int, since_uid: str | None, full_rescan: bool, include_seen: bool, since: datetime | None, ) -> Iterable[MailboxSourceMessage]: mailbox = self.config.mailbox if self.config.scan.mark_seen: raise ValueError("IMAP mark_seen is intentionally unsupported; scans are read-only.") if not mailbox.host: raise ValueError("mailbox.host is required for IMAP scans.") if not mailbox.username_env or not mailbox.password_env: raise ValueError("mailbox.username_env and mailbox.password_env are required for IMAP scans.") username = os.environ.get(mailbox.username_env) password = os.environ.get(mailbox.password_env) if not username or not password: raise ValueError(f"IMAP credentials not found in {mailbox.username_env}/{mailbox.password_env}.") conn: imaplib.IMAP4 if mailbox.tls: conn = imaplib.IMAP4_SSL(mailbox.host, mailbox.port) else: conn = imaplib.IMAP4(mailbox.host, mailbox.port) selected = False try: _expect_ok(conn.login(username, password), "login") _expect_ok(conn.select(mailbox.folder, readonly=True), f"select {mailbox.folder}") selected = True criteria = _search_criteria(include_seen=include_seen, since=since) _status, search_data = _expect_ok(conn.uid("search", None, *criteria), "uid search") uids = _decode_uids(search_data) if not full_rescan and since_uid: uids = [uid for uid in uids if _uid_after(uid, since_uid)] uids = sorted(uids, key=_uid_sort_key) if max_messages: uids = uids[:max_messages] for uid in uids: _fetch_status, fetch_data = _expect_ok(conn.uid("fetch", uid, "(BODY.PEEK[])"), f"uid fetch {uid}") raw_bytes = _raw_message_from_fetch(fetch_data) if raw_bytes is None: continue yield MailboxSourceMessage( source_uid=uid, raw_bytes=raw_bytes, raw_message_ref=f"imap://{mailbox.host}/{mailbox.folder};UID={uid}", imap_uid=uid, ) finally: if selected: try: conn.close() except imaplib.IMAP4.error: pass try: conn.logout() except imaplib.IMAP4.error: pass def source_for_config(config: AppConfig, *, fixture_dir_override: str | None = None) -> MailboxMessageSource: if fixture_dir_override: return FixtureMailboxSource(fixture_dir_override) if config.mailbox.protocol == "fixture": fixture_dir = config.source.fixture_dir if not fixture_dir: raise ValueError("source.fixture_dir is required for fixture scans.") return FixtureMailboxSource(fixture_dir) if config.mailbox.protocol == "imap": return ImapMailboxSource(config) raise ValueError(f"Unsupported mailbox protocol: {config.mailbox.protocol}") def _search_criteria(*, include_seen: bool, since: datetime | None) -> list[str]: criteria = ["ALL" if include_seen else "UNSEEN"] if since is not None: criteria.extend(["SINCE", _imap_date(since)]) return criteria def _imap_date(value: datetime) -> str: months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] return f"{value.day:02d}-{months[value.month - 1]}-{value.year:04d}" def _expect_ok(result: tuple[str, list], operation: str) -> tuple[str, list]: status, data = result if status != "OK": raise RuntimeError(f"IMAP {operation} failed with status {status}: {data!r}") return status, data def _decode_uids(search_data: list) -> list[str]: if not search_data: return [] raw = search_data[0] or b"" if isinstance(raw, str): raw_text = raw else: raw_text = raw.decode("ascii", errors="ignore") return [part for part in raw_text.split() if part] def _uid_after(uid: str, since_uid: str) -> bool: try: return int(uid) > int(since_uid) except ValueError: return uid > since_uid def _uid_sort_key(uid: str) -> tuple[int, int | str]: try: return (0, int(uid)) except ValueError: return (1, uid) def _raw_message_from_fetch(fetch_data: list) -> bytes | None: for item in fetch_data: if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes): return item[1] return None