from __future__ import annotations import tempfile import unittest from csv import DictReader from pathlib import Path from email_connect.config import AppConfig, MailboxConfig, ReportsConfig, ScanConfig, SourceConfig, StorageConfig from email_connect.scanner import scan_mailbox from email_connect.storage import StateStore FIXTURES = Path(__file__).parent / "fixtures" / "mailbox" RECIPIENT_FIXTURES = Path(__file__).parent / "fixtures" class ScannerTests(unittest.TestCase): def test_scan_fixture_directory_writes_report_and_deduplicates(self) -> None: with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) config = AppConfig( mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"), scan=ScanConfig(), storage=StorageConfig(path=str(root / "state.sqlite")), reports=ReportsConfig(output_dir=str(root / "reports")), source=SourceConfig(fixture_dir=str(FIXTURES)), ) first = scan_mailbox(config) second = scan_mailbox(config) full = scan_mailbox(config, full_rescan=True, report_only_new=True) self.assertEqual(first.scan.messages_seen, 11) self.assertEqual(first.scan.messages_new, 11) self.assertGreaterEqual(first.scan.evidence_events_created, 11) self.assertEqual(second.scan.messages_seen, 0) self.assertEqual(second.scan.messages_new, 0) self.assertEqual(second.scan.evidence_events_created, 0) self.assertEqual(full.scan.messages_seen, 11) self.assertEqual(full.scan.messages_new, 0) self.assertEqual(full.scan.evidence_events_created, 0) self.assertTrue(first.report_path and first.report_path.exists()) with first.report_path.open(newline="", encoding="utf-8") as fh: first_rows = list(DictReader(fh)) self.assertTrue(first_rows) self.assertTrue(all(row["known_recipient"] == "false" for row in first_rows)) self.assertTrue(full.report_path and full.report_path.exists()) with full.report_path.open(newline="", encoding="utf-8") as fh: self.assertEqual(list(DictReader(fh)), []) def test_scan_updates_endpoint_quality(self) -> None: with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) config = AppConfig( mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"), scan=ScanConfig(), storage=StorageConfig(path=str(root / "state.sqlite")), reports=ReportsConfig(output_dir=str(root / "reports")), source=SourceConfig(fixture_dir=str(FIXTURES)), ) scan_mailbox(config) store = StateStore(config.storage.path) try: rows = {row["affected_email_address"]: row for row in store.endpoint_quality_rows()} finally: store.close() self.assertEqual(rows["missing@example.com"]["reachability"], "unreachable") self.assertEqual(rows["full@example.com"]["reachability"], "degraded") self.assertEqual(rows["complained@example.com"]["suppression_state"], "suppressed") self.assertEqual(rows["optout@example.com"]["suppression_state"], "opted_out") def test_expected_recipients_sort_first_and_get_no_evidence_rows(self) -> None: with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) config = AppConfig( mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"), scan=ScanConfig(), storage=StorageConfig(path=str(root / "state.sqlite")), reports=ReportsConfig(output_dir=str(root / "reports")), source=SourceConfig(fixture_dir=str(FIXTURES)), ) result = scan_mailbox( config, full_rescan=True, expected_recipients_path=str(RECIPIENT_FIXTURES / "expected_recipients.txt"), ) self.assertEqual(len(result.warnings), 1) self.assertTrue(result.report_path and result.report_path.exists()) with result.report_path.open(newline="", encoding="utf-8") as fh: rows = list(DictReader(fh)) self.assertGreater(len(rows), 2) known_flags = [row["known_recipient"] for row in rows] self.assertEqual(known_flags, sorted(known_flags, reverse=True)) missing_rows = [row for row in rows if row["affected_email_address"] == "missing@example.com"] self.assertTrue(missing_rows) self.assertTrue(all(row["known_recipient"] == "true" for row in missing_rows)) absent_rows = [row for row in rows if row["affected_email_address"] == "absent@example.com"] self.assertEqual(len(absent_rows), 1) self.assertEqual(absent_rows[0]["normalized_event_type"], "diagnostic.expected_recipient.no_evidence") self.assertEqual(absent_rows[0]["assessment_category"], "undef") self.assertEqual(absent_rows[0]["assessment_subclass"], "undef.no_signal") self.assertEqual(absent_rows[0]["evidence_strength"], "none") store = StateStore(config.storage.path) try: quality_addresses = {row["affected_email_address"] for row in store.endpoint_quality_rows()} finally: store.close() self.assertNotIn("absent@example.com", quality_addresses) def test_csv_expected_recipients_are_supported(self) -> None: with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) config = AppConfig( mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"), scan=ScanConfig(), storage=StorageConfig(path=str(root / "state.sqlite")), reports=ReportsConfig(output_dir=str(root / "reports")), source=SourceConfig(fixture_dir=str(FIXTURES)), ) result = scan_mailbox( config, full_rescan=True, expected_recipients_path=str(RECIPIENT_FIXTURES / "expected_recipients.csv"), expected_recipient_column="email", ) self.assertEqual(len(result.warnings), 1) self.assertTrue(result.report_path and result.report_path.exists()) with result.report_path.open(newline="", encoding="utf-8") as fh: rows = list(DictReader(fh)) optout_rows = [row for row in rows if row["affected_email_address"] == "optout@example.com"] self.assertTrue(optout_rows) self.assertTrue(all(row["known_recipient"] == "true" for row in optout_rows)) csv_absent = [row for row in rows if row["affected_email_address"] == "csv-absent@example.com"] self.assertEqual(csv_absent[0]["normalized_event_type"], "diagnostic.expected_recipient.no_evidence") def test_datetime_range_excludes_messages_outside_range(self) -> None: with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) expected_path = root / "expected.txt" expected_path.write_text("complained@example.com\nmissing@example.com\n", encoding="utf-8") config = AppConfig( mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"), scan=ScanConfig(), storage=StorageConfig(path=str(root / "state.sqlite")), reports=ReportsConfig(output_dir=str(root / "reports")), source=SourceConfig(fixture_dir=str(FIXTURES)), ) result = scan_mailbox( config, full_rescan=True, expected_recipients_path=str(expected_path), range_from="2026-06-02T10:04:00Z", range_to="2026-06-02T10:04:00Z", ) self.assertEqual(result.scan.messages_seen, 11) self.assertEqual(result.scan.messages_parsed, 1) self.assertTrue(result.report_path and result.report_path.exists()) with result.report_path.open(newline="", encoding="utf-8") as fh: rows = list(DictReader(fh)) self.assertEqual({row["affected_email_address"] for row in rows}, {"complained@example.com", "missing@example.com"}) complaint = [row for row in rows if row["affected_email_address"] == "complained@example.com"][0] missing = [row for row in rows if row["affected_email_address"] == "missing@example.com"][0] self.assertEqual(complaint["normalized_event_type"], "notification.channel.complaint_received") self.assertEqual(missing["normalized_event_type"], "diagnostic.expected_recipient.no_evidence") self.assertEqual(result.scan.range_start.isoformat(), "2026-06-02T10:04:00+00:00") self.assertEqual(result.scan.range_end.isoformat(), "2026-06-02T10:04:00+00:00") if __name__ == "__main__": unittest.main()