Files

379 lines
15 KiB
Python
Raw Permalink Normal View History

2026-04-20 15:23:18 +12:00
"""
data_loader.py — Load and normalise the three SHEQ data sources.
Each loader returns a pandas DataFrame with normalised column names
(defined in config.py) so that downstream analysis code is insulated
from changes to the source file schema.
Public API
----------
load_events(filepath) -> pd.DataFrame
load_safety_energy(filepath) -> pd.DataFrame
load_llc_data(filepath) -> pd.DataFrame
load_all(events_path, se_path, llc_path) -> dict[str, pd.DataFrame]
"""
from __future__ import annotations
import logging
import warnings
from pathlib import Path
from typing import Optional
import pandas as pd
from config import (
EVENTS_COL_MAP,
SE_COL_MAP,
LLC_COL_MAP,
MODULE_TYPE_LABELS,
EVENTS_FILE,
SAFETY_ENERGY_FILE,
LLC_FILE,
)
log = logging.getLogger(__name__)
# Suppress openpyxl "no default style" warnings
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────
def _resolve_col(df: pd.DataFrame, candidates: list[str], key: str) -> Optional[str]:
"""Return the first candidate column that exists in df, or None."""
for c in candidates:
if c in df.columns:
return c
log.debug("Column key '%s' not found (tried: %s)", key, candidates)
return None
def _parse_dates(series: pd.Series) -> pd.Series:
"""
Parse a date series that may contain:
- ISO strings "2024-01-15"
- Long-form strings "Monday, 15 January 2024"
- Excel datetime objects
Returns a tz-naive datetime64 series; unparseable values become NaT.
"""
if pd.api.types.is_datetime64_any_dtype(series):
return series.dt.tz_localize(None) if series.dt.tz is not None else series
def _parse_one(val):
if pd.isna(val):
return pd.NaT
s = str(val).strip()
# Strip leading day-of-week "Monday, " prefix from long-form dates
if "," in s and len(s.split(",")[0].split()) == 1:
s = s.split(",", 1)[1].strip()
try:
return pd.to_datetime(s, dayfirst=True)
except Exception:
return pd.NaT
return series.map(_parse_one)
def _remap(df: pd.DataFrame, col_map: dict[str, list[str]]) -> pd.DataFrame:
"""
Build a new DataFrame with normalised column names.
For each key in col_map, find the first matching source column and
rename it. Columns not mentioned in col_map are dropped. The
original source columns are preserved under their original names as
well, allowing callers to access additional fields if needed.
"""
# Keep all original columns; add normalised aliases
result = df.copy()
for norm_name, candidates in col_map.items():
src = _resolve_col(df, candidates, norm_name)
if src is not None and norm_name not in df.columns:
result[norm_name] = df[src]
elif src is not None:
result[norm_name] = df[src]
return result
def _null_rate(series: pd.Series) -> float:
"""Return fraction of null / empty values (01)."""
return series.isna().mean()
def _profile(df: pd.DataFrame, label: str) -> dict:
"""Return a simple quality profile dict for logging."""
return {
"source": label,
"rows": len(df),
"cols": len(df.columns),
"date_nulls": _null_rate(df.get("date", pd.Series(dtype="object"))),
}
# ─────────────────────────────────────────────────────────────────────────────
# Events loader
# ─────────────────────────────────────────────────────────────────────────────
def load_events(filepath: str = EVENTS_FILE) -> pd.DataFrame:
"""
Load Events.xlsx and return a normalised DataFrame.
Normalised columns (see EVENTS_COL_MAP):
date, event_type, consequence, status, business_unit, project,
location, crp, root_cause_cat, root_cause_sub, injury_class,
body_part, brief_desc, event_desc, days_to_enter, event_lag,
report_lag, investigation_done, hipo, critical_event
Also adds:
year, month, year_month (Period[M])
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Events file not found: {filepath}")
log.info("Loading Events from %s", filepath)
raw = pd.read_excel(filepath)
log.info(" Raw shape: %s rows × %s cols", *raw.shape)
df = _remap(raw, EVENTS_COL_MAP)
# Parse dates
df["date"] = _parse_dates(df["date"])
# Drop rows with no date
n_before = len(df)
df = df.dropna(subset=["date"]).copy()
if len(df) < n_before:
log.warning(" Dropped %d rows with missing date", n_before - len(df))
# Derived time fields
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["year_month"] = df["date"].dt.to_period("M")
df["dow"] = df["date"].dt.day_name()
# Normalise text fields
for col in ("event_type", "consequence", "business_unit", "project",
"root_cause_cat", "injury_class"):
if col in df.columns:
df[col] = df[col].astype(str).str.strip()
df[col] = df[col].replace({"nan": pd.NA, "None": pd.NA, "": pd.NA})
profile = _profile(df, "Events")
log.info(" Loaded %d events | BUs: %s",
profile["rows"],
list(df["business_unit"].dropna().unique()) if "business_unit" in df else "?")
return df
# ─────────────────────────────────────────────────────────────────────────────
# Safety Energy loader
# ─────────────────────────────────────────────────────────────────────────────
def load_safety_energy(filepath: str = SAFETY_ENERGY_FILE) -> pd.DataFrame:
"""
Load Safety_Energy.xlsx and return a normalised DataFrame.
Safety Energy is the combined analytical domain covering all leading
activity types: LLC (Leader Learning Conversations), CCC (Critical
Control Checks), and OCC (Operational Control Checks).
Normalised columns (see SE_COL_MAP):
date, module_name, module_prefix, module_type, activity_type
(short label: LLC/CCC/OCC), leader, business_unit, project,
location, at_risk_aspects, total_questions, actions, atl_actions,
at_risk_crp, llc_topic, at_risk_obs, positive_obs, participants
Also adds:
year, month, year_month (Period[M])
activity_type — shortened label from MODULE_TYPE_LABELS
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Safety Energy file not found: {filepath}")
log.info("Loading Safety Energy from %s", filepath)
raw = pd.read_excel(filepath)
log.info(" Raw shape: %s rows × %s cols", *raw.shape)
df = _remap(raw, SE_COL_MAP)
df["date"] = _parse_dates(df["date"])
n_before = len(df)
df = df.dropna(subset=["date"]).copy()
if len(df) < n_before:
log.warning(" Dropped %d rows with missing date", n_before - len(df))
# Shorten module_type to LLC / CCC / OCC label
df["activity_type"] = (
df["module_type"]
.map(MODULE_TYPE_LABELS)
.fillna(df.get("module_type", pd.Series(dtype="str")))
)
# Derived time fields
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["year_month"] = df["date"].dt.to_period("M")
# Normalise text
for col in ("business_unit", "project", "leader", "activity_type"):
if col in df.columns:
df[col] = df[col].astype(str).str.strip()
df[col] = df[col].replace({"nan": pd.NA, "None": pd.NA, "": pd.NA})
# Numeric fields — coerce to numeric safely
for col in ("at_risk_aspects", "total_questions", "actions", "atl_actions"):
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
log.info(" Loaded %d activities | types: %s",
len(df),
df["activity_type"].value_counts().to_dict() if "activity_type" in df else "?")
return df
# ─────────────────────────────────────────────────────────────────────────────
# LLC Data loader
# ─────────────────────────────────────────────────────────────────────────────
def load_llc_data(filepath: str = LLC_FILE) -> pd.DataFrame:
"""
Load LLC_Data.xlsx and return a normalised DataFrame.
LLC_Data is a supplementary export of Leader Learning Conversations,
often containing richer free-text fields (topic, at-risk observations,
review & action notes) than the Safety_Energy export.
Normalised columns (see LLC_COL_MAP):
date, topic, leader, business_unit, project, location,
crp_focus, at_risk_obs, positive_obs, at_risk_flag, participants
Also adds:
year, month, year_month (Period[M])
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"LLC Data file not found: {filepath}")
log.info("Loading LLC Data from %s", filepath)
raw = pd.read_excel(filepath)
log.info(" Raw shape: %s rows × %s cols", *raw.shape)
df = _remap(raw, LLC_COL_MAP)
df["date"] = _parse_dates(df["date"])
n_before = len(df)
df = df.dropna(subset=["date"]).copy()
if len(df) < n_before:
log.warning(" Dropped %d rows with missing date", n_before - len(df))
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["year_month"] = df["date"].dt.to_period("M")
for col in ("business_unit", "project", "leader", "topic", "crp_focus"):
if col in df.columns:
df[col] = df[col].astype(str).str.strip()
df[col] = df[col].replace({"nan": pd.NA, "None": pd.NA, "": pd.NA})
# at_risk_flag is a count field in this export
if "at_risk_flag" in df.columns:
df["at_risk_flag"] = pd.to_numeric(df["at_risk_flag"], errors="coerce")
log.info(" Loaded %d LLC records | BUs: %s",
len(df),
list(df["business_unit"].dropna().unique()) if "business_unit" in df else "?")
return df
# ─────────────────────────────────────────────────────────────────────────────
# Combined loader
# ─────────────────────────────────────────────────────────────────────────────
def load_all(
events_path: str = EVENTS_FILE,
se_path: str = SAFETY_ENERGY_FILE,
llc_path: str = LLC_FILE,
) -> dict[str, pd.DataFrame]:
"""
Load all three data sources and return a dict with keys:
'events' -> normalised Events DataFrame
'safety_energy' -> normalised Safety Energy DataFrame
'llc' -> normalised LLC Data DataFrame
Raises FileNotFoundError with a descriptive message if any file
is missing.
"""
return {
"events": load_events(events_path),
"safety_energy": load_safety_energy(se_path),
"llc": load_llc_data(llc_path),
}
# ─────────────────────────────────────────────────────────────────────────────
# Backwards-compatibility shim for old analysis.py
# ─────────────────────────────────────────────────────────────────────────────
def load_and_prepare(filepath: str, start_date: str, split_date: str) -> pd.DataFrame:
"""
Backwards-compatible wrapper used by the old analysis.py module.
Returns Events data filtered to start_date onwards, with a 'PD'
column (pd1 / pd2) based on split_date.
"""
df = load_events(filepath)
# Rename normalised columns back to legacy names for old analysis.py
rename_map = {
"date": "Event Date",
"event_type": "Event Type",
"consequence": "Actual Consequence",
"crp": "CRPInvolved",
"root_cause_cat":"Root Cause Category",
"injury_class": "Ventia Injury Classification",
"body_part": "Bodily Location",
}
df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})
# Handle missing columns that old code expects
if "Days to Investigate" not in df.columns:
df["Days to Investigate"] = df.get("event_lag", pd.Series(dtype="float64"))
if "Days to Close" not in df.columns:
df["Days to Close"] = pd.to_numeric(
pd.to_datetime(df.get("ClosedAtDate"), errors="coerce")
.sub(df["Event Date"])
.dt.days,
errors="coerce",
)
if "CRPInvolved" not in df.columns:
df["CRPInvolved"] = df.get("CRP Involved", pd.NA)
df = df[df["Event Date"] >= pd.Timestamp(start_date)].copy()
df["Year"] = df["Event Date"].dt.year
df["Month"] = df["Event Date"].dt.month
df["MonthName"] = df["Event Date"].dt.strftime("%b")
df["DOW"] = df["Event Date"].dt.day_name()
df["YearMonth"] = df["Event Date"].dt.to_period("M")
df["PD"] = df["Event Date"].apply(
lambda x: "pd1" if x < pd.Timestamp(split_date) else "pd2"
)
return df
def get_body_parts(series: pd.Series) -> pd.Series:
"""Split multi-value body part entries and normalise (legacy helper)."""
parts = []
for val in series.dropna():
for part in str(val).split(","):
part = part.strip()
if part and "unspecified" not in part.lower():
parts.append(part)
return pd.Series(parts)