from __future__ import annotations import logging import os from datetime import date, datetime from pathlib import Path from typing import Iterable from openpyxl import load_workbook from sqlalchemy import select from sqlalchemy.orm import Session from app.models.throughput import ProductionThroughput, ThroughputProduct logger = logging.getLogger("data_entry_app.throughput") PRODUCTION_SHEET = "Production" NAMES_SHEET = "Names" # The historical throughput export. Bundled into the image under input_data/ so # the seed can import it on a fresh deployment (e.g. a new Postgres volume). WORKBOOK_FILENAME = "Operations Throughput.xlsx" # Anything at or above this kg/bag is treated as a bulka batch, not a per-bag count. _BULKA_BAG_SIZE_THRESHOLD = 100.0 def normalise_staff_name(value: object) -> str | None: if value is None: return None text = str(value).strip() if not text: return None # Collapse internal whitespace, title-case for consistency. cleaned = " ".join(text.split()) return cleaned def calculate_kg(quantity: float | None, quantity_type: str, bag_size: float | None) -> float: if quantity is None: return 0.0 if quantity_type == "kg": return float(quantity) if bag_size is None: return 0.0 return float(quantity) * float(bag_size) def qa_passed(entry: ProductionThroughput) -> bool: return bool(entry.scales_checked and entry.label_correct and entry.bag_sealed and entry.pallet_good_condition) def serialize_entry(entry: ProductionThroughput) -> dict: return { "id": entry.id, "tenant_id": entry.tenant_id, "production_date": entry.production_date, "product_id": entry.product_id, "product_name_snapshot": entry.product_name_snapshot, "bag_size": entry.bag_size, "scales_checked": entry.scales_checked, "label_correct": entry.label_correct, "bag_sealed": entry.bag_sealed, "pallet_good_condition": entry.pallet_good_condition, "for_order": entry.for_order, "for_stock": entry.for_stock, "job_number": entry.job_number, "stock_quantity": entry.stock_quantity, "sample_box_no": entry.sample_box_no, "test_weight_1": entry.test_weight_1, "test_weight_2": entry.test_weight_2, "test_weight_3": entry.test_weight_3, "test_weight_4": entry.test_weight_4, "test_weight_5": entry.test_weight_5, "quantity": entry.quantity, "quantity_type": entry.quantity_type, "calculated_kg": entry.calculated_kg, "staff_name": entry.staff_name, "notes": entry.notes, "qa_passed": qa_passed(entry), "created_by": entry.created_by, "created_at": entry.created_at, "updated_at": entry.updated_at, } def _coerce_bool(value: object) -> bool: if isinstance(value, bool): return value if value is None: return True if isinstance(value, (int, float)): return bool(value) text = str(value).strip().lower() if text in {"yes", "y", "true", "1", "pass", "ok", "x", "checked"}: return True if text in {"no", "n", "false", "0", "fail"}: return False return True def _coerce_float(value: object) -> float | None: if value is None or value == "": return None if isinstance(value, bool): return float(value) if isinstance(value, (int, float)): return float(value) text = str(value).strip().replace(",", "") if not text: return None try: return float(text) except ValueError: return None def _coerce_text(value: object) -> str | None: if value is None: return None text = str(value).strip() if not text or text.lower() in {"#value!", "#n/a", "n/a"}: return None return text def _coerce_date(value: object) -> date | None: if value is None: return None if isinstance(value, datetime): return value.date() if isinstance(value, date): return value text = str(value).strip() if not text: return None for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"): try: return datetime.strptime(text, fmt).date() except ValueError: continue return None def _infer_bulka_default(name: str, bag_size: float | None) -> bool: lowered = name.lower() if "bulka" in lowered: return True if bag_size is None: return False return bag_size >= _BULKA_BAG_SIZE_THRESHOLD def import_names_sheet(db: Session, workbook, tenant_id: str) -> tuple[int, int]: """Upsert product master from the Names sheet. Returns (created, updated).""" if NAMES_SHEET not in workbook.sheetnames: return (0, 0) ws = workbook[NAMES_SHEET] existing: dict[tuple[str, str | None], ThroughputProduct] = {} by_item: dict[str, ThroughputProduct] = {} by_name: dict[str, ThroughputProduct] = {} for product in db.scalars( select(ThroughputProduct).where(ThroughputProduct.tenant_id == tenant_id) ).all(): if product.item_id: by_item[str(product.item_id)] = product by_name[product.name.lower()] = product created = 0 updated = 0 for row in ws.iter_rows(min_row=2, values_only=True): if not row: continue name = _coerce_text(row[0] if len(row) > 0 else None) if not name: continue item_id_raw = row[1] if len(row) > 1 else None item_id = None if item_id_raw is not None: if isinstance(item_id_raw, float) and item_id_raw.is_integer(): item_id = str(int(item_id_raw)) else: item_id = _coerce_text(item_id_raw) product = (by_item.get(item_id) if item_id else None) or by_name.get(name.lower()) if product is None: product = ThroughputProduct( tenant_id=tenant_id, item_id=item_id, name=name, default_bag_size=None, is_bulka_default="bulka" in name.lower(), active=True, notes="Imported from Operations Throughput.xlsx", ) db.add(product) created += 1 if item_id: by_item[item_id] = product by_name[name.lower()] = product else: if item_id and not product.item_id: product.item_id = item_id if name and product.name != name: product.name = name updated += 1 db.flush() return (created, updated) def import_production_sheet(db: Session, workbook, tenant_id: str) -> tuple[int, int]: """Import the Production sheet. Returns (imported, skipped).""" if PRODUCTION_SHEET not in workbook.sheetnames: return (0, 0) ws = workbook[PRODUCTION_SHEET] # Header row is row 3 in the sheet (rows 1 and 2 are display banners). products_by_name: dict[str, ThroughputProduct] = { product.name.lower(): product for product in db.scalars( select(ThroughputProduct).where(ThroughputProduct.tenant_id == tenant_id) ).all() } bag_size_seen: dict[int, list[float]] = {} imported = 0 skipped = 0 for row in ws.iter_rows(min_row=4, values_only=True): if not row or len(row) < 15: skipped += 1 continue production_date = _coerce_date(row[0]) product_name = _coerce_text(row[1]) if production_date is None or not product_name: skipped += 1 continue bag_size = _coerce_float(row[2]) scales = _coerce_bool(row[3]) label = _coerce_bool(row[4]) sealed = _coerce_bool(row[5]) pallet = _coerce_bool(row[6]) sample_box = _coerce_text(row[7]) tw1 = _coerce_float(row[8]) tw2 = _coerce_float(row[9]) tw3 = _coerce_float(row[10]) tw4 = _coerce_float(row[11]) tw5 = _coerce_float(row[12]) quantity = _coerce_float(row[13]) or 0.0 staff = normalise_staff_name(row[14]) notes = _coerce_text(row[15]) if len(row) > 15 else None # Infer quantity_type: bulka-style rows have a blank or very large bag size. if bag_size is None or bag_size >= _BULKA_BAG_SIZE_THRESHOLD or "bulka" in product_name.lower(): quantity_type = "kg" else: quantity_type = "bags" product = products_by_name.get(product_name.lower()) if product is None: product = ThroughputProduct( tenant_id=tenant_id, item_id=None, name=product_name, default_bag_size=bag_size, is_bulka_default=_infer_bulka_default(product_name, bag_size), active=True, notes="Auto-created during Operations Throughput import", ) db.add(product) db.flush() products_by_name[product_name.lower()] = product if product.id is not None and bag_size is not None and bag_size > 0: bag_size_seen.setdefault(product.id, []).append(bag_size) calculated = calculate_kg(quantity, quantity_type, bag_size) entry = ProductionThroughput( tenant_id=tenant_id, production_date=production_date, product_id=product.id, product_name_snapshot=product_name, bag_size=bag_size, scales_checked=scales, label_correct=label, bag_sealed=sealed, pallet_good_condition=pallet, sample_box_no=sample_box, test_weight_1=tw1, test_weight_2=tw2, test_weight_3=tw3, test_weight_4=tw4, test_weight_5=tw5, quantity=quantity, quantity_type=quantity_type, calculated_kg=calculated, staff_name=staff, notes=notes, created_by="workbook-import", ) db.add(entry) imported += 1 # Backfill default_bag_size on products that don't have one but appear in entries. for product_id, sizes in bag_size_seen.items(): product = db.get(ThroughputProduct, product_id) if product and product.default_bag_size is None: # Use the most common bag size seen. common = max(set(sizes), key=sizes.count) product.default_bag_size = common if not product.is_bulka_default: product.is_bulka_default = _infer_bulka_default(product.name, common) db.flush() return (imported, skipped) def import_workbook(db: Session, workbook_path: Path, tenant_id: str) -> dict: workbook = load_workbook(workbook_path, data_only=True) products_created, products_updated = import_names_sheet(db, workbook, tenant_id) entries_imported, entries_skipped = import_production_sheet(db, workbook, tenant_id) return { "products_created": products_created, "products_updated": products_updated, "entries_imported": entries_imported, "entries_skipped": entries_skipped, } def workbook_candidates() -> Iterable[Path]: repo_root = Path(__file__).resolve().parents[3] cwd = Path.cwd() env_value = os.getenv("THROUGHPUT_WORKBOOK_PATH") env_path = Path(env_value.strip()) if isinstance(env_value, str) and env_value.strip() else None # input_data/ is where the workbook is bundled in the image; in the # container the working directory is /app, so cwd/input_data resolves it. candidates = [ env_path, repo_root / "input_data" / WORKBOOK_FILENAME, cwd / "input_data" / WORKBOOK_FILENAME, Path("/app") / "input_data" / WORKBOOK_FILENAME, Path("/srv/lean101-clients") / "input_data" / WORKBOOK_FILENAME, repo_root / WORKBOOK_FILENAME, repo_root.parent / WORKBOOK_FILENAME, cwd / WORKBOOK_FILENAME, Path("/srv/lean101-clients") / WORKBOOK_FILENAME, Path("/app") / WORKBOOK_FILENAME, ] seen: set[str] = set() ordered: list[Path] = [] for candidate in candidates: if candidate is None: continue key = str(candidate) if key in seen: continue seen.add(key) ordered.append(candidate) return ordered def resolve_workbook_path() -> Path | None: for candidate in workbook_candidates(): if candidate.exists(): return candidate return None