# email-connect/src/email_connect/reporting.py (147 lines, 5.1 KiB, Python)

from __future__ import annotations
import csv
import json
from datetime import UTC, datetime
from pathlib import Path
# Column order of the evidence report CSV.
#
# write_evidence_report() passes this list as ``fieldnames`` to
# csv.DictWriter, and _report_row() builds one value per name listed here,
# so the two must be kept in sync.
REPORT_COLUMNS = [
    # report / scan identity
    "report_generated_at",
    "scan_id",
    "mailbox_id",
    "mailbox_message_id",
    "mailbox_received_at",
    # source message headers
    "source_from",
    "source_to",
    "source_subject",
    "message_id_header",
    # classification
    "detected_message_class",
    "normalized_event_type",
    "assessment_category",
    "assessment_subclass",
    # affected recipient / original message
    "affected_email_address",
    "known_recipient",
    "original_message_id",
    "original_recipient",
    # status and reason codes
    "smtp_status_code",
    "enhanced_status_code",
    "reason_code",
    "confidence",
    "evidence_strength",
    # timestamps
    "occurred_at",
    "observed_at",
    "first_seen_at",
    "last_seen_at",
    # bookkeeping
    "deduplication_key",
    "raw_message_ref",
    "notes",
]
def report_filename(now: datetime | None = None) -> str:
stamp = (now or datetime.now(UTC)).strftime("%Y%m%d-%H%M%S")
return f"email-channel-evidence-report-{stamp}.csv"
def write_evidence_report(
    rows: list[dict],
    *,
    output_dir: str | Path,
    scan_id: str,
    mailbox_id: str,
    generated_at: datetime | None = None,
    expected_recipients: set[str] | None = None,
) -> Path:
    """Write the evidence rows of one scan to a new CSV file.

    Args:
        rows: Evidence records; each one is flattened by ``_report_row``.
        output_dir: Directory for the report. Created (including parents)
            when it does not exist.
        scan_id: Value repeated in the ``scan_id`` column of every line.
        mailbox_id: Value repeated in the ``mailbox_id`` column of every line.
        generated_at: Report timestamp; defaults to the current UTC time. It
            is used both for the file name and for ``report_generated_at``.
        expected_recipients: Addresses treated as known recipients. When
            non-empty, ``_ordered_rows`` sorts the rows; otherwise the input
            order is kept.

    Returns:
        Path of the CSV file that was written. An existing file with the
        same name is not reused; ``_unique_report_path`` picks a numbered
        alternative instead.
    """
    timestamp = generated_at if generated_at else datetime.now(UTC)
    recipients = expected_recipients if expected_recipients else set()

    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    report_path = _unique_report_path(target_dir / report_filename(timestamp))

    # newline="" lets the csv module control line endings itself.
    with report_path.open("w", newline="", encoding="utf-8") as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=REPORT_COLUMNS)
        csv_writer.writeheader()
        csv_writer.writerows(
            _report_row(entry, scan_id=scan_id, mailbox_id=mailbox_id, generated_at=timestamp)
            for entry in _ordered_rows(rows, expected_recipients=recipients)
        )
    return report_path
def _report_row(row: dict, *, scan_id: str, mailbox_id: str, generated_at: datetime) -> dict:
    """Flatten one evidence record into a dict keyed by the report columns.

    Args:
        row: Evidence record. ``metadata_json`` and ``notes_json`` are decoded
            with ``_json``; ``_expected_recipients`` (attached by
            ``_ordered_rows``) feeds the ``known_recipient`` column.
        scan_id: Written unchanged to the ``scan_id`` column.
        mailbox_id: Written unchanged to the ``mailbox_id`` column.
        generated_at: Report timestamp, emitted via ``isoformat()``.

    Returns:
        A dict with exactly one entry per report column.
    """
    metadata = _json(row.get("metadata_json"))
    if not isinstance(metadata, dict):
        # A JSON array or scalar has no named fields; treat it as "no
        # metadata" instead of failing on .get() below.
        metadata = {}

    notes = _json(row.get("notes_json"))
    if notes is None:
        notes = []
    elif not isinstance(notes, (list, dict)):
        # A lone scalar (e.g. a JSON string) is a single note; iterating it
        # would split a string into characters or raise for numbers.
        notes = [notes]

    known_recipient = _known_recipient(
        row,
        # `or ()` tolerates a row where the key is present but None.
        expected_recipients=set(row.get("_expected_recipients") or ()),
    )
    return {
        "report_generated_at": generated_at.isoformat(),
        "scan_id": scan_id,
        "mailbox_id": mailbox_id,
        "mailbox_message_id": row.get("mailbox_message_id", ""),
        # NOTE(review): this column is filled from "occurred_at", the same
        # source as the "occurred_at" column below — confirm that is intended.
        "mailbox_received_at": row.get("occurred_at") or "",
        "source_from": metadata.get("source_from", ""),
        "source_to": metadata.get("source_to", ""),
        "source_subject": metadata.get("source_subject", ""),
        "message_id_header": metadata.get("message_id_header", ""),
        "detected_message_class": metadata.get("message_class", ""),
        "normalized_event_type": row.get("event_type", ""),
        "assessment_category": row.get("assessment_category", ""),
        "assessment_subclass": row.get("assessment_subclass", ""),
        "affected_email_address": row.get("affected_email_address") or "",
        "known_recipient": "true" if known_recipient else "false",
        "original_message_id": row.get("original_message_id") or "",
        "original_recipient": metadata.get("original_recipient", ""),
        "smtp_status_code": metadata.get("smtp_status_code") or "",
        "enhanced_status_code": metadata.get("enhanced_status_code") or "",
        "reason_code": metadata.get("reason_code") or "",
        "confidence": row.get("confidence", ""),
        "evidence_strength": row.get("evidence_strength", ""),
        "occurred_at": row.get("occurred_at") or "",
        "observed_at": row.get("observed_at") or "",
        "first_seen_at": metadata.get("first_seen_at", ""),
        "last_seen_at": metadata.get("last_seen_at", ""),
        "deduplication_key": row.get("deduplication_key", ""),
        "raw_message_ref": row.get("raw_message_ref") or "",
        # Iterating a dict yields its keys, so an object-shaped notes value
        # contributes its key names.
        "notes": "; ".join(str(item) for item in notes),
    }
def _ordered_rows(rows: list[dict], *, expected_recipients: set[str]) -> list[dict]:
    """Copy the rows, tag each with the expected recipients, and sort them.

    Args:
        rows: Evidence records. They are copied, never mutated.
        expected_recipients: Addresses treated as known recipients. They are
            lowercased here because ``_known_recipient`` lowercases the row
            address before the membership test; without this, a mixed-case
            entry could never match.

    Returns:
        New dicts carrying an extra ``_expected_recipients`` tuple. When no
        recipients are expected the input order is kept. Otherwise rows are
        sorted with known recipients first, then by affected address,
        ``observed_at``, ``event_type`` and ``deduplication_key`` (all
        compared as strings).
    """
    normalized = {str(address).lower() for address in (expected_recipients or ())}
    # Built once and shared by every row; sorted so the tuple is deterministic.
    shared = tuple(sorted(normalized))
    enriched = [dict(row, _expected_recipients=shared) for row in rows]
    if not normalized:
        return enriched
    return sorted(
        enriched,
        key=lambda row: (
            # False sorts before True, so known recipients come first.
            not _known_recipient(row, expected_recipients=normalized),
            str(row.get("affected_email_address") or ""),
            str(row.get("observed_at") or ""),
            str(row.get("event_type") or ""),
            str(row.get("deduplication_key") or ""),
        ),
    )
def _known_recipient(row: dict, *, expected_recipients: set[str]) -> bool:
    """Tell whether a row concerns a known recipient.

    A row qualifies when its ``known_recipient`` value is exactly ``True``,
    or when its ``affected_email_address`` — lowercased — is a member of
    ``expected_recipients``. Because only the row address is lowercased, the
    entries of ``expected_recipients`` must already be lowercase to match.

    Args:
        row: Evidence record to inspect.
        expected_recipients: Set of addresses to test membership against.

    Returns:
        ``True`` for a known recipient, ``False`` otherwise (including when
        the row has no address).
    """
    flagged = row.get("known_recipient")
    if flagged is True:
        return True
    raw_address = row.get("affected_email_address")
    if not raw_address:
        return False
    address = str(raw_address).lower()
    return address != "" and address in expected_recipients
def _json(value: str | bytes | dict | list | None) -> dict | list:
    """Decode a JSON column into a dict or list, falling back to ``{}``.

    Args:
        value: JSON text (or bytes) to decode. A value that is already a
            dict or list is returned unchanged.

    Returns:
        The decoded object or array. ``{}`` is returned when ``value`` is
        empty/falsy, is not valid JSON, has a type ``json.loads`` rejects,
        or decodes to a scalar (string, number, boolean, null) — callers
        rely on getting a container back.
    """
    if not value:
        return {}
    if isinstance(value, (dict, list)):
        return value
    try:
        parsed = json.loads(value)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError
        # (bytes input that is not valid UTF-8); TypeError covers inputs
        # that are neither text nor bytes.
        return {}
    return parsed if isinstance(parsed, (dict, list)) else {}
def _unique_report_path(path: Path) -> Path:
    """Return ``path`` or, if it already exists, a numbered sibling.

    Candidates are ``<stem>-01<suffix>`` through ``<stem>-999<suffix>`` in
    the same directory; the first one that does not exist is returned.

    Args:
        path: Preferred report path.

    Returns:
        A path that did not exist at the time it was checked.

    Raises:
        RuntimeError: If ``path`` and all 999 numbered candidates exist.
    """
    if path.exists():
        stem, suffix = path.stem, path.suffix
        numbered = (path.with_name(f"{stem}-{index:02d}{suffix}") for index in range(1, 1000))
        free = next((candidate for candidate in numbered if not candidate.exists()), None)
        if free is None:
            raise RuntimeError(f"Could not allocate unique report filename for {path}")
        return free
    return path