# Scraped listing header: 446 lines, 17 KiB, Python.
from datetime import date
|
|
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine, inspect, text
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
from app.core.config import settings
|
|
from app.db.migrations import bootstrap_schema, sync_tenant_ids
|
|
from app.db.session import Base
|
|
from app.main import app
|
|
from app.models.assumption import FreightCostRule, PackagingCostRule, ProcessCostRule
|
|
from app.models.client_access import ClientAccessAuditEvent, ClientAccount, ClientFeatureAccess, ClientUser
|
|
from app.schemas.mix_calculator import MixCalculatorSessionCreate
|
|
from app.models.mix import Mix, MixIngredient
|
|
from app.models.product import Product
|
|
from app.models.raw_material import RawMaterial, RawMaterialPriceVersion
|
|
from app.services.client_access_service import build_client_access_export, ensure_user_module_permissions, serialize_client_account
|
|
from app.services.costing_engine import calculate_mix_cost, calculate_product_cost, calculate_raw_material_cost, serialize_raw_material
|
|
from app.services.mix_calculator_service import calculate_mix_calculator_preview
|
|
|
|
|
|
def build_session() -> Session:
    """Return a new ORM session backed by a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created on the new engine
    before the session is built. The session is configured with
    ``expire_on_commit=False``.
    """
    memory_engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(bind=memory_engine)
    session_factory = sessionmaker(bind=memory_engine, expire_on_commit=False)
    return session_factory()
|
|
|
|
|
|
def test_calculate_raw_material_cost():
    """Check loss, per-unit and per-kg cost for a single priced raw material."""
    material = RawMaterial(
        name="Maize",
        unit_of_measure="tonne",
        kg_per_unit=1000,
        status="active",
    )
    price_version = RawMaterialPriceVersion(
        market_value=500,
        waste_percentage=0.02,
        effective_date=date(2026, 4, 1),
    )

    costing = calculate_raw_material_cost(material, price_version)

    assert costing.loss_cost == 10.0
    assert costing.cost_per_unit == 510.0
    assert costing.cost_per_kg == 0.51
|
|
|
|
|
|
def test_serialize_raw_material_ignores_non_positive_active_price():
    """An active price version with a zero market value yields no current price."""
    zero_price = RawMaterialPriceVersion(
        market_value=0,
        waste_percentage=0,
        effective_date=date(2026, 4, 1),
        status="active",
    )
    flavouring = RawMaterial(
        name="Apple Flavouring",
        unit_of_measure="kg",
        kg_per_unit=1,
        status="active",
    )
    flavouring.price_versions.append(zero_price)

    payload = serialize_raw_material(flavouring)

    assert payload["current_price"] is None
|
|
|
|
|
|
def test_mix_and_product_cost_breakdown():
    """Cost a two-ingredient mix and the bagged product built from it.

    Seeds two raw materials with price versions, a mix of 180 kg + 100 kg,
    process / packaging / freight rules and one product, then checks the
    figures returned by ``calculate_mix_cost`` and ``calculate_product_cost``.
    """
    # Use the session as a context manager so it is closed even when an
    # assertion fails, rather than being left open until garbage collection.
    with build_session() as db:
        maize = RawMaterial(name="Maize", unit_of_measure="tonne", kg_per_unit=1000, status="active")
        maize.price_versions.append(
            RawMaterialPriceVersion(market_value=520, waste_percentage=0.02, effective_date=date(2026, 4, 1))
        )
        barley = RawMaterial(name="Barley", unit_of_measure="tonne", kg_per_unit=1000, status="active")
        barley.price_versions.append(
            RawMaterialPriceVersion(market_value=470, waste_percentage=0.015, effective_date=date(2026, 4, 1))
        )
        db.add_all([maize, barley])
        db.flush()  # assigns primary keys needed by the ingredients below

        mix = Mix(client_name="Specialty Feeds", name="Pigeon Mix", status="active", version=1)
        db.add(mix)
        db.flush()
        db.add_all(
            [
                MixIngredient(mix_id=mix.id, raw_material_id=maize.id, quantity_kg=180),
                MixIngredient(mix_id=mix.id, raw_material_id=barley.id, quantity_kg=100),
            ]
        )
        db.add(ProcessCostRule(process_name="standard_bagging", grading_cost=0.055, bagging_cost=0.04, cracking_cost=0.0))
        db.add(PackagingCostRule(sale_type="standard", unit_of_measure="20kg bag", own_bag=False, bag_cost=0.63))
        db.add(FreightCostRule(sale_type="standard", unit_of_measure="20kg bag", cost_per_unit=1.45))
        db.flush()
        product = Product(
            client_name="Specialty Feeds",
            name="Specialty Pigeon Breeder 20kg",
            mix_id=mix.id,
            sale_type="standard",
            own_bag=False,
            unit_of_measure="20kg bag",
            items_per_pallet=50,
            bagging_process="standard_bagging",
            distributor_margin=0.225,
            wholesale_margin=0.18,
        )
        db.add(product)
        db.commit()

        mix_result = calculate_mix_cost(db, mix.id)
        product_result = calculate_product_cost(db, product.id)

        assert mix_result["total_mix_kg"] == 280
        assert mix_result["mix_cost_per_kg"] == 0.5114
        assert product_result["client_name"] == "Specialty Feeds"
        assert product_result["mix_name"] == "Pigeon Mix"
        assert product_result["finished_product_delivered"] == 14.208
        assert product_result["distributor_price"] == 18.3329
        assert product_result["wholesale_price"] == 17.3268
|
|
|
|
|
|
def test_mix_calculator_preview_scales_saved_mix_and_warns_on_fractional_bags():
    """Preview a 550 kg batch of a saved 180 kg + 100 kg mix.

    The product is sold as a "20kg bag", and the test expects 27.5 bags,
    proportionally scaled ingredient lines and exactly one warning mentioning
    that the batch is "not a whole-bag quantity".
    """
    # Use the session as a context manager so it is closed even when an
    # assertion fails, rather than being left open until garbage collection.
    with build_session() as db:
        maize = RawMaterial(name="Maize", unit_of_measure="tonne", kg_per_unit=1000, status="active")
        barley = RawMaterial(name="Barley", unit_of_measure="tonne", kg_per_unit=1000, status="active")
        db.add_all([maize, barley])
        db.flush()  # assigns primary keys needed by the ingredients below

        mix = Mix(
            tenant_id="specialty-feeds",
            client_name="Specialty Feeds",
            name="Pigeon Mix",
            status="active",
            version=1,
        )
        db.add(mix)
        db.flush()
        db.add_all(
            [
                MixIngredient(tenant_id="specialty-feeds", mix_id=mix.id, raw_material_id=maize.id, quantity_kg=180),
                MixIngredient(tenant_id="specialty-feeds", mix_id=mix.id, raw_material_id=barley.id, quantity_kg=100),
            ]
        )
        db.flush()

        product = Product(
            tenant_id="specialty-feeds",
            client_name="Specialty Feeds",
            name="Specialty Pigeon Breeder 20kg",
            mix_id=mix.id,
            sale_type="standard",
            own_bag=False,
            unit_of_measure="20kg bag",
            items_per_pallet=50,
            bagging_process="standard_bagging",
        )
        db.add(product)
        db.commit()

        preview = calculate_mix_calculator_preview(
            db,
            tenant_id="specialty-feeds",
            payload=MixCalculatorSessionCreate(
                mix_date=date(2026, 4, 29),
                client_name="Specialty Feeds",
                product_id=product.id,
                batch_size_kg=550,
                prepared_by_name="Shift A",
                notes="Mid-morning run",
            ),
        )

        assert preview["batch_size_kg"] == 550
        assert preview["total_bags"] == 27.5
        assert preview["lines"][0]["required_kg"] == 353.5714
        assert preview["lines"][1]["required_kg"] == 196.4286
        assert len(preview["warnings"]) == 1
        assert "not a whole-bag quantity" in preview["warnings"][0]
|
|
|
|
|
|
def test_root_and_login_endpoints():
    """The root document advertises both login routes and both logins succeed."""
    with TestClient(app) as api:
        root = api.get("/")
        assert root.status_code == 200
        advertised = root.json()["endpoints"]
        assert advertised["client_login"] == "/api/auth/client/login"
        assert advertised["admin_login"] == "/api/auth/admin/login"

        client_credentials = {"email": settings.client_email, "password": settings.client_password}
        client_login = api.post("/api/auth/client/login", json=client_credentials)
        assert client_login.status_code == 200
        client_body = client_login.json()
        assert client_body["email"] == settings.client_email
        assert client_body["tenant_id"] == settings.client_tenant_id
        assert client_body["client_role"] == "superadmin"
        assert client_body["module_permissions"]["client_access"] == "manage"

        admin_credentials = {"email": settings.admin_email, "password": settings.admin_password}
        admin_login = api.post("/api/auth/admin/login", json=admin_credentials)
        assert admin_login.status_code == 200
        assert admin_login.json()["email"] == settings.admin_email
|
|
|
|
|
|
def test_client_access_export_helpers():
    """Serialize and export one client account built entirely in memory.

    The account carries two users (one active, one invited and flagged as
    new), two features (one enabled) and a single audit event. The test checks
    the counts reported by ``serialize_client_account`` and the row sets
    produced by ``build_client_access_export``.
    """
    # Use the session as a context manager so it is closed even when an
    # assertion fails, rather than being left open until garbage collection.
    with build_session() as db:
        client = ClientAccount(
            tenant_id="specialty-feeds",
            name="Specialty Feeds",
            client_code="SPEC",
            status="active",
            powerbi_workspace="farm-ops-prod",
        )
        client.users.extend(
            [
                ClientUser(
                    full_name="Amelia Hart",
                    email="amelia.hart@specialtyfeeds.example",
                    role="superadmin",
                    status="active",
                    is_new_user=False,
                ),
                ClientUser(
                    full_name="Ethan Cole",
                    email="ethan.cole@specialtyfeeds.example",
                    role="operator",
                    status="invited",
                    is_new_user=True,
                ),
            ]
        )
        client.features.extend(
            [
                ClientFeatureAccess(
                    feature_key="dashboard",
                    feature_name="Dashboard",
                    feature_group="workspace",
                    enabled=True,
                ),
                ClientFeatureAccess(
                    feature_key="products",
                    feature_name="Products",
                    feature_group="pricing",
                    enabled=False,
                ),
            ]
        )
        db.add(client)
        db.flush()  # assigns client.id, which the audit event references below
        for user in client.users:
            ensure_user_module_permissions(db, user)
        client.audit_events.append(
            ClientAccessAuditEvent(
                tenant_id=client.tenant_id,
                actor_type="lean_admin",
                actor_name="Lean 101",
                actor_email="admin@lean101.local",
                actor_role="admin",
                action="client_access.seeded",
                target_type="client_account",
                target_id=client.id,
                module_key="client_access",
                summary="Initial client access controls were seeded.",
            )
        )
        db.commit()
        db.refresh(client)

        # Serialize while the session is still open so relationship access
        # on ``client`` can still load from the database if it needs to.
        serialized = serialize_client_account(client)
        export = build_client_access_export([client])

        assert serialized["active_user_count"] == 1
        assert serialized["new_user_count"] == 1
        assert serialized["enabled_feature_count"] == 1
        assert export["client_rows"][0]["client_code"] == "SPEC"
        assert export["user_rows"][0]["client_name"] == "Specialty Feeds"
        assert len(export["feature_rows"]) == 2
        assert len(export["permission_rows"]) >= 1
        assert len(export["audit_rows"]) == 1
|
|
|
|
|
|
def test_client_access_endpoints():
    """Exercise the client-access listing and export endpoints.

    With an admin token, ``/api/client-access`` returns at least one record
    carrying ``audit_history`` and per-user ``module_permissions``, and
    ``/api/powerbi/client-access`` returns client and permission rows. With
    the client login token, the listing contains exactly one record.
    """
    with TestClient(app) as client:
        login_response = client.post(
            "/api/auth/admin/login",
            json={"email": settings.admin_email, "password": settings.admin_password},
        )
        # Fail here with a clear message instead of a KeyError on "token".
        assert login_response.status_code == 200
        token = login_response.json()["token"]
        headers = {"Authorization": f"Bearer {token}"}

        access_response = client.get("/api/client-access", headers=headers)
        assert access_response.status_code == 200
        access_payload = access_response.json()
        assert len(access_payload) >= 1
        assert "audit_history" in access_payload[0]
        assert "module_permissions" in access_payload[0]["users"][0]

        export_response = client.get("/api/powerbi/client-access", headers=headers)
        assert export_response.status_code == 200
        export_payload = export_response.json()
        assert "client_rows" in export_payload
        assert "permission_rows" in export_payload

        client_login_response = client.post(
            "/api/auth/client/login",
            json={"email": settings.client_email, "password": settings.client_password},
        )
        # Same guard as above for the client login.
        assert client_login_response.status_code == 200
        client_headers = {"Authorization": f"Bearer {client_login_response.json()['token']}"}
        superadmin_access_response = client.get("/api/client-access", headers=client_headers)
        assert superadmin_access_response.status_code == 200
        assert len(superadmin_access_response.json()) == 1
|
|
|
|
|
|
def test_mix_calculator_endpoints_respect_owner_visibility():
    """A mix-calculator session created by one user is hidden from another.

    The client superadmin creates a session for the seeded
    "Specialty Pigeon Breeder 20kg" product and patches its batch size. A
    second client user then logs in and must see an empty session list and a
    404 for the superadmin's session.
    """
    with TestClient(app) as client:
        superadmin_login = client.post(
            "/api/auth/client/login",
            json={"email": settings.client_email, "password": settings.client_password},
        )
        assert superadmin_login.status_code == 200
        superadmin_headers = {"Authorization": f"Bearer {superadmin_login.json()['token']}"}

        options_response = client.get("/api/mix-calculator/options", headers=superadmin_headers)
        assert options_response.status_code == 200
        options_payload = options_response.json()
        assert len(options_payload["products"]) >= 100
        # Supply a default so a missing seed fails with a readable assertion
        # rather than a bare StopIteration.
        seeded_product = next(
            (
                product
                for product in options_payload["products"]
                if product["product_name"] == "Specialty Pigeon Breeder 20kg"
            ),
            None,
        )
        assert seeded_product is not None, "seeded product 'Specialty Pigeon Breeder 20kg' not in options"
        assert seeded_product["unit_size_kg"] == 20

        create_response = client.post(
            "/api/mix-calculator",
            json={
                "mix_date": "2026-04-29",
                "client_name": seeded_product["client_name"],
                "product_id": seeded_product["product_id"],
                "batch_size_kg": 560,
                "prepared_by_name": "Amelia Hart",
                "notes": "Morning production run",
            },
            headers=superadmin_headers,
        )
        assert create_response.status_code == 201
        created = create_response.json()
        assert created["product_name"] == seeded_product["product_name"]
        assert created["session_number"].startswith("HPP-20260429-")
        assert created["total_bags"] == 28
        assert len(created["lines"]) > 0
        assert created["lines"][0]["required_kg"] > 0

        patch_response = client.patch(
            f"/api/mix-calculator/{created['id']}",
            json={"batch_size_kg": 550},
            headers=superadmin_headers,
        )
        assert patch_response.status_code == 200
        patched = patch_response.json()
        assert patched["total_bags"] == 27.5
        assert len(patched["warnings"]) == 1

        operator_login = client.post(
            "/api/auth/client/login",
            json={"email": "ethan.cole@hunterpremiumproduce.example", "password": settings.client_password},
        )
        assert operator_login.status_code == 200
        operator_headers = {"Authorization": f"Bearer {operator_login.json()['token']}"}

        operator_list_response = client.get("/api/mix-calculator", headers=operator_headers)
        assert operator_list_response.status_code == 200
        assert operator_list_response.json() == []

        operator_detail_response = client.get(f"/api/mix-calculator/{created['id']}", headers=operator_headers)
        assert operator_detail_response.status_code == 404
|
|
|
|
|
|
def test_module_permission_blocks_client_module_access():
    """Setting a module permission to "none" blocks that module for the user.

    An admin sets the client user's ``raw_materials`` access level to "none";
    a subsequent client login must then receive 403 from ``/api/raw-materials``.
    """
    # NOTE(review): the permission is left at "none" when the test ends. If
    # the app database persists between tests this may affect later tests
    # that read raw materials as the client user. Confirm test isolation.
    with TestClient(app) as client:
        admin_login_response = client.post(
            "/api/auth/admin/login",
            json={"email": settings.admin_email, "password": settings.admin_password},
        )
        # Check each setup step so a setup failure is reported where it
        # happens rather than as a misleading final "expected 403".
        assert admin_login_response.status_code == 200
        admin_headers = {"Authorization": f"Bearer {admin_login_response.json()['token']}"}
        access_response = client.get("/api/client-access", headers=admin_headers)
        assert access_response.status_code == 200
        first_client = access_response.json()[0]
        first_user = next(
            (user for user in first_client["users"] if user["email"] == settings.client_email),
            None,
        )
        assert first_user is not None, "configured client user not found in first client account"

        permission = next(
            (
                permission
                for permission in first_user["module_permissions"]
                if permission["module_key"] == "raw_materials"
            ),
            None,
        )
        assert permission is not None, "raw_materials permission missing for client user"
        patch_response = client.patch(
            f"/api/client-access/users/{first_user['id']}/module-permissions/{permission['module_key']}",
            json={"access_level": "none"},
            headers=admin_headers,
        )
        # Any 2xx is accepted; the exact success code is not pinned here.
        assert 200 <= patch_response.status_code < 300

        client_login_response = client.post(
            "/api/auth/client/login",
            json={"email": settings.client_email, "password": settings.client_password},
        )
        assert client_login_response.status_code == 200
        client_headers = {"Authorization": f"Bearer {client_login_response.json()['token']}"}
        raw_materials_response = client.get("/api/raw-materials", headers=client_headers)

        assert raw_materials_response.status_code == 403
|
|
|
|
|
|
def test_bootstrap_schema_creates_missing_tables_and_patches_legacy_tenant_columns():
    """Bootstrap a legacy database and verify tables, columns and tenant sync.

    The starting database has only ``client_accounts`` and a ``raw_materials``
    table without ``tenant_id``, each holding one row. After
    ``bootstrap_schema`` and ``sync_tenant_ids`` the test expects a created
    ``products`` table, an added ``raw_materials.tenant_id`` column, one
    synced raw-material row, and that row's tenant set to "specialty-feeds".
    """
    legacy_engine = create_engine(
        "sqlite://",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )

    # Legacy DDL and seed rows, executed in order inside one transaction.
    legacy_statements = (
        """
        CREATE TABLE client_accounts (
            id INTEGER PRIMARY KEY,
            tenant_id VARCHAR(64),
            name VARCHAR(255),
            client_code VARCHAR(64),
            status VARCHAR(32),
            powerbi_workspace VARCHAR(128),
            notes TEXT,
            created_at DATETIME
        )
        """,
        """
        CREATE TABLE raw_materials (
            id INTEGER PRIMARY KEY,
            name VARCHAR(255),
            supplier VARCHAR(255),
            unit_of_measure VARCHAR(64),
            kg_per_unit FLOAT,
            status VARCHAR(32),
            notes TEXT,
            created_at DATETIME
        )
        """,
        """
        INSERT INTO client_accounts (id, tenant_id, name, client_code, status)
        VALUES (1, 'specialty-feeds', 'Specialty Feeds', 'SPEC', 'active')
        """,
        """
        INSERT INTO raw_materials (id, name, supplier, unit_of_measure, kg_per_unit, status)
        VALUES (1, 'Maize', 'Example Supplier', 'tonne', 1000, 'active')
        """,
    )
    with legacy_engine.begin() as setup_connection:
        for statement in legacy_statements:
            setup_connection.execute(text(statement))

    report = bootstrap_schema(legacy_engine, Base.metadata)
    synced_rows = sync_tenant_ids(legacy_engine)

    assert "products" in report.created_tables
    assert "raw_materials.tenant_id" in report.added_columns
    raw_material_columns = {column["name"] for column in inspect(legacy_engine).get_columns("raw_materials")}
    assert "tenant_id" in raw_material_columns
    assert synced_rows["raw_materials"] == 1

    with legacy_engine.begin() as check_connection:
        lookup = text("SELECT tenant_id FROM raw_materials WHERE id = 1")
        tenant_id = check_connection.execute(lookup).scalar_one()

    assert tenant_id == "specialty-feeds"
|