"""
data_loader.py — Load and normalise the three SHEQ data sources.
Each loader returns a pandas DataFrame with normalised column names
(defined in config.py) so that downstream analysis code is insulated
from changes to the source file schema.
Public API
----------
load_events(filepath) -> pd.DataFrame
load_safety_energy(filepath) -> pd.DataFrame
load_llc_data(filepath) -> pd.DataFrame
load_all(events_path, se_path, llc_path) -> dict[str, pd.DataFrame]
"""

from __future__ import annotations

import logging
import warnings
from pathlib import Path
from typing import Optional

import pandas as pd

from config import (
    EVENTS_COL_MAP,
    SE_COL_MAP,
    LLC_COL_MAP,
    MODULE_TYPE_LABELS,
    EVENTS_FILE,
    SAFETY_ENERGY_FILE,
    LLC_FILE,
)

log = logging.getLogger(__name__)

# Suppress openpyxl "no default style" warnings
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")


# ─────────────────────────────────────────────────────────────────────────────
# Internal helpers
# ─────────────────────────────────────────────────────────────────────────────

def _resolve_col(df: pd.DataFrame, candidates: list[str], key: str) -> Optional[str]:
    """Return the first candidate column that exists in df, or None."""
    for c in candidates:
        if c in df.columns:
            return c
    log.debug("Column key '%s' not found (tried: %s)", key, candidates)
    return None


def _parse_dates(series: pd.Series) -> pd.Series:
    """
    Parse a date series that may contain:
      - ISO strings "2024-01-15"
      - Long-form strings "Monday, 15 January 2024"
      - Excel datetime objects

    Returns a tz-naive datetime64 series; unparseable values become NaT.
    """
    if pd.api.types.is_datetime64_any_dtype(series):
        return series.dt.tz_localize(None) if series.dt.tz is not None else series

    def _parse_one(val):
        if pd.isna(val):
            return pd.NaT
        s = str(val).strip()
        # Strip leading day-of-week "Monday, " prefix from long-form dates
        if "," in s and len(s.split(",")[0].split()) == 1:
            s = s.split(",", 1)[1].strip()
        try:
            return pd.to_datetime(s, dayfirst=True)
        except Exception:
            return pd.NaT

    return series.map(_parse_one)
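
# Illustrative behaviour of _parse_dates (a sketch, not executed output):
#
#     _parse_dates(pd.Series(["2024-01-15", "Monday, 15 January 2024", "junk"]))
#     # -> 2024-01-15, 2024-01-15, NaT   (tz-naive datetime64)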


def _remap(df: pd.DataFrame, col_map: dict[str, list[str]]) -> pd.DataFrame:
    """
    Build a new DataFrame with normalised column names.

    For each key in col_map, find the first matching source column and
    copy it under the normalised name. All original source columns are
    preserved under their original names as well, allowing callers to
    access additional fields if needed.
    """
    # Keep all original columns; add normalised aliases
    result = df.copy()
    for norm_name, candidates in col_map.items():
        src = _resolve_col(df, candidates, norm_name)
        if src is not None:
            result[norm_name] = df[src]
    return result
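
# Shape of the col_map argument, with hypothetical entries for illustration
# only (the real mappings live in config.py as EVENTS_COL_MAP / SE_COL_MAP /
# LLC_COL_MAP):
#
#     example_col_map = {
#         "date":          ["Event Date", "Date"],      # hypothetical source headers
#         "business_unit": ["Business Unit", "BU"],     # hypothetical source headers
#     }
#     # _remap(raw, example_col_map) keeps every raw column and adds a "date"
#     # and "business_unit" alias wherever a candidate header is found.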


def _null_rate(series: pd.Series) -> float:
    """Return the fraction of null values in the series (0-1)."""
    return series.isna().mean()


def _profile(df: pd.DataFrame, label: str) -> dict:
    """Return a simple quality profile dict for logging."""
    return {
        "source": label,
        "rows": len(df),
        "cols": len(df.columns),
        "date_nulls": _null_rate(df.get("date", pd.Series(dtype="object"))),
    }


# ─────────────────────────────────────────────────────────────────────────────
# Events loader
# ─────────────────────────────────────────────────────────────────────────────

def load_events(filepath: str = EVENTS_FILE) -> pd.DataFrame:
    """
    Load Events.xlsx and return a normalised DataFrame.

    Normalised columns (see EVENTS_COL_MAP):
        date, event_type, consequence, status, business_unit, project,
        location, crp, root_cause_cat, root_cause_sub, injury_class,
        body_part, brief_desc, event_desc, days_to_enter, event_lag,
        report_lag, investigation_done, hipo, critical_event

    Also adds:
        year, month, year_month (Period[M])
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Events file not found: {filepath}")

    log.info("Loading Events from %s", filepath)
    raw = pd.read_excel(filepath)
    log.info(" Raw shape: %s rows × %s cols", *raw.shape)

    df = _remap(raw, EVENTS_COL_MAP)

    # Parse dates
    df["date"] = _parse_dates(df["date"])

    # Drop rows with no date
    n_before = len(df)
    df = df.dropna(subset=["date"]).copy()
    if len(df) < n_before:
        log.warning(" Dropped %d rows with missing date", n_before - len(df))

    # Derived time fields
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["year_month"] = df["date"].dt.to_period("M")
    df["dow"] = df["date"].dt.day_name()

    # Normalise text fields
    for col in ("event_type", "consequence", "business_unit", "project",
                "root_cause_cat", "injury_class"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace({"nan": pd.NA, "None": pd.NA, "": pd.NA})

    profile = _profile(df, "Events")
    log.info(" Loaded %d events | BUs: %s",
             profile["rows"],
             list(df["business_unit"].dropna().unique()) if "business_unit" in df else "?")
    return df
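
# Example (sketch): monthly event counts by type from the loader output.
#
#     events = load_events()
#     trend = events.groupby(["year_month", "event_type"]).size().unstack(fill_value=0)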


# ─────────────────────────────────────────────────────────────────────────────
# Safety Energy loader
# ─────────────────────────────────────────────────────────────────────────────

def load_safety_energy(filepath: str = SAFETY_ENERGY_FILE) -> pd.DataFrame:
    """
    Load Safety_Energy.xlsx and return a normalised DataFrame.

    Safety Energy is the combined analytical domain covering all leading
    activity types: LLC (Leader Learning Conversations), CCC (Critical
    Control Checks), and OCC (Operational Control Checks).

    Normalised columns (see SE_COL_MAP):
        date, module_name, module_prefix, module_type, activity_type
        (short label: LLC/CCC/OCC), leader, business_unit, project,
        location, at_risk_aspects, total_questions, actions, atl_actions,
        at_risk_crp, llc_topic, at_risk_obs, positive_obs, participants

    Also adds:
        year, month, year_month (Period[M])
        activity_type — shortened label from MODULE_TYPE_LABELS
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Safety Energy file not found: {filepath}")

    log.info("Loading Safety Energy from %s", filepath)
    raw = pd.read_excel(filepath)
    log.info(" Raw shape: %s rows × %s cols", *raw.shape)

    df = _remap(raw, SE_COL_MAP)
    df["date"] = _parse_dates(df["date"])

    n_before = len(df)
    df = df.dropna(subset=["date"]).copy()
    if len(df) < n_before:
        log.warning(" Dropped %d rows with missing date", n_before - len(df))

    # Shorten module_type to LLC / CCC / OCC label
    df["activity_type"] = (
        df["module_type"]
        .map(MODULE_TYPE_LABELS)
        .fillna(df.get("module_type", pd.Series(dtype="str")))
    )

    # Derived time fields
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["year_month"] = df["date"].dt.to_period("M")

    # Normalise text
    for col in ("business_unit", "project", "leader", "activity_type"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace({"nan": pd.NA, "None": pd.NA, "": pd.NA})

    # Numeric fields — coerce to numeric safely
    for col in ("at_risk_aspects", "total_questions", "actions", "atl_actions"):
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    log.info(" Loaded %d activities | types: %s",
             len(df),
             df["activity_type"].value_counts().to_dict() if "activity_type" in df else "?")
    return df
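
# Example (sketch): leading-activity mix per month using the shortened label.
#
#     se = load_safety_energy()
#     mix = se.groupby(["year_month", "activity_type"]).size().unstack(fill_value=0)
#     # columns are the LLC / CCC / OCC labels defined in MODULE_TYPE_LABELS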


# ─────────────────────────────────────────────────────────────────────────────
# LLC Data loader
# ─────────────────────────────────────────────────────────────────────────────

def load_llc_data(filepath: str = LLC_FILE) -> pd.DataFrame:
    """
    Load LLC_Data.xlsx and return a normalised DataFrame.

    LLC_Data is a supplementary export of Leader Learning Conversations,
    often containing richer free-text fields (topic, at-risk observations,
    review & action notes) than the Safety_Energy export.

    Normalised columns (see LLC_COL_MAP):
        date, topic, leader, business_unit, project, location,
        crp_focus, at_risk_obs, positive_obs, at_risk_flag, participants

    Also adds:
        year, month, year_month (Period[M])
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"LLC Data file not found: {filepath}")

    log.info("Loading LLC Data from %s", filepath)
    raw = pd.read_excel(filepath)
    log.info(" Raw shape: %s rows × %s cols", *raw.shape)

    df = _remap(raw, LLC_COL_MAP)
    df["date"] = _parse_dates(df["date"])

    n_before = len(df)
    df = df.dropna(subset=["date"]).copy()
    if len(df) < n_before:
        log.warning(" Dropped %d rows with missing date", n_before - len(df))

    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["year_month"] = df["date"].dt.to_period("M")

    for col in ("business_unit", "project", "leader", "topic", "crp_focus"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace({"nan": pd.NA, "None": pd.NA, "": pd.NA})

    # at_risk_flag is a count field in this export
    if "at_risk_flag" in df.columns:
        df["at_risk_flag"] = pd.to_numeric(df["at_risk_flag"], errors="coerce")

    log.info(" Loaded %d LLC records | BUs: %s",
             len(df),
             list(df["business_unit"].dropna().unique()) if "business_unit" in df else "?")
    return df


# ─────────────────────────────────────────────────────────────────────────────
# Combined loader
# ─────────────────────────────────────────────────────────────────────────────

def load_all(
    events_path: str = EVENTS_FILE,
    se_path: str = SAFETY_ENERGY_FILE,
    llc_path: str = LLC_FILE,
) -> dict[str, pd.DataFrame]:
    """
    Load all three data sources and return a dict with keys:
        'events'        -> normalised Events DataFrame
        'safety_energy' -> normalised Safety Energy DataFrame
        'llc'           -> normalised LLC Data DataFrame

    Raises FileNotFoundError with a descriptive message if any file
    is missing.
    """
    return {
        "events": load_events(events_path),
        "safety_energy": load_safety_energy(se_path),
        "llc": load_llc_data(llc_path),
    }


# ─────────────────────────────────────────────────────────────────────────────
# Backwards-compatibility shim for old analysis.py
# ─────────────────────────────────────────────────────────────────────────────

def load_and_prepare(filepath: str, start_date: str, split_date: str) -> pd.DataFrame:
    """
    Backwards-compatible wrapper used by the old analysis.py module.

    Returns Events data filtered to start_date onwards, with a 'PD'
    column (pd1 / pd2) based on split_date.
    """
    df = load_events(filepath)

    # Rename normalised columns back to legacy names for old analysis.py
    rename_map = {
        "date": "Event Date",
        "event_type": "Event Type",
        "consequence": "Actual Consequence",
        "crp": "CRPInvolved",
        "root_cause_cat": "Root Cause Category",
        "injury_class": "Ventia Injury Classification",
        "body_part": "Bodily Location",
    }
    df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})

    # Handle missing columns that old code expects
    if "Days to Investigate" not in df.columns:
        df["Days to Investigate"] = df.get("event_lag", pd.Series(dtype="float64"))
    if "Days to Close" not in df.columns:
        # Fall back to an all-NaT series so the subtraction below still works
        # when the source export has no ClosedAtDate column.
        closed = pd.to_datetime(
            df.get("ClosedAtDate", pd.Series(pd.NaT, index=df.index)), errors="coerce"
        )
        df["Days to Close"] = pd.to_numeric(
            closed.sub(df["Event Date"]).dt.days, errors="coerce"
        )
    if "CRPInvolved" not in df.columns:
        df["CRPInvolved"] = df.get("CRP Involved", pd.NA)

    df = df[df["Event Date"] >= pd.Timestamp(start_date)].copy()
    df["Year"] = df["Event Date"].dt.year
    df["Month"] = df["Event Date"].dt.month
    df["MonthName"] = df["Event Date"].dt.strftime("%b")
    df["DOW"] = df["Event Date"].dt.day_name()
    df["YearMonth"] = df["Event Date"].dt.to_period("M")
    df["PD"] = df["Event Date"].apply(
        lambda x: "pd1" if x < pd.Timestamp(split_date) else "pd2"
    )
    return df
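
# Example (sketch, hypothetical dates): legacy call pattern from the old
# analysis.py, splitting the data into two comparison periods.
#
#     df = load_and_prepare("Events.xlsx", start_date="2023-07-01", split_date="2024-07-01")
#     df.groupby("PD")["Event Type"].value_counts()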


def get_body_parts(series: pd.Series) -> pd.Series:
    """Split multi-value body part entries and normalise (legacy helper)."""
    parts = []
    for val in series.dropna():
        for part in str(val).split(","):
            part = part.strip()
            if part and "unspecified" not in part.lower():
                parts.append(part)
    return pd.Series(parts)
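
# Example (sketch): top injured body parts from the legacy column.
#
#     get_body_parts(df["Bodily Location"]).value_counts().head(10)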