Compare commits

...

3 Commits

Author SHA1 Message Date
7ab1f9deb9 feat: finish mailbox evidence scanner mvp 2026-06-02 02:24:39 +02:00
226c045397 feat: expand mailbox evidence scanner 2026-06-02 02:07:50 +02:00
8532583182 feat: start mailbox evidence scanner 2026-06-02 01:19:09 +02:00
34 changed files with 2546 additions and 20 deletions

.gitignore vendored

@@ -171,6 +171,9 @@ cython_debug/
# Ruff stuff:
.ruff_cache/
# email-connect local runtime artifacts
.email-connect/
reports/*.csv
# PyPI configuration file
.pypirc


@@ -1,3 +1,43 @@
# repo-seed
# email-connect
A git repository template to bootstrap coulomb projects from.
Headless, provider-neutral email communication and evidence service.
The first implementation slice is the Mailbox Evidence Scanner MVP: scan a
return mailbox or fixture directory, classify inbound email-channel evidence,
store scan state locally, and generate timestamped CSV reports without
overclaiming delivery, awareness, or coordination success.
## Quickstart
```bash
PYTHONPATH=src python3 -m unittest discover -s tests
PYTHONPATH=src python3 -m email_connect.cli adapter-descriptor
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --out reports/
```
The example config uses `tests/fixtures/mailbox` as a mailbox source. Runtime
state is written to `.email-connect/state.sqlite`; generated CSV reports are
written to `reports/`.
For a live mailbox, set `mailbox.protocol: imap`, configure host, port, folder,
and credential environment variable names, then export the credentials before
running `scan-mailbox`. IMAP scans select the configured folder read-only and
fetch message bodies with `BODY.PEEK[]`; mailbox write-back actions such as
marking messages seen are intentionally unsupported in this MVP.
## Current Scope
- Coordination-engine spec review and references.
- Initial adapter descriptor, capability profile, evidence ceiling, and
limitations.
- Fixture and read-only IMAP mailbox sources.
- Conservative mailbox message parser and evidence mapper.
- SQLite state store with scan cursor, message/evidence deduplication, and
endpoint quality hints.
- CSV report generation, including `--report-only-new`.
- Golden fixture tests for hard bounce, soft bounce, delayed delivery, final
failure, complaint, unsubscribe, challenge-response, unknown return,
parse-failure, out-of-office, and human reply signals.
Provider webhooks, outbound sending, suppression workflows, OAuth mailbox login,
and a UI remain outside this first mailbox-scanner slice.


@@ -0,0 +1,31 @@
mailbox:
  id: return-mailbox-default
  protocol: fixture
  host: null
  port: 993
  tls: true
  username_env: EMAIL_CONNECT_IMAP_USER
  password_env: EMAIL_CONNECT_IMAP_PASSWORD
  folder: INBOX

source:
  fixture_dir: tests/fixtures/mailbox

scan:
  mode: incremental
  max_messages_per_run: 5000
  since: null
  include_seen: true
  mark_seen: false
  store_raw_headers: true
  store_raw_body: false
  store_raw_message_ref: true

storage:
  path: .email-connect/state.sqlite

reports:
  output_dir: reports
  include_all_evidence: true
  include_unknown_messages: true
  timestamp_timezone: UTC


@@ -0,0 +1,46 @@
# coordination-engine Email Spec Review
## Source Specs
Reviewed local coordination-engine specifications:
- `/home/worsch/coordination-engine/spec/EmailAdapterSpecification.md`
- `/home/worsch/coordination-engine/spec/AdapterInterfaceSpecification.md`
- `/home/worsch/coordination-engine/spec/RuntimeArchitectureAndAdapterSubsystem.md`
- `/home/worsch/coordination-engine/spec/ProductRequirementsDocument.md`
These are referenced, not copied, so `email-connect` stays aligned with the
authoritative coordination-engine checkout.
## Contract Points For email-connect
- `email-connect` is a `notification`, `communication`, and `interaction`
adapter.
- It reports email-channel evidence and advisory assessment. It does not own
coordination case result evaluation.
- Ambiguous provider or mailbox signals must map to the weakest safe normalized
event.
- Provider and mailbox events must preserve raw references and native status
mappings.
- The adapter must expose or enable production of normalized `EvidenceEvent`
records.
- The adapter should expose endpoint quality diagnostics, but endpoint quality
is not coordination success.
- Email's positive evidence ceiling is limited: email tracking cannot prove
human awareness, identity, authority, payload access, or non-repudiation.
- Golden tests must cover overclaim prevention, in particular that a "provider
delivered" signal is never promoted to awareness or payload delivery.
## First Implementation Implications
- The mailbox scanner emits `EmailEvidenceCandidate` rows that are shaped to
become coordination-engine `EvidenceEvent` records later.
- Classifiers preserve `unknown_return_message` and `parse_failed` instead of
silently discarding uncertainty.
- Out-of-office replies update diagnostics only; they do not prove awareness or
reachability.
- Human replies are email-channel success signals, but not legal acceptance or
coordination result satisfaction.
- CSV reports include event type, assessment category/subclass, confidence,
evidence strength, observed time, occurred time, raw reference, and
deduplication key for auditability.
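
The report columns named in the last bullet could be written with the standard library `csv` module roughly as follows. This is a minimal sketch, not the project's `reporting.py`: the column names are assumed to mirror the `EmailEvidenceCandidate` field names, and `write_evidence_report` and the fixture path are hypothetical.

```python
import csv
import io
from typing import Iterable, Mapping, TextIO

# Assumed column names; the real report header may differ.
REPORT_COLUMNS = [
    "event_type",
    "assessment_category",
    "assessment_subclass",
    "confidence",
    "evidence_strength",
    "observed_at",
    "occurred_at",
    "raw_message_ref",
    "deduplication_key",
]


def write_evidence_report(rows: Iterable[Mapping[str, object]], out: TextIO) -> int:
    """Write evidence rows as CSV and return the number of data rows written."""
    writer = csv.DictWriter(out, fieldnames=REPORT_COLUMNS)
    writer.writeheader()
    count = 0
    for row in rows:
        # Missing values become empty cells instead of raising.
        writer.writerow({name: row.get(name, "") for name in REPORT_COLUMNS})
        count += 1
    return count


buffer = io.StringIO()
written = write_evidence_report(
    [
        {
            "event_type": "notification.endpoint.rejected_permanent",
            "assessment_category": "fail",
            "assessment_subclass": "fail.hard_bounce",
            "confidence": "high",
            "evidence_strength": "negative",
            "observed_at": "2026-06-02T00:00:00+00:00",
            "occurred_at": None,  # unknown occurrence time stays empty
            "raw_message_ref": "tests/fixtures/mailbox/hard_bounce.eml",  # hypothetical
            "deduplication_key": "msg-1|parser-1|notification.endpoint.rejected_permanent",
        }
    ],
    buffer,
)
```

Keeping both `observed_at` and `occurred_at` as separate columns lets a reviewer see when the scanner saw the message independently of when the underlying event is believed to have happened.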


@@ -0,0 +1,66 @@
# Email Evidence Canon
## Rule
Email events are evidence, not result satisfaction.
The scanner reports email-channel facts and uncertainty. A downstream
coordination runtime decides whether those facts satisfy a coordination case.
## Initial Message Classes
- `hard_bounce`
- `soft_bounce`
- `delayed_delivery_notice`
- `final_delivery_failure`
- `out_of_office`
- `human_reply`
- `complaint_or_abuse`
- `unsubscribe_or_opt_out`
- `challenge_response`
- `unknown_return_message`
- `unrelated_message`
- `parse_failed`
## Initial Normalized Events
| Message class | Normalized event | Assessment |
| --- | --- | --- |
| `hard_bounce` | `notification.endpoint.rejected_permanent` | `fail.hard_bounce` |
| `soft_bounce` | `notification.endpoint.rejected_temporary` | `undef.deferred` |
| `delayed_delivery_notice` | `notification.endpoint.deferred` | `undef.deferred` |
| `final_delivery_failure` | `notification.endpoint.rejected_permanent` | `fail.expired_without_delivery` |
| `out_of_office` | `interaction.out_of_office_received` | `undef.out_of_office` |
| `human_reply` | `interaction.reply_received` | `success.reply_received` |
| `complaint_or_abuse` | `notification.channel.complaint_received` | `fail.complaint_received` |
| `unsubscribe_or_opt_out` | `notification.channel.unsubscribe_received` | `fail.unsubscribed` |
| `unknown_return_message` | `notification.endpoint.unknown` | `undef.conflicting_evidence` |
| `challenge_response` | `interaction.unverified_actor_interaction` | `undef.identity_uncertain` |
| `parse_failed` | `diagnostic.message.parse_failed` | `undef.parse_failed` |
## Overclaim Prevention
- The absence of a bounce does not mean delivery success.
- Provider acceptance does not mean endpoint acceptance.
- Endpoint acceptance does not mean inbox placement.
- Out-of-office does not prove recipient awareness or action.
- Human reply does not prove legal acceptance.
- Unknown return messages remain visible.
- Parse failures are diagnostic rows, not delivery or interaction outcomes.
- Scanner and proxy interactions must stay below identity-bound interaction.
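
One way to guard these rules is a golden-style test over the mapping table. The sketch below is self-contained and hedged: `ASSESSMENTS` restates the assessments from the Initial Normalized Events table, while `ALLOWED_SUCCESS_CLASSES` and `overclaiming_classes` are hypothetical names, and the assumption that `human_reply` is the only class allowed to yield a success assessment is read off that table rather than taken from the project's test suite.

```python
import unittest

# Message class -> advisory assessment, restated from the normalized events table.
ASSESSMENTS = {
    "hard_bounce": "fail.hard_bounce",
    "soft_bounce": "undef.deferred",
    "delayed_delivery_notice": "undef.deferred",
    "final_delivery_failure": "fail.expired_without_delivery",
    "out_of_office": "undef.out_of_office",
    "human_reply": "success.reply_received",
    "complaint_or_abuse": "fail.complaint_received",
    "unsubscribe_or_opt_out": "fail.unsubscribed",
    "unknown_return_message": "undef.conflicting_evidence",
    "challenge_response": "undef.identity_uncertain",
    "parse_failed": "undef.parse_failed",
}

# Assumption: only a human reply may produce a success assessment.
ALLOWED_SUCCESS_CLASSES = frozenset({"human_reply"})


def overclaiming_classes(assessments: dict[str, str]) -> list[str]:
    """Return message classes that claim success without being allowed to."""
    return sorted(
        message_class
        for message_class, assessment in assessments.items()
        if assessment.startswith("success.")
        and message_class not in ALLOWED_SUCCESS_CLASSES
    )


class OverclaimPreventionTest(unittest.TestCase):
    def test_only_human_reply_maps_to_success(self) -> None:
        self.assertEqual(overclaiming_classes(ASSESSMENTS), [])

    def test_promoted_auto_reply_is_flagged(self) -> None:
        # Simulate a regression where an auto-reply is treated as a reply.
        broken = {**ASSESSMENTS, "out_of_office": "success.reply_received"}
        self.assertEqual(overclaiming_classes(broken), ["out_of_office"])
```

A check of this shape fails as soon as a new message class is mapped to a success assessment without an explicit decision to allow it.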
## Endpoint Quality Hints
Endpoint quality rows are diagnostic state, not verdicts:

| Evidence | Quality hint |
| --- | --- |
| Hard bounce or final failure | `reachability = unreachable`, `last_failure_at` |
| Soft bounce or delayed delivery | `reachability = degraded`, `last_failure_at` |
| Complaint | `suppression_state = suppressed` |
| Unsubscribe | `suppression_state = opted_out` |
| Human reply | `last_success_at` |
| Out-of-office | `reachability = uncertain`, `last_auto_reply_at` |

These hints can guide future suppression and review workflows, but they do not
prove human awareness, authority, payload access, or coordination success.


@@ -0,0 +1,116 @@
# Initial Runtime Architecture
## Status
This is the first implementation architecture for the mailbox evidence scanner
slice. It is intentionally small and stdlib-only so the repo can run before a
larger service stack is chosen.
## Service Boundary
The first slice is a CLI scanner:
```text
email-connect scan-mailbox --config config/mailbox.example.yml --out reports/
```
It scans an inbound return mailbox source, classifies messages, stores scan
state in SQLite, updates endpoint-quality hints, and writes timestamped CSV
evidence reports.
The source layer supports deterministic fixture directories and a read-only IMAP
connector. IMAP scans select the configured folder with `readonly=True`, fetch
messages using `BODY.PEEK[]`, and reject `mark_seen` because mailbox write-back
actions are out of scope for this MVP.
## Package Layout
```text
src/email_connect/
  adapter_contract.py  # coordination-engine descriptor and evidence ceiling
  cli.py               # command line entry points
  config.py            # config loading
  evidence.py          # native class to normalized evidence mapping
  mailbox.py           # fixture and IMAP mailbox sources
  models.py            # mailbox, parse, evidence, endpoint quality dataclasses
  parser.py            # MIME/header parsing and conservative classification
  reporting.py         # CSV report generation
  scanner.py           # scan orchestration
  storage.py           # SQLite state store
```
## Persistence
SQLite is the MVP store. The initial schema includes:
- `mailbox_scans`
- `mailbox_messages`
- `parsed_messages`
- `evidence_candidates`
- `scan_cursors`
- `endpoint_quality`
Message deduplication is keyed by mailbox ID, IMAP UID when present, message ID,
received timestamp, sender, subject hash, and body hash. Evidence
deduplication follows the workplan fields: message, parser version, normalized
event, affected recipient, original message, SMTP/enhanced status, and reason.
Incremental scans use `scan_cursors` by mailbox and folder. Full rescans ignore
the cursor while preserving message and evidence deduplication. Endpoint-quality
rows are diagnostic hints derived from explicit evidence events; they are not
coordination outcomes.
## Evidence Mapping
Parser output is represented as `ParsedMailboxMessage`. The mapper converts it
to `EmailEvidenceCandidate` using coordination-engine event names and advisory
assessment classes.
Examples:
- `hard_bounce` -> `notification.endpoint.rejected_permanent`
- `soft_bounce` -> `notification.endpoint.rejected_temporary`
- `delayed_delivery_notice` -> `notification.endpoint.deferred`
- `complaint_or_abuse` -> `notification.channel.complaint_received`
- `unsubscribe_or_opt_out` -> `notification.channel.unsubscribe_received`
- `out_of_office` -> `interaction.out_of_office_received`
- `challenge_response` -> `interaction.unverified_actor_interaction`
- `human_reply` -> `interaction.reply_received`
- `parse_failed` -> `diagnostic.message.parse_failed`
The mapper does not emit evidence for unrelated messages. Unknown return
messages stay visible as `notification.endpoint.unknown`. Parse failures are
visible as diagnostics without claiming delivery, interaction, identity, or
endpoint quality.
## coordination-engine Alignment
The implementation keeps these coordination-engine concepts explicit:
- adapter descriptor
- adapter capability profile
- evidence ceiling
- advisory assessment
- endpoint quality update shape
- event observation and raw reference preservation
- golden tests for overclaim prevention
Email evidence remains below the coordination result layer. The scanner does
not infer inbox placement, human awareness, legal acceptance, payload access, or
case success.
## Provider Boundary
Provider webhook ingestion and outbound send APIs are deliberately outside this
slice. The mailbox scanner uses the same evidence model so future provider
events can enter through a parallel ingestion path and converge at the same
normalization layer.
## Development Commands
```bash
PYTHONPATH=src python3 -m unittest discover -s tests
PYTHONPATH=src python3 -m email_connect.cli adapter-descriptor
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --out reports/
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --report-only-new --out reports/
```

pyproject.toml Normal file

@@ -0,0 +1,19 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "email-connect"
version = "0.1.0"
description = "Provider-neutral email communication and evidence service."
readme = "README.md"
requires-python = ">=3.11"
license = { file = "LICENSE" }
authors = [{ name = "Coulomb" }]
dependencies = []
[project.scripts]
email-connect = "email_connect.cli:main"
[tool.setuptools.packages.find]
where = ["src"]

reports/.gitkeep Normal file

@@ -0,0 +1 @@


@@ -0,0 +1,3 @@
"""email-connect package."""
__version__ = "0.1.0"


@@ -0,0 +1,97 @@
from __future__ import annotations

ADAPTER_ID = "email-connect"
ADAPTER_CONTRACT_VERSION = "1.1"

EMITTED_EVENT_TYPES = [
    "notification.endpoint.rejected_permanent",
    "notification.endpoint.rejected_temporary",
    "notification.endpoint.deferred",
    "notification.channel.complaint_received",
    "notification.channel.unsubscribe_received",
    "interaction.reply_received",
    "interaction.out_of_office_received",
    # Emitted for challenge_response; see EVIDENCE_MAPPINGS in evidence.py.
    "interaction.unverified_actor_interaction",
    "notification.endpoint.unknown",
    "diagnostic.message.parse_failed",
]


def adapter_descriptor() -> dict:
    """Return the initial coordination-engine adapter descriptor."""
    return {
        "adapter_id": ADAPTER_ID,
        "adapter_name": "email-connect",
        "adapter_version": "0.1.0",
        "adapter_contract_version": ADAPTER_CONTRACT_VERSION,
        "adapter_types": ["notification", "communication", "interaction"],
        "deployment_mode": "external",
        "capability_profile": {
            "can_notify": True,
            "can_publish": False,
            "can_deliver_payload": False,
            "can_collect_payload": False,
            "can_grant_access": False,
            "can_revoke_access": False,
            "can_observe_interaction": True,
            "can_observe_identity": False,
            "can_observe_authority": False,
            "can_emit_delivery_receipts": False,
            "can_emit_display_or_read_receipts": False,
            "can_emit_return_or_failure_events": True,
            "can_emit_late_events": True,
            "can_cancel_after_dispatch": False,
            "supports_webhooks": False,
            "supports_polling": True,
            "supports_idempotency": "adapter_managed",
            "supports_endpoint_quality": True,
            "supports_native_status_mapping": True,
            "supports_golden_tests": True,
            "limitations": [
                "Mailbox evidence cannot prove inbox placement.",
                "Reply, open, and click signals do not prove coordination result satisfaction.",
            ],
        },
        "supported_actions": [],
        "emitted_event_types": EMITTED_EVENT_TYPES,
        "supported_channels": ["email"],
        "supported_endpoint_types": ["email_address"],
        "evidence_profile": {
            "native_status_mapping_ref": "src/email_connect/evidence.py",
            "golden_tests_ref": "tests/fixtures",
        },
        "evidence_ceiling": {
            "max_positive_event": "interaction.reply_received",
            "max_positive_strength": "medium",
            "can_prove_human_awareness": False,
            "can_prove_payload_access": False,
            "can_prove_payload_delivery": False,
            "can_prove_identity": False,
            "can_prove_authority": False,
            "can_prove_non_repudiation": False,
            "limitations": [
                "Email cannot reliably prove intended-recipient awareness.",
                "Mailbox scanning observes return-channel evidence only.",
            ],
        },
        "assurance_capability": {
            "awareness_assurance": "weak",
            "delivery_assurance": "none",
            "identity_assurance": "none",
            "authority_assurance": "none",
            "non_repudiation_assurance": "none",
        },
        "identity_profile": {
            "can_identify_actor": False,
            "identity_limitations": ["Inbound mailbox evidence is not authenticated actor evidence."],
        },
        "late_event_policy": {
            "accepts_late_events": True,
            "late_event_notes": ["Bounces and replies may arrive after earlier acceptance evidence."],
        },
        "limitations": [
            "No outbound sending in the first mailbox-scanner MVP.",
            "No provider webhook integration in the first mailbox-scanner MVP.",
            "No legal delivery assessment.",
        ],
    }

src/email_connect/cli.py Normal file

@@ -0,0 +1,56 @@
from __future__ import annotations

import argparse
import json
from pathlib import Path

from .adapter_contract import adapter_descriptor
from .config import load_config
from .scanner import scan_mailbox


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(prog="email-connect")
    sub = parser.add_subparsers(dest="command", required=True)

    scan = sub.add_parser("scan-mailbox", help="Scan a return mailbox or fixture directory.")
    scan.add_argument("--config", required=True)
    scan.add_argument("--out", default=None)
    scan.add_argument("--full-rescan", action="store_true")
    scan.add_argument("--since", default=None)
    scan.add_argument("--report-only-new", action="store_true")
    scan.add_argument("--dry-run", action="store_true")
    scan.add_argument("--fixture-dir", default=None)

    sub.add_parser("adapter-descriptor", help="Print the initial coordination-engine adapter descriptor.")

    args = parser.parse_args(argv)

    if args.command == "adapter-descriptor":
        print(json.dumps(adapter_descriptor(), indent=2, sort_keys=True))
        return 0

    if args.command == "scan-mailbox":
        config = load_config(args.config)
        result = scan_mailbox(
            config,
            output_dir=args.out,
            full_rescan=args.full_rescan,
            report_only_new=args.report_only_new,
            dry_run=args.dry_run,
            fixture_dir=args.fixture_dir,
            since=args.since,
        )
        print(f"scan_id={result.scan.scan_id}")
        print(f"messages_seen={result.scan.messages_seen}")
        print(f"messages_new={result.scan.messages_new}")
        print(f"messages_parsed={result.scan.messages_parsed}")
        print(f"evidence_events_created={result.scan.evidence_events_created}")
        if result.report_path:
            print(f"report_path={Path(result.report_path)}")
        return 0

    return 2


if __name__ == "__main__":
    raise SystemExit(main())

src/email_connect/config.py Normal file

@@ -0,0 +1,140 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class MailboxConfig:
    id: str
    protocol: str = "imap"
    host: str | None = None
    port: int = 993
    tls: bool = True
    username_env: str | None = None
    password_env: str | None = None
    folder: str = "INBOX"


@dataclass(frozen=True)
class ScanConfig:
    mode: str = "incremental"
    max_messages_per_run: int = 5000
    since: str | None = None
    include_seen: bool = True
    mark_seen: bool = False
    store_raw_headers: bool = True
    store_raw_body: bool = False
    store_raw_message_ref: bool = True


@dataclass(frozen=True)
class StorageConfig:
    path: str = ".email-connect/state.sqlite"


@dataclass(frozen=True)
class ReportsConfig:
    output_dir: str = "reports"
    include_all_evidence: bool = True
    include_unknown_messages: bool = True
    timestamp_timezone: str = "UTC"


@dataclass(frozen=True)
class SourceConfig:
    fixture_dir: str | None = None


@dataclass(frozen=True)
class AppConfig:
    mailbox: MailboxConfig
    scan: ScanConfig
    storage: StorageConfig
    reports: ReportsConfig
    source: SourceConfig = SourceConfig()


def load_config(path: str | Path) -> AppConfig:
    data = _load_mapping(Path(path))
    mailbox = data.get("mailbox", {})
    scan = data.get("scan", {})
    storage = data.get("storage", {})
    reports = data.get("reports", {})
    source = data.get("source", {})
    return AppConfig(
        mailbox=MailboxConfig(
            id=str(mailbox.get("id", "return-mailbox-default")),
            protocol=str(mailbox.get("protocol", "imap")),
            host=mailbox.get("host"),
            port=int(mailbox.get("port", 993)),
            tls=bool(mailbox.get("tls", True)),
            username_env=mailbox.get("username_env"),
            password_env=mailbox.get("password_env"),
            folder=str(mailbox.get("folder", "INBOX")),
        ),
        scan=ScanConfig(
            mode=str(scan.get("mode", "incremental")),
            max_messages_per_run=int(scan.get("max_messages_per_run", 5000)),
            since=scan.get("since"),
            include_seen=bool(scan.get("include_seen", True)),
            mark_seen=bool(scan.get("mark_seen", False)),
            store_raw_headers=bool(scan.get("store_raw_headers", True)),
            store_raw_body=bool(scan.get("store_raw_body", False)),
            store_raw_message_ref=bool(scan.get("store_raw_message_ref", True)),
        ),
        storage=StorageConfig(path=str(storage.get("path", ".email-connect/state.sqlite"))),
        reports=ReportsConfig(
            output_dir=str(reports.get("output_dir", "reports")),
            include_all_evidence=bool(reports.get("include_all_evidence", True)),
            include_unknown_messages=bool(reports.get("include_unknown_messages", True)),
            timestamp_timezone=str(reports.get("timestamp_timezone", "UTC")),
        ),
        source=SourceConfig(fixture_dir=source.get("fixture_dir")),
    )


def _load_mapping(path: Path) -> dict[str, Any]:
    text = path.read_text(encoding="utf-8")
    try:
        import yaml  # type: ignore

        loaded = yaml.safe_load(text)
        return loaded or {}
    except ModuleNotFoundError:
        return _parse_simple_yaml(text)


def _parse_simple_yaml(text: str) -> dict[str, Any]:
    """Parse the small YAML subset used by config/mailbox.example.yml."""
    result: dict[str, Any] = {}
    current: dict[str, Any] | None = None
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].rstrip()
        if not line.strip():
            continue
        if not line.startswith(" ") and line.endswith(":"):
            key = line[:-1].strip()
            current = {}
            result[key] = current
            continue
        if current is None or ":" not in line:
            raise ValueError(f"Unsupported YAML line: {raw_line}")
        key, value = line.strip().split(":", 1)
        current[key.strip()] = _parse_scalar(value.strip())
    return result


def _parse_scalar(value: str) -> Any:
    if value == "" or value == "null":
        return None
    if value in {"true", "false"}:
        return value == "true"
    if value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    try:
        return int(value)
    except ValueError:
        return value


@@ -0,0 +1,223 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from uuid import uuid5, NAMESPACE_URL

from .models import (
    AssessmentCategory,
    Confidence,
    EmailEvidenceCandidate,
    EndpointQualityUpdate,
    EvidenceStrength,
    MessageClass,
    ParsedMailboxMessage,
)


@dataclass(frozen=True)
class EvidenceMapping:
    event_type: str
    assessment_category: AssessmentCategory
    assessment_subclass: str
    evidence_strength: EvidenceStrength
    normalized_meaning: str


EVIDENCE_MAPPINGS: dict[MessageClass, EvidenceMapping | None] = {
    MessageClass.HARD_BOUNCE: EvidenceMapping(
        "notification.endpoint.rejected_permanent",
        AssessmentCategory.FAIL,
        "fail.hard_bounce",
        EvidenceStrength.NEGATIVE,
        "Strong evidence that the recipient endpoint rejected the message permanently.",
    ),
    MessageClass.SOFT_BOUNCE: EvidenceMapping(
        "notification.endpoint.rejected_temporary",
        AssessmentCategory.UNDEF,
        "undef.deferred",
        EvidenceStrength.AMBIGUOUS,
        "Temporary endpoint failure; retry or later evidence may change interpretation.",
    ),
    MessageClass.DELAYED_DELIVERY_NOTICE: EvidenceMapping(
        "notification.endpoint.deferred",
        AssessmentCategory.UNDEF,
        "undef.deferred",
        EvidenceStrength.AMBIGUOUS,
        "Delivery is delayed and still uncertain.",
    ),
    MessageClass.FINAL_DELIVERY_FAILURE: EvidenceMapping(
        "notification.endpoint.rejected_permanent",
        AssessmentCategory.FAIL,
        "fail.expired_without_delivery",
        EvidenceStrength.NEGATIVE,
        "Provider or endpoint gave up after retrying.",
    ),
    MessageClass.OUT_OF_OFFICE: EvidenceMapping(
        "interaction.out_of_office_received",
        AssessmentCategory.UNDEF,
        "undef.out_of_office",
        EvidenceStrength.WEAK,
        "Mailbox auto-reply was observed; awareness and action remain uncertain.",
    ),
    MessageClass.HUMAN_REPLY: EvidenceMapping(
        "interaction.reply_received",
        AssessmentCategory.SUCCESS,
        "success.reply_received",
        EvidenceStrength.MEDIUM,
        "A reply-like message was observed on the email channel.",
    ),
    MessageClass.COMPLAINT_OR_ABUSE: EvidenceMapping(
        "notification.channel.complaint_received",
        AssessmentCategory.FAIL,
        "fail.complaint_received",
        EvidenceStrength.NEGATIVE,
        "Complaint or abuse feedback was observed.",
    ),
    MessageClass.UNSUBSCRIBE_OR_OPT_OUT: EvidenceMapping(
        "notification.channel.unsubscribe_received",
        AssessmentCategory.FAIL,
        "fail.unsubscribed",
        EvidenceStrength.NEGATIVE,
        "Opt-out or unsubscribe signal was observed.",
    ),
    MessageClass.UNKNOWN_RETURN_MESSAGE: EvidenceMapping(
        "notification.endpoint.unknown",
        AssessmentCategory.UNDEF,
        "undef.conflicting_evidence",
        EvidenceStrength.AMBIGUOUS,
        "Return-channel message could not be classified reliably.",
    ),
    MessageClass.CHALLENGE_RESPONSE: EvidenceMapping(
        "interaction.unverified_actor_interaction",
        AssessmentCategory.UNDEF,
        "undef.identity_uncertain",
        EvidenceStrength.AMBIGUOUS,
        "Challenge-response or automated interaction was observed.",
    ),
    MessageClass.UNRELATED_MESSAGE: None,
    MessageClass.PARSE_FAILED: EvidenceMapping(
        "diagnostic.message.parse_failed",
        AssessmentCategory.UNDEF,
        "undef.parse_failed",
        EvidenceStrength.NONE,
        "Message source could not be parsed reliably; no delivery or interaction claim is made.",
    ),
}


def evidence_deduplication_key(parsed: ParsedMailboxMessage, mapping: EvidenceMapping) -> str:
    parts = [
        parsed.mailbox_message_id,
        parsed.parser_version,
        mapping.event_type,
        parsed.affected_email_address or "",
        parsed.original_message_id or "",
        parsed.smtp_status_code or "",
        parsed.enhanced_status_code or "",
        parsed.reason_code or "",
    ]
    return "|".join(parts)


def candidate_from_parsed(
    parsed: ParsedMailboxMessage,
    *,
    raw_message_ref: str | None,
    observed_at: datetime,
    occurred_at: datetime | None,
) -> EmailEvidenceCandidate | None:
    mapping = EVIDENCE_MAPPINGS.get(parsed.message_class)
    if mapping is None:
        return None
    dedup_key = evidence_deduplication_key(parsed, mapping)
    candidate_id = str(uuid5(NAMESPACE_URL, "email-connect:evidence:" + dedup_key))
    return EmailEvidenceCandidate(
        evidence_candidate_id=candidate_id,
        mailbox_message_id=parsed.mailbox_message_id,
        parsed_message_id=parsed.parsed_message_id,
        event_type=mapping.event_type,
        assessment_category=mapping.assessment_category,
        assessment_subclass=mapping.assessment_subclass,
        affected_email_address=parsed.affected_email_address,
        original_message_id=parsed.original_message_id,
        confidence=parsed.confidence,
        evidence_strength=mapping.evidence_strength,
        occurred_at=occurred_at,
        observed_at=observed_at,
        deduplication_key=dedup_key,
        raw_message_ref=raw_message_ref,
        notes=[mapping.normalized_meaning, *parsed.notes],
        metadata={
            "message_class": parsed.message_class.value,
            "smtp_status_code": parsed.smtp_status_code,
            "enhanced_status_code": parsed.enhanced_status_code,
            "reason_code": parsed.reason_code,
        },
    )


def endpoint_quality_from_candidate(candidate: EmailEvidenceCandidate) -> EndpointQualityUpdate | None:
    address = candidate.affected_email_address
    if not address:
        return None
    message_class = candidate.metadata.get("message_class")
    signal = str(message_class or candidate.event_type)
    reason_code = candidate.metadata.get("reason_code")
    observed_at = candidate.observed_at

    if message_class in {MessageClass.HARD_BOUNCE.value, MessageClass.FINAL_DELIVERY_FAILURE.value}:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            reachability="unreachable",
            last_failure_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class in {MessageClass.SOFT_BOUNCE.value, MessageClass.DELAYED_DELIVERY_NOTICE.value}:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            reachability="degraded",
            last_failure_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.COMPLAINT_OR_ABUSE.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            suppression_state="suppressed",
            last_failure_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.UNSUBSCRIBE_OR_OPT_OUT.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            suppression_state="opted_out",
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.HUMAN_REPLY.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            last_success_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.OUT_OF_OFFICE.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            reachability="uncertain",
            last_auto_reply_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    return None


@@ -0,0 +1,196 @@
from __future__ import annotations
import imaplib
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, Protocol
from .config import AppConfig
@dataclass(frozen=True)
class MailboxSourceMessage:
source_uid: str
raw_bytes: bytes
raw_message_ref: str
imap_uid: str | None = None
class MailboxMessageSource(Protocol):
def iter_messages(
self,
*,
max_messages: int,
since_uid: str | None,
full_rescan: bool,
include_seen: bool,
since: datetime | None,
) -> Iterable[MailboxSourceMessage]:
...
class FixtureMailboxSource:
def __init__(self, fixture_dir: str | Path) -> None:
self.fixture_dir = Path(fixture_dir)
def iter_messages(
self,
*,
max_messages: int,
since_uid: str | None,
full_rescan: bool,
include_seen: bool,
since: datetime | None,
) -> Iterable[MailboxSourceMessage]:
del include_seen, since
paths = sorted(self.fixture_dir.glob("*.eml"))
emitted = 0
for path in paths:
source_uid = path.name
if not full_rescan and since_uid and source_uid <= since_uid:
continue
yield MailboxSourceMessage(
source_uid=source_uid,
raw_bytes=path.read_bytes(),
raw_message_ref=str(path),
imap_uid=None,
)
emitted += 1
if max_messages and emitted >= max_messages:
break
class ImapMailboxSource:
def __init__(self, config: AppConfig) -> None:
self.config = config
def iter_messages(
self,
*,
max_messages: int,
since_uid: str | None,
full_rescan: bool,
include_seen: bool,
since: datetime | None,
) -> Iterable[MailboxSourceMessage]:
        mailbox = self.config.mailbox
        if self.config.scan.mark_seen:
            raise ValueError("IMAP mark_seen is intentionally unsupported; scans are read-only.")
        if not mailbox.host:
            raise ValueError("mailbox.host is required for IMAP scans.")
        if not mailbox.username_env or not mailbox.password_env:
            raise ValueError("mailbox.username_env and mailbox.password_env are required for IMAP scans.")
        username = os.environ.get(mailbox.username_env)
        password = os.environ.get(mailbox.password_env)
        if not username or not password:
            raise ValueError(f"IMAP credentials not found in {mailbox.username_env}/{mailbox.password_env}.")
        conn: imaplib.IMAP4
        if mailbox.tls:
            conn = imaplib.IMAP4_SSL(mailbox.host, mailbox.port)
        else:
            conn = imaplib.IMAP4(mailbox.host, mailbox.port)
        selected = False
        try:
            _expect_ok(conn.login(username, password), "login")
            _expect_ok(conn.select(mailbox.folder, readonly=True), f"select {mailbox.folder}")
            selected = True
            criteria = _search_criteria(include_seen=include_seen, since=since)
            _status, search_data = _expect_ok(conn.uid("search", None, *criteria), "uid search")
            uids = _decode_uids(search_data)
            if not full_rescan and since_uid:
                uids = [uid for uid in uids if _uid_after(uid, since_uid)]
            uids = sorted(uids, key=_uid_sort_key)
            if max_messages:
                uids = uids[:max_messages]
            for uid in uids:
                _fetch_status, fetch_data = _expect_ok(conn.uid("fetch", uid, "(BODY.PEEK[])"), f"uid fetch {uid}")
                raw_bytes = _raw_message_from_fetch(fetch_data)
                if raw_bytes is None:
                    continue
                yield MailboxSourceMessage(
                    source_uid=uid,
                    raw_bytes=raw_bytes,
                    raw_message_ref=f"imap://{mailbox.host}/{mailbox.folder};UID={uid}",
                    imap_uid=uid,
                )
        finally:
            if selected:
                try:
                    conn.close()
                except imaplib.IMAP4.error:
                    pass
            try:
                conn.logout()
            except imaplib.IMAP4.error:
                pass


def source_for_config(config: AppConfig, *, fixture_dir_override: str | None = None) -> MailboxMessageSource:
    if fixture_dir_override:
        return FixtureMailboxSource(fixture_dir_override)
    if config.mailbox.protocol == "fixture":
        fixture_dir = config.source.fixture_dir
        if not fixture_dir:
            raise ValueError("source.fixture_dir is required for fixture scans.")
        return FixtureMailboxSource(fixture_dir)
    if config.mailbox.protocol == "imap":
        return ImapMailboxSource(config)
    raise ValueError(f"Unsupported mailbox protocol: {config.mailbox.protocol}")


def _search_criteria(*, include_seen: bool, since: datetime | None) -> list[str]:
    criteria = ["ALL" if include_seen else "UNSEEN"]
    if since is not None:
        criteria.extend(["SINCE", _imap_date(since)])
    return criteria


def _imap_date(value: datetime) -> str:
    months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
    return f"{value.day:02d}-{months[value.month - 1]}-{value.year:04d}"


def _expect_ok(result: tuple[str, list], operation: str) -> tuple[str, list]:
    status, data = result
    if status != "OK":
        raise RuntimeError(f"IMAP {operation} failed with status {status}: {data!r}")
    return status, data


def _decode_uids(search_data: list) -> list[str]:
    if not search_data:
        return []
    raw = search_data[0] or b""
    if isinstance(raw, str):
        raw_text = raw
    else:
        raw_text = raw.decode("ascii", errors="ignore")
    return [part for part in raw_text.split() if part]


def _uid_after(uid: str, since_uid: str) -> bool:
    try:
        return int(uid) > int(since_uid)
    except ValueError:
        return uid > since_uid


def _uid_sort_key(uid: str) -> tuple[int, int | str]:
    try:
        return (0, int(uid))
    except ValueError:
        return (1, uid)


def _raw_message_from_fetch(fetch_data: list) -> bytes | None:
    for item in fetch_data:
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
            return item[1]
    return None
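
Review note: the search-criteria and UID-ordering helpers above can be exercised without a live IMAP server. The following is a minimal sketch under stated assumptions: the names `imap_date`, `build_search_criteria`, and `order_uids` are illustrative standalone copies, not functions from this diff, and `order_uids` assumes purely numeric UIDs (the diff's helpers also tolerate non-numeric values).

```python
from __future__ import annotations

from datetime import datetime, timezone

MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def imap_date(value: datetime) -> str:
    # IMAP SEARCH dates use the form DD-Mon-YYYY with English month names.
    return f"{value.day:02d}-{MONTHS[value.month - 1]}-{value.year:04d}"


def build_search_criteria(*, include_seen: bool, since: datetime | None) -> list[str]:
    # UNSEEN narrows the search to unread messages; ALL disables that filter.
    criteria = ["ALL" if include_seen else "UNSEEN"]
    if since is not None:
        criteria.extend(["SINCE", imap_date(since)])
    return criteria


def order_uids(uids: list[str], since_uid: str | None) -> list[str]:
    # Hypothetical helper: keep UIDs after the cursor, then sort numerically
    # so that "10" follows "9" instead of preceding it.
    if since_uid is not None:
        uids = [uid for uid in uids if int(uid) > int(since_uid)]
    return sorted(uids, key=int)


if __name__ == "__main__":
    since = datetime(2026, 6, 2, tzinfo=timezone.utc)
    print(build_search_criteria(include_seen=False, since=since))
    print(order_uids(["10", "9", "2"], since_uid="2"))
```

Sorting numerically before applying a per-run message limit matters because the stored cursor is a UID: a lexical sort could process UID 10 before UID 9.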

126
src/email_connect/models.py Normal file

@@ -0,0 +1,126 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import StrEnum
from typing import Any


class MessageClass(StrEnum):
    HARD_BOUNCE = "hard_bounce"
    SOFT_BOUNCE = "soft_bounce"
    DELAYED_DELIVERY_NOTICE = "delayed_delivery_notice"
    FINAL_DELIVERY_FAILURE = "final_delivery_failure"
    OUT_OF_OFFICE = "out_of_office"
    HUMAN_REPLY = "human_reply"
    COMPLAINT_OR_ABUSE = "complaint_or_abuse"
    UNSUBSCRIBE_OR_OPT_OUT = "unsubscribe_or_opt_out"
    CHALLENGE_RESPONSE = "challenge_response"
    UNKNOWN_RETURN_MESSAGE = "unknown_return_message"
    UNRELATED_MESSAGE = "unrelated_message"
    PARSE_FAILED = "parse_failed"


class AssessmentCategory(StrEnum):
    SUCCESS = "success"
    FAIL = "fail"
    UNDEF = "undef"


class Confidence(StrEnum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


class EvidenceStrength(StrEnum):
    NONE = "none"
    WEAK = "weak"
    MEDIUM = "medium"
    STRONG = "strong"
    NEGATIVE = "negative"
    AMBIGUOUS = "ambiguous"


@dataclass(frozen=True)
class MailboxScan:
    scan_id: str
    mailbox_id: str
    started_at: datetime
    finished_at: datetime | None
    scan_mode: str
    folder: str
    status: str
    messages_seen: int = 0
    messages_new: int = 0
    messages_parsed: int = 0
    evidence_events_created: int = 0
    report_path: str | None = None
    since: datetime | None = None


@dataclass(frozen=True)
class InboundMailboxMessage:
    mailbox_message_id: str
    mailbox_id: str
    imap_uid: str | None
    message_id_header: str | None
    received_at: datetime | None
    from_address: str | None
    to_addresses: list[str]
    subject: str | None
    raw_headers_ref: str | None
    raw_message_ref: str | None
    first_seen_at: datetime
    last_seen_at: datetime
    deduplication_key: str


@dataclass(frozen=True)
class ParsedMailboxMessage:
    parsed_message_id: str
    mailbox_message_id: str
    parser_version: str
    message_class: MessageClass
    affected_email_address: str | None
    original_message_id: str | None
    original_recipient: str | None
    smtp_status_code: str | None
    enhanced_status_code: str | None
    reason_code: str | None
    confidence: Confidence
    parsed_at: datetime
    notes: list[str] = field(default_factory=list)


@dataclass(frozen=True)
class EmailEvidenceCandidate:
    evidence_candidate_id: str
    mailbox_message_id: str
    parsed_message_id: str
    event_type: str
    assessment_category: AssessmentCategory
    assessment_subclass: str
    affected_email_address: str | None
    original_message_id: str | None
    confidence: Confidence
    evidence_strength: EvidenceStrength
    occurred_at: datetime | None
    observed_at: datetime
    deduplication_key: str
    raw_message_ref: str | None
    notes: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class EndpointQualityUpdate:
    affected_email_address: str
    reachability: str = "unknown"
    suppression_state: str = "unknown"
    last_success_at: datetime | None = None
    last_failure_at: datetime | None = None
    last_auto_reply_at: datetime | None = None
    reason_code: str | None = None
    confidence: Confidence = Confidence.LOW
    quality_signals: list[str] = field(default_factory=list)

451
src/email_connect/parser.py Normal file

@@ -0,0 +1,451 @@
from __future__ import annotations

import hashlib
import html
import re
from datetime import UTC, datetime
from email import policy
from email.parser import BytesParser
from email.utils import getaddresses, parsedate_to_datetime
from pathlib import Path
from uuid import NAMESPACE_URL, uuid5

from .evidence import candidate_from_parsed
from .models import (
    Confidence,
    EmailEvidenceCandidate,
    InboundMailboxMessage,
    MessageClass,
    ParsedMailboxMessage,
)

PARSER_VERSION = "mailbox-scanner-v0.1"
EMAIL_RE = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.IGNORECASE)
ENHANCED_STATUS_RE = re.compile(r"\b[245]\.\d{1,3}\.\d{1,3}\b")
SMTP_STATUS_RE = re.compile(r"\b[245]\d{2}\b")


def parse_message_bytes(
    raw_bytes: bytes,
    *,
    mailbox_id: str,
    raw_message_ref: str | None,
    imap_uid: str | None = None,
    now: datetime | None = None,
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
    observed_at = now or datetime.now(UTC)
    if not raw_bytes.strip():
        return _parse_failed(
            mailbox_id=mailbox_id,
            raw_message_ref=raw_message_ref,
            imap_uid=imap_uid,
            observed_at=observed_at,
            reason="empty_message",
        )
    try:
        msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
    except Exception as exc:
        return _parse_failed(
            mailbox_id=mailbox_id,
            raw_message_ref=raw_message_ref,
            imap_uid=imap_uid,
            observed_at=observed_at,
            reason=f"parser_error:{type(exc).__name__}",
        )
    message_id = _clean_header(msg.get("Message-ID"))
    subject = _clean_header(msg.get("Subject"))
    from_address = _first_address(msg.get("From"))
    to_addresses = [addr for _, addr in getaddresses(msg.get_all("To", []))]
    received_at = _parse_date(msg.get("Date"))
    text = _extract_text(msg)
    headers_text = "\n".join(f"{key}: {value}" for key, value in msg.items())
    combined = f"{headers_text}\n\n{subject or ''}\n{text}"
    dedup_key = _message_dedup_key(
        mailbox_id=mailbox_id,
        imap_uid=imap_uid,
        message_id=message_id,
        received_at=received_at,
        from_address=from_address,
        subject=subject,
        body=text,
    )
    mailbox_message_id = str(uuid5(NAMESPACE_URL, "email-connect:message:" + dedup_key))
    inbound = InboundMailboxMessage(
        mailbox_message_id=mailbox_message_id,
        mailbox_id=mailbox_id,
        imap_uid=imap_uid,
        message_id_header=message_id,
        received_at=received_at,
        from_address=from_address,
        to_addresses=to_addresses,
        subject=subject,
        raw_headers_ref=raw_message_ref,
        raw_message_ref=raw_message_ref,
        first_seen_at=observed_at,
        last_seen_at=observed_at,
        deduplication_key=dedup_key,
    )
    parsed = classify_message(inbound, combined, observed_at=observed_at)
    candidate = candidate_from_parsed(
        parsed,
        raw_message_ref=raw_message_ref,
        observed_at=observed_at,
        occurred_at=received_at,
    )
    return inbound, parsed, candidate


def parse_message_file(
    path: str | Path,
    *,
    mailbox_id: str,
    now: datetime | None = None,
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
    file_path = Path(path)
    return parse_message_bytes(
        file_path.read_bytes(),
        mailbox_id=mailbox_id,
        raw_message_ref=str(file_path),
        now=now,
    )


def classify_message(
    inbound: InboundMailboxMessage,
    combined_text: str,
    *,
    observed_at: datetime,
) -> ParsedMailboxMessage:
    text = combined_text.lower()
    enhanced_status = _first_match(ENHANCED_STATUS_RE, combined_text)
    smtp_status = _first_match(SMTP_STATUS_RE, combined_text)
    dsn_fields = _extract_dsn_fields(combined_text)
    affected = dsn_fields.get("final_recipient_email") or dsn_fields.get("original_recipient_email")
    if affected is None:
        affected = _extract_affected_recipient(combined_text)
    original_message_id = _extract_headerish(combined_text, "Original-Message-ID")
    original_recipient = dsn_fields.get("original_recipient_email") or affected
    notes: list[str] = []
    message_class = MessageClass.UNRELATED_MESSAGE
    confidence = Confidence.LOW
    reason_code: str | None = None
    if _contains_any(text, ["feedback loop", "abuse report", "spam complaint", "complaint notification"]):
        message_class = MessageClass.COMPLAINT_OR_ABUSE
        confidence = Confidence.HIGH
        reason_code = "complaint"
    elif _contains_any(text, ["unsubscribe", "opt-out", "opt out", "remove me"]):
        message_class = MessageClass.UNSUBSCRIBE_OR_OPT_OUT
        confidence = Confidence.MEDIUM
        reason_code = "unsubscribe"
    elif _is_out_of_office(text):
        message_class = MessageClass.OUT_OF_OFFICE
        confidence = Confidence.MEDIUM
        reason_code = "auto_reply"
    elif _is_challenge_response(text):
        message_class = MessageClass.CHALLENGE_RESPONSE
        confidence = Confidence.MEDIUM
        reason_code = "challenge_response"
    elif _contains_any(text, ["will keep trying", "delivery delayed", "message delayed", "not yet delivered"]):
        message_class = MessageClass.DELAYED_DELIVERY_NOTICE
        confidence = Confidence.HIGH
        reason_code = "delayed"
    elif _is_dsn_like(text):
        message_class, confidence, reason_code = _classify_dsn(text, smtp_status, enhanced_status)
    elif _looks_like_human_reply(inbound, text):
        message_class = MessageClass.HUMAN_REPLY
        confidence = Confidence.MEDIUM
        reason_code = "reply"
    elif _looks_return_related(text):
        message_class = MessageClass.UNKNOWN_RETURN_MESSAGE
        confidence = Confidence.LOW
        reason_code = "unknown_return"
        notes.append("Return-channel message did not match a reliable classifier.")
    for key in ["original_recipient", "final_recipient", "action", "diagnostic_code", "remote_mta"]:
        if dsn_fields.get(key):
            notes.append(f"{key}={dsn_fields[key]}")
    if enhanced_status:
        notes.append(f"enhanced_status={enhanced_status}")
    if smtp_status:
        notes.append(f"smtp_status={smtp_status}")
    parsed_id_basis = "|".join([inbound.mailbox_message_id, PARSER_VERSION, message_class.value])
    return ParsedMailboxMessage(
        parsed_message_id=str(uuid5(NAMESPACE_URL, "email-connect:parsed:" + parsed_id_basis)),
        mailbox_message_id=inbound.mailbox_message_id,
        parser_version=PARSER_VERSION,
        message_class=message_class,
        affected_email_address=affected,
        original_message_id=original_message_id,
        original_recipient=original_recipient,
        smtp_status_code=smtp_status,
        enhanced_status_code=enhanced_status,
        reason_code=reason_code,
        confidence=confidence,
        parsed_at=observed_at,
        notes=notes,
    )


def _classify_dsn(
    text: str,
    smtp_status: str | None,
    enhanced_status: str | None,
) -> tuple[MessageClass, Confidence, str]:
    if _contains_any(text, ["final failure", "giving up", "could not deliver after"]):
        return MessageClass.FINAL_DELIVERY_FAILURE, Confidence.HIGH, "expired_without_delivery"
    if (enhanced_status and enhanced_status.startswith("5.")) or (smtp_status and smtp_status.startswith("5")):
        return MessageClass.HARD_BOUNCE, Confidence.HIGH, "hard_bounce"
    if _contains_any(text, ["unknown user", "mailbox not found", "user unknown", "domain not found"]):
        return MessageClass.HARD_BOUNCE, Confidence.HIGH, "hard_bounce"
    if (enhanced_status and enhanced_status.startswith("4.")) or (smtp_status and smtp_status.startswith("4")):
        return MessageClass.SOFT_BOUNCE, Confidence.HIGH, "soft_bounce"
    if _contains_any(text, ["temporary failure", "mailbox full", "greylist", "try again later"]):
        return MessageClass.SOFT_BOUNCE, Confidence.MEDIUM, "soft_bounce"
    return MessageClass.UNKNOWN_RETURN_MESSAGE, Confidence.LOW, "unknown_dsn"


def _extract_text(msg) -> str:
    chunks: list[str] = []
    html_chunks: list[str] = []
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_maintype() == "multipart":
                continue
            if part.get_content_type() in {"text/plain", "message/delivery-status"}:
                try:
                    chunks.append(str(part.get_content()))
                except Exception:
                    continue
            elif part.get_content_type() == "text/html":
                try:
                    html_chunks.append(_html_to_text(str(part.get_content())))
                except Exception:
                    continue
    else:
        try:
            content = str(msg.get_content())
            if msg.get_content_type() == "text/html":
                html_chunks.append(_html_to_text(content))
            else:
                chunks.append(content)
        except Exception:
            payload = msg.get_payload(decode=True)
            if payload:
                chunks.append(payload.decode(errors="replace"))
    if chunks:
        return "\n".join(chunks)
    return "\n".join(html_chunks)


def _parse_failed(
    *,
    mailbox_id: str,
    raw_message_ref: str | None,
    imap_uid: str | None,
    observed_at: datetime,
    reason: str,
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
    dedup_key = "|".join([mailbox_id, imap_uid or "", raw_message_ref or "", reason])
    mailbox_message_id = str(uuid5(NAMESPACE_URL, "email-connect:message:" + dedup_key))
    inbound = InboundMailboxMessage(
        mailbox_message_id=mailbox_message_id,
        mailbox_id=mailbox_id,
        imap_uid=imap_uid,
        message_id_header=None,
        received_at=None,
        from_address=None,
        to_addresses=[],
        subject=None,
        raw_headers_ref=raw_message_ref,
        raw_message_ref=raw_message_ref,
        first_seen_at=observed_at,
        last_seen_at=observed_at,
        deduplication_key=dedup_key,
    )
    parsed_id_basis = "|".join([mailbox_message_id, PARSER_VERSION, "parse_failed"])
    parsed = ParsedMailboxMessage(
        parsed_message_id=str(uuid5(NAMESPACE_URL, "email-connect:parsed:" + parsed_id_basis)),
        mailbox_message_id=mailbox_message_id,
        parser_version=PARSER_VERSION,
        message_class=MessageClass.PARSE_FAILED,
        affected_email_address=None,
        original_message_id=None,
        original_recipient=None,
        smtp_status_code=None,
        enhanced_status_code=None,
        reason_code=reason,
        confidence=Confidence.HIGH,
        parsed_at=observed_at,
        notes=[f"parse_failure={reason}"],
    )
    candidate = candidate_from_parsed(
        parsed,
        raw_message_ref=raw_message_ref,
        observed_at=observed_at,
        occurred_at=None,
    )
    return inbound, parsed, candidate


def _message_dedup_key(
    *,
    mailbox_id: str,
    imap_uid: str | None,
    message_id: str | None,
    received_at: datetime | None,
    from_address: str | None,
    subject: str | None,
    body: str,
) -> str:
    body_hash = hashlib.sha256(body.encode("utf-8", errors="replace")).hexdigest()[:16]
    return "|".join(
        [
            mailbox_id,
            imap_uid or "",
            message_id or "",
            received_at.isoformat() if received_at else "",
            from_address or "",
            hashlib.sha256((subject or "").encode()).hexdigest()[:12],
            body_hash,
        ]
    )


def _clean_header(value: str | None) -> str | None:
    if value is None:
        return None
    cleaned = str(value).strip()
    return cleaned or None


def _first_address(value: str | None) -> str | None:
    if not value:
        return None
    addresses = getaddresses([value])
    return addresses[0][1] if addresses else None


def _parse_date(value: str | None) -> datetime | None:
    if not value:
        return None
    try:
        parsed = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    return parsed.astimezone(UTC)


def _first_match(pattern: re.Pattern[str], text: str) -> str | None:
    match = pattern.search(text)
    return match.group(0) if match else None


def _extract_headerish(text: str, name: str) -> str | None:
    match = re.search(rf"^{re.escape(name)}:\s*(.+)$", text, re.IGNORECASE | re.MULTILINE)
    return match.group(1).strip() if match else None


def _extract_dsn_fields(text: str) -> dict[str, str]:
    fields: dict[str, str] = {}
    for field, key in [
        ("Original-Recipient", "original_recipient"),
        ("Final-Recipient", "final_recipient"),
        ("Action", "action"),
        ("Diagnostic-Code", "diagnostic_code"),
        ("Remote-MTA", "remote_mta"),
    ]:
        value = _extract_headerish(text, field)
        if value:
            fields[key] = value
            if field in {"Original-Recipient", "Final-Recipient"}:
                match = EMAIL_RE.search(value)
                if match:
                    fields[f"{key}_email"] = match.group(0).lower()
    return fields


def _extract_affected_recipient(text: str) -> str | None:
    for name in ["Final-Recipient", "Original-Recipient", "X-Failed-Recipients", "Failed-Recipient"]:
        value = _extract_headerish(text, name)
        if value:
            match = EMAIL_RE.search(value)
            if match:
                return match.group(0).lower()
    match = EMAIL_RE.search(text)
    return match.group(0).lower() if match else None


def _contains_any(text: str, needles: list[str]) -> bool:
    return any(needle in text for needle in needles)


def _is_dsn_like(text: str) -> bool:
    return _contains_any(
        text,
        [
            "delivery status notification",
            "delivery failure",
            "undeliverable",
            "returned mail",
            "mail delivery subsystem",
            "final-recipient",
            "diagnostic-code",
            "status:",
        ],
    )


def _is_out_of_office(text: str) -> bool:
    return _contains_any(
        text,
        [
            "out of office",
            "auto-reply",
            "autoreply",
            "automatic reply",
            "vacation",
            "abwesenheitsnotiz",
            "nicht im buero",
            "nicht im büro",
        ],
    )


def _is_challenge_response(text: str) -> bool:
    return _contains_any(
        text,
        [
            "challenge-response",
            "challenge response",
            "sender verification",
            "verify your email before your message can be delivered",
            "confirm you are a real person",
            "confirm that you sent this message",
            "please verify yourself",
        ],
    )


def _looks_like_human_reply(inbound: InboundMailboxMessage, text: str) -> bool:
    subject = (inbound.subject or "").lower()
    if _contains_any(text, ["auto-submitted: auto-replied", "x-autoreply", "auto-generated"]):
        return False
    # "received" is not used as a needle: `text` includes the raw headers, so any
    # message carrying a "Received:" trace header would match and be overclaimed
    # as a human reply.
    return subject.startswith("re:") or _contains_any(text, ["thanks", "thank you", "i confirm"])


def _looks_return_related(text: str) -> bool:
    return _contains_any(text, ["delivery", "mailbox", "recipient", "message", "smtp", "unsubscribe", "reply"])


def _html_to_text(value: str) -> str:
    without_tags = re.sub(r"<[^>]+>", " ", value)
    return re.sub(r"\s+", " ", html.unescape(without_tags)).strip()
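
Review note: the bounce classification above keys on the first digit of the enhanced status code (class 4 is a persistent transient failure, class 5 a permanent one). A minimal standalone sketch of that step, assuming a hypothetical helper `classify_by_enhanced_status` and plain string labels in place of the `MessageClass` enum; the regular expression is copied from the diff.

```python
from __future__ import annotations

import re

# Same pattern as ENHANCED_STATUS_RE in the diff: class.subject.detail.
ENHANCED_STATUS_RE = re.compile(r"\b[245]\.\d{1,3}\.\d{1,3}\b")

# Illustrative labels only; the diff maps onto MessageClass members instead.
CLASS_LABELS = {"2": "success_class", "4": "soft_bounce", "5": "hard_bounce"}


def classify_by_enhanced_status(report_text: str) -> str:
    """Return a coarse label from the first enhanced status code found."""
    match = ENHANCED_STATUS_RE.search(report_text)
    if match is None:
        return "unknown"
    # The leading digit of the matched code selects the class.
    return CLASS_LABELS[match.group(0)[0]]


if __name__ == "__main__":
    print(classify_by_enhanced_status("Action: failed\nStatus: 5.1.1"))
    print(classify_by_enhanced_status("Action: delayed\nStatus: 4.2.2"))
    print(classify_by_enhanced_status("Thanks, got it."))
```

The sketch searches the text it is given, so it is best fed only the `message/delivery-status` part; searching headers and body together can pick up unrelated dotted numbers.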


@@ -0,0 +1,107 @@
from __future__ import annotations

import csv
import json
from datetime import UTC, datetime
from pathlib import Path

REPORT_COLUMNS = [
    "report_generated_at",
    "scan_id",
    "mailbox_id",
    "mailbox_message_id",
    "mailbox_received_at",
    "source_from",
    "source_to",
    "source_subject",
    "message_id_header",
    "detected_message_class",
    "normalized_event_type",
    "assessment_category",
    "assessment_subclass",
    "affected_email_address",
    "original_message_id",
    "original_recipient",
    "smtp_status_code",
    "enhanced_status_code",
    "reason_code",
    "confidence",
    "evidence_strength",
    "occurred_at",
    "observed_at",
    "first_seen_at",
    "last_seen_at",
    "deduplication_key",
    "raw_message_ref",
    "notes",
]


def report_filename(now: datetime | None = None) -> str:
    stamp = (now or datetime.now(UTC)).strftime("%Y%m%d-%H%M%S")
    return f"email-channel-evidence-report-{stamp}.csv"


def write_evidence_report(
    rows: list[dict],
    *,
    output_dir: str | Path,
    scan_id: str,
    mailbox_id: str,
    generated_at: datetime | None = None,
) -> Path:
    generated = generated_at or datetime.now(UTC)
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / report_filename(generated)
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=REPORT_COLUMNS)
        writer.writeheader()
        for row in rows:
            writer.writerow(_report_row(row, scan_id=scan_id, mailbox_id=mailbox_id, generated_at=generated))
    return path


def _report_row(row: dict, *, scan_id: str, mailbox_id: str, generated_at: datetime) -> dict:
    metadata = _json(row.get("metadata_json"))
    notes = _json(row.get("notes_json"))
    return {
        "report_generated_at": generated_at.isoformat(),
        "scan_id": scan_id,
        "mailbox_id": mailbox_id,
        "mailbox_message_id": row.get("mailbox_message_id", ""),
        "mailbox_received_at": row.get("occurred_at") or "",
        "source_from": metadata.get("source_from", ""),
        "source_to": metadata.get("source_to", ""),
        "source_subject": metadata.get("source_subject", ""),
        "message_id_header": metadata.get("message_id_header", ""),
        "detected_message_class": metadata.get("message_class", ""),
        "normalized_event_type": row.get("event_type", ""),
        "assessment_category": row.get("assessment_category", ""),
        "assessment_subclass": row.get("assessment_subclass", ""),
        "affected_email_address": row.get("affected_email_address") or "",
        "original_message_id": row.get("original_message_id") or "",
        "original_recipient": metadata.get("original_recipient", ""),
        "smtp_status_code": metadata.get("smtp_status_code") or "",
        "enhanced_status_code": metadata.get("enhanced_status_code") or "",
        "reason_code": metadata.get("reason_code") or "",
        "confidence": row.get("confidence", ""),
        "evidence_strength": row.get("evidence_strength", ""),
        "occurred_at": row.get("occurred_at") or "",
        "observed_at": row.get("observed_at") or "",
        "first_seen_at": metadata.get("first_seen_at", ""),
        "last_seen_at": metadata.get("last_seen_at", ""),
        "deduplication_key": row.get("deduplication_key", ""),
        "raw_message_ref": row.get("raw_message_ref") or "",
        "notes": "; ".join(str(item) for item in notes),
    }


def _json(value: str | None) -> dict | list:
    if not value:
        return {}
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return {}
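
Review note: the report naming and CSV layout above can be checked in memory before anything is written to `reports/`. A minimal sketch, assuming a hypothetical `render_csv` helper that writes to a string buffer instead of a file, and a two-column subset standing in for `REPORT_COLUMNS`.

```python
from __future__ import annotations

import csv
import io
from datetime import datetime, timezone


def report_filename(now: datetime) -> str:
    # Timestamped name in the same pattern as the diff: YYYYMMDD-HHMMSS.
    return f"email-channel-evidence-report-{now.strftime('%Y%m%d-%H%M%S')}.csv"


def render_csv(rows: list[dict], columns: list[str]) -> str:
    # Hypothetical in-memory variant of the report writer; keys missing from a
    # row are written as empty cells (restval).
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=columns, restval="")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


if __name__ == "__main__":
    print(report_filename(datetime(2026, 6, 2, 2, 24, 39, tzinfo=timezone.utc)))
    print(render_csv([{"scan_id": "s1"}], ["scan_id", "notes"]))
```

Because the file name has one-second resolution, two reports generated within the same second for the same output directory would share a name; that is a property of the naming pattern, not of this sketch.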


@@ -0,0 +1,156 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from uuid import uuid4

from .config import AppConfig
from .evidence import endpoint_quality_from_candidate
from .mailbox import source_for_config
from .models import MailboxScan
from .parser import parse_message_bytes
from .reporting import write_evidence_report
from .storage import StateStore


@dataclass(frozen=True)
class ScanResult:
    scan: MailboxScan
    report_path: Path | None
def scan_mailbox(
config: AppConfig,
*,
output_dir: str | None = None,
full_rescan: bool = False,
report_only_new: bool = False,
dry_run: bool = False,
fixture_dir: str | None = None,
since: str | None = None,
) -> ScanResult:
started_at = datetime.now(UTC)
scan_id = str(uuid4())
since_at = _parse_since(since or config.scan.since)
source = source_for_config(config, fixture_dir_override=fixture_dir)
store = StateStore(config.storage.path)
messages_seen = 0
messages_new = 0
messages_parsed = 0
evidence_created = 0
new_evidence_keys: list[str] = []
last_source_uid: str | None = None
try:
cursor = None if full_rescan else store.get_scan_cursor(config.mailbox.id, config.mailbox.folder)
for message in source.iter_messages(
max_messages=config.scan.max_messages_per_run,
since_uid=cursor,
full_rescan=full_rescan,
include_seen=config.scan.include_seen,
since=since_at,
):
messages_seen += 1
last_source_uid = message.source_uid
inbound, parsed, candidate = parse_message_bytes(
message.raw_bytes,
mailbox_id=config.mailbox.id,
raw_message_ref=message.raw_message_ref,
imap_uid=message.imap_uid,
)
if since_at and inbound.received_at and inbound.received_at < since_at:
continue
if dry_run:
messages_parsed += 1
continue
is_new = store.upsert_message(inbound)
messages_new += int(is_new)
store.insert_parsed(parsed)
messages_parsed += 1
if candidate is not None:
candidate = _enrich_candidate(candidate, inbound, parsed)
inserted = store.insert_evidence(candidate)
evidence_created += int(inserted)
if inserted:
new_evidence_keys.append(candidate.deduplication_key)
quality_update = endpoint_quality_from_candidate(candidate)
if quality_update is not None:
store.apply_endpoint_quality_update(
config.mailbox.id,
quality_update,
observed_at=candidate.observed_at,
)
if not dry_run and last_source_uid:
store.set_scan_cursor(
config.mailbox.id,
config.mailbox.folder,
last_source_uid,
updated_at=datetime.now(UTC),
)
report_path = None
if not dry_run:
report_rows = store.evidence_rows(
deduplication_keys=new_evidence_keys if report_only_new else None,
)
report_path = write_evidence_report(
report_rows,
output_dir=output_dir or config.reports.output_dir,
scan_id=scan_id,
mailbox_id=config.mailbox.id,
)
finished_at = datetime.now(UTC)
scan = MailboxScan(
scan_id=scan_id,
mailbox_id=config.mailbox.id,
started_at=started_at,
finished_at=finished_at,
scan_mode="full_rescan" if full_rescan else config.scan.mode,
folder=config.mailbox.folder,
status="completed",
messages_seen=messages_seen,
messages_new=messages_new,
messages_parsed=messages_parsed,
evidence_events_created=evidence_created,
report_path=str(report_path) if report_path else None,
since=since_at,
)
if not dry_run:
store.insert_scan(scan)
return ScanResult(scan=scan, report_path=report_path)
finally:
store.close()


def _enrich_candidate(candidate, inbound, parsed):
    metadata = {
        **candidate.metadata,
        "source_from": inbound.from_address,
        "source_to": ", ".join(inbound.to_addresses),
        "source_subject": inbound.subject,
        "message_id_header": inbound.message_id_header,
        "original_recipient": parsed.original_recipient,
        "first_seen_at": inbound.first_seen_at.isoformat(),
        "last_seen_at": inbound.last_seen_at.isoformat(),
    }
    return type(candidate)(
        **{
            **candidate.__dict__,
            "metadata": metadata,
        }
    )


def _parse_since(value: str | None) -> datetime | None:
    if not value:
        return None
    normalized = value.strip()
    if not normalized:
        return None
    if len(normalized) == 10:
        normalized = normalized + "T00:00:00+00:00"
    parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    return parsed.astimezone(UTC)
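
Review note: the `since` handling above accepts a bare date, a naive timestamp, or an offset-aware timestamp and normalizes all of them to UTC. A minimal standalone sketch of that normalization, assuming a hypothetical public name `parse_since` and `timezone.utc` in place of `datetime.UTC` so it also runs on Python versions before 3.11.

```python
from __future__ import annotations

from datetime import datetime, timezone


def parse_since(value: str | None) -> datetime | None:
    """Normalize a user-supplied cutoff to an aware UTC datetime."""
    if not value or not value.strip():
        return None
    normalized = value.strip()
    if len(normalized) == 10:
        # A bare YYYY-MM-DD date is treated as midnight UTC.
        normalized += "T00:00:00+00:00"
    # Older fromisoformat() versions reject a trailing "Z", so rewrite it.
    parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Naive timestamps are assumed to already be in UTC.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


if __name__ == "__main__":
    print(parse_since("2026-06-01"))
    print(parse_since("2026-06-01T14:00:00+02:00"))
```

Malformed input still raises `ValueError` from `datetime.fromisoformat`, so a caller such as a CLI would need to catch it to report a readable error.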


@@ -0,0 +1,381 @@
from __future__ import annotations
import json
import sqlite3
from datetime import UTC, datetime
from pathlib import Path
from .models import (
Confidence,
EmailEvidenceCandidate,
EndpointQualityUpdate,
InboundMailboxMessage,
MailboxScan,
ParsedMailboxMessage,
)
class StateStore:
def __init__(self, path: str | Path) -> None:
self.path = Path(path)
self.path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(self.path)
self.conn.row_factory = sqlite3.Row
self.init_schema()
def close(self) -> None:
self.conn.close()
def init_schema(self) -> None:
self.conn.executescript(
"""
create table if not exists mailbox_scans (
scan_id text primary key,
mailbox_id text not null,
started_at text not null,
finished_at text,
scan_mode text not null,
folder text not null,
status text not null,
messages_seen integer not null,
messages_new integer not null,
messages_parsed integer not null,
evidence_events_created integer not null,
report_path text,
since text
);
create table if not exists mailbox_messages (
mailbox_message_id text primary key,
mailbox_id text not null,
imap_uid text,
message_id_header text,
received_at text,
from_address text,
to_addresses_json text not null,
subject text,
raw_headers_ref text,
raw_message_ref text,
first_seen_at text not null,
last_seen_at text not null,
deduplication_key text not null unique
);
create table if not exists parsed_messages (
parsed_message_id text primary key,
mailbox_message_id text not null,
parser_version text not null,
message_class text not null,
affected_email_address text,
original_message_id text,
original_recipient text,
smtp_status_code text,
enhanced_status_code text,
reason_code text,
confidence text not null,
parsed_at text not null,
notes_json text not null
);
create table if not exists evidence_candidates (
evidence_candidate_id text primary key,
mailbox_message_id text not null,
parsed_message_id text not null,
event_type text not null,
assessment_category text not null,
assessment_subclass text not null,
affected_email_address text,
original_message_id text,
confidence text not null,
evidence_strength text not null,
occurred_at text,
observed_at text not null,
deduplication_key text not null unique,
raw_message_ref text,
notes_json text not null,
metadata_json text not null
);
create table if not exists scan_cursors (
mailbox_id text not null,
folder text not null,
last_uid text not null,
updated_at text not null,
primary key (mailbox_id, folder)
);
create table if not exists endpoint_quality (
endpoint_key text primary key,
mailbox_id text not null,
affected_email_address text not null,
reachability text not null,
suppression_state text not null,
last_success_at text,
last_failure_at text,
last_auto_reply_at text,
reason_code text,
confidence text not null,
quality_signals_json text not null,
updated_at text not null,
unique (mailbox_id, affected_email_address)
);
"""
)
self.conn.commit()
def upsert_message(self, message: InboundMailboxMessage) -> bool:
existing = self.conn.execute(
"select mailbox_message_id from mailbox_messages where deduplication_key = ?",
(message.deduplication_key,),
).fetchone()
if existing:
self.conn.execute(
"update mailbox_messages set last_seen_at = ? where deduplication_key = ?",
(_dt(message.last_seen_at), message.deduplication_key),
)
self.conn.commit()
return False
self.conn.execute(
"""
insert into mailbox_messages values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
message.mailbox_message_id,
message.mailbox_id,
message.imap_uid,
message.message_id_header,
_dt(message.received_at),
message.from_address,
json.dumps(message.to_addresses),
message.subject,
message.raw_headers_ref,
message.raw_message_ref,
_dt(message.first_seen_at),
_dt(message.last_seen_at),
message.deduplication_key,
),
)
self.conn.commit()
return True
def insert_parsed(self, parsed: ParsedMailboxMessage) -> None:
self.conn.execute(
"""
insert or replace into parsed_messages values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
parsed.parsed_message_id,
parsed.mailbox_message_id,
parsed.parser_version,
parsed.message_class.value,
parsed.affected_email_address,
parsed.original_message_id,
parsed.original_recipient,
parsed.smtp_status_code,
parsed.enhanced_status_code,
parsed.reason_code,
parsed.confidence.value,
_dt(parsed.parsed_at),
json.dumps(parsed.notes),
),
)
self.conn.commit()
def insert_evidence(self, candidate: EmailEvidenceCandidate) -> bool:
try:
self.conn.execute(
"""
insert into evidence_candidates values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
candidate.evidence_candidate_id,
candidate.mailbox_message_id,
candidate.parsed_message_id,
candidate.event_type,
candidate.assessment_category.value,
candidate.assessment_subclass,
candidate.affected_email_address,
candidate.original_message_id,
candidate.confidence.value,
candidate.evidence_strength.value,
_dt(candidate.occurred_at),
_dt(candidate.observed_at),
candidate.deduplication_key,
candidate.raw_message_ref,
json.dumps(candidate.notes),
json.dumps(candidate.metadata),
),
)
except sqlite3.IntegrityError:
return False
self.conn.commit()
return True
def insert_scan(self, scan: MailboxScan) -> None:
self.conn.execute(
"""
insert or replace into mailbox_scans values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
scan.scan_id,
scan.mailbox_id,
_dt(scan.started_at),
_dt(scan.finished_at),
scan.scan_mode,
scan.folder,
scan.status,
scan.messages_seen,
scan.messages_new,
scan.messages_parsed,
scan.evidence_events_created,
scan.report_path,
_dt(scan.since),
),
)
self.conn.commit()
def get_scan_cursor(self, mailbox_id: str, folder: str) -> str | None:
row = self.conn.execute(
"select last_uid from scan_cursors where mailbox_id = ? and folder = ?",
(mailbox_id, folder),
).fetchone()
return str(row["last_uid"]) if row else None
def set_scan_cursor(self, mailbox_id: str, folder: str, last_uid: str, *, updated_at: datetime) -> None:
self.conn.execute(
"""
insert into scan_cursors values (?, ?, ?, ?)
on conflict(mailbox_id, folder) do update set
last_uid = excluded.last_uid,
updated_at = excluded.updated_at
""",
(mailbox_id, folder, last_uid, _dt(updated_at)),
)
self.conn.commit()

    def apply_endpoint_quality_update(
        self,
        mailbox_id: str,
        update: EndpointQualityUpdate,
        *,
        observed_at: datetime,
    ) -> None:
        address = update.affected_email_address.lower()
        endpoint_key = f"{mailbox_id}:{address}"
        existing = self.conn.execute(
            "select * from endpoint_quality where endpoint_key = ?",
            (endpoint_key,),
        ).fetchone()
        merged = _merge_endpoint_quality(existing, update, observed_at=observed_at)
        self.conn.execute(
            """
            insert into endpoint_quality values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            on conflict(endpoint_key) do update set
                reachability = excluded.reachability,
                suppression_state = excluded.suppression_state,
                last_success_at = excluded.last_success_at,
                last_failure_at = excluded.last_failure_at,
                last_auto_reply_at = excluded.last_auto_reply_at,
                reason_code = excluded.reason_code,
                confidence = excluded.confidence,
                quality_signals_json = excluded.quality_signals_json,
                updated_at = excluded.updated_at
            """,
            (
                endpoint_key,
                mailbox_id,
                address,
                merged["reachability"],
                merged["suppression_state"],
                _dt(merged["last_success_at"]),
                _dt(merged["last_failure_at"]),
                _dt(merged["last_auto_reply_at"]),
                merged["reason_code"],
                merged["confidence"],
                json.dumps(merged["quality_signals"]),
                _dt(observed_at),
            ),
        )
        self.conn.commit()

    def endpoint_quality_rows(self) -> list[dict]:
        rows = self.conn.execute(
            "select * from endpoint_quality order by affected_email_address"
        ).fetchall()
        return [dict(row) for row in rows]

    def evidence_rows(self, *, deduplication_keys: list[str] | None = None) -> list[dict]:
        if deduplication_keys is not None:
            if not deduplication_keys:
                return []
            placeholders = ", ".join("?" for _ in deduplication_keys)
            rows = self.conn.execute(
                f"""
                select * from evidence_candidates
                where deduplication_key in ({placeholders})
                order by observed_at, event_type
                """,
                deduplication_keys,
            ).fetchall()
            return [dict(row) for row in rows]
        rows = self.conn.execute("select * from evidence_candidates order by observed_at, event_type").fetchall()
        return [dict(row) for row in rows]


def _dt(value: datetime | None) -> str | None:
    return value.isoformat() if value else None


def _parse_dt(value: str | None) -> datetime | None:
    return datetime.fromisoformat(value) if value else None


def _merge_endpoint_quality(
    existing,
    update: EndpointQualityUpdate,
    *,
    observed_at: datetime,
) -> dict:
    existing_signals = json.loads(existing["quality_signals_json"]) if existing else []
    update_signals = [signal for signal in update.quality_signals if signal]
    signals = list(dict.fromkeys([*existing_signals, *update_signals]))
    return {
        "reachability": _merge_state(existing, "reachability", update.reachability, default="unknown"),
        "suppression_state": _merge_state(
            existing,
            "suppression_state",
            update.suppression_state,
            default="unknown",
        ),
        "last_success_at": _latest_dt(_parse_dt(existing["last_success_at"]) if existing else None, update.last_success_at),
        "last_failure_at": _latest_dt(_parse_dt(existing["last_failure_at"]) if existing else None, update.last_failure_at),
        "last_auto_reply_at": _latest_dt(
            _parse_dt(existing["last_auto_reply_at"]) if existing else None,
            update.last_auto_reply_at,
        ),
        "reason_code": update.reason_code or (existing["reason_code"] if existing else None),
        "confidence": _max_confidence(existing["confidence"] if existing else Confidence.LOW.value, update.confidence.value),
        "quality_signals": signals,
        "updated_at": observed_at,
    }


def _merge_state(existing, field: str, new_value: str, *, default: str) -> str:
    if new_value != "unknown":
        return new_value
    if existing:
        return str(existing[field])
    return default


def _latest_dt(left: datetime | None, right: datetime | None) -> datetime | None:
    if left is None:
        return right
    if right is None:
        return left
    return max(left, right)


def _max_confidence(left: str, right: str) -> str:
    order = {Confidence.LOW.value: 0, Confidence.MEDIUM.value: 1, Confidence.HIGH.value: 2}
    return left if order.get(left, 0) >= order.get(right, 0) else right
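
The storage code above relies on two SQLite idioms: treating a uniqueness violation as "already seen" for deduplication, and `on conflict ... do update` for the per-folder scan cursor. The following is a minimal, self-contained sketch of both against an in-memory database. The two-table schema and the names `insert_once` and `set_cursor` are assumptions made for illustration; the real tables carry more columns and their definitions are not shown in this part of the diff.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
# Hypothetical, trimmed-down schema (assumption, not the project's real DDL).
conn.executescript(
    """
    create table evidence_candidates (
        deduplication_key text primary key,
        event_type text not null
    );
    create table scan_cursors (
        mailbox_id text not null,
        folder text not null,
        last_uid text not null,
        updated_at text not null,
        primary key (mailbox_id, folder)
    );
    """
)


def insert_once(key: str, event_type: str) -> bool:
    """Return True if the row was stored, False if the key already existed."""
    try:
        conn.execute("insert into evidence_candidates values (?, ?)", (key, event_type))
    except sqlite3.IntegrityError:
        # The primary key rejected a duplicate: report "not new" instead of failing.
        return False
    conn.commit()
    return True


def set_cursor(mailbox_id: str, folder: str, last_uid: str, updated_at: str) -> None:
    """Insert the cursor row, or overwrite it if (mailbox_id, folder) exists."""
    conn.execute(
        """
        insert into scan_cursors values (?, ?, ?, ?)
        on conflict(mailbox_id, folder) do update set
            last_uid = excluded.last_uid,
            updated_at = excluded.updated_at
        """,
        (mailbox_id, folder, last_uid, updated_at),
    )
    conn.commit()
```

Calling `insert_once` twice with the same key stores one row and returns `False` the second time, which is how a full rescan can revisit every message without creating duplicate evidence. The upsert syntax needs SQLite 3.24 or newer.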


@@ -0,0 +1,9 @@
From: Sender Verification <verify@example.net>
To: sender@example.com
Subject: Sender verification required
Date: Tue, 02 Jun 2026 10:09:00 +0000
Message-ID: <challenge-response@example.net>
Content-Type: text/plain; charset=utf-8

This is a challenge-response message. Please verify yourself before your message
can be delivered to challenge@example.com.

tests/fixtures/mailbox/complaint.eml

@@ -0,0 +1,10 @@
From: Feedback Loop <fbl@example.net>
To: abuse@example.com
Subject: Spam complaint notification
Date: Tue, 02 Jun 2026 10:04:00 +0000
Message-ID: <complaint@example.net>
Content-Type: text/plain; charset=utf-8

Feedback loop abuse report.
Final-Recipient: rfc822; complained@example.com
This is a spam complaint notification for the original message.


@@ -0,0 +1,10 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery delayed
Date: Tue, 02 Jun 2026 10:05:00 +0000
Message-ID: <delayed@example.net>
Content-Type: text/plain; charset=utf-8

Delivery delayed. We will keep trying to deliver your message.
Final-Recipient: rfc822; waiting@example.com
Status: 4.4.1


@@ -0,0 +1,12 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Final failure
Date: Tue, 02 Jun 2026 10:06:00 +0000
Message-ID: <final-failure@example.net>
Content-Type: text/plain; charset=utf-8

Delivery Status Notification.
Final-Recipient: rfc822; expired@example.com
Action: failed
Status: 5.4.7
Diagnostic-Code: smtp; Could not deliver after retry period. Final failure, giving up.

tests/fixtures/mailbox/hard_bounce.eml

@@ -0,0 +1,12 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery Status Notification (Failure)
Date: Tue, 02 Jun 2026 10:00:00 +0000
Message-ID: <hard-bounce@example.net>
Content-Type: text/plain; charset=utf-8

Delivery failure.
Final-Recipient: rfc822; missing@example.com
Action: failed
Status: 5.1.1
Diagnostic-Code: smtp; 550 5.1.1 User unknown
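
The hard-bounce fixture carries its delivery-status fields as plain body lines rather than as a `message/delivery-status` MIME part. A minimal sketch of pulling those fields out with the standard library follows. The function name `extract_dsn_fields` and the regular expression are assumptions for illustration; the project's `parse_message_file` is not shown in this part of the diff and may work differently.

```python
from __future__ import annotations

import re
from email import message_from_string
from email.policy import default

RAW = """\
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery Status Notification (Failure)
Content-Type: text/plain; charset=utf-8

Delivery failure.
Final-Recipient: rfc822; missing@example.com
Action: failed
Status: 5.1.1
Diagnostic-Code: smtp; 550 5.1.1 User unknown
"""

# Matches the four DSN-style fields when each sits on its own body line.
DSN_FIELD = re.compile(
    r"^(Final-Recipient|Action|Status|Diagnostic-Code):\s*(.+)$",
    re.IGNORECASE | re.MULTILINE,
)


def extract_dsn_fields(raw: str) -> dict[str, str]:
    """Return DSN-like fields found in a text/plain body, keyed in snake_case."""
    message = message_from_string(raw, policy=default)
    body = message.get_content()  # decoded text of the single text/plain part
    return {
        name.lower().replace("-", "_"): value.strip()
        for name, value in DSN_FIELD.findall(body)
    }


fields = extract_dsn_fields(RAW)
```

The snake_case keys mirror the `final_recipient=...`, `action=...`, and `diagnostic_code=...` note strings asserted in `tests/test_parser.py`.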


@@ -0,0 +1,8 @@
From: Recipient <recipient@example.com>
To: sender@example.com
Subject: Re: Your notification
Date: Tue, 02 Jun 2026 10:03:00 +0000
Message-ID: <reply@example.com>
Content-Type: text/plain; charset=utf-8

Thanks, I received this and will review it today.


@@ -0,0 +1,9 @@
From: Recipient <recipient@example.com>
To: sender@example.com
Subject: Auto-reply: Out of office
Date: Tue, 02 Jun 2026 10:02:00 +0000
Message-ID: <ooo@example.com>
Auto-Submitted: auto-replied
Content-Type: text/plain; charset=utf-8

I am out of office until next week.
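
The out-of-office fixture is distinguished from a human reply mainly by its `Auto-Submitted: auto-replied` header (RFC 3834). A small sketch of that header check is below. The helper name `is_auto_generated` is hypothetical, and the real classifier may also weigh the subject line or body text.

```python
from email import message_from_string
from email.policy import default


def is_auto_generated(raw: str) -> bool:
    """True when Auto-Submitted is present with any keyword other than 'no'."""
    message = message_from_string(raw, policy=default)
    value = str(message.get("Auto-Submitted", "no"))
    # The header may carry parameters after a semicolon; compare the keyword only.
    keyword = value.split(";", 1)[0].strip().lower()
    return keyword != "no"


OUT_OF_OFFICE = """\
From: Recipient <recipient@example.com>
To: sender@example.com
Subject: Auto-reply: Out of office
Auto-Submitted: auto-replied
Content-Type: text/plain; charset=utf-8

I am out of office until next week.
"""

HUMAN_REPLY = """\
From: Recipient <recipient@example.com>
To: sender@example.com
Subject: Re: Your notification
Content-Type: text/plain; charset=utf-8

Thanks, I received this and will review it today.
"""
```

An auto-reply shows that some system at the address responded, not that a person read the message, which is consistent with the parser test keeping this class in the `undef` assessment category.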


@@ -0,0 +1 @@

tests/fixtures/mailbox/soft_bounce.eml

@@ -0,0 +1,12 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery temporarily delayed
Date: Tue, 02 Jun 2026 10:01:00 +0000
Message-ID: <soft-bounce@example.net>
Content-Type: text/plain; charset=utf-8

Delivery Status Notification.
Final-Recipient: rfc822; full@example.com
Action: delayed
Status: 4.2.2
Diagnostic-Code: smtp; 452 4.2.2 Mailbox full
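
The soft-bounce and hard-bounce fixtures differ in the first digit of their enhanced status code: `4.x.x` marks a persistent transient failure and `5.x.x` a permanent one (RFC 3463). The sketch below shows that first-digit check in isolation. The function name `status_class` and its return labels are assumptions; the scanner's own mapping is richer, for example it assigns the `5.4.7` fixture the subclass `fail.expired_without_delivery`.

```python
import re

# Enhanced status code: class "." subject "." detail, e.g. 4.2.2 or 5.1.1.
STATUS = re.compile(r"^([245])\.(\d{1,3})\.(\d{1,3})$")

CLASS_LABELS = {"2": "success", "4": "temporary", "5": "permanent"}


def status_class(status: str) -> str:
    """Map an enhanced status code to a coarse label, or 'unknown' if malformed."""
    match = STATUS.match(status.strip())
    if match is None:
        return "unknown"
    return CLASS_LABELS[match.group(1)]
```

Returning `"unknown"` for anything malformed keeps messages without a reliable status code, such as the unknown-return fixture, from being counted as a bounce.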


@@ -0,0 +1,10 @@
From: Mail System <mailer@example.net>
To: sender@example.com
Subject: Return mailbox notice
Date: Tue, 02 Jun 2026 10:07:00 +0000
Message-ID: <unknown-return@example.net>
Content-Type: text/plain; charset=utf-8

This message references delivery and recipient handling, but it does not include
a reliable SMTP status or delivery-status notification.
Recipient reference: mystery@example.com


@@ -0,0 +1,8 @@
From: Recipient <optout@example.com>
To: sender@example.com
Subject: Please unsubscribe me
Date: Tue, 02 Jun 2026 10:08:00 +0000
Message-ID: <unsubscribe@example.com>
Content-Type: text/plain; charset=utf-8

Please unsubscribe optout@example.com from future messages. Remove me from this list.

tests/test_parser.py

@@ -0,0 +1,97 @@
from __future__ import annotations

import unittest
from pathlib import Path

from email_connect.models import MessageClass
from email_connect.parser import parse_message_file

FIXTURES = Path(__file__).parent / "fixtures" / "mailbox"


class ParserTests(unittest.TestCase):
    def test_hard_bounce_maps_to_permanent_rejection(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "hard_bounce.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.HARD_BOUNCE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_permanent")
        self.assertEqual(candidate.assessment_subclass, "fail.hard_bounce")

    def test_soft_bounce_maps_to_temporary_rejection(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "soft_bounce.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.SOFT_BOUNCE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_temporary")

    def test_out_of_office_stays_undef(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "out_of_office.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.OUT_OF_OFFICE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.assessment_category.value, "undef")
        self.assertEqual(candidate.event_type, "interaction.out_of_office_received")

    def test_human_reply_is_email_channel_success_only(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "human_reply.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.HUMAN_REPLY)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "interaction.reply_received")
        self.assertEqual(candidate.assessment_subclass, "success.reply_received")

    def test_delayed_delivery_notice_stays_deferred(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "delayed_delivery.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.DELAYED_DELIVERY_NOTICE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.deferred")
        self.assertEqual(candidate.assessment_subclass, "undef.deferred")

    def test_final_failure_maps_to_expired_without_delivery(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "final_failure.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.FINAL_DELIVERY_FAILURE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_permanent")
        self.assertEqual(candidate.assessment_subclass, "fail.expired_without_delivery")

    def test_complaint_maps_to_channel_failure(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "complaint.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.COMPLAINT_OR_ABUSE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.channel.complaint_received")
        self.assertEqual(candidate.assessment_subclass, "fail.complaint_received")

    def test_unsubscribe_maps_to_opt_out(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "unsubscribe.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.UNSUBSCRIBE_OR_OPT_OUT)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.channel.unsubscribe_received")
        self.assertEqual(candidate.assessment_subclass, "fail.unsubscribed")

    def test_unknown_return_message_is_preserved(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "unknown_return.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.UNKNOWN_RETURN_MESSAGE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.unknown")

    def test_challenge_response_stays_identity_uncertain(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "challenge_response.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.CHALLENGE_RESPONSE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "interaction.unverified_actor_interaction")
        self.assertEqual(candidate.assessment_subclass, "undef.identity_uncertain")

    def test_parse_failure_is_reportable_diagnostic(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "parse_failed.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.PARSE_FAILED)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "diagnostic.message.parse_failed")
        self.assertEqual(candidate.assessment_subclass, "undef.parse_failed")

    def test_dsn_detail_fields_are_preserved_as_notes(self) -> None:
        _inbound, parsed, _candidate = parse_message_file(FIXTURES / "hard_bounce.eml", mailbox_id="test")
        self.assertIn("final_recipient=rfc822; missing@example.com", parsed.notes)
        self.assertIn("action=failed", parsed.notes)
        self.assertIn("diagnostic_code=smtp; 550 5.1.1 User unknown", parsed.notes)


if __name__ == "__main__":
    unittest.main()

tests/test_scanner.py

@@ -0,0 +1,70 @@
from __future__ import annotations

import tempfile
import unittest
from csv import DictReader
from pathlib import Path

from email_connect.config import AppConfig, MailboxConfig, ReportsConfig, ScanConfig, SourceConfig, StorageConfig
from email_connect.scanner import scan_mailbox
from email_connect.storage import StateStore

FIXTURES = Path(__file__).parent / "fixtures" / "mailbox"


class ScannerTests(unittest.TestCase):
    def test_scan_fixture_directory_writes_report_and_deduplicates(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )

            first = scan_mailbox(config)
            second = scan_mailbox(config)
            full = scan_mailbox(config, full_rescan=True, report_only_new=True)

            self.assertEqual(first.scan.messages_seen, 11)
            self.assertEqual(first.scan.messages_new, 11)
            self.assertGreaterEqual(first.scan.evidence_events_created, 11)
            self.assertEqual(second.scan.messages_seen, 0)
            self.assertEqual(second.scan.messages_new, 0)
            self.assertEqual(second.scan.evidence_events_created, 0)
            self.assertEqual(full.scan.messages_seen, 11)
            self.assertEqual(full.scan.messages_new, 0)
            self.assertEqual(full.scan.evidence_events_created, 0)
            self.assertTrue(first.report_path and first.report_path.exists())
            self.assertTrue(full.report_path and full.report_path.exists())
            with full.report_path.open(newline="", encoding="utf-8") as fh:
                self.assertEqual(list(DictReader(fh)), [])

    def test_scan_updates_endpoint_quality(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )

            scan_mailbox(config)
            store = StateStore(config.storage.path)
            try:
                rows = {row["affected_email_address"]: row for row in store.endpoint_quality_rows()}
            finally:
                store.close()

            self.assertEqual(rows["missing@example.com"]["reachability"], "unreachable")
            self.assertEqual(rows["full@example.com"]["reachability"], "degraded")
            self.assertEqual(rows["complained@example.com"]["suppression_state"], "suppressed")
            self.assertEqual(rows["optout@example.com"]["suppression_state"], "opted_out")


if __name__ == "__main__":
    unittest.main()
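
The endpoint-quality assertions above depend on a merge rule in `storage.py`: an update whose state is `"unknown"` never overwrites a state that is already known, and quality signals are unioned in first-seen order. A standalone sketch of that rule follows. `merge_state` and `merge_signals` are simplified stand-ins written for this illustration; they take plain values rather than the SQLite row the real helpers receive.

```python
from __future__ import annotations


def merge_state(existing: str | None, new_value: str, *, default: str = "unknown") -> str:
    """Prefer a concrete new value; otherwise keep what is already stored."""
    if new_value != "unknown":
        return new_value
    return existing if existing is not None else default


def merge_signals(existing: list[str], update: list[str]) -> list[str]:
    """Union two signal lists, dropping empty strings, keeping first-seen order."""
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys([*existing, *(s for s in update if s)]))
```

Under this rule, a later message that says nothing about reachability leaves an earlier `unreachable` verdict in place, while a new concrete verdict replaces it.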


@@ -4,7 +4,7 @@ type: workplan
title: "Repository Onboarding and Implementation Foundation"
domain: custodian
repo: email-connect
-status: active
+status: finished
owner: codex
topic_slug: custodian
created: "2026-06-02"
@@ -55,7 +55,7 @@ Done when:
```task
id: EMAIL-WP-0001-T02
-status: todo
+status: done
priority: high
state_hub_task_id: "fdfd8b96-7326-414f-8126-79bb3a21b950"
```
@@ -76,7 +76,7 @@ The architecture note should cover:
```task
id: EMAIL-WP-0001-T03
-status: todo
+status: done
priority: high
state_hub_task_id: "ef1eb769-dfa0-4b46-8633-274d90962423"
```
@@ -105,7 +105,7 @@ click telemetry as proof of human awareness or result satisfaction.
```task
id: EMAIL-WP-0001-T04
-status: todo
+status: done
priority: medium
state_hub_task_id: "4b94e544-5aad-4c38-8fe3-eed17af79971"
```


@@ -4,7 +4,7 @@ type: workplan
title: "MVP Mailbox Evidence Scanner"
domain: custodian
repo: email-connect
-status: ready
+status: finished
owner: codex
topic_slug: custodian
created: "2026-06-02"
@@ -688,7 +688,7 @@ Endpoint quality is diagnostic and must not be treated as coordination success.
```task
id: EMAIL-WP-0002-T01
-status: todo
+status: done
priority: high
state_hub_task_id: "3a17215d-62a9-48ef-877f-a6fbc7e95a22"
```
@@ -716,7 +716,7 @@ Config file is loaded and validated.
```task
id: EMAIL-WP-0002-T02
-status: todo
+status: done
priority: high
state_hub_task_id: "25a4da12-1bcd-4c6d-a0eb-a2f525b9c4b9"
```
@@ -744,7 +744,7 @@ CLI can connect to mailbox and list/fetch messages without modifying mailbox.
```task
id: EMAIL-WP-0002-T03
-status: todo
+status: done
priority: high
state_hub_task_id: "16b95a6b-1375-4c91-8b78-0b75d51e0aeb"
```
@@ -773,7 +773,7 @@ Full rescan can revisit all messages while preserving deduplication.
```task
id: EMAIL-WP-0002-T04
-status: todo
+status: done
priority: high
state_hub_task_id: "5a50cd85-b0ab-4017-aba0-b2087068abb4"
```
@@ -802,7 +802,7 @@ Scanner extracts basic metadata and text from representative bounce and reply me
```task
id: EMAIL-WP-0002-T05
-status: todo
+status: done
priority: high
state_hub_task_id: "8ea826d1-0add-4573-9bb4-2b73adefba55"
```
@@ -831,7 +831,7 @@ Representative hard and soft bounce samples are classified correctly.
```task
id: EMAIL-WP-0002-T06
-status: todo
+status: done
priority: high
state_hub_task_id: "4d94a332-173b-4787-8fb2-27aa63db6a8d"
```
@@ -856,7 +856,7 @@ Representative OOO and human reply samples are classified with confidence.
```task
id: EMAIL-WP-0002-T07
-status: todo
+status: done
priority: high
state_hub_task_id: "8637d383-25f7-45b5-9680-427ed2ca87bf"
```
@@ -880,7 +880,7 @@ Representative complaint and unsubscribe examples are classified.
```task
id: EMAIL-WP-0002-T08
-status: todo
+status: done
priority: high
state_hub_task_id: "6d62dea0-f416-4c0b-80a0-7c16422b8e5f"
```
@@ -906,7 +906,7 @@ Parsed messages produce evidence candidates according to the mapping table.
```task
id: EMAIL-WP-0002-T09
-status: todo
+status: done
priority: medium
state_hub_task_id: "0d110877-953f-4aa2-961b-eec81e0159d4"
```
@@ -932,7 +932,7 @@ Complaint/unsubscribe updates suppression state.
```task
id: EMAIL-WP-0002-T10
-status: todo
+status: done
priority: medium
state_hub_task_id: "5ab35176-d6c2-4c73-b7b3-bde4c097e3ee"
```
@@ -959,7 +959,7 @@ Report can be opened in spreadsheet tools.
```task
id: EMAIL-WP-0002-T11
-status: todo
+status: done
priority: high
state_hub_task_id: "514fa099-781b-4590-aae4-c28970413b3f"
```
@@ -989,7 +989,7 @@ Automated tests verify expected classification and normalized event output.
```task
id: EMAIL-WP-0002-T12
-status: todo
+status: done
priority: medium
state_hub_task_id: "a5f7067e-87be-4438-ba35-b12d06a8181e"
```