"""Database-driven role/permission access control. This module is the single source of truth for permission enforcement on internal Hunter Stock Feeds users. Route handlers should depend on ``require_permission`` / ``require_any_permission`` / ``require_all_permissions`` rather than checking role names. To change access later, update the database seed (``app.seed_access``) — no route code should need to change. Fail-closed rules enforced here: * Missing or invalid token -> 401 * Unknown user_id -> 401 * Inactive user -> 403 * No role / missing perm -> 403 """ from __future__ import annotations from typing import Iterable from fastapi import Depends, HTTPException, Request, status from sqlalchemy import select from sqlalchemy.orm import Session, selectinload from app.core.security import verify_token from app.core.http import CLIENT_AUTH_COOKIE, get_bearer_or_cookie_token from app.core.security_logging import log_security_event from app.db.session import get_db from app.models.access import Permission, Role, User # Subject claim used by tokens issued for internal Hunter Stock Feeds users. # Distinct from the existing client-portal/admin tokens so the two systems # cannot impersonate each other. INTERNAL_USER_SUBJECT = "internal_user" # Default tenant internal Hunter Stock Feeds users operate against. Internal # users see the production workspace data, so they read/write the same tenant # the costing seed populates. INTERNAL_USER_TENANT_ID = "hunter-premium-produce" # Maps the internal permission keys onto the legacy module/access-level shape # the existing client-portal routes already understand. This lets the shared # routes (raw materials, mixes, products, etc.) accept either token type # without having to be rewritten. _PERMISSION_TO_MODULE_LEVEL: dict[str, tuple[str, str]] = { "view_dashboard": ("dashboard", "view"), "view_mix_calculator": ("mix_calculator", "view"), "use_mix_calculator": ("mix_calculator", "edit"), "save_mix_calculator_session": ("mix_calculator", "edit"), "view_raw_materials": ("raw_materials", "view"), "edit_raw_materials": ("raw_materials", "edit"), "view_products": ("products", "view"), "edit_products": ("products", "edit"), "view_mixes": ("mix_master", "view"), "edit_mixes": ("mix_master", "edit"), # Admin-only permissions (view_users, manage_users, manage_permissions, # view_settings, edit_settings) are intentionally excluded — they don't # correspond to any of the legacy module keys and remain accessible only # via the explicit `require_permission(...)` dependency. } _ACCESS_LEVEL_RANK = {"none": 0, "view": 1, "edit": 2, "manage": 3} def permissions_to_module_map(permission_keys: set[str]) -> dict[str, str]: """Translate internal permission keys into the legacy module→level map. Higher levels (edit > view) win when multiple permissions map to the same module key (e.g. ``edit_raw_materials`` overrides ``view_raw_materials``). """ result: dict[str, str] = {} for key in permission_keys: mapping = _PERMISSION_TO_MODULE_LEVEL.get(key) if not mapping: continue module_key, level = mapping if _ACCESS_LEVEL_RANK[level] > _ACCESS_LEVEL_RANK.get(result.get(module_key, "none"), 0): result[module_key] = level return result def get_user_permissions(user: User | None) -> set[str]: """Return the set of permission keys granted to ``user``. Returns an empty set for unknown users, inactive users, or users without a role. This means downstream checks fail closed by default. """ if user is None or not user.is_active or user.role is None: return set() return {permission.key for permission in user.role.permissions} def user_has_permission(user: User | None, permission_key: str) -> bool: return permission_key in get_user_permissions(user) def _load_user(db: Session, user_id: int) -> User | None: return db.scalar( select(User) .where(User.id == user_id) .options(selectinload(User.role).selectinload(Role.permissions)) ) def get_current_user( request: Request, db: Session = Depends(get_db), ) -> User: """Resolve the current internal user from the bearer token. Raises 401 for missing/invalid tokens or unknown users, 403 for inactive users. """ token = get_bearer_or_cookie_token(request, cookie_name=CLIENT_AUTH_COOKIE.name) if token is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required") payload = verify_token(token) if payload.get("sub") != INTERNAL_USER_SUBJECT: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token") user_id = payload.get("user_id") if not isinstance(user_id, int): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token") user = _load_user(db, user_id) if user is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User no longer exists") if not user.is_active: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is inactive") return user def require_permission(permission_key: str): """FastAPI dependency factory enforcing a single permission key.""" def dependency(user: User = Depends(get_current_user)) -> User: if not user_has_permission(user, permission_key): log_security_event("authz.denied", role=user.role.name if user.role else None, permission=permission_key) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permission: {permission_key}", ) return user return dependency def require_any_permission(permission_keys: Iterable[str]): """FastAPI dependency factory: pass if the user has any of the keys.""" keys = tuple(permission_keys) def dependency(user: User = Depends(get_current_user)) -> User: granted = get_user_permissions(user) if not any(key in granted for key in keys): log_security_event("authz.denied", role=user.role.name if user.role else None, permissions=list(keys)) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Requires any of: {list(keys)}", ) return user return dependency def require_all_permissions(permission_keys: Iterable[str]): """FastAPI dependency factory: pass only if the user has every key.""" keys = tuple(permission_keys) def dependency(user: User = Depends(get_current_user)) -> User: granted = get_user_permissions(user) missing = [key for key in keys if key not in granted] if missing: log_security_event("authz.denied", role=user.role.name if user.role else None, permissions=missing) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing required permissions: {missing}", ) return user return dependency __all__ = [ "INTERNAL_USER_SUBJECT", "Permission", "Role", "User", "bearer_scheme", "get_current_user", "get_user_permissions", "require_all_permissions", "require_any_permission", "require_permission", "user_has_permission", ]