diff --git a/DirektVermittlungDe.png b/DirektVermittlungDe.png new file mode 100644 index 0000000..b2e5ad2 Binary files /dev/null and b/DirektVermittlungDe.png differ diff --git a/README.md b/README.md index a5e6a2f..ee9f584 100644 --- a/README.md +++ b/README.md @@ -15,4 +15,4 @@ Anstatt Bürger in Warteschleifen zu parken und Sachbearbeiter mit Weiterleitung **Problemklärung statt Problemverwaltung!** -xxx \ No newline at end of file +xxx diff --git a/docs/251201-architektur-direktVermittlungDe.txt b/docs/251201-architektur-direktVermittlungDe.txt new file mode 100644 index 0000000..2cfc3bf --- /dev/null +++ b/docs/251201-architektur-direktVermittlungDe.txt @@ -0,0 +1,290 @@ +DvdArchitektur + +Architektur Dokument für Direkt Vermittlung Deutschland + +# Direkt Vermittlung Deutschland Architektur + +## 1. Functional Requirements + +**Description: Core features and user stories** + +### 1.1 Kernfunktionen (auf Beleg fokussiert) + +**FR-1 – Schreiben erfassen (Document Intake)** + +* Bürger:innen können ein behördliches Schreiben als PDF hochladen oder via Kamera erfassen. +* Alternativ: Eingabe eines eindeutigen Merkmals (z. B. Aktenzeichen/Kassenzeichen + Behörde). +* Das System erzeugt einen **DocumentEnvelope** inkl. Metadaten (Aktenzeichen, Belegart, Datum, Absenderbehörde). + +**FR-2 – Automatisches Routing zur zuständigen Stelle** + +* Die Plattform verfügt über eine **Routing-Engine**, die anhand von Metadaten & Konfiguration + + * die zuständige Behörde / Organisationseinheit + * und idealerweise den zuständigen Sachbearbeiter ermittelt. +* Fallback-Regeln (z. B. Standard-Team, Zentrale) greifen, wenn keine eindeutige Zuordnung möglich ist. + +**FR-3 – Interaktionsstart zum Schreiben (Interaction Thread)** + +* Bürger:innen können zu einem bestimmten Schreiben: + + * eine Rückfrage stellen (Textnachricht), + * einen Rückruf anfragen, + * einen Direktanruf starten (sofern verfügbar), + * einen Terminwunsch äußern. +* Für jede solche Interaktion wird ein **InteractionThread** zum **DocumentEnvelope** angelegt. + +**FR-4 – Kommunikation & Dokumentaustausch** + +* Bürger und Behörde können in einem Thread: + + * Nachrichten austauschen, + * ergänzende Belege hochladen (z. B. Nachweise, Antworten), + * interne Notizen (behördenintern, nicht für Bürger sichtbar) anlegen. +* Alle Aktivitäten werden chronologisch protokolliert. + +**FR-5 – Call-/Rückruf-/Termin-Orchestrierung** + +* Das System unterstützt: + + * Weitervermittlung bei eingehenden Anrufen anhand eines Merkmals, + * Anlegen & Verwalten von Rückrufanforderungen, + * optionale Terminvereinbarung (Kalenderintegration der Behörde). + +**FR-6 – Temporäre Vorhaltung & persönliche Ablage** + +* Standard: Daten (Schreiben, Threads, Dateien) werden nur bis zur Erledigung + definierter Inaktivitätsfrist gehalten und dann gelöscht. +* Bürger und Behörde können optional eine **verlängerte Aufbewahrung** (persönliche Ablage) buchen/aktivieren. + +**FR-7 – Datenexport an Behördensysteme** + +* Behörden können Daten eines Vorgangs (Schreiben, Metadaten, Interaktionshistorie, Anhänge) + + * in ihre eAkte / Fachverfahren exportieren (pull oder push). +* Der Export ist konfigurierbar (welche Daten, welches Format). + +--- + +### 1.2 User Stories (Beispiele) + +* *Als Bürger* möchte ich ein behördliches Schreiben scannen oder hochladen können, + damit ich ohne manuelle Zuständigkeitsrecherche direkt den richtigen Ansprechpartner erreiche. + +* *Als Sachbearbeiterin* möchte ich alle offenen Vorgänge zu den Schreiben meiner Behörde in einer Liste sehen, + damit ich Rückfragen gezielt bearbeiten und priorisieren kann. + +* *Als Behördenadministrator* möchte ich Routingregeln konfigurieren (z. B. nach Aktenzeichenbereichen), + damit Anfragen automatisiert der richtigen Organisationseinheit zugeordnet werden. + +* *Als Bürger* möchte ich optional eine längerfristige digitale Ablage wichtiger Schreiben und Klärungsverläufe, + damit ich später auf diese Informationen zugreifen kann. + +**Why It Matters:** +Diese Functional Requirements definieren klar, **was** DVD tut: + +* Sie trennen sauber zwischen “Beleg”, “Interaktion” und “Routing” → gute Grundlage für REST-/API-Design. +* Sie machen DVD bewusst **beleggesteuert** (nicht generische Kommunikation) und ermöglichen spätere Erweiterungen (z. B. KI-Extraktion, andere Kanäle), ohne das Kernmodell zu brechen. + +--- + +## 2. Non-Functional Requirements + +**Description: Performance, security, usability SLAs** + +### 2.1 Performance & Skalierung + +* **NFR-1:** 95. Perzentil der Antwortzeiten für Kernoperationen (Thread öffnen, Nachricht senden, Routing) < 300 ms bei normaler Last. +* **NFR-2:** System unterstützt mindestens **10k gleichzeitige Sessions** (Behörden + Bürger) pro Region. +* **NFR-3:** Routingentscheidungen werden in < 500 ms getroffen (inkl. Lookups). + +### 2.2 Sicherheit & Datenschutz + +* **NFR-4:** Vollständige **DSGVO-Konformität**, inkl. + + * Recht auf Auskunft, Löschung, Berichtigung, + * dokumentierte Retentions-Policies, + * Privacy-by-Design & by-Default. +* **NFR-5:** End-to-End-Verschlüsselung in Transport (TLS 1.2+) und Verschlüsselung sensibler Daten im Ruhezustand (z. B. Aktenzeichen, PDF-Dokumente). +* **NFR-6:** Zugriff nur auf Basis rollenspezifischer Berechtigungen (least privilege, mandantensicher). + +### 2.3 Verfügbarkeit & Resilienz + +* **NFR-7:** Zielverfügbarkeit: ≥ 99,5 % im Jahresmittel. +* **NFR-8:** Disaster-Recovery-Konzept mit RPO ≤ 15 min, RTO ≤ 4 h. + +### 2.4 Usability + +* **NFR-9:** Bürgeroberfläche barrierearm nach WCAG 2.1 AA-Orientierung. +* **NFR-10:** Max. 3 Schritte vom Start bis zur gestellten Anfrage. + +**Why:** +Diese NFRs treiben Architekturentscheidungen: + +* Asynchrone Verarbeitung, Caching & horizontale Skalierung werden notwendig. +* Strikte IAM- & Verschlüsselungsschichten sind Pflicht in GovTech-Umgebungen. +* Klare SLAs verhindern, dass “Prototyping-Architektur” im Produktivbetrieb kollabiert. + +--- + +## 3. Business Context + +**Description: Use cases, target audience, integrations** + +### 3.1 Use Cases (Business-Sicht) + +* Reduktion der durchschnittlichen **Telefonzeit pro Klärung** (z. B. von 30 auf 10 Minuten). +* Entlastung von Telefonzentralen, Bündelung der Klärung direkt bei den Fachbereichen. +* Schaffung eines digitalen Rückkanals zu behördlichen Schreiben, der nachvollziehbar und sicher ist. + +### 3.2 Zielgruppen + +* **Primäre Business-Kunden:** + + * Kommunalverwaltungen, Gerichtskassen, Amtsgerichte, Landesbehörden, Finanzämter. +* **Endnutzer:innen:** + + * Bürger:innen (Privatpersonen, Unternehmen) mit Bescheiden / Schreiben. +* **Sekundäre Stakeholder:** + + * Kommunale/regionale IT-Dienstleister, Fachverfahrenshersteller. + +### 3.3 Integrationskontext + +* eAkte-Systeme der Behörden (unterstützt durch generische Exporte und custom Adapter). +* Identity-Provider (z. B. BundID, eID, SAML/OIDC-basierte Behörden-Singlesignon). +* TK-Anlagen / Callcenter-Software (für Direktvermittlung, Rückruflisten). +* Optional: Dokumentenmanagement / Archivsysteme. + +**Why:** +Der Business-Kontext sorgt dafür, dass du die “Nomen” des Systems (DocumentEnvelope, InteractionThread, AuthorityUnit) passend zu realen Strukturen modellierst. Er verhindert, dass du eine “abstrakte Messaging-Plattform” baust, statt eines **belegzentrierten GovTech-Dienstes**. + +--- + +## 4. Technical Constraints + +**Description: Existing systems, tech stack, and standards** + +*(Hier formuliere ich bewusst Vorschläge, die du später bestätigen/ändern kannst.)* + +### 4.1 Architekturparadigmen + +* **TC-1:** Service-orientierte / modularisierte Architektur mit klaren Bounded Contexts: + + * Citizen & Channels, + * Document & Interaction, + * Routing & Org, + * Integration & Export. +* **TC-2:** Stateless APIs im Frontend-Layer, Session-Handling via Token (JWT/OAuth2), um horizontale Skalierung zu ermöglichen. + +### 4.2 Tech Stack (Vorschlag / Beispiel) + +* **Backend:** z. B. Java (Spring Boot) oder Go (stark in Gov-Umfeldern verbreitet), Node.js denkbar – Standard: REST-APIs, optional GraphQL. +* **Datenbank:** Relationale DB (PostgreSQL) für Kernobjekte; Blob-Storage (z. B. S3-kompatibel) für PDF-Dokumente. +* **Caching:** Redis für Sessions, Routing-Cache und häufige Lookups. +* **Messaging:** Optionale Message-Bus/Queue (z. B. Kafka/RabbitMQ) für Export-Jobs, Notifications. + +### 4.3 Standards & Schnittstellen + +* **TC-3:** REST/JSON-APIs als Primärschnittstelle; sprechende Ressourcen (z. B. `/documents`, `/threads`, `/exports`). +* **TC-4:** HTTPS/TLS verpflichtend, HSTS aktiviert. +* **TC-5:** OpenAPI-Spezifikation für alle externen APIs (für Behörden & Integrationspartner). + +**Why:** +Diese Constraints helfen, früh falsche Technologieentscheidungen auszuschließen und das System **von Anfang an auf Skalierbarkeit & Interoperabilität** auszurichten. OpenAPI-First erleichtert LLM-gestützte Codegenerierung und Mock-APIs. + +--- + +## 5. Stakeholder Feedback + +**Description: Early input from consumers (e.g., devs using the API)** + +### 5.1 Stakeholder-Gruppen für Feedback + +* **Behörden-Fachseite:** Sachbearbeiter:innen und Fachbereichsleitungen + + * Fokus: Usability, Prozesslogik, Zuständigkeitsmodell. +* **Behörden-IT:** Architekt:innen, Admins, Datenschutzbeauftragte + + * Fokus: Integrationsfähigkeit, Sicherheit, Betriebsmodell. +* **Bürgerperspektive:** Pilot-Testgruppen + + * Fokus: Verständlichkeit, Barrierefreiheit, Friktion beim Einstieg (Schreiben hochladen, Merkmal finden etc.). + +### 5.2 Feedback-Mechanismen + +* Interaktive Mock-UIs / klickbare Prototypen (z. B. Figma) zur frühen Usability-Validierung. +* **Mock-APIs** (z. B. via Postman, Insomnia oder Swagger-UI) zur API-Validierung mit Behördensystemen. +* Kurze Surveys oder Interviews (z. B. „Was ist für Sie das größte Problem bei Rückfragen zu Schreiben?“). + +### 5.3 Iterationsschleifen + +* Frühzeitige “Pilot-Phase” mit einer kleinen Behörde/Abteilung, bevor generisch ausgebaut wird. +* Veränderungswünsche in **Versionierte API-Designs** überführen (z. B. v1 → v1.1), kein “API-Chaos”. + +**Why:** +Stakeholder-Feedback verhindert, dass du an den eigentlichen Bedürfnissen vorbeientwickelst (z. B. zu komplizierte Ablaufsteuerung). Es ist essentiell für **cleanes, *wirklich* extensibles Design**, insbesondere, weil Behörden-IT traditionell komplexe und gewachsene Umfelder hat. + +--- + +## 6. Existing Assets + +**Description: Legacy docs, wireframes, or code snippets** + +Im Moment sind das bei DVD überwiegend **noch zu definierende** Assets – aber hier ist, was du konkret sammeln/erstellen solltest (und wie es der Architektur hilft): + +### 6.1 Fachliche Assets + +* Beispiel-Briefe aus verschiedenen Behörden (gerichtliche Schreiben, Gebührenbescheide, Steuerbescheide etc.) + + * inkl. Struktur der Aktenzeichen/Kassenzeichen (Masken, Beispiele). +* Prozessbeschreibungen, wie Rückfragen heute laufen (IST-Prozess) – z. B. BPMN oder einfache Swimlane-Diagramme. + +### 6.2 Modell- & Design-Assets + +* Erste **Domänenmodelle / UML-Klassendiagramme** für: + + * `DocumentEnvelope`, `InteractionThread`, `RoutingRule`, `AuthorityUnit`, `User`. +* Wireframes/Mockups für: + + * Bürger-Oberfläche (Schreiben hochladen, Anfrage stellen), + * Sachbearbeiter-Oberfläche (Postkorb, Vorgänge, Antworten). + +### 6.3 Technische Assets + +* Beispiel-JSON-Payloads für zentrale API-Objekte, z. B.: + +```json +{ + "documentId": "doc-123", + "organisationId": "court-xyz", + "caseNumber": "12 C 345/25", + "documentType": "PaymentNotice", + "issuedAt": "2025-12-01", + "citizenHint": "Bitte geben Sie dieses Aktenzeichen bei Rückfragen an." +} +``` + +```json +{ + "threadId": "thr-789", + "documentId": "doc-123", + "messages": [ + { + "messageId": "msg-1", + "from": "citizen", + "content": "Ich habe eine Frage zur Zahlungsfrist.", + "createdAt": "2025-12-02T09:15:00Z" + } + ] +} +``` + +**Why:** +Diese Assets sind Gold für: + +* spätere LLM-Unterstützung (z. B. “Generiere Routing-Regeln basierend auf diesen Beispielbriefen”), +* Reduktion von Missverständnissen bei Domainmodell & API-Design, +* schnellere Implementierung, weil Entwickler konkrete Fälle sehen. + +--- + diff --git a/docs/251201-introduction-direktVermittlungDe.txt b/docs/251201-introduction-direktVermittlungDe.txt new file mode 100644 index 0000000..72b6b8c --- /dev/null +++ b/docs/251201-introduction-direktVermittlungDe.txt @@ -0,0 +1,22 @@ +DirektVermittlungDe + +Gegen Schnitzeljagd und Flüsterpost in der öffentlichen Verwaltung + +# DirektVermittlungDe (DVD) + +*Effiziente, sichere und direkte Kommunikation zwischen Bürgern und Behörden.* + +Behördenkommunikation krankt oft nicht am Inhalt, sondern an Überlast und umständlicher Koordination: +Falscher Ansprechpartner → Telefonzentrale → Weiterleitung → erneute Identifikation → erneutes Erklären. +Die Auskunft selbst dauert 2 Minuten, der Weg dahin 30. + +DirektVermittlungDe reduziert diesen Weg auf einen Schritt. + +Der Bürger übermittelt einen Ausgangsbeleg oder ein darin eindeutiges Merkmal (z. B. Aktenzeichen/Kassenzeichen) → +das System ermittelt die fachliche Zuständigkeit → stellt möglichst direkt zum korrekten Sachbearbeiter durch. + +Anstatt Bürger in Warteschleifen zu parken und Sachbearbeiter mit Weiterleitungen an den richtigen Zuständigen zu belasten, koordiniert das System Durchstellen, Rückruf oder Terminvereinbarung und ermöglicht bei Bedarf die Zuordnung von Notizen und ergänzenden Belegen. + +**Problemklärung statt Problemverwaltung!** + +xxx \ No newline at end of file diff --git a/docs/api_docs.md b/docs/api_docs.md new file mode 100644 index 0000000..b749e99 --- /dev/null +++ b/docs/api_docs.md @@ -0,0 +1,175 @@ +Here is the **API Documentation & Implementation Guide** for DirektVermittlungDe (DVD). This document focuses on the **practical implementation scenarios** derived from the architectural requirements and the OpenAPI specification designed in the previous step. + +----- + +# API Documentation: Core Scenarios & Implementation Guide + +**Version:** 1.0 +**Target Audience:** Frontend Developers (Citizen App), Backend Integrators (Authority Systems) +**Base URL:** `https://api.direktvermittlung.de/v1` + +## Overview + +[cite_start]This API enables the "Direct Vermittlung" workflow: receiving documents, routing them to the correct authority, and facilitating direct communication[cite: 1, 4]. [cite_start]It is designed to be **stateless** [cite: 40][cite_start], **secure** (E2E encryption) [cite: 23][cite_start], and **scalable** (10k+ sessions)[cite: 20]. + +----- + +## 0\. Authentication + +[cite_start]All endpoints require a valid OAuth2 Bearer Token[cite: 40]. + + * **Citizens** use `citizen:write` scope (via BundID/eID). + * **Officials** use `official:read`/`official:write` scopes (via Authority SSO). + +----- + +## Scenario 1: The "Digital Intake" (Document Submission) + +[cite_start]**User Story:** A citizen scans a tax assessment letter to find the right contact person without manual searching[cite: 1, 13]. + +### Implementation Logic + +1. [cite_start]**Metadata Separation:** The client must extract non-sensitive routing data (Authority Name, Reference Number) as plaintext metadata[cite: 2, 4]. +2. [cite_start]**Encryption:** The actual PDF content (`encryptedPayload`) is encrypted on the client side before upload to meet NFR-5 (E2E Encryption)[cite: 23]. +3. [cite_start]**Routing:** The backend Routing Engine uses the plaintext metadata to assign the `DocumentEnvelope` to the correct unit[cite: 4]. + +### API Request: `POST /documents` + +```json +{ + "metadata": { + "authorityId": "Finanzamt-München-I", + "referenceNumber": "123/456/789", + "docType": "NOTICE", + "issuedAt": "2025-10-25T00:00:00Z" + }, + "encryptedPayload": "BASE64_ENCRYPTED_BLOB_..." +} +``` + +### Success Response (`201 Created`) + +```json +{ + "id": "doc-882291", + "status": "ROUTED", + "assignedUnit": "Steuerfestsetzung-Team-B" +} +``` + +> [cite_start]**Note:** The `assignedUnit` confirms that the Routing Engine successfully mapped the request in \< 500ms[cite: 20]. + +----- + +## Scenario 2: Starting a Clarification Thread + +[cite_start]**User Story:** The citizen has a question about the document and wants to start a chat or request a callback[cite: 5, 6]. + +### Implementation Logic + +1. [cite_start]**Context:** The thread is explicitly linked to the `documentId` (`doc-882291`), creating a "Subject-Context" binding[cite: 6]. +2. [cite_start]**Type Selection:** The user selects the channel: `TEXT_CHAT`, `CALLBACK_REQUEST`, or `APPOINTMENT`[cite: 5]. + +### API Request: `POST /documents/doc-882291/threads` + +```json +{ + "type": "CALLBACK_REQUEST", + "initialMessage": "I do not understand the calculation on page 2. Please call me.", + "preferredTimeSlot": "2025-10-28T14:00:00Z" +} +``` + +### Success Response (`201 Created`) + +```json +{ + "threadId": "th-9912", + "status": "PENDING_OFFICIAL", + "estimatedWaitTime": "4h" +} +``` + +----- + +## Scenario 3: Real-time Communication & History + +[cite_start]**User Story:** An official replies to the inquiry, and the citizen views the chat history[cite: 7]. + +### Implementation Logic + +1. [cite_start]**Performance:** To support fast loading (NFR-1 \< 300ms)[cite: 19], we use **Cursor-based pagination** for messages. +2. **Polling/Updates:** The frontend polls this endpoint (or uses a WebSocket subscription, if extended) to show new messages. + +### API Request: `GET /threads/th-9912/messages` + +*Query Parameters:* `?limit=20&before=2025-10-27T10:00:00Z` + +### Success Response (`200 OK`) + +```json +{ + "data": [ + { + "id": "msg-552", + "senderRole": "OFFICIAL", + "content": "encrypted_content_string...", + "timestamp": "2025-10-27T09:45:00Z" + }, + { + "id": "msg-551", + "senderRole": "CITIZEN", + "content": "encrypted_content_string...", + "timestamp": "2025-10-27T09:30:00Z" + } + ], + "paging": { + "nextCursor": "2025-10-27T09:30:00Z" + } +} +``` + +----- + +## Scenario 4: Data Export (Authority Integration) + +[cite_start]**User Story:** The issue is resolved, and the authority imports the chat history and document into their eAkte system[cite: 11, 35]. + +### Implementation Logic + +1. [cite_start]**Asynchronous Processing:** Since exports can be large (PDFs + Chat Logs), the API returns immediately with a `jobId`[cite: 28, 43]. +2. [cite_start]**Format:** The export format is configurable (e.g., PDF summary of chat + original attachments)[cite: 12]. + +### API Request: `POST /exports` + +```json +{ + "caseId": "doc-882291", + "targetSystem": "eAkte-Standard-V2", + "includeAttachments": true +} +``` + +### Success Response (`202 Accepted`) + +```json +{ + "jobId": "job-5512", + "status": "QUEUED", + "statusUrl": "/exports/job-5512" +} +``` + +----- + +## Error Handling Standards + +[cite_start]To ensure clarity for developers[cite: 16], the API uses standard HTTP codes: + + * `400 Bad Request`: Validation failed (e.g., missing Aktenzeichen). + * `404 Not Found`: Document or Thread ID does not exist. + * [cite_start]`429 Too Many Requests`: Rate limit exceeded (NFR protection)[cite: 19]. + * `503 Service Unavailable`: Maintenance or backend overload. + + +xxx \ No newline at end of file diff --git a/docs/decisions.md b/docs/decisions.md new file mode 100644 index 0000000..49b3994 --- /dev/null +++ b/docs/decisions.md @@ -0,0 +1,125 @@ +# Decision Log: DirektVermittlungDe + +**Status:** Approved +**Date:** 2025-12-01 +**Context:** [DvdArchitektur.txt], [Introduction.txt] + + +Here is the formal **Decision Log** for the DirektVermittlungDe (DVD) project. This document captures the critical architectural choices, the options considered, and the rationale based on the provided requirements and constraints. + +# Decision Log: DirektVermittlungDe + +**Status:** Approved +**Date:** 2025-12-01 +**Context:** [DvdArchitektur.txt], [Introduction.txt] + +--- + +## ADR-001: Separation of Metadata and Content Payload +**Context:** +[cite_start]The system requires an automated **Routing-Engine** to assign documents to the correct authority[cite: 4]. [cite_start]However, strict **End-to-End Encryption (E2E)** is required for sensitive data in transport and rest[cite: 23]. The server cannot route what it cannot read. + +**Decision:** +We will implement a **Split-Payload Model**: +1. [cite_start]**Metadata (Plaintext):** A defined set of non-sensitive routing criteria (e.g., Authority ID, Reference/Aktenzeichen, Document Type) is sent as unencrypted JSON (over TLS)[cite: 3]. +2. [cite_start]**Payload (Encrypted):** The actual PDF document and sensitive message content are encrypted on the client side and stored as opaque blobs[cite: 23, 42]. + +**Rationale:** +* [cite_start]**Compliance:** Satisfies NFR-5 (Privacy/Encryption) while enabling FR-2 (Auto-routing)[cite: 4, 23]. +* [cite_start]**Performance:** The Routing Engine operates on lightweight JSON metadata (< 500ms target) without needing to decrypt/encrypt heavy files[cite: 20]. + + + +[Image of end to end encryption architecture] + + +--- + +## ADR-002: Stateless Authentication via OAuth2/JWT +**Context:** +[cite_start]The system must support **10k+ concurrent sessions** per region [cite: 20] [cite_start]and allow horizontal scaling (TC-2)[cite: 40]. Traditional server-side sessions (sticky sessions) would hinder scalability. + +**Decision:** +Use **OAuth2 with OpenID Connect (OIDC)** and stateless **JWTs (JSON Web Tokens)** for session handling. +* **Citizens:** Authenticate via BundID/eID. +* [cite_start]**Officials:** Authenticate via Authority SSO (SAML/OIDC integration)[cite: 35]. + +**Rationale:** +* [cite_start]**Scalability:** Allows the backend to be purely stateless; any instance can service any request (TC-2)[cite: 40]. +* [cite_start]**Security:** Scopes (e.g., `citizen:write`, `official:read`) map directly to the least-privilege NFR-6[cite: 24]. + +--- + +## ADR-003: Pagination Strategy for Interaction Threads +**Context:** +Interaction threads (FR-3) can grow long over time. [cite_start]The NFR-1 target is a response time of **< 300ms** for core operations[cite: 19]. Standard "Page/Offset" pagination degrades in performance as datasets grow (Offset Drifting) and handles real-time updates poorly. + +**Decision:** +* **Cursor-based Pagination:** Used for `InteractionThreads` (chat history). The cursor will be the `timestamp` of the message. +* [cite_start]**Offset-based Pagination:** Retained for the **Official’s Case List/Inbox** (FR-1.2), where users expect to "jump to page 2"[cite: 13]. + +**Rationale:** +* [cite_start]**Performance:** Cursor seeking is O(1) complexity, ensuring the 300ms SLA is met regardless of thread length[cite: 19]. +* [cite_start]**Usability:** Prevents "missing messages" or duplicates if new messages arrive while a user is scrolling (essential for chat)[cite: 8]. + +--- + +## ADR-004: Asynchronous Processing for Data Exports +**Context:** +[cite_start]Authorities need to export data (PDFs + History) to their eAkte systems (FR-7)[cite: 11]. Generating these packages is resource-intensive and unpredictable in duration, which risks timing out a synchronous HTTP request. + +**Decision:** +Implement the **Asynchronous Request-Reply Pattern**. +1. Client POSTs to `/exports` and receives `202 Accepted` + `Job-ID`. +2. [cite_start]Background workers (via Message Queue) process the PDF assembly[cite: 43]. +3. Client polls for completion or receives a webhook. + +**Rationale:** +* [cite_start]**Resilience:** Prevents blocking the main API threads, protecting the availability goal of ≥ 99.5%[cite: 25]. +* **User Experience:** Provides immediate feedback to the official instead of a loading spinner that might freeze. + +--- + +## ADR-005: Resource Naming and Structure +**Context:** +[cite_start]The API must be intuitive ("Clean API") and extensible[cite: 17]. The domain model includes "Documents", "Threads", and "Routing". + +**Decision:** +Adopt a **Document-Centric REST hierarchy**: +* [cite_start]Root: `/documents` (The core "Envelope")[cite: 2]. +* [cite_start]Sub-resource: `/documents/{id}/threads` (The communication context)[cite: 6]. +* [cite_start]**Strict Nouns:** Use `/documents` instead of `/uploadDocument`[cite: 44]. + +**Rationale:** +* [cite_start]**Alignment:** Matches the architectural definition of DVD being "belegorientiert" (document-driven), not just a generic chat app[cite: 17]. +* **Extensibility:** Allows adding new sub-resources (e.g., `/documents/{id}/audit-log` or `/documents/{id}/appeals`) without breaking the root model. + +--- + +## ADR-006: Data Retention & Deletion +**Context:** +[cite_start]Standard behavior requires data deletion after the case is closed (FR-6)[cite: 9]. [cite_start]However, users can opt for a "Personal Archive"[cite: 10, 15]. + +**Decision:** +Implement a **TTL (Time-To-Live) Engine** on the database rows. +* **Default:** `deletionDate` = `closedAt` + `gracePeriod`. +* **Archive Option:** If `personalArchive` is active, `deletionDate` is set to `null` or an extended timestamp. + +**Rationale:** +* [cite_start]**GDPR Compliance:** Ensures Privacy-by-Design (NFR-4) by automating the "Right to Erasure" / data economy principles[cite: 21]. +* **Automation:** Reduces administrative overhead for cleanup. + + +## ADR-007: Python & ProcessPoolExecutor for Backend Services +**Context:** +The implementation team utilizes Agentic Coding (LLM-driven TDD). While Go/Java offers native concurrency, Python provides superior velocity with LLMs. However, Python's GIL risks blocking the event loop during CPU-intensive tasks (Encryption, PDF merging). +**Decision:** +Implement the Service Layer in **Python (FastAPI)** with a strict **Hybrid Concurrency Pattern**: +1. **I/O (DB/Network):** Native `async`/`await`. +2. **CPU (Crypto/PDF):** Must be offloaded to a `ProcessPoolExecutor`. +**Rationale:** +* **Velocity:** Maximizes the efficiency of AI coding assistants (Claude/GPT). +* **Performance:** `ProcessPoolExecutor` bypasses the GIL, ensuring the main loop remains non-blocking for the 10k concurrent connections. +* **Ecosystem:** Access to superior Python-based libraries for potential future AI features (classification/extraction). + +xxx \ No newline at end of file diff --git a/docs/implementation_guide.md b/docs/implementation_guide.md new file mode 100644 index 0000000..253f2da --- /dev/null +++ b/docs/implementation_guide.md @@ -0,0 +1,260 @@ +# Implementation Guide: DirektVermittlungDe + +While the API Documentation explains *how to use* the API, this guide explains *how to build* it, focusing on the backend engineering challenges like encryption handling, database schema, and performance patterns defined in your architecture. + +--- + + +**Status:** Draft v1.0 +**Target Audience:** Backend Engineering Team +[cite_start]**Context:** Implements constraints from `DvdArchitektur.txt` [cite: 6] + +--- + +## 1\. Technology Stack & Setup + +[cite_start]Based on the architectural constraints[cite: 45, 46, 47], the recommended reference stack is: + + * **Service Layer:** Java (Spring Boot 3.x) or Go (Gin/Echo) for high-concurrency performance. + * **Primary Database:** PostgreSQL 15+ (Relational data for Routing/Threads). + * **Blob Storage:** S3-Compatible Storage (AWS S3 / MinIO) for encrypted PDF payloads. + * **Cache/PubSub:** Redis 7.x (Session store, Rate limiting, Async Job queues). + +### Project Structure (Bounded Contexts) + +[cite_start]Organize the codebase into modules matching the architecture[cite: 43]: + + * `dvd-intake-service`: Handles `/documents` and Metadata extraction. + * `dvd-communication-service`: Handles `/threads` and `/messages`. + * `dvd-routing-engine`: The logic component for assigning units. + * `dvd-export-worker`: Async background workers for eAkte exports. + +----- + +## 2\. Security Implementation Details + +### [cite_start]2.1 Handling "Blind" E2E Encryption [cite: 27] + +The backend **must not** attempt to decrypt the `encryptedPayload`. + + * **Ingest:** Receive the `encryptedPayload` (Base64 or binary multipart). Stream it directly to S3 Blob Storage. Do not load the full file into RAM to avoid OOM on large files. + * **Metadata:** Only persist the `metadata` JSON object to PostgreSQL for routing logic. + * **Validation:** Verify the `encryptedPayload` is a valid encrypted container (e.g., check PGP/AES headers) but treat the content as opaque. + +### [cite_start]2.2 Stateless Authentication (OAuth2) [cite: 44] + + * **Gateway Layer:** Implement a centralized API Gateway (e.g., Spring Cloud Gateway / Nginx) that validates JWT signatures (JWKS) from BundID (Citizens) and Authority-IDP (Officials). + * **Context Propagation:** Extract the `sub` (User ID) and `scope` from the JWT and pass them to downstream microservices via HTTP Headers (e.g., `X-User-Id`, `X-User-Role`). + +----- + +## 3\. Database Schema Recommendations (PostgreSQL) + +[cite_start]Map the domain objects [cite: 64] to the following relational schema. + +**Table: `documents`** + +```sql +CREATE TABLE documents ( + id UUID PRIMARY KEY, + reference_number VARCHAR(50) NOT NULL, -- "Aktenzeichen" + authority_id VARCHAR(50) NOT NULL, -- Routing target + status VARCHAR(20) DEFAULT 'RECEIVED', -- RECEIVED, ROUTED, ASSIGNED + storage_path VARCHAR(255) NOT NULL, -- S3 Key for encrypted blob + created_at TIMESTAMPTZ DEFAULT NOW(), + [cite_start]retention_date TIMESTAMPTZ -- For GDPR auto-deletion [cite: 14] +); +CREATE INDEX idx_docs_authority ON documents(authority_id, status); +``` + +**Table: `threads`** + +```sql +CREATE TABLE threads ( + id UUID PRIMARY KEY, + document_id UUID REFERENCES documents(id), + type VARCHAR(20) NOT NULL, -- CHAT, CALLBACK, APPOINTMENT + assigned_official_id VARCHAR(100), -- Nullable until claimed + last_activity_at TIMESTAMPTZ +); +``` + +**Table: `messages`** + +```sql +CREATE TABLE messages ( + id UUID PRIMARY KEY, + thread_id UUID REFERENCES threads(id), + sender_role VARCHAR(20) NOT NULL, + content_blob TEXT NOT NULL, -- Encrypted content + created_at TIMESTAMPTZ DEFAULT NOW() +); +-- Efficient Cursor Pagination: Index on (thread_id, created_at) +CREATE INDEX idx_msgs_thread_time ON messages(thread_id, created_at DESC); +``` + +----- + +## 4\. Performance & Scalability Patterns + +### [cite_start]4.1 Rate Limiting (Redis Token Bucket) [cite: 24] + +To protect against DDoS and ensure fair usage (NFR-2), implement specific limits: + + * **Citizens:** 10 requests/minute (prevent spamming threads). + * **Officials:** 1000 requests/minute (allow rapid batch processing). + +*Implementation Tip:* Use a Redis-based "Sliding Window" Lua script. Key format: `rate_limit:{user_id}`. + +### [cite_start]4.2 Caching Strategy [cite: 47] + + * **Routing Rules:** Cache `RoutingRules` in Redis for 1 hour. Invalidate immediately on Admin updates. + * **ETags:** For `GET /documents/{id}`, generate an ETag based on the `updated_at` timestamp. Return `304 Not Modified` to save bandwidth if the client has the latest version. + +----- + +## [cite_start]5. Async Export Workflow [cite: 16] + +For the `POST /exports` endpoint: + +1. **API Layer:** Validate request -\> Publish event `ExportRequested` to RabbitMQ/Redis Stream -\> Return `202 Accepted` + `jobId`. +2. **Worker:** + * Consume `ExportRequested`. + * Fetch `encryptedPayload` from S3. + * Fetch Message History from Postgres. + * *Note:* The Worker might need a special "Authority Key" to re-encrypt the package for the target eAkte system, depending on the specific crypto-concept. + * Push result to the Authority's Ingest Interface. + * Update Job Status to `COMPLETED`. + +----- + +## 6\. Definition of Done Checklist + +Before deploying to the staging environment, ensure: + + * [cite_start][ ] **Load Test:** System handles 500 concurrent document uploads without error[cite: 24]. + * [ ] **Security Audit:** Confirm no PII (Aktenzeichen) is logged in plaintext application logs. + * [cite_start][ ] **Cleanup:** The "GDPR Reaper" job is active and deletes `documents` where `retention_date < NOW()`[cite: 14]. + + +# Implementation Guide: DirektVermittlungDe + +**Status:** Draft v1.1 +**Stack:** Python / FastAPI +**Context:** Implements constraints from `DvdArchitektur.txt` and ADR-007. + +## 1. Technology Stack +* **Language:** Python 3.11+ +* **Web Framework:** FastAPI (with Uvicorn + Gunicorn) +* **Validation:** Pydantic V2 (Strict Mode) +* **Database ORM:** SQLAlchemy (Async) or Tortoise-ORM +* **Task Queue:** ARQ (Redis-based) or Celery +* **Primary DB:** PostgreSQL 15+ +* **Blob Store:** MinIO / AWS S3 + +## 2. Project Structure & Patterns +Organize the monolithic repo or microservices using "Clean Architecture": + +```text +/src + /domain # Pydantic models & Business Rules (Pure Python) + /adapters # DB, S3, External APIs + /service # Application Logic + /api # FastAPI Routes + /workers # Background Job Definitions +``` + +## 2. Project Structure & Patterns +Organize the monolithic repo or microservices using "Clean Architecture": + +```text +/src + /domain # Pydantic models & Business Rules (Pure Python) + /adapters # DB, S3, External APIs + /service # Application Logic + /api # FastAPI Routes + /workers # Background Job Definitions + +## 3. The "Hybrid Concurrency" Pattern (Critical) + +To meet NFR-1 (<300ms) and NFR-2 (10k sessions), you must not block the Event Loop. + +### 3.1 The Rule + +- NEVER use time.sleep, requests, or heavy computation (e.g., pypdf, cryptography) inside an async def. +- ALWAYS use await for I/O. +- ALWAYS use loop.run_in_executor for CPU tasks. + +### 3.2 Implementation Snippet + +```python +import asyncio +from concurrent.futures import ProcessPoolExecutor +from fastapi import APIRouter, UploadFile +import some_heavy_crypto_lib + +router = APIRouter() +# Create a dedicated pool for CPU tasks +cpu_pool = ProcessPoolExecutor(max_workers=4) + +def cpu_bound_decryption(payload: bytes) -> dict: + # This runs in a separate process, bypassing the GIL + return some_heavy_crypto_lib.decrypt_and_parse(payload) + +@router.post("/documents") +async def upload_document(file: UploadFile): + content = await file.read() # Non-blocking I/O + + # Offload CPU work to the pool + loop = asyncio.get_running_loop() + metadata = await loop.run_in_executor( + cpu_pool, + cpu_bound_decryption, + content + ) + + return {"status": "processed", "meta": metadata} +``` + +## 4. Security Implementation + +### 4.1 "Blind" Ingest + +- Stream uploads directly to S3 using aiobotocore to avoid loading 50MB PDFs into RAM. +- Do not attempt to read the encryptedPayload in the main web service process. + +### 4.2 Auth Middleware + +Use fastapi.security.OAuth2AuthorizationCodeBearer. Implement a dependency that validates the JWT signature using a cached JWKS (JSON Web Key Set) to avoid a network call on every request. + +## 5. Database Schema (SQLAlchemy Async) + +```python +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from sqlalchemy import String, UUID, DateTime +import uuid +from datetime import datetime + +class Base(DeclarativeBase): + pass + +class Document(Base): + __tablename__ = "documents" + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + reference_number: Mapped[str] = mapped_column(String(50), index=True) + status: Mapped[str] = mapped_column(String(20), default="RECEIVED") + # … +``` + +## 6. Testing Strategy (Agentic TDD) + +- Framework: pytest + pytest-asyncio. +- Mocking: Use respx for mocking external HTTP calls (Authority Systems). +- Database: Use testcontainers-python to spin up a real Postgres for integration tests. +- Prompting the Agent: "Write an async pytest for POST /documents. Use ProcessPoolExecutor mock to verify CPU offloading." + + + +xxx + diff --git a/docs/openapi.yaml b/docs/openapi.yaml new file mode 100644 index 0000000..d638bbf --- /dev/null +++ b/docs/openapi.yaml @@ -0,0 +1,189 @@ +openapi: 3.1.0 +info: + title: DirektVermittlungDe API + version: 1.0.0 + description: | + API for efficient communication between citizens and authorities. + Key features: Document Intake, Auto-Routing, Interaction Threads. + [cite: 1, 3] + +servers: + - url: https://api.direktvermittlung.de/v1 + description: Production Server + +components: + securitySchemes: + OAuth2: + type: oauth2 + flows: + authorizationCode: + authorizationUrl: https://id.bund.de/auth + tokenUrl: https://api.direktvermittlung.de/oauth/token + scopes: + citizen:write: "Submit documents and inquiries" + official:read: "View assigned cases" + official:write: "Respond to inquiries" + + schemas: + DocumentEnvelope: + type: object + required: [metadata] + properties: + id: + type: string + format: uuid + readOnly: true + metadata: + [cite_start]description: Plaintext metadata required for the Routing Engine[cite: 8]. + type: object + required: [authorityId, docType] + properties: + authorityId: + type: string + example: "Finanzamt-München-I" + referenceNumber: + type: string + [cite_start]description: Aktenzeichen/Kassenzeichen [cite: 3] + docType: + type: string + enum: [NOTICE, COURT_ORDER, GENERAL_INQUIRY] + issuedAt: + type: string + format: date-time + encryptedPayload: + type: string + format: base64 + [cite_start]description: E2E Encrypted PDF content or scan[cite: 27]. + status: + type: string + enum: [RECEIVED, ROUTED, ASSIGNED, CLOSED] + readOnly: true + + InteractionThread: + type: object + properties: + id: + type: string + format: uuid + documentId: + type: string + format: uuid + type: + type: string + [cite_start]enum: [TEXT_CHAT, CALLBACK_REQUEST, APPOINTMENT] [cite: 10] + status: + type: string + enum: [OPEN, PENDING_OFFICIAL, PENDING_CITIZEN, RESOLVED] + + Message: + type: object + properties: + id: + type: string + senderRole: + type: string + enum: [CITIZEN, OFFICIAL] + content: + type: string + description: Encrypted message content. + timestamp: + type: string + format: date-time + +paths: + /documents: + post: + summary: Upload a new document (FR-1) + description: Creates a DocumentEnvelope. [cite_start]The Routing Engine will process the metadata asynchronously[cite: 8]. + security: + - OAuth2: [citizen:write] + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DocumentEnvelope' + responses: + '201': + description: Document Created + content: + application/json: + schema: + $ref: '#/components/schemas/DocumentEnvelope' + + /documents/{id}/threads: + post: + summary: Start an interaction (FR-3) + [cite_start]description: Initiate a callback request, text inquiry, or appointment[cite: 10]. + security: + - OAuth2: [citizen:write] + parameters: + - name: id + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + type: object + required: [type] + properties: + type: + type: string + enum: [TEXT_CHAT, CALLBACK_REQUEST, APPOINTMENT] + initialMessage: + type: string + responses: + '201': + description: Thread Started + + /threads/{threadId}/messages: + get: + summary: Get message history (FR-4) + description: Uses cursor-based pagination for chat history. + parameters: + - name: threadId + in: path + required: true + schema: + type: string + - name: limit + in: query + schema: + type: integer + default: 20 + - name: before + in: query + schema: + type: string + format: date-time + responses: + '200': + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Message' + + /exports: + post: + summary: Export data to authority system (FR-7) + [cite_start]description: Triggers an async export job to the eAkte[cite: 16]. + security: + - OAuth2: [official:read] + responses: + '202': + description: Export Accepted + content: + application/json: + schema: + type: object + properties: + jobId: + type: string + status: + type: string + example: "QUEUED" \ No newline at end of file diff --git a/prototype-chatgpt5/IMPLEMENTATION_GUIDE.md b/prototype-chatgpt5/IMPLEMENTATION_GUIDE.md new file mode 100644 index 0000000..4ca3d38 --- /dev/null +++ b/prototype-chatgpt5/IMPLEMENTATION_GUIDE.md @@ -0,0 +1,59 @@ +# Implementation Guide – DirektVermittlungDe (FastAPI) + +This guide explains how this codebase implements the architectural decisions +and API specification of DirektVermittlungDe. + +## 1. Architecture Mapping + +- **Belegorientierung**: `Document` is the central aggregate. + - Domain models in `app/domain/models.py` + - ORM model in `app/adapters/orm.py::Document` +- **Interaction Threads**: `Thread` and `Message` map to interaction threads and their logs. + - Cursor-based pagination implemented in `app/service/threads_service.py::list_messages` + - The `created_at` timestamp is used as the pagination cursor. + +- **Routing Engine**: + - Implemented as an adapter in `app/adapters/routing.py` + - Operates solely on `DocumentMetadata` (plaintext) as required by the split-payload model. + +- **Asynchronous Exports**: + - `POST /exports` → `start_export()` in `app/service/exports_service.py` + - Returns `202 Accepted` with `jobId` and uses a job registry (`app/adapters/jobs.py`) + - In a production system this would publish to Redis / RabbitMQ and be processed by workers. + +## 2. Security + +- **Auth**: + - OAuth2 / JWT is abstracted in `app/adapters/auth.py`. + - In this reference implementation, we parse unverified claims; in production, validate via JWKS. + +- **Data Protection**: + - Encrypted payloads are treated as opaque strings and stored via `app/adapters/storage.py`. + - Only routing metadata is stored in PostgreSQL for server-side logic. + +- **Retention**: + - Each `Document` gets a `retention_date`, set to a grace period in the future. + - Implement a periodic cleanup job that deletes rows where `retention_date < NOW()`. + +## 3. Performance / Hybrid Concurrency + +- All endpoints are `async def` and rely on the async SQLAlchemy engine. +- CPU-heavy operations (PDF merge, crypto) must not be run inside the event loop. + - To extend this, create a `ProcessPoolExecutor` in `workers/` and call via `loop.run_in_executor`. + +## 4. Extending the System + +- **Real Routing Rules**: + - Add a `routing_rules` table and adapt `app/adapters/routing.py` to query it. +- **Real Export Workers**: + - Replace `jobs.py` with a Redis-backed queue and a worker process in `workers/exports_worker.py`. +- **Authority Integration**: + - Call the authority’s eAkte ingress API from the worker, using authority-specific keys. + +## 5. Definition of Done Checklist + +Before going to production: + +- Load-test `POST /documents` and `GET /threads/{id}/messages`. +- Verify that logs never contain Aktenzeichen or other PII. +- Verify that retention cleanup jobs work correctly on staging data. diff --git a/prototype-chatgpt5/README.md b/prototype-chatgpt5/README.md new file mode 100644 index 0000000..e604762 --- /dev/null +++ b/prototype-chatgpt5/README.md @@ -0,0 +1,52 @@ +# DirektVermittlungDe Backend (Reference Implementation) + +This repository contains a **FastAPI-based** reference implementation of the +DirektVermittlungDe (DVD) backend. + +DVD provides **document-centric communication** between citizens and authorities: +citizens upload a letter or provide an *Aktenzeichen*, the system auto-routes it to +the responsible unit and opens an interaction thread for clarification instead of +a phone-based "Schnitzeljagd". + +## Features + +- **Document Intake (`POST /documents`)** + - Split-payload model: plaintext metadata + encrypted payload. + - Auto-routing to assigned unit. + +- **Interaction Threads** + - Create a thread per document (`/documents/{id}/threads`). + - Citizen/official messages via `/threads/{threadId}/messages`. + - Cursor-based pagination for message history. + +- **Exports** + - Async export workflow (`POST /exports`) with job IDs and status polling. + +- **Security & NFR Alignment** + - OAuth2 / JWT-based auth (scopes: `citizen:write`, `official:read`, `official:write`). + - Document retention date for GDPR-friendly cleanup. + - Architected for async I/O and offloading CPU-heavy work (hybrid concurrency). + +## Quickstart + +1. **Install dependencies** + + ```bash + pip install -e . + ``` + +2. **Run DB migrations (simple metadata create)** + + ```bash + python -m app.scripts.init_db + ``` + +3. **Run the API** + + ```bash + uvicorn app.main:app --reload + ``` + +4. **Open Swagger UI** + + Visit: `http://localhost:8000/docs` diff --git a/prototype-chatgpt5/pyproject.toml b/prototype-chatgpt5/pyproject.toml new file mode 100644 index 0000000..3fe1b6e --- /dev/null +++ b/prototype-chatgpt5/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "dvd-backend" +version = "0.1.0" +description = "DirektVermittlungDe reference backend" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.115.0", + "uvicorn[standard]>=0.30.0", + "pydantic>=2.7.0", + "SQLAlchemy>=2.0.30", + "asyncpg>=0.29.0", + "aiobotocore>=2.15.0", + "python-jose[cryptography]>=3.3.0", + "redis>=5.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", +] + +[tool.uvicorn] +factory = false +port = 8000 +host = "0.0.0.0" +app = "app.main:app" diff --git a/prototype-chatgpt5/src/app/__init__.py b/prototype-chatgpt5/src/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/adapters/__init__.py b/prototype-chatgpt5/src/app/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/adapters/auth.py b/prototype-chatgpt5/src/app/adapters/auth.py new file mode 100644 index 0000000..0fa7e24 --- /dev/null +++ b/prototype-chatgpt5/src/app/adapters/auth.py @@ -0,0 +1,43 @@ +from typing import List +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2AuthorizationCodeBearer +from jose import jwt, JWTError + +OAUTH2_SCHEME = OAuth2AuthorizationCodeBearer( + authorizationUrl="https://auth.example/authorize", + tokenUrl="https://auth.example/token", + scopes={ + "citizen:write": "Citizen can create documents and threads", + "official:read": "Official can read documents and threads", + "official:write": "Official can answer and export", + }, +) + +JWT_ISSUER = "https://auth.example" +JWT_AUDIENCE = "dvd-api" +JWT_PUBLIC_KEY = "FAKE_PUBLIC_KEY_FOR_DEMO" + +class UserContext: + def __init__(self, sub: str, scopes: List[str]): + self.sub = sub + self.scopes = scopes + + def has_scope(self, scope: str) -> bool: + return scope in self.scopes + +async def get_current_user(token: str = Depends(OAUTH2_SCHEME)) -> UserContext: + try: + payload = jwt.get_unverified_claims(token) + except JWTError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token", + ) + sub = payload.get("sub") + scopes = payload.get("scope", "").split() + if not sub: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token: no subject", + ) + return UserContext(sub=sub, scopes=scopes) diff --git a/prototype-chatgpt5/src/app/adapters/db.py b/prototype-chatgpt5/src/app/adapters/db.py new file mode 100644 index 0000000..5a0c28d --- /dev/null +++ b/prototype-chatgpt5/src/app/adapters/db.py @@ -0,0 +1,17 @@ +from typing import AsyncGenerator +from sqlalchemy.ext.asyncio import ( + AsyncSession, async_sessionmaker, create_async_engine +) +from sqlalchemy.orm import DeclarativeBase + +DATABASE_URL = "postgresql+asyncpg://dvd:dvd@localhost:5432/dvd" + +engine = create_async_engine(DATABASE_URL, echo=False, future=True) +async_session_factory = async_sessionmaker(engine, expire_on_commit=False) + +class Base(DeclarativeBase): + pass + +async def get_session() -> AsyncGenerator[AsyncSession, None]: + async with async_session_factory() as session: + yield session diff --git a/prototype-chatgpt5/src/app/adapters/jobs.py b/prototype-chatgpt5/src/app/adapters/jobs.py new file mode 100644 index 0000000..c7e7ff0 --- /dev/null +++ b/prototype-chatgpt5/src/app/adapters/jobs.py @@ -0,0 +1,29 @@ +import uuid +from typing import Dict +from datetime import datetime +from app.domain.models import ExportJobStatus + +_jobs: Dict[str, dict] = {} + +def create_export_job(case_id: str, target_system: str, include_attachments: bool) -> str: + job_id = str(uuid.uuid4()) + _jobs[job_id] = { + "jobId": job_id, + "caseId": case_id, + "targetSystem": target_system, + "includeAttachments": include_attachments, + "status": ExportJobStatus.QUEUED, + "createdAt": datetime.utcnow(), + "updatedAt": datetime.utcnow(), + } + return job_id + +def set_job_status(job_id: str, status: ExportJobStatus) -> None: + job = _jobs.get(job_id) + if not job: + return + job["status"] = status + job["updatedAt"] = datetime.utcnow() + +def get_job(job_id: str) -> dict | None: + return _jobs.get(job_id) diff --git a/prototype-chatgpt5/src/app/adapters/orm.py b/prototype-chatgpt5/src/app/adapters/orm.py new file mode 100644 index 0000000..56b609f --- /dev/null +++ b/prototype-chatgpt5/src/app/adapters/orm.py @@ -0,0 +1,86 @@ +import uuid +from datetime import datetime +from typing import Optional +from sqlalchemy import String, DateTime, ForeignKey, Enum as SAEnum, Boolean +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import mapped_column, Mapped, relationship + +from .db import Base +from app.domain.models import ThreadType, SenderRole, ExportJobStatus + +class Document(Base): + __tablename__ = "documents" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + reference_number: Mapped[str] = mapped_column(String(50), index=True) + authority_id: Mapped[str] = mapped_column(String(50), index=True) + status: Mapped[str] = mapped_column(String(20), default="RECEIVED") + assigned_unit: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) + storage_path: Mapped[str] = mapped_column(String(255)) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.utcnow + ) + retention_date: Mapped[Optional[datetime]] = mapped_column( + DateTime(timezone=True), nullable=True + ) + personal_archive: Mapped[bool] = mapped_column(Boolean, default=False) + + threads: Mapped[list["Thread"]] = relationship(back_populates="document") + +class Thread(Base): + __tablename__ = "threads" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + document_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), ForeignKey("documents.id"), index=True + ) + type: Mapped[ThreadType] = mapped_column(SAEnum(ThreadType)) + assigned_official_id: Mapped[Optional[str]] = mapped_column( + String(100), nullable=True + ) + last_activity_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.utcnow, index=True + ) + + document: Mapped[Document] = relationship(back_populates="threads") + messages: Mapped[list["Message"]] = relationship(back_populates="thread") + +class Message(Base): + __tablename__ = "messages" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + thread_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), ForeignKey("threads.id"), index=True + ) + sender_role: Mapped[SenderRole] = mapped_column(SAEnum(SenderRole)) + content_blob: Mapped[str] = mapped_column(String) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.utcnow, index=True + ) + + thread: Mapped[Thread] = relationship(back_populates="messages") + +class ExportJob(Base): + __tablename__ = "export_jobs" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + case_id: Mapped[str] = mapped_column(String(50), index=True) + target_system: Mapped[str] = mapped_column(String(100)) + include_attachments: Mapped[bool] = mapped_column(Boolean, default=True) + status: Mapped[ExportJobStatus] = mapped_column( + SAEnum(ExportJobStatus), default=ExportJobStatus.QUEUED + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.utcnow + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.utcnow + ) diff --git a/prototype-chatgpt5/src/app/adapters/routing.py b/prototype-chatgpt5/src/app/adapters/routing.py new file mode 100644 index 0000000..46288e5 --- /dev/null +++ b/prototype-chatgpt5/src/app/adapters/routing.py @@ -0,0 +1,9 @@ +from app.domain.models import DocumentMetadata + +async def route_document(meta: DocumentMetadata) -> str: + """Very simple routing stub. + In reality, this would query a routing rules table or service. + """ + if meta.docType.upper() == "NOTICE": + return f"{meta.authorityId}-NoticeTeam" + return f"{meta.authorityId}-DefaultTeam" diff --git a/prototype-chatgpt5/src/app/adapters/storage.py b/prototype-chatgpt5/src/app/adapters/storage.py new file mode 100644 index 0000000..7d1b8a4 --- /dev/null +++ b/prototype-chatgpt5/src/app/adapters/storage.py @@ -0,0 +1,15 @@ +import uuid +from pathlib import Path + +STORAGE_ROOT = Path("data/blobstore") + +async def save_encrypted_payload(payload_b64: str) -> str: + STORAGE_ROOT.mkdir(parents=True, exist_ok=True) + key = f"{uuid.uuid4()}.blob" + path = STORAGE_ROOT / key + path.write_text(payload_b64, encoding="utf-8") + return key + +async def load_encrypted_payload(path_key: str) -> str: + path = STORAGE_ROOT / path_key + return path.read_text(encoding="utf-8") diff --git a/prototype-chatgpt5/src/app/api/__init__.py b/prototype-chatgpt5/src/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/api/dependencies.py b/prototype-chatgpt5/src/app/api/dependencies.py new file mode 100644 index 0000000..d363dd8 --- /dev/null +++ b/prototype-chatgpt5/src/app/api/dependencies.py @@ -0,0 +1,19 @@ +from fastapi import Depends, HTTPException, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.adapters.db import get_session +from app.adapters.auth import get_current_user, UserContext + +async def citizen_user(user: UserContext = Depends(get_current_user)) -> UserContext: + if not user.has_scope("citizen:write"): + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden") + return user + +async def official_user(user: UserContext = Depends(get_current_user)) -> UserContext: + if not (user.has_scope("official:read") or user.has_scope("official:write")): + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden") + return user + +DBSessionDep = Depends(get_session) +CitizenDep = Depends(citizen_user) +OfficialDep = Depends(official_user) diff --git a/prototype-chatgpt5/src/app/api/documents.py b/prototype-chatgpt5/src/app/api/documents.py new file mode 100644 index 0000000..165ac62 --- /dev/null +++ b/prototype-chatgpt5/src/app/api/documents.py @@ -0,0 +1,38 @@ +import uuid +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.domain.models import DocumentCreateRequest, DocumentCreatedResponse +from app.service import documents_service +from app.api.dependencies import DBSessionDep, CitizenDep + +router = APIRouter(prefix="/documents", tags=["documents"]) + +@router.post("", response_model=DocumentCreatedResponse, status_code=status.HTTP_201_CREATED) +async def create_document( + payload: DocumentCreateRequest, + session: AsyncSession = DBSessionDep, + citizen = CitizenDep, +): + return await documents_service.create_document(payload, session) + +@router.get("/{document_id}", response_model=DocumentCreatedResponse) +async def get_document( + document_id: str, + session: AsyncSession = DBSessionDep, + citizen = CitizenDep, +): + try: + doc_uuid = uuid.UUID(document_id) + except ValueError: + raise HTTPException(status_code=404, detail="Document not found") + + doc = await documents_service.get_document(doc_uuid, session) + if not doc: + raise HTTPException(status_code=404, detail="Document not found") + + return DocumentCreatedResponse( + id=str(doc.id), + status=doc.status, + assignedUnit=doc.assigned_unit, + ) diff --git a/prototype-chatgpt5/src/app/api/exports.py b/prototype-chatgpt5/src/app/api/exports.py new file mode 100644 index 0000000..8800a7b --- /dev/null +++ b/prototype-chatgpt5/src/app/api/exports.py @@ -0,0 +1,31 @@ +from fastapi import APIRouter, Depends, HTTPException, status + +from app.api.dependencies import OfficialDep +from app.domain.models import ExportRequest, ExportCreatedResponse, ExportStatusResponse +from app.service import exports_service + +router = APIRouter(prefix="/exports", tags=["exports"]) + +@router.post( + "", + response_model=ExportCreatedResponse, + status_code=status.HTTP_202_ACCEPTED, +) +async def create_export( + payload: ExportRequest, + official = OfficialDep, +): + return await exports_service.start_export(payload) + +@router.get( + "/{job_id}", + response_model=ExportStatusResponse, +) +async def get_export_status( + job_id: str, + official = OfficialDep, +): + try: + return await exports_service.get_export_status(job_id) + except KeyError: + raise HTTPException(status_code=404, detail="Export job not found") diff --git a/prototype-chatgpt5/src/app/api/threads.py b/prototype-chatgpt5/src/app/api/threads.py new file mode 100644 index 0000000..6f7c57d --- /dev/null +++ b/prototype-chatgpt5/src/app/api/threads.py @@ -0,0 +1,85 @@ +import uuid +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.dependencies import DBSessionDep, CitizenDep +from app.domain.models import ( + ThreadCreateRequest, ThreadCreatedResponse, + MessageCreateRequest, MessageListResponse, SenderRole, +) +from app.service import threads_service + +router = APIRouter(tags=["threads"]) + +@router.post( + "/documents/{document_id}/threads", + response_model=ThreadCreatedResponse, + status_code=status.HTTP_201_CREATED, +) +async def create_thread_for_document( + document_id: str, + payload: ThreadCreateRequest, + session: AsyncSession = DBSessionDep, + citizen = CitizenDep, +): + try: + doc_uuid = uuid.UUID(document_id) + except ValueError: + raise HTTPException(status_code=404, detail="Document not found") + + return await threads_service.create_thread( + document_id=doc_uuid, + req=payload, + session=session, + citizen_id=citizen.sub, + ) + +@router.get( + "/threads/{thread_id}/messages", + response_model=MessageListResponse, +) +async def get_thread_messages( + thread_id: str, + limit: int = Query(20, ge=1, le=100), + before: Optional[datetime] = Query(None), + session: AsyncSession = DBSessionDep, + user = CitizenDep, +): + try: + thread_uuid = uuid.UUID(thread_id) + except ValueError: + raise HTTPException(status_code=404, detail="Thread not found") + + return await threads_service.list_messages( + thread_id=thread_uuid, + session=session, + limit=limit, + before=before, + ) + +@router.post( + "/threads/{thread_id}/messages", + response_model=MessageListResponse.__fields__["data"].annotation.__args__[0], + status_code=status.HTTP_201_CREATED, +) +async def post_thread_message( + thread_id: str, + payload: MessageCreateRequest, + session: AsyncSession = DBSessionDep, + user = CitizenDep, +): + try: + thread_uuid = uuid.UUID(thread_id) + except ValueError: + raise HTTPException(status_code=404, detail="Thread not found") + + msg = await threads_service.add_message( + thread_id=thread_uuid, + req=payload, + session=session, + sender_role=SenderRole.CITIZEN, + ) + return msg diff --git a/prototype-chatgpt5/src/app/domain/__init__.py b/prototype-chatgpt5/src/app/domain/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/domain/models.py b/prototype-chatgpt5/src/app/domain/models.py new file mode 100644 index 0000000..af545f3 --- /dev/null +++ b/prototype-chatgpt5/src/app/domain/models.py @@ -0,0 +1,72 @@ +from datetime import datetime +from enum import Enum +from typing import Optional, List +from pydantic import BaseModel, Field + +class ThreadType(str, Enum): + TEXT_CHAT = "TEXT_CHAT" + CALLBACK_REQUEST = "CALLBACK_REQUEST" + APPOINTMENT = "APPOINTMENT" + +class SenderRole(str, Enum): + CITIZEN = "CITIZEN" + OFFICIAL = "OFFICIAL" + SYSTEM = "SYSTEM" + +class DocumentMetadata(BaseModel): + authorityId: str = Field(..., max_length=50) + referenceNumber: str = Field(..., max_length=50) + docType: str = Field(..., max_length=50) + issuedAt: datetime + +class DocumentCreateRequest(BaseModel): + metadata: DocumentMetadata + encryptedPayload: str # base64-encoded opaque blob + +class DocumentCreatedResponse(BaseModel): + id: str + status: str + assignedUnit: Optional[str] = None + +class ThreadCreateRequest(BaseModel): + type: ThreadType + initialMessage: Optional[str] = None + preferredTimeSlot: Optional[datetime] = None + +class ThreadCreatedResponse(BaseModel): + threadId: str + status: str + estimatedWaitTime: Optional[str] = None + +class MessageCreateRequest(BaseModel): + content: str # encrypted message string + +class MessageDto(BaseModel): + id: str + senderRole: SenderRole + content: str + timestamp: datetime + +class MessageListResponse(BaseModel): + data: List[MessageDto] + paging: dict + +class ExportRequest(BaseModel): + caseId: str + targetSystem: str + includeAttachments: bool = True + +class ExportJobStatus(str, Enum): + QUEUED = "QUEUED" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + +class ExportCreatedResponse(BaseModel): + jobId: str + status: ExportJobStatus + +class ExportStatusResponse(BaseModel): + jobId: str + status: ExportJobStatus + statusUrl: Optional[str] = None diff --git a/prototype-chatgpt5/src/app/main.py b/prototype-chatgpt5/src/app/main.py new file mode 100644 index 0000000..5385e32 --- /dev/null +++ b/prototype-chatgpt5/src/app/main.py @@ -0,0 +1,21 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from app.api import documents, threads, exports + +app = FastAPI( + title="DirektVermittlungDe API", + version="0.1.0", + root_path="/v1", +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(documents.router) +app.include_router(threads.router) +app.include_router(exports.router) diff --git a/prototype-chatgpt5/src/app/scripts/__init__.py b/prototype-chatgpt5/src/app/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/scripts/init_db.py b/prototype-chatgpt5/src/app/scripts/init_db.py new file mode 100644 index 0000000..30728f3 --- /dev/null +++ b/prototype-chatgpt5/src/app/scripts/init_db.py @@ -0,0 +1,9 @@ +from sqlalchemy.ext.asyncio import run_sync +from app.adapters.db import engine +from app.adapters.orm import Base + +def _create() -> None: + Base.metadata.create_all(bind=engine.sync_engine) + +if __name__ == "__main__": + run_sync(engine, _create) diff --git a/prototype-chatgpt5/src/app/service/__init__.py b/prototype-chatgpt5/src/app/service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/service/documents_service.py b/prototype-chatgpt5/src/app/service/documents_service.py new file mode 100644 index 0000000..ebd09b8 --- /dev/null +++ b/prototype-chatgpt5/src/app/service/documents_service.py @@ -0,0 +1,42 @@ +import uuid +from datetime import datetime, timedelta +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select + +from app.domain.models import DocumentCreateRequest, DocumentCreatedResponse +from app.adapters import storage, routing +from app.adapters.orm import Document + +DEFAULT_GRACE_PERIOD_DAYS = 90 + +async def create_document( + req: DocumentCreateRequest, + session: AsyncSession, +) -> DocumentCreatedResponse: + storage_key = await storage.save_encrypted_payload(req.encryptedPayload) + assigned_unit = await routing.route_document(req.metadata) + retention_date = datetime.utcnow() + timedelta(days=DEFAULT_GRACE_PERIOD_DAYS) + + doc = Document( + reference_number=req.metadata.referenceNumber, + authority_id=req.metadata.authorityId, + status="ROUTED", + assigned_unit=assigned_unit, + storage_path=storage_key, + retention_date=retention_date, + personal_archive=False, + ) + session.add(doc) + await session.commit() + await session.refresh(doc) + + return DocumentCreatedResponse( + id=str(doc.id), + status=doc.status, + assignedUnit=doc.assigned_unit, + ) + +async def get_document(doc_id: uuid.UUID, session: AsyncSession) -> Document | None: + stmt = select(Document).where(Document.id == doc_id) + res = await session.execute(stmt) + return res.scalar_one_or_none() diff --git a/prototype-chatgpt5/src/app/service/exports_service.py b/prototype-chatgpt5/src/app/service/exports_service.py new file mode 100644 index 0000000..4bfe18d --- /dev/null +++ b/prototype-chatgpt5/src/app/service/exports_service.py @@ -0,0 +1,24 @@ +from app.domain.models import ExportRequest, ExportCreatedResponse, ExportStatusResponse +from app.domain.models import ExportJobStatus +from app.adapters import jobs + +async def start_export(req: ExportRequest) -> ExportCreatedResponse: + job_id = jobs.create_export_job( + case_id=req.caseId, + target_system=req.targetSystem, + include_attachments=req.includeAttachments, + ) + return ExportCreatedResponse( + jobId=job_id, + status=ExportJobStatus.QUEUED, + ) + +async def get_export_status(job_id: str) -> ExportStatusResponse: + job = jobs.get_job(job_id) + if not job: + raise KeyError(job_id) + return ExportStatusResponse( + jobId=job["jobId"], + status=job["status"], + statusUrl=f"/exports/{job_id}", + ) diff --git a/prototype-chatgpt5/src/app/service/threads_service.py b/prototype-chatgpt5/src/app/service/threads_service.py new file mode 100644 index 0000000..35b1bef --- /dev/null +++ b/prototype-chatgpt5/src/app/service/threads_service.py @@ -0,0 +1,106 @@ +import uuid +from datetime import datetime +from typing import Optional +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, desc + +from app.domain.models import ( + ThreadCreateRequest, ThreadCreatedResponse, + MessageCreateRequest, MessageListResponse, MessageDto, + SenderRole, +) +from app.adapters.orm import Thread, Message + +async def create_thread( + document_id: uuid.UUID, + req: ThreadCreateRequest, + session: AsyncSession, + citizen_id: str, +) -> ThreadCreatedResponse: + thread = Thread( + document_id=document_id, + type=req.type, + assigned_official_id=None, + ) + session.add(thread) + await session.flush() + + if req.initialMessage: + msg = Message( + thread_id=thread.id, + sender_role=SenderRole.CITIZEN, + content_blob=req.initialMessage, + ) + session.add(msg) + + await session.commit() + await session.refresh(thread) + + return ThreadCreatedResponse( + threadId=str(thread.id), + status="PENDING_OFFICIAL", + estimatedWaitTime="4h", + ) + +async def add_message( + thread_id: uuid.UUID, + req: MessageCreateRequest, + session: AsyncSession, + sender_role: SenderRole, +) -> MessageDto: + msg = Message( + thread_id=thread_id, + sender_role=sender_role, + content_blob=req.content, + ) + session.add(msg) + + stmt_thread = select(Thread).where(Thread.id == thread_id) + res = await session.execute(stmt_thread) + thread = res.scalar_one() + thread.last_activity_at = datetime.utcnow() + + await session.commit() + await session.refresh(msg) + + return MessageDto( + id=str(msg.id), + senderRole=msg.sender_role, + content=msg.content_blob, + timestamp=msg.created_at, + ) + +async def list_messages( + thread_id: uuid.UUID, + session: AsyncSession, + limit: int = 20, + before: Optional[datetime] = None, +) -> MessageListResponse: + if before is None: + before = datetime.utcnow() + + stmt = ( + select(Message) + .where(Message.thread_id == thread_id, Message.created_at <= before) + .order_by(desc(Message.created_at)) + .limit(limit) + ) + res = await session.execute(stmt) + messages = list(res.scalars()) + + next_cursor = messages[-1].created_at.isoformat() if messages else None + + data = [ + MessageDto( + id=str(m.id), + senderRole=m.sender_role, + content=m.content_blob, + timestamp=m.created_at, + ) + for m in messages + ] + + return MessageListResponse( + data=data, + paging={"nextCursor": next_cursor}, + ) diff --git a/prototype-chatgpt5/src/app/workers/__init__.py b/prototype-chatgpt5/src/app/workers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-chatgpt5/src/app/workers/exports_worker.py b/prototype-chatgpt5/src/app/workers/exports_worker.py new file mode 100644 index 0000000..c3f5767 --- /dev/null +++ b/prototype-chatgpt5/src/app/workers/exports_worker.py @@ -0,0 +1,9 @@ +"""Placeholder for a real export worker. + +In production, this module would: + +- Consume export jobs from a queue (e.g. Redis, RabbitMQ). +- Load the document and attachments. +- Call the target eAkte / DMS system. +- Update the ExportJob status via `jobs.set_job_status`. +""" diff --git a/prototype-geminiNbt3pro/direktvermittlung_backend/app/__init__.py b/prototype-geminiNbt3pro/direktvermittlung_backend/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prototype-geminiNbt3pro/direktvermittlung_backend/app/database.py b/prototype-geminiNbt3pro/direktvermittlung_backend/app/database.py new file mode 100644 index 0000000..60c46ce --- /dev/null +++ b/prototype-geminiNbt3pro/direktvermittlung_backend/app/database.py @@ -0,0 +1,17 @@ +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase + +# Using SQLite for easy local testing without Docker. +# For Production, switch to PostgreSQL as per ADR-007. +DATABASE_URL = "sqlite+aiosqlite:///./dvd_local.db" +# DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dvd_db" + +engine = create_async_engine(DATABASE_URL, echo=True) +AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) + +class Base(DeclarativeBase): + pass + +async def get_db(): + async with AsyncSessionLocal() as session: + yield session \ No newline at end of file diff --git a/prototype-geminiNbt3pro/direktvermittlung_backend/app/main.py b/prototype-geminiNbt3pro/direktvermittlung_backend/app/main.py new file mode 100644 index 0000000..5d3ccec --- /dev/null +++ b/prototype-geminiNbt3pro/direktvermittlung_backend/app/main.py @@ -0,0 +1,108 @@ +import asyncio +import uuid +import time +from concurrent.futures import ProcessPoolExecutor +from contextlib import asynccontextmanager +from typing import Optional +from datetime import datetime + +from fastapi import FastAPI, Depends, HTTPException, Query +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select + +from app.database import engine, Base, get_db +from app.models import Document, InteractionThread, Message +from app.schemas import DocumentCreate, DocumentResponse, ThreadCreate, MessageListResponse, MessageResponse + +# --- Hybrid Concurrency Config (ADR-007) --- +process_pool = ProcessPoolExecutor(max_workers=4) + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Initialize DB (Auto-create tables for demo) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + yield + process_pool.shutdown() + await engine.dispose() + +app = FastAPI(title="DirektVermittlungDe API", lifespan=lifespan) + +# --- Dependencies --- +async def get_current_user(): + return {"id": "citizen-123", "role": "citizen"} + +# --- CPU Bound Logic (Isolated) --- +def cpu_bound_validation(payload_base64: str) -> bool: + '''Simulates heavy crypto validation away from the Event Loop''' + time.sleep(0.1) # Simulate CPU work + return True + +# --- Routes --- + +@app.post("/documents", response_model=DocumentResponse, status_code=201) +async def upload_document( + doc_in: DocumentCreate, + db: AsyncSession = Depends(get_db), + user = Depends(get_current_user) +): + # 1. Offload CPU heavy validation + loop = asyncio.get_running_loop() + is_valid = await loop.run_in_executor( + process_pool, + cpu_bound_validation, + doc_in.encryptedPayload + ) + if not is_valid: + raise HTTPException(status_code=400, detail="Invalid payload") + + # 2. Mock S3 Upload + s3_key = f"s3://bucket/{uuid.uuid4()}.enc" + + # 3. Create Record + new_doc = Document( + authority_id=doc_in.metadata.authorityId, + reference_number=doc_in.metadata.referenceNumber, + doc_type=doc_in.metadata.docType.value, + storage_path=s3_key, + status="ROUTED" + ) + db.add(new_doc) + await db.commit() + await db.refresh(new_doc) + + return DocumentResponse( + id=new_doc.id, + status=new_doc.status, + assignedUnit="Tax-Team-A" + ) + +@app.get("/threads/{thread_id}/messages", response_model=MessageListResponse) +async def get_messages( + thread_id: str, + limit: int = 20, + before: Optional[datetime] = Query(None), + db: AsyncSession = Depends(get_db) +): + # Cursor Pagination Logic + query = select(Message).where(Message.thread_id == uuid.UUID(thread_id)) + if before: + query = query.where(Message.timestamp < before) + + query = query.order_by(Message.timestamp.desc()).limit(limit) + result = await db.execute(query) + messages = result.scalars().all() + + next_cursor = messages[-1].timestamp if messages else None + + return MessageListResponse( + data=[ + MessageResponse( + id=m.id, + senderRole=m.sender_role, + content=m.content_blob, + timestamp=m.timestamp + ) for m in messages + ], + nextCursor=next_cursor + ) \ No newline at end of file diff --git a/prototype-geminiNbt3pro/direktvermittlung_backend/app/models.py b/prototype-geminiNbt3pro/direktvermittlung_backend/app/models.py new file mode 100644 index 0000000..62499d8 --- /dev/null +++ b/prototype-geminiNbt3pro/direktvermittlung_backend/app/models.py @@ -0,0 +1,53 @@ +import uuid +from datetime import datetime +from sqlalchemy import String, DateTime, ForeignKey, Index +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.sql import func +from app.database import Base + +class Document(Base): + ''' + Represents the DocumentEnvelope. + Implements ADR-001: Separation of Metadata and Payload. + ''' + __tablename__ = "documents" + + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + authority_id: Mapped[str] = mapped_column(String(50), index=True) + reference_number: Mapped[str] = mapped_column(String(50)) + doc_type: Mapped[str] = mapped_column(String(20)) + storage_path: Mapped[str] = mapped_column(String(255)) + status: Mapped[str] = mapped_column(String(20), default="RECEIVED") + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + retention_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True) + + threads = relationship("InteractionThread", back_populates="document") + +class InteractionThread(Base): + __tablename__ = "threads" + + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + document_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("documents.id")) + type: Mapped[str] = mapped_column(String(20)) + status: Mapped[str] = mapped_column(String(20), default="OPEN") + + document = relationship("Document", back_populates="threads") + messages = relationship("Message", back_populates="thread") + +class Message(Base): + ''' + Optimized for Cursor-Based Pagination (NFR-1). + ''' + __tablename__ = "messages" + + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + thread_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("threads.id")) + sender_role: Mapped[str] = mapped_column(String(20)) + content_blob: Mapped[str] = mapped_column(String) + timestamp: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + thread = relationship("InteractionThread", back_populates="messages") + + __table_args__ = ( + Index('idx_msgs_thread_time', 'thread_id', 'timestamp'), + ) \ No newline at end of file diff --git a/prototype-geminiNbt3pro/direktvermittlung_backend/app/schemas.py b/prototype-geminiNbt3pro/direktvermittlung_backend/app/schemas.py new file mode 100644 index 0000000..b5f8b1d --- /dev/null +++ b/prototype-geminiNbt3pro/direktvermittlung_backend/app/schemas.py @@ -0,0 +1,45 @@ +from pydantic import BaseModel, Field +from typing import Optional, List +from datetime import datetime +from uuid import UUID +from enum import Enum + +class DocType(str, Enum): + NOTICE = "NOTICE" + COURT_ORDER = "COURT_ORDER" + GENERAL_INQUIRY = "GENERAL_INQUIRY" + +class ThreadType(str, Enum): + TEXT_CHAT = "TEXT_CHAT" + CALLBACK_REQUEST = "CALLBACK_REQUEST" + APPOINTMENT = "APPOINTMENT" + +class MetadataInput(BaseModel): + authorityId: str = Field(..., example="Finanzamt-Muenchen-I") + referenceNumber: str = Field(..., example="123/456/789") + docType: DocType + issuedAt: datetime + +class DocumentCreate(BaseModel): + metadata: MetadataInput + encryptedPayload: str + +class ThreadCreate(BaseModel): + type: ThreadType + initialMessage: str + preferredTimeSlot: Optional[datetime] = None + +class DocumentResponse(BaseModel): + id: UUID + status: str + assignedUnit: Optional[str] = None + +class MessageResponse(BaseModel): + id: UUID + senderRole: str + content: str + timestamp: datetime + +class MessageListResponse(BaseModel): + data: List[MessageResponse] + nextCursor: Optional[datetime] = None \ No newline at end of file diff --git a/prototype-geminiNbt3pro/direktvermittlung_backend/requirements.txt b/prototype-geminiNbt3pro/direktvermittlung_backend/requirements.txt new file mode 100644 index 0000000..39f90a1 --- /dev/null +++ b/prototype-geminiNbt3pro/direktvermittlung_backend/requirements.txt @@ -0,0 +1,7 @@ +fastapi==0.109.0 +uvicorn==0.27.0 +sqlalchemy==2.0.25 +asyncpg==0.29.0 +pydantic==2.6.0 +pydantic-settings==2.1.0 +python-multipart==0.0.9 \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/.env.example.txt b/prototype-grok4.1/direktvermittlungde-backend/.env.example.txt new file mode 100644 index 0000000..c9d8011 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/.env.example.txt @@ -0,0 +1,8 @@ +DATABASE_URL=postgresql+asyncpg://user:pass@localhost/dvd_db +REDIS_URL=redis://localhost:6379 +S3_ENDPOINT=http://localhost:9000 +S3_ACCESS_KEY=minioadmin +S3_SECRET_KEY=minioadmin +S3_BUCKET=dvd-blobs +SECRET_KEY=your-super-secret-key-here +OAUTH_JWKS_URL=https://id.bund.de/.well-known/jwks.json \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/README.txt b/prototype-grok4.1/direktvermittlungde-backend/README.txt new file mode 100644 index 0000000..74a64f5 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/README.txt @@ -0,0 +1,16 @@ +# DirektVermittlungDe Backend + +## Setup +1. Clone or create folder, add files above. +2. `pip install -r requirements.txt` +3. Copy `.env.example` to `.env` and edit. +4. Run Postgres, Redis, MinIO (use Docker Compose if needed). +5. `uvicorn main:app --reload` +6. Docs: http://localhost:8000/docs +7. Tests: `pytest` + +## Run Worker +arq main.worker --watch # For background jobs + +## Docker (Optional) +Use docker-compose.yml for services. diff --git a/prototype-grok4.1/direktvermittlungde-backend/auth.py b/prototype-grok4.1/direktvermittlungde-backend/auth.py new file mode 100644 index 0000000..1ddcc5b --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/auth.py @@ -0,0 +1,31 @@ +from typing import Dict, Any, List +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2AuthorizationCodeBearer +from jose import jwt, JWTError +from .database import async_session # For dependency injection + +oauth2_scheme = OAuth2AuthorizationCodeBearer( + authorizationUrl="https://id.bund.de/auth", + tokenUrl="https://api.direktvermittlung.de/oauth/token", + scopes={"citizen:write": "Submit documents", "official:read": "View cases", "official:write": "Respond"} +) + +SECRET_KEY = "your-secret" # From env +ALGORITHM = "HS256" + +async def get_current_user(token: str = Depends(oauth2_scheme)): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + user_id: str = payload.get("sub") + scopes: List[str] = payload.get("scope", "").split() + role = "CITIZEN" if "citizen" in scopes else "OFFICIAL" + if user_id is None: + raise credentials_exception + return {"user_id": user_id, "role": role, "scopes": scopes} + except JWTError: + raise credentials_exception \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/database.py b/prototype-grok4.1/direktvermittlungde-backend/database.py new file mode 100644 index 0000000..28f59fa --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/database.py @@ -0,0 +1,11 @@ +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from .models import Base + +DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dvd_db" # From env + +engine = create_async_engine(DATABASE_URL, echo=True) +async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + +async def init_db(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/main.py b/prototype-grok4.1/direktvermittlungde-backend/main.py new file mode 100644 index 0000000..d80925c --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/main.py @@ -0,0 +1,132 @@ +import asyncio +from concurrent.futures import ProcessPoolExecutor +from fastapi import APIRouter, BackgroundTasks, Depends, FastAPI, File, HTTPException, Query, UploadFile, status +from fastapi.security.utils import get_authorization_scheme_param + +from .models import ( + DocumentEnvelope, ThreadType, InteractionThread, Message, ExportRequest, ExportJob, + Document, Thread, Message as DBMessage, Metadata +) +from .database import async_session, init_db +from .auth import get_current_user +from .services import route_document, rate_limit, validate_encrypted_payload, get_s3_client, redis_settings +from .workers import export_worker + +# Router +router = APIRouter() + +@router.post("/documents", response_model=DocumentEnvelope, status_code=201) +async def upload_document( + background_tasks: BackgroundTasks, + metadata: Metadata, + file: UploadFile = File(...), + current_user=Depends(get_current_user), + db: AsyncSession = Depends(async_session), + redis=Depends(redis_settings.connection) +): + if current_user["role"] != "CITIZEN": + raise HTTPException(403, "Only citizens can submit") + + await rate_limit(current_user["user_id"], redis) + + content = await file.read() + encrypted_payload = base64.b64encode(content).decode('utf-8') + + loop = asyncio.get_running_loop() + is_valid = await loop.run_in_executor(ProcessPoolExecutor(), validate_encrypted_payload, encrypted_payload) + if not is_valid: + raise HTTPException(400, "Invalid encrypted payload") + + assigned_unit = route_document(metadata) + + s3 = await get_s3_client() + storage_path = f"blobs/{uuid.uuid4()}.enc" + await s3.put_object(Bucket="dvd-blobs", Key=storage_path, Body=content) + + doc = Document( + reference_number=metadata.reference_number, + authority_id=metadata.authority_id, + doc_type=metadata.doc_type, + status="ROUTED" if assigned_unit != "Fallback-Team" else "RECEIVED", + storage_path=storage_path, + issued_at=metadata.issued_at, + retention_date=datetime.now() + timedelta(days=30) + ) + db.add(doc) + await db.commit() + await db.refresh(doc) + + background_tasks.add_task(lambda: print(f"Notify {assigned_unit} about {doc.id}")) + + return DocumentEnvelope( + id=doc.id, + metadata=metadata, + encrypted_payload=encrypted_payload, + status=doc.status, + retention_date=doc.retention_date + ) + +@router.post("/documents/{doc_id}/threads", response_model=InteractionThread, status_code=201) +async def start_thread( + doc_id: str, + thread_data: ThreadType, + current_user=Depends(get_current_user), + db: AsyncSession = Depends(async_session) +): + if current_user["role"] != "CITIZEN": + raise HTTPException(403) + + doc = await db.get(Document, doc_id) + if not doc: + raise HTTPException(404, "Document not found") + + thread = Thread(document_id=doc_id, type=thread_data.type, status="PENDING_OFFICIAL") + db.add(thread) + await db.commit() + await db.refresh(thread) + + if thread_data.initial_message: + msg = DBMessage(thread_id=thread.id, sender_role="CITIZEN", content=base64.b64encode(thread_data.initial_message.encode()).decode()) + db.add(msg) + await db.commit() + + return InteractionThread(id=thread.id, document_id=doc_id, type=thread.type, status=thread.status) + +@router.get("/threads/{thread_id}/messages", response_model=List[Message]) +async def get_messages( + thread_id: str, + limit: int = Query(default=20, ge=1, le=100), + before=None, + current_user=Depends(get_current_user), + db: AsyncSession = Depends(async_session) +): + query = db.query(DBMessage).filter(DBMessage.thread_id == thread_id).order_by(DBMessage.timestamp.desc()) + if before: + query = query.filter(DBMessage.timestamp < before) + messages = await query.limit(limit).all() + return [Message.model_validate(m) for m in reversed(messages)] + +@router.post("/exports", response_model=ExportJob, status_code=202) +async def request_export( + export_data: ExportRequest, + current_user=Depends(get_current_user), + arq=Depends(redis_settings.worker) +): + if "official" not in current_user["scopes"]: + raise HTTPException(403) + + job_id = str(uuid.uuid4()) + await arq.enqueue_job("export_worker", export_data.dict(), job_id=job_id) + return ExportJob(job_id=job_id) + +# App +app = FastAPI(title="DirektVermittlungDe API", version="1.0.0") +app.include_router(router) + +@app.on_event("startup") +async def startup(): + await init_db() + +@app.get("/") +async def root(): + return {"message": "DirektVermittlungDe API"} \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/models.py b/prototype-grok4.1/direktvermittlungde-backend/models.py new file mode 100644 index 0000000..df67ed7 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/models.py @@ -0,0 +1,90 @@ +import uuid +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Any + +from pydantic import BaseModel, Field, validator +from sqlalchemy import String, Text, func +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship +from sqlalchemy.sql import delete + +class Base(DeclarativeBase): + pass + +class Metadata(BaseModel): + authority_id: str = Field(..., description="Authority ID for routing") + reference_number: str = Field(..., description="Aktenzeichen/Kassenzeichen") + doc_type: str = Field(..., enum=["NOTICE", "COURT_ORDER", "GENERAL_INQUIRY"]) + issued_at: datetime + + @validator('doc_type') + def validate_doc_type(cls, v): + if v not in ["NOTICE", "COURT_ORDER", "GENERAL_INQUIRY"]: + raise ValueError("Invalid document type") + return v + +class DocumentEnvelope(BaseModel): + id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()), read_only=True) + metadata: Metadata + encrypted_payload: str = Field(..., description="Base64 E2E Encrypted PDF") + status: str = Field(default="RECEIVED", enum=["RECEIVED", "ROUTED", "ASSIGNED", "CLOSED"], read_only=True) + retention_date: Optional[datetime] = None + +class ThreadType(BaseModel): + type: str = Field(..., enum=["TEXT_CHAT", "CALLBACK_REQUEST", "APPOINTMENT"]) + initial_message: Optional[str] = None + +class InteractionThread(BaseModel): + id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()), read_only=True) + document_id: str + type: str = Field(..., enum=["TEXT_CHAT", "CALLBACK_REQUEST", "APPOINTMENT"]) + status: str = Field(default="OPEN", enum=["OPEN", "PENDING_OFFICIAL", "PENDING_CITIZEN", "RESOLVED"]) + assigned_official_id: Optional[str] = None + +class Message(BaseModel): + id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()), read_only=True) + thread_id: str + sender_role: str = Field(..., enum=["CITIZEN", "OFFICIAL"]) + content: str = Field(..., description="Encrypted message content") + timestamp: datetime = Field(default_factory=datetime.now) + +class ExportRequest(BaseModel): + case_id: str + target_system: str = "eAkte-Standard-V2" + include_attachments: bool = True + +class ExportJob(BaseModel): + job_id: str + status: str = "QUEUED" + +# DB Models +class Document(Base): + __tablename__ = "documents" + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + reference_number: Mapped[str] = mapped_column(String(50), index=True) + authority_id: Mapped[str] = mapped_column(String(50), index=True) + doc_type: Mapped[str] = mapped_column(String(20)) + status: Mapped[str] = mapped_column(String(20), default="RECEIVED") + storage_path: Mapped[str] = mapped_column(String(255)) # S3 key + issued_at: Mapped[datetime] + retention_date: Mapped[Optional[datetime]] + threads: relationship("Thread", back_populates="document") + +class Thread(Base): + __tablename__ = "threads" + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + document_id: Mapped[str] = mapped_column(String(36), index=True) + type: Mapped[str] = mapped_column(String(20)) + status: Mapped[str] = mapped_column(String(20), default="OPEN") + assigned_official_id: Mapped[Optional[str]] = mapped_column(String(100)) + document: relationship("Document", back_populates="threads") + messages: relationship("Message", back_populates="thread") + +class Message(Base): + __tablename__ = "messages" + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4())) + thread_id: Mapped[str] = mapped_column(String(36), index=True) + sender_role: Mapped[str] = mapped_column(String(20)) + content: Mapped[str] = mapped_column(Text) # Encrypted + timestamp: Mapped[datetime] = mapped_column(default=func.now()) + thread: relationship("Thread", back_populates="messages") \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/requirements.txt b/prototype-grok4.1/direktvermittlungde-backend/requirements.txt new file mode 100644 index 0000000..d3e57a7 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/requirements.txt @@ -0,0 +1,13 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +sqlalchemy[asyncio]==2.0.23 +asyncpg==0.29.0 +aioboto3==12.4.0 +arq==0.18.4 +redis==5.0.1 +pydantic[email]==2.5.0 +python-jose[cryptography]==3.3.0 +python-multipart==0.0.6 +pytest==7.4.3 +pytest-asyncio==0.21.1 +httpx==0.25.2 \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/services.py b/prototype-grok4.1/direktvermittlungde-backend/services.py new file mode 100644 index 0000000..054bca6 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/services.py @@ -0,0 +1,60 @@ +import asyncio +import base64 +import uuid +from concurrent.futures import ProcessPoolExecutor +from datetime import datetime, timedelta +from typing import Dict + +import aioboto3 +from arq.connections import RedisSettings +from fastapi import HTTPException, status, Query, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from .models import Metadata, Document, Thread, Message + +# Global executor for CPU tasks +cpu_pool = ProcessPoolExecutor(max_workers=4) + +# Redis +redis_settings = RedisSettings.from_dsn("redis://localhost") + +# S3 +async def get_s3_client(): + return aioboto3.Session().client( + 's3', + endpoint_url='http://localhost:9000', # From env + aws_access_key_id='minioadmin', + aws_secret_access_key='minioadmin' + ) + +# Routing Engine +def route_document(metadata: Metadata) -> str: + rules = { + "Finanzamt-München-I": "Steuerfestsetzung-Team-B", + "Gericht-Berlin": "Zivilabteilung" + } + return rules.get(metadata.authority_id, "Fallback-Team") + +# Rate Limiting +async def rate_limit(user_id: str, redis, limit: int = 10, window: int = 60): + key = f"rate_limit:{user_id}" + current = await redis.get(key) + if current is None: + await redis.setex(key, window, 1) + return True + count = int(current) + if count >= limit: + raise HTTPException(status_code=429, detail="Rate limit exceeded") + await redis.incr(key) + return True + +# Validate Encrypted Payload (CPU-bound) +def validate_encrypted_payload(payload_b64: str) -> bool: + payload_bytes = base64.b64decode(payload_b64) + return payload_bytes.startswith(b'\x00\x01') # Demo check + +# GDPR Reaper +async def reaper(db: AsyncSession): + stmt = delete(Document).where(Document.retention_date < datetime.now()) + await db.execute(stmt) + await db.commit() \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/test/test_documents.py b/prototype-grok4.1/direktvermittlungde-backend/test/test_documents.py new file mode 100644 index 0000000..8bffb87 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/test/test_documents.py @@ -0,0 +1,12 @@ +import pytest +from httpx import AsyncClient +from main import app # Adjust import + +@pytest.mark.asyncio +async def test_upload_document(): + async with AsyncClient(app=app, base_url="http://test") as ac: + response = await ac.post("/documents", json={ + "metadata": {"authority_id": "test", "reference_number": "123", "doc_type": "NOTICE", "issued_at": "2025-12-01T00:00:00Z"}, + "encrypted_payload": "dummy_base64" + }) + assert response.status_code == 201 \ No newline at end of file diff --git a/prototype-grok4.1/direktvermittlungde-backend/workers.py b/prototype-grok4.1/direktvermittlungde-backend/workers.py new file mode 100644 index 0000000..93ed365 --- /dev/null +++ b/prototype-grok4.1/direktvermittlungde-backend/workers.py @@ -0,0 +1,17 @@ +import asyncio +from arq import cron +from arq.connections import RedisSettings + +redis_settings = RedisSettings.from_dsn("redis://localhost") + +async def export_worker(ctx, data: Dict): + # Simulate export + print(f"Exporting {data['case_id']}") + await asyncio.sleep(5) + return {"status": "COMPLETED"} + +# Cron: Run reaper daily +@cron(redis_settings, '0 0 * * *') # Midnight daily +async def daily_reaper(ctx): + # Call reaper logic here + pass \ No newline at end of file