194 lines
7.0 KiB
Python
194 lines
7.0 KiB
Python
"""Database-driven role/permission access control.
|
|
|
|
This module is the single source of truth for permission enforcement on
|
|
internal Hunter Stock Feeds users. Route handlers should depend on
|
|
``require_permission`` / ``require_any_permission`` / ``require_all_permissions``
|
|
rather than checking role names. To change access later, update the database
|
|
seed (``app.seed_access``) — no route code should need to change.
|
|
|
|
Fail-closed rules enforced here:
|
|
* Missing or invalid token -> 401
|
|
* Unknown user_id -> 401
|
|
* Inactive user -> 403
|
|
* No role / missing perm -> 403
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Iterable
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session, selectinload
|
|
|
|
from app.core.security import verify_token
|
|
from app.db.session import get_db
|
|
from app.models.access import Permission, Role, User
|
|
|
|
|
|
bearer_scheme = HTTPBearer(auto_error=False)
|
|
|
|
# Subject claim used by tokens issued for internal Hunter Stock Feeds users.
|
|
# Distinct from the existing client-portal/admin tokens so the two systems
|
|
# cannot impersonate each other.
|
|
INTERNAL_USER_SUBJECT = "internal_user"
|
|
|
|
# Default tenant internal Hunter Stock Feeds users operate against. Internal
|
|
# users see the production workspace data, so they read/write the same tenant
|
|
# the costing seed populates.
|
|
INTERNAL_USER_TENANT_ID = "hunter-premium-produce"
|
|
|
|
# Maps the internal permission keys onto the legacy module/access-level shape
|
|
# the existing client-portal routes already understand. This lets the shared
|
|
# routes (raw materials, mixes, products, etc.) accept either token type
|
|
# without having to be rewritten.
|
|
_PERMISSION_TO_MODULE_LEVEL: dict[str, tuple[str, str]] = {
|
|
"view_dashboard": ("dashboard", "view"),
|
|
"view_mix_calculator": ("mix_calculator", "view"),
|
|
"use_mix_calculator": ("mix_calculator", "edit"),
|
|
"save_mix_calculator_session": ("mix_calculator", "edit"),
|
|
"view_raw_materials": ("raw_materials", "view"),
|
|
"edit_raw_materials": ("raw_materials", "edit"),
|
|
"view_products": ("products", "view"),
|
|
"edit_products": ("products", "edit"),
|
|
"view_mixes": ("mix_master", "view"),
|
|
"edit_mixes": ("mix_master", "edit"),
|
|
# Admin-only permissions (view_users, manage_users, manage_permissions,
|
|
# view_settings, edit_settings) are intentionally excluded — they don't
|
|
# correspond to any of the legacy module keys and remain accessible only
|
|
# via the explicit `require_permission(...)` dependency.
|
|
}
|
|
|
|
_ACCESS_LEVEL_RANK = {"none": 0, "view": 1, "edit": 2, "manage": 3}
|
|
|
|
|
|
def permissions_to_module_map(permission_keys: set[str]) -> dict[str, str]:
|
|
"""Translate internal permission keys into the legacy module→level map.
|
|
|
|
Higher levels (edit > view) win when multiple permissions map to the same
|
|
module key (e.g. ``edit_raw_materials`` overrides ``view_raw_materials``).
|
|
"""
|
|
result: dict[str, str] = {}
|
|
for key in permission_keys:
|
|
mapping = _PERMISSION_TO_MODULE_LEVEL.get(key)
|
|
if not mapping:
|
|
continue
|
|
module_key, level = mapping
|
|
if _ACCESS_LEVEL_RANK[level] > _ACCESS_LEVEL_RANK.get(result.get(module_key, "none"), 0):
|
|
result[module_key] = level
|
|
return result
|
|
|
|
|
|
def get_user_permissions(user: User | None) -> set[str]:
|
|
"""Return the set of permission keys granted to ``user``.
|
|
|
|
Returns an empty set for unknown users, inactive users, or users without
|
|
a role. This means downstream checks fail closed by default.
|
|
"""
|
|
if user is None or not user.is_active or user.role is None:
|
|
return set()
|
|
return {permission.key for permission in user.role.permissions}
|
|
|
|
|
|
def user_has_permission(user: User | None, permission_key: str) -> bool:
|
|
return permission_key in get_user_permissions(user)
|
|
|
|
|
|
def _load_user(db: Session, user_id: int) -> User | None:
|
|
return db.scalar(
|
|
select(User)
|
|
.where(User.id == user_id)
|
|
.options(selectinload(User.role).selectinload(Role.permissions))
|
|
)
|
|
|
|
|
|
def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
|
|
db: Session = Depends(get_db),
|
|
) -> User:
|
|
"""Resolve the current internal user from the bearer token.
|
|
|
|
Raises 401 for missing/invalid tokens or unknown users, 403 for inactive
|
|
users.
|
|
"""
|
|
if credentials is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required")
|
|
|
|
payload = verify_token(credentials.credentials)
|
|
if payload.get("sub") != INTERNAL_USER_SUBJECT:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
|
|
|
|
user_id = payload.get("user_id")
|
|
if not isinstance(user_id, int):
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
|
|
|
|
user = _load_user(db, user_id)
|
|
if user is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User no longer exists")
|
|
if not user.is_active:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is inactive")
|
|
|
|
return user
|
|
|
|
|
|
def require_permission(permission_key: str):
|
|
"""FastAPI dependency factory enforcing a single permission key."""
|
|
|
|
def dependency(user: User = Depends(get_current_user)) -> User:
|
|
if not user_has_permission(user, permission_key):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Missing required permission: {permission_key}",
|
|
)
|
|
return user
|
|
|
|
return dependency
|
|
|
|
|
|
def require_any_permission(permission_keys: Iterable[str]):
|
|
"""FastAPI dependency factory: pass if the user has any of the keys."""
|
|
keys = tuple(permission_keys)
|
|
|
|
def dependency(user: User = Depends(get_current_user)) -> User:
|
|
granted = get_user_permissions(user)
|
|
if not any(key in granted for key in keys):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Requires any of: {list(keys)}",
|
|
)
|
|
return user
|
|
|
|
return dependency
|
|
|
|
|
|
def require_all_permissions(permission_keys: Iterable[str]):
|
|
"""FastAPI dependency factory: pass only if the user has every key."""
|
|
keys = tuple(permission_keys)
|
|
|
|
def dependency(user: User = Depends(get_current_user)) -> User:
|
|
granted = get_user_permissions(user)
|
|
missing = [key for key in keys if key not in granted]
|
|
if missing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Missing required permissions: {missing}",
|
|
)
|
|
return user
|
|
|
|
return dependency
|
|
|
|
|
|
__all__ = [
|
|
"INTERNAL_USER_SUBJECT",
|
|
"Permission",
|
|
"Role",
|
|
"User",
|
|
"bearer_scheme",
|
|
"get_current_user",
|
|
"get_user_permissions",
|
|
"require_all_permissions",
|
|
"require_any_permission",
|
|
"require_permission",
|
|
"user_has_permission",
|
|
]
|