Files

31 lines
1.3 KiB
Python

from typing import Dict, Any, List
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import jwt, JWTError
from .database import async_session # For dependency injection
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="https://id.bund.de/auth",
tokenUrl="https://api.direktvermittlung.de/oauth/token",
scopes={"citizen:write": "Submit documents", "official:read": "View cases", "official:write": "Respond"}
)
SECRET_KEY = "your-secret" # From env
ALGORITHM = "HS256"
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
scopes: List[str] = payload.get("scope", "").split()
role = "CITIZEN" if "citizen" in scopes else "OFFICIAL"
if user_id is None:
raise credentials_exception
return {"user_id": user_id, "role": role, "scopes": scopes}
except JWTError:
raise credentials_exception