379 lines
15 KiB
Python
379 lines
15 KiB
Python
"""
data_loader.py — Load and normalise the three SHEQ data sources.

Each loader returns a pandas DataFrame with normalised column names
(defined in config.py) so that downstream analysis code is insulated
from changes to the source file schema.

Public API
----------
    load_events(filepath)                     -> pd.DataFrame
    load_safety_energy(filepath)              -> pd.DataFrame
    load_llc_data(filepath)                   -> pd.DataFrame
    load_all(events_path, se_path, llc_path)  -> dict[str, pd.DataFrame]
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import warnings
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import pandas as pd
|
||
|
||
from config import (
|
||
EVENTS_COL_MAP,
|
||
SE_COL_MAP,
|
||
LLC_COL_MAP,
|
||
MODULE_TYPE_LABELS,
|
||
EVENTS_FILE,
|
||
SAFETY_ENERGY_FILE,
|
||
LLC_FILE,
|
||
)
|
||
|
||
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Suppress openpyxl "no default style" warnings.
# NOTE: this installs a process-wide filter at import time that ignores every
# UserWarning raised from modules whose name matches "openpyxl".
warnings.filterwarnings("ignore", category=UserWarning, module="openpyxl")
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Internal helpers
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def _resolve_col(df: pd.DataFrame, candidates: list[str], key: str) -> Optional[str]:
|
||
"""Return the first candidate column that exists in df, or None."""
|
||
for c in candidates:
|
||
if c in df.columns:
|
||
return c
|
||
log.debug("Column key '%s' not found (tried: %s)", key, candidates)
|
||
return None
|
||
|
||
|
||
def _parse_dates(series: pd.Series) -> pd.Series:
|
||
"""
|
||
Parse a date series that may contain:
|
||
- ISO strings "2024-01-15"
|
||
- Long-form strings "Monday, 15 January 2024"
|
||
- Excel datetime objects
|
||
Returns a tz-naive datetime64 series; unparseable values become NaT.
|
||
"""
|
||
if pd.api.types.is_datetime64_any_dtype(series):
|
||
return series.dt.tz_localize(None) if series.dt.tz is not None else series
|
||
|
||
def _parse_one(val):
|
||
if pd.isna(val):
|
||
return pd.NaT
|
||
s = str(val).strip()
|
||
# Strip leading day-of-week "Monday, " prefix from long-form dates
|
||
if "," in s and len(s.split(",")[0].split()) == 1:
|
||
s = s.split(",", 1)[1].strip()
|
||
try:
|
||
return pd.to_datetime(s, dayfirst=True)
|
||
except Exception:
|
||
return pd.NaT
|
||
|
||
return series.map(_parse_one)
|
||
|
||
|
||
def _remap(df: pd.DataFrame, col_map: dict[str, list[str]]) -> pd.DataFrame:
    """
    Return a copy of ``df`` with normalised alias columns added.

    For each normalised name in ``col_map`` the first candidate source
    column found in ``df`` is copied under the normalised name.  All
    original source columns are kept under their original names, so
    callers can still reach fields that have no normalised alias.

    Keys with no matching candidate are skipped (no column is created).
    If a normalised name equals the name of an existing column other than
    the resolved source, that column is overwritten by the alias.
    """
    result = df.copy()
    for norm_name, candidates in col_map.items():
        src = _resolve_col(df, candidates, norm_name)
        # Nothing to do when no source exists or the source already carries
        # the normalised name.
        if src is None or src == norm_name:
            continue
        result[norm_name] = df[src]
    return result
|
||
|
||
|
||
def _null_rate(series: pd.Series) -> float:
|
||
"""Return fraction of null / empty values (0–1)."""
|
||
return series.isna().mean()
|
||
|
||
|
||
def _profile(df: pd.DataFrame, label: str) -> dict:
    """
    Build a small quality summary of ``df`` for logging.

    The result holds the source label, the row and column counts, and the
    null rate of the ``date`` column (computed on an empty series when the
    frame has no such column).
    """
    n_rows, n_cols = df.shape
    date_values = df.get("date", pd.Series(dtype="object"))

    summary = {"source": label}
    summary["rows"] = n_rows
    summary["cols"] = n_cols
    summary["date_nulls"] = _null_rate(date_values)
    return summary
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Events loader
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def load_events(filepath: str = EVENTS_FILE) -> pd.DataFrame:
    """
    Load Events.xlsx and return a normalised DataFrame.

    Normalised columns (see EVENTS_COL_MAP):
      date, event_type, consequence, status, business_unit, project,
      location, crp, root_cause_cat, root_cause_sub, injury_class,
      body_part, brief_desc, event_desc, days_to_enter, event_lag,
      report_lag, investigation_done, hipo, critical_event

    Also adds:
      year, month, year_month (Period[M]), dow (day name)

    Rows whose date is missing or unparseable are dropped (with a warning).

    Raises:
      FileNotFoundError: if ``filepath`` does not exist.
      KeyError: if none of the candidate source columns for ``date`` is
        present in the file.
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Events file not found: {filepath}")

    log.info("Loading Events from %s", filepath)
    raw = pd.read_excel(filepath)
    log.info("  Raw shape: %s rows × %s cols", *raw.shape)

    df = _remap(raw, EVENTS_COL_MAP)

    # Fail with an actionable message instead of a bare KeyError('date')
    # when the source schema has no recognisable date column.
    if "date" not in df.columns:
        raise KeyError(
            "Events file has no recognised date column "
            f"(tried: {EVENTS_COL_MAP.get('date')}; found: {list(raw.columns)})"
        )

    # Parse dates
    df["date"] = _parse_dates(df["date"])

    # Drop rows with no date
    n_before = len(df)
    df = df.dropna(subset=["date"]).copy()
    if len(df) < n_before:
        log.warning("  Dropped %d rows with missing date", n_before - len(df))

    # Derived time fields
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["year_month"] = df["date"].dt.to_period("M")
    df["dow"] = df["date"].dt.day_name()

    # Normalise text fields: stringify, trim, then turn the string forms of
    # missing values back into pd.NA.  "<NA>" is what str(pd.NA) produces.
    # Note that literal "nan" / "None" text in the source is nulled as well.
    null_tokens = {"nan": pd.NA, "None": pd.NA, "": pd.NA, "<NA>": pd.NA}
    for col in ("event_type", "consequence", "business_unit", "project",
                "root_cause_cat", "injury_class"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace(null_tokens)

    profile = _profile(df, "Events")
    log.info("  Loaded %d events | BUs: %s",
             profile["rows"],
             list(df["business_unit"].dropna().unique()) if "business_unit" in df else "?")

    return df
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Safety Energy loader
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def load_safety_energy(filepath: str = SAFETY_ENERGY_FILE) -> pd.DataFrame:
    """
    Load Safety_Energy.xlsx and return a normalised DataFrame.

    Safety Energy is the combined analytical domain covering all leading
    activity types: LLC (Leader Learning Conversations), CCC (Critical
    Control Checks), and OCC (Operational Control Checks).

    Normalised columns (see SE_COL_MAP):
      date, module_name, module_prefix, module_type, activity_type
      (short label: LLC/CCC/OCC), leader, business_unit, project,
      location, at_risk_aspects, total_questions, actions, atl_actions,
      at_risk_crp, llc_topic, at_risk_obs, positive_obs, participants

    Also adds:
      year, month, year_month (Period[M])
      activity_type — shortened label from MODULE_TYPE_LABELS; module types
      without a label keep their original value.  When the file has no
      module_type column, activity_type is left empty (or left as loaded,
      if the source already provided one).

    Rows whose date is missing or unparseable are dropped (with a warning).

    Raises:
      FileNotFoundError: if ``filepath`` does not exist.
      KeyError: if none of the candidate source columns for ``date`` is
        present in the file.
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Safety Energy file not found: {filepath}")

    log.info("Loading Safety Energy from %s", filepath)
    raw = pd.read_excel(filepath)
    log.info("  Raw shape: %s rows × %s cols", *raw.shape)

    df = _remap(raw, SE_COL_MAP)

    # Fail with an actionable message instead of a bare KeyError('date').
    if "date" not in df.columns:
        raise KeyError(
            "Safety Energy file has no recognised date column "
            f"(tried: {SE_COL_MAP.get('date')}; found: {list(raw.columns)})"
        )
    df["date"] = _parse_dates(df["date"])

    n_before = len(df)
    df = df.dropna(subset=["date"]).copy()
    if len(df) < n_before:
        log.warning("  Dropped %d rows with missing date", n_before - len(df))

    # Shorten module_type to LLC / CCC / OCC label, falling back to the raw
    # module_type where no label is defined.  A missing module_type column
    # is tolerated rather than raising KeyError.
    if "module_type" in df.columns:
        module_type = df["module_type"]
        df["activity_type"] = module_type.map(MODULE_TYPE_LABELS).fillna(module_type)
    else:
        log.warning("  No module_type column found; activity_type not derived")
        if "activity_type" not in df.columns:
            df["activity_type"] = pd.NA

    # Derived time fields
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["year_month"] = df["date"].dt.to_period("M")

    # Normalise text: stringify, trim, then turn the string forms of missing
    # values back into pd.NA.  "<NA>" is what str(pd.NA) produces.
    null_tokens = {"nan": pd.NA, "None": pd.NA, "": pd.NA, "<NA>": pd.NA}
    for col in ("business_unit", "project", "leader", "activity_type"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace(null_tokens)

    # Numeric fields — coerce to numeric safely (bad values become NaN)
    for col in ("at_risk_aspects", "total_questions", "actions", "atl_actions"):
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    log.info("  Loaded %d activities | types: %s",
             len(df),
             df["activity_type"].value_counts().to_dict() if "activity_type" in df else "?")

    return df
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# LLC Data loader
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def load_llc_data(filepath: str = LLC_FILE) -> pd.DataFrame:
    """
    Load LLC_Data.xlsx and return a normalised DataFrame.

    LLC_Data is a supplementary export of Leader Learning Conversations,
    often containing richer free-text fields (topic, at-risk observations,
    review & action notes) than the Safety_Energy export.

    Normalised columns (see LLC_COL_MAP):
      date, topic, leader, business_unit, project, location,
      crp_focus, at_risk_obs, positive_obs, at_risk_flag, participants

    Also adds:
      year, month, year_month (Period[M])

    Rows whose date is missing or unparseable are dropped (with a warning).

    Raises:
      FileNotFoundError: if ``filepath`` does not exist.
      KeyError: if none of the candidate source columns for ``date`` is
        present in the file.
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"LLC Data file not found: {filepath}")

    log.info("Loading LLC Data from %s", filepath)
    raw = pd.read_excel(filepath)
    log.info("  Raw shape: %s rows × %s cols", *raw.shape)

    df = _remap(raw, LLC_COL_MAP)

    # Fail with an actionable message instead of a bare KeyError('date').
    if "date" not in df.columns:
        raise KeyError(
            "LLC Data file has no recognised date column "
            f"(tried: {LLC_COL_MAP.get('date')}; found: {list(raw.columns)})"
        )
    df["date"] = _parse_dates(df["date"])

    n_before = len(df)
    df = df.dropna(subset=["date"]).copy()
    if len(df) < n_before:
        log.warning("  Dropped %d rows with missing date", n_before - len(df))

    # Derived time fields
    df["year"] = df["date"].dt.year
    df["month"] = df["date"].dt.month
    df["year_month"] = df["date"].dt.to_period("M")

    # Normalise text: stringify, trim, then turn the string forms of missing
    # values back into pd.NA.  "<NA>" is what str(pd.NA) produces.
    null_tokens = {"nan": pd.NA, "None": pd.NA, "": pd.NA, "<NA>": pd.NA}
    for col in ("business_unit", "project", "leader", "topic", "crp_focus"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace(null_tokens)

    # at_risk_flag is a count field in this export
    if "at_risk_flag" in df.columns:
        df["at_risk_flag"] = pd.to_numeric(df["at_risk_flag"], errors="coerce")

    log.info("  Loaded %d LLC records | BUs: %s",
             len(df),
             list(df["business_unit"].dropna().unique()) if "business_unit" in df else "?")

    return df
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Combined loader
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def load_all(
    events_path: str = EVENTS_FILE,
    se_path: str = SAFETY_ENERGY_FILE,
    llc_path: str = LLC_FILE,
) -> dict[str, pd.DataFrame]:
    """
    Load the three data sources, in order, and bundle them in a dict:

      'events'        -> result of load_events(events_path)
      'safety_energy' -> result of load_safety_energy(se_path)
      'llc'           -> result of load_llc_data(llc_path)

    Any exception raised by an individual loader (for example
    FileNotFoundError for a missing file) propagates unchanged.
    """
    events = load_events(events_path)
    safety_energy = load_safety_energy(se_path)
    llc = load_llc_data(llc_path)
    return {"events": events, "safety_energy": safety_energy, "llc": llc}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Backwards-compatibility shim for old analysis.py
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def load_and_prepare(filepath: str, start_date: str, split_date: str) -> pd.DataFrame:
    """
    Backwards-compatible wrapper used by the old analysis.py module.

    Loads the Events data via load_events, exposes the normalised columns
    under their legacy names, keeps only rows whose 'Event Date' is on or
    after ``start_date``, and adds the legacy helper columns Year, Month,
    MonthName, DOW, YearMonth and PD.

    'PD' is "pd1" for events strictly before ``split_date`` and "pd2"
    otherwise.  'Days to Close' is derived from 'ClosedAtDate' when that
    column exists and is left empty (NaN) when it does not.
    """
    df = load_events(filepath)

    # Rename normalised columns back to legacy names for old analysis.py
    rename_map = {
        "date":          "Event Date",
        "event_type":    "Event Type",
        "consequence":   "Actual Consequence",
        "crp":           "CRPInvolved",
        "root_cause_cat": "Root Cause Category",
        "injury_class":  "Ventia Injury Classification",
        "body_part":     "Bodily Location",
    }
    present = {k: v for k, v in rename_map.items() if k in df.columns}
    # The loaded frame still contains the raw source columns, so a raw column
    # may already carry a legacy name.  Renaming on top of it would create
    # duplicate labels (df["Event Date"] would then be a DataFrame), so the
    # raw copy is dropped in favour of the parsed / cleaned normalised one.
    clashes = [legacy for legacy in present.values() if legacy in df.columns]
    df = df.drop(columns=clashes).rename(columns=present)

    # Handle missing columns that old code expects
    if "Days to Investigate" not in df.columns:
        df["Days to Investigate"] = df.get("event_lag", pd.Series(dtype="float64"))
    if "Days to Close" not in df.columns:
        if "ClosedAtDate" in df.columns:
            closed = pd.to_datetime(df["ClosedAtDate"], errors="coerce")
            df["Days to Close"] = pd.to_numeric(
                closed.sub(df["Event Date"]).dt.days,
                errors="coerce",
            )
        else:
            # No close date available: keep the column but leave it empty
            # rather than failing on a missing source field.
            df["Days to Close"] = float("nan")
    if "CRPInvolved" not in df.columns:
        df["CRPInvolved"] = df.get("CRP Involved", pd.NA)

    # Convert the boundaries once instead of once per row.
    start_ts = pd.Timestamp(start_date)
    split_ts = pd.Timestamp(split_date)

    df = df[df["Event Date"] >= start_ts].copy()
    df["Year"] = df["Event Date"].dt.year
    df["Month"] = df["Event Date"].dt.month
    df["MonthName"] = df["Event Date"].dt.strftime("%b")
    df["DOW"] = df["Event Date"].dt.day_name()
    df["YearMonth"] = df["Event Date"].dt.to_period("M")

    # Period flag: everything is "pd2" unless it falls before the split date.
    df["PD"] = "pd2"
    df.loc[df["Event Date"] < split_ts, "PD"] = "pd1"
    return df
|
||
|
||
|
||
def get_body_parts(series: pd.Series) -> pd.Series:
    """
    Flatten comma-separated body-part entries into one value per row (legacy helper).

    Missing entries are skipped.  Each remaining entry is converted to text
    and split on commas; fragments are stripped of surrounding whitespace, and
    empty fragments or fragments containing "unspecified" (case-insensitive)
    are discarded.  The result is a new Series with a fresh default index.
    """
    fragments = (
        piece.strip()
        for entry in series.dropna()
        for piece in str(entry).split(",")
    )
    kept = [name for name in fragments if name and "unspecified" not in name.lower()]
    return pd.Series(kept)
|