generated from coulomb/repo-seed
feat: start mailbox evidence scanner
This commit is contained in:
140
src/email_connect/config.py
Normal file
140
src/email_connect/config.py
Normal file
@@ -0,0 +1,140 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MailboxConfig:
|
||||
id: str
|
||||
protocol: str = "imap"
|
||||
host: str | None = None
|
||||
port: int = 993
|
||||
tls: bool = True
|
||||
username_env: str | None = None
|
||||
password_env: str | None = None
|
||||
folder: str = "INBOX"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ScanConfig:
|
||||
mode: str = "incremental"
|
||||
max_messages_per_run: int = 5000
|
||||
since: str | None = None
|
||||
include_seen: bool = True
|
||||
mark_seen: bool = False
|
||||
store_raw_headers: bool = True
|
||||
store_raw_body: bool = False
|
||||
store_raw_message_ref: bool = True
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class StorageConfig:
|
||||
path: str = ".email-connect/state.sqlite"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ReportsConfig:
|
||||
output_dir: str = "reports"
|
||||
include_all_evidence: bool = True
|
||||
include_unknown_messages: bool = True
|
||||
timestamp_timezone: str = "UTC"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SourceConfig:
|
||||
fixture_dir: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AppConfig:
|
||||
mailbox: MailboxConfig
|
||||
scan: ScanConfig
|
||||
storage: StorageConfig
|
||||
reports: ReportsConfig
|
||||
source: SourceConfig = SourceConfig()
|
||||
|
||||
|
||||
def load_config(path: str | Path) -> AppConfig:
|
||||
data = _load_mapping(Path(path))
|
||||
mailbox = data.get("mailbox", {})
|
||||
scan = data.get("scan", {})
|
||||
storage = data.get("storage", {})
|
||||
reports = data.get("reports", {})
|
||||
source = data.get("source", {})
|
||||
return AppConfig(
|
||||
mailbox=MailboxConfig(
|
||||
id=str(mailbox.get("id", "return-mailbox-default")),
|
||||
protocol=str(mailbox.get("protocol", "imap")),
|
||||
host=mailbox.get("host"),
|
||||
port=int(mailbox.get("port", 993)),
|
||||
tls=bool(mailbox.get("tls", True)),
|
||||
username_env=mailbox.get("username_env"),
|
||||
password_env=mailbox.get("password_env"),
|
||||
folder=str(mailbox.get("folder", "INBOX")),
|
||||
),
|
||||
scan=ScanConfig(
|
||||
mode=str(scan.get("mode", "incremental")),
|
||||
max_messages_per_run=int(scan.get("max_messages_per_run", 5000)),
|
||||
since=scan.get("since"),
|
||||
include_seen=bool(scan.get("include_seen", True)),
|
||||
mark_seen=bool(scan.get("mark_seen", False)),
|
||||
store_raw_headers=bool(scan.get("store_raw_headers", True)),
|
||||
store_raw_body=bool(scan.get("store_raw_body", False)),
|
||||
store_raw_message_ref=bool(scan.get("store_raw_message_ref", True)),
|
||||
),
|
||||
storage=StorageConfig(path=str(storage.get("path", ".email-connect/state.sqlite"))),
|
||||
reports=ReportsConfig(
|
||||
output_dir=str(reports.get("output_dir", "reports")),
|
||||
include_all_evidence=bool(reports.get("include_all_evidence", True)),
|
||||
include_unknown_messages=bool(reports.get("include_unknown_messages", True)),
|
||||
timestamp_timezone=str(reports.get("timestamp_timezone", "UTC")),
|
||||
),
|
||||
source=SourceConfig(fixture_dir=source.get("fixture_dir")),
|
||||
)
|
||||
|
||||
|
||||
def _load_mapping(path: Path) -> dict[str, Any]:
|
||||
text = path.read_text(encoding="utf-8")
|
||||
try:
|
||||
import yaml # type: ignore
|
||||
|
||||
loaded = yaml.safe_load(text)
|
||||
return loaded or {}
|
||||
except ModuleNotFoundError:
|
||||
return _parse_simple_yaml(text)
|
||||
|
||||
|
||||
def _parse_simple_yaml(text: str) -> dict[str, Any]:
|
||||
"""Parse the small YAML subset used by config/mailbox.example.yml."""
|
||||
|
||||
result: dict[str, Any] = {}
|
||||
current: dict[str, Any] | None = None
|
||||
for raw_line in text.splitlines():
|
||||
line = raw_line.split("#", 1)[0].rstrip()
|
||||
if not line.strip():
|
||||
continue
|
||||
if not line.startswith(" ") and line.endswith(":"):
|
||||
key = line[:-1].strip()
|
||||
current = {}
|
||||
result[key] = current
|
||||
continue
|
||||
if current is None or ":" not in line:
|
||||
raise ValueError(f"Unsupported YAML line: {raw_line}")
|
||||
key, value = line.strip().split(":", 1)
|
||||
current[key.strip()] = _parse_scalar(value.strip())
|
||||
return result
|
||||
|
||||
|
||||
def _parse_scalar(value: str) -> Any:
|
||||
if value == "" or value == "null":
|
||||
return None
|
||||
if value in {"true", "false"}:
|
||||
return value == "true"
|
||||
if value.startswith('"') and value.endswith('"'):
|
||||
return value[1:-1]
|
||||
try:
|
||||
return int(value)
|
||||
except ValueError:
|
||||
return value
|
||||
Reference in New Issue
Block a user