# Implementation Guide: DirektVermittlungDe

While the API Documentation explains *how to use* the API, this guide explains *how to build* it, focusing on backend engineering challenges such as encryption handling, database schema, and the performance patterns defined in your architecture.

---

**Status:** Draft v1.0
**Target Audience:** Backend Engineering Team
**Context:** Implements constraints from `DvdArchitektur.txt` [cite: 6]

---

## 1. Technology Stack & Setup

Based on the architectural constraints [cite: 45, 46, 47], the recommended reference stack is:

* **Service Layer:** Java (Spring Boot 3.x) or Go (Gin/Echo) for high-concurrency performance.
* **Primary Database:** PostgreSQL 15+ (Relational data for Routing/Threads).
* **Blob Storage:** S3-Compatible Storage (AWS S3 / MinIO) for encrypted PDF payloads.
* **Cache/PubSub:** Redis 7.x (Session store, Rate limiting, Async Job queues).

### Project Structure (Bounded Contexts)

Organize the codebase into modules matching the architecture [cite: 43]:

* `dvd-intake-service`: Handles `/documents` and Metadata extraction.
* `dvd-communication-service`: Handles `/threads` and `/messages`.
* `dvd-routing-engine`: The logic component for assigning units.
* `dvd-export-worker`: Async background workers for eAkte exports.

-----

## 2. Security Implementation Details

### 2.1 Handling "Blind" E2E Encryption [cite: 27]

The backend **must not** attempt to decrypt the `encryptedPayload`.

* **Ingest:** Receive the `encryptedPayload` (Base64 or binary multipart) and stream it directly to S3 Blob Storage. Never load the full file into RAM; large files would otherwise cause OOM errors.
* **Metadata:** Persist only the `metadata` JSON object to PostgreSQL for routing logic.
* **Validation:** Verify that the `encryptedPayload` is a valid encrypted container (e.g., check PGP/AES headers), but treat the content as opaque.
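The ingest and validation bullets above can be sketched as follows. This is a minimal illustration, not the reference implementation: it assumes the container is an OpenPGP message and inspects only the first bytes (the ASCII-armor line or the first packet tag as laid out in RFC 4880). The names `looks_like_pgp_message` and `stream_opaque_payload` are hypothetical, and the file-like `sink` merely stands in for an S3 multipart upload.

```python
import io
import itertools
from typing import BinaryIO, Iterable

ARMOR_PREFIX = b"-----BEGIN PGP MESSAGE-----"

# OpenPGP packet tags that can plausibly open an encrypted message (RFC 4880):
# 1 = public-key encrypted session key, 3 = symmetric-key encrypted session key,
# 9 = symmetrically encrypted data, 18 = encrypted + integrity-protected data.
ENCRYPTED_TAGS = {1, 3, 9, 18}


def looks_like_pgp_message(head: bytes) -> bool:
    """Header sniff only; the payload itself is never decrypted or parsed."""
    if head.startswith(ARMOR_PREFIX):
        return True
    if not head:
        return False
    first = head[0]
    if not first & 0x80:  # bit 7 is always set in a packet header
        return False
    if first & 0x40:      # new-format header: tag in bits 5-0
        tag = first & 0x3F
    else:                 # old-format header: tag in bits 5-2
        tag = (first & 0x3C) >> 2
    return tag in ENCRYPTED_TAGS


def stream_opaque_payload(chunks: Iterable[bytes], sink: BinaryIO) -> int:
    """Validate the first chunk, then forward every chunk unchanged.

    Assumption: the first chunk is long enough to contain the header.
    Returns the number of bytes written to the sink.
    """
    iterator = iter(chunks)
    head = next(iterator, b"")
    if not looks_like_pgp_message(head):
        raise ValueError("payload does not look like an OpenPGP message")
    written = 0
    for chunk in itertools.chain([head], iterator):
        sink.write(chunk)  # only one chunk is held in memory at a time
        written += len(chunk)
    return written


if __name__ == "__main__":
    buffer = io.BytesIO()  # stand-in for the blob store
    print(stream_opaque_payload([b"\xc1abc", b"def"], buffer))
```

Because only the leading bytes are inspected, the check rejects obviously wrong uploads (for example a plaintext PDF) without the service ever holding key material.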
### 2.2 Stateless Authentication (OAuth2) [cite: 44]

* **Gateway Layer:** Implement a centralized API Gateway (e.g., Spring Cloud Gateway / Nginx) that validates JWT signatures (JWKS) from BundID (Citizens) and Authority-IDP (Officials).
* **Context Propagation:** Extract the `sub` (User ID) and `scope` from the JWT and pass them to downstream microservices via HTTP Headers (e.g., `X-User-Id`, `X-User-Role`).

-----

## 3. Database Schema Recommendations (PostgreSQL)

Map the domain objects [cite: 64] to the following relational schema.

**Table: `documents`**

```sql
CREATE TABLE documents (
    id UUID PRIMARY KEY,
    reference_number VARCHAR(50) NOT NULL,  -- "Aktenzeichen"
    authority_id VARCHAR(50) NOT NULL,      -- Routing target
    status VARCHAR(20) DEFAULT 'RECEIVED',  -- RECEIVED, ROUTED, ASSIGNED
    storage_path VARCHAR(255) NOT NULL,     -- S3 Key for encrypted blob
    created_at TIMESTAMPTZ DEFAULT NOW(),
    retention_date TIMESTAMPTZ              -- For GDPR auto-deletion [cite: 14]
);

CREATE INDEX idx_docs_authority ON documents(authority_id, status);
```

**Table: `threads`**

```sql
CREATE TABLE threads (
    id UUID PRIMARY KEY,
    document_id UUID REFERENCES documents(id),
    type VARCHAR(20) NOT NULL,          -- CHAT, CALLBACK, APPOINTMENT
    assigned_official_id VARCHAR(100),  -- Nullable until claimed
    last_activity_at TIMESTAMPTZ
);
```

**Table: `messages`**

```sql
CREATE TABLE messages (
    id UUID PRIMARY KEY,
    thread_id UUID REFERENCES threads(id),
    sender_role VARCHAR(20) NOT NULL,
    content_blob TEXT NOT NULL,  -- Encrypted content
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Efficient Cursor Pagination: Index on (thread_id, created_at)
CREATE INDEX idx_msgs_thread_time ON messages(thread_id, created_at DESC);
```

-----

## 4. Performance & Scalability Patterns

### 4.1 Rate Limiting (Redis Token Bucket) [cite: 24]

To protect against DDoS and ensure fair usage (NFR-2), implement specific limits:

* **Citizens:** 10 requests/minute (prevent spamming threads).
* **Officials:** 1000 requests/minute (allow rapid batch processing).

*Implementation Tip:* Use a Redis-based "Sliding Window" Lua script. Key format: `rate_limit:{user_id}`.

### 4.2 Caching Strategy [cite: 47]

* **Routing Rules:** Cache `RoutingRules` in Redis for 1 hour. Invalidate immediately on Admin updates.
* **ETags:** For `GET /documents/{id}`, generate an ETag based on the `updated_at` timestamp. Return `304 Not Modified` to save bandwidth if the client has the latest version.

-----

## 5. Async Export Workflow [cite: 16]

For the `POST /exports` endpoint:

1. **API Layer:** Validate request -> Publish event `ExportRequested` to RabbitMQ/Redis Stream -> Return `202 Accepted` + `jobId`.
2. **Worker:**
    * Consume `ExportRequested`.
    * Fetch `encryptedPayload` from S3.
    * Fetch Message History from Postgres.
    * *Note:* The Worker might need a special "Authority Key" to re-encrypt the package for the target eAkte system, depending on the specific crypto-concept.
    * Push result to the Authority's Ingest Interface.
    * Update Job Status to `COMPLETED`.

-----

## 6. Definition of Done Checklist

Before deploying to the staging environment, ensure:

* [ ] **Load Test:** System handles 500 concurrent document uploads without error [cite: 24].
* [ ] **Security Audit:** Confirm no PII (Aktenzeichen) is logged in plaintext application logs.
* [ ] **Cleanup:** The "GDPR Reaper" job is active and deletes `documents` where `retention_date < NOW()` [cite: 14].

# Implementation Guide: DirektVermittlungDe

**Status:** Draft v1.1
**Stack:** Python / FastAPI
**Context:** Implements constraints from `DvdArchitektur.txt` and ADR-007.

## 1. Technology Stack

* **Language:** Python 3.11+
* **Web Framework:** FastAPI (with Uvicorn + Gunicorn)
* **Validation:** Pydantic V2 (Strict Mode)
* **Database ORM:** SQLAlchemy (Async) or Tortoise-ORM
* **Task Queue:** ARQ (Redis-based) or Celery
* **Primary DB:** PostgreSQL 15+
* **Blob Store:** MinIO / AWS S3

## 2. Project Structure & Patterns

Organize the monolithic repo or microservices using "Clean Architecture":

```text
/src
  /domain    # Pydantic models & Business Rules (Pure Python)
  /adapters  # DB, S3, External APIs
  /service   # Application Logic
  /api       # FastAPI Routes
  /workers   # Background Job Definitions
```

## 3. The "Hybrid Concurrency" Pattern (Critical)

To meet NFR-1 (<300ms) and NFR-2 (10k sessions), you must not block the Event Loop.

### 3.1 The Rule

- NEVER use `time.sleep`, `requests`, or heavy computation (e.g., `pypdf`, `cryptography`) inside an `async def`.
- ALWAYS use `await` for I/O.
- ALWAYS use `loop.run_in_executor` for CPU tasks.
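To see why the rule matters, the following sketch counts how often a background "heartbeat" task gets to run while a handler is busy. It is an illustration under stated assumptions: `heartbeat`, `ticks_during`, and both handlers are hypothetical names, and `time.sleep` stands in for any blocking call; the default thread pool is used here only to keep the demo dependency-free.

```python
import asyncio
import contextlib
import time
from typing import Awaitable, Callable


async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Record a tick every `interval` seconds while the loop is responsive."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def ticks_during(handler: Callable[[], Awaitable[None]]) -> int:
    """Run `handler` and count heartbeat ticks observed while it ran."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    await handler()
    observed = len(ticks) - before
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return observed


async def blocking_handler() -> None:
    time.sleep(0.2)  # violates the rule: the whole event loop stalls


async def offloaded_handler() -> None:
    loop = asyncio.get_running_loop()
    # Same work, but executed in a pool so the loop keeps serving other tasks
    await loop.run_in_executor(None, time.sleep, 0.2)


async def main() -> tuple:
    blocked = await ticks_during(blocking_handler)
    offloaded = await ticks_during(offloaded_handler)
    return blocked, offloaded


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The blocking handler starves the heartbeat completely, while the offloaded variant lets it tick throughout. Every other request served by the same worker experiences the same starvation whenever a handler blocks.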
### 3.2 Implementation Snippet

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

from fastapi import APIRouter, UploadFile

import some_heavy_crypto_lib  # placeholder name for the CPU-heavy library

router = APIRouter()

# Create a dedicated pool for CPU tasks
cpu_pool = ProcessPoolExecutor(max_workers=4)


def cpu_bound_decryption(payload: bytes) -> dict:
    # This runs in a separate process, bypassing the GIL
    return some_heavy_crypto_lib.decrypt_and_parse(payload)


@router.post("/documents")
async def upload_document(file: UploadFile):
    content = await file.read()  # Non-blocking I/O

    # Offload CPU work to the pool
    loop = asyncio.get_running_loop()
    metadata = await loop.run_in_executor(
        cpu_pool, cpu_bound_decryption, content
    )
    return {"status": "processed", "meta": metadata}
```

## 4. Security Implementation

### 4.1 "Blind" Ingest

- Stream uploads directly to S3 using `aiobotocore` to avoid loading 50MB PDFs into RAM.
- Do not attempt to read the `encryptedPayload` in the main web service process.

### 4.2 Auth Middleware

Use `fastapi.security.OAuth2AuthorizationCodeBearer`. Implement a dependency that validates the JWT signature using a cached JWKS (JSON Web Key Set) to avoid a network call on every request.

## 5. Database Schema (SQLAlchemy Async)

```python
import uuid
from datetime import datetime

from sqlalchemy import String, UUID, DateTime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Document(Base):
    __tablename__ = "documents"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    reference_number: Mapped[str] = mapped_column(String(50), index=True)
    status: Mapped[str] = mapped_column(String(20), default="RECEIVED")
    # …
```

## 6. Testing Strategy (Agentic TDD)

- Framework: `pytest` + `pytest-asyncio`.
- Mocking: Use `respx` for mocking external HTTP calls (Authority Systems).
- Database: Use `testcontainers-python` to spin up a real Postgres for integration tests.
- Prompting the Agent: "Write an async pytest for POST /documents. Use ProcessPoolExecutor mock to verify CPU offloading."
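A test of the kind requested in the prompt above might look like the sketch below. It is deliberately framework-free so it runs with the standard library alone; under `pytest-asyncio` the test coroutine would additionally carry the `@pytest.mark.asyncio` marker (or run in auto mode). `RecordingExecutor`, `cpu_bound_parse`, and `process_upload` are hypothetical stand-ins for the process pool, the CPU task, and the route body. Passing the pool in as a parameter is an assumption made to keep the handler testable.

```python
import asyncio
from concurrent.futures import Executor, Future


class RecordingExecutor(Executor):
    """Test double for a process pool: records submissions and runs them inline."""

    def __init__(self) -> None:
        self.submitted = []

    def submit(self, fn, /, *args, **kwargs):
        self.submitted.append(fn)
        future: Future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:  # surface errors through the future
            future.set_exception(exc)
        return future


def cpu_bound_parse(payload: bytes) -> dict:
    # Hypothetical CPU-heavy step, reduced to something trivial for the test
    return {"size": len(payload)}


async def process_upload(content: bytes, pool: Executor) -> dict:
    # Hypothetical handler body: offloads the CPU step to the injected pool
    loop = asyncio.get_running_loop()
    meta = await loop.run_in_executor(pool, cpu_bound_parse, content)
    return {"status": "processed", "meta": meta}


async def test_upload_offloads_cpu_work() -> None:
    pool = RecordingExecutor()
    result = await process_upload(b"abc", pool)
    # The CPU function must have gone through the pool, not run in the handler
    assert pool.submitted == [cpu_bound_parse]
    assert result == {"status": "processed", "meta": {"size": 3}}


if __name__ == "__main__":
    asyncio.run(test_upload_offloads_cpu_work())
```

Injecting the executor avoids patching module globals and keeps the test independent of how many worker processes the production pool uses.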