diff --git a/README.md b/README.md
index 73c050a..3a732b0 100644
--- a/README.md
+++ b/README.md
@@ -19,16 +19,25 @@ The example config uses `tests/fixtures/mailbox` as a mailbox source. Runtime
 state is written to `.email-connect/state.sqlite`; generated CSV reports are
 written to `reports/`.
 
+For a live mailbox, set `mailbox.protocol: imap`, configure host, port, folder,
+and credential environment variable names, then export the credentials before
+running `scan-mailbox`. IMAP scans select the configured folder read-only and
+fetch message bodies with `BODY.PEEK[]`; mailbox write-back actions such as
+marking messages seen are intentionally unsupported in this MVP.
+
 ## Current Scope
 
 - Coordination-engine spec review and references.
 - Initial adapter descriptor, capability profile, evidence ceiling, and
   limitations.
+- Fixture and read-only IMAP mailbox sources.
 - Conservative mailbox message parser and evidence mapper.
-- SQLite state store with message and evidence deduplication.
-- CSV report generation.
-- Golden fixture tests for hard bounce, soft bounce, out-of-office, and human
+- SQLite state store with scan cursor, message/evidence deduplication, and
+  endpoint quality hints.
+- CSV report generation, including `--report-only-new`.
+- Golden fixture tests for hard bounce, soft bounce, delayed delivery, final
+  failure, complaint, unsubscribe, unknown return, out-of-office, and human
   reply signals.
 
-IMAP access, provider webhooks, outbound sending, suppression workflows, and a
-UI are planned work, not complete in this first slice.
+Provider webhooks, outbound sending, suppression workflows, OAuth mailbox login,
+and a UI remain outside this first mailbox-scanner slice.
diff --git a/docs/email-evidence-canon.md b/docs/email-evidence-canon.md
index fda8b98..bd784fc 100644
--- a/docs/email-evidence-canon.md
+++ b/docs/email-evidence-canon.md
@@ -46,3 +46,19 @@ coordination runtime decides whether those facts satisfy a coordination case.
 - Human reply does not prove legal acceptance.
 - Unknown return messages remain visible.
 - Scanner and proxy interactions must stay below identity-bound interaction.
+
+## Endpoint Quality Hints
+
+Endpoint quality rows are diagnostic state, not verdicts:
+
+| Evidence | Quality hint |
+| --- | --- |
+| Hard bounce or final failure | `reachability = unreachable`, `last_failure_at` |
+| Soft bounce or delayed delivery | `reachability = degraded`, `last_failure_at` |
+| Complaint | `suppression_state = suppressed` |
+| Unsubscribe | `suppression_state = opted_out` |
+| Human reply | `last_success_at` |
+| Out-of-office | `reachability = uncertain`, `last_auto_reply_at` |
+
+These hints can guide future suppression and review workflows, but they do not
+prove human awareness, authority, payload access, or coordination success.
diff --git a/docs/initial-runtime-architecture.md b/docs/initial-runtime-architecture.md
index a7fccb9..b42d765 100644
--- a/docs/initial-runtime-architecture.md
+++ b/docs/initial-runtime-architecture.md
@@ -15,11 +15,13 @@ email-connect scan-mailbox --config config/mailbox.example.yml --out reports/
 ```
 
 It scans an inbound return mailbox source, classifies messages, stores scan
-state in SQLite, and writes timestamped CSV evidence reports.
+state in SQLite, updates endpoint-quality hints, and writes timestamped CSV
+evidence reports.
 
-The initial source implementation supports fixture directories. The IMAP
-connector remains the next mailbox boundary to complete under
-`EMAIL-WP-0002-T02`.
+The source layer supports deterministic fixture directories and a read-only IMAP
+connector. IMAP scans select the configured folder with `readonly=True`, fetch
+messages using `BODY.PEEK[]`, and reject `mark_seen` because mailbox write-back
+actions are out of scope for this MVP.
 
 ## Package Layout
 
@@ -29,6 +31,7 @@ src/email_connect/
   cli.py           # command line entry points
   config.py        # config loading
   evidence.py      # native class to normalized evidence mapping
+  mailbox.py       # fixture and IMAP mailbox sources
   models.py        # mailbox, parse, evidence, endpoint quality dataclasses
   parser.py        # MIME/header parsing and conservative classification
   reporting.py     # CSV report generation
@@ -44,12 +47,19 @@ SQLite is the MVP store. The initial schema includes:
 - `mailbox_messages`
 - `parsed_messages`
 - `evidence_candidates`
+- `scan_cursors`
+- `endpoint_quality`
 
 Message deduplication is keyed by mailbox ID, IMAP UID when present, message ID,
 received timestamp, sender, subject hash, and body hash. Evidence deduplication
 follows the workplan fields: message, parser version, normalized event, affected
 recipient, original message, SMTP/enhanced status, and reason.
 
+Incremental scans use `scan_cursors` by mailbox and folder. Full rescans ignore
+the cursor while preserving message and evidence deduplication. Endpoint-quality
+rows are diagnostic hints derived from explicit evidence events; they are not
+coordination outcomes.
+
 ## Evidence Mapping
 
 Parser output is represented as `ParsedMailboxMessage`. The mapper converts it
@@ -60,6 +70,9 @@ Examples:
 
 - `hard_bounce` -> `notification.endpoint.rejected_permanent`
 - `soft_bounce` -> `notification.endpoint.rejected_temporary`
+- `delayed_delivery_notice` -> `notification.endpoint.deferred`
+- `complaint_or_abuse` -> `notification.channel.complaint_received`
+- `unsubscribe_or_opt_out` -> `notification.channel.unsubscribe_received`
 - `out_of_office` -> `interaction.out_of_office_received`
 - `human_reply` -> `interaction.reply_received`
 
@@ -95,4 +108,5 @@ normalization layer.
 PYTHONPATH=src python3 -m unittest discover -s tests
 PYTHONPATH=src python3 -m email_connect.cli adapter-descriptor
 PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --out reports/
+PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --report-only-new --out reports/
 ```
diff --git a/src/email_connect/cli.py b/src/email_connect/cli.py
index bc8c9ac..42b92be 100644
--- a/src/email_connect/cli.py
+++ b/src/email_connect/cli.py
@@ -38,6 +38,7 @@ def main(argv: list[str] | None = None) -> int:
             report_only_new=args.report_only_new,
             dry_run=args.dry_run,
             fixture_dir=args.fixture_dir,
+            since=args.since,
         )
         print(f"scan_id={result.scan.scan_id}")
         print(f"messages_seen={result.scan.messages_seen}")
diff --git a/src/email_connect/evidence.py b/src/email_connect/evidence.py
index 3dbf2bd..fa17ee7 100644
--- a/src/email_connect/evidence.py
+++ b/src/email_connect/evidence.py
@@ -8,6 +8,7 @@ from .models import (
     AssessmentCategory,
     Confidence,
     EmailEvidenceCandidate,
+    EndpointQualityUpdate,
     EvidenceStrength,
     MessageClass,
     ParsedMailboxMessage,
@@ -149,3 +150,68 @@ def candidate_from_parsed(
             "reason_code": parsed.reason_code,
         },
     )
+
+
+def endpoint_quality_from_candidate(candidate: EmailEvidenceCandidate) -> EndpointQualityUpdate | None:
+    address = candidate.affected_email_address
+    if not address:
+        return None
+
+    message_class = candidate.metadata.get("message_class")
+    signal = str(message_class or candidate.event_type)
+    reason_code = candidate.metadata.get("reason_code")
+    observed_at = candidate.observed_at
+
+    if message_class in {MessageClass.HARD_BOUNCE.value, MessageClass.FINAL_DELIVERY_FAILURE.value}:
+        return EndpointQualityUpdate(
+            affected_email_address=address.lower(),
+            reachability="unreachable",
+            last_failure_at=observed_at,
+            reason_code=reason_code,
+            confidence=candidate.confidence,
+            quality_signals=[signal, candidate.event_type],
+        )
+    if message_class in {MessageClass.SOFT_BOUNCE.value, MessageClass.DELAYED_DELIVERY_NOTICE.value}:
+        return EndpointQualityUpdate(
+            affected_email_address=address.lower(),
+            reachability="degraded",
+            last_failure_at=observed_at,
+            reason_code=reason_code,
+            confidence=candidate.confidence,
+            quality_signals=[signal, candidate.event_type],
+        )
+    if message_class == MessageClass.COMPLAINT_OR_ABUSE.value:
+        return EndpointQualityUpdate(
+            affected_email_address=address.lower(),
+            suppression_state="suppressed",
+            last_failure_at=observed_at,
+            reason_code=reason_code,
+            confidence=candidate.confidence,
+            quality_signals=[signal, candidate.event_type],
+        )
+    if message_class == MessageClass.UNSUBSCRIBE_OR_OPT_OUT.value:
+        return EndpointQualityUpdate(
+            affected_email_address=address.lower(),
+            suppression_state="opted_out",
+            reason_code=reason_code,
+            confidence=candidate.confidence,
+            quality_signals=[signal, candidate.event_type],
+        )
+    if message_class == MessageClass.HUMAN_REPLY.value:
+        return EndpointQualityUpdate(
+            affected_email_address=address.lower(),
+            last_success_at=observed_at,
+            reason_code=reason_code,
+            confidence=candidate.confidence,
+            quality_signals=[signal, candidate.event_type],
+        )
+    if message_class == MessageClass.OUT_OF_OFFICE.value:
+        return EndpointQualityUpdate(
+            affected_email_address=address.lower(),
+            reachability="uncertain",
+            last_auto_reply_at=observed_at,
+            reason_code=reason_code,
+            confidence=candidate.confidence,
+            quality_signals=[signal, candidate.event_type],
+        )
+    return None
diff --git a/src/email_connect/mailbox.py b/src/email_connect/mailbox.py
new file mode 100644
index 0000000..47f4bb1
--- /dev/null
+++ b/src/email_connect/mailbox.py
@@ -0,0 +1,196 @@
+from __future__ import annotations
+
+import imaplib
+import os
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Iterable, Protocol
+
+from .config import AppConfig
+
+
+@dataclass(frozen=True)
+class MailboxSourceMessage:
+    source_uid: str
+    raw_bytes: bytes
+    raw_message_ref: str
+    imap_uid: str | None = None
+
+
+class MailboxMessageSource(Protocol):
+    def iter_messages(
+        self,
+        *,
+        max_messages: int,
+        since_uid: str | None,
+        full_rescan: bool,
+        include_seen: bool,
+        since: datetime | None,
+    ) -> Iterable[MailboxSourceMessage]:
+        ...
+
+
+class FixtureMailboxSource:
+    def __init__(self, fixture_dir: str | Path) -> None:
+        self.fixture_dir = Path(fixture_dir)
+
+    def iter_messages(
+        self,
+        *,
+        max_messages: int,
+        since_uid: str | None,
+        full_rescan: bool,
+        include_seen: bool,
+        since: datetime | None,
+    ) -> Iterable[MailboxSourceMessage]:
+        del include_seen, since
+        paths = sorted(self.fixture_dir.glob("*.eml"))
+        emitted = 0
+        for path in paths:
+            source_uid = path.name
+            if not full_rescan and since_uid and source_uid <= since_uid:
+                continue
+            yield MailboxSourceMessage(
+                source_uid=source_uid,
+                raw_bytes=path.read_bytes(),
+                raw_message_ref=str(path),
+                imap_uid=None,
+            )
+            emitted += 1
+            if max_messages and emitted >= max_messages:
+                break
+
+
+class ImapMailboxSource:
+    def __init__(self, config: AppConfig) -> None:
+        self.config = config
+
+    def iter_messages(
+        self,
+        *,
+        max_messages: int,
+        since_uid: str | None,
+        full_rescan: bool,
+        include_seen: bool,
+        since: datetime | None,
+    ) -> Iterable[MailboxSourceMessage]:
+        mailbox = self.config.mailbox
+        if self.config.scan.mark_seen:
+            raise ValueError("IMAP mark_seen is intentionally unsupported; scans are read-only.")
+        if not mailbox.host:
+            raise ValueError("mailbox.host is required for IMAP scans.")
+        if not mailbox.username_env or not mailbox.password_env:
+            raise ValueError("mailbox.username_env and mailbox.password_env are required for IMAP scans.")
+
+        username = os.environ.get(mailbox.username_env)
+        password = os.environ.get(mailbox.password_env)
+        if not username or not password:
+            raise ValueError(f"IMAP credentials not found in {mailbox.username_env}/{mailbox.password_env}.")
+
+        conn: imaplib.IMAP4
+        if mailbox.tls:
+            conn = imaplib.IMAP4_SSL(mailbox.host, mailbox.port)
+        else:
+            conn = imaplib.IMAP4(mailbox.host, mailbox.port)
+
+        selected = False
+        try:
+            _expect_ok(conn.login(username, password), "login")
+            _expect_ok(conn.select(mailbox.folder, readonly=True), f"select {mailbox.folder}")
+            selected = True
+
+            criteria = _search_criteria(include_seen=include_seen, since=since)
+            _status, search_data = _expect_ok(conn.uid("search", None, *criteria), "uid search")
+            uids = _decode_uids(search_data)
+            if not full_rescan and since_uid:
+                uids = [uid for uid in uids if _uid_after(uid, since_uid)]
+            uids = sorted(uids, key=_uid_sort_key)
+            if max_messages:
+                uids = uids[:max_messages]
+
+            for uid in uids:
+                _fetch_status, fetch_data = _expect_ok(conn.uid("fetch", uid, "(BODY.PEEK[])"), f"uid fetch {uid}")
+                raw_bytes = _raw_message_from_fetch(fetch_data)
+                if raw_bytes is None:
+                    continue
+                yield MailboxSourceMessage(
+                    source_uid=uid,
+                    raw_bytes=raw_bytes,
+                    raw_message_ref=f"imap://{mailbox.host}/{mailbox.folder};UID={uid}",
+                    imap_uid=uid,
+                )
+        finally:
+            if selected:
+                try:
+                    conn.close()
+                except imaplib.IMAP4.error:
+                    pass
+            try:
+                conn.logout()
+            except imaplib.IMAP4.error:
+                pass
+
+
+def source_for_config(config: AppConfig, *, fixture_dir_override: str | None = None) -> MailboxMessageSource:
+    if fixture_dir_override:
+        return FixtureMailboxSource(fixture_dir_override)
+    if config.mailbox.protocol == "fixture":
+        fixture_dir = config.source.fixture_dir
+        if not fixture_dir:
+            raise ValueError("source.fixture_dir is required for fixture scans.")
+        return FixtureMailboxSource(fixture_dir)
+    if config.mailbox.protocol == "imap":
+        return ImapMailboxSource(config)
+    raise ValueError(f"Unsupported mailbox protocol: {config.mailbox.protocol}")
+
+
+def _search_criteria(*, include_seen: bool, since: datetime | None) -> list[str]:
+    criteria = ["ALL" if include_seen else "UNSEEN"]
+    if since is not None:
+        criteria.extend(["SINCE", _imap_date(since)])
+    return criteria
+
+
+def _imap_date(value: datetime) -> str:
+    months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
+    return f"{value.day:02d}-{months[value.month - 1]}-{value.year:04d}"
+
+
+def _expect_ok(result: tuple[str, list], operation: str) -> tuple[str, list]:
+    status, data = result
+    if status != "OK":
+        raise RuntimeError(f"IMAP {operation} failed with status {status}: {data!r}")
+    return status, data
+
+
+def _decode_uids(search_data: list) -> list[str]:
+    if not search_data:
+        return []
+    raw = search_data[0] or b""
+    if isinstance(raw, str):
+        raw_text = raw
+    else:
+        raw_text = raw.decode("ascii", errors="ignore")
+    return [part for part in raw_text.split() if part]
+
+
+def _uid_after(uid: str, since_uid: str) -> bool:
+    try:
+        return int(uid) > int(since_uid)
+    except ValueError:
+        return uid > since_uid
+
+
+def _uid_sort_key(uid: str) -> tuple[int, int | str]:
+    try:
+        return (0, int(uid))
+    except ValueError:
+        return (1, uid)
+
+
+def _raw_message_from_fetch(fetch_data: list) -> bytes | None:
+    for item in fetch_data:
+        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
+            return item[1]
+    return None
diff --git a/src/email_connect/scanner.py b/src/email_connect/scanner.py
index 1fde93a..9f697fd 100644
--- a/src/email_connect/scanner.py
+++ b/src/email_connect/scanner.py
@@ -2,12 +2,13 @@ from __future__ import annotations
 
 from dataclasses import dataclass
 from datetime import UTC, datetime
-from pathlib import Path
 from uuid import uuid4
 
 from .config import AppConfig
+from .evidence import endpoint_quality_from_candidate
+from .mailbox import source_for_config
 from .models import MailboxScan
-from .parser import parse_message_file
+from .parser import parse_message_bytes
 from .reporting import write_evidence_report
 from .storage import StateStore
 
@@ -26,26 +27,39 @@ def scan_mailbox(
     report_only_new: bool = False,
     dry_run: bool = False,
     fixture_dir: str | None = None,
+    since: str | None = None,
 ) -> ScanResult:
     started_at = datetime.now(UTC)
     scan_id = str(uuid4())
-    source_dir = fixture_dir or config.source.fixture_dir
-    if not source_dir:
-        raise ValueError("No source.fixture_dir configured yet. IMAP connector is planned in EMAIL-WP-0002-T02.")
-
-    files = sorted(Path(source_dir).glob("*.eml"))
-    if config.scan.max_messages_per_run:
-        files = files[: config.scan.max_messages_per_run]
+    since_at = _parse_since(since or config.scan.since)
+    source = source_for_config(config, fixture_dir_override=fixture_dir)
 
     store = StateStore(config.storage.path)
     messages_seen = 0
     messages_new = 0
     messages_parsed = 0
     evidence_created = 0
+    new_evidence_keys: list[str] = []
+    last_source_uid: str | None = None
     try:
-        for path in files:
+        cursor = None if full_rescan else store.get_scan_cursor(config.mailbox.id, config.mailbox.folder)
+        for message in source.iter_messages(
+            max_messages=config.scan.max_messages_per_run,
+            since_uid=cursor,
+            full_rescan=full_rescan,
+            include_seen=config.scan.include_seen,
+            since=since_at,
+        ):
             messages_seen += 1
-            inbound, parsed, candidate = parse_message_file(path, mailbox_id=config.mailbox.id)
+            last_source_uid = message.source_uid
+            inbound, parsed, candidate = parse_message_bytes(
+                message.raw_bytes,
+                mailbox_id=config.mailbox.id,
+                raw_message_ref=message.raw_message_ref,
+                imap_uid=message.imap_uid,
+            )
+            if since_at and inbound.received_at and inbound.received_at < since_at:
+                continue
             if dry_run:
                 messages_parsed += 1
                 continue
@@ -55,11 +69,31 @@ def scan_mailbox(
             messages_parsed += 1
             if candidate is not None:
                 candidate = _enrich_candidate(candidate, inbound, parsed)
-                evidence_created += int(store.insert_evidence(candidate))
+                inserted = store.insert_evidence(candidate)
+                evidence_created += int(inserted)
+                if inserted:
+                    new_evidence_keys.append(candidate.deduplication_key)
+                quality_update = endpoint_quality_from_candidate(candidate)
+                if quality_update is not None:
+                    store.apply_endpoint_quality_update(
+                        config.mailbox.id,
+                        quality_update,
+                        observed_at=candidate.observed_at,
+                    )
+
+        if not dry_run and last_source_uid:
+            store.set_scan_cursor(
+                config.mailbox.id,
+                config.mailbox.folder,
+                last_source_uid,
+                updated_at=datetime.now(UTC),
+            )
 
         report_path = None
         if not dry_run:
-            report_rows = store.evidence_rows()
+            report_rows = store.evidence_rows(
+                deduplication_keys=new_evidence_keys if report_only_new else None,
+            )
             report_path = write_evidence_report(
                 report_rows,
                 output_dir=output_dir or config.reports.output_dir,
@@ -80,6 +114,7 @@ def scan_mailbox(
             messages_parsed=messages_parsed,
             evidence_events_created=evidence_created,
             report_path=str(report_path) if report_path else None,
+            since=since_at,
         )
         if not dry_run:
             store.insert_scan(scan)
@@ -105,3 +140,17 @@ def _enrich_candidate(candidate, inbound, parsed):
             "metadata": metadata,
         }
     )
+
+
+def _parse_since(value: str | None) -> datetime | None:
+    if not value:
+        return None
+    normalized = value.strip()
+    if not normalized:
+        return None
+    if len(normalized) == 10:
+        normalized = normalized + "T00:00:00+00:00"
+    parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00"))
+    if parsed.tzinfo is None:
+        return parsed.replace(tzinfo=UTC)
+    return parsed.astimezone(UTC)
diff --git a/src/email_connect/storage.py b/src/email_connect/storage.py
index 7709c1f..b2577e1 100644
--- a/src/email_connect/storage.py
+++ b/src/email_connect/storage.py
@@ -5,7 +5,14 @@ import sqlite3
 from datetime import UTC, datetime
 from pathlib import Path
 
-from .models import EmailEvidenceCandidate, InboundMailboxMessage, MailboxScan, ParsedMailboxMessage
+from .models import (
+    Confidence,
+    EmailEvidenceCandidate,
+    EndpointQualityUpdate,
+    InboundMailboxMessage,
+    MailboxScan,
+    ParsedMailboxMessage,
+)
 
 
 class StateStore:
@@ -88,6 +95,30 @@ class StateStore:
                 notes_json text not null,
                 metadata_json text not null
             );
+
+            create table if not exists scan_cursors (
+                mailbox_id text not null,
+                folder text not null,
+                last_uid text not null,
+                updated_at text not null,
+                primary key (mailbox_id, folder)
+            );
+
+            create table if not exists endpoint_quality (
+                endpoint_key text primary key,
+                mailbox_id text not null,
+                affected_email_address text not null,
+                reachability text not null,
+                suppression_state text not null,
+                last_success_at text,
+                last_failure_at text,
+                last_auto_reply_at text,
+                reason_code text,
+                confidence text not null,
+                quality_signals_json text not null,
+                updated_at text not null,
+                unique (mailbox_id, affected_email_address)
+            );
             """
         )
         self.conn.commit()
@@ -203,10 +234,148 @@ class StateStore:
         )
         self.conn.commit()
 
-    def evidence_rows(self) -> list[dict]:
+    def get_scan_cursor(self, mailbox_id: str, folder: str) -> str | None:
+        row = self.conn.execute(
+            "select last_uid from scan_cursors where mailbox_id = ? and folder = ?",
+            (mailbox_id, folder),
+        ).fetchone()
+        return str(row["last_uid"]) if row else None
+
+    def set_scan_cursor(self, mailbox_id: str, folder: str, last_uid: str, *, updated_at: datetime) -> None:
+        self.conn.execute(
+            """
+            insert into scan_cursors values (?, ?, ?, ?)
+            on conflict(mailbox_id, folder) do update set
+                last_uid = excluded.last_uid,
+                updated_at = excluded.updated_at
+            """,
+            (mailbox_id, folder, last_uid, _dt(updated_at)),
+        )
+        self.conn.commit()
+
+    def apply_endpoint_quality_update(
+        self,
+        mailbox_id: str,
+        update: EndpointQualityUpdate,
+        *,
+        observed_at: datetime,
+    ) -> None:
+        address = update.affected_email_address.lower()
+        endpoint_key = f"{mailbox_id}:{address}"
+        existing = self.conn.execute(
+            "select * from endpoint_quality where endpoint_key = ?",
+            (endpoint_key,),
+        ).fetchone()
+        merged = _merge_endpoint_quality(existing, update, observed_at=observed_at)
+        self.conn.execute(
+            """
+            insert into endpoint_quality values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            on conflict(endpoint_key) do update set
+                reachability = excluded.reachability,
+                suppression_state = excluded.suppression_state,
+                last_success_at = excluded.last_success_at,
+                last_failure_at = excluded.last_failure_at,
+                last_auto_reply_at = excluded.last_auto_reply_at,
+                reason_code = excluded.reason_code,
+                confidence = excluded.confidence,
+                quality_signals_json = excluded.quality_signals_json,
+                updated_at = excluded.updated_at
+            """,
+            (
+                endpoint_key,
+                mailbox_id,
+                address,
+                merged["reachability"],
+                merged["suppression_state"],
+                _dt(merged["last_success_at"]),
+                _dt(merged["last_failure_at"]),
+                _dt(merged["last_auto_reply_at"]),
+                merged["reason_code"],
+                merged["confidence"],
+                json.dumps(merged["quality_signals"]),
+                _dt(observed_at),
+            ),
+        )
+        self.conn.commit()
+
+    def endpoint_quality_rows(self) -> list[dict]:
+        rows = self.conn.execute(
+            "select * from endpoint_quality order by affected_email_address"
+        ).fetchall()
+        return [dict(row) for row in rows]
+
+    def evidence_rows(self, *, deduplication_keys: list[str] | None = None) -> list[dict]:
+        if deduplication_keys is not None:
+            if not deduplication_keys:
+                return []
+            placeholders = ", ".join("?" for _ in deduplication_keys)
+            rows = self.conn.execute(
+                f"""
+                select * from evidence_candidates
+                where deduplication_key in ({placeholders})
+                order by observed_at, event_type
+                """,
+                deduplication_keys,
+            ).fetchall()
+            return [dict(row) for row in rows]
         rows = self.conn.execute("select * from evidence_candidates order by observed_at, event_type").fetchall()
         return [dict(row) for row in rows]
 
 
 def _dt(value: datetime | None) -> str | None:
     return value.isoformat() if value else None
+
+
+def _parse_dt(value: str | None) -> datetime | None:
+    return datetime.fromisoformat(value) if value else None
+
+
+def _merge_endpoint_quality(
+    existing,
+    update: EndpointQualityUpdate,
+    *,
+    observed_at: datetime,
+) -> dict:
+    existing_signals = json.loads(existing["quality_signals_json"]) if existing else []
+    update_signals = [signal for signal in update.quality_signals if signal]
+    signals = list(dict.fromkeys([*existing_signals, *update_signals]))
+    return {
+        "reachability": _merge_state(existing, "reachability", update.reachability, default="unknown"),
+        "suppression_state": _merge_state(
+            existing,
+            "suppression_state",
+            update.suppression_state,
+            default="unknown",
+        ),
+        "last_success_at": _latest_dt(_parse_dt(existing["last_success_at"]) if existing else None, update.last_success_at),
+        "last_failure_at": _latest_dt(_parse_dt(existing["last_failure_at"]) if existing else None, update.last_failure_at),
+        "last_auto_reply_at": _latest_dt(
+            _parse_dt(existing["last_auto_reply_at"]) if existing else None,
+            update.last_auto_reply_at,
+        ),
+        "reason_code": update.reason_code or (existing["reason_code"] if existing else None),
+        "confidence": _max_confidence(existing["confidence"] if existing else Confidence.LOW.value, update.confidence.value),
+        "quality_signals": signals,
+        "updated_at": observed_at,
+    }
+
+
+def _merge_state(existing, field: str, new_value: str, *, default: str) -> str:
+    if new_value != "unknown":
+        return new_value
+    if existing:
+        return str(existing[field])
+    return default
+
+
+def _latest_dt(left: datetime | None, right: datetime | None) -> datetime | None:
+    if left is None:
+        return right
+    if right is None:
+        return left
+    return max(left, right)
+
+
+def _max_confidence(left: str, right: str) -> str:
+    order = {Confidence.LOW.value: 0, Confidence.MEDIUM.value: 1, Confidence.HIGH.value: 2}
+    return left if order.get(left, 0) >= order.get(right, 0) else right
diff --git a/tests/fixtures/mailbox/complaint.eml b/tests/fixtures/mailbox/complaint.eml
new file mode 100644
index 0000000..bad11a9
--- /dev/null
+++ b/tests/fixtures/mailbox/complaint.eml
@@ -0,0 +1,10 @@
+From: Feedback Loop
+To: abuse@example.com
+Subject: Spam complaint notification
+Date: Tue, 02 Jun 2026 10:04:00 +0000
+Message-ID:
+Content-Type: text/plain; charset=utf-8
+
+Feedback loop abuse report.
+Final-Recipient: rfc822; complained@example.com
+This is a spam complaint notification for the original message.
diff --git a/tests/fixtures/mailbox/delayed_delivery.eml b/tests/fixtures/mailbox/delayed_delivery.eml
new file mode 100644
index 0000000..38b8c57
--- /dev/null
+++ b/tests/fixtures/mailbox/delayed_delivery.eml
@@ -0,0 +1,10 @@
+From: Mail Delivery Subsystem
+To: sender@example.com
+Subject: Delivery delayed
+Date: Tue, 02 Jun 2026 10:05:00 +0000
+Message-ID:
+Content-Type: text/plain; charset=utf-8
+
+Delivery delayed. We will keep trying to deliver your message.
+Final-Recipient: rfc822; waiting@example.com
+Status: 4.4.1
diff --git a/tests/fixtures/mailbox/final_failure.eml b/tests/fixtures/mailbox/final_failure.eml
new file mode 100644
index 0000000..06ca146
--- /dev/null
+++ b/tests/fixtures/mailbox/final_failure.eml
@@ -0,0 +1,12 @@
+From: Mail Delivery Subsystem
+To: sender@example.com
+Subject: Final failure
+Date: Tue, 02 Jun 2026 10:06:00 +0000
+Message-ID:
+Content-Type: text/plain; charset=utf-8
+
+Delivery Status Notification.
+Final-Recipient: rfc822; expired@example.com
+Action: failed
+Status: 5.4.7
+Diagnostic-Code: smtp; Could not deliver after retry period. Final failure, giving up.
diff --git a/tests/fixtures/mailbox/unknown_return.eml b/tests/fixtures/mailbox/unknown_return.eml
new file mode 100644
index 0000000..980e85c
--- /dev/null
+++ b/tests/fixtures/mailbox/unknown_return.eml
@@ -0,0 +1,10 @@
+From: Mail System
+To: sender@example.com
+Subject: Return mailbox notice
+Date: Tue, 02 Jun 2026 10:07:00 +0000
+Message-ID:
+Content-Type: text/plain; charset=utf-8
+
+This message references delivery and recipient handling, but it does not include
+a reliable SMTP status or delivery-status notification.
+Recipient reference: mystery@example.com
diff --git a/tests/fixtures/mailbox/unsubscribe.eml b/tests/fixtures/mailbox/unsubscribe.eml
new file mode 100644
index 0000000..e022f1c
--- /dev/null
+++ b/tests/fixtures/mailbox/unsubscribe.eml
@@ -0,0 +1,8 @@
+From: Recipient
+To: sender@example.com
+Subject: Please unsubscribe me
+Date: Tue, 02 Jun 2026 10:08:00 +0000
+Message-ID:
+Content-Type: text/plain; charset=utf-8
+
+Please unsubscribe optout@example.com from future messages. Remove me from this list.
diff --git a/tests/test_parser.py b/tests/test_parser.py
index fb11232..a4ecb26 100644
--- a/tests/test_parser.py
+++ b/tests/test_parser.py
@@ -38,6 +38,40 @@ class ParserTests(unittest.TestCase):
         self.assertEqual(candidate.event_type, "interaction.reply_received")
         self.assertEqual(candidate.assessment_subclass, "success.reply_received")
 
+    def test_delayed_delivery_notice_stays_deferred(self) -> None:
+        _inbound, parsed, candidate = parse_message_file(FIXTURES / "delayed_delivery.eml", mailbox_id="test")
+        self.assertEqual(parsed.message_class, MessageClass.DELAYED_DELIVERY_NOTICE)
+        self.assertIsNotNone(candidate)
+        self.assertEqual(candidate.event_type, "notification.endpoint.deferred")
+        self.assertEqual(candidate.assessment_subclass, "undef.deferred")
+
+    def test_final_failure_maps_to_expired_without_delivery(self) -> None:
+        _inbound, parsed, candidate = parse_message_file(FIXTURES / "final_failure.eml", mailbox_id="test")
+        self.assertEqual(parsed.message_class, MessageClass.FINAL_DELIVERY_FAILURE)
+        self.assertIsNotNone(candidate)
+        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_permanent")
+        self.assertEqual(candidate.assessment_subclass, "fail.expired_without_delivery")
+
+    def test_complaint_maps_to_channel_failure(self) -> None:
+        _inbound, parsed, candidate = parse_message_file(FIXTURES / "complaint.eml", mailbox_id="test")
+        self.assertEqual(parsed.message_class, MessageClass.COMPLAINT_OR_ABUSE)
+        self.assertIsNotNone(candidate)
+        self.assertEqual(candidate.event_type, "notification.channel.complaint_received")
+        self.assertEqual(candidate.assessment_subclass, "fail.complaint_received")
+
+    def test_unsubscribe_maps_to_opt_out(self) -> None:
+        _inbound, parsed, candidate = parse_message_file(FIXTURES / "unsubscribe.eml", mailbox_id="test")
+        self.assertEqual(parsed.message_class, MessageClass.UNSUBSCRIBE_OR_OPT_OUT)
+        self.assertIsNotNone(candidate)
+        self.assertEqual(candidate.event_type, "notification.channel.unsubscribe_received")
+        self.assertEqual(candidate.assessment_subclass, "fail.unsubscribed")
+
+    def test_unknown_return_message_is_preserved(self) -> None:
+        _inbound, parsed, candidate = parse_message_file(FIXTURES / "unknown_return.eml", mailbox_id="test")
+        self.assertEqual(parsed.message_class, MessageClass.UNKNOWN_RETURN_MESSAGE)
+        self.assertIsNotNone(candidate)
+        self.assertEqual(candidate.event_type, "notification.endpoint.unknown")
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_scanner.py b/tests/test_scanner.py
index 3b2894b..3753bd9 100644
--- a/tests/test_scanner.py
+++ b/tests/test_scanner.py
@@ -2,10 +2,12 @@ from __future__ import annotations
 
 import tempfile
 import unittest
+from csv import DictReader
 from pathlib import Path
 
 from email_connect.config import AppConfig, MailboxConfig, ReportsConfig, ScanConfig, SourceConfig, StorageConfig
 from email_connect.scanner import scan_mailbox
+from email_connect.storage import StateStore
 
 FIXTURES = Path(__file__).parent / "fixtures" / "mailbox"
 
@@ -24,13 +26,44 @@ class ScannerTests(unittest.TestCase):
             )
             first = scan_mailbox(config)
             second = scan_mailbox(config)
+            full = scan_mailbox(config, full_rescan=True, report_only_new=True)
 
-            self.assertEqual(first.scan.messages_seen, 4)
-            self.assertEqual(first.scan.messages_new, 4)
-            self.assertGreaterEqual(first.scan.evidence_events_created, 4)
+            self.assertEqual(first.scan.messages_seen, 9)
+            self.assertEqual(first.scan.messages_new, 9)
+            self.assertGreaterEqual(first.scan.evidence_events_created, 9)
+            self.assertEqual(second.scan.messages_seen, 0)
             self.assertEqual(second.scan.messages_new, 0)
             self.assertEqual(second.scan.evidence_events_created, 0)
+            self.assertEqual(full.scan.messages_seen, 9)
+            self.assertEqual(full.scan.messages_new, 0)
+            self.assertEqual(full.scan.evidence_events_created, 0)
             self.assertTrue(first.report_path and first.report_path.exists())
+            self.assertTrue(full.report_path and full.report_path.exists())
+            with full.report_path.open(newline="", encoding="utf-8") as fh:
+                self.assertEqual(list(DictReader(fh)), [])
+
+    def test_scan_updates_endpoint_quality(self) -> None:
+        with tempfile.TemporaryDirectory() as tmp:
+            root = Path(tmp)
+            config = AppConfig(
+                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
+                scan=ScanConfig(),
+                storage=StorageConfig(path=str(root / "state.sqlite")),
+                reports=ReportsConfig(output_dir=str(root / "reports")),
+                source=SourceConfig(fixture_dir=str(FIXTURES)),
+            )
+            scan_mailbox(config)
+
+            store = StateStore(config.storage.path)
+            try:
+                rows = {row["affected_email_address"]: row for row in store.endpoint_quality_rows()}
+            finally:
+                store.close()
+
+            self.assertEqual(rows["missing@example.com"]["reachability"], "unreachable")
+            self.assertEqual(rows["full@example.com"]["reachability"], "degraded")
+            self.assertEqual(rows["complained@example.com"]["suppression_state"], "suppressed")
+            self.assertEqual(rows["optout@example.com"]["suppression_state"], "opted_out")
 
 
 if __name__ == "__main__":
diff --git a/workplans/EMAIL-WP-0002-mvp-mailbox-evidence-scanner.md b/workplans/EMAIL-WP-0002-mvp-mailbox-evidence-scanner.md
index a9c8fd4..246473f 100644
--- a/workplans/EMAIL-WP-0002-mvp-mailbox-evidence-scanner.md
+++ b/workplans/EMAIL-WP-0002-mvp-mailbox-evidence-scanner.md
@@ -716,7 +716,7 @@ Config file is loaded and validated.
 
 ```task
 id: EMAIL-WP-0002-T02
-status: progress
+status: done
 priority: high
 state_hub_task_id: "25a4da12-1bcd-4c6d-a0eb-a2f525b9c4b9"
 ```
@@ -744,7 +744,7 @@ CLI can connect to mailbox and list/fetch messages without modifying mailbox.
 
 ```task
 id: EMAIL-WP-0002-T03
-status: progress
+status: done
 priority: high
 state_hub_task_id: "16b95a6b-1375-4c91-8b78-0b75d51e0aeb"
 ```
@@ -856,7 +856,7 @@ Representative OOO and human reply samples are classified with confidence.
 
 ```task
 id: EMAIL-WP-0002-T07
-status: todo
+status: done
 priority: high
 state_hub_task_id: "8637d383-25f7-45b5-9680-427ed2ca87bf"
 ```
@@ -880,7 +880,7 @@ Representative complaint and unsubscribe examples are classified.
 
 ```task
 id: EMAIL-WP-0002-T08
-status: progress
+status: done
 priority: high
 state_hub_task_id: "6d62dea0-f416-4c0b-80a0-7c16422b8e5f"
 ```
@@ -906,7 +906,7 @@ Parsed messages produce evidence candidates according to the mapping table.
 
 ```task
 id: EMAIL-WP-0002-T09
-status: todo
+status: done
 priority: medium
 state_hub_task_id: "0d110877-953f-4aa2-961b-eec81e0159d4"
 ```
@@ -989,7 +989,7 @@ Automated tests verify expected classification and normalized event output.
 
 ```task
 id: EMAIL-WP-0002-T12
-status: progress
+status: done
 priority: medium
 state_hub_task_id: "a5f7067e-87be-4438-ba35-b12d06a8181e"
 ```
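
Review note on the incremental-scan cursor: `ImapMailboxSource.iter_messages` in this patch drops UIDs at or before the stored cursor, sorts the rest, and then applies `max_messages`. The sketch below isolates that selection step so the ordering is easy to check by hand. It is a minimal illustration, not the PR's code: `select_uids` is a hypothetical helper name, and `uid_after` and `uid_sort_key` are standalone copies modelled on `_uid_after` and `_uid_sort_key`.

```python
from __future__ import annotations


def uid_after(uid: str, since_uid: str) -> bool:
    """Return True when uid sorts strictly after the stored cursor."""
    try:
        return int(uid) > int(since_uid)
    except ValueError:
        # Non-numeric identifiers (such as fixture file names) fall back
        # to plain string comparison.
        return uid > since_uid


def uid_sort_key(uid: str) -> tuple[int, int | str]:
    """Sort numeric UIDs numerically and place non-numeric ones after them."""
    try:
        return (0, int(uid))
    except ValueError:
        return (1, uid)


def select_uids(uids: list[str], cursor: str | None, max_messages: int) -> list[str]:
    """Hypothetical helper: filter by cursor, sort, then cap the batch size."""
    if cursor is not None:
        uids = [uid for uid in uids if uid_after(uid, cursor)]
    ordered = sorted(uids, key=uid_sort_key)
    # A max_messages of 0 is treated as "no limit", as in the patch.
    return ordered[:max_messages] if max_messages else ordered


if __name__ == "__main__":
    # "10" and "100" compare numerically, so both are newer than cursor "9".
    print(select_uids(["10", "9", "100", "2"], cursor="9", max_messages=0))
```

Comparing UIDs as integers matters here: a plain string comparison would treat `"10"` as older than `"9"` and skip it on the next incremental scan.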