Files
data-entry-app/backend/app/core/access.py
T

198 lines
7.4 KiB
Python
Raw Normal View History

"""Database-driven role/permission access control.
This module is the single source of truth for permission enforcement on
internal Hunter Stock Feeds users. Route handlers should depend on
``require_permission`` / ``require_any_permission`` / ``require_all_permissions``
rather than checking role names. To change access later, update the database
seed (``app.seed_access``) — no route code should need to change.
Fail-closed rules enforced here:
* Missing or invalid token -> 401
* Unknown user_id -> 401
* Inactive user -> 403
* No role / missing perm -> 403
"""
from __future__ import annotations
from typing import Iterable
2026-05-10 09:46:07 +12:00
from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from app.core.security import verify_token
2026-05-10 09:46:07 +12:00
from app.core.http import CLIENT_AUTH_COOKIE, get_bearer_or_cookie_token
from app.core.security_logging import log_security_event
from app.db.session import get_db
from app.models.access import Permission, Role, User
# Subject claim used by tokens issued for internal Hunter Stock Feeds users.
# Distinct from the existing client-portal/admin tokens so the two systems
# cannot impersonate each other.
INTERNAL_USER_SUBJECT = "internal_user"
# Default tenant internal Hunter Stock Feeds users operate against. Internal
# users see the production workspace data, so they read/write the same tenant
# the costing seed populates.
INTERNAL_USER_TENANT_ID = "hunter-premium-produce"
# Maps the internal permission keys onto the legacy module/access-level shape
# the existing client-portal routes already understand. This lets the shared
# routes (raw materials, mixes, products, etc.) accept either token type
# without having to be rewritten.
_PERMISSION_TO_MODULE_LEVEL: dict[str, tuple[str, str]] = {
"view_dashboard": ("dashboard", "view"),
"view_mix_calculator": ("mix_calculator", "view"),
"use_mix_calculator": ("mix_calculator", "edit"),
"save_mix_calculator_session": ("mix_calculator", "edit"),
"view_raw_materials": ("raw_materials", "view"),
"edit_raw_materials": ("raw_materials", "edit"),
"view_products": ("products", "view"),
"edit_products": ("products", "edit"),
"view_mixes": ("mix_master", "view"),
"edit_mixes": ("mix_master", "edit"),
2026-05-31 20:19:44 +12:00
"view_throughput": ("operations_throughput", "view"),
"edit_throughput": ("operations_throughput", "edit"),
# Admin-only permissions (view_users, manage_users, manage_permissions,
# view_settings, edit_settings) are intentionally excluded — they don't
# correspond to any of the legacy module keys and remain accessible only
# via the explicit `require_permission(...)` dependency.
}
_ACCESS_LEVEL_RANK = {"none": 0, "view": 1, "edit": 2, "manage": 3}
def permissions_to_module_map(permission_keys: set[str]) -> dict[str, str]:
"""Translate internal permission keys into the legacy module→level map.
Higher levels (edit > view) win when multiple permissions map to the same
module key (e.g. ``edit_raw_materials`` overrides ``view_raw_materials``).
"""
result: dict[str, str] = {}
for key in permission_keys:
mapping = _PERMISSION_TO_MODULE_LEVEL.get(key)
if not mapping:
continue
module_key, level = mapping
if _ACCESS_LEVEL_RANK[level] > _ACCESS_LEVEL_RANK.get(result.get(module_key, "none"), 0):
result[module_key] = level
return result
def get_user_permissions(user: User | None) -> set[str]:
"""Return the set of permission keys granted to ``user``.
Returns an empty set for unknown users, inactive users, or users without
a role. This means downstream checks fail closed by default.
"""
if user is None or not user.is_active or user.role is None:
return set()
return {permission.key for permission in user.role.permissions}
def user_has_permission(user: User | None, permission_key: str) -> bool:
return permission_key in get_user_permissions(user)
def _load_user(db: Session, user_id: int) -> User | None:
return db.scalar(
select(User)
.where(User.id == user_id)
.options(selectinload(User.role).selectinload(Role.permissions))
)
def get_current_user(
2026-05-10 09:46:07 +12:00
request: Request,
db: Session = Depends(get_db),
) -> User:
"""Resolve the current internal user from the bearer token.
Raises 401 for missing/invalid tokens or unknown users, 403 for inactive
users.
"""
2026-05-10 09:46:07 +12:00
token = get_bearer_or_cookie_token(request, cookie_name=CLIENT_AUTH_COOKIE.name)
if token is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required")
2026-05-10 09:46:07 +12:00
payload = verify_token(token)
if payload.get("sub") != INTERNAL_USER_SUBJECT:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
user_id = payload.get("user_id")
if not isinstance(user_id, int):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token")
user = _load_user(db, user_id)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User no longer exists")
if not user.is_active:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is inactive")
return user
def require_permission(permission_key: str):
"""FastAPI dependency factory enforcing a single permission key."""
def dependency(user: User = Depends(get_current_user)) -> User:
if not user_has_permission(user, permission_key):
2026-05-10 09:46:07 +12:00
log_security_event("authz.denied", role=user.role.name if user.role else None, permission=permission_key)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permission: {permission_key}",
)
return user
return dependency
def require_any_permission(permission_keys: Iterable[str]):
"""FastAPI dependency factory: pass if the user has any of the keys."""
keys = tuple(permission_keys)
def dependency(user: User = Depends(get_current_user)) -> User:
granted = get_user_permissions(user)
if not any(key in granted for key in keys):
2026-05-10 09:46:07 +12:00
log_security_event("authz.denied", role=user.role.name if user.role else None, permissions=list(keys))
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires any of: {list(keys)}",
)
return user
return dependency
def require_all_permissions(permission_keys: Iterable[str]):
"""FastAPI dependency factory: pass only if the user has every key."""
keys = tuple(permission_keys)
def dependency(user: User = Depends(get_current_user)) -> User:
granted = get_user_permissions(user)
missing = [key for key in keys if key not in granted]
if missing:
2026-05-10 09:46:07 +12:00
log_security_event("authz.denied", role=user.role.name if user.role else None, permissions=missing)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permissions: {missing}",
)
return user
return dependency
__all__ = [
"INTERNAL_USER_SUBJECT",
"Permission",
"Role",
"User",
"bearer_scheme",
"get_current_user",
"get_user_permissions",
"require_all_permissions",
"require_any_permission",
"require_permission",
"user_has_permission",
]