feat: expand mailbox evidence scanner

This commit is contained in:
2026-06-02 02:07:50 +02:00
parent 8532583182
commit 226c045397
16 changed files with 670 additions and 33 deletions

View File

@@ -0,0 +1,196 @@
from __future__ import annotations
import imaplib
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, Protocol
from .config import AppConfig
@dataclass(frozen=True)
class MailboxSourceMessage:
source_uid: str
raw_bytes: bytes
raw_message_ref: str
imap_uid: str | None = None
class MailboxMessageSource(Protocol):
    """Structural interface shared by every mailbox message source."""

    def iter_messages(
        self,
        *,
        max_messages: int,
        since_uid: str | None,
        full_rescan: bool,
        include_seen: bool,
        since: datetime | None,
    ) -> Iterable[MailboxSourceMessage]:
        """Return an iterable of raw messages selected by the scan options.

        Args:
            max_messages: Upper bound on emitted messages; a falsy value
                means no limit in the implementations in this module.
            since_uid: Last UID already processed, or ``None``.
            full_rescan: When true, ``since_uid`` is ignored.
            include_seen: Whether already-seen messages are included.
            since: Optional lower date bound for the scan.
        """
        ...
class FixtureMailboxSource:
    """Message source that replays ``*.eml`` files from a local directory."""

    def __init__(self, fixture_dir: str | Path) -> None:
        """Remember the directory that holds the fixture files."""
        self.fixture_dir = Path(fixture_dir)

    def iter_messages(
        self,
        *,
        max_messages: int,
        since_uid: str | None,
        full_rescan: bool,
        include_seen: bool,
        since: datetime | None,
    ) -> Iterable[MailboxSourceMessage]:
        """Yield fixture messages in sorted path order.

        The file name doubles as the message UID. Unless ``full_rescan`` is
        set, files whose name compares less than or equal to ``since_uid``
        are skipped. ``include_seen`` and ``since`` are accepted for
        interface parity and ignored.

        Args:
            max_messages: Stop after this many messages; falsy means no limit.
            since_uid: File name of the last processed fixture, or ``None``.
            full_rescan: Ignore ``since_uid`` and replay every file.
            include_seen: Unused.
            since: Unused.
        """
        del include_seen, since
        # ``None`` disables the resume filter entirely.
        resume_after = None if full_rescan or not since_uid else since_uid
        candidates = sorted(self.fixture_dir.glob("*.eml"))
        pending = (
            candidate
            for candidate in candidates
            if resume_after is None or candidate.name > resume_after
        )
        for count, eml_path in enumerate(pending, start=1):
            yield MailboxSourceMessage(
                source_uid=eml_path.name,
                raw_bytes=eml_path.read_bytes(),
                raw_message_ref=str(eml_path),
                imap_uid=None,
            )
            if max_messages and count >= max_messages:
                return
class ImapMailboxSource:
    """Read-only message source backed by an IMAP mailbox.

    Connection settings are taken from ``config.mailbox``. The username and
    password are read from the environment variables named by
    ``mailbox.username_env`` and ``mailbox.password_env``.
    """

    def __init__(self, config: AppConfig) -> None:
        """Keep the application config; no connection is opened here."""
        self.config = config

    def iter_messages(
        self,
        *,
        max_messages: int,
        since_uid: str | None,
        full_rescan: bool,
        include_seen: bool,
        since: datetime | None,
    ) -> Iterable[MailboxSourceMessage]:
        """Yield messages from the configured folder, lowest UID first.

        This is a generator, so validation and the connection only happen
        once iteration starts. The folder is selected read-only and bodies
        are fetched with ``BODY.PEEK[]``.

        Args:
            max_messages: Fetch at most this many UIDs; falsy means no limit.
            since_uid: Only UIDs after this one are fetched unless
                ``full_rescan`` is set.
            full_rescan: Ignore ``since_uid``.
            include_seen: Search ``ALL`` instead of ``UNSEEN``.
            since: Optional ``SINCE`` date bound for the search.

        Raises:
            ValueError: If ``scan.mark_seen`` is enabled, the host or the
                credential variable names are missing, or the credentials
                are not present in the environment.
            RuntimeError: If an IMAP command returns a non-OK status.
        """
        mailbox = self.config.mailbox
        if self.config.scan.mark_seen:
            raise ValueError("IMAP mark_seen is intentionally unsupported; scans are read-only.")
        if not mailbox.host:
            raise ValueError("mailbox.host is required for IMAP scans.")
        if not mailbox.username_env or not mailbox.password_env:
            raise ValueError("mailbox.username_env and mailbox.password_env are required for IMAP scans.")
        username = os.environ.get(mailbox.username_env)
        password = os.environ.get(mailbox.password_env)
        if not username or not password:
            raise ValueError(f"IMAP credentials not found in {mailbox.username_env}/{mailbox.password_env}.")

        conn: imaplib.IMAP4
        if mailbox.tls:
            conn = imaplib.IMAP4_SSL(mailbox.host, mailbox.port)
        else:
            conn = imaplib.IMAP4(mailbox.host, mailbox.port)

        # CLOSE is only valid once a folder has been selected.
        selected = False
        try:
            _expect_ok(conn.login(username, password), "login")
            _expect_ok(conn.select(mailbox.folder, readonly=True), f"select {mailbox.folder}")
            selected = True

            criteria = _search_criteria(include_seen=include_seen, since=since)
            _status, search_data = _expect_ok(conn.uid("search", None, *criteria), "uid search")
            uids = _decode_uids(search_data)
            if not full_rescan and since_uid:
                uids = [uid for uid in uids if _uid_after(uid, since_uid)]
            uids = sorted(uids, key=_uid_sort_key)
            if max_messages:
                uids = uids[:max_messages]

            for uid in uids:
                _fetch_status, fetch_data = _expect_ok(conn.uid("fetch", uid, "(BODY.PEEK[])"), f"uid fetch {uid}")
                raw_bytes = _raw_message_from_fetch(fetch_data)
                if raw_bytes is None:
                    # No message literal in the response; skip this UID.
                    continue
                yield MailboxSourceMessage(
                    source_uid=uid,
                    raw_bytes=raw_bytes,
                    raw_message_ref=f"imap://{mailbox.host}/{mailbox.folder};UID={uid}",
                    imap_uid=uid,
                )
        finally:
            # Best-effort teardown. Socket-level failures (OSError, which
            # includes ssl.SSLError) are suppressed along with protocol
            # errors so that a dead connection cannot mask the exception
            # that aborted the scan.
            if selected:
                try:
                    conn.close()
                except (imaplib.IMAP4.error, OSError):
                    pass
            try:
                conn.logout()
            except (imaplib.IMAP4.error, OSError):
                pass
def source_for_config(config: AppConfig, *, fixture_dir_override: str | None = None) -> MailboxMessageSource:
    """Pick the mailbox source implementation for ``config``.

    Args:
        config: Application config; ``mailbox.protocol`` selects the source.
        fixture_dir_override: When truthy, forces a fixture source reading
            from this directory regardless of the configured protocol.

    Returns:
        A fixture-backed or IMAP-backed message source.

    Raises:
        ValueError: If the protocol is ``"fixture"`` without a configured
            ``source.fixture_dir``, or the protocol is not supported.
    """
    # An explicit override wins before the config is consulted at all.
    if fixture_dir_override:
        return FixtureMailboxSource(fixture_dir_override)

    protocol = config.mailbox.protocol
    if protocol == "imap":
        return ImapMailboxSource(config)
    if protocol != "fixture":
        raise ValueError(f"Unsupported mailbox protocol: {protocol}")

    configured_dir = config.source.fixture_dir
    if not configured_dir:
        raise ValueError("source.fixture_dir is required for fixture scans.")
    return FixtureMailboxSource(configured_dir)
def _search_criteria(*, include_seen: bool, since: datetime | None) -> list[str]:
criteria = ["ALL" if include_seen else "UNSEEN"]
if since is not None:
criteria.extend(["SINCE", _imap_date(since)])
return criteria
def _imap_date(value: datetime) -> str:
    """Format ``value`` as an IMAP search date such as ``02-Jun-2026``.

    Month abbreviations are spelled out here rather than taken from
    ``strftime`` so the result does not depend on the process locale.
    """
    month_names = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )
    month = month_names[value.month - 1]
    return "{:02d}-{}-{:04d}".format(value.day, month, value.year)
def _expect_ok(result: tuple[str, list], operation: str) -> tuple[str, list]:
    """Return an imaplib ``(status, data)`` pair, insisting on ``"OK"``.

    Args:
        result: The ``(status, data)`` pair returned by an imaplib call.
        operation: Short label for the command, used in the error message.

    Raises:
        RuntimeError: If the status is anything other than ``"OK"``.
    """
    status, data = result
    if status == "OK":
        return status, data
    raise RuntimeError(f"IMAP {operation} failed with status {status}: {data!r}")
def _decode_uids(search_data: list) -> list[str]:
    """Extract UID strings from the data part of an imaplib SEARCH result.

    imaplib returns one list item per untagged SEARCH response, so every
    item is parsed rather than only the first; otherwise UIDs are silently
    dropped when a server splits its answer across several responses.

    Args:
        search_data: The ``data`` list from ``IMAP4.uid("search", ...)``.
            Items may be ``bytes``, ``str`` or ``None``.

    Returns:
        The whitespace-separated tokens from all items, in response order.
    """
    uids: list[str] = []
    for chunk in search_data or ():
        if not chunk:
            # imaplib reports an empty result as [None] or [b""].
            continue
        if isinstance(chunk, str):
            text = chunk
        else:
            text = chunk.decode("ascii", errors="ignore")
        uids.extend(text.split())
    return uids
def _uid_after(uid: str, since_uid: str) -> bool:
    """Tell whether ``uid`` comes strictly after ``since_uid``.

    Both values are compared as integers when they parse as such; if either
    one does not, the comparison falls back to plain string ordering.
    """
    try:
        current, floor = int(uid), int(since_uid)
    except ValueError:
        return uid > since_uid
    return current > floor
def _uid_sort_key(uid: str) -> tuple[int, int | str]:
try:
return (0, int(uid))
except ValueError:
return (1, uid)
def _raw_message_from_fetch(fetch_data: list) -> bytes | None:
for item in fetch_data:
if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
return item[1]
return None