237 lines
7.7 KiB
Python
237 lines
7.7 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
from fastapi import Depends, FastAPI
|
||
|
|
from fastapi.testclient import TestClient
|
||
|
|
from sqlalchemy import create_engine
|
||
|
|
from sqlalchemy.orm import Session, sessionmaker
|
||
|
|
from sqlalchemy.pool import StaticPool
|
||
|
|
|
||
|
|
from app.core.access import (
|
||
|
|
INTERNAL_USER_SUBJECT,
|
||
|
|
get_user_permissions,
|
||
|
|
require_all_permissions,
|
||
|
|
require_any_permission,
|
||
|
|
require_permission,
|
||
|
|
user_has_permission,
|
||
|
|
)
|
||
|
|
from app.core.security import issue_token
|
||
|
|
from app.db.session import Base, get_db
|
||
|
|
from app.models.access import Permission, Role, User
|
||
|
|
from app.seed_access import PERMISSION_DEFINITIONS, ROLE_DEFINITIONS, SEED_USERS, seed_access
|
||
|
|
|
||
|
|
|
||
|
|
def _build_session() -> Session:
    """Create an in-memory SQLite database with all ``Base`` tables and return a session on it.

    Each call builds its own engine, so every caller gets an independent,
    empty schema. ``StaticPool`` together with ``check_same_thread=False``
    lets the one in-memory connection be reused from whichever thread
    ends up handling the request.
    """
    memory_engine = create_engine(
        "sqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)

    # expire_on_commit=False keeps loaded attributes readable after commit().
    session_factory = sessionmaker(bind=memory_engine, expire_on_commit=False)
    return session_factory()
|
||
|
|
|
||
|
|
|
||
|
|
# --- Seed + permission lookup ----------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
def test_seed_creates_roles_permissions_and_users():
    """Seeding yields exactly the roles, permissions and users declared by the seed module."""
    session = _build_session()
    seed_access(session)
    session.commit()

    seeded_roles = {role.name for role in session.query(Role).all()}
    assert seeded_roles == set(ROLE_DEFINITIONS.keys())

    seeded_permissions = {permission.key for permission in session.query(Permission).all()}
    assert seeded_permissions == {key for key, _ in PERMISSION_DEFINITIONS}

    seeded_emails = {user.email for user in session.query(User).all()}
    assert seeded_emails == {entry["email"] for entry in SEED_USERS}
|
||
|
|
|
||
|
|
|
||
|
|
def test_seed_is_idempotent():
    """Running the seed twice must not duplicate users, roles or permissions."""
    session = _build_session()

    # Seed and commit twice in a row; the second pass should be a no-op.
    for _attempt in range(2):
        seed_access(session)
        session.commit()

    user_count = session.query(User).count()
    assert user_count == len(SEED_USERS)

    role_count = session.query(Role).count()
    assert role_count == len(ROLE_DEFINITIONS)

    permission_count = session.query(Permission).count()
    assert permission_count == len(PERMISSION_DEFINITIONS)
|
||
|
|
|
||
|
|
|
||
|
|
def test_admin_role_permissions_match_spec():
    """The seeded admin user holds exactly the permissions listed for the ``Admin`` role."""
    session = _build_session()
    seed_access(session)

    admin_user = session.query(User).filter_by(email="admin@hunterstockfeeds.com").one()
    admin_permissions = get_user_permissions(admin_user)

    assert admin_permissions == set(ROLE_DEFINITIONS["Admin"]["permissions"])

    for expected_key in ("manage_users", "manage_permissions"):
        assert expected_key in admin_permissions

    # Admin spec deliberately excludes edit_products / edit_mixes.
    for excluded_key in ("edit_products", "edit_mixes"):
        assert excluded_key not in admin_permissions
|
||
|
|
|
||
|
|
|
||
|
|
def test_operations_role_is_mix_calculator_only():
    """The seeded ops user is limited to the three mix-calculator permissions."""
    session = _build_session()
    seed_access(session)

    ops_user = session.query(User).filter_by(email="ops@hunterstockfeeds.com").one()
    ops_permissions = get_user_permissions(ops_user)

    mix_calculator_only = {"view_mix_calculator", "use_mix_calculator", "save_mix_calculator_session"}
    assert ops_permissions == mix_calculator_only

    # Spot-check a few permissions outside the mix calculator.
    for forbidden_key in ("edit_raw_materials", "view_dashboard", "manage_users"):
        assert not user_has_permission(ops_user, forbidden_key)
|
||
|
|
|
||
|
|
|
||
|
|
def test_full_access_role_can_edit_operational_data_but_not_users():
    """The seeded ``craig`` user can edit operational data but holds no user/permission admin rights."""
    session = _build_session()
    seed_access(session)

    craig_user = session.query(User).filter_by(email="craig@hunterstockfeeds.com").one()
    craig_permissions = get_user_permissions(craig_user)

    operational_edits = {"edit_raw_materials", "edit_products", "edit_mixes", "save_mix_calculator_session"}
    assert operational_edits <= craig_permissions

    for admin_only_key in ("manage_users", "manage_permissions"):
        assert admin_only_key not in craig_permissions
|
||
|
|
|
||
|
|
|
||
|
|
def test_inactive_user_has_no_permissions():
    """Setting ``is_active`` to False strips every permission, even from the admin user."""
    session = _build_session()
    seed_access(session)

    deactivated = session.query(User).filter_by(email="admin@hunterstockfeeds.com").one()
    # Only the in-memory object is changed; nothing is committed here.
    deactivated.is_active = False

    remaining_permissions = get_user_permissions(deactivated)
    assert remaining_permissions == set()
    assert not user_has_permission(deactivated, "view_dashboard")
|
||
|
|
|
||
|
|
|
||
|
|
def test_unknown_user_has_no_permissions():
    """Passing ``None`` as the user yields an empty permission set and fails every check."""
    missing_user = None

    assert get_user_permissions(missing_user) == set()
    assert not user_has_permission(missing_user, "view_dashboard")
|
||
|
|
|
||
|
|
|
||
|
|
def test_user_without_role_has_no_permissions():
    """An active user whose ``role_id`` is ``None`` is granted nothing."""
    session = _build_session()
    seed_access(session)

    roleless_user = User(
        email="nobody@hunterstockfeeds.com",
        name="Nobody",
        role_id=None,
        is_active=True,
    )
    session.add(roleless_user)
    # Flush sends the INSERT to the database without committing.
    session.flush()

    assert get_user_permissions(roleless_user) == set()
|
||
|
|
|
||
|
|
|
||
|
|
# --- Route-level enforcement -----------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture()
def app_and_db():
    """Provide a ``(TestClient, Session)`` pair backed by a seeded in-memory database.

    A throwaway FastAPI app is built with three guarded routes:

    * ``/needs-edit-raw`` - ``require_permission("edit_raw_materials")``
    * ``/needs-any``      - ``require_any_permission`` over
      ``edit_raw_materials`` / ``manage_users``
    * ``/needs-all``      - ``require_all_permissions`` over
      ``view_raw_materials`` / ``edit_raw_materials``

    ``get_db`` is overridden so the routes use the same session that the
    test receives; changes a test commits are therefore visible to the
    request handlers.

    The fixture yields rather than returns so the session is closed and
    the engine disposed once the test finishes, instead of leaking one
    SQLite connection per test.
    """
    db = _build_session()
    seed_access(db)
    db.commit()

    app = FastAPI()

    def override_get_db():
        # Hand every request the shared test session.
        yield db

    app.dependency_overrides[get_db] = override_get_db

    @app.get("/needs-edit-raw")
    def needs_edit_raw(_: User = Depends(require_permission("edit_raw_materials"))):
        return {"ok": True}

    @app.get("/needs-any")
    def needs_any(_: User = Depends(require_any_permission(["edit_raw_materials", "manage_users"]))):
        return {"ok": True}

    @app.get("/needs-all")
    def needs_all(_: User = Depends(require_all_permissions(["view_raw_materials", "edit_raw_materials"]))):
        return {"ok": True}

    try:
        yield TestClient(app), db
    finally:
        # Teardown: release the session first, then the engine's connection.
        engine = db.get_bind()
        db.close()
        engine.dispose()
|
||
|
|
|
||
|
|
|
||
|
|
def _token_for(user: User) -> str:
    """Issue a token for *user* carrying the internal-user subject, the user's id and email."""
    claims = {
        "sub": INTERNAL_USER_SUBJECT,
        "user_id": user.id,
        "email": user.email,
    }
    return issue_token(claims)
|
||
|
|
|
||
|
|
|
||
|
|
def test_route_allows_user_with_permission(app_and_db):
    """A user holding ``edit_raw_materials`` gets 200 from ``/needs-edit-raw``."""
    client, session = app_and_db
    editor = session.query(User).filter_by(email="craig@hunterstockfeeds.com").one()

    auth_headers = {"Authorization": f"Bearer {_token_for(editor)}"}
    result = client.get("/needs-edit-raw", headers=auth_headers)

    assert result.status_code == 200
|
||
|
|
|
||
|
|
|
||
|
|
def test_route_denies_user_without_permission(app_and_db):
    """The ops user gets 403 from ``/needs-edit-raw`` and the error detail names the permission."""
    client, session = app_and_db
    ops_user = session.query(User).filter_by(email="ops@hunterstockfeeds.com").one()

    auth_headers = {"Authorization": f"Bearer {_token_for(ops_user)}"}
    result = client.get("/needs-edit-raw", headers=auth_headers)

    assert result.status_code == 403
    detail = result.json()["detail"]
    assert "edit_raw_materials" in detail
|
||
|
|
|
||
|
|
|
||
|
|
def test_route_denies_inactive_user(app_and_db):
    """A deactivated user gets 403 even though their token is otherwise valid."""
    client, session = app_and_db

    deactivated = session.query(User).filter_by(email="craig@hunterstockfeeds.com").one()
    deactivated.is_active = False
    # Commit so the change is persisted before the request is made.
    session.commit()

    auth_headers = {"Authorization": f"Bearer {_token_for(deactivated)}"}
    result = client.get("/needs-edit-raw", headers=auth_headers)

    assert result.status_code == 403
|
||
|
|
|
||
|
|
|
||
|
|
def test_route_denies_missing_token(app_and_db):
    """A request without an ``Authorization`` header gets 401."""
    client = app_and_db[0]

    result = client.get("/needs-edit-raw")

    assert result.status_code == 401
|
||
|
|
|
||
|
|
|
||
|
|
def test_route_denies_token_with_wrong_subject(app_and_db):
    """A token whose ``sub`` is ``"client"`` gets 401, even with a real user id."""
    client, session = app_and_db
    real_user = session.query(User).filter_by(email="craig@hunterstockfeeds.com").one()

    wrong_subject_claims = {"sub": "client", "user_id": real_user.id}
    forged_token = issue_token(wrong_subject_claims)

    result = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {forged_token}"})

    assert result.status_code == 401
|
||
|
|
|
||
|
|
|
||
|
|
def test_route_denies_unknown_user_id(app_and_db):
    """A token with the internal subject but a ``user_id`` of 999999 gets 401."""
    client = app_and_db[0]

    unknown_user_claims = {"sub": INTERNAL_USER_SUBJECT, "user_id": 999_999}
    forged_token = issue_token(unknown_user_claims)

    result = client.get("/needs-edit-raw", headers={"Authorization": f"Bearer {forged_token}"})

    assert result.status_code == 401
|
||
|
|
|
||
|
|
|
||
|
|
def test_require_any_permission_passes_with_one_match(app_and_db):
    """``/needs-any`` returns 200 when the caller holds at least one of the listed permissions."""
    client, session = app_and_db
    # craig has edit_raw_materials
    partial_match = session.query(User).filter_by(email="craig@hunterstockfeeds.com").one()

    auth_headers = {"Authorization": f"Bearer {_token_for(partial_match)}"}
    result = client.get("/needs-any", headers=auth_headers)

    assert result.status_code == 200
|
||
|
|
|
||
|
|
|
||
|
|
def test_require_any_permission_denies_when_none_match(app_and_db):
    """``/needs-any`` returns 403 for the ops user."""
    client, session = app_and_db
    ops_user = session.query(User).filter_by(email="ops@hunterstockfeeds.com").one()

    auth_headers = {"Authorization": f"Bearer {_token_for(ops_user)}"}
    result = client.get("/needs-any", headers=auth_headers)

    assert result.status_code == 403
|
||
|
|
|
||
|
|
|
||
|
|
def test_require_all_permissions(app_and_db):
    """``/needs-all`` returns 200 for the admin user and 403 for the ops user."""
    client, session = app_and_db
    admin_user = session.query(User).filter_by(email="admin@hunterstockfeeds.com").one()
    ops_user = session.query(User).filter_by(email="ops@hunterstockfeeds.com").one()

    admin_headers = {"Authorization": f"Bearer {_token_for(admin_user)}"}
    allowed = client.get("/needs-all", headers=admin_headers)
    assert allowed.status_code == 200

    ops_headers = {"Authorization": f"Bearer {_token_for(ops_user)}"}
    rejected = client.get("/needs-all", headers=ops_headers)
    assert rejected.status_code == 403
|