generated from coulomb/repo-seed
feat: finish mailbox evidence scanner mvp
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import html
|
||||
import re
|
||||
from datetime import UTC, datetime
|
||||
from email import policy
|
||||
@@ -34,7 +35,25 @@ def parse_message_bytes(
|
||||
now: datetime | None = None,
|
||||
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
|
||||
observed_at = now or datetime.now(UTC)
|
||||
msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
|
||||
if not raw_bytes.strip():
|
||||
return _parse_failed(
|
||||
mailbox_id=mailbox_id,
|
||||
raw_message_ref=raw_message_ref,
|
||||
imap_uid=imap_uid,
|
||||
observed_at=observed_at,
|
||||
reason="empty_message",
|
||||
)
|
||||
|
||||
try:
|
||||
msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
|
||||
except Exception as exc:
|
||||
return _parse_failed(
|
||||
mailbox_id=mailbox_id,
|
||||
raw_message_ref=raw_message_ref,
|
||||
imap_uid=imap_uid,
|
||||
observed_at=observed_at,
|
||||
reason=f"parser_error:{type(exc).__name__}",
|
||||
)
|
||||
|
||||
message_id = _clean_header(msg.get("Message-ID"))
|
||||
subject = _clean_header(msg.get("Subject"))
|
||||
@@ -104,8 +123,12 @@ def classify_message(
|
||||
text = combined_text.lower()
|
||||
enhanced_status = _first_match(ENHANCED_STATUS_RE, combined_text)
|
||||
smtp_status = _first_match(SMTP_STATUS_RE, combined_text)
|
||||
affected = _extract_affected_recipient(combined_text)
|
||||
dsn_fields = _extract_dsn_fields(combined_text)
|
||||
affected = dsn_fields.get("final_recipient_email") or dsn_fields.get("original_recipient_email")
|
||||
if affected is None:
|
||||
affected = _extract_affected_recipient(combined_text)
|
||||
original_message_id = _extract_headerish(combined_text, "Original-Message-ID")
|
||||
original_recipient = dsn_fields.get("original_recipient_email") or affected
|
||||
notes: list[str] = []
|
||||
|
||||
message_class = MessageClass.UNRELATED_MESSAGE
|
||||
@@ -124,6 +147,10 @@ def classify_message(
|
||||
message_class = MessageClass.OUT_OF_OFFICE
|
||||
confidence = Confidence.MEDIUM
|
||||
reason_code = "auto_reply"
|
||||
elif _is_challenge_response(text):
|
||||
message_class = MessageClass.CHALLENGE_RESPONSE
|
||||
confidence = Confidence.MEDIUM
|
||||
reason_code = "challenge_response"
|
||||
elif _contains_any(text, ["will keep trying", "delivery delayed", "message delayed", "not yet delivered"]):
|
||||
message_class = MessageClass.DELAYED_DELIVERY_NOTICE
|
||||
confidence = Confidence.HIGH
|
||||
@@ -140,6 +167,9 @@ def classify_message(
|
||||
reason_code = "unknown_return"
|
||||
notes.append("Return-channel message did not match a reliable classifier.")
|
||||
|
||||
for key in ["original_recipient", "final_recipient", "action", "diagnostic_code", "remote_mta"]:
|
||||
if dsn_fields.get(key):
|
||||
notes.append(f"{key}={dsn_fields[key]}")
|
||||
if enhanced_status:
|
||||
notes.append(f"enhanced_status={enhanced_status}")
|
||||
if smtp_status:
|
||||
@@ -153,7 +183,7 @@ def classify_message(
|
||||
message_class=message_class,
|
||||
affected_email_address=affected,
|
||||
original_message_id=original_message_id,
|
||||
original_recipient=affected,
|
||||
original_recipient=original_recipient,
|
||||
smtp_status_code=smtp_status,
|
||||
enhanced_status_code=enhanced_status,
|
||||
reason_code=reason_code,
|
||||
@@ -183,6 +213,7 @@ def _classify_dsn(
|
||||
|
||||
def _extract_text(msg) -> str:
|
||||
chunks: list[str] = []
|
||||
html_chunks: list[str] = []
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
if part.get_content_maintype() == "multipart":
|
||||
@@ -192,14 +223,75 @@ def _extract_text(msg) -> str:
|
||||
chunks.append(str(part.get_content()))
|
||||
except Exception:
|
||||
continue
|
||||
elif part.get_content_type() == "text/html":
|
||||
try:
|
||||
html_chunks.append(_html_to_text(str(part.get_content())))
|
||||
except Exception:
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
chunks.append(str(msg.get_content()))
|
||||
content = str(msg.get_content())
|
||||
if msg.get_content_type() == "text/html":
|
||||
html_chunks.append(_html_to_text(content))
|
||||
else:
|
||||
chunks.append(content)
|
||||
except Exception:
|
||||
payload = msg.get_payload(decode=True)
|
||||
if payload:
|
||||
chunks.append(payload.decode(errors="replace"))
|
||||
return "\n".join(chunks)
|
||||
if chunks:
|
||||
return "\n".join(chunks)
|
||||
return "\n".join(html_chunks)
|
||||
|
||||
|
||||
def _parse_failed(
|
||||
*,
|
||||
mailbox_id: str,
|
||||
raw_message_ref: str | None,
|
||||
imap_uid: str | None,
|
||||
observed_at: datetime,
|
||||
reason: str,
|
||||
) -> tuple[InboundMailboxMessage, ParsedMailboxMessage, EmailEvidenceCandidate | None]:
|
||||
dedup_key = "|".join([mailbox_id, imap_uid or "", raw_message_ref or "", reason])
|
||||
mailbox_message_id = str(uuid5(NAMESPACE_URL, "email-connect:message:" + dedup_key))
|
||||
inbound = InboundMailboxMessage(
|
||||
mailbox_message_id=mailbox_message_id,
|
||||
mailbox_id=mailbox_id,
|
||||
imap_uid=imap_uid,
|
||||
message_id_header=None,
|
||||
received_at=None,
|
||||
from_address=None,
|
||||
to_addresses=[],
|
||||
subject=None,
|
||||
raw_headers_ref=raw_message_ref,
|
||||
raw_message_ref=raw_message_ref,
|
||||
first_seen_at=observed_at,
|
||||
last_seen_at=observed_at,
|
||||
deduplication_key=dedup_key,
|
||||
)
|
||||
parsed_id_basis = "|".join([mailbox_message_id, PARSER_VERSION, "parse_failed"])
|
||||
parsed = ParsedMailboxMessage(
|
||||
parsed_message_id=str(uuid5(NAMESPACE_URL, "email-connect:parsed:" + parsed_id_basis)),
|
||||
mailbox_message_id=mailbox_message_id,
|
||||
parser_version=PARSER_VERSION,
|
||||
message_class=MessageClass.PARSE_FAILED,
|
||||
affected_email_address=None,
|
||||
original_message_id=None,
|
||||
original_recipient=None,
|
||||
smtp_status_code=None,
|
||||
enhanced_status_code=None,
|
||||
reason_code=reason,
|
||||
confidence=Confidence.HIGH,
|
||||
parsed_at=observed_at,
|
||||
notes=[f"parse_failure={reason}"],
|
||||
)
|
||||
candidate = candidate_from_parsed(
|
||||
parsed,
|
||||
raw_message_ref=raw_message_ref,
|
||||
observed_at=observed_at,
|
||||
occurred_at=None,
|
||||
)
|
||||
return inbound, parsed, candidate
|
||||
|
||||
|
||||
def _message_dedup_key(
|
||||
@@ -262,6 +354,25 @@ def _extract_headerish(text: str, name: str) -> str | None:
|
||||
return match.group(1).strip() if match else None
|
||||
|
||||
|
||||
def _extract_dsn_fields(text: str) -> dict[str, str]:
|
||||
fields: dict[str, str] = {}
|
||||
for field, key in [
|
||||
("Original-Recipient", "original_recipient"),
|
||||
("Final-Recipient", "final_recipient"),
|
||||
("Action", "action"),
|
||||
("Diagnostic-Code", "diagnostic_code"),
|
||||
("Remote-MTA", "remote_mta"),
|
||||
]:
|
||||
value = _extract_headerish(text, field)
|
||||
if value:
|
||||
fields[key] = value
|
||||
if field in {"Original-Recipient", "Final-Recipient"}:
|
||||
match = EMAIL_RE.search(value)
|
||||
if match:
|
||||
fields[f"{key}_email"] = match.group(0).lower()
|
||||
return fields
|
||||
|
||||
|
||||
def _extract_affected_recipient(text: str) -> str | None:
|
||||
for name in ["Final-Recipient", "Original-Recipient", "X-Failed-Recipients", "Failed-Recipient"]:
|
||||
value = _extract_headerish(text, name)
|
||||
@@ -309,6 +420,21 @@ def _is_out_of_office(text: str) -> bool:
|
||||
)
|
||||
|
||||
|
||||
def _is_challenge_response(text: str) -> bool:
|
||||
return _contains_any(
|
||||
text,
|
||||
[
|
||||
"challenge-response",
|
||||
"challenge response",
|
||||
"sender verification",
|
||||
"verify your email before your message can be delivered",
|
||||
"confirm you are a real person",
|
||||
"confirm that you sent this message",
|
||||
"please verify yourself",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def _looks_like_human_reply(inbound: InboundMailboxMessage, text: str) -> bool:
|
||||
subject = (inbound.subject or "").lower()
|
||||
if _contains_any(text, ["auto-submitted: auto-replied", "x-autoreply", "auto-generated"]):
|
||||
@@ -318,3 +444,8 @@ def _looks_like_human_reply(inbound: InboundMailboxMessage, text: str) -> bool:
|
||||
|
||||
def _looks_return_related(text: str) -> bool:
|
||||
return _contains_any(text, ["delivery", "mailbox", "recipient", "message", "smtp", "unsubscribe", "reply"])
|
||||
|
||||
|
||||
def _html_to_text(value: str) -> str:
|
||||
without_tags = re.sub(r"<[^>]+>", " ", value)
|
||||
return re.sub(r"\s+", " ", html.unescape(without_tags)).strip()
|
||||
|
||||
Reference in New Issue
Block a user