Compare commits

...

13 Commits

Author SHA1 Message Date
7cd7efe956 chore(consistency): sync task status from DB [auto]
Updated by fix-consistency on 2026-06-22:
  - update .custodian-brief.md for email-connect
2026-06-22 23:20:53 +02:00
208b4ae4cc Normalize agent instructions and workplan frontmatter (STATE-WP-0067)
- Align agent files with on-disk workplan prefixes (infer from workplan ids)
- Set workplan domain to registered domain_slug; add topic_slug where applicable
- Repair frontmatter delimiter formatting; migrate legacy task status literals
- Regenerate AGENTS.md, CLAUDE.md, and .claude/rules from State Hub templates
2026-06-22 23:16:24 +02:00
bfb1034132 Mark .repo-classification.yaml human-reviewed (CUST-WP-0050 T02)
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-22 11:40:44 +02:00
55c3a5417d Reclassify as tooling (CUST-WP-0050 T02)
Apply the new 'tooling' category (reusable internal tooling/infrastructure)
from the Repo Classification Standard. First-pass agent classification.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-22 03:06:02 +02:00
dcda76eff2 Add repo classification (CUST-WP-0050 T02)
First-pass agent classification per the Repo Classification Standard v1.0
(canon-repo-classification); pending human review.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-22 02:44:47 +02:00
5a13a12edf Add credential routing instructions for all agent runtimes
Propagate shared credential-routing section (Codex, Claude, Grok, llm-connect)
from state-hub template via scripts/propagate_credential_routing.py.
2026-06-18 22:48:37 +02:00
334a01d8e3 Add capability registry scaffold (REUSE-WP-0014-T04 B02)
Empty helix_forge registry layout for federation publishing.
2026-06-16 01:52:17 +02:00
b7591f531b feat: add expected recipient reporting 2026-06-02 03:07:13 +02:00
5ea6c738d2 docs: add expected recipient reporting workplan 2026-06-02 02:45:33 +02:00
7a9686f53a chore(consistency): sync task status from DB [auto]
Updated by fix-consistency on 2026-06-02:
  - update .custodian-brief.md for email-connect
2026-06-02 02:43:38 +02:00
7ab1f9deb9 feat: finish mailbox evidence scanner mvp 2026-06-02 02:24:39 +02:00
226c045397 feat: expand mailbox evidence scanner 2026-06-02 02:07:50 +02:00
8532583182 feat: start mailbox evidence scanner 2026-06-02 01:19:09 +02:00
56 changed files with 3907 additions and 98 deletions

20
.claude/rules/agents.md Normal file

@@ -0,0 +1,20 @@
## Kaizen Agents
Specialized agent personas available on demand via the state-hub MCP.
**Discover:** `list_kaizen_agents()` — returns all agents with name, description, category
**Load:** `get_kaizen_agent("tdd-workflow")` — returns full instructions; read and follow them
Common agents:
| Agent | Category | When to use |
|-------|----------|-------------|
| `tdd-workflow` | testing | Step-by-step TDD8 workflow for any feature |
| `code-refactoring` | quality | Code quality analysis and safe refactoring |
| `test-maintenance` | testing | Diagnose and fix failing tests |
| `requirements-engineering` | process | Prevent interface/mock mismatches upfront |
| `keepaTodofile` | process | Maintain TODO.md during work |
| `project-management` | process | Track status, determine next steps |
| `datamodel-optimization` | quality | Optimize dataclasses and data structures |
All 17 agents: call `list_kaizen_agents()` for the full list.

@@ -0,0 +1,8 @@
## Architecture
<!-- TODO: Describe the key design decisions and component structure.
Key modules, data flows, external integrations, state machines, etc. -->
## Quick Reference
`~/state-hub/mcp_server/TOOLS.md` — MCP tool reference

@@ -0,0 +1,50 @@
# Credential and access routing
**Audience:** Codex, Claude Code, Grok, and custodian agents that call **llm-connect**
for inference. Run this check **before** requesting secrets, API keys, SSH access,
login tokens, or database passwords — in any repo, not only `ops-warden`.
ops-warden **issues SSH certificates only** (`warden sign`, `cert_command`). Every
other credential need belongs to another subsystem. **Do not** message
`ops-warden` on State Hub expecting a secret value; the reply is a pointer, not a key.
### Lookup (do this first)
```bash
warden route find "<describe your need>" --json
warden route show <catalog-id> --json
```
Requires the `warden` CLI from `~/ops-warden` (`uv tool install .` or `uv run warden`).
| Agent runtime | How to orient |
| --- | --- |
| **Codex / Grok** (shell, HTTP State Hub) | `warden route` commands above; inbox `to_agent=email-connect` is for coordination, not secret vending |
| **Claude Code** (MCP when available) | `get_domain_summary("custodian")` for workstreams; **still** use `warden route` for credential ownership |
| **llm-connect** (inference service) | Never put secret retrieval in prompts; route custody to OpenBao/operator paths surfaced by `warden route` |
### Quick routing table
| I need… | Owner | ops-warden executes? |
| --- | --- | --- |
| SSH cert (`adm`/`agt`/`atm`) | ops-warden | **Yes** — `warden sign` |
| API key, DB password, provider token | OpenBao (`railiance-platform`) | No — route only |
| Login / OIDC / MFA | key-cape / Keycloak | No — route only |
| Authorization decision | flex-auth | No — route only |
| activity-core → issue-core emission | activity-core + issue-core | No — `warden route show activity-core-issue-sink` |
| SSH tunnel | ops-bridge (+ `cert_command` from warden) | No — route only |
### Anti-patterns (do not do these)
- `POST /messages/` to `ops-warden` asking for `ISSUE_CORE_API_KEY`, `OPENROUTER_API_KEY`, etc.
- Inventing `warden secret`, `warden login`, `warden bao`, `warden tunnel` — they do not exist
- Pasting secrets into Git, State Hub, workplans, logs, or chat
### Other capabilities (reuse-surface)
Non-credential capabilities are usually discovered through **reuse-surface** federation
(`reuse-surface` registry / `capability.*` indexes). Credential routing is inlined in
every repo's agent instructions because it is high-frequency, high-risk, and easy to
get wrong.
**Canon:** `~/ops-warden/wiki/CredentialRouting.md` · catalog `~/ops-warden/registry/routing/catalog.yaml`

@@ -0,0 +1,38 @@
## First Session Protocol
Triggered when `get_domain_summary("infotech")` shows **no workstreams**.
The project is registered but work has not yet been structured.
**Step 1 — Read, don't write**
- `~/the-custodian/canon/projects/infotech/project_charter_v0.1.md` — purpose, scope
- `~/the-custodian/canon/projects/infotech/roadmap_v0.1.md` — planned phases
- Scan repo root: README, directory structure, existing code or docs
**Step 2 — Survey in-progress work**
Look for TODOs, open branches, half-finished files. Note done vs. started but incomplete.
**Step 3 — Propose workstreams to Bernd**
Propose 1–3 workstreams, each a coherent strand lasting weeks to months and anchored to a
roadmap phase. **Wait for approval before creating.**
**Step 4 — Create workplan file first, then DB record (ADR-001)**
```
workplans/EMAIL-WP-NNNN-<slug>.md ← write this first
```
Then register in the hub:
```
create_workstream(topic_id="cee7bedf-2b48-46ef-8601-006474f2ad7a", title="...", owner="...", description="...")
create_task(workstream_id="<id>", title="...", priority="high|medium|low")
```
**Step 5 — Record the setup**
```
add_progress_event(
    summary="First session: structured infotech into N workstreams, M tasks",
    event_type="milestone",
    topic_id="cee7bedf-2b48-46ef-8601-006474f2ad7a",
    detail={"workstreams": [...], "tasks_created": M}
)
```
<!-- Delete or archive this file once past first session -->

@@ -0,0 +1,8 @@
## Repo boundary
This repo owns **email-connect** only. It does not own:
<!-- TODO: List what belongs in adjacent repos, e.g.:
- SSH key management → railiance-infra/
- State hub code → state-hub/
-->

@@ -0,0 +1,5 @@
**Purpose:** Headless provider-neutral email communication and evidence service for sending, tracking, diagnosing, and normalizing email-channel events.
**Domain:** infotech
**Repo slug:** email-connect
**Topic ID:** cee7bedf-2b48-46ef-8601-006474f2ad7a

@@ -0,0 +1,85 @@
## Session Protocol
Dev Hub (State Hub API): http://127.0.0.1:8000
MCP server name in `~/.claude.json`: `dev-hub`
**Step 1 — Orient**
Read the offline-safe brief first — it works without a live hub connection:
```bash
cat .custodian-brief.md
```
Then call the MCP tool for richer cross-domain context when MCP tools are exposed:
```
get_domain_summary("infotech")
```
If MCP tools are unavailable in the current agent session, use the REST API:
```bash
curl -s "http://127.0.0.1:8000/state/summary" | python3 -m json.tool
```
If the hub is offline: `cd ~/state-hub && make api`
**Step 2 — Check inbox**
With MCP tools:
```
get_messages(to_agent="email-connect", unread_only=True)
```
Mark read with `mark_message_read(message_id)`. Reply or act on coordination
requests before proceeding.
Without MCP tools:
```bash
curl -s "http://127.0.0.1:8000/messages/?to_agent=email-connect&unread_only=true" \
| python3 -m json.tool
curl -s -X PATCH "http://127.0.0.1:8000/messages/<id>/read" \
-H "Content-Type: application/json" -d '{}'
```
**Step 3 — Scan workplans**
```bash
ls workplans/
```
For each file with `status: ready`, `active`, or `blocked`, note pending
`wait`/`todo`/`progress` tasks.
**Step 4 — Present brief**
1. **Active workstreams** for `infotech` — title, task counts, blocking decisions
2. **Pending tasks** from `workplans/` + any `[repo:email-connect]` hub tasks
3. **Goal guidance** — if `goal_guidance` in summary:
- `needs_workplan`: surface as top action — *"Repo goal '{title}' has no workplan yet"*
- `alignment_warnings`: flag if active work is not aligned with current goal
4. **Suggested next action** — highest-priority open item
5. **SBOM status** — flag if `last_sbom_at` is unset for this repo
If no workstreams: follow First Session Protocol (`first-session.md`).
**During work:** `record_decision()` · `add_progress_event()` · `resolve_decision()`
> State Hub is a *read model*. Bootstrap tools (`create_workstream`, `create_task`)
> are First Session Protocol only. Work structure belongs in repo files (ADR-001).
**Session close:**
With MCP tools:
```
add_progress_event(summary="...", topic_id="cee7bedf-2b48-46ef-8601-006474f2ad7a", workstream_id="<uuid>")
```
Without MCP tools:
```bash
curl -s -X POST http://127.0.0.1:8000/progress/ \
-H "Content-Type: application/json" \
-d '{"topic_id":"cee7bedf-2b48-46ef-8601-006474f2ad7a","workstream_id":"<uuid>","event_type":"note","summary":"what changed","author":"codex"}'
```
If workplan files were modified, ensure the local copy is up to date first:
```bash
git -C <repo_path> pull --ff-only
cd ~/state-hub && make fix-consistency REPO=email-connect
```
For repos where implementation runs on a remote machine (e.g. CoulombCore),
use the combined target which pulls before fixing:
```bash
cd ~/state-hub && make fix-consistency-remote REPO=email-connect
```
**C-15** (DB task ahead of file) is normal in multi-machine workflows — writeback
will sync the file to match DB. **C-16** (repo behind remote) blocks all writes
until you pull — intentional to prevent clobbering remote progress.

@@ -0,0 +1,19 @@
## Stack
<!-- TODO: Fill in language, frameworks, and key dependencies -->
- **Language:**
- **Key deps:**
## Dev Commands
```bash
# TODO: Fill in the standard commands for this repo
# Install dependencies
# Run tests
# Lint / type check
# Build / package (if applicable)
```

@@ -0,0 +1,40 @@
## Workplan Convention (ADR-001)
File location: `workplans/EMAIL-WP-NNNN-<slug>.md`
ID prefix: `EMAIL-WP-`
Work items originate as files in this repo **before** being registered in the hub.
Canonical workplan/workstream frontmatter statuses are:
`proposed`, `ready`, `active`, `blocked`, `backlog`, `finished`, `archived`.
Use `proposed` for a newly drafted plan, `ready` after review against current
repo state, and `finished` when implementation is complete. `stalled` and
`needs_review` are derived health labels, not stored statuses.
Closed workplans may be moved to `workplans/archived/` with a completion-date
prefix: `YYMMDD-EMAIL-WP-NNNN-<slug>.md`. The frontmatter id remains
unchanged; the prefix is only for quick visual reference.
Small opportunistic tasks discovered during another session use **Ad Hoc Tasks**:
`workplans/ADHOC-YYYY-MM-DD.md`, workstream slug `adhoc-YYYY-MM-DD`, and task ids
`ADHOC-YYYY-MM-DD-T01`, `T02`, etc. Use adhocs only for low-risk work completed
directly. Promote anything requiring analysis, design, approval, dependencies, or
multiple planned phases into a normal workplan.
Ecosystem todos from other agents arrive as `[repo:email-connect]` hub tasks —
visible at session start. Pick one up by creating the workplan file, then registering
the workstream.
Task blocks use this shape:
```task
id: EMAIL-WP-NNNN-T01
status: wait | todo | progress | done | cancel
priority: high | medium | low
state_hub_task_id: "<uuid>" # written by fix-consistency — do not edit
```
Status progression is `todo` → `progress` → `done`; use `wait` for waiting or
blocked work and `cancel` for stopped work.
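
A minimal sketch of how a tool might read and sanity-check one task block of the shape above. The function name `parse_task_block` and its error handling are hypothetical and not taken from State Hub or `fix-consistency`; only the field names and the allowed `status` and `priority` values come from this convention.

```python
# Allowed values as listed in the task block shape above.
ALLOWED_STATUS = {"wait", "todo", "progress", "done", "cancel"}
ALLOWED_PRIORITY = {"high", "medium", "low"}


def parse_task_block(text):
    """Parse the `key: value` lines of one task block into a dict.

    Hypothetical helper: trailing ` # comment` parts are dropped and
    surrounding double quotes are removed from values.
    """
    fields = {}
    for line in text.splitlines():
        line = line.split(" #", 1)[0].strip()
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        fields[key.strip()] = value.strip().strip('"')
    if fields.get("status") not in ALLOWED_STATUS:
        raise ValueError(f"unknown task status: {fields.get('status')!r}")
    if fields.get("priority") not in ALLOWED_PRIORITY:
        raise ValueError(f"unknown task priority: {fields.get('priority')!r}")
    return fields
```

A block whose `status` still uses a legacy literal would raise `ValueError` here, which is one way to catch values that need migrating.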
<!-- Ralph Loop rules and HEUREKA sequence: ~/.claude/CLAUDE.md — do not duplicate here -->

@@ -1,37 +1,18 @@
<!-- custodian-brief: generated by fix-consistency — do not edit manually -->
# Custodian Brief — email-connect
**Domain:** custodian
**Last synced:** 2026-06-01 22:46 UTC
**Domain:** infotech
**Last synced:** 2026-06-22 21:20 UTC
**State Hub:** http://127.0.0.1:8000 *(adjust if running on a remote machine)*
## Active Workstreams
### MVP Mailbox Evidence Scanner
Progress: 0/12 done | workstream_id: `c81788aa-0d0a-4493-bf41-ab6cc2068f2f`
**Open tasks:**
- · T01 - Repository Bootstrap `3a17215d`
- · T02 - Mailbox Connector `25a4da12`
- · T03 - Scan State and Storage `16b95a6b`
- · T04 - MIME and Header Parsing `5a50cd85`
- · T05 - Bounce and DSN Parser `8ea826d1`
- · T06 - Auto-Reply and Human Reply Classifier `4d94a332`
- · T07 - Complaint and Unsubscribe Classifier `8637d383`
- … and 5 more open tasks
### Repository Onboarding and Implementation Foundation
Progress: 1/4 done | workstream_id: `4533ceb6-bd86-49ee-a014-cffd68f84fbb`
**Open tasks:**
- · T02 - Define The Initial Runtime Architecture `fdfd8b96`
- · T03 - Model The Email Evidence Canon `ef1eb769`
- · T04 - Create The First Service Skeleton `4b94e544`
*(none — repo may need first-session setup)*
---
## MCP Orientation (when available)
If the state-hub MCP server is reachable, call:
`get_domain_summary("custodian")`
`get_domain_summary("infotech")`
This provides richer cross-domain context.
If the MCP call fails, use this file as your orientation source.

5
.gitignore vendored

@@ -171,6 +171,9 @@ cython_debug/
# Ruff stuff:
.ruff_cache/
# email-connect local runtime artifacts
.email-connect/
reports/*.csv
# PyPI configuration file
.pypirc

25
.repo-classification.yaml Normal file

@@ -0,0 +1,25 @@
# Repo classification (Repo Classification Standard v1.0).
repo_classification:
  standard: Repo Classification Standard
  version: '1.0'
  classified_at: '2026-06-22'
  classified_by: human
  category: tooling
  domain: infotech
  secondary_domains:
  - communication
  capability_tags:
  - evidence
  - traceability
  - source-management
  - automation
  business_stake:
  - technology
  - operations
  - legal
  business_mechanics:
  - operation
  - coordination
  notes: Headless, provider-neutral email communication and evidence service. Headless infra
    for developers -> domain infotech; communication is a secondary market.

@@ -2,41 +2,15 @@
## Repo Identity
**Purpose:** Headless, provider-neutral email communication and evidence service
for sending, tracking, diagnosing, and normalizing email-channel events without
overclaiming delivery, awareness, or result satisfaction.
**Purpose:** Headless provider-neutral email communication and evidence service for sending, tracking, diagnosing, and normalizing email-channel events.
**Domain:** custodian
**Domain:** infotech
**Repo slug:** email-connect
**Topic ID:** `cee7bedf-2b48-46ef-8601-006474f2ad7a`
**Workplan prefix:** `EMAIL-WP-`
---
## Repo Orientation
This repository is currently in planning/specification shape. Treat
`INTENT.md` as the canonical product intent and
`spec/ProductRequirementsDocument.md` as the detailed requirements source until
implementation architecture is added.
Preserve the core principle: email events are evidence, not result
satisfaction. Provider acceptance, MX acceptance, inbox placement, opens,
clicks, replies, complaints, suppressions, and bounces must remain distinct.
Do not collapse them into "delivered", "read", "accepted", or any similar
business conclusion.
Keep the product provider-neutral and adapter-friendly. It should be useful as
a standalone email communication/evidence service and as the reference
`coordination-engine` adapter. It should not drift into newsletter campaign
management, broad marketing automation, or legal/business outcome adjudication.
There is no runtime stack committed yet. Any stack, API, storage, or provider
choice should be introduced through a workplan and anchored in the current
intent/PRD.
---
## State Hub Integration
The Custodian State Hub tracks work across all domains. Interact via HTTP REST —
@@ -50,8 +24,8 @@ there is no MCP server for Codex agents.
### Orient at session start
```bash
# Offline brief, when present
test -f .custodian-brief.md && cat .custodian-brief.md
# Offline brief — works without hub connection
cat .custodian-brief.md
# Active workstreams for this domain
curl -s "http://127.0.0.1:8000/workstreams/?topic_id=cee7bedf-2b48-46ef-8601-006474f2ad7a&status=active" \
@@ -106,7 +80,7 @@ curl -s -X PATCH "http://127.0.0.1:8000/tasks/<task_id>" \
## Session Protocol
**Start:**
1. `test -f .custodian-brief.md && cat .custodian-brief.md` — domain goal and open workstreams, when present
1. `cat .custodian-brief.md` — domain goal and open workstreams (offline-safe)
2. Check inbox: `GET /messages/?to_agent=email-connect&unread_only=true`; mark read
3. Scan workplans: `ls workplans/` — note `status: ready`, `active`, or `blocked` files and open tasks
4. Check human-needed tasks: `GET /tasks/?needs_human=true`
@@ -127,6 +101,63 @@ curl -s -X PATCH "http://127.0.0.1:8000/tasks/<task_id>" \
---
## Credential and access routing
**Audience:** Codex, Claude Code, Grok, and custodian agents that call **llm-connect**
for inference. Run this check **before** requesting secrets, API keys, SSH access,
login tokens, or database passwords — in any repo, not only `ops-warden`.
ops-warden **issues SSH certificates only** (`warden sign`, `cert_command`). Every
other credential need belongs to another subsystem. **Do not** message
`ops-warden` on State Hub expecting a secret value; the reply is a pointer, not a key.
### Lookup (do this first)
```bash
warden route find "<describe your need>" --json
warden route show <catalog-id> --json
```
Requires the `warden` CLI from `~/ops-warden` (`uv tool install .` or `uv run warden`).
| Agent runtime | How to orient |
| --- | --- |
| **Codex / Grok** (shell, HTTP State Hub) | `warden route` commands above; inbox `to_agent=email-connect` is for coordination, not secret vending |
| **Claude Code** (MCP when available) | `get_domain_summary("custodian")` for workstreams; **still** use `warden route` for credential ownership |
| **llm-connect** (inference service) | Never put secret retrieval in prompts; route custody to OpenBao/operator paths surfaced by `warden route` |
### Quick routing table
| I need… | Owner | ops-warden executes? |
| --- | --- | --- |
| SSH cert (`adm`/`agt`/`atm`) | ops-warden | **Yes** — `warden sign` |
| API key, DB password, provider token | OpenBao (`railiance-platform`) | No — route only |
| Login / OIDC / MFA | key-cape / Keycloak | No — route only |
| Authorization decision | flex-auth | No — route only |
| activity-core → issue-core emission | activity-core + issue-core | No — `warden route show activity-core-issue-sink` |
| SSH tunnel | ops-bridge (+ `cert_command` from warden) | No — route only |
### Anti-patterns (do not do these)
- `POST /messages/` to `ops-warden` asking for `ISSUE_CORE_API_KEY`, `OPENROUTER_API_KEY`, etc.
- Inventing `warden secret`, `warden login`, `warden bao`, `warden tunnel` — they do not exist
- Pasting secrets into Git, State Hub, workplans, logs, or chat
### Other capabilities (reuse-surface)
Non-credential capabilities are usually discovered through **reuse-surface** federation
(`reuse-surface` registry / `capability.*` indexes). Credential routing is inlined in
every repo's agent instructions because it is high-frequency, high-risk, and easy to
get wrong.
**Canon:** `~/ops-warden/wiki/CredentialRouting.md` · catalog `~/ops-warden/registry/routing/catalog.yaml`
<!-- REPO-AGENTS-EXTENSIONS -->
<!-- Append repo-specific agent instructions below this marker.
The state-hub template sync preserves content after this line. -->
---
## Workplan Convention (ADR-001)
Work items originate as files in this repo — not in the hub. The hub is a
@@ -150,7 +181,7 @@ anything needing analysis, design, approval, dependencies, or multiple phases.
id: EMAIL-WP-NNNN
type: workplan
title: "..."
domain: custodian
domain: infotech
repo: email-connect
status: proposed | ready | active | blocked | backlog | finished | archived
owner: codex

@@ -1,20 +1,12 @@
# email-connect - Claude Code Instructions
# email-connect Claude Code Instructions
This repository uses `AGENTS.md` as the canonical local agent instruction file.
Claude Code agents should read `AGENTS.md` first and follow the same State Hub
identity, workplan convention, and repo-specific semantic guardrails.
State Hub identity:
- Domain: `custodian`
- Repo slug: `email-connect`
- Topic ID: `cee7bedf-2b48-46ef-8601-006474f2ad7a`
- Workplan prefix: `EMAIL-WP-`
Core product rule: email events are evidence, not result satisfaction. Do not
treat provider acceptance, MX acceptance, inbox placement, opens, clicks,
replies, complaints, or suppressions as proof of human awareness or business
outcome completion.
Start with `INTENT.md`, then `spec/ProductRequirementsDocument.md`, then the
active files under `workplans/`.
@SCOPE.md
@.claude/rules/repo-identity.md
@.claude/rules/session-protocol.md
@.claude/rules/first-session.md
@.claude/rules/workplan-convention.md
@.claude/rules/stack-and-commands.md
@.claude/rules/architecture.md
@.claude/rules/repo-boundary.md
@.claude/rules/credential-routing.md
@.claude/rules/agents.md

@@ -1,3 +1,51 @@
# repo-seed
# email-connect
A git repository template to bootstrap coulomb projects from.
Headless, provider-neutral email communication and evidence service.
The first implementation slice is the Mailbox Evidence Scanner MVP: scan a
return mailbox or fixture directory, classify inbound email-channel evidence,
store scan state locally, and generate timestamped CSV reports without
overclaiming delivery, awareness, or coordination success.
## Quickstart
```bash
PYTHONPATH=src python3 -m unittest discover -s tests
PYTHONPATH=src python3 -m email_connect.cli adapter-descriptor
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --out reports/
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --expected-recipients tests/fixtures/expected_recipients.txt --out reports/
```
The example config uses `tests/fixtures/mailbox` as a mailbox source. Runtime
state is written to `.email-connect/state.sqlite`; generated CSV reports are
written to `reports/`.
Expected recipients are optional. When provided as a text file or CSV, reports
include a `known_recipient` column, place known recipients first, and add
`undef.no_signal` diagnostics for expected recipients with no mailbox evidence.
See [Mailbox Report Tutorial](docs/mailbox-report-tutorial.md).
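
The expected-recipient behaviour described above can be sketched roughly as follows. This is an illustration under assumptions, not the code in `email_connect`: the function name `order_and_diagnose` and the `recipient` row key are hypothetical, while `known_recipient`, `undef.no_signal`, and `diagnostic.expected_recipient.no_evidence` are the names used in this repo's documentation.

```python
def order_and_diagnose(evidence_rows, expected_recipients):
    """Flag known recipients, add no-signal rows, and sort known rows first.

    Hypothetical sketch: `evidence_rows` is a list of dicts that each carry
    a `recipient` address; `expected_recipients` is an iterable of addresses.
    """
    expected = {a.strip().lower() for a in expected_recipients if a.strip()}
    seen = set()
    rows = []
    for row in evidence_rows:
        recipient = row["recipient"].lower()
        seen.add(recipient)
        rows.append({**row, "known_recipient": recipient in expected})
    # Expected recipients without any mailbox evidence get a diagnostic row.
    for addr in sorted(expected - seen):
        rows.append({
            "recipient": addr,
            "known_recipient": True,
            "normalized_event": "diagnostic.expected_recipient.no_evidence",
            "assessment": "undef.no_signal",
        })
    # sorted() is stable, so the original order is kept within each group.
    return sorted(rows, key=lambda r: not r["known_recipient"])
```

The no-signal row only records that nothing was found in the inspected range; it says nothing about whether the notification succeeded or failed.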
For a live mailbox, set `mailbox.protocol: imap`, configure host, port, folder,
and credential environment variable names, then export the credentials before
running `scan-mailbox`. IMAP scans select the configured folder read-only and
fetch message bodies with `BODY.PEEK[]`; mailbox write-back actions such as
marking messages seen are intentionally unsupported in this MVP.
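
Because the config stores only the names of the credential environment variables, a loader has to resolve them at run time. The sketch below shows one way to do that; `resolve_imap_credentials` is a hypothetical helper, and the `username_env` / `password_env` keys are taken from `config/mailbox.example.yml` in this changeset.

```python
import os


def resolve_imap_credentials(mailbox_cfg, environ=None):
    """Look up IMAP credentials via the env var names given in the config.

    Hypothetical helper: `mailbox_cfg` is the parsed `mailbox` section.
    """
    environ = os.environ if environ is None else environ
    user_var = mailbox_cfg["username_env"]
    pass_var = mailbox_cfg["password_env"]
    missing = [name for name in (user_var, pass_var) if not environ.get(name)]
    if missing:
        # Report variable names only; never echo secret values.
        raise RuntimeError(
            "missing credential environment variables: " + ", ".join(missing)
        )
    return environ[user_var], environ[pass_var]
```

Keeping the secret values out of the config file means the YAML can be committed without exposing them.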
## Current Scope
- Coordination-engine spec review and references.
- Initial adapter descriptor, capability profile, evidence ceiling, and
limitations.
- Fixture and read-only IMAP mailbox sources.
- Conservative mailbox message parser and evidence mapper.
- SQLite state store with scan cursor, message/evidence deduplication, and
endpoint quality hints.
- CSV report generation, including `--report-only-new`.
- Optional expected-recipient text/CSV input, `known_recipient` report
filtering, no-evidence diagnostics, and datetime range filtering.
- Golden fixture tests for hard bounce, soft bounce, delayed delivery, final
failure, complaint, unsubscribe, challenge-response, unknown return,
parse-failure, out-of-office, and human reply signals.
Provider webhooks, outbound sending, suppression workflows, OAuth mailbox login,
and a UI remain outside this first mailbox-scanner slice.

@@ -0,0 +1,37 @@
mailbox:
  id: return-mailbox-default
  protocol: fixture
  host: null
  port: 993
  tls: true
  username_env: EMAIL_CONNECT_IMAP_USER
  password_env: EMAIL_CONNECT_IMAP_PASSWORD
  folder: INBOX
source:
  fixture_dir: tests/fixtures/mailbox
scan:
  mode: incremental
  max_messages_per_run: 5000
  since: null
  from: null
  to: null
  include_seen: true
  mark_seen: false
  store_raw_headers: true
  store_raw_body: false
  store_raw_message_ref: true
expected_recipients:
  path: null
  csv_column: email
storage:
  path: .email-connect/state.sqlite
reports:
  output_dir: reports
  include_all_evidence: true
  include_unknown_messages: true
  timestamp_timezone: UTC

@@ -0,0 +1,46 @@
# coordination-engine Email Spec Review
## Source Specs
Reviewed local coordination-engine specifications:
- `/home/worsch/coordination-engine/spec/EmailAdapterSpecification.md`
- `/home/worsch/coordination-engine/spec/AdapterInterfaceSpecification.md`
- `/home/worsch/coordination-engine/spec/RuntimeArchitectureAndAdapterSubsystem.md`
- `/home/worsch/coordination-engine/spec/ProductRequirementsDocument.md`
These are referenced, not copied, so `email-connect` stays aligned with the
authoritative coordination-engine checkout.
## Contract Points For email-connect
- `email-connect` is a `notification`, `communication`, and `interaction`
adapter.
- It reports email-channel evidence and advisory assessment. It does not own
coordination case result evaluation.
- Ambiguous provider or mailbox signals must map to the weakest safe normalized
event.
- Provider and mailbox events must preserve raw references and native status
mappings.
- The adapter must expose or enable production of normalized `EvidenceEvent`
records.
- The adapter should expose endpoint quality diagnostics, but endpoint quality
is not coordination success.
- Email's positive evidence ceiling is limited: email tracking cannot prove
human awareness, identity, authority, payload access, or non-repudiation.
- Golden tests must include overclaim prevention, especially "provider
delivered" not becoming awareness or payload delivery.
## First Implementation Implications
- The mailbox scanner emits `EmailEvidenceCandidate` rows that are shaped to
become coordination-engine `EvidenceEvent` records later.
- Classifiers preserve `unknown_return_message` and `parse_failed` instead of
silently discarding uncertainty.
- Out-of-office replies update diagnostics only; they do not prove awareness or
reachability.
- Human replies are email-channel success signals, but not legal acceptance or
coordination result satisfaction.
- CSV reports include event type, assessment category/subclass, confidence,
evidence strength, observed time, occurred time, raw reference, and
deduplication key for auditability.

@@ -0,0 +1,69 @@
# Email Evidence Canon
## Rule
Email events are evidence, not result satisfaction.
The scanner reports email-channel facts and uncertainty. A downstream
coordination runtime decides whether those facts satisfy a coordination case.
## Initial Message Classes
- `hard_bounce`
- `soft_bounce`
- `delayed_delivery_notice`
- `final_delivery_failure`
- `out_of_office`
- `human_reply`
- `complaint_or_abuse`
- `unsubscribe_or_opt_out`
- `challenge_response`
- `unknown_return_message`
- `unrelated_message`
- `parse_failed`
## Initial Normalized Events
| Message class | Normalized event | Assessment |
| --- | --- | --- |
| `hard_bounce` | `notification.endpoint.rejected_permanent` | `fail.hard_bounce` |
| `soft_bounce` | `notification.endpoint.rejected_temporary` | `undef.deferred` |
| `delayed_delivery_notice` | `notification.endpoint.deferred` | `undef.deferred` |
| `final_delivery_failure` | `notification.endpoint.rejected_permanent` | `fail.expired_without_delivery` |
| `out_of_office` | `interaction.out_of_office_received` | `undef.out_of_office` |
| `human_reply` | `interaction.reply_received` | `success.reply_received` |
| `complaint_or_abuse` | `notification.channel.complaint_received` | `fail.complaint_received` |
| `unsubscribe_or_opt_out` | `notification.channel.unsubscribe_received` | `fail.unsubscribed` |
| `unknown_return_message` | `notification.endpoint.unknown` | `undef.conflicting_evidence` |
| `challenge_response` | `interaction.unverified_actor_interaction` | `undef.identity_uncertain` |
| `parse_failed` | `diagnostic.message.parse_failed` | `undef.parse_failed` |
| expected recipient with no evidence | `diagnostic.expected_recipient.no_evidence` | `undef.no_signal` |
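
The table above translates directly into a lookup structure. The sketch below is illustrative only and is not the mapper in `evidence.py`; the names `EVIDENCE_MAP` and `map_message_class` are hypothetical, and falling back to the unknown-return mapping for an unrecognized class is an assumption made here to stay on the weakest safe event.

```python
# Message class -> (normalized event, assessment), copied from the table above.
EVIDENCE_MAP = {
    "hard_bounce": ("notification.endpoint.rejected_permanent", "fail.hard_bounce"),
    "soft_bounce": ("notification.endpoint.rejected_temporary", "undef.deferred"),
    "delayed_delivery_notice": ("notification.endpoint.deferred", "undef.deferred"),
    "final_delivery_failure": (
        "notification.endpoint.rejected_permanent",
        "fail.expired_without_delivery",
    ),
    "out_of_office": ("interaction.out_of_office_received", "undef.out_of_office"),
    "human_reply": ("interaction.reply_received", "success.reply_received"),
    "complaint_or_abuse": (
        "notification.channel.complaint_received",
        "fail.complaint_received",
    ),
    "unsubscribe_or_opt_out": (
        "notification.channel.unsubscribe_received",
        "fail.unsubscribed",
    ),
    "unknown_return_message": (
        "notification.endpoint.unknown",
        "undef.conflicting_evidence",
    ),
    "challenge_response": (
        "interaction.unverified_actor_interaction",
        "undef.identity_uncertain",
    ),
    "parse_failed": ("diagnostic.message.parse_failed", "undef.parse_failed"),
}


def map_message_class(message_class):
    """Return (normalized_event, assessment), or None for unrelated mail."""
    if message_class == "unrelated_message":
        return None  # unrelated messages produce no evidence row
    # Assumption: an unrecognized class is treated as an unknown return message.
    return EVIDENCE_MAP.get(message_class, EVIDENCE_MAP["unknown_return_message"])
```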
## Overclaim Prevention
- No bounce found does not mean delivery success.
- Provider acceptance does not mean endpoint acceptance.
- Endpoint acceptance does not mean inbox placement.
- Out-of-office does not prove recipient awareness or action.
- Human reply does not prove legal acceptance.
- Unknown return messages remain visible.
- Parse failures are diagnostic rows, not delivery or interaction outcomes.
- Expected-recipient no-evidence rows mean no mailbox evidence was found in the
inspected range, not that notification succeeded or failed.
- Scanner and proxy interactions must stay below identity-bound interaction.
## Endpoint Quality Hints
Endpoint quality rows are diagnostic state, not verdicts:
| Evidence | Quality hint |
| --- | --- |
| Hard bounce or final failure | `reachability = unreachable`, `last_failure_at` |
| Soft bounce or delayed delivery | `reachability = degraded`, `last_failure_at` |
| Complaint | `suppression_state = suppressed` |
| Unsubscribe | `suppression_state = opted_out` |
| Human reply | `last_success_at` |
| Out-of-office | `reachability = uncertain`, `last_auto_reply_at` |
These hints can guide future suppression and review workflows, but they do not
prove human awareness, authority, payload access, or coordination success.
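
As a rough sketch, the hint table could be applied to a per-endpoint row like this. The function name `apply_quality_hint` and the dict-based row are hypothetical; the field names and values are the ones listed in the table, and the actual schema of `endpoint_quality` is not shown here.

```python
def apply_quality_hint(quality, message_class, observed_at):
    """Return a copy of an endpoint-quality row updated with one hint.

    Hypothetical sketch: `quality` is a plain dict and `observed_at` is
    whatever timestamp representation the caller uses.
    """
    updated = dict(quality)
    if message_class in ("hard_bounce", "final_delivery_failure"):
        updated["reachability"] = "unreachable"
        updated["last_failure_at"] = observed_at
    elif message_class in ("soft_bounce", "delayed_delivery_notice"):
        updated["reachability"] = "degraded"
        updated["last_failure_at"] = observed_at
    elif message_class == "complaint_or_abuse":
        updated["suppression_state"] = "suppressed"
    elif message_class == "unsubscribe_or_opt_out":
        updated["suppression_state"] = "opted_out"
    elif message_class == "human_reply":
        updated["last_success_at"] = observed_at
    elif message_class == "out_of_office":
        updated["reachability"] = "uncertain"
        updated["last_auto_reply_at"] = observed_at
    return updated
```

In this sketch a human reply only records `last_success_at` and leaves `reachability` untouched, mirroring the table, which lists no reachability change for replies.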

@@ -0,0 +1,116 @@
# Initial Runtime Architecture
## Status
This is the first implementation architecture for the mailbox evidence scanner
slice. It is intentionally small and stdlib-only so the repo can run before a
larger service stack is chosen.
## Service Boundary
The first slice is a CLI scanner:
```text
email-connect scan-mailbox --config config/mailbox.example.yml --out reports/
```
It scans an inbound return mailbox source, classifies messages, stores scan
state in SQLite, updates endpoint-quality hints, and writes timestamped CSV
evidence reports.
The source layer supports deterministic fixture directories and a read-only IMAP
connector. IMAP scans select the configured folder with `readonly=True`, fetch
messages using `BODY.PEEK[]`, and reject `mark_seen` because mailbox write-back
actions are out of scope for this MVP.
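
A minimal sketch of the read-only IMAP access pattern described above, using the standard-library `imaplib`. The function name `fetch_messages_readonly` is hypothetical and this is not the connector in `mailbox.py`; it only illustrates `readonly=True`, `BODY.PEEK[]`, and the rejection of `mark_seen`.

```python
import imaplib  # conn is expected to behave like imaplib.IMAP4 / IMAP4_SSL


def fetch_messages_readonly(conn, folder="INBOX", mark_seen=False):
    """Return raw message bytes without changing any mailbox flags."""
    if mark_seen:
        raise ValueError("mark_seen is unsupported: mailbox write-back is out of scope")
    # readonly=True opens the folder with EXAMINE, so no flags can be changed.
    status, _ = conn.select(folder, readonly=True)
    if status != "OK":
        raise RuntimeError(f"cannot select folder {folder!r}")
    status, data = conn.search(None, "ALL")
    if status != "OK" or not data or not data[0]:
        return []
    messages = []
    for num in data[0].split():
        # BODY.PEEK[] fetches the full message without setting \Seen.
        status, parts = conn.fetch(num, "(BODY.PEEK[])")
        if status == "OK" and parts and isinstance(parts[0], tuple):
            messages.append(parts[0][1])
    return messages
```

With a live server, `conn` would typically be an `imaplib.IMAP4_SSL(host, port)` instance after `login(...)`; the host, port, and credentials are left to the caller in this sketch.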
## Package Layout
```text
src/email_connect/
  adapter_contract.py  # coordination-engine descriptor and evidence ceiling
  cli.py               # command line entry points
  config.py            # config loading
  evidence.py          # native class to normalized evidence mapping
  mailbox.py           # fixture and IMAP mailbox sources
  models.py            # mailbox, parse, evidence, endpoint quality dataclasses
  parser.py            # MIME/header parsing and conservative classification
  reporting.py         # CSV report generation
  scanner.py           # scan orchestration
  storage.py           # SQLite state store
```
## Persistence
SQLite is the MVP store. The initial schema includes:
- `mailbox_scans`
- `mailbox_messages`
- `parsed_messages`
- `evidence_candidates`
- `scan_cursors`
- `endpoint_quality`
Message deduplication is keyed by mailbox ID, IMAP UID when present, message ID,
received timestamp, sender, subject hash, and body hash. Evidence
deduplication follows the workplan fields: message, parser version, normalized
event, affected recipient, original message, SMTP/enhanced status, and reason.
Incremental scans use `scan_cursors` by mailbox and folder. Full rescans ignore
the cursor while preserving message and evidence deduplication. Endpoint-quality
rows are diagnostic hints derived from explicit evidence events; they are not
coordination outcomes.
## Evidence Mapping
Parser output is represented as `ParsedMailboxMessage`. The mapper converts it
to `EmailEvidenceCandidate` using coordination-engine event names and advisory
assessment classes.
Examples:
- `hard_bounce` -> `notification.endpoint.rejected_permanent`
- `soft_bounce` -> `notification.endpoint.rejected_temporary`
- `delayed_delivery_notice` -> `notification.endpoint.deferred`
- `complaint_or_abuse` -> `notification.channel.complaint_received`
- `unsubscribe_or_opt_out` -> `notification.channel.unsubscribe_received`
- `out_of_office` -> `interaction.out_of_office_received`
- `challenge_response` -> `interaction.unverified_actor_interaction`
- `human_reply` -> `interaction.reply_received`
- `parse_failed` -> `diagnostic.message.parse_failed`
The mapper does not emit evidence for unrelated messages. Unknown return
messages stay visible as `notification.endpoint.unknown`. Parse failures are
visible as diagnostics without claiming delivery, interaction, identity, or
endpoint quality.
## coordination-engine Alignment
The implementation keeps these coordination-engine concepts explicit:
- adapter descriptor
- adapter capability profile
- evidence ceiling
- advisory assessment
- endpoint quality update shape
- event observation and raw reference preservation
- golden tests for overclaim prevention
Email evidence remains below the coordination result layer. The scanner does
not infer inbox placement, human awareness, legal acceptance, payload access, or
case success.
## Provider Boundary
Provider webhook ingestion and outbound send APIs are deliberately outside this
slice. The mailbox scanner uses the same evidence model so future provider
events can enter through a parallel ingestion path and converge at the same
normalization layer.
## Development Commands
```bash
PYTHONPATH=src python3 -m unittest discover -s tests
PYTHONPATH=src python3 -m email_connect.cli adapter-descriptor
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --out reports/
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox --config config/mailbox.example.yml --report-only-new --out reports/
```

@@ -0,0 +1,139 @@
# Mailbox Report Tutorial
This tutorial shows how to generate an email-channel evidence report from a
return mailbox or from the bundled fixture mailbox.
## 1. Verify The Scanner
Run the tests and print the adapter descriptor:
```bash
PYTHONPATH=src python3 -m unittest discover -s tests
PYTHONPATH=src python3 -m email_connect.cli adapter-descriptor
```
## 2. Start With Fixtures
The example config uses `tests/fixtures/mailbox`:
```bash
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox \
--config config/mailbox.example.yml \
--full-rescan \
--out reports/
```
The scanner writes a timestamped CSV file to `reports/`.
## 3. Configure A Live IMAP Mailbox
Copy `config/mailbox.example.yml` and set:
```yaml
mailbox:
  protocol: imap
  host: imap.example.com
  port: 993
  tls: true
  username_env: EMAIL_CONNECT_IMAP_USER
  password_env: EMAIL_CONNECT_IMAP_PASSWORD
  folder: INBOX
```
Then export credentials:
```bash
export EMAIL_CONNECT_IMAP_USER='mailbox@example.com'
export EMAIL_CONNECT_IMAP_PASSWORD='app-password'
```
IMAP scans select the folder read-only and fetch messages with `BODY.PEEK[]`.
The scanner does not mark messages seen, move messages, or delete messages.
## 4. Add Expected Recipients
Expected recipients are optional. A newline-separated file can look like:
```text
missing@example.com
recipient@example.com
```
Run:
```bash
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox \
--config config/mailbox.example.yml \
--expected-recipients recipients.txt \
--out reports/
```
CSV input is also supported:
```csv
email,name
missing@example.com,Missing User
recipient@example.com,Known Recipient
```
Run:
```bash
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox \
--config config/mailbox.example.yml \
--expected-recipients recipients.csv \
--expected-recipient-column email \
--out reports/
```
Invalid recipient rows are ignored and printed as warnings.
## 5. Limit The Time Range
Use an inclusive datetime range:
```bash
PYTHONPATH=src python3 -m email_connect.cli scan-mailbox \
--config config/mailbox.example.yml \
--from 2026-06-02T10:00:00Z \
--to 2026-06-02T11:00:00Z \
--out reports/
```
`--since` remains a compatibility alias for the lower bound. When a range is
active, messages with no parseable `Date` header are excluded because the
scanner cannot confirm that they originated inside the requested window.
## 6. Read The Report
Key columns:
- `known_recipient`: `true` when the address was supplied in the expected list.
- `normalized_event_type`: the email evidence or diagnostic event.
- `assessment_category` and `assessment_subclass`: advisory interpretation.
- `affected_email_address`: the endpoint the row is about.
Known recipients appear first by default so spreadsheet filtering is easy.
Expected recipients with no mailbox evidence appear as:
```text
normalized_event_type: diagnostic.expected_recipient.no_evidence
assessment_category: undef
assessment_subclass: undef.no_signal
evidence_strength: none
known_recipient: true
```
That row means only that no mailbox evidence was found for the supplied address
inside the inspected range. It is not evidence of delivery success, delivery
failure, recipient awareness, or legal acceptance.
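
As an illustration, no-evidence rows could be derived by comparing the expected list against the addresses that produced evidence. `no_evidence_rows` is a hypothetical helper; the field values are copied from the example row above.

```python
def no_evidence_rows(expected: list[str], evidence_addresses: set[str]) -> list[dict[str, str]]:
    """Build one diagnostic row per expected recipient with no mailbox evidence."""
    seen = {address.lower() for address in evidence_addresses}
    rows = []
    for address in expected:
        normalized = address.lower()
        if normalized in seen:
            continue  # real evidence exists, so no placeholder row is needed
        rows.append(
            {
                "affected_email_address": normalized,
                "normalized_event_type": "diagnostic.expected_recipient.no_evidence",
                "assessment_category": "undef",
                "assessment_subclass": "undef.no_signal",
                "evidence_strength": "none",
                "known_recipient": "true",
            }
        )
    return rows
```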
## 7. Troubleshooting
- Empty report: check folder, time range, and whether incremental cursor state
already skipped older messages. Try `--full-rescan`.
- IMAP credential error: verify the environment variable names and values.
- Missing expected rows: check the recipient file path and CSV column name.
- Unexpected no-evidence rows: confirm that the relevant mailbox evidence is
inside the configured datetime range.

19
pyproject.toml Normal file
@@ -0,0 +1,19 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "email-connect"
version = "0.1.0"
description = "Provider-neutral email communication and evidence service."
readme = "README.md"
requires-python = ">=3.11"
license = { file = "LICENSE" }
authors = [{ name = "Coulomb" }]
dependencies = []
[project.scripts]
email-connect = "email_connect.cli:main"
[tool.setuptools.packages.find]
where = ["src"]

12
registry/README.md Normal file
@@ -0,0 +1,12 @@
# Capability Registry
Markdown-first capability index for federation and reuse planning.
## Authoring
1. Copy a capability entry template (see reuse-surface `templates/capability-entry.template.md`).
2. Add the row to `indexes/capabilities.yaml`.
3. Run `reuse-surface validate` from a checkout with the CLI installed.
4. Merge to `main` and verify publish with `reuse-surface establish --publish-check`.
Federation contract: reuse-surface `docs/RegistryFederation.md`.

@@ -0,0 +1,4 @@
version: 1
updated: '2026-06-16'
domain: helix_forge
capabilities: []

1
reports/.gitkeep Normal file
@@ -0,0 +1 @@

@@ -0,0 +1,3 @@
"""email-connect package."""
__version__ = "0.1.0"

@@ -0,0 +1,98 @@
from __future__ import annotations

ADAPTER_ID = "email-connect"
ADAPTER_CONTRACT_VERSION = "1.1"

# Event types this adapter may emit; keep in sync with EVIDENCE_MAPPINGS in evidence.py.
EMITTED_EVENT_TYPES = [
    "notification.endpoint.rejected_permanent",
    "notification.endpoint.rejected_temporary",
    "notification.endpoint.deferred",
    "notification.channel.complaint_received",
    "notification.channel.unsubscribe_received",
    "interaction.reply_received",
    "interaction.out_of_office_received",
    "interaction.unverified_actor_interaction",  # mapped from challenge_response in evidence.py
    "notification.endpoint.unknown",
    "diagnostic.message.parse_failed",
    "diagnostic.expected_recipient.no_evidence",
]


def adapter_descriptor() -> dict:
    """Return the initial coordination-engine adapter descriptor."""
    return {
        "adapter_id": ADAPTER_ID,
        "adapter_name": "email-connect",
        "adapter_version": "0.1.0",
        "adapter_contract_version": ADAPTER_CONTRACT_VERSION,
        "adapter_types": ["notification", "communication", "interaction"],
        "deployment_mode": "external",
        "capability_profile": {
            "can_notify": True,
            "can_publish": False,
            "can_deliver_payload": False,
            "can_collect_payload": False,
            "can_grant_access": False,
            "can_revoke_access": False,
            "can_observe_interaction": True,
            "can_observe_identity": False,
            "can_observe_authority": False,
            "can_emit_delivery_receipts": False,
            "can_emit_display_or_read_receipts": False,
            "can_emit_return_or_failure_events": True,
            "can_emit_late_events": True,
            "can_cancel_after_dispatch": False,
            "supports_webhooks": False,
            "supports_polling": True,
            "supports_idempotency": "adapter_managed",
            "supports_endpoint_quality": True,
            "supports_native_status_mapping": True,
            "supports_golden_tests": True,
            "limitations": [
                "Mailbox evidence cannot prove inbox placement.",
                "Reply, open, and click signals do not prove coordination result satisfaction.",
            ],
        },
        "supported_actions": [],
        "emitted_event_types": EMITTED_EVENT_TYPES,
        "supported_channels": ["email"],
        "supported_endpoint_types": ["email_address"],
        "evidence_profile": {
            "native_status_mapping_ref": "src/email_connect/evidence.py",
            "golden_tests_ref": "tests/fixtures",
        },
        "evidence_ceiling": {
            "max_positive_event": "interaction.reply_received",
            "max_positive_strength": "medium",
            "can_prove_human_awareness": False,
            "can_prove_payload_access": False,
            "can_prove_payload_delivery": False,
            "can_prove_identity": False,
            "can_prove_authority": False,
            "can_prove_non_repudiation": False,
            "limitations": [
                "Email cannot reliably prove intended-recipient awareness.",
                "Mailbox scanning observes return-channel evidence only.",
            ],
        },
        "assurance_capability": {
            "awareness_assurance": "weak",
            "delivery_assurance": "none",
            "identity_assurance": "none",
            "authority_assurance": "none",
            "non_repudiation_assurance": "none",
        },
        "identity_profile": {
            "can_identify_actor": False,
            "identity_limitations": ["Inbound mailbox evidence is not authenticated actor evidence."],
        },
        "late_event_policy": {
            "accepts_late_events": True,
            "late_event_notes": ["Bounces and replies may arrive after earlier acceptance evidence."],
        },
        "limitations": [
            "No outbound sending in the first mailbox-scanner MVP.",
            "No provider webhook integration in the first mailbox-scanner MVP.",
            "No legal delivery assessment.",
        ],
    }

66
src/email_connect/cli.py Normal file
@@ -0,0 +1,66 @@
from __future__ import annotations

import argparse
import json
from pathlib import Path

from .adapter_contract import adapter_descriptor
from .config import load_config
from .scanner import scan_mailbox


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(prog="email-connect")
    sub = parser.add_subparsers(dest="command", required=True)

    scan = sub.add_parser("scan-mailbox", help="Scan a return mailbox or fixture directory.")
    scan.add_argument("--config", required=True)
    scan.add_argument("--out", default=None)
    scan.add_argument("--full-rescan", action="store_true")
    scan.add_argument("--since", default=None)
    scan.add_argument("--from", dest="range_from", default=None)
    scan.add_argument("--to", dest="range_to", default=None)
    scan.add_argument("--expected-recipients", default=None)
    scan.add_argument("--expected-recipient-column", default=None)
    scan.add_argument("--report-only-new", action="store_true")
    scan.add_argument("--dry-run", action="store_true")
    scan.add_argument("--fixture-dir", default=None)

    sub.add_parser("adapter-descriptor", help="Print the initial coordination-engine adapter descriptor.")

    args = parser.parse_args(argv)

    if args.command == "adapter-descriptor":
        print(json.dumps(adapter_descriptor(), indent=2, sort_keys=True))
        return 0

    if args.command == "scan-mailbox":
        config = load_config(args.config)
        result = scan_mailbox(
            config,
            output_dir=args.out,
            full_rescan=args.full_rescan,
            report_only_new=args.report_only_new,
            dry_run=args.dry_run,
            fixture_dir=args.fixture_dir,
            since=args.since,
            range_from=args.range_from,
            range_to=args.range_to,
            expected_recipients_path=args.expected_recipients,
            expected_recipient_column=args.expected_recipient_column,
        )
        # Print a key=value summary so the output is easy to grep or parse.
        print(f"scan_id={result.scan.scan_id}")
        print(f"messages_seen={result.scan.messages_seen}")
        print(f"messages_new={result.scan.messages_new}")
        print(f"messages_parsed={result.scan.messages_parsed}")
        print(f"evidence_events_created={result.scan.evidence_events_created}")
        if result.report_path:
            print(f"report_path={Path(result.report_path)}")
        for warning in result.warnings:
            print(f"warning={warning}")
        return 0

    return 2


if __name__ == "__main__":
    raise SystemExit(main())

156
src/email_connect/config.py Normal file
@@ -0,0 +1,156 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class MailboxConfig:
    id: str
    protocol: str = "imap"
    host: str | None = None
    port: int = 993
    tls: bool = True
    username_env: str | None = None
    password_env: str | None = None
    folder: str = "INBOX"


@dataclass(frozen=True)
class ScanConfig:
    mode: str = "incremental"
    max_messages_per_run: int = 5000
    since: str | None = None
    range_from: str | None = None
    range_to: str | None = None
    include_seen: bool = True
    mark_seen: bool = False
    store_raw_headers: bool = True
    store_raw_body: bool = False
    store_raw_message_ref: bool = True


@dataclass(frozen=True)
class StorageConfig:
    path: str = ".email-connect/state.sqlite"


@dataclass(frozen=True)
class ReportsConfig:
    output_dir: str = "reports"
    include_all_evidence: bool = True
    include_unknown_messages: bool = True
    timestamp_timezone: str = "UTC"


@dataclass(frozen=True)
class SourceConfig:
    fixture_dir: str | None = None


@dataclass(frozen=True)
class ExpectedRecipientsConfig:
    path: str | None = None
    csv_column: str = "email"


@dataclass(frozen=True)
class AppConfig:
    mailbox: MailboxConfig
    scan: ScanConfig
    storage: StorageConfig
    reports: ReportsConfig
    source: SourceConfig = SourceConfig()
    expected_recipients: ExpectedRecipientsConfig = ExpectedRecipientsConfig()


def load_config(path: str | Path) -> AppConfig:
    data = _load_mapping(Path(path))
    mailbox = data.get("mailbox", {})
    scan = data.get("scan", {})
    storage = data.get("storage", {})
    reports = data.get("reports", {})
    source = data.get("source", {})
    expected_recipients = data.get("expected_recipients", {})
    return AppConfig(
        mailbox=MailboxConfig(
            id=str(mailbox.get("id", "return-mailbox-default")),
            protocol=str(mailbox.get("protocol", "imap")),
            host=mailbox.get("host"),
            port=int(mailbox.get("port", 993)),
            tls=bool(mailbox.get("tls", True)),
            username_env=mailbox.get("username_env"),
            password_env=mailbox.get("password_env"),
            folder=str(mailbox.get("folder", "INBOX")),
        ),
        scan=ScanConfig(
            mode=str(scan.get("mode", "incremental")),
            max_messages_per_run=int(scan.get("max_messages_per_run", 5000)),
            since=scan.get("since"),
            range_from=scan.get("from") or scan.get("range_from"),
            range_to=scan.get("to") or scan.get("range_to"),
            include_seen=bool(scan.get("include_seen", True)),
            mark_seen=bool(scan.get("mark_seen", False)),
            store_raw_headers=bool(scan.get("store_raw_headers", True)),
            store_raw_body=bool(scan.get("store_raw_body", False)),
            store_raw_message_ref=bool(scan.get("store_raw_message_ref", True)),
        ),
        storage=StorageConfig(path=str(storage.get("path", ".email-connect/state.sqlite"))),
        reports=ReportsConfig(
            output_dir=str(reports.get("output_dir", "reports")),
            include_all_evidence=bool(reports.get("include_all_evidence", True)),
            include_unknown_messages=bool(reports.get("include_unknown_messages", True)),
            timestamp_timezone=str(reports.get("timestamp_timezone", "UTC")),
        ),
        source=SourceConfig(fixture_dir=source.get("fixture_dir")),
        expected_recipients=ExpectedRecipientsConfig(
            path=expected_recipients.get("path"),
            csv_column=str(expected_recipients.get("csv_column", "email")),
        ),
    )


def _load_mapping(path: Path) -> dict[str, Any]:
    text = path.read_text(encoding="utf-8")
    try:
        # PyYAML is optional; fall back to the built-in subset parser when it is missing.
        import yaml  # type: ignore

        loaded = yaml.safe_load(text)
        return loaded or {}
    except ModuleNotFoundError:
        return _parse_simple_yaml(text)


def _parse_simple_yaml(text: str) -> dict[str, Any]:
    """Parse the small YAML subset used by config/mailbox.example.yml."""
    result: dict[str, Any] = {}
    current: dict[str, Any] | None = None
    for raw_line in text.splitlines():
        # Drop trailing comments and whitespace.
        line = raw_line.split("#", 1)[0].rstrip()
        if not line.strip():
            continue
        # An unindented "key:" line opens a new top-level section.
        if not line.startswith(" ") and line.endswith(":"):
            key = line[:-1].strip()
            current = {}
            result[key] = current
            continue
        if current is None or ":" not in line:
            raise ValueError(f"Unsupported YAML line: {raw_line}")
        key, value = line.strip().split(":", 1)
        current[key.strip()] = _parse_scalar(value.strip())
    return result


def _parse_scalar(value: str) -> Any:
    if value == "" or value == "null":
        return None
    if value in {"true", "false"}:
        return value == "true"
    if value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    try:
        return int(value)
    except ValueError:
        return value

@@ -0,0 +1,223 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from uuid import uuid5, NAMESPACE_URL

from .models import (
    AssessmentCategory,
    Confidence,
    EmailEvidenceCandidate,
    EndpointQualityUpdate,
    EvidenceStrength,
    MessageClass,
    ParsedMailboxMessage,
)


@dataclass(frozen=True)
class EvidenceMapping:
    event_type: str
    assessment_category: AssessmentCategory
    assessment_subclass: str
    evidence_strength: EvidenceStrength
    normalized_meaning: str


EVIDENCE_MAPPINGS: dict[MessageClass, EvidenceMapping | None] = {
    MessageClass.HARD_BOUNCE: EvidenceMapping(
        "notification.endpoint.rejected_permanent",
        AssessmentCategory.FAIL,
        "fail.hard_bounce",
        EvidenceStrength.NEGATIVE,
        "Strong evidence that the recipient endpoint rejected the message permanently.",
    ),
    MessageClass.SOFT_BOUNCE: EvidenceMapping(
        "notification.endpoint.rejected_temporary",
        AssessmentCategory.UNDEF,
        "undef.deferred",
        EvidenceStrength.AMBIGUOUS,
        "Temporary endpoint failure; retry or later evidence may change interpretation.",
    ),
    MessageClass.DELAYED_DELIVERY_NOTICE: EvidenceMapping(
        "notification.endpoint.deferred",
        AssessmentCategory.UNDEF,
        "undef.deferred",
        EvidenceStrength.AMBIGUOUS,
        "Delivery is delayed and still uncertain.",
    ),
    MessageClass.FINAL_DELIVERY_FAILURE: EvidenceMapping(
        "notification.endpoint.rejected_permanent",
        AssessmentCategory.FAIL,
        "fail.expired_without_delivery",
        EvidenceStrength.NEGATIVE,
        "Provider or endpoint gave up after retrying.",
    ),
    MessageClass.OUT_OF_OFFICE: EvidenceMapping(
        "interaction.out_of_office_received",
        AssessmentCategory.UNDEF,
        "undef.out_of_office",
        EvidenceStrength.WEAK,
        "Mailbox auto-reply was observed; awareness and action remain uncertain.",
    ),
    MessageClass.HUMAN_REPLY: EvidenceMapping(
        "interaction.reply_received",
        AssessmentCategory.SUCCESS,
        "success.reply_received",
        EvidenceStrength.MEDIUM,
        "A reply-like message was observed on the email channel.",
    ),
    MessageClass.COMPLAINT_OR_ABUSE: EvidenceMapping(
        "notification.channel.complaint_received",
        AssessmentCategory.FAIL,
        "fail.complaint_received",
        EvidenceStrength.NEGATIVE,
        "Complaint or abuse feedback was observed.",
    ),
    MessageClass.UNSUBSCRIBE_OR_OPT_OUT: EvidenceMapping(
        "notification.channel.unsubscribe_received",
        AssessmentCategory.FAIL,
        "fail.unsubscribed",
        EvidenceStrength.NEGATIVE,
        "Opt-out or unsubscribe signal was observed.",
    ),
    MessageClass.UNKNOWN_RETURN_MESSAGE: EvidenceMapping(
        "notification.endpoint.unknown",
        AssessmentCategory.UNDEF,
        "undef.conflicting_evidence",
        EvidenceStrength.AMBIGUOUS,
        "Return-channel message could not be classified reliably.",
    ),
    MessageClass.CHALLENGE_RESPONSE: EvidenceMapping(
        "interaction.unverified_actor_interaction",
        AssessmentCategory.UNDEF,
        "undef.identity_uncertain",
        EvidenceStrength.AMBIGUOUS,
        "Challenge-response or automated interaction was observed.",
    ),
    # Unrelated messages intentionally produce no evidence.
    MessageClass.UNRELATED_MESSAGE: None,
    MessageClass.PARSE_FAILED: EvidenceMapping(
        "diagnostic.message.parse_failed",
        AssessmentCategory.UNDEF,
        "undef.parse_failed",
        EvidenceStrength.NONE,
        "Message source could not be parsed reliably; no delivery or interaction claim is made.",
    ),
}


def evidence_deduplication_key(parsed: ParsedMailboxMessage, mapping: EvidenceMapping) -> str:
    parts = [
        parsed.mailbox_message_id,
        parsed.parser_version,
        mapping.event_type,
        parsed.affected_email_address or "",
        parsed.original_message_id or "",
        parsed.smtp_status_code or "",
        parsed.enhanced_status_code or "",
        parsed.reason_code or "",
    ]
    return "|".join(parts)


def candidate_from_parsed(
    parsed: ParsedMailboxMessage,
    *,
    raw_message_ref: str | None,
    observed_at: datetime,
    occurred_at: datetime | None,
) -> EmailEvidenceCandidate | None:
    mapping = EVIDENCE_MAPPINGS.get(parsed.message_class)
    if mapping is None:
        return None
    dedup_key = evidence_deduplication_key(parsed, mapping)
    # Derive a deterministic ID so re-scanning the same message yields the same candidate.
    candidate_id = str(uuid5(NAMESPACE_URL, "email-connect:evidence:" + dedup_key))
    return EmailEvidenceCandidate(
        evidence_candidate_id=candidate_id,
        mailbox_message_id=parsed.mailbox_message_id,
        parsed_message_id=parsed.parsed_message_id,
        event_type=mapping.event_type,
        assessment_category=mapping.assessment_category,
        assessment_subclass=mapping.assessment_subclass,
        affected_email_address=parsed.affected_email_address,
        original_message_id=parsed.original_message_id,
        confidence=parsed.confidence,
        evidence_strength=mapping.evidence_strength,
        occurred_at=occurred_at,
        observed_at=observed_at,
        deduplication_key=dedup_key,
        raw_message_ref=raw_message_ref,
        notes=[mapping.normalized_meaning, *parsed.notes],
        metadata={
            "message_class": parsed.message_class.value,
            "smtp_status_code": parsed.smtp_status_code,
            "enhanced_status_code": parsed.enhanced_status_code,
            "reason_code": parsed.reason_code,
        },
    )


def endpoint_quality_from_candidate(candidate: EmailEvidenceCandidate) -> EndpointQualityUpdate | None:
    address = candidate.affected_email_address
    if not address:
        return None
    message_class = candidate.metadata.get("message_class")
    signal = str(message_class or candidate.event_type)
    reason_code = candidate.metadata.get("reason_code")
    observed_at = candidate.observed_at
    if message_class in {MessageClass.HARD_BOUNCE.value, MessageClass.FINAL_DELIVERY_FAILURE.value}:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            reachability="unreachable",
            last_failure_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class in {MessageClass.SOFT_BOUNCE.value, MessageClass.DELAYED_DELIVERY_NOTICE.value}:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            reachability="degraded",
            last_failure_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.COMPLAINT_OR_ABUSE.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            suppression_state="suppressed",
            last_failure_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.UNSUBSCRIBE_OR_OPT_OUT.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            suppression_state="opted_out",
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.HUMAN_REPLY.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            last_success_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    if message_class == MessageClass.OUT_OF_OFFICE.value:
        return EndpointQualityUpdate(
            affected_email_address=address.lower(),
            reachability="uncertain",
            last_auto_reply_at=observed_at,
            reason_code=reason_code,
            confidence=candidate.confidence,
            quality_signals=[signal, candidate.event_type],
        )
    return None

@@ -0,0 +1,196 @@
from __future__ import annotations

import imaplib
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable, Protocol

from .config import AppConfig


@dataclass(frozen=True)
class MailboxSourceMessage:
    source_uid: str
    raw_bytes: bytes
    raw_message_ref: str
    imap_uid: str | None = None


class MailboxMessageSource(Protocol):
    def iter_messages(
        self,
        *,
        max_messages: int,
        since_uid: str | None,
        full_rescan: bool,
        include_seen: bool,
        since: datetime | None,
    ) -> Iterable[MailboxSourceMessage]:
        ...


class FixtureMailboxSource:
    def __init__(self, fixture_dir: str | Path) -> None:
        self.fixture_dir = Path(fixture_dir)

    def iter_messages(
        self,
        *,
        max_messages: int,
        since_uid: str | None,
        full_rescan: bool,
        include_seen: bool,
        since: datetime | None,
    ) -> Iterable[MailboxSourceMessage]:
        # Fixtures have no seen flag or server-side date search.
        del include_seen, since
        paths = sorted(self.fixture_dir.glob("*.eml"))
        emitted = 0
        for path in paths:
            # The file name doubles as the cursor value for fixture sources.
            source_uid = path.name
            if not full_rescan and since_uid and source_uid <= since_uid:
                continue
            yield MailboxSourceMessage(
                source_uid=source_uid,
                raw_bytes=path.read_bytes(),
                raw_message_ref=str(path),
                imap_uid=None,
            )
            emitted += 1
            if max_messages and emitted >= max_messages:
                break


class ImapMailboxSource:
    def __init__(self, config: AppConfig) -> None:
        self.config = config

    def iter_messages(
        self,
        *,
        max_messages: int,
        since_uid: str | None,
        full_rescan: bool,
        include_seen: bool,
        since: datetime | None,
    ) -> Iterable[MailboxSourceMessage]:
        mailbox = self.config.mailbox
        if self.config.scan.mark_seen:
            raise ValueError("IMAP mark_seen is intentionally unsupported; scans are read-only.")
        if not mailbox.host:
            raise ValueError("mailbox.host is required for IMAP scans.")
        if not mailbox.username_env or not mailbox.password_env:
            raise ValueError("mailbox.username_env and mailbox.password_env are required for IMAP scans.")
        username = os.environ.get(mailbox.username_env)
        password = os.environ.get(mailbox.password_env)
        if not username or not password:
            raise ValueError(f"IMAP credentials not found in {mailbox.username_env}/{mailbox.password_env}.")

        conn: imaplib.IMAP4
        if mailbox.tls:
            conn = imaplib.IMAP4_SSL(mailbox.host, mailbox.port)
        else:
            conn = imaplib.IMAP4(mailbox.host, mailbox.port)
        selected = False
        try:
            _expect_ok(conn.login(username, password), "login")
            # readonly=True keeps the session from changing mailbox state.
            _expect_ok(conn.select(mailbox.folder, readonly=True), f"select {mailbox.folder}")
            selected = True
            criteria = _search_criteria(include_seen=include_seen, since=since)
            _status, search_data = _expect_ok(conn.uid("search", None, *criteria), "uid search")
            uids = _decode_uids(search_data)
            if not full_rescan and since_uid:
                uids = [uid for uid in uids if _uid_after(uid, since_uid)]
            uids = sorted(uids, key=_uid_sort_key)
            if max_messages:
                uids = uids[:max_messages]
            for uid in uids:
                # BODY.PEEK[] fetches the message without setting the Seen flag.
                _fetch_status, fetch_data = _expect_ok(conn.uid("fetch", uid, "(BODY.PEEK[])"), f"uid fetch {uid}")
                raw_bytes = _raw_message_from_fetch(fetch_data)
                if raw_bytes is None:
                    continue
                yield MailboxSourceMessage(
                    source_uid=uid,
                    raw_bytes=raw_bytes,
                    raw_message_ref=f"imap://{mailbox.host}/{mailbox.folder};UID={uid}",
                    imap_uid=uid,
                )
        finally:
            if selected:
                try:
                    conn.close()
                except imaplib.IMAP4.error:
                    pass
            try:
                conn.logout()
            except imaplib.IMAP4.error:
                pass


def source_for_config(config: AppConfig, *, fixture_dir_override: str | None = None) -> MailboxMessageSource:
    if fixture_dir_override:
        return FixtureMailboxSource(fixture_dir_override)
    if config.mailbox.protocol == "fixture":
        fixture_dir = config.source.fixture_dir
        if not fixture_dir:
            raise ValueError("source.fixture_dir is required for fixture scans.")
        return FixtureMailboxSource(fixture_dir)
    if config.mailbox.protocol == "imap":
        return ImapMailboxSource(config)
    raise ValueError(f"Unsupported mailbox protocol: {config.mailbox.protocol}")


def _search_criteria(*, include_seen: bool, since: datetime | None) -> list[str]:
    criteria = ["ALL" if include_seen else "UNSEEN"]
    if since is not None:
        criteria.extend(["SINCE", _imap_date(since)])
    return criteria


def _imap_date(value: datetime) -> str:
    # Month names are hard-coded so the result does not depend on the process locale.
    months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
    return f"{value.day:02d}-{months[value.month - 1]}-{value.year:04d}"


def _expect_ok(result: tuple[str, list], operation: str) -> tuple[str, list]:
    status, data = result
    if status != "OK":
        raise RuntimeError(f"IMAP {operation} failed with status {status}: {data!r}")
    return status, data


def _decode_uids(search_data: list) -> list[str]:
    if not search_data:
        return []
    raw = search_data[0] or b""
    if isinstance(raw, str):
        raw_text = raw
    else:
        raw_text = raw.decode("ascii", errors="ignore")
    return [part for part in raw_text.split() if part]


def _uid_after(uid: str, since_uid: str) -> bool:
    try:
        return int(uid) > int(since_uid)
    except ValueError:
        # Non-numeric cursors (for example fixture file names) compare as strings.
        return uid > since_uid


def _uid_sort_key(uid: str) -> tuple[int, int | str]:
    # Numeric UIDs sort first in numeric order; anything else sorts after them as text.
    try:
        return (0, int(uid))
    except ValueError:
        return (1, uid)


def _raw_message_from_fetch(fetch_data: list) -> bytes | None:
    for item in fetch_data:
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
            return item[1]
    return None

128
src/email_connect/models.py Normal file
@@ -0,0 +1,128 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import StrEnum
from typing import Any


class MessageClass(StrEnum):
    HARD_BOUNCE = "hard_bounce"
    SOFT_BOUNCE = "soft_bounce"
    DELAYED_DELIVERY_NOTICE = "delayed_delivery_notice"
    FINAL_DELIVERY_FAILURE = "final_delivery_failure"
    OUT_OF_OFFICE = "out_of_office"
    HUMAN_REPLY = "human_reply"
    COMPLAINT_OR_ABUSE = "complaint_or_abuse"
    UNSUBSCRIBE_OR_OPT_OUT = "unsubscribe_or_opt_out"
    CHALLENGE_RESPONSE = "challenge_response"
    UNKNOWN_RETURN_MESSAGE = "unknown_return_message"
    UNRELATED_MESSAGE = "unrelated_message"
    PARSE_FAILED = "parse_failed"


class AssessmentCategory(StrEnum):
    SUCCESS = "success"
    FAIL = "fail"
    UNDEF = "undef"


class Confidence(StrEnum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


class EvidenceStrength(StrEnum):
    NONE = "none"
    WEAK = "weak"
    MEDIUM = "medium"
    STRONG = "strong"
    NEGATIVE = "negative"
    AMBIGUOUS = "ambiguous"


@dataclass(frozen=True)
class MailboxScan:
    scan_id: str
    mailbox_id: str
    started_at: datetime
    finished_at: datetime | None
    scan_mode: str
    folder: str
    status: str
    messages_seen: int = 0
    messages_new: int = 0
    messages_parsed: int = 0
    evidence_events_created: int = 0
    report_path: str | None = None
    since: datetime | None = None
    range_start: datetime | None = None
    range_end: datetime | None = None


@dataclass(frozen=True)
class InboundMailboxMessage:
    mailbox_message_id: str
    mailbox_id: str
    imap_uid: str | None
    message_id_header: str | None
    received_at: datetime | None
    from_address: str | None
    to_addresses: list[str]
    subject: str | None
    raw_headers_ref: str | None
    raw_message_ref: str | None
    first_seen_at: datetime
    last_seen_at: datetime
    deduplication_key: str


@dataclass(frozen=True)
class ParsedMailboxMessage:
    parsed_message_id: str
    mailbox_message_id: str
    parser_version: str
    message_class: MessageClass
    affected_email_address: str | None
    original_message_id: str | None
    original_recipient: str | None
    smtp_status_code: str | None
    enhanced_status_code: str | None
    reason_code: str | None
    confidence: Confidence
    parsed_at: datetime
    notes: list[str] = field(default_factory=list)


@dataclass(frozen=True)
class EmailEvidenceCandidate:
    evidence_candidate_id: str
    mailbox_message_id: str
    parsed_message_id: str
    event_type: str
    assessment_category: AssessmentCategory
    assessment_subclass: str
    affected_email_address: str | None
    original_message_id: str | None
    confidence: Confidence
    evidence_strength: EvidenceStrength
    occurred_at: datetime | None
    observed_at: datetime
    deduplication_key: str
    raw_message_ref: str | None
    notes: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class EndpointQualityUpdate:
    affected_email_address: str
    reachability: str = "unknown"
    suppression_state: str = "unknown"
    last_success_at: datetime | None = None
    last_failure_at: datetime | None = None
    last_auto_reply_at: datetime | None = None
    reason_code: str | None = None
    confidence: Confidence = Confidence.LOW
    quality_signals: list[str] = field(default_factory=list)

451
src/email_connect/parser.py Normal file

@@ -0,0 +1,451 @@
from __future__ import annotations

import hashlib
import html
import re
from datetime import UTC, datetime
from email import policy
from email.parser import BytesParser
from email.utils import getaddresses, parsedate_to_datetime
from pathlib import Path
from uuid import NAMESPACE_URL, uuid5

from .evidence import candidate_from_parsed
from .models import (
    Confidence,
    EmailEvidenceCandidate,
    InboundMailboxMessage,
    MessageClass,
    ParsedMailboxMessage,
)

PARSER_VERSION = "mailbox-scanner-v0.1"
EMAIL_RE = re.compile(r"[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}", re.IGNORECASE)
ENHANCED_STATUS_RE = re.compile(r"\b[245]\.\d{1,3}\.\d{1,3}\b")
SMTP_STATUS_RE = re.compile(r"\b[245]\d{2}\b")


def parse_message_bytes(
    raw_bytes: bytes,
    *,
    mailbox_id: str,
    raw_message_ref: str | None,
    imap_uid: str | None = None,
    now: datetime | None = None,
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
    observed_at = now or datetime.now(UTC)
    if not raw_bytes.strip():
        return _parse_failed(
            mailbox_id=mailbox_id,
            raw_message_ref=raw_message_ref,
            imap_uid=imap_uid,
            observed_at=observed_at,
            reason="empty_message",
        )
    try:
        msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
    except Exception as exc:
        return _parse_failed(
            mailbox_id=mailbox_id,
            raw_message_ref=raw_message_ref,
            imap_uid=imap_uid,
            observed_at=observed_at,
            reason=f"parser_error:{type(exc).__name__}",
        )
    message_id = _clean_header(msg.get("Message-ID"))
    subject = _clean_header(msg.get("Subject"))
    from_address = _first_address(msg.get("From"))
    to_addresses = [addr for _, addr in getaddresses(msg.get_all("To", []))]
    received_at = _parse_date(msg.get("Date"))
    text = _extract_text(msg)
    headers_text = "\n".join(f"{key}: {value}" for key, value in msg.items())
    combined = f"{headers_text}\n\n{subject or ''}\n{text}"
    dedup_key = _message_dedup_key(
        mailbox_id=mailbox_id,
        imap_uid=imap_uid,
        message_id=message_id,
        received_at=received_at,
        from_address=from_address,
        subject=subject,
        body=text,
    )
    mailbox_message_id = str(uuid5(NAMESPACE_URL, "email-connect:message:" + dedup_key))
    inbound = InboundMailboxMessage(
        mailbox_message_id=mailbox_message_id,
        mailbox_id=mailbox_id,
        imap_uid=imap_uid,
        message_id_header=message_id,
        received_at=received_at,
        from_address=from_address,
        to_addresses=to_addresses,
        subject=subject,
        raw_headers_ref=raw_message_ref,
        raw_message_ref=raw_message_ref,
        first_seen_at=observed_at,
        last_seen_at=observed_at,
        deduplication_key=dedup_key,
    )
    parsed = classify_message(inbound, combined, observed_at=observed_at)
    candidate = candidate_from_parsed(
        parsed,
        raw_message_ref=raw_message_ref,
        observed_at=observed_at,
        occurred_at=received_at,
    )
    return inbound, parsed, candidate


def parse_message_file(
    path: str | Path,
    *,
    mailbox_id: str,
    now: datetime | None = None,
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
    file_path = Path(path)
    return parse_message_bytes(
        file_path.read_bytes(),
        mailbox_id=mailbox_id,
        raw_message_ref=str(file_path),
        now=now,
    )


def classify_message(
    inbound: InboundMailboxMessage,
    combined_text: str,
    *,
    observed_at: datetime,
) -> ParsedMailboxMessage:
    text = combined_text.lower()
    enhanced_status = _first_match(ENHANCED_STATUS_RE, combined_text)
    smtp_status = _first_match(SMTP_STATUS_RE, combined_text)
    dsn_fields = _extract_dsn_fields(combined_text)
    affected = dsn_fields.get("final_recipient_email") or dsn_fields.get("original_recipient_email")
    if affected is None:
        affected = _extract_affected_recipient(combined_text)
    original_message_id = _extract_headerish(combined_text, "Original-Message-ID")
    original_recipient = dsn_fields.get("original_recipient_email") or affected
    notes: list[str] = []
    message_class = MessageClass.UNRELATED_MESSAGE
    confidence = Confidence.LOW
    reason_code: str | None = None
    if _contains_any(text, ["feedback loop", "abuse report", "spam complaint", "complaint notification"]):
        message_class = MessageClass.COMPLAINT_OR_ABUSE
        confidence = Confidence.HIGH
        reason_code = "complaint"
    elif _contains_any(text, ["unsubscribe", "opt-out", "opt out", "remove me"]):
        message_class = MessageClass.UNSUBSCRIBE_OR_OPT_OUT
        confidence = Confidence.MEDIUM
        reason_code = "unsubscribe"
    elif _is_out_of_office(text):
        message_class = MessageClass.OUT_OF_OFFICE
        confidence = Confidence.MEDIUM
        reason_code = "auto_reply"
    elif _is_challenge_response(text):
        message_class = MessageClass.CHALLENGE_RESPONSE
        confidence = Confidence.MEDIUM
        reason_code = "challenge_response"
    elif _contains_any(text, ["will keep trying", "delivery delayed", "message delayed", "not yet delivered"]):
        message_class = MessageClass.DELAYED_DELIVERY_NOTICE
        confidence = Confidence.HIGH
        reason_code = "delayed"
    elif _is_dsn_like(text):
        message_class, confidence, reason_code = _classify_dsn(text, smtp_status, enhanced_status)
    elif _looks_like_human_reply(inbound, text):
        message_class = MessageClass.HUMAN_REPLY
        confidence = Confidence.MEDIUM
        reason_code = "reply"
    elif _looks_return_related(text):
        message_class = MessageClass.UNKNOWN_RETURN_MESSAGE
        confidence = Confidence.LOW
        reason_code = "unknown_return"
        notes.append("Return-channel message did not match a reliable classifier.")
    for key in ["original_recipient", "final_recipient", "action", "diagnostic_code", "remote_mta"]:
        if dsn_fields.get(key):
            notes.append(f"{key}={dsn_fields[key]}")
    if enhanced_status:
        notes.append(f"enhanced_status={enhanced_status}")
    if smtp_status:
        notes.append(f"smtp_status={smtp_status}")
    parsed_id_basis = "|".join([inbound.mailbox_message_id, PARSER_VERSION, message_class.value])
    return ParsedMailboxMessage(
        parsed_message_id=str(uuid5(NAMESPACE_URL, "email-connect:parsed:" + parsed_id_basis)),
        mailbox_message_id=inbound.mailbox_message_id,
        parser_version=PARSER_VERSION,
        message_class=message_class,
        affected_email_address=affected,
        original_message_id=original_message_id,
        original_recipient=original_recipient,
        smtp_status_code=smtp_status,
        enhanced_status_code=enhanced_status,
        reason_code=reason_code,
        confidence=confidence,
        parsed_at=observed_at,
        notes=notes,
    )


def _classify_dsn(
    text: str,
    smtp_status: str | None,
    enhanced_status: str | None,
) -> tuple[MessageClass, Confidence, str]:
    if _contains_any(text, ["final failure", "giving up", "could not deliver after"]):
        return MessageClass.FINAL_DELIVERY_FAILURE, Confidence.HIGH, "expired_without_delivery"
    if (enhanced_status and enhanced_status.startswith("5.")) or (smtp_status and smtp_status.startswith("5")):
        return MessageClass.HARD_BOUNCE, Confidence.HIGH, "hard_bounce"
    if _contains_any(text, ["unknown user", "mailbox not found", "user unknown", "domain not found"]):
        return MessageClass.HARD_BOUNCE, Confidence.HIGH, "hard_bounce"
    if (enhanced_status and enhanced_status.startswith("4.")) or (smtp_status and smtp_status.startswith("4")):
        return MessageClass.SOFT_BOUNCE, Confidence.HIGH, "soft_bounce"
    if _contains_any(text, ["temporary failure", "mailbox full", "greylist", "try again later"]):
        return MessageClass.SOFT_BOUNCE, Confidence.MEDIUM, "soft_bounce"
    return MessageClass.UNKNOWN_RETURN_MESSAGE, Confidence.LOW, "unknown_dsn"


def _extract_text(msg) -> str:
    chunks: list[str] = []
    html_chunks: list[str] = []
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_maintype() == "multipart":
                continue
            if part.get_content_type() in {"text/plain", "message/delivery-status"}:
                try:
                    chunks.append(str(part.get_content()))
                except Exception:
                    continue
            elif part.get_content_type() == "text/html":
                try:
                    html_chunks.append(_html_to_text(str(part.get_content())))
                except Exception:
                    continue
    else:
        try:
            content = str(msg.get_content())
            if msg.get_content_type() == "text/html":
                html_chunks.append(_html_to_text(content))
            else:
                chunks.append(content)
        except Exception:
            payload = msg.get_payload(decode=True)
            if payload:
                chunks.append(payload.decode(errors="replace"))
    if chunks:
        return "\n".join(chunks)
    return "\n".join(html_chunks)


def _parse_failed(
    *,
    mailbox_id: str,
    raw_message_ref: str | None,
    imap_uid: str | None,
    observed_at: datetime,
    reason: str,
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
    dedup_key = "|".join([mailbox_id, imap_uid or "", raw_message_ref or "", reason])
    mailbox_message_id = str(uuid5(NAMESPACE_URL, "email-connect:message:" + dedup_key))
    inbound = InboundMailboxMessage(
        mailbox_message_id=mailbox_message_id,
        mailbox_id=mailbox_id,
        imap_uid=imap_uid,
        message_id_header=None,
        received_at=None,
        from_address=None,
        to_addresses=[],
        subject=None,
        raw_headers_ref=raw_message_ref,
        raw_message_ref=raw_message_ref,
        first_seen_at=observed_at,
        last_seen_at=observed_at,
        deduplication_key=dedup_key,
    )
    parsed_id_basis = "|".join([mailbox_message_id, PARSER_VERSION, "parse_failed"])
    parsed = ParsedMailboxMessage(
        parsed_message_id=str(uuid5(NAMESPACE_URL, "email-connect:parsed:" + parsed_id_basis)),
        mailbox_message_id=mailbox_message_id,
        parser_version=PARSER_VERSION,
        message_class=MessageClass.PARSE_FAILED,
        affected_email_address=None,
        original_message_id=None,
        original_recipient=None,
        smtp_status_code=None,
        enhanced_status_code=None,
        reason_code=reason,
        confidence=Confidence.HIGH,
        parsed_at=observed_at,
        notes=[f"parse_failure={reason}"],
    )
    candidate = candidate_from_parsed(
        parsed,
        raw_message_ref=raw_message_ref,
        observed_at=observed_at,
        occurred_at=None,
    )
    return inbound, parsed, candidate


def _message_dedup_key(
    *,
    mailbox_id: str,
    imap_uid: str | None,
    message_id: str | None,
    received_at: datetime | None,
    from_address: str | None,
    subject: str | None,
    body: str,
) -> str:
    body_hash = hashlib.sha256(body.encode("utf-8", errors="replace")).hexdigest()[:16]
    return "|".join(
        [
            mailbox_id,
            imap_uid or "",
            message_id or "",
            received_at.isoformat() if received_at else "",
            from_address or "",
            hashlib.sha256((subject or "").encode()).hexdigest()[:12],
            body_hash,
        ]
    )


def _clean_header(value: str | None) -> str | None:
    if value is None:
        return None
    cleaned = str(value).strip()
    return cleaned or None


def _first_address(value: str | None) -> str | None:
    if not value:
        return None
    addresses = getaddresses([value])
    return addresses[0][1] if addresses else None


def _parse_date(value: str | None) -> datetime | None:
    if not value:
        return None
    try:
        parsed = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    return parsed.astimezone(UTC)


def _first_match(pattern: re.Pattern[str], text: str) -> str | None:
    match = pattern.search(text)
    return match.group(0) if match else None


def _extract_headerish(text: str, name: str) -> str | None:
    match = re.search(rf"^{re.escape(name)}:\s*(.+)$", text, re.IGNORECASE | re.MULTILINE)
    return match.group(1).strip() if match else None


def _extract_dsn_fields(text: str) -> dict[str, str]:
    fields: dict[str, str] = {}
    for field, key in [
        ("Original-Recipient", "original_recipient"),
        ("Final-Recipient", "final_recipient"),
        ("Action", "action"),
        ("Diagnostic-Code", "diagnostic_code"),
        ("Remote-MTA", "remote_mta"),
    ]:
        value = _extract_headerish(text, field)
        if value:
            fields[key] = value
            if field in {"Original-Recipient", "Final-Recipient"}:
                match = EMAIL_RE.search(value)
                if match:
                    fields[f"{key}_email"] = match.group(0).lower()
    return fields


def _extract_affected_recipient(text: str) -> str | None:
    for name in ["Final-Recipient", "Original-Recipient", "X-Failed-Recipients", "Failed-Recipient"]:
        value = _extract_headerish(text, name)
        if value:
            match = EMAIL_RE.search(value)
            if match:
                return match.group(0).lower()
    match = EMAIL_RE.search(text)
    return match.group(0).lower() if match else None


def _contains_any(text: str, needles: list[str]) -> bool:
    return any(needle in text for needle in needles)


def _is_dsn_like(text: str) -> bool:
    return _contains_any(
        text,
        [
            "delivery status notification",
            "delivery failure",
            "undeliverable",
            "returned mail",
            "mail delivery subsystem",
            "final-recipient",
            "diagnostic-code",
            "status:",
        ],
    )


def _is_out_of_office(text: str) -> bool:
    return _contains_any(
        text,
        [
            "out of office",
            "auto-reply",
            "autoreply",
            "automatic reply",
            "vacation",
            "abwesenheitsnotiz",
            "nicht im buero",
            "nicht im büro",
        ],
    )


def _is_challenge_response(text: str) -> bool:
    return _contains_any(
        text,
        [
            "challenge-response",
            "challenge response",
            "sender verification",
            "verify your email before your message can be delivered",
            "confirm you are a real person",
            "confirm that you sent this message",
            "please verify yourself",
        ],
    )


def _looks_like_human_reply(inbound: InboundMailboxMessage, text: str) -> bool:
    subject = (inbound.subject or "").lower()
    if _contains_any(text, ["auto-submitted: auto-replied", "x-autoreply", "auto-generated"]):
        return False
    return subject.startswith("re:") or _contains_any(text, ["thanks", "thank you", "i confirm", "received"])


def _looks_return_related(text: str) -> bool:
    return _contains_any(text, ["delivery", "mailbox", "recipient", "message", "smtp", "unsubscribe", "reply"])


def _html_to_text(value: str) -> str:
    without_tags = re.sub(r"<[^>]+>", " ", value)
    return re.sub(r"\s+", " ", html.unescape(without_tags)).strip()
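Review note: as a rough illustration of the status-code branch in `_classify_dsn`, the sketch below reuses the `ENHANCED_STATUS_RE` pattern from this file. `bounce_kind` and the sample DSN text are hypothetical and exist only for this example; the real function also checks phrases and the three-digit SMTP code.

```python
import re

# Same pattern as ENHANCED_STATUS_RE in parser.py.
ENHANCED_STATUS_RE = re.compile(r"\b[245]\.\d{1,3}\.\d{1,3}\b")


def bounce_kind(dsn_text: str) -> str:
    """Return 'hard', 'soft', or 'unknown' based on the first enhanced status code."""
    match = ENHANCED_STATUS_RE.search(dsn_text)
    if match is None:
        return "unknown"
    code = match.group(0)
    if code.startswith("5."):
        return "hard"  # permanent failure class
    if code.startswith("4."):
        return "soft"  # transient failure class
    return "unknown"  # 2.x.x reports success, not a bounce


# Hypothetical delivery-status fragment.
sample = "Final-Recipient: rfc822; user@example.com\nAction: failed\nStatus: 5.1.1\n"
kind = bounce_kind(sample)
```

One thing the sketch makes visible: the pattern is not anchored to a `Status:` field, so a dotted number elsewhere in the combined text, for example the tail of an IP address such as `10.2.1.5` in a `Received` header, may also match. Anchoring the search to the `Status:` line could be worth considering.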


@@ -0,0 +1,79 @@
from __future__ import annotations

import csv
import re
from dataclasses import dataclass, field
from pathlib import Path

EMAIL_RE = re.compile(r"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$", re.IGNORECASE)


@dataclass(frozen=True)
class ExpectedRecipients:
    addresses: tuple[str, ...] = ()
    invalid_entries: tuple[str, ...] = ()


def load_expected_recipients(
    path: str | Path | None,
    *,
    csv_column: str | None = "email",
) -> ExpectedRecipients:
    if not path:
        return ExpectedRecipients()
    recipient_path = Path(path)
    if recipient_path.suffix.lower() == ".csv":
        return _load_csv_recipients(recipient_path, csv_column=csv_column or "email")
    return _load_line_recipients(recipient_path)


def normalize_email_address(value: str | None) -> str | None:
    if value is None:
        return None
    normalized = value.strip().lower()
    if not normalized:
        return None
    return normalized if EMAIL_RE.fullmatch(normalized) else None


@dataclass
class _RecipientCollector:
    addresses: dict[str, None] = field(default_factory=dict)
    invalid_entries: list[str] = field(default_factory=list)

    def add(self, value: str | None, *, source: str) -> None:
        normalized = normalize_email_address(value)
        if normalized:
            self.addresses[normalized] = None
            return
        if value and value.strip():
            self.invalid_entries.append(f"{source}: {value.strip()}")

    def result(self) -> ExpectedRecipients:
        return ExpectedRecipients(
            addresses=tuple(self.addresses.keys()),
            invalid_entries=tuple(self.invalid_entries),
        )


def _load_line_recipients(path: Path) -> ExpectedRecipients:
    collector = _RecipientCollector()
    for line_number, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        collector.add(line, source=f"{path}:{line_number}")
    return collector.result()


def _load_csv_recipients(path: Path, *, csv_column: str) -> ExpectedRecipients:
    collector = _RecipientCollector()
    with path.open(newline="", encoding="utf-8") as fh:
        reader = csv.DictReader(fh)
        if reader.fieldnames is None:
            return collector.result()
        column = csv_column if csv_column in reader.fieldnames else reader.fieldnames[0]
        for line_number, row in enumerate(reader, start=2):
            collector.add(row.get(column), source=f"{path}:{line_number}:{column}")
    return collector.result()
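Review note: the collector relies on dict keys to deduplicate addresses while keeping first-seen order. A minimal sketch of that idea follows, assuming plain line input; `collect` is a hypothetical helper written for this note, and the pattern is copied from `EMAIL_RE` in this file.

```python
import re

# Same pattern as EMAIL_RE in recipients.py.
EMAIL_RE = re.compile(r"^[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}$", re.IGNORECASE)


def collect(lines: list[str]) -> tuple[tuple[str, ...], tuple[str, ...]]:
    """Return (unique normalized addresses, invalid entries) for a list of lines."""
    seen: dict[str, None] = {}  # dict keys keep insertion order and drop duplicates
    invalid: list[str] = []
    for number, raw in enumerate(lines, start=1):
        value = raw.strip()
        if not value or value.startswith("#"):
            continue  # skip blank lines and comments
        normalized = value.lower()
        if EMAIL_RE.fullmatch(normalized):
            seen[normalized] = None
        else:
            invalid.append(f"line {number}: {value}")
    return tuple(seen), tuple(invalid)


addresses, invalid = collect(
    ["# team list", "A@Example.com", "a@example.com ", "not-an-email", ""]
)
```

Here the two spellings of the same address collapse into one entry, and the malformed line is reported with its line number rather than silently dropped.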


@@ -0,0 +1,146 @@
from __future__ import annotations

import csv
import json
from datetime import UTC, datetime
from pathlib import Path

REPORT_COLUMNS = [
    "report_generated_at",
    "scan_id",
    "mailbox_id",
    "mailbox_message_id",
    "mailbox_received_at",
    "source_from",
    "source_to",
    "source_subject",
    "message_id_header",
    "detected_message_class",
    "normalized_event_type",
    "assessment_category",
    "assessment_subclass",
    "affected_email_address",
    "known_recipient",
    "original_message_id",
    "original_recipient",
    "smtp_status_code",
    "enhanced_status_code",
    "reason_code",
    "confidence",
    "evidence_strength",
    "occurred_at",
    "observed_at",
    "first_seen_at",
    "last_seen_at",
    "deduplication_key",
    "raw_message_ref",
    "notes",
]


def report_filename(now: datetime | None = None) -> str:
    stamp = (now or datetime.now(UTC)).strftime("%Y%m%d-%H%M%S")
    return f"email-channel-evidence-report-{stamp}.csv"


def write_evidence_report(
    rows: list[dict],
    *,
    output_dir: str | Path,
    scan_id: str,
    mailbox_id: str,
    generated_at: datetime | None = None,
    expected_recipients: set[str] | None = None,
) -> Path:
    generated = generated_at or datetime.now(UTC)
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    path = _unique_report_path(out_dir / report_filename(generated))
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=REPORT_COLUMNS)
        writer.writeheader()
        for row in _ordered_rows(rows, expected_recipients=expected_recipients or set()):
            writer.writerow(_report_row(row, scan_id=scan_id, mailbox_id=mailbox_id, generated_at=generated))
    return path


def _report_row(row: dict, *, scan_id: str, mailbox_id: str, generated_at: datetime) -> dict:
    metadata = _json(row.get("metadata_json"))
    notes = _json(row.get("notes_json"))
    known_recipient = _known_recipient(row, expected_recipients=set(row.get("_expected_recipients", [])))
    return {
        "report_generated_at": generated_at.isoformat(),
        "scan_id": scan_id,
        "mailbox_id": mailbox_id,
        "mailbox_message_id": row.get("mailbox_message_id", ""),
        "mailbox_received_at": row.get("occurred_at") or "",
        "source_from": metadata.get("source_from", ""),
        "source_to": metadata.get("source_to", ""),
        "source_subject": metadata.get("source_subject", ""),
        "message_id_header": metadata.get("message_id_header", ""),
        "detected_message_class": metadata.get("message_class", ""),
        "normalized_event_type": row.get("event_type", ""),
        "assessment_category": row.get("assessment_category", ""),
        "assessment_subclass": row.get("assessment_subclass", ""),
        "affected_email_address": row.get("affected_email_address") or "",
        "known_recipient": "true" if known_recipient else "false",
        "original_message_id": row.get("original_message_id") or "",
        "original_recipient": metadata.get("original_recipient", ""),
        "smtp_status_code": metadata.get("smtp_status_code") or "",
        "enhanced_status_code": metadata.get("enhanced_status_code") or "",
        "reason_code": metadata.get("reason_code") or "",
        "confidence": row.get("confidence", ""),
        "evidence_strength": row.get("evidence_strength", ""),
        "occurred_at": row.get("occurred_at") or "",
        "observed_at": row.get("observed_at") or "",
        "first_seen_at": metadata.get("first_seen_at", ""),
        "last_seen_at": metadata.get("last_seen_at", ""),
        "deduplication_key": row.get("deduplication_key", ""),
        "raw_message_ref": row.get("raw_message_ref") or "",
        "notes": "; ".join(str(item) for item in notes),
    }


def _ordered_rows(rows: list[dict], *, expected_recipients: set[str]) -> list[dict]:
    enriched = [dict(row, _expected_recipients=tuple(expected_recipients)) for row in rows]
    if not expected_recipients:
        return enriched
    return sorted(
        enriched,
        key=lambda row: (
            not _known_recipient(row, expected_recipients=expected_recipients),
            str(row.get("affected_email_address") or ""),
            str(row.get("observed_at") or ""),
            str(row.get("event_type") or ""),
            str(row.get("deduplication_key") or ""),
        ),
    )


def _known_recipient(row: dict, *, expected_recipients: set[str]) -> bool:
    if row.get("known_recipient") is True:
        return True
    address = str(row.get("affected_email_address") or "").lower()
    return bool(address and address in expected_recipients)


def _json(value: str | None) -> dict | list:
    if not value:
        return {}
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return {}


def _unique_report_path(path: Path) -> Path:
    if not path.exists():
        return path
    stem = path.stem
    suffix = path.suffix
    for index in range(1, 1000):
        candidate = path.with_name(f"{stem}-{index:02d}{suffix}")
        if not candidate.exists():
            return candidate
    raise RuntimeError(f"Could not allocate unique report filename for {path}")
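Review note: `_ordered_rows` puts known recipients first by sorting on a negated boolean, since `False` sorts before `True`. The sketch below shows that ordering on made-up rows; the addresses and timestamps are hypothetical, and only two of the real sort keys are reproduced.

```python
# Hypothetical evidence rows, reduced to the fields used for ordering.
rows = [
    {"affected_email_address": "z@example.com", "observed_at": "2026-06-01T10:00:00+00:00"},
    {"affected_email_address": "b@example.com", "observed_at": "2026-06-01T09:00:00+00:00"},
    {"affected_email_address": "a@example.com", "observed_at": "2026-06-01T08:00:00+00:00"},
]
expected = {"b@example.com"}

ordered = sorted(
    rows,
    key=lambda row: (
        # False (known recipient) sorts ahead of True (unknown recipient).
        row["affected_email_address"] not in expected,
        row["affected_email_address"],
        row["observed_at"],
    ),
)
order = [row["affected_email_address"] for row in ordered]
```

The expected address comes first even though it is not alphabetically first; the remaining rows fall back to address order.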


@@ -0,0 +1,269 @@
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from uuid import uuid4

from .config import AppConfig
from .evidence import endpoint_quality_from_candidate
from .mailbox import source_for_config
from .models import MailboxScan
from .parser import parse_message_bytes
from .recipients import load_expected_recipients
from .reporting import write_evidence_report
from .storage import StateStore


@dataclass(frozen=True)
class ScanResult:
    scan: MailboxScan
    report_path: Path | None
    warnings: tuple[str, ...] = ()


def scan_mailbox(
    config: AppConfig,
    *,
    output_dir: str | None = None,
    full_rescan: bool = False,
    report_only_new: bool = False,
    dry_run: bool = False,
    fixture_dir: str | None = None,
    since: str | None = None,
    range_from: str | None = None,
    range_to: str | None = None,
    expected_recipients_path: str | None = None,
    expected_recipient_column: str | None = None,
) -> ScanResult:
    started_at = datetime.now(UTC)
    scan_id = str(uuid4())
    range_start = _parse_datetime(range_from or since or config.scan.range_from or config.scan.since)
    range_end = _parse_datetime(range_to or config.scan.range_to)
    if range_start and range_end and range_start > range_end:
        raise ValueError("scan datetime range lower bound must be before or equal to upper bound.")
    since_at = range_start
    expected = load_expected_recipients(
        expected_recipients_path or config.expected_recipients.path,
        csv_column=expected_recipient_column or config.expected_recipients.csv_column,
    )
    expected_addresses = set(expected.addresses)
    warnings = tuple(f"invalid expected recipient ignored: {entry}" for entry in expected.invalid_entries)
    source = source_for_config(config, fixture_dir_override=fixture_dir)
    store = StateStore(config.storage.path)
    messages_seen = 0
    messages_new = 0
    messages_parsed = 0
    evidence_created = 0
    new_evidence_keys: list[str] = []
    last_source_uid: str | None = None
    try:
        cursor = None if full_rescan else store.get_scan_cursor(config.mailbox.id, config.mailbox.folder)
        for message in source.iter_messages(
            max_messages=config.scan.max_messages_per_run,
            since_uid=cursor,
            full_rescan=full_rescan,
            include_seen=config.scan.include_seen,
            since=since_at,
        ):
            messages_seen += 1
            last_source_uid = message.source_uid
            inbound, parsed, candidate = parse_message_bytes(
                message.raw_bytes,
                mailbox_id=config.mailbox.id,
                raw_message_ref=message.raw_message_ref,
                imap_uid=message.imap_uid,
            )
            if not _in_range(inbound.received_at, range_start=range_start, range_end=range_end):
                continue
            if dry_run:
                messages_parsed += 1
                continue
            is_new = store.upsert_message(inbound)
            messages_new += int(is_new)
            store.insert_parsed(parsed)
            messages_parsed += 1
            if candidate is not None:
                candidate = _enrich_candidate(candidate, inbound, parsed)
                inserted = store.insert_evidence(candidate)
                evidence_created += int(inserted)
                if inserted:
                    new_evidence_keys.append(candidate.deduplication_key)
                quality_update = endpoint_quality_from_candidate(candidate)
                if quality_update is not None:
                    store.apply_endpoint_quality_update(
                        config.mailbox.id,
                        quality_update,
                        observed_at=candidate.observed_at,
                    )
        if not dry_run and last_source_uid:
            store.set_scan_cursor(
                config.mailbox.id,
                config.mailbox.folder,
                last_source_uid,
                updated_at=datetime.now(UTC),
            )
        report_path = None
        if not dry_run:
            range_evidence_rows = store.evidence_rows(range_start=range_start, range_end=range_end)
            report_rows = store.evidence_rows(
                deduplication_keys=new_evidence_keys if report_only_new else None,
                range_start=range_start,
                range_end=range_end,
            )
            report_rows = [
                *report_rows,
                *_no_evidence_rows(
                    mailbox_id=config.mailbox.id,
                    expected_addresses=expected_addresses,
                    evidence_rows=range_evidence_rows,
                    observed_at=datetime.now(UTC),
                    range_start=range_start,
                    range_end=range_end,
                ),
            ]
            report_path = write_evidence_report(
                report_rows,
                output_dir=output_dir or config.reports.output_dir,
                scan_id=scan_id,
                mailbox_id=config.mailbox.id,
                expected_recipients=expected_addresses,
            )
        finished_at = datetime.now(UTC)
        scan = MailboxScan(
            scan_id=scan_id,
            mailbox_id=config.mailbox.id,
            started_at=started_at,
            finished_at=finished_at,
            scan_mode="full_rescan" if full_rescan else config.scan.mode,
            folder=config.mailbox.folder,
            status="completed",
            messages_seen=messages_seen,
            messages_new=messages_new,
            messages_parsed=messages_parsed,
            evidence_events_created=evidence_created,
            report_path=str(report_path) if report_path else None,
            since=since_at,
            range_start=range_start,
            range_end=range_end,
        )
        if not dry_run:
            store.insert_scan(scan)
        return ScanResult(scan=scan, report_path=report_path, warnings=warnings)
    finally:
        store.close()


def _enrich_candidate(candidate, inbound, parsed):
    metadata = {
        **candidate.metadata,
        "source_from": inbound.from_address,
        "source_to": ", ".join(inbound.to_addresses),
        "source_subject": inbound.subject,
        "message_id_header": inbound.message_id_header,
        "original_recipient": parsed.original_recipient,
        "first_seen_at": inbound.first_seen_at.isoformat(),
        "last_seen_at": inbound.last_seen_at.isoformat(),
    }
    return type(candidate)(
        **{
            **candidate.__dict__,
            "metadata": metadata,
        }
    )


def _parse_datetime(value: str | None) -> datetime | None:
    if not value:
        return None
    normalized = value.strip()
    if not normalized:
        return None
    if len(normalized) == 10:
        normalized = normalized + "T00:00:00+00:00"
    parsed = datetime.fromisoformat(normalized.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    return parsed.astimezone(UTC)


def _in_range(
    received_at: datetime | None,
    *,
    range_start: datetime | None,
    range_end: datetime | None,
) -> bool:
    if range_start is None and range_end is None:
        return True
    if received_at is None:
        return False
    if range_start is not None and received_at < range_start:
        return False
    if range_end is not None and received_at > range_end:
        return False
    return True


def _no_evidence_rows(
    *,
    mailbox_id: str,
    expected_addresses: set[str],
    evidence_rows: list[dict],
    observed_at: datetime,
    range_start: datetime | None,
    range_end: datetime | None,
) -> list[dict]:
    if not expected_addresses:
        return []
    known_evidence_addresses = {
        str(row.get("affected_email_address") or "").lower()
        for row in evidence_rows
        if row.get("affected_email_address")
    }
    rows = []
    for address in sorted(expected_addresses - known_evidence_addresses):
        rows.append(_no_evidence_row(mailbox_id, address, observed_at, range_start=range_start, range_end=range_end))
    return rows


def _no_evidence_row(
    mailbox_id: str,
    address: str,
    observed_at: datetime,
    *,
    range_start: datetime | None,
    range_end: datetime | None,
) -> dict:
    range_key = "|".join([
        range_start.isoformat() if range_start else "",
        range_end.isoformat() if range_end else "",
    ])
    return {
        "mailbox_message_id": "",
        "event_type": "diagnostic.expected_recipient.no_evidence",
        "assessment_category": "undef",
        "assessment_subclass": "undef.no_signal",
        "affected_email_address": address,
        "original_message_id": "",
        "confidence": "high",
        "evidence_strength": "none",
        "occurred_at": "",
        "observed_at": observed_at.isoformat(),
        "deduplication_key": f"{mailbox_id}|expected_recipient|no_evidence|{address}|{range_key}",
        "raw_message_ref": "",
        "notes_json": json.dumps([
            "Expected recipient was supplied by the operator; no mailbox evidence was found in the inspected range.",
            "This is not evidence of delivery success or delivery failure.",
        ]),
        "metadata_json": json.dumps({
            "message_class": "expected_recipient_no_evidence",
            "original_recipient": address,
            "range_start": range_start.isoformat() if range_start else None,
            "range_end": range_end.isoformat() if range_end else None,
        }),
        "known_recipient": True,
    }
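Review note: `_parse_datetime` treats a ten-character value as a date and pins it to 00:00 UTC. The sketch below mirrors that logic in a hypothetical standalone helper, `parse_bound`, to show what a date-only bound turns into; it uses `timezone.utc` instead of `datetime.UTC` only to keep the example independent of the Python version.

```python
from datetime import datetime, timezone


def parse_bound(value: str) -> datetime:
    """Parse an ISO date or datetime string into an aware UTC datetime."""
    text = value.strip()
    if len(text) == 10:  # date only, e.g. "2026-06-02"
        text += "T00:00:00+00:00"
    parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)  # naive input is assumed UTC
    return parsed.astimezone(timezone.utc)


upper = parse_bound("2026-06-02")
converted = parse_bound("2026-06-02T12:00:00+02:00")

# A message received at 09:00 UTC on the same day lies after the upper bound.
received = datetime(2026, 6, 2, 9, 0, tzinfo=timezone.utc)
excluded = received > upper
```

Because `_in_range` rejects `received_at > range_end`, a date-only upper bound appears to exclude everything received later on that day. If an inclusive end date is intended, the upper bound may need to be moved to the end of the day.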


@@ -0,0 +1,423 @@
from __future__ import annotations
import json
import sqlite3
from datetime import UTC, datetime
from pathlib import Path
from .models import (
Confidence,
EmailEvidenceCandidate,
EndpointQualityUpdate,
InboundMailboxMessage,
MailboxScan,
ParsedMailboxMessage,
)
class StateStore:
def __init__(self, path: str | Path) -> None:
self.path = Path(path)
self.path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(self.path)
self.conn.row_factory = sqlite3.Row
self.init_schema()
def close(self) -> None:
self.conn.close()
def init_schema(self) -> None:
self.conn.executescript(
"""
create table if not exists mailbox_scans (
scan_id text primary key,
mailbox_id text not null,
started_at text not null,
finished_at text,
scan_mode text not null,
folder text not null,
status text not null,
messages_seen integer not null,
messages_new integer not null,
messages_parsed integer not null,
evidence_events_created integer not null,
report_path text,
since text,
range_start text,
range_end text
);
create table if not exists mailbox_messages (
mailbox_message_id text primary key,
mailbox_id text not null,
imap_uid text,
message_id_header text,
received_at text,
from_address text,
to_addresses_json text not null,
subject text,
raw_headers_ref text,
raw_message_ref text,
first_seen_at text not null,
last_seen_at text not null,
deduplication_key text not null unique
);
create table if not exists parsed_messages (
parsed_message_id text primary key,
mailbox_message_id text not null,
parser_version text not null,
message_class text not null,
affected_email_address text,
original_message_id text,
original_recipient text,
smtp_status_code text,
enhanced_status_code text,
reason_code text,
confidence text not null,
parsed_at text not null,
notes_json text not null
);
create table if not exists evidence_candidates (
evidence_candidate_id text primary key,
mailbox_message_id text not null,
parsed_message_id text not null,
event_type text not null,
assessment_category text not null,
assessment_subclass text not null,
affected_email_address text,
original_message_id text,
confidence text not null,
evidence_strength text not null,
occurred_at text,
observed_at text not null,
deduplication_key text not null unique,
raw_message_ref text,
notes_json text not null,
metadata_json text not null
);
create table if not exists scan_cursors (
mailbox_id text not null,
folder text not null,
last_uid text not null,
updated_at text not null,
primary key (mailbox_id, folder)
);
create table if not exists endpoint_quality (
endpoint_key text primary key,
mailbox_id text not null,
affected_email_address text not null,
reachability text not null,
suppression_state text not null,
last_success_at text,
last_failure_at text,
last_auto_reply_at text,
reason_code text,
confidence text not null,
quality_signals_json text not null,
updated_at text not null,
unique (mailbox_id, affected_email_address)
);
"""
)
        self._ensure_column("mailbox_scans", "range_start", "text")
        self._ensure_column("mailbox_scans", "range_end", "text")
        self.conn.commit()

    def _ensure_column(self, table: str, column: str, column_type: str) -> None:
        columns = {
            str(row["name"])
            for row in self.conn.execute(f"pragma table_info({table})").fetchall()
        }
        if column not in columns:
            self.conn.execute(f"alter table {table} add column {column} {column_type}")

    def upsert_message(self, message: InboundMailboxMessage) -> bool:
        existing = self.conn.execute(
            "select mailbox_message_id from mailbox_messages where deduplication_key = ?",
            (message.deduplication_key,),
        ).fetchone()
        if existing:
            self.conn.execute(
                "update mailbox_messages set last_seen_at = ? where deduplication_key = ?",
                (_dt(message.last_seen_at), message.deduplication_key),
            )
            self.conn.commit()
            return False
        self.conn.execute(
            """
            insert into mailbox_messages values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                message.mailbox_message_id,
                message.mailbox_id,
                message.imap_uid,
                message.message_id_header,
                _dt(message.received_at),
                message.from_address,
                json.dumps(message.to_addresses),
                message.subject,
                message.raw_headers_ref,
                message.raw_message_ref,
                _dt(message.first_seen_at),
                _dt(message.last_seen_at),
                message.deduplication_key,
            ),
        )
        self.conn.commit()
        return True

    def insert_parsed(self, parsed: ParsedMailboxMessage) -> None:
        self.conn.execute(
            """
            insert or replace into parsed_messages values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                parsed.parsed_message_id,
                parsed.mailbox_message_id,
                parsed.parser_version,
                parsed.message_class.value,
                parsed.affected_email_address,
                parsed.original_message_id,
                parsed.original_recipient,
                parsed.smtp_status_code,
                parsed.enhanced_status_code,
                parsed.reason_code,
                parsed.confidence.value,
                _dt(parsed.parsed_at),
                json.dumps(parsed.notes),
            ),
        )
        self.conn.commit()

    def insert_evidence(self, candidate: EmailEvidenceCandidate) -> bool:
        try:
            self.conn.execute(
                """
                insert into evidence_candidates values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    candidate.evidence_candidate_id,
                    candidate.mailbox_message_id,
                    candidate.parsed_message_id,
                    candidate.event_type,
                    candidate.assessment_category.value,
                    candidate.assessment_subclass,
                    candidate.affected_email_address,
                    candidate.original_message_id,
                    candidate.confidence.value,
                    candidate.evidence_strength.value,
                    _dt(candidate.occurred_at),
                    _dt(candidate.observed_at),
                    candidate.deduplication_key,
                    candidate.raw_message_ref,
                    json.dumps(candidate.notes),
                    json.dumps(candidate.metadata),
                ),
            )
        except sqlite3.IntegrityError:
            return False
        self.conn.commit()
        return True

    def insert_scan(self, scan: MailboxScan) -> None:
        self.conn.execute(
            """
            insert or replace into mailbox_scans values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                scan.scan_id,
                scan.mailbox_id,
                _dt(scan.started_at),
                _dt(scan.finished_at),
                scan.scan_mode,
                scan.folder,
                scan.status,
                scan.messages_seen,
                scan.messages_new,
                scan.messages_parsed,
                scan.evidence_events_created,
                scan.report_path,
                _dt(scan.since),
                _dt(scan.range_start),
                _dt(scan.range_end),
            ),
        )
        self.conn.commit()

    def get_scan_cursor(self, mailbox_id: str, folder: str) -> str | None:
        row = self.conn.execute(
            "select last_uid from scan_cursors where mailbox_id = ? and folder = ?",
            (mailbox_id, folder),
        ).fetchone()
        return str(row["last_uid"]) if row else None

    def set_scan_cursor(self, mailbox_id: str, folder: str, last_uid: str, *, updated_at: datetime) -> None:
        self.conn.execute(
            """
            insert into scan_cursors values (?, ?, ?, ?)
            on conflict(mailbox_id, folder) do update set
                last_uid = excluded.last_uid,
                updated_at = excluded.updated_at
            """,
            (mailbox_id, folder, last_uid, _dt(updated_at)),
        )
        self.conn.commit()

    def apply_endpoint_quality_update(
        self,
        mailbox_id: str,
        update: EndpointQualityUpdate,
        *,
        observed_at: datetime,
    ) -> None:
        address = update.affected_email_address.lower()
        endpoint_key = f"{mailbox_id}:{address}"
        existing = self.conn.execute(
            "select * from endpoint_quality where endpoint_key = ?",
            (endpoint_key,),
        ).fetchone()
        merged = _merge_endpoint_quality(existing, update, observed_at=observed_at)
        self.conn.execute(
            """
            insert into endpoint_quality values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            on conflict(endpoint_key) do update set
                reachability = excluded.reachability,
                suppression_state = excluded.suppression_state,
                last_success_at = excluded.last_success_at,
                last_failure_at = excluded.last_failure_at,
                last_auto_reply_at = excluded.last_auto_reply_at,
                reason_code = excluded.reason_code,
                confidence = excluded.confidence,
                quality_signals_json = excluded.quality_signals_json,
                updated_at = excluded.updated_at
            """,
            (
                endpoint_key,
                mailbox_id,
                address,
                merged["reachability"],
                merged["suppression_state"],
                _dt(merged["last_success_at"]),
                _dt(merged["last_failure_at"]),
                _dt(merged["last_auto_reply_at"]),
                merged["reason_code"],
                merged["confidence"],
                json.dumps(merged["quality_signals"]),
                _dt(observed_at),
            ),
        )
        self.conn.commit()

    def endpoint_quality_rows(self) -> list[dict]:
        rows = self.conn.execute(
            "select * from endpoint_quality order by affected_email_address"
        ).fetchall()
        return [dict(row) for row in rows]

    def evidence_rows(
        self,
        *,
        deduplication_keys: list[str] | None = None,
        range_start: datetime | None = None,
        range_end: datetime | None = None,
    ) -> list[dict]:
        if deduplication_keys is not None:
            if not deduplication_keys:
                return []
            placeholders = ", ".join("?" for _ in deduplication_keys)
            rows = self.conn.execute(
                f"""
                select * from evidence_candidates
                where deduplication_key in ({placeholders})
                order by observed_at, event_type
                """,
                deduplication_keys,
            ).fetchall()
            return _filter_rows_by_range([dict(row) for row in rows], range_start=range_start, range_end=range_end)
        rows = self.conn.execute("select * from evidence_candidates order by observed_at, event_type").fetchall()
        return _filter_rows_by_range([dict(row) for row in rows], range_start=range_start, range_end=range_end)


def _dt(value: datetime | None) -> str | None:
    return value.isoformat() if value else None


def _parse_dt(value: str | None) -> datetime | None:
    return datetime.fromisoformat(value) if value else None


def _filter_rows_by_range(
    rows: list[dict],
    *,
    range_start: datetime | None,
    range_end: datetime | None,
) -> list[dict]:
    if range_start is None and range_end is None:
        return rows
    return [row for row in rows if _row_in_range(row, range_start=range_start, range_end=range_end)]


def _row_in_range(row: dict, *, range_start: datetime | None, range_end: datetime | None) -> bool:
    occurred_at = _parse_dt(row.get("occurred_at"))
    if occurred_at is None:
        return False
    if range_start is not None and occurred_at < range_start:
        return False
    if range_end is not None and occurred_at > range_end:
        return False
    return True


def _merge_endpoint_quality(
    existing,
    update: EndpointQualityUpdate,
    *,
    observed_at: datetime,
) -> dict:
    existing_signals = json.loads(existing["quality_signals_json"]) if existing else []
    update_signals = [signal for signal in update.quality_signals if signal]
    signals = list(dict.fromkeys([*existing_signals, *update_signals]))
    return {
        "reachability": _merge_state(existing, "reachability", update.reachability, default="unknown"),
        "suppression_state": _merge_state(
            existing,
            "suppression_state",
            update.suppression_state,
            default="unknown",
        ),
        "last_success_at": _latest_dt(_parse_dt(existing["last_success_at"]) if existing else None, update.last_success_at),
        "last_failure_at": _latest_dt(_parse_dt(existing["last_failure_at"]) if existing else None, update.last_failure_at),
        "last_auto_reply_at": _latest_dt(
            _parse_dt(existing["last_auto_reply_at"]) if existing else None,
            update.last_auto_reply_at,
        ),
        "reason_code": update.reason_code or (existing["reason_code"] if existing else None),
        "confidence": _max_confidence(existing["confidence"] if existing else Confidence.LOW.value, update.confidence.value),
        "quality_signals": signals,
        "updated_at": observed_at,
    }


def _merge_state(existing, field: str, new_value: str, *, default: str) -> str:
    if new_value != "unknown":
        return new_value
    if existing:
        return str(existing[field])
    return default


def _latest_dt(left: datetime | None, right: datetime | None) -> datetime | None:
    if left is None:
        return right
    if right is None:
        return left
    return max(left, right)


def _max_confidence(left: str, right: str) -> str:
    order = {Confidence.LOW.value: 0, Confidence.MEDIUM.value: 1, Confidence.HIGH.value: 2}
    return left if order.get(left, 0) >= order.get(right, 0) else right
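
Review note: the merge helpers above appear to follow four rules: a concrete state overrides `unknown`, timestamps keep the latest value, confidence never decreases, and quality signals are de-duplicated in first-seen order. The sketch below restates those rules on plain dicts so they can be tried in isolation. The function name `merge_quality`, the dict-based record shape, and the `low`/`medium`/`high` strings are assumptions for illustration; the real code works on SQLite rows and the `Confidence` enum.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumed ranking for this sketch; the real values come from the Confidence enum.
CONFIDENCE_ORDER = {"low": 0, "medium": 1, "high": 2}


def merge_quality(existing: Optional[dict], update: dict) -> dict:
    """Merge one update into an existing endpoint-quality record (illustrative only)."""
    existing = existing or {}

    def keep_known(field: str) -> str:
        # A concrete new state wins; "unknown" never overwrites a stored state.
        new_value = update.get(field, "unknown")
        if new_value != "unknown":
            return new_value
        return existing.get(field, "unknown")

    def latest(field: str) -> Optional[datetime]:
        # Keep whichever timestamp is later, ignoring missing values.
        candidates = [v for v in (existing.get(field), update.get(field)) if v is not None]
        return max(candidates) if candidates else None

    old_conf = existing.get("confidence", "low")
    new_conf = update.get("confidence", "low")
    keep_old = CONFIDENCE_ORDER.get(old_conf, 0) >= CONFIDENCE_ORDER.get(new_conf, 0)
    new_signals = [signal for signal in update.get("quality_signals", []) if signal]
    return {
        "reachability": keep_known("reachability"),
        "suppression_state": keep_known("suppression_state"),
        "last_failure_at": latest("last_failure_at"),
        "confidence": old_conf if keep_old else new_conf,
        # dict.fromkeys drops duplicates while keeping first-seen order.
        "quality_signals": list(dict.fromkeys([*existing.get("quality_signals", []), *new_signals])),
    }


bounce_time = datetime(2026, 6, 2, 10, 0, tzinfo=timezone.utc)
after_bounce = merge_quality(
    None,
    {
        "reachability": "unreachable",
        "confidence": "high",
        "last_failure_at": bounce_time,
        "quality_signals": ["hard_bounce"],
    },
)
after_opt_out = merge_quality(
    after_bounce,
    {
        "reachability": "unknown",
        "suppression_state": "opted_out",
        "confidence": "medium",
        "quality_signals": ["hard_bounce", "unsubscribe"],
    },
)
```

In this example the later opt-out update leaves `reachability` at `unreachable` and `confidence` at `high`, because an `unknown` state and a lower confidence do not overwrite what is already stored.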

@@ -0,0 +1,5 @@
email,name
optout@example.com,Opt Out
csv-absent@example.com,Missing From Mailbox
OPTOut@example.com,Duplicate Case Variant
not-an-address,Invalid

@@ -0,0 +1,4 @@
missing@example.com
absent@example.com
MISSING@example.com
not-an-address

@@ -0,0 +1,9 @@
From: Sender Verification <verify@example.net>
To: sender@example.com
Subject: Sender verification required
Date: Tue, 02 Jun 2026 10:09:00 +0000
Message-ID: <challenge-response@example.net>
Content-Type: text/plain; charset=utf-8
This is a challenge-response message. Please verify yourself before your message
can be delivered to challenge@example.com.

tests/fixtures/mailbox/complaint.eml vendored Normal file

@@ -0,0 +1,10 @@
From: Feedback Loop <fbl@example.net>
To: abuse@example.com
Subject: Spam complaint notification
Date: Tue, 02 Jun 2026 10:04:00 +0000
Message-ID: <complaint@example.net>
Content-Type: text/plain; charset=utf-8
Feedback loop abuse report.
Final-Recipient: rfc822; complained@example.com
This is a spam complaint notification for the original message.

@@ -0,0 +1,10 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery delayed
Date: Tue, 02 Jun 2026 10:05:00 +0000
Message-ID: <delayed@example.net>
Content-Type: text/plain; charset=utf-8
Delivery delayed. We will keep trying to deliver your message.
Final-Recipient: rfc822; waiting@example.com
Status: 4.4.1

@@ -0,0 +1,12 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Final failure
Date: Tue, 02 Jun 2026 10:06:00 +0000
Message-ID: <final-failure@example.net>
Content-Type: text/plain; charset=utf-8
Delivery Status Notification.
Final-Recipient: rfc822; expired@example.com
Action: failed
Status: 5.4.7
Diagnostic-Code: smtp; Could not deliver after retry period. Final failure, giving up.

tests/fixtures/mailbox/hard_bounce.eml vendored Normal file

@@ -0,0 +1,12 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery Status Notification (Failure)
Date: Tue, 02 Jun 2026 10:00:00 +0000
Message-ID: <hard-bounce@example.net>
Content-Type: text/plain; charset=utf-8
Delivery failure.
Final-Recipient: rfc822; missing@example.com
Action: failed
Status: 5.1.1
Diagnostic-Code: smtp; 550 5.1.1 User unknown

@@ -0,0 +1,8 @@
From: Recipient <recipient@example.com>
To: sender@example.com
Subject: Re: Your notification
Date: Tue, 02 Jun 2026 10:03:00 +0000
Message-ID: <reply@example.com>
Content-Type: text/plain; charset=utf-8
Thanks, I received this and will review it today.

@@ -0,0 +1,9 @@
From: Recipient <recipient@example.com>
To: sender@example.com
Subject: Auto-reply: Out of office
Date: Tue, 02 Jun 2026 10:02:00 +0000
Message-ID: <ooo@example.com>
Auto-Submitted: auto-replied
Content-Type: text/plain; charset=utf-8
I am out of office until next week.

@@ -0,0 +1 @@

tests/fixtures/mailbox/soft_bounce.eml vendored Normal file

@@ -0,0 +1,12 @@
From: Mail Delivery Subsystem <mailer-daemon@example.net>
To: sender@example.com
Subject: Delivery temporarily delayed
Date: Tue, 02 Jun 2026 10:01:00 +0000
Message-ID: <soft-bounce@example.net>
Content-Type: text/plain; charset=utf-8
Delivery Status Notification.
Final-Recipient: rfc822; full@example.com
Action: delayed
Status: 4.2.2
Diagnostic-Code: smtp; 452 4.2.2 Mailbox full

@@ -0,0 +1,10 @@
From: Mail System <mailer@example.net>
To: sender@example.com
Subject: Return mailbox notice
Date: Tue, 02 Jun 2026 10:07:00 +0000
Message-ID: <unknown-return@example.net>
Content-Type: text/plain; charset=utf-8
This message references delivery and recipient handling, but it does not include
a reliable SMTP status or delivery-status notification.
Recipient reference: mystery@example.com

@@ -0,0 +1,8 @@
From: Recipient <optout@example.com>
To: sender@example.com
Subject: Please unsubscribe me
Date: Tue, 02 Jun 2026 10:08:00 +0000
Message-ID: <unsubscribe@example.com>
Content-Type: text/plain; charset=utf-8
Please unsubscribe optout@example.com from future messages. Remove me from this list.

tests/test_parser.py Normal file

@@ -0,0 +1,97 @@
from __future__ import annotations

import unittest
from pathlib import Path

from email_connect.models import MessageClass
from email_connect.parser import parse_message_file

FIXTURES = Path(__file__).parent / "fixtures" / "mailbox"


class ParserTests(unittest.TestCase):
    def test_hard_bounce_maps_to_permanent_rejection(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "hard_bounce.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.HARD_BOUNCE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_permanent")
        self.assertEqual(candidate.assessment_subclass, "fail.hard_bounce")

    def test_soft_bounce_maps_to_temporary_rejection(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "soft_bounce.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.SOFT_BOUNCE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_temporary")

    def test_out_of_office_stays_undef(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "out_of_office.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.OUT_OF_OFFICE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.assessment_category.value, "undef")
        self.assertEqual(candidate.event_type, "interaction.out_of_office_received")

    def test_human_reply_is_email_channel_success_only(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "human_reply.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.HUMAN_REPLY)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "interaction.reply_received")
        self.assertEqual(candidate.assessment_subclass, "success.reply_received")

    def test_delayed_delivery_notice_stays_deferred(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "delayed_delivery.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.DELAYED_DELIVERY_NOTICE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.deferred")
        self.assertEqual(candidate.assessment_subclass, "undef.deferred")

    def test_final_failure_maps_to_expired_without_delivery(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "final_failure.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.FINAL_DELIVERY_FAILURE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.rejected_permanent")
        self.assertEqual(candidate.assessment_subclass, "fail.expired_without_delivery")

    def test_complaint_maps_to_channel_failure(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "complaint.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.COMPLAINT_OR_ABUSE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.channel.complaint_received")
        self.assertEqual(candidate.assessment_subclass, "fail.complaint_received")

    def test_unsubscribe_maps_to_opt_out(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "unsubscribe.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.UNSUBSCRIBE_OR_OPT_OUT)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.channel.unsubscribe_received")
        self.assertEqual(candidate.assessment_subclass, "fail.unsubscribed")

    def test_unknown_return_message_is_preserved(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "unknown_return.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.UNKNOWN_RETURN_MESSAGE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "notification.endpoint.unknown")

    def test_challenge_response_stays_identity_uncertain(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "challenge_response.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.CHALLENGE_RESPONSE)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "interaction.unverified_actor_interaction")
        self.assertEqual(candidate.assessment_subclass, "undef.identity_uncertain")

    def test_parse_failure_is_reportable_diagnostic(self) -> None:
        _inbound, parsed, candidate = parse_message_file(FIXTURES / "parse_failed.eml", mailbox_id="test")
        self.assertEqual(parsed.message_class, MessageClass.PARSE_FAILED)
        self.assertIsNotNone(candidate)
        self.assertEqual(candidate.event_type, "diagnostic.message.parse_failed")
        self.assertEqual(candidate.assessment_subclass, "undef.parse_failed")

    def test_dsn_detail_fields_are_preserved_as_notes(self) -> None:
        _inbound, parsed, _candidate = parse_message_file(FIXTURES / "hard_bounce.eml", mailbox_id="test")
        self.assertIn("final_recipient=rfc822; missing@example.com", parsed.notes)
        self.assertIn("action=failed", parsed.notes)
        self.assertIn("diagnostic_code=smtp; 550 5.1.1 User unknown", parsed.notes)


if __name__ == "__main__":
    unittest.main()
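
Review note: the last test expects DSN detail fields to survive as `key=value` strings in `parsed.notes`. The parser itself is not part of this excerpt, so the following is only a guess at one way such notes could be produced from a plain-text body. The helper name `dsn_notes` and the regex-based approach are assumptions; only the note format and the field names come from the fixture and the assertions above.

```python
import re
from typing import List

# Field names as they appear in the hard_bounce.eml fixture.
DSN_FIELDS = ("Final-Recipient", "Action", "Status", "Diagnostic-Code")


def dsn_notes(body: str) -> List[str]:
    """Return 'field_name=value' notes for each DSN field found in the body (sketch)."""
    notes = []
    for field in DSN_FIELDS:
        # Match the field only at the start of a line, case-insensitively.
        match = re.search(
            rf"^{re.escape(field)}:[ \t]*(.+)$",
            body,
            flags=re.MULTILINE | re.IGNORECASE,
        )
        if match:
            key = field.lower().replace("-", "_")
            notes.append(f"{key}={match.group(1).strip()}")
    return notes


sample_body = (
    "Delivery failure.\n"
    "Final-Recipient: rfc822; missing@example.com\n"
    "Action: failed\n"
    "Status: 5.1.1\n"
    "Diagnostic-Code: smtp; 550 5.1.1 User unknown\n"
)
sample_notes = dsn_notes(sample_body)
```

Anchoring each pattern at the start of a line keeps `Status:` from matching inside the `Diagnostic-Code:` value. A real parser would more likely read the `message/delivery-status` MIME part when one is present.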

tests/test_recipients.py Normal file

@@ -0,0 +1,31 @@
from __future__ import annotations

import unittest
from pathlib import Path

from email_connect.recipients import load_expected_recipients, normalize_email_address

FIXTURES = Path(__file__).parent / "fixtures"


class RecipientTests(unittest.TestCase):
    def test_normalizes_email_addresses(self) -> None:
        self.assertEqual(normalize_email_address(" USER@Example.COM "), "user@example.com")
        self.assertIsNone(normalize_email_address("not-an-address"))

    def test_loads_line_separated_recipients(self) -> None:
        recipients = load_expected_recipients(FIXTURES / "expected_recipients.txt")
        self.assertEqual(recipients.addresses, ("missing@example.com", "absent@example.com"))
        self.assertEqual(len(recipients.invalid_entries), 1)

    def test_loads_csv_recipients(self) -> None:
        recipients = load_expected_recipients(FIXTURES / "expected_recipients.csv", csv_column="email")
        self.assertEqual(recipients.addresses, ("optout@example.com", "csv-absent@example.com"))
        self.assertEqual(len(recipients.invalid_entries), 1)


if __name__ == "__main__":
    unittest.main()

tests/test_scanner.py Normal file

@@ -0,0 +1,179 @@
from __future__ import annotations

import tempfile
import unittest
from csv import DictReader
from pathlib import Path

from email_connect.config import AppConfig, MailboxConfig, ReportsConfig, ScanConfig, SourceConfig, StorageConfig
from email_connect.scanner import scan_mailbox
from email_connect.storage import StateStore

FIXTURES = Path(__file__).parent / "fixtures" / "mailbox"
RECIPIENT_FIXTURES = Path(__file__).parent / "fixtures"


class ScannerTests(unittest.TestCase):
    def test_scan_fixture_directory_writes_report_and_deduplicates(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )
            first = scan_mailbox(config)
            second = scan_mailbox(config)
            full = scan_mailbox(config, full_rescan=True, report_only_new=True)
            self.assertEqual(first.scan.messages_seen, 11)
            self.assertEqual(first.scan.messages_new, 11)
            self.assertGreaterEqual(first.scan.evidence_events_created, 11)
            self.assertEqual(second.scan.messages_seen, 0)
            self.assertEqual(second.scan.messages_new, 0)
            self.assertEqual(second.scan.evidence_events_created, 0)
            self.assertEqual(full.scan.messages_seen, 11)
            self.assertEqual(full.scan.messages_new, 0)
            self.assertEqual(full.scan.evidence_events_created, 0)
            self.assertTrue(first.report_path and first.report_path.exists())
            with first.report_path.open(newline="", encoding="utf-8") as fh:
                first_rows = list(DictReader(fh))
            self.assertTrue(first_rows)
            self.assertTrue(all(row["known_recipient"] == "false" for row in first_rows))
            self.assertTrue(full.report_path and full.report_path.exists())
            with full.report_path.open(newline="", encoding="utf-8") as fh:
                self.assertEqual(list(DictReader(fh)), [])

    def test_scan_updates_endpoint_quality(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )
            scan_mailbox(config)
            store = StateStore(config.storage.path)
            try:
                rows = {row["affected_email_address"]: row for row in store.endpoint_quality_rows()}
            finally:
                store.close()
            self.assertEqual(rows["missing@example.com"]["reachability"], "unreachable")
            self.assertEqual(rows["full@example.com"]["reachability"], "degraded")
            self.assertEqual(rows["complained@example.com"]["suppression_state"], "suppressed")
            self.assertEqual(rows["optout@example.com"]["suppression_state"], "opted_out")

    def test_expected_recipients_sort_first_and_get_no_evidence_rows(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )
            result = scan_mailbox(
                config,
                full_rescan=True,
                expected_recipients_path=str(RECIPIENT_FIXTURES / "expected_recipients.txt"),
            )
            self.assertEqual(len(result.warnings), 1)
            self.assertTrue(result.report_path and result.report_path.exists())
            with result.report_path.open(newline="", encoding="utf-8") as fh:
                rows = list(DictReader(fh))
            self.assertGreater(len(rows), 2)
            known_flags = [row["known_recipient"] for row in rows]
            self.assertEqual(known_flags, sorted(known_flags, reverse=True))
            missing_rows = [row for row in rows if row["affected_email_address"] == "missing@example.com"]
            self.assertTrue(missing_rows)
            self.assertTrue(all(row["known_recipient"] == "true" for row in missing_rows))
            absent_rows = [row for row in rows if row["affected_email_address"] == "absent@example.com"]
            self.assertEqual(len(absent_rows), 1)
            self.assertEqual(absent_rows[0]["normalized_event_type"], "diagnostic.expected_recipient.no_evidence")
            self.assertEqual(absent_rows[0]["assessment_category"], "undef")
            self.assertEqual(absent_rows[0]["assessment_subclass"], "undef.no_signal")
            self.assertEqual(absent_rows[0]["evidence_strength"], "none")
            store = StateStore(config.storage.path)
            try:
                quality_addresses = {row["affected_email_address"] for row in store.endpoint_quality_rows()}
            finally:
                store.close()
            self.assertNotIn("absent@example.com", quality_addresses)

    def test_csv_expected_recipients_are_supported(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )
            result = scan_mailbox(
                config,
                full_rescan=True,
                expected_recipients_path=str(RECIPIENT_FIXTURES / "expected_recipients.csv"),
                expected_recipient_column="email",
            )
            self.assertEqual(len(result.warnings), 1)
            self.assertTrue(result.report_path and result.report_path.exists())
            with result.report_path.open(newline="", encoding="utf-8") as fh:
                rows = list(DictReader(fh))
            optout_rows = [row for row in rows if row["affected_email_address"] == "optout@example.com"]
            self.assertTrue(optout_rows)
            self.assertTrue(all(row["known_recipient"] == "true" for row in optout_rows))
            csv_absent = [row for row in rows if row["affected_email_address"] == "csv-absent@example.com"]
            self.assertEqual(csv_absent[0]["normalized_event_type"], "diagnostic.expected_recipient.no_evidence")

    def test_datetime_range_excludes_messages_outside_range(self) -> None:
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            expected_path = root / "expected.txt"
            expected_path.write_text("complained@example.com\nmissing@example.com\n", encoding="utf-8")
            config = AppConfig(
                mailbox=MailboxConfig(id="test-mailbox", protocol="fixture"),
                scan=ScanConfig(),
                storage=StorageConfig(path=str(root / "state.sqlite")),
                reports=ReportsConfig(output_dir=str(root / "reports")),
                source=SourceConfig(fixture_dir=str(FIXTURES)),
            )
            result = scan_mailbox(
                config,
                full_rescan=True,
                expected_recipients_path=str(expected_path),
                range_from="2026-06-02T10:04:00Z",
                range_to="2026-06-02T10:04:00Z",
            )
            self.assertEqual(result.scan.messages_seen, 11)
            self.assertEqual(result.scan.messages_parsed, 1)
            self.assertTrue(result.report_path and result.report_path.exists())
            with result.report_path.open(newline="", encoding="utf-8") as fh:
                rows = list(DictReader(fh))
            self.assertEqual({row["affected_email_address"] for row in rows}, {"complained@example.com", "missing@example.com"})
            complaint = [row for row in rows if row["affected_email_address"] == "complained@example.com"][0]
            missing = [row for row in rows if row["affected_email_address"] == "missing@example.com"][0]
            self.assertEqual(complaint["normalized_event_type"], "notification.channel.complaint_received")
            self.assertEqual(missing["normalized_event_type"], "diagnostic.expected_recipient.no_evidence")
            self.assertEqual(result.scan.range_start.isoformat(), "2026-06-02T10:04:00+00:00")
            self.assertEqual(result.scan.range_end.isoformat(), "2026-06-02T10:04:00+00:00")


if __name__ == "__main__":
    unittest.main()
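
Review note: the range test passes the same instant as both `range_from` and `range_to` and still expects the 10:04 complaint to be parsed, which suggests both bounds are inclusive. The sketch below shows that reading in isolation. The names `parse_bound` and `in_range` are hypothetical, and the `Z` handling is an assumption made so the example also runs on Python versions where `datetime.fromisoformat` rejects a trailing `Z`.

```python
from datetime import datetime
from typing import Optional


def parse_bound(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 bound, accepting a trailing 'Z' for UTC (sketch)."""
    if value is None:
        return None
    if value.endswith("Z"):
        # Older Python versions do not accept "Z" in fromisoformat.
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


def in_range(moment: Optional[datetime], start: Optional[datetime], end: Optional[datetime]) -> bool:
    """Inclusive range check; a missing timestamp is treated as out of range."""
    if moment is None:
        return False
    if start is not None and moment < start:
        return False
    if end is not None and moment > end:
        return False
    return True


range_start = parse_bound("2026-06-02T10:04:00Z")
range_end = parse_bound("2026-06-02T10:04:00Z")
complaint_at = parse_bound("2026-06-02T10:04:00+00:00")
hard_bounce_at = parse_bound("2026-06-02T10:00:00+00:00")
```

With these values, `in_range(complaint_at, range_start, range_end)` is true and `in_range(hard_bounce_at, range_start, range_end)` is false. Comparing a naive timestamp against these timezone-aware bounds would raise `TypeError`, so both sides need consistent timezone information.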

@@ -2,9 +2,9 @@
id: EMAIL-WP-0001
type: workplan
title: "Repository Onboarding and Implementation Foundation"
domain: custodian
domain: infotech
repo: email-connect
status: active
status: finished
owner: codex
topic_slug: custodian
created: "2026-06-02"
@@ -55,7 +55,7 @@ Done when:
```task
id: EMAIL-WP-0001-T02
status: todo
status: done
priority: high
state_hub_task_id: "fdfd8b96-7326-414f-8126-79bb3a21b950"
```
@@ -76,7 +76,7 @@ The architecture note should cover:
```task
id: EMAIL-WP-0001-T03
status: todo
status: done
priority: high
state_hub_task_id: "ef1eb769-dfa0-4b46-8633-274d90962423"
```
@@ -105,7 +105,7 @@ click telemetry as proof of human awareness or result satisfaction.
```task
id: EMAIL-WP-0001-T04
status: todo
status: done
priority: medium
state_hub_task_id: "4b94e544-5aad-4c38-8fe3-eed17af79971"
```

@@ -2,9 +2,9 @@
id: EMAIL-WP-0002
type: workplan
title: "MVP Mailbox Evidence Scanner"
domain: custodian
domain: infotech
repo: email-connect
status: ready
status: finished
owner: codex
topic_slug: custodian
created: "2026-06-02"
@@ -653,7 +653,8 @@ This allows rescanning old messages after parser improvements.
Initial mappings:
| Parsed class | Normalized event | Assessment |
| ------------------------- | ------------------------------------------- | ------------------------------- |
| ------------------------- | ------------------------------------
------- | ------------------------------- |
| `hard_bounce` | `notification.endpoint.rejected_permanent` | `fail.hard_bounce` |
| `soft_bounce` | `notification.endpoint.rejected_temporary` | `undef.deferred` |
| `delayed_delivery_notice` | `notification.endpoint.deferred` | `undef.deferred` |
@@ -672,7 +673,8 @@ The scanner should update basic endpoint quality.
Examples:
| Evidence | Endpoint quality update |
| ------------- | ---------------------------------------------------------- |
| ------------- | ------------------------------------
---------------------- |
| Hard bounce | `reachability = unreachable`, `last_failure_at = now` |
| Soft bounce | `reachability = degraded`, `last_failure_at = now` |
| Complaint | `suppression_state = suppressed` |
@@ -688,7 +690,7 @@ Endpoint quality is diagnostic and must not be treated as coordination success.
```task
id: EMAIL-WP-0002-T01
status: todo
status: done
priority: high
state_hub_task_id: "3a17215d-62a9-48ef-877f-a6fbc7e95a22"
```
@@ -716,7 +718,7 @@ Config file is loaded and validated.
```task
id: EMAIL-WP-0002-T02
status: todo
status: done
priority: high
state_hub_task_id: "25a4da12-1bcd-4c6d-a0eb-a2f525b9c4b9"
```
@@ -744,7 +746,7 @@ CLI can connect to mailbox and list/fetch messages without modifying mailbox.
```task
id: EMAIL-WP-0002-T03
status: todo
status: done
priority: high
state_hub_task_id: "16b95a6b-1375-4c91-8b78-0b75d51e0aeb"
```
@@ -773,7 +775,7 @@ Full rescan can revisit all messages while preserving deduplication.
```task
id: EMAIL-WP-0002-T04
status: todo
status: done
priority: high
state_hub_task_id: "5a50cd85-b0ab-4017-aba0-b2087068abb4"
```
@@ -802,7 +804,7 @@ Scanner extracts basic metadata and text from representative bounce and reply me
```task
id: EMAIL-WP-0002-T05
status: todo
status: done
priority: high
state_hub_task_id: "8ea826d1-0add-4573-9bb4-2b73adefba55"
```
@@ -831,7 +833,7 @@ Representative hard and soft bounce samples are classified correctly.
```task
id: EMAIL-WP-0002-T06
status: todo
status: done
priority: high
state_hub_task_id: "4d94a332-173b-4787-8fb2-27aa63db6a8d"
```
@@ -856,7 +858,7 @@ Representative OOO and human reply samples are classified with confidence.
```task
id: EMAIL-WP-0002-T07
status: todo
status: done
priority: high
state_hub_task_id: "8637d383-25f7-45b5-9680-427ed2ca87bf"
```
@@ -880,7 +882,7 @@ Representative complaint and unsubscribe examples are classified.
```task
id: EMAIL-WP-0002-T08
status: todo
status: done
priority: high
state_hub_task_id: "6d62dea0-f416-4c0b-80a0-7c16422b8e5f"
```
@@ -906,7 +908,7 @@ Parsed messages produce evidence candidates according to the mapping table.
```task
id: EMAIL-WP-0002-T09
status: todo
status: done
priority: medium
state_hub_task_id: "0d110877-953f-4aa2-961b-eec81e0159d4"
```
@@ -932,7 +934,7 @@ Complaint/unsubscribe updates suppression state.
```task
id: EMAIL-WP-0002-T10
status: todo
status: done
priority: medium
state_hub_task_id: "5ab35176-d6c2-4c73-b7b3-bde4c097e3ee"
```
@@ -959,7 +961,7 @@ Report can be opened in spreadsheet tools.
```task
id: EMAIL-WP-0002-T11
status: todo
status: done
priority: high
state_hub_task_id: "514fa099-781b-4590-aae4-c28970413b3f"
```
@@ -989,7 +991,7 @@ Automated tests verify expected classification and normalized event output.
```task
id: EMAIL-WP-0002-T12
status: todo
status: done
priority: medium
state_hub_task_id: "a5f7067e-87be-4438-ba35-b12d06a8181e"
```

@@ -0,0 +1,356 @@
---
id: EMAIL-WP-0003
type: workplan
title: "Expected Recipient Reporting and Mailbox Tutorial"
domain: infotech
repo: email-connect
status: finished
owner: codex
topic_slug: custodian
created: "2026-06-02"
updated: "2026-06-02"
state_hub_workstream_id: "438149d2-4f20-42b1-91fd-cdeff29dec7d"
---
# EMAIL-WP-0003 - Expected Recipient Reporting and Mailbox Tutorial
## 1. Purpose
This workplan extends the mailbox evidence scanner so operators can provide an
optional set of target email addresses that were expected to receive
notifications. When expected recipients are provided, the scanner should include
them in the evidence report even when no mailbox evidence is known for a given
recipient.
The result is a report that can answer both:
```text
What evidence did the mailbox contain?
Which expected recipients have no known email-channel evidence?
```
The scanner must continue to work without a target-recipient list. Email events
remain evidence, not proof of delivery, awareness, or coordination success.
## 2. User Story
As an operator, I want to provide a line-separated list or CSV file of email
addresses that were supposed to receive notifications, scan a mailbox for return
evidence within a chosen time range, and generate a report where expected
recipients are easy to filter and appear before incidental mailbox-only
addresses.
## 3. In Scope
The workplan shall support:
- Optional expected-recipient input from a newline-separated text file.
- Optional expected-recipient input from CSV.
- CLI and config support for recipient list paths.
- Email address normalization and deduplication.
- Reports that can be generated without any expected-recipient input.
- Reports that include expected recipients with no known evidence.
- An explicit `known_recipient` boolean column in the report.
- Default report ordering with known recipients first.
- An `undef` evidence/report row for expected recipients where nothing is known.
- Mail inspection limited to a datetime range.
- Excluding all email evidence from messages outside the configured range.
- Tests that prove no overclaiming occurs for unknown expected recipients.
- A tutorial for generating a mailbox report from configuration through output
review.
## 4. Out of Scope
The workplan does not require:
- Outbound sending.
- Proving that all provided expected recipients were actually contacted.
- Requiring expected recipients for report generation.
- Legal delivery assessment.
- A suppression-management UI.
- Multi-mailbox correlation.
- Cross-batch campaign management.
## 5. Report Semantics
Expected recipients are advisory context supplied by the operator. If an
expected recipient has no evidence rows, the scanner should emit a conservative
unknown row:
```text
event_type: diagnostic.expected_recipient.no_evidence
assessment_category: undef
assessment_subclass: undef.no_signal
evidence_strength: none
known_recipient: true
```
This row means only:
```text
The recipient was supplied as an expected notification target, and this scan
found no mailbox evidence for that address in the inspected time range.
```
It must not mean:
```text
delivery failed
delivery succeeded
recipient was not notified
recipient ignored the message
```
Mailbox-only evidence rows for addresses not in the supplied expected-recipient
set should remain visible with:
```text
known_recipient: false
```
If no expected-recipient input is provided, the report should still be generated
from mailbox evidence only and `known_recipient` should default to `false`.
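The semantics above can be sketched as a small annotation step. This is a minimal illustration, not the scanner's actual implementation: it assumes evidence rows are plain dictionaries keyed by `affected_email_address`, and the function name `annotate_rows` is hypothetical.

```python
# Hypothetical sketch: rows are assumed to be dicts; annotate_rows is not a
# confirmed email-connect function.
NO_EVIDENCE_EVENT = "diagnostic.expected_recipient.no_evidence"


def annotate_rows(evidence_rows, expected_recipients):
    """Set known_recipient on every row and append conservative no-evidence rows."""
    expected = {address.strip().lower() for address in expected_recipients}
    seen = set()
    annotated = []
    for row in evidence_rows:
        address = row["affected_email_address"].strip().lower()
        seen.add(address)
        # False whenever the expected set is empty or the address is not in it.
        annotated.append({**row, "known_recipient": address in expected})
    # Expected recipients without any evidence get an undef no-signal row.
    for address in sorted(expected - seen):
        annotated.append({
            "affected_email_address": address,
            "event_type": NO_EVIDENCE_EVENT,
            "assessment_category": "undef",
            "assessment_subclass": "undef.no_signal",
            "evidence_strength": "none",
            "known_recipient": True,
        })
    return annotated
```

With an empty expected-recipient list the function returns the mailbox rows unchanged apart from `known_recipient: false`, which matches the default described above.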
## 6. Time Range Semantics
The scanner should support an optional inclusive datetime range:
```text
--from 2026-06-01T00:00:00Z
--to 2026-06-02T23:59:59Z
```
Messages outside the range must be excluded before parsing and evidence
generation whenever the message timestamp is available. The range should also be
usable from config. If a message has no parseable timestamp while a range is
active, it is excluded because the scanner cannot confirm that it originated
inside the requested window.
Existing `--since` behavior may be retained as a compatibility alias for the
lower bound, but documentation should describe the range in terms of `--from`
and `--to`.
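One possible shape for the range check is sketched below. It assumes the message timestamp comes from the RFC 5322 `Date` header and that headers without timezone information are treated as UTC; both are assumptions of this sketch, and `message_timestamp` and `in_range` are hypothetical names.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def message_timestamp(date_header: Optional[str]) -> Optional[datetime]:
    """Parse a Date header, returning None when it is missing or unparseable."""
    if not date_header:
        return None
    try:
        parsed = parsedate_to_datetime(date_header)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is None:
        # Assumption: a header without usable timezone info is read as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def in_range(date_header: Optional[str],
             lower: Optional[datetime],
             upper: Optional[datetime]) -> bool:
    """Inclusive range check; with an active range, undated messages are excluded."""
    if lower is None and upper is None:
        return True
    timestamp = message_timestamp(date_header)
    if timestamp is None:
        return False
    if lower is not None and timestamp < lower:
        return False
    if upper is not None and timestamp > upper:
        return False
    return True
```

Running this check before parsing the message body keeps out-of-range messages from contributing any evidence rows.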
## 7. CLI Target
Example commands:
```text
email-connect scan-mailbox --config config/mailbox.yml --out reports/
email-connect scan-mailbox --config config/mailbox.yml --expected-recipients recipients.txt --out reports/
email-connect scan-mailbox --config config/mailbox.yml --expected-recipients recipients.csv --expected-recipient-column email --out reports/
email-connect scan-mailbox --config config/mailbox.yml --from 2026-06-01T00:00:00Z --to 2026-06-02T23:59:59Z --out reports/
```
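The flags in these examples could be wired up roughly as follows. This is an `argparse` sketch only: the workplan does not state which CLI library email-connect uses, and the destination names `range_from` and `range_to`, as well as which options are required, are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the scan-mailbox flags shown above."""
    parser = argparse.ArgumentParser(prog="email-connect")
    commands = parser.add_subparsers(dest="command", required=True)
    scan = commands.add_parser("scan-mailbox")
    scan.add_argument("--config", required=True)
    scan.add_argument("--out", required=True)
    # Both recipient options stay optional so mailbox-only reports keep working.
    scan.add_argument("--expected-recipients", default=None)
    scan.add_argument("--expected-recipient-column", default=None)
    # "from" is a Python keyword, so an explicit dest is needed; --since is
    # registered as an alias for the same lower bound.
    scan.add_argument("--from", "--since", dest="range_from", default=None)
    scan.add_argument("--to", dest="range_to", default=None)
    return parser
```

Parsing the fourth example command with this sketch yields `range_from="2026-06-01T00:00:00Z"` and `range_to="2026-06-02T23:59:59Z"` as plain strings; converting them to datetimes is left to the caller.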
## 8. Work Packages
## T01 - Expected Recipient Input Model
```task
id: EMAIL-WP-0003-T01
status: done
priority: high
state_hub_task_id: "d1cd0de0-cbd5-4e8d-8179-000ba10e5506"
```
Tasks:
```text
Add expected-recipient config fields
Add CLI option for expected-recipient file path
Support newline-separated email address files
Support CSV files with configurable email column
Normalize addresses case-insensitively
Deduplicate recipient addresses
Reject or warn on invalid addresses without aborting the scan
```
Acceptance:
```text
The scanner can load zero, one, or many expected recipients from text or CSV.
Invalid recipient rows are visible as warnings or diagnostics.
```
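A loader meeting these tasks might look like the sketch below. It takes the file contents as a string to stay self-contained, normalizes by lowercasing the whole address, and uses a deliberately loose validity pattern; the function name, the pattern, and the warning format are all assumptions of this example.

```python
import csv
import io
import re

# Loose illustrative check, not a full RFC 5322 validator.
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def load_expected_recipients(text, csv_column=None):
    """Return (recipients, warnings) from newline-separated or CSV text."""
    if csv_column is None:
        candidates = text.splitlines()
    else:
        reader = csv.DictReader(io.StringIO(text))
        candidates = [(row.get(csv_column) or "") for row in reader]

    recipients, warnings, seen = [], [], set()
    for number, raw in enumerate(candidates, start=1):
        address = raw.strip().lower()
        if not address:
            continue  # blank lines and empty cells are skipped silently
        if not _EMAIL_RE.match(address):
            # Invalid entries are reported but never abort the scan.
            warnings.append(f"entry {number}: invalid address {raw!r}")
            continue
        if address not in seen:
            seen.add(address)
            recipients.append(address)
    return recipients, warnings
```

Returning warnings alongside the recipients lets the caller decide whether to log them or surface them as diagnostics in the report.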
## T02 - Known Recipient Report Column and Ordering
```task
id: EMAIL-WP-0003-T02
status: done
priority: high
state_hub_task_id: "3d7d3bb8-4118-4158-b874-b4e0527eaa85"
```
Tasks:
```text
Add known_recipient boolean column to CSV reports
Mark evidence rows true when affected_email_address matches expected recipients
Mark mailbox-only rows false when no expected list is provided or no match exists
Sort report rows with known recipients first by default
Preserve deterministic secondary sorting
Document filtering behavior for spreadsheet users
```
Acceptance:
```text
Generated reports include known_recipient and place known-recipient rows before
unknown-recipient rows by default.
```
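The default ordering can be expressed as a single sort key. In this sketch the secondary keys (`affected_email_address`, then `event_type`) are an assumed choice for determinism; the workplan only requires that some deterministic secondary sort exists.

```python
def sort_report_rows(rows):
    """Order rows with known recipients first, then by address and event type."""
    return sorted(
        rows,
        key=lambda row: (
            # False sorts before True, so negate to put known recipients first.
            not row.get("known_recipient", False),
            row.get("affected_email_address", ""),
            row.get("event_type", ""),
        ),
    )
```

Because the key is fully determined by row content, repeated scans over the same evidence produce the same row order.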
## T03 - No-Evidence Rows for Expected Recipients
```task
id: EMAIL-WP-0003-T03
status: done
priority: high
state_hub_task_id: "aa737837-2f19-4fbf-9920-f98413bd9779"
```
Tasks:
```text
Detect expected recipients with no matching evidence in the inspected range
Generate diagnostic.expected_recipient.no_evidence rows for those recipients
Use assessment_category undef
Use assessment_subclass undef.no_signal
Use evidence_strength none
Avoid endpoint-quality updates from no-evidence rows
Avoid implying delivery failure or delivery success
Deduplicate generated no-evidence rows across rescans
```
Acceptance:
```text
Expected recipients with no mailbox evidence appear in the report as undef
no-signal diagnostics, not as failures or successes.
```
## T04 - Optional Recipient Context
```task
id: EMAIL-WP-0003-T04
status: done
priority: medium
state_hub_task_id: "731cf592-1bbe-4143-b21b-721af281528c"
```
Tasks:
```text
Keep report generation working when no recipient list is provided
Keep report generation working when recipient list is empty
Ensure expected-recipient input is not required for mailbox-only reports
Ensure mailbox-only evidence remains visible even when expected recipients are provided
```
Acceptance:
```text
Reports can be generated with no expected recipients, empty expected recipients,
or partial expected recipients.
```
## T05 - Datetime Range Filtering
```task
id: EMAIL-WP-0003-T05
status: done
priority: high
state_hub_task_id: "22585e83-d995-42d9-9ab2-c383b055fbb8"
```
Tasks:
```text
Add config fields for scan datetime lower and upper bounds
Add CLI options for datetime lower and upper bounds
Treat --since as a compatibility alias for the lower bound
Exclude messages outside the configured range from parsing and evidence generation
Define behavior for messages with no parseable Date header
Apply filtering consistently to fixture and IMAP scans
Store the range on MailboxScan
Add tests for inclusive lower and upper bounds
```
Acceptance:
```text
When a datetime range is configured, the scanner inspects only messages whose
message timestamp falls within the range according to the documented rules.
```
## T06 - Report and Evidence Tests
```task
id: EMAIL-WP-0003-T06
status: done
priority: high
state_hub_task_id: "f30cd5b9-5035-42b4-9eca-a104e2b26ecb"
```
Tasks:
```text
Add text recipient-list fixture
Add CSV recipient-list fixture
Add tests for known_recipient true and false rows
Add tests for known-recipient-first ordering
Add tests for no-evidence undef rows
Add tests that no-evidence rows do not update endpoint quality
Add tests for report generation with no recipient input
Add tests for datetime range exclusion
```
Acceptance:
```text
Automated tests prove expected-recipient reporting, optional recipient input,
and datetime range filtering.
```
## T07 - Mailbox Report Tutorial
```task
id: EMAIL-WP-0003-T07
status: done
priority: medium
state_hub_task_id: "00a29cb9-ac5a-4784-a9c4-7f2d4905405c"
```
Tasks:
```text
Create a tutorial for configuring mailbox access
Show fixture-based dry run
Show live IMAP configuration
Show expected-recipient text list usage
Show expected-recipient CSV usage
Show datetime range usage
Explain known_recipient filtering
Explain undef no-signal rows
Explain evidence limitations and overclaim prevention
Include troubleshooting notes for credentials and empty reports
```
Acceptance:
```text
A new user can follow the tutorial to generate and interpret an email-connect
mailbox evidence report.
```
## 9. Completion Criteria
This workplan is complete when:
1. Expected-recipient input is optional and supports text and CSV files.
2. Reports include `known_recipient`.
3. Known recipients sort first by default.
4. Expected recipients with no evidence produce `undef.no_signal` diagnostic
rows.
5. The scanner still works without any expected recipients.
6. Datetime range filtering excludes messages outside the inspected range.
7. Tests cover recipient input, report ordering, no-evidence rows, optional
input, and datetime range filtering.
8. A tutorial documents how to generate a mailbox evidence report.