Files

231 lines
8.2 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, selectinload
from app.api.deps import AuthSession, require_client_access_manager_session
from app.db.session import get_db
from app.models.client_access import ClientAccount, ClientFeatureAccess, ClientUser, ClientUserModulePermission
from app.schemas.client_access import (
ClientAccessRead,
ClientFeatureUpdate,
ClientUserCreate,
ClientUserModulePermissionUpdate,
ClientUserUpdate,
)
from app.services.client_access_service import (
ACCESS_LEVEL_ORDER,
MODULE_INDEX,
default_access_level_for_role,
ensure_user_module_permissions,
list_client_accounts,
record_audit_event,
serialize_client_account,
)
# Router for the client-access management endpoints; every route declared
# below is served under the /api/client-access prefix.
router = APIRouter(prefix="/api/client-access", tags=["client-access"])
def _authorized_client_scope(db: Session, session: AuthSession) -> list[ClientAccount]:
    """Return the client accounts the given session is allowed to manage.

    A session whose role is ``"admin"`` receives everything returned by
    ``list_client_accounts``. Any other session receives only the accounts
    whose ``id`` equals the session's ``client_account_id``.
    """
    every_account = list_client_accounts(db)
    if session.role != "admin":
        return [account for account in every_account if account.id == session.client_account_id]
    return every_account
def _get_client_or_404(db: Session, client_id: int, session: AuthSession) -> ClientAccount:
    """Look up one client account within the session's authorized scope.

    Raises:
        HTTPException: 404 when no account in the authorized scope has the
            requested ``client_id``. Accounts outside the scope are
            reported the same way as accounts that do not exist.
    """
    for candidate in _authorized_client_scope(db, session):
        if candidate.id == client_id:
            return candidate
    raise HTTPException(status_code=404, detail="Client account not found")
def _get_user_or_404(db: Session, user_id: int, session: AuthSession) -> ClientUser:
    """Fetch a client user by id, with its module permissions eagerly loaded.

    Raises:
        HTTPException: 404 ("Client user not found") when no user has the
            given id, or 404 ("Client account not found") when the user's
            client account is not within the session's authorized scope.
    """
    statement = (
        select(ClientUser)
        .where(ClientUser.id == user_id)
        .options(selectinload(ClientUser.module_permissions))
    )
    found_user = db.scalar(statement)
    if found_user is None:
        raise HTTPException(status_code=404, detail="Client user not found")

    # Called only for its side effect: it raises when the owning client
    # account is not visible to this session.
    _get_client_or_404(db, found_user.client_account_id, session)
    return found_user
def _read_client_account(db: Session, client_id: int, session: AuthSession) -> dict:
    """Return the serialized form of one client account in the session's scope.

    Raises:
        HTTPException: 404 when the account is not within the session's
            authorized scope.
    """
    # The scoped lookup and its 404 are exactly what _get_client_or_404 does.
    account = _get_client_or_404(db, client_id, session)
    return serialize_client_account(account)
def _actor_metadata(session: AuthSession) -> dict[str, str]:
    """Build the ``actor_*`` keyword arguments passed to ``record_audit_event``.

    Sessions with role ``"admin"`` are recorded as ``lean_admin`` with the
    ``admin`` role. All other sessions are recorded as ``client_superadmin``
    with the session's ``client_role``, or ``"client"`` when that is falsy.
    """
    if session.role == "admin":
        actor_type, actor_role = "lean_admin", "admin"
    else:
        actor_type, actor_role = "client_superadmin", session.client_role or "client"

    return {
        "actor_type": actor_type,
        "actor_name": session.name,
        "actor_email": session.email,
        "actor_role": actor_role,
    }
@router.get("", response_model=list[ClientAccessRead])
def get_client_access(
    db: Session = Depends(get_db),
    session: AuthSession = Depends(require_client_access_manager_session),
):
    """List the serialized client accounts visible to the calling session."""
    visible_accounts = _authorized_client_scope(db, session)
    return list(map(serialize_client_account, visible_accounts))
@router.post("/users", response_model=ClientAccessRead, status_code=status.HTTP_201_CREATED)
def create_client_user(
    payload: ClientUserCreate,
    db: Session = Depends(get_db),
    session: AuthSession = Depends(require_client_access_manager_session),
):
    """Create a user under a client account and return the refreshed account.

    The new user inherits the tenant of the target client account, has its
    module permissions ensured via ``ensure_user_module_permissions``, and a
    ``user.created`` audit event is recorded in the same transaction.

    Responds with 404 when the target client account is not within the
    caller's authorized scope, and with 409 when the database rejects the
    write with an integrity error (for example a duplicate email).
    """
    client = _get_client_or_404(db, payload.client_account_id, session)
    user = ClientUser(tenant_id=client.tenant_id, **payload.model_dump())
    db.add(user)

    try:
        # flush() is what actually emits the INSERT, so a constraint
        # violation is raised here rather than at commit(). The whole write
        # sequence is guarded so the failure becomes a 409 and the session
        # is rolled back, instead of escaping as an unhandled 500.
        db.flush()
        ensure_user_module_permissions(db, user)
        record_audit_event(
            db,
            tenant_id=client.tenant_id,
            client_account_id=client.id,
            action="user.created",
            target_type="client_user",
            target_id=user.id,  # populated by the flush above
            module_key="client_access",
            summary=f"{user.full_name} was created with the {user.role} role.",
            **_actor_metadata(session),
        )
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(status_code=409, detail="A user with that email already exists for this client") from exc

    return _read_client_account(db, payload.client_account_id, session)
@router.patch("/users/{user_id}", response_model=ClientAccessRead)
def update_client_user(
    user_id: int,
    payload: ClientUserUpdate,
    db: Session = Depends(get_db),
    session: AuthSession = Depends(require_client_access_manager_session),
):
    """Apply a partial update to a client user and return the refreshed account.

    Only fields explicitly present in the request are written. When the role
    changes, every existing module permission of the user is reset to the
    default access level for the new role. A ``user.updated`` audit event is
    recorded in the same transaction.

    Responds with 404 when the user or its client account is not visible to
    the caller, and with 409 when the commit fails with an integrity error.
    """
    user = _get_user_or_404(db, user_id, session)
    role_before = user.role
    updates = payload.model_dump(exclude_unset=True)

    for attr_name, new_value in updates.items():
        setattr(user, attr_name, new_value)

    role_changed = "role" in updates and updates["role"] != role_before
    if role_changed:
        # Per-module overrides are discarded in favour of the new role's defaults.
        for permission in user.module_permissions:
            permission.access_level = default_access_level_for_role(user.role, permission.module_key)

    summary = f"{user.full_name} access was updated."
    actor = _actor_metadata(session)
    record_audit_event(
        db,
        tenant_id=user.tenant_id,
        client_account_id=user.client_account_id,
        action="user.updated",
        target_type="client_user",
        target_id=user.id,
        module_key="client_access",
        summary=summary,
        **actor,
    )

    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(status_code=409, detail="A user with that email already exists for this client") from exc

    return _read_client_account(db, user.client_account_id, session)
@router.patch("/users/{user_id}/module-permissions/{module_key}", response_model=ClientAccessRead)
def update_client_user_module_permission(
    user_id: int,
    module_key: str,
    payload: ClientUserModulePermissionUpdate,
    db: Session = Depends(get_db),
    session: AuthSession = Depends(require_client_access_manager_session),
):
    """Set a user's access level for one module and return the refreshed account.

    If the user has no permission row for the module yet, one is created,
    provided the module key is listed in ``MODULE_INDEX``. A
    ``module_permission.updated`` audit event is recorded in the same
    transaction.

    Responds with 404 when the user or its client account is not visible to
    the caller, with 422 when the requested access level is not in
    ``ACCESS_LEVEL_ORDER``, and with 404 when no permission row exists and
    the module key is unknown.
    """
    user = _get_user_or_404(db, user_id, session)

    requested_level = payload.access_level
    if requested_level not in ACCESS_LEVEL_ORDER:
        raise HTTPException(status_code=422, detail="Invalid access level")

    lookup = select(ClientUserModulePermission).where(
        ClientUserModulePermission.client_user_id == user.id,
        ClientUserModulePermission.module_key == module_key,
    )
    permission = db.scalar(lookup)

    if permission is None and module_key not in MODULE_INDEX:
        raise HTTPException(status_code=404, detail="Module permission not found")
    if permission is None:
        permission = ClientUserModulePermission(
            tenant_id=user.tenant_id,
            client_account_id=user.client_account_id,
            client_user_id=user.id,
            module_key=module_key,
            access_level=default_access_level_for_role(user.role, module_key),
        )
        db.add(permission)
        # Flush so the new row has an id to reference in the audit event.
        db.flush()

    permission.access_level = requested_level

    # An existing row may use a key missing from MODULE_INDEX; in that case
    # the display name is derived from the key itself.
    fallback_name = module_key.replace("_", " ").title()
    module_name = MODULE_INDEX.get(module_key, {}).get("module_name", fallback_name)

    actor = _actor_metadata(session)
    record_audit_event(
        db,
        tenant_id=user.tenant_id,
        client_account_id=user.client_account_id,
        action="module_permission.updated",
        target_type="client_user_module_permission",
        target_id=permission.id,
        module_key=module_key,
        summary=f"{user.full_name} now has {requested_level} access to {module_name}.",
        **actor,
    )
    db.commit()
    return _read_client_account(db, user.client_account_id, session)
@router.patch("/features/{feature_id}", response_model=ClientAccessRead)
def update_client_feature(
    feature_id: int,
    payload: ClientFeatureUpdate,
    db: Session = Depends(get_db),
    session: AuthSession = Depends(require_client_access_manager_session),
):
    """Enable or disable a client feature and return the refreshed account.

    A ``feature.updated`` audit event is recorded in the same transaction.

    Responds with 404 when the feature does not exist or when its client
    account is not within the caller's authorized scope.
    """
    feature_query = select(ClientFeatureAccess).where(ClientFeatureAccess.id == feature_id)
    feature = db.scalar(feature_query)
    if feature is None:
        raise HTTPException(status_code=404, detail="Client feature not found")

    # Scope check only: raises when the owning client account is not
    # visible to this session.
    _get_client_or_404(db, feature.client_account_id, session)

    feature.enabled = payload.enabled
    state_word = "enabled" if payload.enabled else "disabled"
    actor = _actor_metadata(session)
    record_audit_event(
        db,
        tenant_id=feature.tenant_id,
        client_account_id=feature.client_account_id,
        action="feature.updated",
        target_type="client_feature",
        target_id=feature.id,
        module_key=feature.feature_key,
        summary=f"{feature.feature_name} was {state_word}.",
        **actor,
    )
    db.commit()
    return _read_client_account(db, feature.client_account_id, session)