Files
data-entry-app/backend/tests/test_costing_engine.py
T

259 lines
9.3 KiB
Python
Raw Normal View History

2026-04-25 20:43:37 +12:00
from datetime import date
from fastapi.testclient import TestClient
2026-04-27 21:53:36 +12:00
from sqlalchemy import create_engine, inspect, text
2026-04-25 20:43:37 +12:00
from sqlalchemy.orm import Session, sessionmaker
2026-04-27 21:53:36 +12:00
from sqlalchemy.pool import StaticPool
2026-04-25 20:43:37 +12:00
from app.core.config import settings
2026-04-27 21:53:36 +12:00
from app.db.migrations import bootstrap_schema, sync_tenant_ids
2026-04-25 20:43:37 +12:00
from app.db.session import Base
from app.main import app
from app.models.assumption import FreightCostRule, PackagingCostRule, ProcessCostRule
2026-04-25 22:51:36 +12:00
from app.models.client_access import ClientAccount, ClientFeatureAccess, ClientUser
2026-04-25 20:43:37 +12:00
from app.models.mix import Mix, MixIngredient
from app.models.product import Product
from app.models.raw_material import RawMaterial, RawMaterialPriceVersion
2026-04-25 22:51:36 +12:00
from app.services.client_access_service import build_client_access_export, serialize_client_account
2026-04-25 20:43:37 +12:00
from app.services.costing_engine import calculate_mix_cost, calculate_product_cost, calculate_raw_material_cost
def build_session() -> Session:
    """Return a new session bound to a fresh in-memory SQLite database.

    All tables registered on ``Base.metadata`` are created on the new engine
    before the session is handed back. The session is configured with
    ``expire_on_commit=False`` so attributes stay readable after ``commit()``.
    """
    memory_engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(bind=memory_engine)
    session_factory = sessionmaker(bind=memory_engine, expire_on_commit=False)
    return session_factory()
def test_calculate_raw_material_cost():
    """Check the raw-material costing figures for a single price version.

    Input is a 1000 kg "tonne" of maize priced at 500 with a 0.02 waste
    percentage; the expected loss, per-unit and per-kg costs are pinned below.
    """
    material = RawMaterial(
        name="Maize",
        unit_of_measure="tonne",
        kg_per_unit=1000,
        status="active",
    )
    price_version = RawMaterialPriceVersion(
        market_value=500,
        waste_percentage=0.02,
        effective_date=date(2026, 4, 1),
    )

    costing = calculate_raw_material_cost(material, price_version)

    # Checked in the same order as the individual assertions they replace.
    expected_fields = {
        "loss_cost": 10.0,
        "cost_per_unit": 510.0,
        "cost_per_kg": 0.51,
    }
    for field_name, expected_value in expected_fields.items():
        assert getattr(costing, field_name) == expected_value
def test_mix_and_product_cost_breakdown():
    """Cost a two-ingredient mix and a bagged product that uses it.

    Seeds an in-memory database with two raw materials (one price version
    each), a mix of 180 kg maize and 100 kg barley, one process/packaging/
    freight rule, and a "20kg bag" product, then checks the figures returned
    by ``calculate_mix_cost`` and ``calculate_product_cost``.
    """
    session = build_session()
    price_date = date(2026, 4, 1)

    # Raw materials with their current price versions.
    maize = RawMaterial(name="Maize", unit_of_measure="tonne", kg_per_unit=1000, status="active")
    maize.price_versions.append(
        RawMaterialPriceVersion(market_value=520, waste_percentage=0.02, effective_date=price_date)
    )
    barley = RawMaterial(name="Barley", unit_of_measure="tonne", kg_per_unit=1000, status="active")
    barley.price_versions.append(
        RawMaterialPriceVersion(market_value=470, waste_percentage=0.015, effective_date=price_date)
    )
    session.add_all([maize, barley])
    session.flush()  # the ingredient rows below reference the raw-material ids

    pigeon_mix = Mix(client_name="Specialty Feeds", name="Pigeon Mix", status="active", version=1)
    session.add(pigeon_mix)
    session.flush()  # the ingredient and product rows reference the mix id

    for material, kilograms in ((maize, 180), (barley, 100)):
        session.add(
            MixIngredient(mix_id=pigeon_mix.id, raw_material_id=material.id, quantity_kg=kilograms)
        )

    # Cost rules keyed on the product's process name, sale type and unit.
    session.add_all(
        [
            ProcessCostRule(
                process_name="standard_bagging",
                grading_cost=0.055,
                bagging_cost=0.04,
                cracking_cost=0.0,
            ),
            PackagingCostRule(
                sale_type="standard",
                unit_of_measure="20kg bag",
                own_bag=False,
                bag_cost=0.63,
            ),
            FreightCostRule(sale_type="standard", unit_of_measure="20kg bag", cost_per_unit=1.45),
        ]
    )
    session.flush()

    bagged_product = Product(
        client_name="Specialty Feeds",
        name="Specialty Pigeon Breeder 20kg",
        mix_id=pigeon_mix.id,
        sale_type="standard",
        own_bag=False,
        unit_of_measure="20kg bag",
        items_per_pallet=50,
        bagging_process="standard_bagging",
        distributor_margin=0.225,
        wholesale_margin=0.18,
    )
    session.add(bagged_product)
    session.commit()

    mix_costing = calculate_mix_cost(session, pigeon_mix.id)
    product_costing = calculate_product_cost(session, bagged_product.id)

    expected_mix = {"total_mix_kg": 280, "mix_cost_per_kg": 0.5114}
    for key, expected_value in expected_mix.items():
        assert mix_costing[key] == expected_value

    expected_product = {
        "client_name": "Specialty Feeds",
        "mix_name": "Pigeon Mix",
        "finished_product_delivered": 14.208,
        "distributor_price": 18.3329,
        "wholesale_price": 17.3268,
    }
    for key, expected_value in expected_product.items():
        assert product_costing[key] == expected_value
def test_root_and_login_endpoints():
    """Exercise the root index and both login endpoints over HTTP.

    Verifies that ``/`` advertises the client and admin login URLs, and that
    posting the credentials from ``settings`` to each login endpoint returns
    200 with the matching e-mail (and tenant id for the client login).
    """
    # Use the client as a context manager so the app's startup/shutdown
    # handlers run and the client is closed afterwards, as
    # test_client_access_endpoints already does.
    with TestClient(app) as client:
        root_response = client.get("/")
        assert root_response.status_code == 200
        # Decode the body once instead of re-parsing it for every assertion.
        endpoints = root_response.json()["endpoints"]
        assert endpoints["client_login"] == "/api/auth/client/login"
        assert endpoints["admin_login"] == "/api/auth/admin/login"

        client_login_response = client.post(
            "/api/auth/client/login",
            json={"email": settings.client_email, "password": settings.client_password},
        )
        assert client_login_response.status_code == 200
        client_login = client_login_response.json()
        assert client_login["email"] == settings.client_email
        assert client_login["tenant_id"] == settings.client_tenant_id

        admin_login_response = client.post(
            "/api/auth/admin/login",
            json={"email": settings.admin_email, "password": settings.admin_password},
        )
        assert admin_login_response.status_code == 200
        assert admin_login_response.json()["email"] == settings.admin_email
def test_client_access_export_helpers():
    """Check the summary counts and export rows for one client account.

    The account is persisted with two users (one active and not new, one
    invited and new) and two features (one enabled, one disabled), then
    passed to ``serialize_client_account`` and ``build_client_access_export``.
    """
    session = build_session()

    account = ClientAccount(
        tenant_id="specialty-feeds",
        name="Specialty Feeds",
        client_code="SPEC",
        status="active",
        powerbi_workspace="farm-ops-prod",
    )

    existing_admin = ClientUser(
        full_name="Amelia Hart",
        email="amelia.hart@specialtyfeeds.example",
        role="admin",
        status="active",
        is_new_user=False,
    )
    invited_operator = ClientUser(
        full_name="Ethan Cole",
        email="ethan.cole@specialtyfeeds.example",
        role="operator",
        status="invited",
        is_new_user=True,
    )
    account.users.extend([existing_admin, invited_operator])

    dashboard_feature = ClientFeatureAccess(
        feature_key="dashboard",
        feature_name="Dashboard",
        feature_group="workspace",
        enabled=True,
    )
    products_feature = ClientFeatureAccess(
        feature_key="products",
        feature_name="Products",
        feature_group="pricing",
        enabled=False,
    )
    account.features.extend([dashboard_feature, products_feature])

    session.add(account)
    session.commit()
    session.refresh(account)

    summary = serialize_client_account(account)
    export_payload = build_client_access_export([account])

    # Each counter should pick up exactly one of the two seeded rows.
    for counter in ("active_user_count", "new_user_count", "enabled_feature_count"):
        assert summary[counter] == 1

    first_client_row = export_payload["client_rows"][0]
    assert first_client_row["client_code"] == "SPEC"
    first_user_row = export_payload["user_rows"][0]
    assert first_user_row["client_name"] == "Specialty Feeds"
    assert len(export_payload["feature_rows"]) == 2
def test_client_access_endpoints():
    """Check the admin-only client-access listing and Power BI export.

    Logs in as the admin from ``settings``, then calls both endpoints with
    the returned bearer token and checks the status code and basic shape of
    each response.
    """
    with TestClient(app) as client:
        login_response = client.post(
            "/api/auth/admin/login",
            json={"email": settings.admin_email, "password": settings.admin_password},
        )
        # Fail here with a clear status mismatch rather than a KeyError on
        # "token" below if the login itself did not succeed.
        assert login_response.status_code == 200
        token = login_response.json()["token"]
        headers = {"Authorization": f"Bearer {token}"}

        access_response = client.get("/api/client-access", headers=headers)
        assert access_response.status_code == 200
        assert len(access_response.json()) >= 1

        export_response = client.get("/api/powerbi/client-access", headers=headers)
        assert export_response.status_code == 200
        assert "client_rows" in export_response.json()
2026-04-27 21:53:36 +12:00
def test_bootstrap_schema_creates_missing_tables_and_patches_legacy_tenant_columns():
    """Run the schema bootstrap and tenant sync against a legacy-shaped database.

    The database starts with only two hand-written tables: ``client_accounts``
    (which has a ``tenant_id`` column) and ``raw_materials`` (which does not),
    each holding one row. The test then expects ``bootstrap_schema`` to report
    ``products`` as created and ``raw_materials.tenant_id`` as added, and
    ``sync_tenant_ids`` to report one synced ``raw_materials`` row whose
    ``tenant_id`` ends up as ``'specialty-feeds'``.
    """
    # StaticPool keeps every connection on the same in-memory database.
    engine = create_engine(
        "sqlite://",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )

    # Legacy schema and seed rows, executed in order in one transaction.
    legacy_setup_statements = (
        """
        CREATE TABLE client_accounts (
            id INTEGER PRIMARY KEY,
            tenant_id VARCHAR(64),
            name VARCHAR(255),
            client_code VARCHAR(64),
            status VARCHAR(32),
            powerbi_workspace VARCHAR(128),
            notes TEXT,
            created_at DATETIME
        )
        """,
        """
        CREATE TABLE raw_materials (
            id INTEGER PRIMARY KEY,
            name VARCHAR(255),
            supplier VARCHAR(255),
            unit_of_measure VARCHAR(64),
            kg_per_unit FLOAT,
            status VARCHAR(32),
            notes TEXT,
            created_at DATETIME
        )
        """,
        """
        INSERT INTO client_accounts (id, tenant_id, name, client_code, status)
        VALUES (1, 'specialty-feeds', 'Specialty Feeds', 'SPEC', 'active')
        """,
        """
        INSERT INTO raw_materials (id, name, supplier, unit_of_measure, kg_per_unit, status)
        VALUES (1, 'Maize', 'Example Supplier', 'tonne', 1000, 'active')
        """,
    )
    with engine.begin() as connection:
        for statement in legacy_setup_statements:
            connection.execute(text(statement))

    report = bootstrap_schema(engine, Base.metadata)
    synced_rows = sync_tenant_ids(engine)

    assert "products" in report.created_tables
    assert "raw_materials.tenant_id" in report.added_columns
    raw_material_columns = {
        column["name"] for column in inspect(engine).get_columns("raw_materials")
    }
    assert "tenant_id" in raw_material_columns
    assert synced_rows["raw_materials"] == 1

    with engine.begin() as connection:
        lookup = connection.execute(text("SELECT tenant_id FROM raw_materials WHERE id = 1"))
        tenant_id = lookup.scalar_one()
    assert tenant_id == "specialty-feeds"