"""Query, permission, serialization and export helpers for client access.

This module loads client accounts with their users, feature flags and audit
events, back-fills per-user module permissions from role defaults, and turns
the ORM objects into plain dictionaries for API responses and BI export.
"""

from __future__ import annotations

from datetime import datetime, timezone

from sqlalchemy import Select, select
from sqlalchemy.orm import Session, selectinload

from app.models.client_access import (
    ClientAccessAuditEvent,
    ClientAccount,
    ClientFeatureAccess,
    ClientUser,
    ClientUserModulePermission,
)

# (module_key, display name, group, description) for every known module.
MODULE_CATALOG = (
    ("dashboard", "Dashboard", "workspace", "Top-level operational dashboard"),
    ("raw_materials", "Raw Materials", "costing", "Maintain live material costs and versions"),
    ("mix_master", "Mix Master", "costing", "Create and maintain mix worksheets"),
    ("products", "Products", "pricing", "Review finished product pricing"),
    ("scenarios", "Scenarios", "planning", "Run scenario overrides and comparisons"),
    ("powerbi_export", "Power BI Export", "reporting", "Expose client access data to BI consumers"),
    (
        "client_access",
        "Client Access",
        "administration",
        "Manage user access, module permissions, and audit history",
    ),
)

# Same catalog keyed by module_key for O(1) lookup during serialization.
MODULE_INDEX = {
    key: {"module_name": name, "module_group": group, "description": description}
    for key, name, group, description in MODULE_CATALOG
}

# Rank of each access level; a higher number implies every lower level.
ACCESS_LEVEL_ORDER = {"none": 0, "view": 1, "edit": 2, "manage": 3}

# Modules an "operator" may edit and a "viewer" may view by default.
_OPERATOR_EDIT_MODULES = frozenset(
    {"dashboard", "raw_materials", "mix_master", "products", "scenarios"}
)
_VIEWER_VIEW_MODULES = frozenset({"dashboard", "products", "powerbi_export"})

# Maximum number of audit events embedded in a serialized client account.
_AUDIT_HISTORY_LIMIT = 40


def client_access_query() -> Select[tuple[ClientAccount]]:
    """Build the SELECT for all client accounts, ordered by name.

    Users (with their module permissions), features and audit events are
    eagerly loaded with ``selectinload`` so serialization does not trigger
    per-row lazy loads.
    """
    return (
        select(ClientAccount)
        .options(
            selectinload(ClientAccount.users).selectinload(ClientUser.module_permissions),
            selectinload(ClientAccount.features),
            selectinload(ClientAccount.audit_events),
        )
        .order_by(ClientAccount.name)
    )


def list_client_accounts(db: Session) -> list[ClientAccount]:
    """Return every client account with its related collections loaded.

    Missing module permissions are back-filled (and committed) first, so the
    returned users carry a permission row for every catalog module.
    """
    ensure_client_user_module_permissions(db)
    # ``.all()`` is typed as a Sequence; materialize it to honour the
    # declared ``list`` return type.
    return list(db.scalars(client_access_query()).all())


def get_client_user_by_email(
    db: Session, *, email: str, tenant_id: str | None = None
) -> ClientUser | None:
    """Look up a client user by exact email, optionally scoped to a tenant.

    Args:
        db: Active SQLAlchemy session.
        email: Email to match (compared with ``==``, no normalization here).
        tenant_id: When truthy, restricts the match to this tenant.

    Returns:
        The matching user with the highest ``id`` (module permissions
        eagerly loaded), or ``None`` when nothing matches.
    """
    statement = select(ClientUser).where(ClientUser.email == email)
    if tenant_id:
        statement = statement.where(ClientUser.tenant_id == tenant_id)
    return db.scalar(
        statement.options(selectinload(ClientUser.module_permissions)).order_by(
            ClientUser.id.desc()
        )
    )


def module_access_map(user: ClientUser) -> dict[str, str]:
    """Map each of the user's module keys to its access level."""
    return {
        permission.module_key: permission.access_level
        for permission in user.module_permissions
    }


def has_access_level(access_level: str | None, minimum_level: str) -> bool:
    """Tell whether ``access_level`` meets or exceeds ``minimum_level``.

    Args:
        access_level: The level held; ``None``, empty or unrecognised values
            are ranked like ``"none"``.
        minimum_level: The level required; must be a key of
            ``ACCESS_LEVEL_ORDER``.

    Returns:
        ``True`` when the held level ranks at least as high as the required
        one. An unrecognised ``minimum_level`` always yields ``False``.
    """
    required_rank = ACCESS_LEVEL_ORDER.get(minimum_level)
    if required_rank is None:
        # Fail closed: ranking an unknown requirement as 0 would let a typo
        # in the required level grant access to every user.
        return False
    return ACCESS_LEVEL_ORDER.get(access_level or "none", 0) >= required_rank


def default_access_level_for_role(role: str, module_key: str) -> str:
    """Return the default access level a role gets on a module.

    The role is compared case-insensitively with surrounding whitespace
    ignored. A missing (``None``/empty) or unrecognised role gets ``"none"``.
    """
    normalized = (role or "").strip().lower()
    if normalized == "superadmin":
        return "manage" if module_key == "client_access" else "edit"
    if normalized == "admin":
        return "edit" if module_key != "client_access" else "none"
    if normalized == "operator":
        return "edit" if module_key in _OPERATOR_EDIT_MODULES else "none"
    if normalized == "viewer":
        return "view" if module_key in _VIEWER_VIEW_MODULES else "none"
    return "none"


def ensure_user_module_permissions(db: Session, user: ClientUser) -> bool:
    """Add role-default permission rows for catalog modules the user lacks.

    New rows are only added to the session; committing is left to the caller.

    Returns:
        ``True`` when at least one permission row was added.
    """
    existing = {permission.module_key for permission in user.module_permissions}
    created = False
    for module_key, _, _, _ in MODULE_CATALOG:
        if module_key in existing:
            continue
        db.add(
            ClientUserModulePermission(
                tenant_id=user.tenant_id,
                client_account_id=user.client_account_id,
                client_user_id=user.id,
                module_key=module_key,
                access_level=default_access_level_for_role(user.role, module_key),
            )
        )
        created = True
    return created


def ensure_client_user_module_permissions(db: Session) -> None:
    """Back-fill missing module permissions for every client user.

    Commits the session only when at least one row was added.
    """
    users = db.scalars(
        select(ClientUser).options(selectinload(ClientUser.module_permissions))
    ).all()
    changed = False
    for user in users:
        # Call first so every user is processed even once ``changed`` is True.
        changed = ensure_user_module_permissions(db, user) or changed
    if changed:
        db.commit()


def serialize_client_user(user: ClientUser) -> dict:
    """Convert a client user and its module permissions to a plain dict."""
    return {
        "id": user.id,
        "client_account_id": user.client_account_id,
        "full_name": user.full_name,
        "email": user.email,
        "role": user.role,
        "status": user.status,
        "is_new_user": user.is_new_user,
        "last_login_at": user.last_login_at,
        "created_at": user.created_at,
        "module_permissions": [
            serialize_module_permission(permission)
            for permission in user.module_permissions
        ],
    }


def serialize_client_feature(feature: ClientFeatureAccess) -> dict:
    """Convert a client feature-access row to a plain dict."""
    return {
        "id": feature.id,
        "client_account_id": feature.client_account_id,
        "feature_key": feature.feature_key,
        "feature_name": feature.feature_name,
        "feature_group": feature.feature_group,
        "description": feature.description,
        "enabled": feature.enabled,
        "updated_at": feature.updated_at,
        "created_at": feature.created_at,
    }


def serialize_module_permission(permission: ClientUserModulePermission) -> dict:
    """Convert a module permission to a dict enriched with catalog metadata.

    For a module key missing from ``MODULE_INDEX`` the name falls back to the
    title-cased key, the group to ``"workspace"`` and the description to
    ``None``.
    """
    module_info = MODULE_INDEX.get(permission.module_key, {})
    return {
        "id": permission.id,
        "client_account_id": permission.client_account_id,
        "client_user_id": permission.client_user_id,
        "module_key": permission.module_key,
        "module_name": module_info.get(
            "module_name", permission.module_key.replace("_", " ").title()
        ),
        "module_group": module_info.get("module_group", "workspace"),
        "description": module_info.get("description"),
        "access_level": permission.access_level,
        "updated_at": permission.updated_at,
        "created_at": permission.created_at,
    }


def serialize_audit_event(event: ClientAccessAuditEvent) -> dict:
    """Convert an audit event to a plain dict."""
    return {
        "id": event.id,
        "client_account_id": event.client_account_id,
        "actor_type": event.actor_type,
        "actor_name": event.actor_name,
        "actor_email": event.actor_email,
        "actor_role": event.actor_role,
        "action": event.action,
        "target_type": event.target_type,
        "target_id": event.target_id,
        "module_key": event.module_key,
        "summary": event.summary,
        "created_at": event.created_at,
    }


def serialize_client_account(client: ClientAccount) -> dict:
    """Convert a client account, its users, features and audit trail to a dict.

    Also derives summary counts: users whose status is ``"active"``, users
    flagged as new, enabled features and total features.
    """
    users = [serialize_client_user(user) for user in client.users]
    features = [serialize_client_feature(feature) for feature in client.features]
    active_users = sum(1 for user in users if user["status"] == "active")
    new_users = sum(1 for user in users if user["is_new_user"])
    enabled_features = sum(1 for feature in features if feature["enabled"])
    return {
        "id": client.id,
        "tenant_id": client.tenant_id,
        "name": client.name,
        "client_code": client.client_code,
        "status": client.status,
        "powerbi_workspace": client.powerbi_workspace,
        "notes": client.notes,
        "created_at": client.created_at,
        "users": users,
        "features": features,
        "active_user_count": active_users,
        "new_user_count": new_users,
        "enabled_feature_count": enabled_features,
        "total_feature_count": len(features),
        # Only the first entries of the collection are embedded.
        # NOTE(review): whether these are the most recent events depends on
        # the ordering of ``ClientAccount.audit_events`` - confirm on the model.
        "audit_history": [
            serialize_audit_event(event)
            for event in client.audit_events[:_AUDIT_HISTORY_LIMIT]
        ],
    }


def build_client_access_export(clients: list[ClientAccount]) -> dict:
    """Flatten client accounts into row sets for BI export.

    Args:
        clients: Client accounts with users, features and audit events
            accessible.

    Returns:
        A dict with ``generated_at`` (naive UTC datetime), one flat row list
        each for clients, users, features, permissions and audit events, and
        the nested serialized accounts under ``clients``.
    """
    serialized_clients = [serialize_client_account(client) for client in clients]
    client_rows = []
    user_rows = []
    feature_rows = []
    permission_rows = []
    audit_rows = []

    for client in serialized_clients:
        client_rows.append(
            {
                "client_id": client["id"],
                "tenant_id": client["tenant_id"],
                "client_name": client["name"],
                "client_code": client["client_code"],
                "client_status": client["status"],
                "powerbi_workspace": client["powerbi_workspace"],
                "active_user_count": client["active_user_count"],
                "new_user_count": client["new_user_count"],
                "enabled_feature_count": client["enabled_feature_count"],
                "total_feature_count": client["total_feature_count"],
            }
        )

        for user in client["users"]:
            user_rows.append(
                {
                    "client_id": client["id"],
                    "client_name": client["name"],
                    "user_id": user["id"],
                    "full_name": user["full_name"],
                    "email": user["email"],
                    "role": user["role"],
                    "status": user["status"],
                    "is_new_user": user["is_new_user"],
                    "last_login_at": user["last_login_at"],
                    "created_at": user["created_at"],
                }
            )
            for permission in user["module_permissions"]:
                permission_rows.append(
                    {
                        "client_id": client["id"],
                        "client_name": client["name"],
                        "user_id": user["id"],
                        "user_email": user["email"],
                        "module_key": permission["module_key"],
                        "module_name": permission["module_name"],
                        "module_group": permission["module_group"],
                        "access_level": permission["access_level"],
                        "updated_at": permission["updated_at"],
                    }
                )

        for feature in client["features"]:
            feature_rows.append(
                {
                    "client_id": client["id"],
                    "client_name": client["name"],
                    "feature_id": feature["id"],
                    "feature_key": feature["feature_key"],
                    "feature_name": feature["feature_name"],
                    "feature_group": feature["feature_group"],
                    "enabled": feature["enabled"],
                    "updated_at": feature["updated_at"],
                }
            )

        for event in client["audit_history"]:
            audit_rows.append(
                {
                    "client_id": client["id"],
                    "client_name": client["name"],
                    "event_id": event["id"],
                    "actor_email": event["actor_email"],
                    "actor_role": event["actor_role"],
                    "action": event["action"],
                    "target_type": event["target_type"],
                    "target_id": event["target_id"],
                    "module_key": event["module_key"],
                    "summary": event["summary"],
                    "created_at": event["created_at"],
                }
            )

    return {
        # ``datetime.utcnow()`` is deprecated; this yields the same naive UTC
        # value so consumers see an unchanged timestamp format.
        "generated_at": datetime.now(timezone.utc).replace(tzinfo=None),
        "client_rows": client_rows,
        "user_rows": user_rows,
        "feature_rows": feature_rows,
        "permission_rows": permission_rows,
        "audit_rows": audit_rows,
        "clients": serialized_clients,
    }


def record_audit_event(
    db: Session,
    *,
    tenant_id: str,
    client_account_id: int,
    actor_type: str,
    actor_name: str,
    actor_email: str,
    actor_role: str,
    action: str,
    target_type: str,
    target_id: int | None,
    module_key: str | None,
    summary: str,
) -> None:
    """Add a client-access audit event to the session.

    The event is only staged with ``db.add``; this function neither flushes
    nor commits, so the caller controls the transaction.
    """
    db.add(
        ClientAccessAuditEvent(
            tenant_id=tenant_id,
            client_account_id=client_account_id,
            actor_type=actor_type,
            actor_name=actor_name,
            actor_email=actor_email,
            actor_role=actor_role,
            action=action,
            target_type=target_type,
            target_id=target_id,
            module_key=module_key,
            summary=summary,
        )
    )