Files
data-entry-app/backend/app/api/access.py
T
2026-05-08 09:06:14 +12:00

233 lines
7.8 KiB
Python

"""Internal-user authentication and permission introspection routes.
Frontends should call ``GET /api/access/me`` to discover which permission keys
the current user has, then use those keys to hide/show navigation items.
**Visibility is not security** — every privileged backend route must depend on
``require_permission`` (or one of its siblings) directly.
"""
from __future__ import annotations

import hmac

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, selectinload

from app.core.access import (
    INTERNAL_USER_SUBJECT,
    INTERNAL_USER_TENANT_ID,
    get_current_user,
    get_user_permissions,
    permissions_to_module_map,
    require_permission,
)
from app.core.config import settings
from app.core.security import hash_password, issue_token, verify_password
from app.db.session import get_db
from app.models.access import Permission, Role, User
# Router for every route declared in this module; all paths are served under
# the /api/access prefix and grouped under the "access" tag.
router = APIRouter(
    prefix="/api/access",
    tags=["access"],
)
class LoginRequest(BaseModel):
    """Request body accepted by ``POST /api/access/login``."""

    # Address used to look the user up; the login route trims and lower-cases
    # it before querying.
    email: str
    # Plain-text credential supplied by the caller.
    password: str
class UserSession(BaseModel):
    """Session payload returned by the login and ``/me`` routes.

    Mirrors the existing ``LoginResponse`` shape so the frontend's
    ``AppSession`` store can consume this response without a separate type.
    """

    # --- identity -----------------------------------------------------------
    user_id: int
    email: str
    name: str

    # --- role / tenancy -----------------------------------------------------
    role: str
    role_name: str | None = None
    is_active: bool
    tenant_id: str | None = None
    client_role: str | None = None
    client_account_id: int | None = None

    # --- authorisation ------------------------------------------------------
    # Legacy module -> access-level map used for nav gating.
    module_permissions: dict[str, str] = {}
    # The newer permission-key array.
    permissions: list[str]

    # Only populated when the route issues a fresh token; otherwise ``None``.
    token: str | None = None
class RoleRead(BaseModel):
    """Read-only view of a role, as returned by ``GET /api/access/roles``."""

    id: int
    name: str
    # Required in the payload, but may be null.
    description: str | None
    # Permission keys attached to the role.
    permissions: list[str]
class UserRead(BaseModel):
    """Read-only view of a user, as returned by ``GET /api/access/users``."""

    id: int
    email: str
    name: str
    is_active: bool
    # Name of the user's role; null when the user has no role assigned.
    role: str | None
def _serialize_session(user: User, *, include_token: bool = False) -> UserSession:
    """Build the ``UserSession`` payload describing *user*.

    Args:
        user: The internal user to describe.
        include_token: When true, a fresh token carrying the internal-user
            subject, the user's id and the user's email is issued and attached
            to the payload; otherwise ``token`` is left as ``None``.

    Returns:
        A ``UserSession`` with the user's sorted permission keys and the
        module map derived from them.
    """
    granted = get_user_permissions(user)
    permission_keys = sorted(granted)
    module_map = permissions_to_module_map(granted)

    display_role = user.role.name if user.role else None

    if include_token:
        claims = {"sub": INTERNAL_USER_SUBJECT, "user_id": user.id, "email": user.email}
        session_token = issue_token(claims)
    else:
        session_token = None

    # role="internal" is a marker the shared auth deps recognise so internal
    # users can hit the same routes as client-portal users without being
    # confused with them. The display name lives in role_name / client_role.
    fields = {
        "user_id": user.id,
        "email": user.email,
        "name": user.name,
        "role": "internal",
        "role_name": display_role,
        "is_active": user.is_active,
        "tenant_id": INTERNAL_USER_TENANT_ID,
        "client_role": display_role,
        "client_account_id": None,
        "module_permissions": module_map,
        "permissions": permission_keys,
        "token": session_token,
    }
    return UserSession(**fields)
@router.post("/login", response_model=UserSession)
def login(payload: LoginRequest, db: Session = Depends(get_db)):
    """Internal-user login.

    The user is looked up by normalised (trimmed, lower-cased) email. The
    supplied password is then checked against:

    * the user's personal password hash, when one has been set (for example
      through ``PATCH /api/access/me``); or
    * the shared internal password (``ADMIN_PASSWORD``) for users who have
      never set a personal password.

    Unknown users, inactive users and wrong passwords are all rejected with
    the same generic 401 to avoid leaking which emails are valid.

    Args:
        payload: Email and password supplied by the caller.
        db: Database session.

    Returns:
        The user's session payload, including a freshly issued token.

    Raises:
        HTTPException: 401 when the credentials are not accepted.
    """
    email = payload.email.strip().lower()
    user = db.scalar(
        select(User)
        .where(User.email == email)
        .options(selectinload(User.role).selectinload(Role.permissions))
    )

    if user is not None and user.password_hash:
        # Once a personal password exists, the shared password must no longer
        # unlock the account; otherwise setting one would protect nothing.
        password_ok = verify_password(payload.password, user.password_hash)
    else:
        # Constant-time comparison of the shared secret. Compare as bytes:
        # hmac.compare_digest rejects non-ASCII ``str`` arguments.
        # An empty/unset shared password never matches.
        shared = settings.admin_password or ""
        password_ok = bool(shared) and hmac.compare_digest(
            payload.password.encode("utf-8"),
            shared.encode("utf-8"),
        )
        # NOTE(review): the two branches take different amounts of time, which
        # may reveal whether an email has a personal password set. Consider
        # verifying against a dummy hash here if that matters.

    if not password_ok or user is None or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )
    return _serialize_session(user, include_token=True)
@router.get("/me", response_model=UserSession)
def read_me(user: User = Depends(get_current_user)):
    """Describe the current user, including permission keys for UI nav gating.

    No token is issued here; the ``token`` field of the response is ``None``.
    """
    session = _serialize_session(user, include_token=False)
    return session
@router.get("/me/permissions", response_model=list[str])
def read_my_permissions(user: User = Depends(get_current_user)):
    """Return the current user's permission keys in sorted order."""
    keys = list(get_user_permissions(user))
    keys.sort()
    return keys
class UpdateMeRequest(BaseModel):
    """Request body accepted by ``PATCH /api/access/me``.

    Every field is optional; a field left as ``None`` is not changed.
    """

    # Profile fields.
    name: str | None = None
    email: str | None = None

    # Password change: ``current_password`` is checked before ``new_password``
    # is applied.
    current_password: str | None = None
    new_password: str | None = None
@router.patch("/me", response_model=UserSession)
def update_me(
    payload: UpdateMeRequest,
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Allow an internal user to update their own name, email, or password.

    All requested changes are validated first and only then applied, so a
    rejected request never leaves the loaded ``user`` object half-modified.
    Validation runs in the order password, name, email, and the first failure
    is reported.

    Args:
        payload: Fields to change; ``None`` fields are left untouched. An
            empty ``new_password`` is treated as "no password change".
        db: Database session.
        user: The authenticated user performing the update.

    Returns:
        The refreshed session payload with a newly issued token (the token
        embeds the email, which may have changed).

    Raises:
        HTTPException: 400 when the current password is wrong; 422 when the
            new password is shorter than 8 characters, the name is blank, or
            the email is malformed; 409 when the email is already in use.
    """
    # ---- validate: password ------------------------------------------------
    change_password = bool(payload.new_password)
    if change_password:
        # Require current-password verification before allowing a change.
        # Users who have never set a personal password must supply the shared
        # admin password as the current credential.
        supplied = payload.current_password or ""
        if user.password_hash:
            current_ok = verify_password(supplied, user.password_hash)
        else:
            # Constant-time comparison on bytes (hmac.compare_digest rejects
            # non-ASCII ``str``). An empty/unset shared password never matches.
            shared = settings.admin_password or ""
            current_ok = bool(shared) and hmac.compare_digest(
                supplied.encode("utf-8"),
                shared.encode("utf-8"),
            )
        if not current_ok:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Current password is incorrect",
            )
        if len(payload.new_password) < 8:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="New password must be at least 8 characters",
            )

    # ---- validate: name ----------------------------------------------------
    new_name = None
    if payload.name is not None:
        new_name = payload.name.strip()
        if not new_name:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Name cannot be empty",
            )

    # ---- validate: email ---------------------------------------------------
    new_email = None
    if payload.email is not None:
        new_email = payload.email.strip().lower()
        if not new_email or "@" not in new_email:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid email address",
            )
        existing = db.scalar(
            select(User).where(User.email == new_email, User.id != user.id)
        )
        if existing is not None:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Email is already in use",
            )

    # ---- apply -------------------------------------------------------------
    if change_password:
        user.password_hash = hash_password(payload.new_password)
    if new_name is not None:
        user.name = new_name
    if new_email is not None:
        user.email = new_email

    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        if new_email is not None:
            # Another request may have claimed the address between the
            # pre-check above and this commit.
            # NOTE(review): assumes the email uniqueness constraint is the
            # only one this update can violate - confirm against the model.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Email is already in use",
            ) from exc
        raise
    db.refresh(user)
    return _serialize_session(user, include_token=True)
# Permission-enforced administrative endpoints. Route bodies should not check
# role names — every gate is the require_permission(...) dependency.
@router.get("/users", response_model=list[UserRead])
def list_users(
    db: Session = Depends(get_db),
    _: User = Depends(require_permission("view_users")),  # gated by permission key
):
    """List all internal users with the name of their role.

    Rows are ordered by email so the response is stable between calls; without
    an explicit ``ORDER BY`` the database may return rows in any order.

    Args:
        db: Database session.
        _: Unused; present only so the ``view_users`` permission is enforced.

    Returns:
        One ``UserRead`` per user; ``role`` is ``None`` for users without a
        role.
    """
    users = db.scalars(
        select(User).options(selectinload(User.role)).order_by(User.email)
    ).all()
    return [
        UserRead(
            id=user.id,
            email=user.email,
            name=user.name,
            is_active=user.is_active,
            role=user.role.name if user.role else None,
        )
        for user in users
    ]
@router.get("/roles", response_model=list[RoleRead])
def list_roles(
    db: Session = Depends(get_db),
    _: User = Depends(require_permission("manage_permissions")),  # gated by permission key
):
    """List every role, ordered by name, with its sorted permission keys.

    The ``_`` parameter is unused; it exists so the ``manage_permissions``
    permission is enforced by the dependency.
    """
    statement = select(Role).options(selectinload(Role.permissions)).order_by(Role.name)

    result = []
    for role in db.scalars(statement).all():
        permission_keys = sorted(permission.key for permission in role.permissions)
        result.append(
            RoleRead(
                id=role.id,
                name=role.name,
                description=role.description,
                permissions=permission_keys,
            )
        )
    return result
@router.get("/permissions", response_model=list[str])
def list_permissions(
    db: Session = Depends(get_db),
    _: User = Depends(require_permission("manage_permissions")),  # gated by permission key
):
    """Return the key of every permission row, sorted.

    The ``_`` parameter is unused; it exists so the ``manage_permissions``
    permission is enforced by the dependency.
    """
    rows = db.scalars(select(Permission)).all()
    keys = [row.key for row in rows]
    keys.sort()
    return keys