Files
email-connect/src/email_connect/config.py

157 lines
4.9 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class MailboxConfig:
id: str
protocol: str = "imap"
host: str | None = None
port: int = 993
tls: bool = True
username_env: str | None = None
password_env: str | None = None
folder: str = "INBOX"
@dataclass(frozen=True)
class ScanConfig:
mode: str = "incremental"
max_messages_per_run: int = 5000
since: str | None = None
range_from: str | None = None
range_to: str | None = None
include_seen: bool = True
mark_seen: bool = False
store_raw_headers: bool = True
store_raw_body: bool = False
store_raw_message_ref: bool = True
@dataclass(frozen=True)
class StorageConfig:
path: str = ".email-connect/state.sqlite"
@dataclass(frozen=True)
class ReportsConfig:
output_dir: str = "reports"
include_all_evidence: bool = True
include_unknown_messages: bool = True
timestamp_timezone: str = "UTC"
@dataclass(frozen=True)
class SourceConfig:
fixture_dir: str | None = None
@dataclass(frozen=True)
class ExpectedRecipientsConfig:
path: str | None = None
csv_column: str = "email"
@dataclass(frozen=True)
class AppConfig:
mailbox: MailboxConfig
scan: ScanConfig
storage: StorageConfig
reports: ReportsConfig
source: SourceConfig = SourceConfig()
expected_recipients: ExpectedRecipientsConfig = ExpectedRecipientsConfig()
def load_config(path: str | Path) -> AppConfig:
data = _load_mapping(Path(path))
mailbox = data.get("mailbox", {})
scan = data.get("scan", {})
storage = data.get("storage", {})
reports = data.get("reports", {})
source = data.get("source", {})
expected_recipients = data.get("expected_recipients", {})
return AppConfig(
mailbox=MailboxConfig(
id=str(mailbox.get("id", "return-mailbox-default")),
protocol=str(mailbox.get("protocol", "imap")),
host=mailbox.get("host"),
port=int(mailbox.get("port", 993)),
tls=bool(mailbox.get("tls", True)),
username_env=mailbox.get("username_env"),
password_env=mailbox.get("password_env"),
folder=str(mailbox.get("folder", "INBOX")),
),
scan=ScanConfig(
mode=str(scan.get("mode", "incremental")),
max_messages_per_run=int(scan.get("max_messages_per_run", 5000)),
since=scan.get("since"),
range_from=scan.get("from") or scan.get("range_from"),
range_to=scan.get("to") or scan.get("range_to"),
include_seen=bool(scan.get("include_seen", True)),
mark_seen=bool(scan.get("mark_seen", False)),
store_raw_headers=bool(scan.get("store_raw_headers", True)),
store_raw_body=bool(scan.get("store_raw_body", False)),
store_raw_message_ref=bool(scan.get("store_raw_message_ref", True)),
),
storage=StorageConfig(path=str(storage.get("path", ".email-connect/state.sqlite"))),
reports=ReportsConfig(
output_dir=str(reports.get("output_dir", "reports")),
include_all_evidence=bool(reports.get("include_all_evidence", True)),
include_unknown_messages=bool(reports.get("include_unknown_messages", True)),
timestamp_timezone=str(reports.get("timestamp_timezone", "UTC")),
),
source=SourceConfig(fixture_dir=source.get("fixture_dir")),
expected_recipients=ExpectedRecipientsConfig(
path=expected_recipients.get("path"),
csv_column=str(expected_recipients.get("csv_column", "email")),
),
)
def _load_mapping(path: Path) -> dict[str, Any]:
text = path.read_text(encoding="utf-8")
try:
import yaml # type: ignore
loaded = yaml.safe_load(text)
return loaded or {}
except ModuleNotFoundError:
return _parse_simple_yaml(text)
def _parse_simple_yaml(text: str) -> dict[str, Any]:
"""Parse the small YAML subset used by config/mailbox.example.yml."""
result: dict[str, Any] = {}
current: dict[str, Any] | None = None
for raw_line in text.splitlines():
line = raw_line.split("#", 1)[0].rstrip()
if not line.strip():
continue
if not line.startswith(" ") and line.endswith(":"):
key = line[:-1].strip()
current = {}
result[key] = current
continue
if current is None or ":" not in line:
raise ValueError(f"Unsupported YAML line: {raw_line}")
key, value = line.strip().split(":", 1)
current[key.strip()] = _parse_scalar(value.strip())
return result
def _parse_scalar(value: str) -> Any:
if value == "" or value == "null":
return None
if value in {"true", "false"}:
return value == "true"
if value.startswith('"') and value.endswith('"'):
return value[1:-1]
try:
return int(value)
except ValueError:
return value