Files

194 lines
7.0 KiB
Python

"""Database-driven role/permission access control.
This module is the single source of truth for permission enforcement on
internal Hunter Stock Feeds users. Route handlers should depend on
``require_permission`` / ``require_any_permission`` / ``require_all_permissions``
rather than checking role names. To change access later, update the database
seed (``app.seed_access``) — no route code should need to change.
Fail-closed rules enforced here:
* Missing or invalid token -> 401
* Unknown user_id -> 401
* Inactive user -> 403
* No role / missing perm -> 403
"""
from __future__ import annotations
from typing import Iterable
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from app.core.security import verify_token
from app.db.session import get_db
from app.models.access import Permission, Role, User
bearer_scheme = HTTPBearer(auto_error=False)
# Subject claim used by tokens issued for internal Hunter Stock Feeds users.
# Distinct from the existing client-portal/admin tokens so the two systems
# cannot impersonate each other.
INTERNAL_USER_SUBJECT = "internal_user"
# Default tenant internal Hunter Stock Feeds users operate against. Internal
# users see the production workspace data, so they read/write the same tenant
# the costing seed populates.
INTERNAL_USER_TENANT_ID = "hunter-premium-produce"
# Maps the internal permission keys onto the legacy module/access-level shape
# the existing client-portal routes already understand. This lets the shared
# routes (raw materials, mixes, products, etc.) accept either token type
# without having to be rewritten.
_PERMISSION_TO_MODULE_LEVEL: dict[str, tuple[str, str]] = {
"view_dashboard": ("dashboard", "view"),
"view_mix_calculator": ("mix_calculator", "view"),
"use_mix_calculator": ("mix_calculator", "edit"),
"save_mix_calculator_session": ("mix_calculator", "edit"),
"view_raw_materials": ("raw_materials", "view"),
"edit_raw_materials": ("raw_materials", "edit"),
"view_products": ("products", "view"),
"edit_products": ("products", "edit"),
"view_mixes": ("mix_master", "view"),
"edit_mixes": ("mix_master", "edit"),
# Admin-only permissions (view_users, manage_users, manage_permissions,
# view_settings, edit_settings) are intentionally excluded — they don't
# correspond to any of the legacy module keys and remain accessible only
# via the explicit `require_permission(...)` dependency.
}
_ACCESS_LEVEL_RANK = {"none": 0, "view": 1, "edit": 2, "manage": 3}
def permissions_to_module_map(permission_keys: set[str]) -> dict[str, str]:
"""Translate internal permission keys into the legacy module→level map.
Higher levels (edit > view) win when multiple permissions map to the same
module key (e.g. ``edit_raw_materials`` overrides ``view_raw_materials``).
"""
result: dict[str, str] = {}
for key in permission_keys:
mapping = _PERMISSION_TO_MODULE_LEVEL.get(key)
if not mapping:
continue
module_key, level = mapping
if _ACCESS_LEVEL_RANK[level] > _ACCESS_LEVEL_RANK.get(result.get(module_key, "none"), 0):
result[module_key] = level
return result
def get_user_permissions(user: User | None) -> set[str]:
"""Return the set of permission keys granted to ``user``.
Returns an empty set for unknown users, inactive users, or users without
a role. This means downstream checks fail closed by default.
"""
if user is None or not user.is_active or user.role is None:
return set()
return {permission.key for permission in user.role.permissions}
def user_has_permission(user: User | None, permission_key: str) -> bool:
return permission_key in get_user_permissions(user)
def _load_user(db: Session, user_id: int) -> User | None:
return db.scalar(
select(User)
.where(User.id == user_id)
.options(selectinload(User.role).selectinload(Role.permissions))
)
def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
db: Session = Depends(get_db),
) -> User:
"""Resolve the current internal user from the bearer token.
Raises 401 for missing/invalid tokens or unknown users, 403 for inactive
users.
"""
if credentials is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required")
payload = verify_token(credentials.credentials)
if payload.get("sub") != INTERNAL_USER_SUBJECT:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
user_id = payload.get("user_id")
if not isinstance(user_id, int):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
user = _load_user(db, user_id)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User no longer exists")
if not user.is_active:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is inactive")
return user
def require_permission(permission_key: str):
"""FastAPI dependency factory enforcing a single permission key."""
def dependency(user: User = Depends(get_current_user)) -> User:
if not user_has_permission(user, permission_key):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permission: {permission_key}",
)
return user
return dependency
def require_any_permission(permission_keys: Iterable[str]):
"""FastAPI dependency factory: pass if the user has any of the keys."""
keys = tuple(permission_keys)
def dependency(user: User = Depends(get_current_user)) -> User:
granted = get_user_permissions(user)
if not any(key in granted for key in keys):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires any of: {list(keys)}",
)
return user
return dependency
def require_all_permissions(permission_keys: Iterable[str]):
"""FastAPI dependency factory: pass only if the user has every key."""
keys = tuple(permission_keys)
def dependency(user: User = Depends(get_current_user)) -> User:
granted = get_user_permissions(user)
missing = [key for key in keys if key not in granted]
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permissions: {missing}",
)
return user
return dependency
__all__ = [
"INTERNAL_USER_SUBJECT",
"Permission",
"Role",
"User",
"bearer_scheme",
"get_current_user",
"get_user_permissions",
"require_all_permissions",
"require_any_permission",
"require_permission",
"user_has_permission",
]