from __future__ import annotations from dataclasses import dataclass from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from sqlalchemy import select from sqlalchemy.orm import Session, selectinload from app.core.security import verify_token from app.db.session import get_db from app.models.client_access import ClientFeatureAccess, ClientUser from app.services.client_access_service import has_access_level, module_access_map bearer_scheme = HTTPBearer(auto_error=False) @dataclass(frozen=True) class AuthSession: role: str email: str name: str tenant_id: str | None = None client_role: str | None = None user_id: int | None = None client_account_id: int | None = None module_permissions: dict[str, str] | None = None def get_auth_session(credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme)) -> AuthSession: if credentials is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required") payload = verify_token(credentials.credentials) return AuthSession( role=str(payload.get("role", "")), email=str(payload.get("email", "")), name=str(payload.get("name", "")), tenant_id=payload.get("tenant_id"), client_role=payload.get("client_role"), user_id=payload.get("user_id"), client_account_id=payload.get("client_account_id"), module_permissions={}, ) def require_client_session(session: AuthSession = Depends(get_auth_session)) -> AuthSession: if session.role != "client": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Client access required") if not session.tenant_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Client tenant is missing") if not session.user_id or not session.client_account_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Client user context is missing") return session def require_admin_session(session: AuthSession = Depends(get_auth_session)) -> AuthSession: if session.role != "admin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required") return session def load_current_client_user(db: Session, session: AuthSession) -> ClientUser: user = db.scalar( select(ClientUser) .where( ClientUser.id == session.user_id, ClientUser.client_account_id == session.client_account_id, ClientUser.tenant_id == session.tenant_id, ) .options(selectinload(ClientUser.module_permissions)) ) if user is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Client user access is no longer valid") if user.status == "suspended": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Client user access is suspended") return user def require_client_module_access(module_key: str, minimum_level: str = "view"): def dependency( session: AuthSession = Depends(require_client_session), db: Session = Depends(get_db), ) -> AuthSession: user = load_current_client_user(db, session) feature = db.scalar( select(ClientFeatureAccess).where( ClientFeatureAccess.client_account_id == user.client_account_id, ClientFeatureAccess.tenant_id == user.tenant_id, ClientFeatureAccess.feature_key == module_key, ) ) if feature is not None and not feature.enabled: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"{module_key} is disabled for this client") permissions = module_access_map(user) if not has_access_level(permissions.get(module_key), minimum_level): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f"{module_key} access is not permitted") return AuthSession( role=session.role, email=session.email, name=session.name, tenant_id=session.tenant_id, client_role=user.role, user_id=user.id, client_account_id=user.client_account_id, module_permissions=permissions, ) return dependency def require_client_access_manager_session( session: AuthSession = Depends(get_auth_session), db: Session = Depends(get_db), ) -> AuthSession: if session.role == "admin": return session if session.role != "client": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Client access management requires admin or superadmin access") user = load_current_client_user(db, require_client_session(session)) permissions = module_access_map(user) if user.role != "superadmin" or not has_access_level(permissions.get("client_access"), "manage"): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Superadmin client access is required") return AuthSession( role=session.role, email=session.email, name=session.name, tenant_id=session.tenant_id, client_role=user.role, user_id=user.id, client_account_id=user.client_account_id, module_permissions=permissions, )