314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""Shared ordering-portal business logic: lifecycle, visibility, totals,
|
|
serialization, and audit.
|
|
|
|
Customer isolation is enforced by callers passing ``tenant_id`` /
|
|
``client_account_id`` into every query; this module never widens that scope.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.api.deps import AuthSession
|
|
from app.core.config import settings
|
|
from app.models.client_access import ClientAccount
|
|
from app.models.ordering import (
|
|
CatalogueProduct,
|
|
CustomerProductVisibility,
|
|
Order,
|
|
OrderLine,
|
|
OrderStatusHistory,
|
|
)
|
|
from app.services.client_access_service import record_audit_event
|
|
from app.services.ordering_pricing import resolve_price
|
|
|
|
# The ordering catalogue is "standard globally" — it lives under a single
# seller/ordering tenant, while individual customers (ClientAccounts) may have
# their own tenant_id. Customer isolation therefore keys on the globally-unique
# ``client_account_id``, NOT on the customer's session tenant. Admin and
# customer code both store/read ordering data under ORDERING_TENANT.
#
# The value is read once, at import time, from ``settings.client_tenant_id``.
ORDERING_TENANT = settings.client_tenant_id
|
|
|
|
# --- Lifecycle ---------------------------------------------------------------
|
|
|
|
# Full internal lifecycle → the simplified status a customer sees.
#
# Several internal statuses collapse onto one customer-facing value:
#   * ``under_review`` and ``in_production`` are both shown as "processing";
#   * ``sent_to_xero`` is shown as "confirmed";
#   * ``ready_for_pickup`` is shown as "ready".
# Every other status is shown under its own name.
CUSTOMER_VISIBLE_STATUS: dict[str, str] = {
    "draft": "draft",
    "submitted": "submitted",
    "under_review": "processing",
    "confirmed": "confirmed",
    "sent_to_xero": "confirmed",
    "in_production": "processing",
    "ready_for_pickup": "ready",
    "dispatched": "dispatched",
    "completed": "completed",
    "cancelled": "cancelled",
}
|
|
|
|
# Statuses an admin may move an order into from a given status. Cancellation is
# allowed from any non-terminal status (handled separately).
#
# Keys are the current status; values are the set of permitted target statuses.
# ``completed`` and ``cancelled`` map to empty sets, i.e. they are terminal.
# ``under_review`` is the only status that may step back (to ``submitted``).
ALLOWED_ADMIN_TRANSITIONS: dict[str, set[str]] = {
    "draft": {"submitted", "cancelled"},
    "submitted": {"under_review", "confirmed", "cancelled"},
    "under_review": {"confirmed", "submitted", "cancelled"},
    "confirmed": {"sent_to_xero", "in_production", "cancelled"},
    "sent_to_xero": {"in_production", "cancelled"},
    "in_production": {"ready_for_pickup", "dispatched", "cancelled"},
    "ready_for_pickup": {"dispatched", "completed", "cancelled"},
    "dispatched": {"completed", "cancelled"},
    "completed": set(),
    "cancelled": set(),
}
|
|
|
|
# Customers may only edit an order while it is a draft.
# Compared against the raw internal status (not the customer-visible one).
CUSTOMER_EDITABLE_STATUSES: set[str] = {"draft"}
|
|
|
|
|
|
def can_admin_transition(from_status: str, to_status: str) -> bool:
    """Tell whether an admin may move an order from one status to another.

    Args:
        from_status: The order's current internal status.
        to_status: The internal status the admin wants to move it to.

    Returns:
        ``True`` when the move is a cancellation of an order that is not
        already ``completed``/``cancelled``, or when ``to_status`` is listed
        for ``from_status`` in ``ALLOWED_ADMIN_TRANSITIONS``. A ``from_status``
        missing from that table permits nothing except the cancellation above.
    """
    terminal_statuses = {"completed", "cancelled"}
    cancelling_live_order = to_status == "cancelled" and from_status not in terminal_statuses
    if cancelling_live_order:
        # Cancellation short-circuits the table lookup entirely.
        return True
    permitted_targets = ALLOWED_ADMIN_TRANSITIONS.get(from_status, set())
    return to_status in permitted_targets
|
|
|
|
|
|
def record_status_change(
    db: Session,
    order: Order,
    *,
    to_status: str,
    actor_type: str,
    actor_name: str | None,
    note: str | None = None,
) -> None:
    """Log a status transition for ``order`` and apply it.

    A new ``OrderStatusHistory`` row is added to the session, capturing the
    order's status *before* the change as ``from_status``; the order's
    ``status`` attribute is then set to ``to_status``. Nothing is flushed or
    committed here, and the transition is not validated.

    Args:
        db: Session the history row is added to.
        order: Order being transitioned; mutated in place.
        to_status: New internal status.
        actor_type: Kind of actor making the change.
        actor_name: Display name of the actor, if known.
        note: Optional free-text note stored with the history row.
    """
    # Build the row first so that it snapshots the pre-change status.
    history_entry = OrderStatusHistory(
        tenant_id=order.tenant_id,
        order_id=order.id,
        from_status=order.status,
        to_status=to_status,
        actor_type=actor_type,
        actor_name=actor_name,
        note=note,
    )
    db.add(history_entry)
    order.status = to_status
|
|
|
|
|
|
def next_order_number(db: Session, tenant_id: str) -> str:
    """Produce the next human-readable order number, e.g. ``ORD-000042``.

    The number is one more than the highest ``Order.id`` currently stored
    (or 1 when there are no orders), zero-padded to at least six digits.
    This is cheap, roughly monotonic numbering that is good enough for v1.

    NOTE(review): ``tenant_id`` is accepted but not used — the lookup spans
    all orders regardless of tenant. Two concurrent callers can also read the
    same maximum and get the same number; confirm whether that matters.
    """
    newest_id_query = select(Order.id).order_by(Order.id.desc()).limit(1)
    highest_id = db.scalar(newest_id_query)
    if not highest_id:
        highest_id = 0
    return f"ORD-{highest_id + 1:06d}"
|
|
|
|
|
|
# --- Product visibility ------------------------------------------------------
|
|
|
|
|
|
def visible_product_ids_for_customer(
    db: Session, *, tenant_id: str, client_account_id: int
) -> set[int]:
    """Return the ids of products that are *hidden* from a customer.

    Despite the function's name, the result is the opt-out set: product ids
    that have a ``CustomerProductVisibility`` row with ``visible`` false for
    this ``tenant_id`` and ``client_account_id``. Products without such a row
    are not part of the result.
    """
    hidden_ids_query = select(CustomerProductVisibility.product_id).where(
        CustomerProductVisibility.client_account_id == client_account_id,
        CustomerProductVisibility.tenant_id == tenant_id,
        CustomerProductVisibility.visible.is_(False),
    )
    return set(db.scalars(hidden_ids_query).all())
|
|
|
|
|
|
def list_visible_products(
    db: Session, *, tenant_id: str, client_account_id: int
) -> list[CatalogueProduct]:
    """List the active catalogue products a customer is allowed to see.

    Loads every active ``CatalogueProduct`` for ``tenant_id`` ordered by
    category then name, and drops those whose id is in the customer's hidden
    set (as returned by ``visible_product_ids_for_customer``). The filtering
    happens in Python, after the query.
    """
    excluded_ids = visible_product_ids_for_customer(
        db, tenant_id=tenant_id, client_account_id=client_account_id
    )
    catalogue_query = (
        select(CatalogueProduct)
        .where(CatalogueProduct.tenant_id == tenant_id, CatalogueProduct.active.is_(True))
        .order_by(CatalogueProduct.category, CatalogueProduct.name)
    )
    shown: list[CatalogueProduct] = []
    for product in db.scalars(catalogue_query).all():
        if product.id in excluded_ids:
            continue
        shown.append(product)
    return shown
|
|
|
|
|
|
# --- Totals ------------------------------------------------------------------
|
|
|
|
|
|
def effective_unit_price(line: OrderLine) -> float | None:
    """Return the unit price that actually applies to an order line.

    An admin override wins whenever one is set (including an override of 0);
    otherwise the line's own ``unit_price`` is returned, which may be ``None``.
    """
    override = line.admin_override_price
    return line.unit_price if override is None else override
|
|
|
|
|
|
def recompute_order_totals(order: Order) -> None:
    """Recalculate line totals, the ex-GST subtotal and the quote flag in place.

    For each line, the effective unit price (admin override if set, else the
    line's unit price) times quantity is stored as ``line_total``, rounded to
    four decimals, and added to the subtotal. A line that is flagged
    ``requires_quote`` or has no effective price gets ``line_total = None``,
    is left out of the subtotal, and marks the whole order as requiring a
    quote.
    """
    running_subtotal = 0.0
    any_quote_needed = False
    for line in order.lines:
        price = effective_unit_price(line)
        if not (line.requires_quote or price is None):
            line.line_total = round(price * line.quantity, 4)
            # Accumulate the already-rounded line total, not the raw product.
            running_subtotal += line.line_total
        else:
            any_quote_needed = True
            line.line_total = None
    order.subtotal_ex_gst = round(running_subtotal, 4)
    order.requires_quote = any_quote_needed
|
|
|
|
|
|
# --- Serialization -----------------------------------------------------------
|
|
|
|
|
|
def serialize_product(
    product: CatalogueProduct,
    *,
    db: Session | None = None,
    client_account_id: int | None = None,
    quantity: float = 1.0,
) -> dict:
    """Turn a catalogue product into a plain dict.

    The base payload copies the product's catalogue fields under keys of the
    same name. When both ``db`` and ``client_account_id`` are supplied, a
    ``"price"`` entry is added holding the result of ``resolve_price`` for that
    customer and ``quantity`` (unit price, its source/rule, discount, quote
    flag and label). If either is missing, no ``"price"`` key is present.
    """
    product_fields = (
        "id",
        "name",
        "sku",
        "description",
        "category",
        "image_url",
        "unit_size",
        "unit_of_measure",
        "min_order_quantity",
        "stock_status",
        "active",
        "requires_quote",
        "base_price",
        "created_at",
    )
    data = {field: getattr(product, field) for field in product_fields}

    if db is None or client_account_id is None:
        return data

    resolution = resolve_price(
        db, client_account_id=client_account_id, product=product, quantity=quantity
    )
    price_fields = (
        "unit_price",
        "price_source",
        "price_rule_id",
        "discount_percent",
        "requires_quote",
        "label",
    )
    data["price"] = {field: getattr(resolution, field) for field in price_fields}
    return data
|
|
|
|
|
|
def serialize_order_line(line: OrderLine, *, for_admin: bool) -> dict:
    """Turn an order line into a plain dict.

    ``"unit_price"`` is the effective price (admin override if set, else the
    line's unit price). With ``for_admin`` true the payload additionally
    exposes the un-overridden price as ``"resolved_unit_price"``, the override
    price and reason, and the price rule id.
    """
    payload = {
        "id": line.id,
        "product_id": line.product_id,
        "product_name": line.product_name,
        "product_sku": line.product_sku,
        "quantity": line.quantity,
        "unit_price": effective_unit_price(line),
        "line_total": line.line_total,
        "requires_quote": line.requires_quote,
        "price_source": line.price_source,
        "discount_percent": line.discount_percent,
        "notes": line.notes,
    }
    if not for_admin:
        return payload

    # Admin-only pricing provenance.
    payload["resolved_unit_price"] = line.unit_price
    payload["admin_override_price"] = line.admin_override_price
    payload["admin_override_reason"] = line.admin_override_reason
    payload["price_rule_id"] = line.price_rule_id
    return payload
|
|
|
|
|
|
def serialize_order(order: Order, *, for_admin: bool) -> dict:
    """Turn an order, its lines and (for admins) its history into a plain dict.

    ``"status"`` is the raw internal status for admins; for customers it is
    mapped through ``CUSTOMER_VISIBLE_STATUS``, falling back to the raw value
    when the status has no mapping. ``"editable"`` is always derived from the
    raw status. With ``for_admin`` true the payload also carries
    ``"raw_status"``, admin notes, the reopened flag, the Xero fields and the
    full status history.
    """
    if for_admin:
        shown_status = order.status
    else:
        shown_status = CUSTOMER_VISIBLE_STATUS.get(order.status, order.status)

    payload = {
        "id": order.id,
        "order_number": order.order_number,
        "status": shown_status,
        "client_account_id": order.client_account_id,
        "created_by_name": order.created_by_name,
        "purchase_order_number": order.purchase_order_number,
        "delivery_notes": order.delivery_notes,
        "requested_delivery_date": order.requested_delivery_date,
        "fulfilment_method": order.fulfilment_method,
        "subtotal_ex_gst": order.subtotal_ex_gst,
        "requires_quote": order.requires_quote,
        "submitted_at": order.submitted_at,
        "created_at": order.created_at,
        "updated_at": order.updated_at,
        "editable": order.status in CUSTOMER_EDITABLE_STATUSES,
        "lines": [serialize_order_line(item, for_admin=for_admin) for item in order.lines],
    }
    if not for_admin:
        return payload

    payload["raw_status"] = order.status
    payload["admin_notes"] = order.admin_notes
    payload["reopened"] = order.reopened
    payload["xero_status"] = order.xero_status
    payload["xero_invoice_id"] = order.xero_invoice_id

    # Each history entry is exported under keys matching its attribute names.
    history_fields = (
        "id",
        "from_status",
        "to_status",
        "actor_type",
        "actor_name",
        "note",
        "created_at",
    )
    payload["status_history"] = [
        {field: getattr(entry, field) for field in history_fields}
        for entry in order.status_history
    ]
    return payload
|
|
|
|
|
|
# --- Audit -------------------------------------------------------------------
|
|
|
|
|
|
def audit_order_event(
    db: Session,
    *,
    session: AuthSession,
    order: Order,
    action: str,
    summary: str,
    target_id: int | None = None,
) -> None:
    """Write an ordering event for ``order`` to the shared audit trail.

    The event is recorded via ``record_audit_event`` under the order's tenant
    and client account, with ``target_type="order"`` and
    ``module_key="ordering"``.

    Args:
        db: Session passed through to ``record_audit_event``.
        session: Authenticated session of the acting user. Roles ``admin`` and
            ``internal`` are logged as actor type ``lean_admin``; any other
            role as ``customer``. Missing name/email are logged as ``""``.
        order: Order the event relates to.
        action: Audit action identifier.
        summary: Human-readable description of the event.
        target_id: Id to record as the target; defaults to ``order.id``.
    """
    is_staff = session.role in {"admin", "internal"}
    actor_type = "lean_admin" if is_staff else "customer"
    # Prefer the client-side role when the session has one.
    actor_role = session.client_role or session.role
    resolved_target_id = order.id if target_id is None else target_id

    record_audit_event(
        db,
        tenant_id=order.tenant_id,
        client_account_id=order.client_account_id,
        actor_type=actor_type,
        actor_name=session.name or "",
        actor_email=session.email or "",
        actor_role=actor_role,
        action=action,
        target_type="order",
        target_id=resolved_target_id,
        module_key="ordering",
        summary=summary,
    )
|
|
|
|
|
|
def get_customer_account(db: Session, *, client_account_id: int) -> ClientAccount | None:
    """Fetch a customer's ``ClientAccount`` by id, or ``None`` if there is none.

    The lookup deliberately filters on the id alone: ``client_account_id`` is a
    global primary key, and a customer's own ``tenant_id`` may differ from
    ``ORDERING_TENANT``, so no tenant filter is applied.
    """
    account_lookup = select(ClientAccount).where(ClientAccount.id == client_account_id)
    return db.scalar(account_lookup)
|
|
|
|
|
|
def ensure_customer_active(account: ClientAccount | None) -> None:
    """Reject requests made for a missing or non-active customer account.

    Args:
        account: The looked-up customer account, or ``None`` if none was found.

    Raises:
        HTTPException: 404 when ``account`` is ``None``; 403 when the
            account's ``status`` is anything other than ``"active"``.
    """
    # Function-scope import, as in the original; presumably to keep FastAPI out
    # of this module's import-time dependencies — confirm before hoisting.
    from fastapi import HTTPException, status

    if account is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Customer account not found",
        )
    if account.status != "active":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="This customer account is disabled",
        )
|