63 lines
1.9 KiB
Python
63 lines
1.9 KiB
Python
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from app.db.session import Base, get_db
|
|
from app.core.access import INTERNAL_USER_SUBJECT, INTERNAL_USER_TENANT_ID
|
|
from app.core.security import issue_token
|
|
from app.models.access import User
|
|
from app.models.throughput import ThroughputProduct
|
|
from app.api.throughput import router as throughput_router
|
|
from app.seed import seed_access
|
|
|
|
|
|
def run():
|
|
engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
|
|
Base.metadata.create_all(bind=engine)
|
|
db = sessionmaker(bind=engine, expire_on_commit=False)()
|
|
seed_access(db)
|
|
|
|
product = ThroughputProduct(
|
|
tenant_id=INTERNAL_USER_TENANT_ID,
|
|
name="Test Product 20kg",
|
|
default_bag_size=20,
|
|
active=True,
|
|
)
|
|
db.add(product)
|
|
db.commit()
|
|
|
|
app = FastAPI()
|
|
app.dependency_overrides[get_db] = lambda: iter([db])
|
|
app.include_router(throughput_router)
|
|
client = TestClient(app)
|
|
|
|
ops = db.query(User).filter_by(email="ops@hunterstockfeeds.com").one()
|
|
token = issue_token({"sub": INTERNAL_USER_SUBJECT, "user_id": ops.id, "email": ops.email})
|
|
|
|
payload = {
|
|
"production_date": "2026-06-01",
|
|
"product_id": product.id,
|
|
"product_name_snapshot": "Test Product 20kg",
|
|
"bag_size": 20,
|
|
"quantity": 10,
|
|
"quantity_type": "bags",
|
|
"for_order": True,
|
|
"for_stock": False,
|
|
"job_number": "JOB123",
|
|
"stock_quantity": None,
|
|
"staff_name": "Jake",
|
|
"notes": None,
|
|
}
|
|
resp = client.post(
|
|
"/api/throughput/entries",
|
|
json=payload,
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
print("STATUS:", resp.status_code)
|
|
print("BODY:", resp.text[:1500])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run()
|