from __future__ import annotations import pytest from fastapi import Depends, FastAPI from fastapi.testclient import TestClient from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool from app.api.access import router as access_router from app.core.access import ( INTERNAL_USER_SUBJECT, get_user_permissions, permissions_to_module_map, require_all_permissions, require_any_permission, require_permission, user_has_permission, ) from app.core.config import settings from app.core.security import issue_token, verify_password from app.db.session import Base, get_db from app.models.access import Permission, Role, User from app.seed_access import PERMISSION_DEFINITIONS, ROLE_DEFINITIONS, SEED_USERS, seed_access def _build_session() -> Session: engine = create_engine( "sqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) return sessionmaker(bind=engine, expire_on_commit=False)() # --- Seed + permission lookup ---------------------------------------------- def test_seed_creates_roles_permissions_and_users(): db = _build_session() seed_access(db) db.commit() assert {role.name for role in db.query(Role).all()} == set(ROLE_DEFINITIONS.keys()) assert {p.key for p in db.query(Permission).all()} == {key for key, _ in PERMISSION_DEFINITIONS} assert {user.email for user in db.query(User).all()} == {entry["email"] for entry in SEED_USERS} for user in db.query(User).all(): assert user.password_hash is not None assert user.password_hash != settings.admin_password assert verify_password(settings.admin_password, user.password_hash) def test_seed_is_idempotent(): db = _build_session() seed_access(db) db.commit() seed_access(db) db.commit() assert db.query(User).count() == len(SEED_USERS) assert db.query(Role).count() == len(ROLE_DEFINITIONS) assert db.query(Permission).count() == len(PERMISSION_DEFINITIONS) def test_admin_role_permissions_match_spec(): db = _build_session() seed_access(db) admin = db.query(User).filter_by(email="admin@hunterstockfeeds.com").one() granted = get_user_permissions(admin) assert granted == set(ROLE_DEFINITIONS["Admin"]["permissions"]) assert "manage_users" in granted assert "manage_permissions" in granted assert "edit_products" in granted assert "edit_mixes" in granted assert "view_scenarios" in granted assert "edit_scenarios" in granted assert "manage_client_access" in granted modules = permissions_to_module_map(granted) assert modules["products"] == "edit" assert modules["mix_master"] == "edit" assert modules["scenarios"] == "edit" assert modules["client_access"] == "manage" def test_operations_role_is_mix_calculator_and_throughput_only(): db = _build_session() seed_access(db) ops = db.query(User).filter_by(email="ops@hunterstockfeeds.com").one() granted = get_user_permissions(ops) assert granted == { "view_mix_calculator", "use_mix_calculator", "save_mix_calculator_session", "view_throughput", "edit_throughput", } assert not user_has_permission(ops, "edit_raw_materials") assert not user_has_permission(ops, "view_dashboard") assert not user_has_permission(ops, "manage_users") def test_full_access_role_can_edit_operational_data_but_not_users(): db = _build_session() seed_access(db) craig = db.query(User).filter_by(email="craig@hunterstockfeeds.com").one() granted = get_user_permissions(craig) assert {"edit_raw_materials", "edit_products", "edit_mixes", "save_mix_calculator_session"} <= granted assert "manage_users" not in granted assert "manage_permissions" not in granted def test_lean_role_has_unrestricted_workspace_permissions(): db = _build_session() seed_access(db) lean_role = db.query(Role).filter_by(name="lean").one() user = User(email="lean@example.com", name="Lean User", role_id=lean_role.id, is_active=True) db.add(user) db.flush() granted = get_user_permissions(user) assert granted == set(ROLE_DEFINITIONS["lean"]["permissions"]) assert {key for key, _ in PERMISSION_DEFINITIONS} == granted modules = permissions_to_module_map(granted) assert modules["dashboard"] == "view" assert modules["raw_materials"] == "edit" assert modules["mix_master"] == "edit" assert modules["mix_calculator"] == "edit" assert modules["products"] == "edit" assert modules["operations_throughput"] == "edit" assert modules["scenarios"] == "edit" assert modules["client_access"] == "manage" def test_inactive_user_has_no_permissions(): db = _build_session() seed_access(db) admin = db.query(User).filter_by(email="admin@hunterstockfeeds.com").one() admin.is_active = False assert get_user_permissions(admin) == set() assert not user_has_permission(admin, "view_dashboard") def test_unknown_user_has_no_permissions(): assert get_user_permissions(None) == set() assert not user_has_permission(None, "view_dashboard") def test_user_without_role_has_no_permissions(): db = _build_session() seed_access(db) orphan = User(email="nobody@hunterstockfeeds.com", name="Nobody", role_id=None, is_active=True) db.add(orphan) db.flush() assert get_user_permissions(orphan) == set() # --- Route-level enforcement ----------------------------------------------- @pytest.fixture() def app_and_db(): db = _build_session() seed_access(db) db.commit() app = FastAPI() def override_get_db(): yield db app.dependency_overrides[get_db] = override_get_db @app.get("/needs-edit-raw") def needs_edit_raw(_: User = Depends(require_permission("edit_raw_materials"))): return {"ok": True} @app.get("/needs-any") def needs_any(_: User = Depends(require_any_permission(["edit_raw_materials", "manage_users"]))): return {"ok": True} @app.get("/needs-all") def needs_all(_: User = Depends(require_all_permissions(["view_raw_materials", "edit_raw_materials"]))): return {"ok": True} return TestClient(app), db def _token_for(user: User) -> str: return issue_token({"sub": INTERNAL_USER_SUBJECT, "user_id": user.id, "email": user.email}) @pytest.fixture() def access_app_and_db(): db = _build_session() seed_access(db) db.commit() app = FastAPI() def override_get_db(): yield db app.dependency_overrides[get_db] = override_get_db app.include_router(access_router) return TestClient(app), db def test_route_allows_user_with_permission(app_and_db): client, db = app_and_db craig = db.query(User).filter_by(email="craig@hunterstockfeeds.com").one() response = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {_token_for(craig)}"}) assert response.status_code == 200 def test_route_denies_user_without_permission(app_and_db): client, db = app_and_db ops = db.query(User).filter_by(email="ops@hunterstockfeeds.com").one() response = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {_token_for(ops)}"}) assert response.status_code == 403 assert "edit_raw_materials" in response.json()["detail"] def test_route_denies_inactive_user(app_and_db): client, db = app_and_db craig = db.query(User).filter_by(email="craig@hunterstockfeeds.com").one() craig.is_active = False db.commit() response = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {_token_for(craig)}"}) assert response.status_code == 403 def test_route_denies_missing_token(app_and_db): client, _ = app_and_db response = client.get("/needs-edit-raw") assert response.status_code == 401 def test_route_denies_token_with_wrong_subject(app_and_db): client, db = app_and_db craig = db.query(User).filter_by(email="craig@hunterstockfeeds.com").one() forged = issue_token({"sub": "client", "user_id": craig.id}) response = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {forged}"}) assert response.status_code == 401 def test_route_denies_unknown_user_id(app_and_db): client, _ = app_and_db forged = issue_token({"sub": INTERNAL_USER_SUBJECT, "user_id": 999_999}) response = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {forged}"}) assert response.status_code == 401 def test_require_any_permission_passes_with_one_match(app_and_db): client, db = app_and_db craig = db.query(User).filter_by(email="craig@hunterstockfeeds.com").one() # has edit_raw_materials response = client.get("/needs-any", headers={"Authorization": f"Bearer {_token_for(craig)}"}) assert response.status_code == 200 def test_require_any_permission_denies_when_none_match(app_and_db): client, db = app_and_db ops = db.query(User).filter_by(email="ops@hunterstockfeeds.com").one() response = client.get("/needs-any", headers={"Authorization": f"Bearer {_token_for(ops)}"}) assert response.status_code == 403 def test_require_all_permissions(app_and_db): client, db = app_and_db admin = db.query(User).filter_by(email="admin@hunterstockfeeds.com").one() ops = db.query(User).filter_by(email="ops@hunterstockfeeds.com").one() ok = client.get("/needs-all", headers={"Authorization": f"Bearer {_token_for(admin)}"}) assert ok.status_code == 200 denied = client.get("/needs-all", headers={"Authorization": f"Bearer {_token_for(ops)}"}) assert denied.status_code == 403 def test_internal_login_uses_user_password_hash(access_app_and_db): client, db = access_app_and_db admin = db.query(User).filter_by(email="admin@hunterstockfeeds.com").one() admin.password_hash = issue_token({"not": "a password"}) db.commit() denied = client.post( "/api/access/login", json={"email": admin.email, "password": settings.admin_password}, ) assert denied.status_code == 401 def test_internal_user_can_change_own_password(access_app_and_db): client, db = access_app_and_db admin = db.query(User).filter_by(email="admin@hunterstockfeeds.com").one() login_response = client.post( "/api/access/login", json={"email": admin.email, "password": settings.admin_password}, ) assert login_response.status_code == 200 update_response = client.patch( "/api/access/me", json={ "current_password": settings.admin_password, "new_password": "new-personal-password", }, cookies=login_response.cookies, ) assert update_response.status_code == 200 db.refresh(admin) assert admin.password_hash is not None assert verify_password("new-personal-password", admin.password_hash) assert not verify_password(settings.admin_password, admin.password_hash) old_login = client.post( "/api/access/login", json={"email": admin.email, "password": settings.admin_password}, ) assert old_login.status_code == 401 new_login = client.post( "/api/access/login", json={"email": admin.email, "password": "new-personal-password"}, ) assert new_login.status_code == 200