Files
2026-04-20 15:23:18 +12:00

2252 lines
96 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
analysis_engine.py — Core analytical layer for the SHEQ reporting tool.
Accepts normalised DataFrames from data_loader and produces a structured
results dict consumed by report_builder. All analysis is performed here;
report_builder only formats and writes.
Public API
----------
run_full_analysis(events, safety_energy, llc, start_date, split_date,
pd1_name, pd2_name, output_dir) -> AnalysisResults
"""
from __future__ import annotations
import logging
import os
import re
import warnings
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Optional
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import numpy as np
import pandas as pd
from config import (
CHART_PALETTE,
DEEP_BLUE, SKY_BLUE, DARK_GREEN, MID_GREEN, LIGHT_GREEN,
PURPLE, AMBER, RED, MUTED,
CONSEQUENCE_ORDER, CONSEQUENCE_SERIOUS,
LEADING_ACTIVITY_TYPES, ACTIVITY_COLOURS,
AT_RISK_KEYWORDS,
CORR_MIN_MONTHS, LEADER_MIN_ACTIVITIES,
TWO_YEAR_WINDOW_MONTHS, QUALITY_SCORE_BANDS,
)
log = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=UserWarning)
# ─────────────────────────────────────────────────────────────────────────────
# Result container
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class AnalysisResults:
"""All analysis outputs passed to report_builder."""
params: dict[str, Any] = field(default_factory=dict)
data_quality: dict[str, Any] = field(default_factory=dict)
events_summary: dict[str, Any] = field(default_factory=dict)
leading_summary: dict[str, Any] = field(default_factory=dict)
trends: dict[str, Any] = field(default_factory=dict)
effectiveness: dict[str, Any] = field(default_factory=dict)
at_risk: dict[str, Any] = field(default_factory=dict)
se_events_rel: dict[str, Any] = field(default_factory=dict)
focus_areas: dict[str, Any] = field(default_factory=dict)
charts: dict[str, str] = field(default_factory=dict) # name → file path
recommendations: list[str] = field(default_factory=list)
caveats: list[str] = field(default_factory=list)
# ─────────────────────────────────────────────────────────────────────────────
# Chart helpers
# ─────────────────────────────────────────────────────────────────────────────
def _setup_style() -> None:
"""Apply brand-aligned matplotlib defaults."""
import matplotlib.font_manager as fm
available = {f.name for f in fm.fontManager.ttflist}
if "Source Sans Pro" in available:
plt.rcParams["font.family"] = "Source Sans Pro"
elif "Source Sans 3" in available:
plt.rcParams["font.family"] = "Source Sans 3"
else:
plt.rcParams["font.family"] = "sans-serif"
plt.rcParams.update({
"axes.spines.top": False,
"axes.spines.right": False,
"axes.grid": True,
"grid.alpha": 0.3,
"grid.linestyle": "--",
})
def _save(fig: plt.Figure, path: str) -> None:
fig.tight_layout()
fig.savefig(path, dpi=180, bbox_inches="tight", facecolor="white")
plt.close(fig)
def _month_labels(periods: pd.PeriodIndex) -> list[str]:
return [p.strftime("%b %y") for p in periods]
QUALITY_TEXT_COLUMNS = [
"module_name",
"llc_topic",
"at_risk_obs",
"positive_obs",
"at_risk_crp",
"Immediate Actions Taken / Comments",
"Instruction",
"Top practices",
"Top improvement opportunities",
"Review & Action",
"Best practices shared with site leaders",
"Activity/Task",
"Custom",
]
INPUT_DEPTH_BASE_FIELDS = [
"module_name",
"module_prefix",
"leader",
"business_unit",
"project",
"location",
"shift",
]
INPUT_DEPTH_OPTIONAL_FIELDS = [
"participants",
"time_spent",
"at_risk_crp",
"llc_topic",
"at_risk_obs",
"positive_obs",
"find_fix",
"Immediate Actions Taken / Comments",
"Instruction",
"Top practices",
"Top improvement opportunities",
"Review & Action",
"Best practices shared with site leaders",
"Activity/Task",
"Custom",
]
INPUT_DEPTH_NUMERIC_FIELDS = [
"at_risk_aspects",
"total_questions",
"actions",
"atl_actions",
]
SEMANTIC_EMPTY_STRINGS = {
"", "n/a", "na", "nan", "nil", "none", "null", "unknown", "not applicable",
"no", "no risk identified", "no at risk identified", "no at risk situations identified",
}
ACTION_WORDS = {
"action", "address", "brief", "coach", "control", "correct", "escalate",
"fix", "follow", "improve", "implement", "isolate", "monitor", "plan",
"rectify", "reinforce", "repair", "replace", "review", "stop", "train",
"update", "verify",
}
LEARNING_WORDS = {
"awareness", "coach", "coaching", "discuss", "discussed", "education",
"explained", "feedback", "learn", "learning", "lesson", "mentor",
"reinforce", "reinforced", "reminded", "shared", "understand",
}
REACTIVE_WORDS = {
"breach", "defect", "failure", "incident", "issue", "non-compliance",
"not in place", "overdue", "unsafe", "failed",
}
PREVENTIVE_WORDS = {
"before", "brief", "coaching", "planned", "pre-start", "prepare",
"proactive", "reinforce", "review", "verify",
}
GENERIC_PATTERNS = [
"all good",
"n/a",
"na",
"nil",
"none",
"no issues",
"no at risk situations identified",
"no at risk identified",
"nothing noted",
"routine check",
]
def _safe_pct(numerator: float, denominator: float) -> float:
if denominator in (0, 0.0) or pd.isna(denominator):
return 0.0
return float(numerator) / float(denominator) * 100.0
def _normalise_text(text: Any) -> str:
if pd.isna(text):
return ""
return re.sub(r"\s+", " ", str(text)).strip()
def _is_meaningful_text(value: Any) -> bool:
text = _normalise_text(value).lower()
if not text:
return False
if text in SEMANTIC_EMPTY_STRINGS:
return False
return True
def _tokenise(text: str) -> list[str]:
return re.findall(r"[a-zA-Z][a-zA-Z0-9/&'-]+", text.lower())
def _theme_matches(text: str) -> set[str]:
tl = text.lower()
return {
theme for theme, keywords in AT_RISK_KEYWORDS.items()
if any(kw in tl for kw in keywords)
}
def _top_dict(series: pd.Series, limit: int = 10) -> dict[str, int]:
if series.empty:
return {}
cleaned = series.dropna().astype(str).str.strip()
cleaned = cleaned[cleaned.ne("") & cleaned.ne("nan")]
return cleaned.value_counts().head(limit).to_dict()
def _pick_window_start(max_date: pd.Timestamp) -> pd.Timestamp:
return (max_date.to_period("M") - (TWO_YEAR_WINDOW_MONTHS - 1)).to_timestamp()
def _build_quality_frame(se_window: pd.DataFrame) -> pd.DataFrame:
"""Create practical quality proxies for each Safety Energy record."""
if se_window.empty:
return se_window.copy()
df = se_window.copy()
text_cols = [c for c in QUALITY_TEXT_COLUMNS if c in df.columns]
if text_cols:
df["_text_blob"] = (
df[text_cols]
.fillna("")
.astype(str)
.agg(" ".join, axis=1)
.map(_normalise_text)
)
else:
df["_text_blob"] = ""
df["_tokens"] = df["_text_blob"].map(_tokenise)
df["_word_count"] = df["_tokens"].map(len)
df["_unique_words"] = df["_tokens"].map(lambda toks: len(set(toks)))
df["_contains_number"] = df["_text_blob"].str.contains(r"\d", regex=True, na=False)
df["_theme_count"] = df["_text_blob"].map(lambda t: len(_theme_matches(t)))
df["_action_words"] = df["_tokens"].map(lambda toks: len(set(toks) & ACTION_WORDS))
df["_learning_words"] = df["_tokens"].map(lambda toks: len(set(toks) & LEARNING_WORDS))
df["_reactive_words"] = df["_text_blob"].str.lower().map(
lambda txt: sum(1 for word in REACTIVE_WORDS if word in txt)
)
df["_preventive_words"] = df["_text_blob"].str.lower().map(
lambda txt: sum(1 for word in PREVENTIVE_WORDS if word in txt)
)
df["_generic_flag"] = (
(df["_word_count"] <= 4)
| df["_text_blob"].str.lower().map(lambda txt: any(p in txt for p in GENERIC_PATTERNS))
)
total_q = pd.to_numeric(df.get("total_questions"), errors="coerce").fillna(0)
at_risk = pd.to_numeric(df.get("at_risk_aspects"), errors="coerce").fillna(0)
actions = pd.to_numeric(df.get("actions"), errors="coerce").fillna(0)
atl_actions = pd.to_numeric(df.get("atl_actions"), errors="coerce").fillna(0)
base_fields = [c for c in INPUT_DEPTH_BASE_FIELDS if c in df.columns]
optional_fields = [c for c in INPUT_DEPTH_OPTIONAL_FIELDS if c in df.columns]
numeric_fields = [c for c in INPUT_DEPTH_NUMERIC_FIELDS if c in df.columns]
if base_fields:
df["_base_input_count"] = sum(df[col].map(_is_meaningful_text).astype(int) for col in base_fields)
else:
df["_base_input_count"] = 0
if optional_fields:
df["_optional_input_count"] = sum(df[col].map(_is_meaningful_text).astype(int) for col in optional_fields)
else:
df["_optional_input_count"] = 0
numeric_presence = []
for col in numeric_fields:
vals = pd.to_numeric(df[col], errors="coerce").fillna(0)
if col == "total_questions":
numeric_presence.append(vals.gt(0).astype(int))
else:
numeric_presence.append(vals.gt(0).astype(int))
df["_numeric_input_count"] = sum(numeric_presence) if numeric_presence else 0
max_points = max(1, len(base_fields) + len(optional_fields) + len(numeric_fields))
weighted_points = (
df["_base_input_count"] * 1.0
+ df["_optional_input_count"] * 1.2
+ df["_numeric_input_count"] * 1.0
)
weighted_max = max(
1.0,
len(base_fields) * 1.0 + len(optional_fields) * 1.2 + len(numeric_fields) * 1.0,
)
df["input_depth_score"] = np.clip((weighted_points / weighted_max) * 100.0, 0, 100).round(1)
df["input_depth_band"] = np.select(
[
df["input_depth_score"] >= 60,
df["input_depth_score"] >= 40,
df["input_depth_score"] >= 20,
],
["Rich", "Balanced", "Light"],
default="Sparse",
)
df["_follow_up_signal"] = (
(actions + atl_actions > 0)
| df["_text_blob"].str.lower().str.contains("follow up|review|action|close out|escalat", regex=True, na=False)
)
df["_risk_signal"] = (
(at_risk > 0)
| df["_theme_count"].gt(0)
| df["_text_blob"].str.lower().str.contains("critical risk|hazard|unsafe|control", regex=True, na=False)
)
df["_critical_control_signal"] = df.get(
"Was a critical risk identified and controls verified as effective and in place?",
pd.Series(index=df.index, dtype="object"),
).astype(str).str.lower().str.contains("yes|effective|verified", regex=True, na=False)
norm_text = (
df["_text_blob"].str.lower()
.str.replace(r"[^a-z0-9 ]", " ", regex=True)
.str.replace(r"\s+", " ", regex=True)
.str.strip()
)
freq = norm_text[norm_text.ne("")].value_counts()
df["_duplicate_flag"] = norm_text.map(freq).fillna(0).ge(3) & df["_word_count"].ge(5)
richness = (
np.where(df["_word_count"] >= 35, 22,
np.where(df["_word_count"] >= 20, 18,
np.where(df["_word_count"] >= 10, 12,
np.where(df["_word_count"] >= 5, 6, 0))))
)
specificity = (
np.where(df["_unique_words"] >= 18, 10, np.where(df["_unique_words"] >= 10, 6, 2))
+ np.where(df["_contains_number"], 4, 0)
+ np.where(df["_theme_count"] >= 2, 4, np.where(df["_theme_count"] == 1, 2, 0))
)
action_score = (
np.where(actions + atl_actions >= 2, 12, np.where(actions + atl_actions == 1, 8, 0))
+ np.where(df["_action_words"] >= 2, 6, np.where(df["_action_words"] == 1, 3, 0))
)
learning_score = (
np.where(df["_learning_words"] >= 2, 10, np.where(df["_learning_words"] == 1, 6, 0))
+ np.where(df["_text_blob"].str.lower().str.contains("best practice|lesson|learning|feedback", regex=True, na=False), 4, 0)
)
risk_score = (
np.where(df["_risk_signal"], 8, 0)
+ np.where(at_risk >= 2, 6, np.where(at_risk == 1, 3, 0))
+ np.where(df["_critical_control_signal"], 4, 0)
+ np.where((total_q > 0) & ((at_risk / total_q.replace(0, np.nan)).fillna(0) >= 0.1), 2, 0)
)
follow_up_score = (
np.where(df["_follow_up_signal"], 8, 0)
+ np.where(df["_text_blob"].str.lower().str.contains("close out|owner|due|monitor", regex=True, na=False), 4, 0)
)
penalty = (
np.where(df["_generic_flag"], 10, 0)
+ np.where(df["_duplicate_flag"], 8, 0)
+ np.where((df["_word_count"] <= 6) & (~df["_follow_up_signal"]) & (~df["_risk_signal"]), 8, 0)
)
df["quality_score"] = np.clip(
richness + specificity + action_score + learning_score + risk_score + follow_up_score - penalty,
0, 100,
).astype(int)
df["meaningful_flag"] = df["quality_score"] >= QUALITY_SCORE_BANDS["meaningful"]
df["high_value_flag"] = df["quality_score"] >= QUALITY_SCORE_BANDS["high_value"]
df["shallow_flag"] = df["quality_score"] <= QUALITY_SCORE_BANDS["shallow"]
df["reactive_flag"] = (
(df["_reactive_words"] > df["_preventive_words"])
| ((actions + atl_actions > 0) & at_risk.gt(0))
)
df["preventive_flag"] = (
(df["_preventive_words"] >= df["_reactive_words"])
& df["_risk_signal"]
& ~df["shallow_flag"]
)
df["repetitive_flag"] = df["_duplicate_flag"]
def _band(score: int) -> str:
if score >= QUALITY_SCORE_BANDS["high_value"]:
return "High value"
if score >= QUALITY_SCORE_BANDS["meaningful"]:
return "Meaningful"
if score <= QUALITY_SCORE_BANDS["shallow"]:
return "Shallow"
return "Mixed"
df["quality_band"] = df["quality_score"].map(_band)
return df
def _summarise_quality_slice(df: pd.DataFrame) -> dict[str, Any]:
if df.empty:
return {
"count": 0,
"avg_quality": 0.0,
"avg_input_depth": 0.0,
"meaningful_pct": 0.0,
"high_value_pct": 0.0,
"shallow_pct": 0.0,
"reactive_pct": 0.0,
"preventive_pct": 0.0,
"repetitive_pct": 0.0,
"follow_up_pct": 0.0,
"risk_signal_pct": 0.0,
"rich_input_pct": 0.0,
}
return {
"count": int(len(df)),
"avg_quality": round(float(df["quality_score"].mean()), 1),
"avg_input_depth": round(float(df["input_depth_score"].mean()), 1),
"meaningful_pct": round(_safe_pct(df["meaningful_flag"].sum(), len(df)), 1),
"high_value_pct": round(_safe_pct(df["high_value_flag"].sum(), len(df)), 1),
"shallow_pct": round(_safe_pct(df["shallow_flag"].sum(), len(df)), 1),
"reactive_pct": round(_safe_pct(df["reactive_flag"].sum(), len(df)), 1),
"preventive_pct": round(_safe_pct(df["preventive_flag"].sum(), len(df)), 1),
"repetitive_pct": round(_safe_pct(df["repetitive_flag"].sum(), len(df)), 1),
"follow_up_pct": round(_safe_pct(df["_follow_up_signal"].sum(), len(df)), 1),
"risk_signal_pct": round(_safe_pct(df["_risk_signal"].sum(), len(df)), 1),
"rich_input_pct": round(_safe_pct((df["input_depth_band"] == "Rich").sum(), len(df)), 1),
}
def _summarise_theme_trend(df: pd.DataFrame, recent_months: int = 6) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
if df.empty:
return [], []
theme_rows: list[dict[str, Any]] = []
for _, row in df.iterrows():
blob = row.get("_text_blob", "")
matches = _theme_matches(blob)
if not matches:
module_name = _normalise_text(row.get("module_name", ""))
if module_name:
matches = {module_name}
for theme in matches:
theme_rows.append({"year_month": row["year_month"], "theme": theme})
if not theme_rows:
return [], []
theme_df = pd.DataFrame(theme_rows)
monthly = (
theme_df.groupby(["theme", "year_month"]).size()
.unstack(fill_value=0)
.sort_index(axis=1)
)
recent_cols = monthly.columns[-recent_months:]
prior_cols = monthly.columns[-(recent_months * 2):-recent_months]
if len(recent_cols) == 0:
return [], []
rows: list[dict[str, Any]] = []
for theme, vals in monthly.iterrows():
recent_avg = float(vals[recent_cols].mean())
prior_avg = float(vals[prior_cols].mean()) if len(prior_cols) else 0.0
delta = recent_avg - prior_avg
if delta == 0:
continue
rows.append({
"theme": str(theme),
"recent_avg": round(recent_avg, 2),
"prior_avg": round(prior_avg, 2),
"delta": round(delta, 2),
})
rising = sorted([r for r in rows if r["delta"] > 0], key=lambda r: (-r["delta"], -r["recent_avg"]))[:6]
declining = sorted([r for r in rows if r["delta"] < 0], key=lambda r: (r["delta"], -r["recent_avg"]))[:6]
return rising, declining
def _input_depth_insights(df: pd.DataFrame) -> dict[str, Any]:
if df.empty or "input_depth_score" not in df.columns:
return {
"correlation": None,
"by_band": [],
"note": "No input-depth insight available.",
}
corr = None
if df["input_depth_score"].nunique() > 1 and df["quality_score"].nunique() > 1:
corr = round(float(df["input_depth_score"].corr(df["quality_score"])), 2)
band_order = ["Sparse", "Light", "Balanced", "Rich"]
grouped = (
df.groupby("input_depth_band")
.agg(
count=("quality_score", "size"),
avg_quality=("quality_score", "mean"),
meaningful_pct=("meaningful_flag", lambda s: _safe_pct(s.sum(), len(s))),
high_value_pct=("high_value_flag", lambda s: _safe_pct(s.sum(), len(s))),
shallow_pct=("shallow_flag", lambda s: _safe_pct(s.sum(), len(s))),
avg_input_depth=("input_depth_score", "mean"),
)
.reset_index()
)
grouped["band_order"] = grouped["input_depth_band"].map({b: i for i, b in enumerate(band_order)})
grouped = grouped.sort_values("band_order")
by_band = [
{
"band": r["input_depth_band"],
"count": int(r["count"]),
"avg_input_depth": round(float(r["avg_input_depth"]), 1),
"avg_quality": round(float(r["avg_quality"]), 1),
"meaningful_pct": round(float(r["meaningful_pct"]), 1),
"high_value_pct": round(float(r["high_value_pct"]), 1),
"shallow_pct": round(float(r["shallow_pct"]), 1),
}
for _, r in grouped.iterrows()
]
note = "Input depth appears usable as a supporting quality metric."
if corr is None:
note = "Input depth could not be tested reliably against quality because there was not enough variation."
elif corr < 0.25:
note = "Input depth is only weakly aligned with overall quality, so it should remain a secondary metric."
elif corr < 0.5:
note = "Input depth is moderately aligned with overall quality and is useful as a supporting metric."
return {
"correlation": corr,
"by_band": by_band,
"note": note,
}
def _analyse_two_year_trends(
se: pd.DataFrame,
llc: pd.DataFrame,
events: pd.DataFrame,
start_date: str,
) -> dict[str, Any]:
"""
Build a rolling two-year Safety Energy trend and quality view.
Counts come from Safety Energy. Quality is inferred through practical
proxies such as text richness, specificity, action/follow-up signals,
hazard recognition, and repeated generic entries.
"""
if se.empty:
return {"note": "No Safety Energy data available for two-year trend analysis."}
max_date = se["date"].max()
window_start = _pick_window_start(max_date)
requested_start = pd.Timestamp(start_date)
se_window = se[(se["date"] >= window_start) & (se["date"] <= max_date)].copy()
llc_window = llc[(llc["date"] >= window_start) & (llc["date"] <= max_date)].copy()
events_window = events[(events["date"] >= window_start) & (events["date"] <= max_date)].copy()
if se_window.empty:
return {"note": "No Safety Energy records fall within the rolling two-year window."}
quality_df = _build_quality_frame(se_window)
all_months = pd.period_range(
quality_df["date"].min().to_period("M"),
quality_df["date"].max().to_period("M"),
freq="M",
)
all_quarters = pd.period_range(
quality_df["date"].min().to_period("Q"),
quality_df["date"].max().to_period("Q"),
freq="Q",
)
monthly_counts = (
quality_df.groupby(["year_month", "activity_type"]).size()
.unstack(fill_value=0)
.reindex(all_months, fill_value=0)
)
quarterly_counts = (
quality_df.assign(year_quarter=quality_df["date"].dt.to_period("Q"))
.groupby(["year_quarter", "activity_type"]).size()
.unstack(fill_value=0)
.reindex(all_quarters, fill_value=0)
)
monthly_quality = (
quality_df.groupby(["year_month", "activity_type"])["quality_score"].mean()
.unstack(fill_value=np.nan)
.reindex(all_months)
)
activity_insights: dict[str, Any] = {}
quality_rows: list[dict[str, Any]] = []
bu_snapshots: dict[str, list[dict[str, Any]]] = {}
low_value_units: list[dict[str, Any]] = []
depth_insights_by_type: dict[str, Any] = {}
for atype in LEADING_ACTIVITY_TYPES:
sub = quality_df[quality_df["activity_type"] == atype].copy()
if sub.empty:
continue
module_topics = _top_dict(sub.get("module_name", pd.Series(dtype="object")), 8)
text_topics = _top_dict(sub.get("llc_topic", pd.Series(dtype="object")), 8)
sub_events = (
events_window[events_window["business_unit"].isin(sub["business_unit"].dropna().unique())]
if "business_unit" in events_window.columns else pd.DataFrame()
)
summary = _summarise_quality_slice(sub)
depth_insights = _input_depth_insights(sub)
summary.update({
"top_modules": module_topics,
"top_topics": text_topics,
"avg_at_risk": round(float(pd.to_numeric(sub.get("at_risk_aspects"), errors="coerce").fillna(0).mean()), 2),
"avg_actions": round(float(pd.to_numeric(sub.get("actions"), errors="coerce").fillna(0).mean()), 2),
})
depth_insights_by_type[atype] = depth_insights
bu_rows: list[dict[str, Any]] = []
if "business_unit" in sub.columns:
grouped = (
sub.groupby("business_unit")
.agg(
count=("quality_score", "size"),
avg_quality=("quality_score", "mean"),
shallow_pct=("shallow_flag", lambda s: _safe_pct(s.sum(), len(s))),
high_value_pct=("high_value_flag", lambda s: _safe_pct(s.sum(), len(s))),
repetitive_pct=("repetitive_flag", lambda s: _safe_pct(s.sum(), len(s))),
)
.reset_index()
)
grouped = grouped[grouped["count"] >= 20].sort_values(["avg_quality", "count"], ascending=[False, False])
bu_rows = [
{
"business_unit": r["business_unit"],
"count": int(r["count"]),
"avg_quality": round(float(r["avg_quality"]), 1),
"shallow_pct": round(float(r["shallow_pct"]), 1),
"high_value_pct": round(float(r["high_value_pct"]), 1),
"repetitive_pct": round(float(r["repetitive_pct"]), 1),
}
for _, r in grouped.iterrows()
]
bu_snapshots[atype] = bu_rows[:8]
for row in bu_rows:
if row["count"] >= 30 and row["shallow_pct"] >= 45:
low_value_units.append({
"activity_type": atype,
"business_unit": row["business_unit"],
"count": row["count"],
"avg_quality": row["avg_quality"],
"shallow_pct": row["shallow_pct"],
})
quality_rows.append({
"activity_type": atype,
"count": summary["count"],
"avg_quality": summary["avg_quality"],
"avg_input_depth": summary["avg_input_depth"],
"meaningful_pct": summary["meaningful_pct"],
"high_value_pct": summary["high_value_pct"],
"shallow_pct": summary["shallow_pct"],
"preventive_pct": summary["preventive_pct"],
"reactive_pct": summary["reactive_pct"],
"repetitive_pct": summary["repetitive_pct"],
"follow_up_pct": summary["follow_up_pct"],
"rich_input_pct": summary["rich_input_pct"],
})
yoy = {}
if len(sub["year"].dropna().unique()) >= 2:
yearly = sub.groupby("year").agg(
count=("quality_score", "size"),
quality=("quality_score", "mean"),
meaningful=("meaningful_flag", "mean"),
).sort_index()
if len(yearly) >= 2:
prev = yearly.iloc[-2]
curr = yearly.iloc[-1]
yoy = {
"count_change_pct": round(_safe_pct(curr["count"] - prev["count"], prev["count"]), 1),
"quality_change": round(float(curr["quality"] - prev["quality"]), 1),
"meaningful_change_pct": round((float(curr["meaningful"]) - float(prev["meaningful"])) * 100, 1),
}
activity_insights[atype] = {
**summary,
"top_modules": module_topics,
"top_topics": text_topics,
"business_units": bu_rows,
"input_depth": depth_insights,
"yoy": yoy,
}
overall_themes = Counter()
for blob in quality_df["_text_blob"]:
matches = _theme_matches(blob)
for theme in matches:
overall_themes[theme] += 1
ccc_df = quality_df[quality_df["activity_type"] == "CCC"].copy()
ccc_rising, ccc_declining = _summarise_theme_trend(ccc_df)
overall_rising, overall_declining = _summarise_theme_trend(quality_df)
high_volume_low_value = sorted(
low_value_units,
key=lambda r: (-r["count"], -r["shallow_pct"], r["avg_quality"]),
)[:8]
recurring_modules: list[dict[str, Any]] = []
if "module_name" in ccc_df.columns:
module_summary = (
ccc_df.groupby("module_name")
.agg(
count=("quality_score", "size"),
avg_quality=("quality_score", "mean"),
repetitive_pct=("repetitive_flag", lambda s: _safe_pct(s.sum(), len(s))),
shallow_pct=("shallow_flag", lambda s: _safe_pct(s.sum(), len(s))),
)
.reset_index()
.sort_values("count", ascending=False)
)
recurring_modules = [
{
"module_name": r["module_name"],
"count": int(r["count"]),
"avg_quality": round(float(r["avg_quality"]), 1),
"repetitive_pct": round(float(r["repetitive_pct"]), 1),
"shallow_pct": round(float(r["shallow_pct"]), 1),
}
for _, r in module_summary.head(10).iterrows()
]
monthly_mix = [
{
"period": str(period),
**{atype: int(monthly_counts.loc[period, atype]) if atype in monthly_counts.columns else 0 for atype in LEADING_ACTIVITY_TYPES},
}
for period in all_months
]
quality_monthly_rows = [
{
"period": str(period),
**{
atype: round(float(monthly_quality.loc[period, atype]), 1)
if atype in monthly_quality.columns and pd.notna(monthly_quality.loc[period, atype]) else None
for atype in LEADING_ACTIVITY_TYPES
},
}
for period in all_months
]
quarter_rows = [
{
"period": str(period),
**{atype: int(quarterly_counts.loc[period, atype]) if atype in quarterly_counts.columns else 0 for atype in LEADING_ACTIVITY_TYPES},
}
for period in all_quarters
]
seasonality = (
quality_df.assign(month_name=quality_df["date"].dt.month_name())
.groupby("month_name").size().sort_values(ascending=False)
)
overall_input_depth = _input_depth_insights(quality_df)
executive_summary: list[str] = []
ccc_summary = activity_insights.get("CCC", {})
occ_summary = activity_insights.get("OCC", {})
llc_summary = activity_insights.get("LLC", {})
if ccc_summary:
executive_summary.append(
f"CCCs averaged a quality score of {ccc_summary.get('avg_quality', 0):.1f}/100 over the last "
f"{len(all_months)} months, with {ccc_summary.get('shallow_pct', 0):.1f}% assessed as shallow "
f"and {ccc_summary.get('high_value_pct', 0):.1f}% assessed as high value."
)
if high_volume_low_value:
hv = high_volume_low_value[0]
executive_summary.append(
f"{hv['business_unit']} shows a high-volume / low-value pattern in {hv['activity_type']} activity: "
f"{hv['count']} records with average quality {hv['avg_quality']:.1f} and {hv['shallow_pct']:.1f}% shallow entries."
)
if overall_rising:
executive_summary.append(
f"Emerging Safety Energy themes in the recent six months include "
f"{', '.join(r['theme'] for r in overall_rising[:3])}."
)
if llc_summary and occ_summary:
stronger = "LLC" if llc_summary.get("avg_quality", 0) >= occ_summary.get("avg_quality", 0) else "OCC"
executive_summary.append(
f"{stronger} records currently show the strongest overall documentation quality among the three leading activity types."
)
if overall_input_depth.get("correlation") is not None:
executive_summary.append(
f"Input depth and quality are correlated at r = {overall_input_depth['correlation']:.2f}, indicating that fuller records are a useful supporting signal for activity quality."
)
leadership_focus: list[str] = []
if ccc_summary.get("repetitive_pct", 0) >= 20:
leadership_focus.append(
"CCC records show a material level of repeated or duplicated wording, suggesting some checks may be drifting toward compliance-only completion."
)
if ccc_declining:
leadership_focus.append(
f"CCC focus on {', '.join(item['theme'] for item in ccc_declining[:3])} has reduced in the recent six months; confirm this is intentional rather than a blind spot."
)
if overall_declining:
leadership_focus.append(
f"Previously visible themes such as {', '.join(item['theme'] for item in overall_declining[:3])} are appearing less often in recorded activity narratives."
)
if ccc_summary.get("follow_up_pct", 0) < 35:
leadership_focus.append(
"A relatively low share of CCCs contain clear follow-up or close-out signals, which weakens the evidence that issues identified in checks are being converted into learning and action."
)
if overall_input_depth.get("correlation") is not None and overall_input_depth["correlation"] >= 0.4:
leadership_focus.append(
"Rows with richer input depth are materially more likely to read as meaningful records, so populated-field depth can be used as a practical early warning metric for declining quality."
)
if not leadership_focus:
leadership_focus.append(
"No dominant low-value pattern was detected across the full two-year window, but monthly quality should still be monitored for slippage."
)
recommendations = [
"Use CCC quality, not just CCC count, as a leadership KPI. Track shallow-entry rate, follow-up rate, and repeated wording monthly.",
"Review the highest-volume low-value Business Units with their leaders and sample the underlying records to confirm whether quality concerns are real or data-entry related.",
"Push recurring CCC/OCC themes that show little improvement into board-level focus areas where repeated exposure is visible but learning evidence is weak.",
]
return {
"window_start": window_start.strftime("%Y-%m-%d"),
"window_end": max_date.strftime("%Y-%m-%d"),
"window_months": int(len(all_months)),
"requested_start_date": requested_start.strftime("%Y-%m-%d"),
"monthly_mix": monthly_mix,
"quarterly_mix": quarter_rows,
"monthly_quality": quality_monthly_rows,
"quality_by_type": quality_rows,
"activity_insights": activity_insights,
"input_depth": overall_input_depth,
"input_depth_by_type": depth_insights_by_type,
"top_themes": dict(overall_themes.most_common(10)),
"rising_themes": overall_rising,
"declining_themes": overall_declining,
"ccc_rising_themes": ccc_rising,
"ccc_declining_themes": ccc_declining,
"ccc_recurring_modules": recurring_modules,
"high_volume_low_value": high_volume_low_value,
"bu_quality_snapshots": bu_snapshots,
"seasonality": {k: int(v) for k, v in seasonality.head(6).items()},
"executive_summary": executive_summary,
"leadership_focus": leadership_focus,
"recommendations": recommendations,
"proxy_note": (
"Quality is inferred using practical proxies: richness and specificity of text, risk recognition, "
"action/follow-up language, evidence of learning, input depth across useful fields, and penalties for generic or repeated wording. "
"These scores indicate likely value, not definitive assurance."
),
"note": (
"The deeper Safety Energy analysis uses a rolling two-year window ending on the latest Safety Energy record. "
"Counts come from Safety_Energy.xlsx; LLC_Data is used separately elsewhere for richer LLC theme detail."
),
"_quality_df": quality_df,
"_llc_window": llc_window,
}
# ─────────────────────────────────────────────────────────────────────────────
# Data quality profiling
# ─────────────────────────────────────────────────────────────────────────────
def _profile_data_quality(
events: pd.DataFrame,
se: pd.DataFrame,
llc: pd.DataFrame,
) -> dict[str, Any]:
"""
Summarise row counts, date coverage, and null rates for key fields.
"""
def _date_range(df: pd.DataFrame) -> tuple[str, str]:
mn = df["date"].min()
mx = df["date"].max()
return (
mn.strftime("%d %b %Y") if pd.notna(mn) else "N/A",
mx.strftime("%d %b %Y") if pd.notna(mx) else "N/A",
)
def _null_pct(df: pd.DataFrame, col: str) -> str:
if col not in df.columns:
return "N/A"
return f"{df[col].isna().mean() * 100:.1f}%"
ev_range = _date_range(events)
se_range = _date_range(se)
llc_range = _date_range(llc)
return {
"events": {
"rows": len(events),
"date_from": ev_range[0],
"date_to": ev_range[1],
"null_event_type": _null_pct(events, "event_type"),
"null_consequence": _null_pct(events, "consequence"),
"null_business_unit": _null_pct(events, "business_unit"),
"null_root_cause": _null_pct(events, "root_cause_cat"),
"duplicate_ids": int(events["EventID"].duplicated().sum()) if "EventID" in events.columns else "N/A",
},
"safety_energy": {
"rows": len(se),
"date_from": se_range[0],
"date_to": se_range[1],
"type_breakdown": se["activity_type"].value_counts().to_dict() if "activity_type" in se else {},
"null_leader": _null_pct(se, "leader"),
"null_bu": _null_pct(se, "business_unit"),
},
"llc": {
"rows": len(llc),
"date_from": llc_range[0],
"date_to": llc_range[1],
"null_topic": _null_pct(llc, "topic"),
"null_leader": _null_pct(llc, "leader"),
},
}
# ─────────────────────────────────────────────────────────────────────────────
# Events analysis
# ─────────────────────────────────────────────────────────────────────────────
def _parse_time_to_hour(value: Any) -> Optional[int]:
if pd.isna(value):
return None
text = str(value).strip()
if not text or text.lower() == "nan":
return None
parsed = pd.to_datetime(text, errors="coerce")
if pd.notna(parsed):
return int(parsed.hour)
match = re.search(r"(\d{1,2}):(\d{2})", text)
if match:
return int(match.group(1))
return None
def _time_bucket(hour: Optional[int]) -> str:
if hour is None:
return "Unknown"
if 0 <= hour < 6:
return "Night (00:00-05:59)"
if 6 <= hour < 12:
return "Morning (06:00-11:59)"
if 12 <= hour < 18:
return "Afternoon (12:00-17:59)"
return "Evening (18:00-23:59)"
def _analyse_events(
events: pd.DataFrame,
start_date: str,
split_date: str,
pd1_name: str,
pd2_name: str,
) -> dict[str, Any]:
"""Whole-of-period events view with serious-event, timing, and MVE insight."""
df = events[events["date"] >= pd.Timestamp(start_date)].copy()
if df.empty:
return {"total": 0, "_df": df}
def _pct(n: int, total: int) -> str:
return f"{n/max(total,1)*100:.1f}%"
def _cons_counts(sub: pd.DataFrame) -> dict[str, int]:
if "consequence" not in sub:
return {}
return {c: int((sub["consequence"] == c).sum()) for c in CONSEQUENCE_ORDER}
def _crp_counts(sub: pd.DataFrame) -> dict[str, int]:
if "crp" not in sub:
return {}
vc = sub["crp"].dropna().astype(str).str.strip().value_counts()
vc = vc[~vc.index.isin(["None Identified", "Under Investigation", "nan", ""])]
return vc.head(10).to_dict()
monthly_all = (
df.groupby("year_month").size()
.reindex(
pd.period_range(df["date"].min().to_period("M"), df["date"].max().to_period("M"), freq="M"),
fill_value=0,
)
)
months = max(1, len(monthly_all))
injury_class = df["injury_class"].value_counts().to_dict() if "injury_class" in df.columns else {}
serious = df[df["consequence"].isin(CONSEQUENCE_SERIOUS)].copy() if "consequence" in df.columns else df.iloc[0:0]
if "Time of Event" in serious.columns:
serious["_event_hour"] = serious["Time of Event"].map(_parse_time_to_hour)
serious["_time_bucket"] = serious["_event_hour"].map(_time_bucket)
time_bucket_counts = serious["_time_bucket"].value_counts().to_dict()
time_hour_counts = serious["_event_hour"].dropna().astype(int).value_counts().sort_index().to_dict()
timed_serious = int(serious["_event_hour"].notna().sum())
else:
time_bucket_counts = {}
time_hour_counts = {}
timed_serious = 0
motor = (
df[df["event_type"].astype(str).str.contains("motor|vehicle|mva|traffic", case=False, na=False)].copy()
if "event_type" in df.columns else df.iloc[0:0]
)
serious_motor = motor[motor["consequence"].isin(CONSEQUENCE_SERIOUS)].copy() if "consequence" in motor.columns else motor.iloc[0:0]
return {
"total": len(df),
"date_from": df["date"].min().strftime("%d %b %Y"),
"date_to": df["date"].max().strftime("%d %b %Y"),
"months": months,
"events_per_month": round(len(df) / months, 1),
"serious_count": int(len(serious)),
"serious_pct": _pct(len(serious), len(df)),
"lti_count": int(injury_class.get("Lost Time Injury", 0)),
"fai_count": int(injury_class.get("First Aid Treatment", 0)),
"event_type_counts": df["event_type"].value_counts().to_dict() if "event_type" in df.columns else {},
"consequence_counts": _cons_counts(df),
"crp_counts": _crp_counts(df),
"root_cause_counts": df["root_cause_cat"].value_counts().head(10).to_dict() if "root_cause_cat" in df.columns else {},
"serious_projects": serious["project"].value_counts().head(8).to_dict() if "project" in serious.columns else {},
"serious_locations": serious["location"].value_counts().head(8).to_dict() if "location" in serious.columns else {},
"serious_bus": serious["business_unit"].value_counts().head(8).to_dict() if "business_unit" in serious.columns else {},
"serious_time_buckets": time_bucket_counts,
"serious_time_hours": {str(k): int(v) for k, v in time_hour_counts.items()},
"serious_time_coverage_pct": round(_safe_pct(timed_serious, len(serious)), 1) if len(serious) else 0.0,
"motor_vehicle": {
"count": int(len(motor)),
"pct_total": round(_safe_pct(len(motor), len(df)), 1),
"serious_count": int(len(serious_motor)),
"serious_pct_within_mve": round(_safe_pct(len(serious_motor), len(motor)), 1) if len(motor) else 0.0,
"consequence_counts": _cons_counts(motor),
"top_projects": motor["project"].value_counts().head(8).to_dict() if "project" in motor.columns else {},
"top_locations": motor["location"].value_counts().head(8).to_dict() if "location" in motor.columns else {},
"road_types": _top_dict(motor.get("Road Type", pd.Series(dtype="object")), 6),
"conditions": _top_dict(motor.get("Road Conditions", pd.Series(dtype="object")), 6),
"vehicle_types": _top_dict(motor.get("Type of vehicle involved", pd.Series(dtype="object")), 6),
},
"monthly_all": {str(k): int(v) for k, v in monthly_all.items()},
"_df": df,
"_serious": serious,
"_motor": motor,
}
# ─────────────────────────────────────────────────────────────────────────────
# Leading activity analysis
# ─────────────────────────────────────────────────────────────────────────────
def _analyse_leading(
se: pd.DataFrame,
llc: pd.DataFrame,
start_date: str,
) -> dict[str, Any]:
"""
Summarise leading activities from Safety Energy + LLC Data.
LLC_Data is used for its richer free-text (topic, CRP focus, observations).
Safety Energy provides the authoritative counts for all three activity types.
"""
se_f = se[se["date"] >= pd.Timestamp(start_date)].copy()
# Monthly counts by activity type
monthly_by_type: dict[str, dict[str, int]] = {}
all_months = pd.period_range(se_f["date"].min().to_period("M"),
se_f["date"].max().to_period("M"), freq="M")
for atype in LEADING_ACTIVITY_TYPES:
sub = se_f[se_f["activity_type"] == atype]
monthly = sub.groupby("year_month").size().reindex(all_months, fill_value=0)
monthly_by_type[atype] = {str(k): int(v) for k, v in monthly.items()}
# BU breakdown
bu_by_type: dict[str, dict[str, int]] = {}
for atype in LEADING_ACTIVITY_TYPES:
sub = se_f[se_f["activity_type"] == atype]
if "business_unit" in sub:
bu_by_type[atype] = sub["business_unit"].value_counts().to_dict()
# Top leaders (LLC only, from LLC_Data for richer detail)
llc_f = llc[llc["date"] >= pd.Timestamp(start_date)].copy()
top_leaders: dict[str, int] = {}
if "leader" in llc_f:
top_leaders = (
llc_f["leader"].dropna().value_counts()
.head(15).to_dict()
)
# LLC topics
top_topics: dict[str, int] = {}
if "topic" in llc_f:
top_topics = (
llc_f["topic"].dropna()
.str.strip()
.value_counts()
.head(15).to_dict()
)
# CRP focus areas in LLCs
crp_focus: dict[str, int] = {}
if "crp_focus" in llc_f:
crp_focus = (
llc_f["crp_focus"].dropna()
.str.strip()
.value_counts()
.head(10).to_dict()
)
# At-risk flags from LLC_Data
at_risk_total = 0
if "at_risk_flag" in llc_f:
at_risk_total = int(llc_f["at_risk_flag"].sum())
# Overall totals
totals = se_f["activity_type"].value_counts().to_dict()
# Average at-risk aspects per activity
avg_at_risk: dict[str, float] = {}
if "at_risk_aspects" in se_f:
for atype in LEADING_ACTIVITY_TYPES:
sub = se_f[se_f["activity_type"] == atype]
val = sub["at_risk_aspects"].mean()
avg_at_risk[atype] = round(float(val), 2) if pd.notna(val) else 0.0
# Monthly total (all types combined)
monthly_total = (
se_f.groupby("year_month").size()
.reindex(all_months, fill_value=0)
)
# Trend direction (slope of last 6 months vs prior 6 months)
def _trend_dir(series: pd.Series) -> str:
if len(series) < 4:
return "insufficient data"
recent = series.iloc[-min(6, len(series)):]
prior = series.iloc[max(0, -12):-6] if len(series) >= 12 else series.iloc[:max(1, len(series)-6)]
if prior.mean() == 0:
return "no prior baseline"
change = (recent.mean() - prior.mean()) / prior.mean() * 100
if change > 10:
return f"increasing (+{change:.0f}%)"
elif change < -10:
return f"declining ({change:.0f}%)"
return f"stable ({change:+.0f}%)"
activity_trend = _trend_dir(monthly_total)
return {
"totals": totals,
"monthly_by_type": monthly_by_type,
"monthly_total": {str(k): int(v) for k, v in monthly_total.items()},
"bu_by_type": bu_by_type,
"top_leaders": top_leaders,
"top_topics": top_topics,
"crp_focus": crp_focus,
"at_risk_total_llc": at_risk_total,
"avg_at_risk": avg_at_risk,
"activity_trend": activity_trend,
"all_months": [str(m) for m in all_months],
"_se_f": se_f,
"_llc_f": llc_f,
}
# ─────────────────────────────────────────────────────────────────────────────
# Effectiveness analysis
# ─────────────────────────────────────────────────────────────────────────────
def _analyse_effectiveness(
events_result: dict,
leading_result: dict,
) -> dict[str, Any]:
"""
Assess whether leading activities appear associated with event outcomes.
Uses Business Unit as the common grouping dimension.
All language is deliberately cautious (associated with, may indicate).
"""
ev_df = events_result.get("_df")
se_f = leading_result.get("_se_f")
if ev_df is None or se_f is None or "business_unit" not in ev_df.columns:
return {"note": "Insufficient data for effectiveness analysis."}
# BU-level: total leading activities vs total events
bu_activities = se_f.groupby("business_unit").size().rename("activities")
bu_events = ev_df.groupby("business_unit").size().rename("events")
bu_table = pd.concat([bu_activities, bu_events], axis=1).fillna(0)
bu_table.columns = ["activities", "events"]
bu_table = bu_table[bu_table["activities"] > 0].sort_values("activities", ascending=False)
# Monthly correlation: do more activities in month M associate with fewer events?
# Use a 1-month lag (activities in M, events in M+1)
monthly_acts = se_f.groupby("year_month").size()
monthly_events = ev_df.groupby("year_month").size()
common_months = monthly_acts.index.intersection(monthly_events.index)
corr_value: Optional[float] = None
corr_note = "Insufficient overlapping months for correlation analysis."
if len(common_months) >= CORR_MIN_MONTHS:
a_vals = monthly_acts.reindex(common_months, fill_value=0).values
e_vals = monthly_events.reindex(common_months, fill_value=0).values
if np.std(a_vals) > 0 and np.std(e_vals) > 0:
corr_value = float(np.corrcoef(a_vals, e_vals)[0, 1])
direction = "positive" if corr_value > 0 else "negative"
strength = "weak" if abs(corr_value) < 0.3 else ("moderate" if abs(corr_value) < 0.6 else "strong")
corr_note = (
f"A {strength} {direction} association (r = {corr_value:.2f}) was observed "
f"between monthly leading-activity counts and monthly event counts across "
f"{len(common_months)} overlapping months. "
+ ("This may warrant further review — high activity volumes and high event rates "
"in the same periods could reflect reactive activity rather than prevention."
if corr_value > 0.3 else
"A negative association is consistent with leading activities having a "
"preventive effect, though causation cannot be assumed from this data alone."
if corr_value < -0.3 else
"No strong directional association was identified.")
)
# BUs with high activity AND high events (possible reactive pattern)
high_both: list[str] = []
high_acts_low_events: list[str] = []
if len(bu_table) >= 2:
act_median = bu_table["activities"].median()
evt_median = bu_table["events"].median()
for bu, row in bu_table.iterrows():
if row["activities"] >= act_median and row["events"] >= evt_median:
high_both.append(str(bu))
elif row["activities"] >= act_median and row["events"] < evt_median:
high_acts_low_events.append(str(bu))
return {
"bu_table": bu_table.reset_index().to_dict("records"),
"corr_value": corr_value,
"corr_note": corr_note,
"high_activity_high_events": high_both,
"high_activity_low_events": high_acts_low_events,
"note": (
"Effectiveness analysis uses business unit-level and monthly aggregates. "
"All associations are indicative only — correlation does not imply causation."
),
}
# ─────────────────────────────────────────────────────────────────────────────
# At-risk behaviour analysis
# ─────────────────────────────────────────────────────────────────────────────
def _extract_at_risk_themes(
events: pd.DataFrame,
se: pd.DataFrame,
llc: pd.DataFrame,
start_date: str,
) -> dict[str, Any]:
"""
Extract at-risk behaviour themes using keyword matching against free-text
fields in LLC_Data, Safety Energy, and Events.
No cloud APIs; all processing is local.
"""
ev_f = events[events["date"] >= pd.Timestamp(start_date)]
llc_f = llc[llc["date"] >= pd.Timestamp(start_date)]
se_f = se[se["date"] >= pd.Timestamp(start_date)]
# Collect text blobs from each source
ev_texts = _collect_text(ev_f, ["brief_desc", "event_desc", "root_cause_cat"])
llc_texts = _collect_text(llc_f, ["topic", "at_risk_obs", "crp_focus"])
se_texts = _collect_text(se_f, ["llc_topic", "at_risk_obs"])
def _score(texts: list[str]) -> dict[str, int]:
counts: Counter = Counter()
for text in texts:
tl = text.lower()
for theme, keywords in AT_RISK_KEYWORDS.items():
if any(kw in tl for kw in keywords):
counts[theme] += 1
return dict(counts.most_common())
ev_themes = _score(ev_texts)
llc_themes = _score(llc_texts)
se_themes = _score(se_texts)
# Combine: weight events × 2 (lagging = higher severity signal)
combined: Counter = Counter()
for theme, cnt in ev_themes.items():
combined[theme] += cnt * 2
for theme, cnt in llc_themes.items():
combined[theme] += cnt
for theme, cnt in se_themes.items():
combined[theme] += cnt
# Alignment gap: themes prominent in events but absent in LLC discussions
llc_top = set(list(llc_themes.keys())[:5])
events_top = set(list(ev_themes.keys())[:5])
gap_themes = events_top - llc_top
# Top LLC topics (free text)
top_llc_topics: dict[str, int] = {}
if "topic" in llc_f.columns:
top_llc_topics = llc_f["topic"].dropna().value_counts().head(10).to_dict()
# CRP focus in LLCs
top_crp_focus: dict[str, int] = {}
if "crp_focus" in llc_f.columns:
top_crp_focus = llc_f["crp_focus"].dropna().value_counts().head(8).to_dict()
return {
"event_themes": ev_themes,
"llc_themes": llc_themes,
"combined_themes": dict(combined.most_common(10)),
"gap_themes": list(gap_themes),
"top_llc_topics": top_llc_topics,
"top_crp_focus": top_crp_focus,
"note": (
"Theme extraction uses keyword matching against free-text fields. "
"Results are indicative; manual review of underlying records is recommended "
"before drawing firm conclusions."
),
}
def _collect_text(df: pd.DataFrame, cols: list[str]) -> list[str]:
"""Collect non-null text entries from named columns."""
texts = []
for col in cols:
if col in df.columns:
texts.extend(df[col].dropna().astype(str).str.strip().tolist())
return texts
def _compare_dimension(
events_df: pd.DataFrame,
se_df: pd.DataFrame,
dimension: str,
min_activities: int = 10,
) -> dict[str, Any]:
if dimension not in events_df.columns or dimension not in se_df.columns:
return {"table": [], "best": [], "watch": []}
serious = events_df[events_df["consequence"].isin(CONSEQUENCE_SERIOUS)].copy() if "consequence" in events_df.columns else events_df.iloc[0:0]
activities = se_df.groupby(dimension).size().rename("activities")
events = events_df.groupby(dimension).size().rename("events")
serious_events = serious.groupby(dimension).size().rename("serious_events")
comp = pd.concat([activities, events, serious_events], axis=1).fillna(0)
if comp.empty:
return {"table": [], "best": [], "watch": []}
comp = comp.astype(int)
comp["activity_event_ratio"] = comp.apply(
lambda r: round(r["activities"] / r["events"], 1) if r["events"] > 0 else None,
axis=1,
)
def _rows(df: pd.DataFrame, label: str) -> list[dict[str, Any]]:
rows = []
for _, row in df.reset_index().iterrows():
ratio = row["activity_event_ratio"]
rows.append({
label: row[label],
"activities": int(row["activities"]),
"events": int(row["events"]),
"serious_events": int(row["serious_events"]),
"activity_event_ratio": None if pd.isna(ratio) else ratio,
})
return rows
best = (
comp[comp["activities"] >= min_activities]
.sort_values(["serious_events", "events", "activities"], ascending=[True, True, False])
.head(8)
)
watch = comp.sort_values(["serious_events", "events", "activities"], ascending=[False, False, False]).head(8)
return {
"table": _rows(comp.sort_values(["activities", "events"], ascending=[False, False]).head(25), dimension),
"best": _rows(best, dimension),
"watch": _rows(watch, dimension),
}
# ─────────────────────────────────────────────────────────────────────────────
# Safety Energy ↔ Events relationship analysis
# ─────────────────────────────────────────────────────────────────────────────
def _analyse_se_events_relationship(
events: pd.DataFrame,
se: pd.DataFrame,
start_date: str,
) -> dict[str, Any]:
"""
Compare monthly Safety Energy activity levels against Events, overall and
by Business Unit. Uses cautious associative language throughout.
"""
ev_f = events[events["date"] >= pd.Timestamp(start_date)].copy()
se_f = se[se["date"] >= pd.Timestamp(start_date)].copy()
# Build common month range
all_dates = pd.concat([ev_f["date"], se_f["date"]])
if all_dates.empty:
return {"note": "No overlapping data to compare."}
start = all_dates.min().to_period("M")
end = all_dates.max().to_period("M")
all_months = pd.period_range(start, end, freq="M")
monthly_acts = se_f.groupby("year_month").size().reindex(all_months, fill_value=0)
monthly_events = ev_f.groupby("year_month").size().reindex(all_months, fill_value=0)
# Trend divergence: periods where events spike but activities don't
spike_months: list[str] = []
if len(monthly_events) >= 3:
ev_mean = monthly_events.mean()
ev_std = monthly_events.std()
for period, ev_count in monthly_events.items():
if ev_count > ev_mean + ev_std:
act_count = monthly_acts.get(period, 0)
if act_count < monthly_acts.mean():
spike_months.append(str(period))
# BU comparison table
bu_comp: list[dict] = []
if "business_unit" in ev_f and "business_unit" in se_f:
bu_acts = se_f.groupby("business_unit").size().rename("activities")
bu_events = ev_f.groupby("business_unit").size().rename("events")
merged = pd.concat([bu_acts, bu_events], axis=1).fillna(0).astype(int)
# Compute activity-to-event ratio where events > 0
merged["ratio"] = merged.apply(
lambda r: round(r["activities"] / r["events"], 1) if r["events"] > 0 else None,
axis=1,
)
bu_comp = merged.reset_index().rename(columns={"index": "business_unit"}).to_dict("records")
# LLC topic alignment vs event root causes
llc_top_topics: list[str] = []
ev_top_rc: list[str] = []
if "topic" in se_f.columns:
llc_sub = se_f[se_f["activity_type"] == "LLC"]
llc_top_topics = llc_sub["llc_topic"].dropna().value_counts().head(5).index.tolist() if "llc_topic" in llc_sub else []
if "root_cause_cat" in ev_f:
ev_top_rc = ev_f["root_cause_cat"].dropna().value_counts().head(5).index.tolist()
return {
"monthly_acts": {str(k): int(v) for k, v in monthly_acts.items()},
"monthly_events": {str(k): int(v) for k, v in monthly_events.items()},
"spike_months": spike_months,
"bu_comparison": bu_comp,
"project_comparison": _compare_dimension(ev_f, se_f, "project", min_activities=12),
"location_comparison": _compare_dimension(ev_f, se_f, "location", min_activities=10),
"llc_top_topics": llc_top_topics,
"ev_top_rc": ev_top_rc,
"alignment_note": (
"LLC topic focus and event root causes are compared to identify alignment gaps. "
"Where event root causes diverge from LLC discussion topics, this may indicate "
"that leading activity conversations are not yet targeting the highest-risk themes."
),
"note": (
"Monthly comparison covers periods where both datasets have data. "
"Short overlapping periods reduce the reliability of any trend observations. "
"This analysis is associative only."
),
}
# ─────────────────────────────────────────────────────────────────────────────
# Leader and BU focus areas
# ─────────────────────────────────────────────────────────────────────────────
def _analyse_focus_areas(
events: pd.DataFrame,
se: pd.DataFrame,
llc: pd.DataFrame,
start_date: str,
) -> dict[str, Any]:
"""
Identify Business Units and leaders warranting leadership attention,
based on activity volumes, event rates, and declining trends.
"""
ev_f = events[events["date"] >= pd.Timestamp(start_date)]
se_f = se[se["date"] >= pd.Timestamp(start_date)]
llc_f = llc[llc["date"] >= pd.Timestamp(start_date)]
# BU-level activity counts and event counts
bu_acts: dict[str, int] = {}
bu_evts: dict[str, int] = {}
if "business_unit" in se_f:
bu_acts = se_f.groupby("business_unit").size().to_dict()
if "business_unit" in ev_f:
bu_evts = ev_f.groupby("business_unit").size().to_dict()
all_bus = sorted(set(list(bu_acts.keys()) + list(bu_evts.keys())))
bu_summary = [
{
"business_unit": bu,
"activities": bu_acts.get(bu, 0),
"events": bu_evts.get(bu, 0),
}
for bu in all_bus
]
# Most active leaders
leader_counts: dict[str, int] = {}
if "leader" in se_f:
leader_counts = (
se_f["leader"].dropna().value_counts()
.head(20).to_dict()
)
# Leaders with < LEADER_MIN_ACTIVITIES (gap indicator)
low_activity_leaders: list[str] = [
l for l, c in leader_counts.items() if c < LEADER_MIN_ACTIVITIES
]
# Declining BUs: compare first half vs second half of date range
declining_bus: list[str] = []
if "business_unit" in se_f and len(se_f) > 0:
mid = se_f["date"].min() + (se_f["date"].max() - se_f["date"].min()) / 2
for bu in all_bus:
sub = se_f[se_f["business_unit"] == bu]
if len(sub) < 4:
continue
early = len(sub[sub["date"] <= mid])
late = len(sub[sub["date"] > mid])
if late < early * 0.7:
declining_bus.append(bu)
return {
"bu_summary": bu_summary,
"leader_counts": leader_counts,
"low_activity_leaders": low_activity_leaders,
"declining_bus": declining_bus,
}
# ─────────────────────────────────────────────────────────────────────────────
# Chart generation
# ─────────────────────────────────────────────────────────────────────────────
def _generate_charts(
events_res: dict,
leading_res: dict,
se_ev_res: dict,
at_risk_res: dict,
trends_res: dict,
output_dir: str,
pd1_name: str,
pd2_name: str,
split_date: str,
) -> dict[str, str]:
"""Generate all charts and return a dict of name → file path."""
_setup_style()
charts: dict[str, str] = {}
os.makedirs(output_dir, exist_ok=True)
# ── 1. Events monthly trend ──────────────────────────────────────────────
try:
ev_df = events_res.get("_df")
if ev_df is not None and len(ev_df) > 0:
all_months = pd.period_range(
ev_df["date"].min().to_period("M"),
ev_df["date"].max().to_period("M"), freq="M",
)
monthly = ev_df.groupby("year_month").size().reindex(all_months, fill_value=0)
x = range(len(all_months))
labels = _month_labels(all_months)
fig, ax = plt.subplots(figsize=(11, 4))
vals = monthly.values
ax.bar(x, vals, color=DEEP_BLUE, width=0.72, alpha=0.9)
rolling = monthly.rolling(3, min_periods=1).mean().values
ax.plot(x, rolling, color=SKY_BLUE, linewidth=2.2, marker="o", markersize=3, label="3-month average")
ax.set_xticks(x)
ax.set_xticklabels(labels, rotation=45, ha="right", fontsize=8)
ax.set_title("Monthly Events", fontsize=14,
fontweight="bold", color=DEEP_BLUE)
ax.set_ylabel("Events")
ax.legend(loc="upper right", fontsize=9)
p = os.path.join(output_dir, "ch_events_monthly.png")
_save(fig, p)
charts["events_monthly"] = p
except Exception as e:
log.warning("Chart events_monthly failed: %s", e)
# ── 2. Leading activities monthly trend (stacked area) ───────────────────
try:
monthly_by_type = leading_res.get("monthly_by_type", {})
all_months_str = leading_res.get("all_months", [])
if all_months_str and any(monthly_by_type.values()):
months_idx = [pd.Period(m) for m in all_months_str]
x = range(len(months_idx))
labels = _month_labels(pd.PeriodIndex(months_idx))
fig, ax = plt.subplots(figsize=(11, 4))
bottom = np.zeros(len(months_idx))
for atype in LEADING_ACTIVITY_TYPES:
vals = np.array([monthly_by_type.get(atype, {}).get(m, 0) for m in all_months_str])
ax.bar(x, vals, bottom=bottom, color=ACTIVITY_COLOURS[atype],
label=atype, width=0.8, alpha=0.9)
bottom += vals
ax.set_xticks(x)
ax.set_xticklabels(labels, rotation=45, ha="right", fontsize=8)
ax.set_title("Monthly Leading Activities (LLC / CCC / OCC)",
fontsize=14, fontweight="bold", color=DEEP_BLUE)
ax.set_ylabel("Count")
ax.legend(loc="upper right", fontsize=9)
p = os.path.join(output_dir, "ch_leading_monthly.png")
_save(fig, p)
charts["leading_monthly"] = p
except Exception as e:
log.warning("Chart leading_monthly failed: %s", e)
# ── 3. Activity type mix (donut) ─────────────────────────────────────────
try:
totals = leading_res.get("totals", {})
if totals:
labels_d = list(totals.keys())
vals_d = list(totals.values())
colours = [ACTIVITY_COLOURS.get(l, MUTED) for l in labels_d]
fig, ax = plt.subplots(figsize=(5, 4))
wedges, _, autotexts = ax.pie(
vals_d, labels=labels_d, autopct="%1.0f%%",
colors=colours, startangle=140,
wedgeprops={"linewidth": 1, "edgecolor": "white"},
)
for at in autotexts:
at.set_fontsize(9)
ax.set_title("Activity Type Mix", fontsize=13, fontweight="bold", color=DEEP_BLUE)
p = os.path.join(output_dir, "ch_activity_mix.png")
_save(fig, p)
charts["activity_mix"] = p
except Exception as e:
log.warning("Chart activity_mix failed: %s", e)
# ── 4. BU comparison: activities vs events ────────────────────────────────
try:
bu_comp = se_ev_res.get("bu_comparison", [])
if bu_comp:
df_bu = pd.DataFrame(bu_comp).set_index("business_unit")
df_bu = df_bu[["activities", "events"]].sort_values("activities", ascending=True)
y = range(len(df_bu))
fig, ax = plt.subplots(figsize=(9, max(3, len(df_bu) * 0.6)))
ax.barh([i - 0.2 for i in y], df_bu["activities"].values,
height=0.35, color=DEEP_BLUE, label="Activities")
ax.barh([i + 0.2 for i in y], df_bu["events"].values,
height=0.35, color=RED, label="Events")
ax.set_yticks(list(y))
ax.set_yticklabels(df_bu.index.tolist(), fontsize=9)
ax.set_title("Activities vs Events by Business Unit",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.legend(fontsize=9)
p = os.path.join(output_dir, "ch_bu_comparison.png")
_save(fig, p)
charts["bu_comparison"] = p
except Exception as e:
log.warning("Chart bu_comparison failed: %s", e)
# ── 5. Dual-axis: monthly activities and events overlay ───────────────────
try:
m_acts = se_ev_res.get("monthly_acts", {})
m_events = se_ev_res.get("monthly_events", {})
if m_acts and m_events:
all_keys = sorted(set(m_acts) | set(m_events))
all_p = pd.PeriodIndex([pd.Period(k) for k in all_keys])
x = range(len(all_p))
acts_vals = [m_acts.get(k, 0) for k in all_keys]
event_vals = [m_events.get(k, 0) for k in all_keys]
labels_m = _month_labels(all_p)
fig, ax1 = plt.subplots(figsize=(11, 4))
ax2 = ax1.twinx()
ax1.bar(x, acts_vals, color=DEEP_BLUE, alpha=0.6, label="Leading Activities", width=0.6)
ax2.plot(x, event_vals, color=RED, linewidth=2, marker="o", markersize=4, label="Events")
ax1.set_xticks(x)
ax1.set_xticklabels(labels_m, rotation=45, ha="right", fontsize=8)
ax1.set_ylabel("Leading Activities", color=DEEP_BLUE)
ax2.set_ylabel("Events", color=RED)
ax1.set_title("Leading Activities vs Events — Monthly Overlay",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
lines1, labs1 = ax1.get_legend_handles_labels()
lines2, labs2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labs1 + labs2, loc="upper left", fontsize=9)
ax1.spines["top"].set_visible(False)
ax2.spines["top"].set_visible(False)
p = os.path.join(output_dir, "ch_overlay.png")
_save(fig, p)
charts["overlay"] = p
except Exception as e:
log.warning("Chart overlay failed: %s", e)
# ── 6. Top LLC topics ─────────────────────────────────────────────────────
try:
top_topics = at_risk_res.get("top_llc_topics", {})
if top_topics:
items = sorted(top_topics.items(), key=lambda x: x[1])[-12:]
labels_t = [i[0] for i in items]
vals_t = [i[1] for i in items]
fig, ax = plt.subplots(figsize=(8, max(3, len(items) * 0.4)))
bars = ax.barh(labels_t, vals_t, color=DEEP_BLUE, alpha=0.85)
for bar, val in zip(bars, vals_t):
ax.text(val + 0.2, bar.get_y() + bar.get_height() / 2,
str(val), va="center", fontsize=9)
ax.set_title("Top LLC Conversation Topics", fontsize=13,
fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("Count")
p = os.path.join(output_dir, "ch_llc_topics.png")
_save(fig, p)
charts["llc_topics"] = p
except Exception as e:
log.warning("Chart llc_topics failed: %s", e)
# ── 7. At-risk theme heatmap (horizontal bar) ─────────────────────────────
try:
combined = at_risk_res.get("combined_themes", {})
if combined:
items = sorted(combined.items(), key=lambda x: x[1])
labels_r = [i[0] for i in items]
vals_r = [i[1] for i in items]
max_v = max(vals_r) if vals_r else 1
colours_r = [
RED if v >= max_v * 0.7
else AMBER if v >= max_v * 0.4
else DARK_GREEN
for v in vals_r
]
fig, ax = plt.subplots(figsize=(8, max(3, len(items) * 0.4)))
ax.barh(labels_r, vals_r, color=colours_r, alpha=0.9)
ax.set_title("At-Risk Behaviour Themes (Combined Sources)",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("Theme frequency (weighted)")
p = os.path.join(output_dir, "ch_at_risk_themes.png")
_save(fig, p)
charts["at_risk_themes"] = p
except Exception as e:
log.warning("Chart at_risk_themes failed: %s", e)
# ── 8. Events by consequence ──────────────────────────────────────────────
try:
ev_df = events_res.get("_df")
if ev_df is not None and "consequence" in ev_df:
cons_counts = ev_df["consequence"].value_counts().reindex(
CONSEQUENCE_ORDER, fill_value=0
)
cons_colors = [DARK_GREEN, AMBER, RED, PURPLE, PURPLE]
fig, ax = plt.subplots(figsize=(7, 3.5))
bars = ax.bar(cons_counts.index, cons_counts.values,
color=cons_colors[:len(cons_counts)], alpha=0.9)
for bar, val in zip(bars, cons_counts.values):
if val > 0:
ax.text(bar.get_x() + bar.get_width() / 2, val + 0.3,
str(val), ha="center", fontsize=10, fontweight="bold")
ax.set_title("Events by Actual Consequence", fontsize=13,
fontweight="bold", color=DEEP_BLUE)
ax.set_ylabel("Count")
p = os.path.join(output_dir, "ch_consequence.png")
_save(fig, p)
charts["consequence"] = p
except Exception as e:
log.warning("Chart consequence failed: %s", e)
# ── 9. Top leaders (activities) ───────────────────────────────────────────
try:
top_leaders = leading_res.get("top_leaders", {})
if top_leaders:
items = sorted(top_leaders.items(), key=lambda x: x[1])[-15:]
fig, ax = plt.subplots(figsize=(8, max(4, len(items) * 0.4)))
ax.barh([i[0] for i in items], [i[1] for i in items],
color=SKY_BLUE, alpha=0.9)
ax.set_title("Top Leaders by LLC Activity Count",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("LLC Count")
p = os.path.join(output_dir, "ch_top_leaders.png")
_save(fig, p)
charts["top_leaders"] = p
except Exception as e:
log.warning("Chart top_leaders failed: %s", e)
# ── 10. CRP focus areas ───────────────────────────────────────────────────
try:
crp_focus = leading_res.get("crp_focus", {})
if crp_focus:
items = sorted(crp_focus.items(), key=lambda x: x[1])
fig, ax = plt.subplots(figsize=(8, max(3, len(items) * 0.4)))
ax.barh([i[0] for i in items], [i[1] for i in items],
color=MID_GREEN, alpha=0.9)
ax.set_title("CRP Focus Areas in Leader Learning Conversations",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("Count")
p = os.path.join(output_dir, "ch_crp_focus.png")
_save(fig, p)
charts["crp_focus"] = p
except Exception as e:
log.warning("Chart crp_focus failed: %s", e)
# ── 11. Two-year quality trend by activity type ──────────────────────────
try:
monthly_quality_rows = trends_res.get("monthly_quality", [])
if monthly_quality_rows:
qdf = pd.DataFrame(monthly_quality_rows)
if not qdf.empty:
periods = pd.PeriodIndex([pd.Period(p, freq="M") for p in qdf["period"]])
x = range(len(periods))
fig, ax = plt.subplots(figsize=(11, 4))
for atype in LEADING_ACTIVITY_TYPES:
if atype in qdf.columns and qdf[atype].notna().any():
ax.plot(
x,
qdf[atype],
marker="o",
linewidth=2,
markersize=3.5,
label=atype,
color=ACTIVITY_COLOURS.get(atype, DEEP_BLUE),
)
ax.set_xticks(x)
ax.set_xticklabels(_month_labels(periods), rotation=45, ha="right", fontsize=8)
ax.set_ylim(0, 100)
ax.yaxis.set_major_locator(mticker.MultipleLocator(10))
ax.set_ylabel("Average quality score")
ax.set_title("Two-Year Quality Trend by Activity Type",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.legend(fontsize=9, loc="upper left")
p = os.path.join(output_dir, "ch_quality_trend.png")
_save(fig, p)
charts["quality_trend"] = p
except Exception as e:
log.warning("Chart quality_trend failed: %s", e)
# ── 12. High-volume / low-value units ────────────────────────────────────
try:
hvlv = trends_res.get("high_volume_low_value", [])
if hvlv:
df_hv = pd.DataFrame(hvlv[:8]).sort_values("count", ascending=True)
labels = [f"{r['business_unit']} ({r['activity_type']})" for _, r in df_hv.iterrows()]
fig, ax = plt.subplots(figsize=(9, max(3.5, len(df_hv) * 0.5)))
ax.barh(labels, df_hv["count"], color=AMBER, alpha=0.85)
for idx, (_, row) in enumerate(df_hv.iterrows()):
ax.text(row["count"] + 1, idx, f"{row['shallow_pct']:.0f}% shallow", va="center", fontsize=9)
ax.set_title("High-Volume / Low-Value Activity Hotspots",
fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("Activity count in two-year window")
p = os.path.join(output_dir, "ch_low_value_units.png")
_save(fig, p)
charts["low_value_units"] = p
except Exception as e:
log.warning("Chart low_value_units failed: %s", e)
# ── 13. Serious hotspot ranking ──────────────────────────────────────────
try:
serious_projects = events_res.get("serious_projects", {})
serious_locations = events_res.get("serious_locations", {})
rows = []
for label, values in [("Project", serious_projects), ("Location", serious_locations)]:
for name, count in list(values.items())[:5]:
rows.append((f"{name} ({label})", int(count)))
if rows:
rows = sorted(rows, key=lambda x: x[1])[-10:]
labels_h = [r[0] for r in rows]
vals_h = [r[1] for r in rows]
fig, ax = plt.subplots(figsize=(9, max(3.5, len(rows) * 0.45)))
bars = ax.barh(labels_h, vals_h, color=RED, alpha=0.9)
for bar, val in zip(bars, vals_h):
ax.text(val + 0.1, bar.get_y() + bar.get_height() / 2, str(val), va="center", fontsize=9)
ax.set_title("Serious Event Hotspots", fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("Moderate / Major / Substantial events")
p = os.path.join(output_dir, "ch_serious_hotspots.png")
_save(fig, p)
charts["serious_hotspots"] = p
except Exception as e:
log.warning("Chart serious_hotspots failed: %s", e)
# ── 14. Project performance quadrant ─────────────────────────────────────
try:
proj_best = se_ev_res.get("project_comparison", {}).get("best", [])
proj_watch = se_ev_res.get("project_comparison", {}).get("watch", [])
project_rows = {}
for row in proj_best + proj_watch:
name = row.get("project")
if name:
project_rows[name] = row
if project_rows:
pdf = pd.DataFrame(project_rows.values()).head(12)
fig, ax = plt.subplots(figsize=(8.5, 6))
x = pdf["activities"].astype(float)
y = pdf["events"].astype(float)
sizes = 80 + pdf["serious_events"].astype(float) * 28
colors = [
DARK_GREEN if (row["events"] <= y.median() and row["serious_events"] <= pdf["serious_events"].median()) else AMBER
if row["serious_events"] <= pdf["serious_events"].median() else RED
for _, row in pdf.iterrows()
]
ax.scatter(x, y, s=sizes, c=colors, alpha=0.75, edgecolors="white", linewidths=1.2)
ax.axvline(x.median(), color=MUTED, linestyle="--", linewidth=1)
ax.axhline(y.median(), color=MUTED, linestyle="--", linewidth=1)
for _, row in pdf.iterrows():
ax.text(row["activities"] + 2, row["events"] + 0.2, str(row["project"])[:28], fontsize=8, color=DEEP_BLUE)
ax.set_title("Project Performance Quadrant", fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.set_xlabel("Leading activities")
ax.set_ylabel("Events")
p = os.path.join(output_dir, "ch_project_quadrant.png")
_save(fig, p)
charts["project_quadrant"] = p
except Exception as e:
log.warning("Chart project_quadrant failed: %s", e)
# ── 15. Quality composition by activity type ─────────────────────────────
try:
qrows = trends_res.get("quality_by_type", [])
if qrows:
qdf = pd.DataFrame(qrows)
if not qdf.empty:
mixed = 100 - qdf["high_value_pct"] - qdf["meaningful_pct"] - qdf["shallow_pct"]
fig, ax = plt.subplots(figsize=(8.5, 3.8))
left = np.zeros(len(qdf))
segments = [
("Shallow", qdf["shallow_pct"].values, RED),
("Mixed", mixed.values, AMBER),
("Meaningful", qdf["meaningful_pct"].values, SKY_BLUE),
("High value", qdf["high_value_pct"].values, DARK_GREEN),
]
for label, vals, color in segments:
ax.barh(qdf["activity_type"], vals, left=left, color=color, label=label, alpha=0.9)
left += vals
ax.set_xlim(0, 100)
ax.xaxis.set_major_locator(mticker.MultipleLocator(20))
ax.set_xlabel("Share of records")
ax.set_title("Leading Activity Quality Mix", fontsize=13, fontweight="bold", color=DEEP_BLUE)
ax.legend(loc="lower right", ncol=2, fontsize=8)
p = os.path.join(output_dir, "ch_quality_mix.png")
_save(fig, p)
charts["quality_mix"] = p
except Exception as e:
log.warning("Chart quality_mix failed: %s", e)
log.info("Generated %d charts in %s", len(charts), output_dir)
return charts
# ─────────────────────────────────────────────────────────────────────────────
# Recommendation generation
# ─────────────────────────────────────────────────────────────────────────────
def _generate_recommendations(
events_res: dict,
leading_res: dict,
effectiveness: dict,
at_risk_res: dict,
focus_areas: dict,
trends_res: dict,
se_events_rel: dict,
) -> list[str]:
"""Derive actionable recommendations from analysis results."""
recs: list[str] = []
# Serious events
serious_pct = float(str(events_res.get("serious_pct", "0")).replace("%", "") or 0)
if serious_pct >= 3:
recs.append(
f"Moderate-or-above consequence events make up {serious_pct:.1f}% of recorded events. "
"Review whether controls around the highest-consequence scenarios are being verified often enough in field activity."
)
# LTI
if events_res.get("lti_count", 0) > 0:
recs.append(
f"{events_res.get('lti_count', 0)} Lost Time Injuries were recorded in the analysis window. "
"Review the underlying work types, contributing factors, and recovery actions for common patterns."
)
# Serious-event timing
time_buckets = events_res.get("serious_time_buckets", {})
if time_buckets:
top_bucket = max(time_buckets, key=time_buckets.get)
recs.append(
f"Serious events are most frequently recorded in {top_bucket}. Use this to target pre-start, supervision, and fatigue controls at the riskiest parts of the day."
)
# Motor vehicle
motor = events_res.get("motor_vehicle", {})
if motor.get("count", 0) > 0:
recs.append(
f"Motor vehicle events account for {motor.get('pct_total', 0):.1f}% of all events. "
"Review journey management, road conditions, and vehicle type patterns in the MVE section."
)
top_mv_project = next(iter(motor.get("top_projects", {}).items()), None)
top_mv_road = next(iter(motor.get("road_types", {}).items()), None)
if top_mv_project and top_mv_road:
recs.append(
f"Prioritise a motor vehicle risk review for {top_mv_project[0]} where MV exposure is most visible, "
f"with particular attention to {top_mv_road[0]} driving conditions."
)
# Activity trend
trend = leading_res.get("activity_trend", "")
if "declining" in trend:
recs.append(
f"Leading activity volumes show a declining trend ({trend}). "
"Leaders should re-engage with LLC, CCC, and OCC completion targets."
)
# Declining BUs
for bu in focus_areas.get("declining_bus", []):
recs.append(
f"Business Unit '{bu}' shows declining leading-activity volume in the recent period. "
"Targeted engagement from the sector SHEQ team is recommended."
)
# High activity but high events
for bu in effectiveness.get("high_activity_high_events", []):
recs.append(
f"'{bu}' has both high leading-activity and high event volumes. "
"This may indicate reactive activity patterns — review whether conversations "
"are targeting root causes rather than responding after the fact."
)
project_watch = se_events_rel.get("project_comparison", {}).get("watch", [])
if project_watch:
top = project_watch[0]
recs.append(
f"Focus the next leadership review on project '{top.get('project')}', which recorded "
f"{top.get('events', 0)} events and {top.get('serious_events', 0)} serious events against "
f"{top.get('activities', 0)} leading activities."
)
location_watch = se_events_rel.get("location_comparison", {}).get("watch", [])
if location_watch:
top = location_watch[0]
recs.append(
f"Target field verification and local coaching at location '{top.get('location')}', where "
f"{top.get('serious_events', 0)} serious events have been recorded and the activity-to-event ratio is "
f"{top.get('activity_event_ratio', 'low')}."
)
project_best = se_events_rel.get("project_comparison", {}).get("best", [])
if project_best:
best = project_best[0]
recs.append(
f"Review what is working in project '{best.get('project')}', which shows comparatively strong leading-activity coverage "
f"with {best.get('activities', 0)} activities and {best.get('events', 0)} events, and replicate the practice in weaker areas."
)
# Gap themes
gap = at_risk_res.get("gap_themes", [])
if gap:
recs.append(
f"The following risk themes appear frequently in events but are under-represented "
f"in LLC conversations: {', '.join(gap)}. "
"Consider incorporating these topics into forthcoming LLC schedules."
)
for item in trends_res.get("high_volume_low_value", [])[:3]:
recs.append(
f"{item['business_unit']} has a high-volume / low-value {item['activity_type']} pattern "
f"({item['count']} records, {item['shallow_pct']:.1f}% shallow). Sample the underlying entries "
"with local leaders and tighten expectations for narrative quality and follow-up."
)
for item in trends_res.get("recommendations", [])[:2]:
recs.append(item)
input_depth = trends_res.get("input_depth", {})
if input_depth.get("correlation") is not None and input_depth["correlation"] >= 0.4:
recs.append(
f"Input depth is moderately aligned with record quality (r = {input_depth['correlation']:.2f}). Track populated-field depth as a simple supporting KPI alongside the richer quality score."
)
ccc = trends_res.get("activity_insights", {}).get("CCC", {})
if ccc:
recs.append(
f"Lift CCC quality expectations in priority areas: current CCC quality averages {ccc.get('avg_quality', 0):.1f}/100 with "
f"{ccc.get('shallow_pct', 0):.1f}% shallow records and only {ccc.get('follow_up_pct', 0):.1f}% showing follow-up signals."
)
serious_projects = events_res.get("serious_projects", {})
if serious_projects:
top_project, top_count = next(iter(serious_projects.items()))
recs.append(
f"Escalate a focused action plan for project '{top_project}', which currently has the highest serious-event burden ({top_count} serious events)."
)
# Default if nothing triggered
if not recs:
recs.append(
"No significant adverse trends identified in the current period. "
"Continue current leading activity cadence and monitor monthly."
)
deduped: list[str] = []
seen: set[str] = set()
for rec in recs:
key = rec.strip()
if key and key not in seen:
seen.add(key)
deduped.append(key)
return deduped
# ─────────────────────────────────────────────────────────────────────────────
# Main orchestration
# ─────────────────────────────────────────────────────────────────────────────
def run_full_analysis(
events: pd.DataFrame,
safety_energy: pd.DataFrame,
llc: pd.DataFrame,
start_date: str,
split_date: str,
pd1_name: str,
pd2_name: str,
output_dir: str,
) -> AnalysisResults:
"""
Run the complete SHEQ analysis pipeline across all three data sources.
Parameters
----------
events : normalised Events DataFrame (from data_loader)
safety_energy : normalised Safety Energy DataFrame
llc : normalised LLC Data DataFrame
start_date : ISO date string — analysis window start
split_date : retained for backwards compatibility; ignored by the sector-wide report
pd1_name : retained for backwards compatibility; ignored by the sector-wide report
pd2_name : retained for backwards compatibility; ignored by the sector-wide report
output_dir : directory for chart images and output files
Returns
-------
AnalysisResults dataclass
"""
os.makedirs(output_dir, exist_ok=True)
log.info("=== SHEQ Full Analysis ===")
log.info(" start=%s output_dir=%s", start_date, output_dir)
results = AnalysisResults()
results.params = {
"start_date": start_date,
"output_dir": output_dir,
}
# 1. Data quality
log.info("[1/7] Data quality profiling...")
results.data_quality = _profile_data_quality(events, safety_energy, llc)
# 2. Events analysis
log.info("[2/7] Events analysis...")
results.events_summary = _analyse_events(
events, start_date, split_date, pd1_name, pd2_name
)
# 3. Leading activity analysis
log.info("[3/7] Leading activity analysis...")
results.leading_summary = _analyse_leading(safety_energy, llc, start_date)
# 4. Effectiveness analysis
log.info("[4/7] Effectiveness analysis...")
results.effectiveness = _analyse_effectiveness(
results.events_summary, results.leading_summary
)
# 5. At-risk behaviour analysis
log.info("[5/7] At-risk behaviour extraction...")
results.at_risk = _extract_at_risk_themes(events, safety_energy, llc, start_date)
# 6. SE ↔ Events relationship
log.info("[6/7] Safety Energy ↔ Events relationship...")
results.se_events_rel = _analyse_se_events_relationship(
events, safety_energy, start_date
)
# Rolling two-year Safety Energy trends and quality
results.trends = _analyse_two_year_trends(
safety_energy, llc, events, start_date
)
# 7. Focus areas
results.focus_areas = _analyse_focus_areas(events, safety_energy, llc, start_date)
# Charts
log.info("[7/7] Generating charts...")
results.charts = _generate_charts(
results.events_summary,
results.leading_summary,
results.se_events_rel,
results.at_risk,
results.trends,
output_dir,
pd1_name,
pd2_name,
split_date,
)
# Recommendations and caveats
results.recommendations = _generate_recommendations(
results.events_summary,
results.leading_summary,
results.effectiveness,
results.at_risk,
results.focus_areas,
results.trends,
results.se_events_rel,
)
results.caveats = [
"All analysis in this report is based on data exported from Ventia's safety management "
"system. Data quality depends on the completeness and accuracy of field entries.",
"Correlation and association findings do not imply causation. They are presented to "
"guide further investigation, not to draw definitive conclusions.",
"Activity counts reflect recorded activities only. Under-reporting in any area will "
"affect the reliability of leading-indicator analysis.",
"Theme extraction from free-text fields uses keyword matching and may miss nuance "
"or misclassify entries. Manual review of flagged themes is recommended.",
"Business unit comparisons may be affected by differences in headcount, contract scope, "
"and operational complexity between units.",
]
log.info("Analysis complete. %d charts, %d recommendations.",
len(results.charts), len(results.recommendations))
# Remove private DataFrames before returning (not needed by report_builder)
for key in ("_df", "_serious", "_motor"):
results.events_summary.pop(key, None)
for key in ("_se_f", "_llc_f"):
results.leading_summary.pop(key, None)
for key in ("_quality_df", "_llc_window"):
results.trends.pop(key, None)
return results