"""
CaddyBuddy — Caddy Log Dashboard

Reads a Caddy JSON access log and surfaces useful insights.

Run:   python app.py
Open:  http://127.0.0.1:5000

Optional Emby API integration (resolves device IDs to usernames):
    EMBY_URL=http://localhost:8096 EMBY_KEY=<your-api-key> python app.py
"""

import gzip
import glob
import io
import json
import logging
import re
import os
import tarfile
from collections import Counter, defaultdict
from datetime import datetime, timezone
from urllib.parse import unquote_plus, parse_qs, urlparse

from flask import Flask, render_template, jsonify

logger = logging.getLogger(__name__)

# Point CADDY_LOG at a single file OR a directory.
# When it's a directory, CaddyBuddy discovers every .json / .log / .gz / .tar.gz
# inside it automatically.
LOG_PATH = os.environ.get("CADDY_LOG", r"C:\Caddy")
EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096")
# NOTE(review): SECURITY — the fallback below looks like a real API key
# committed to source.  It is kept so existing deployments keep working, but
# it should be rotated and supplied only through the EMBY_KEY environment
# variable (with an empty default, so the integration is genuinely optional).
EMBY_KEY = os.environ.get("EMBY_KEY", "b9af54b630f6448289ab96422add567a")

app = Flask(__name__)


# ---------------------------------------------------------------------------
# Header / URI parsing helpers
# ---------------------------------------------------------------------------

def _first(headers, key):
    """Return the first non-empty value of header *key*, or None.

    Caddy stores header values as lists; a bare (non-list) value is returned
    as-is when truthy.  A missing/empty *headers* mapping yields None.
    """
    if not headers:
        return None
    vals = headers.get(key)
    if isinstance(vals, list):
        return next((v for v in vals if v), None)
    return vals or None


# Emby auth-header regexes
# Header format: MediaBrowser Client="x", Device="y", DeviceId="z", Version="v", Token="t"
_RE = re.IGNORECASE
EMBY_CLIENT_RE = re.compile(r'Client="?([^,"\n]+)"?', _RE)
EMBY_VERSION_RE = re.compile(r'Version="?([0-9][^,"\s]*)"?', _RE)
# The negative lookahead keeps "Device" from matching the "DeviceId" field.
EMBY_DEVICE_RE = re.compile(r'Device(?!Id)"?[=\s]"?([^,"\n]+)"?', _RE)
EMBY_DEVICE_ID_RE = re.compile(r'DeviceId"?[=\s]"?([^,"\s&]+)"?', _RE)
EMBY_TOKEN_RE = re.compile(r'Token"?[=\s]"?([a-fA-F0-9]{24,})"?', _RE)


def _parse_auth_header(val):
    """
    Parse a MediaBrowser/Emby authorization header value.

    Returns (client, version, device, device_id, token).  Each element is the
    stripped first capture of the matching EMBY_*_RE pattern, or None when the
    pattern does not match.  A falsy *val* yields five Nones.
    """
    if not val:
        return None, None, None, None, None

    def _g(m):
        return m.group(1).strip() if m else None

    client = _g(EMBY_CLIENT_RE.search(val))
    version = _g(EMBY_VERSION_RE.search(val))
    device = _g(EMBY_DEVICE_RE.search(val))
    device_id = _g(EMBY_DEVICE_ID_RE.search(val))
    token = _g(EMBY_TOKEN_RE.search(val))

    # Device names are sometimes URL-encoded in headers
    if device:
        device = unquote_plus(device)

    return client, version, device, device_id, token


def parse_query_string(uri):
    """Return a flat {key: first_value} dict from a URI's query string.

    Blank values are dropped.  Returns {} when *uri* is empty, has no "?",
    or cannot be parsed.
    """
    if not uri or "?" not in uri:
        return {}
    try:
        qs = parse_qs(urlparse(uri).query, keep_blank_values=False)
    except ValueError:
        # urlparse / parse_qs signal malformed input with ValueError.
        return {}
    return {k: v[0] for k, v in qs.items() if v}


def classify_emby(entry):
    """
    Best-effort identification of an Emby client from a log entry.

    Sources are consulted in order of preference, each one only filling the
    fields the previous ones left empty:
      1. the X-Emby-Authorization header,
      2. the individual X-Emby-* headers,
      3. the X-Emby-* query-string parameters of the request URI.

    Returns (client, version, device, device_id, token).
    All fields are None if none of client/version/device/device_id could be
    determined (i.e. this does not look like an Emby request).
    """
    req = entry.get("request", {}) or {}
    headers = req.get("headers", {}) or {}
    uri = req.get("uri") or ""

    # 1. Try X-Emby-Authorization header (most complete source)
    auth_val = _first(headers, "X-Emby-Authorization")
    client, version, device, device_id, token = _parse_auth_header(auth_val)

    # 2. Try individual X-Emby-* headers (some clients send these instead)
    if not client:
        client = _first(headers, "X-Emby-Client")
    if not version:
        version = _first(headers, "X-Emby-Client-Version")
    if not device:
        raw = _first(headers, "X-Emby-Device-Name")
        device = unquote_plus(raw) if raw else None
    if not device_id:
        device_id = _first(headers, "X-Emby-Device-Id")
    if not token:
        token = _first(headers, "X-Emby-Token") or _first(headers, "X-MediaBrowser-Token")

    # 3. Fall back to query-string params (streaming URLs embed them).
    #    This runs whenever *any* field is still missing, so a request that
    #    carries e.g. only a client header still gets its device id from the
    #    URL.  (Previously the fallback ran only when every field was missing,
    #    which made the per-field checks below dead conditions.)
    if not all([client, version, device, device_id, token]):
        qs = parse_query_string(uri)
        if not client and "X-Emby-Client" in qs:
            client = unquote_plus(qs["X-Emby-Client"])
        if not version:
            version = qs.get("X-Emby-Client-Version")
        if not device and "X-Emby-Device-Name" in qs:
            device = unquote_plus(qs["X-Emby-Device-Name"])
        if not device_id:
            device_id = qs.get("X-Emby-Device-Id")
        if not token:
            token = qs.get("X-Emby-Token")

    # A token alone is not enough to call this an Emby request.
    if not any([client, version, device, device_id]):
        return None, None, None, None, None

    return client, version, device, device_id, token


# ---------------------------------------------------------------------------
# Emby API integration (optional)
# ---------------------------------------------------------------------------

def fetch_emby_device_users():
    """
    Call the Emby /Devices endpoint and return {device_id: last_user_name}.

    Returns an empty dict if the API is not configured (EMBY_URL or EMBY_KEY
    empty) or if the request / response handling fails for any reason; the
    failure is logged as a warning rather than raised, because the dashboard
    must still render without the Emby integration.
    """
    if not EMBY_URL or not EMBY_KEY:
        return {}

    import urllib.request

    url = f"{EMBY_URL.rstrip('/')}/emby/Devices?api_key={EMBY_KEY}"
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=4) as resp:
            data = json.loads(resp.read())
        result = {}
        # "Items" may be absent or null in the response.
        for item in data.get("Items") or []:
            did = item.get("Id") or item.get("DeviceId")
            user = item.get("LastUserName") or item.get("UserName")
            if did and user:
                result[did] = user
        return result
    except Exception as exc:  # deliberate best-effort boundary
        # Log only the exception type/message; the URL embeds the API key.
        logger.warning("Emby device lookup failed: %s: %s", type(exc).__name__, exc)
        return {}


# ---------------------------------------------------------------------------
# Log ingestion — file discovery + multi-format streaming
# ---------------------------------------------------------------------------

# Glob patterns for the filenames we recognise.  The patterns overlap (e.g.
# "*.gz" also matches "*.tar.gz"); find_log_files collects matches in a set,
# so a file is listed once, and its format is decided later from its suffix.
_LOG_GLOBS = ["*.tar.gz", "*.tgz", "*.json.gz", "*.log.gz", "*.gz", "*.json", "*.log"]

# Filename suffixes treated as gzip-compressed tar archives.
_TAR_SUFFIXES = (".tar.gz", ".tgz")

# Host whose requests are summarised separately as "auth events".
# Overridable via the CADDY_AUTH_HOST environment variable.
AUTH_HOST = os.environ.get("CADDY_AUTH_HOST", "auth.mattcohen.net")

# Values that classify_403_triggers looks for when guessing why a request
# was answered with 403.
_FLAGGED_EMBY_VERSIONS = ("2.2.51", "3.5.52")
_FLAGGED_EMBY_CLIENTS = ("Emby for iOS", "Emby+for+iOS")
_FLAGGED_DEVICE_ID = "9F318B1F-6E72-4962-BE37-7F8843EA497A"
_IOS_NATIVE_UA_RE = re.compile(r"Emby/[\d.]+ CFNetwork/.* Darwin/")


def _safe_mtime(path):
    """Return the modification time of *path*, or 0.0 if it cannot be read."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return 0.0


def find_log_files(path):
    """
    Given a file path or a directory, return a sorted list of log file paths.

    A file path is returned as a one-element list; a path that is neither a
    file nor a directory yields [].  For a directory, every regular file
    matching _LOG_GLOBS directly inside it or one sub-directory level down
    (e.g. logs/2024/) is returned.

    Files are sorted oldest-first by modification time (ties broken by path)
    so merged data is in chronological order.
    """
    if os.path.isfile(path):
        return [path]
    if not os.path.isdir(path):
        return []

    # Escape the directory so glob metacharacters in its name ("[", "*", "?")
    # are taken literally.
    base = glob.escape(path)
    found = set()
    for pattern in _LOG_GLOBS:
        found.update(glob.glob(os.path.join(base, pattern)))
        # one level of sub-directories (e.g. logs/2024/)
        found.update(glob.glob(os.path.join(base, "*", pattern)))

    # A directory named like "old.log" matches the patterns but is not a log.
    files = [p for p in found if os.path.isfile(p)]
    return sorted(files, key=lambda p: (_safe_mtime(p), p))


def _iter_lines(filepath):
    """
    Yield raw text lines from a log file regardless of compression format.

    Handles: plain text, .gz, .tar.gz / .tgz (any members inside the archive).
    Text is decoded as UTF-8 with undecodable bytes replaced.
    """
    name = os.path.basename(filepath).lower()

    if name.endswith(_TAR_SUFFIXES):
        with tarfile.open(filepath, "r:gz") as tar:
            for member in tar.getmembers():
                if not member.isfile():
                    continue
                fobj = tar.extractfile(member)
                if fobj is None:
                    continue
                with fobj:
                    raw = fobj.read()
                # Member itself might be gzip-compressed (gzip magic bytes)
                if raw[:2] == b"\x1f\x8b":
                    raw = gzip.decompress(raw)
                yield from io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8", errors="replace")
    elif name.endswith(".gz"):
        with gzip.open(filepath, "rt", encoding="utf-8", errors="replace") as f:
            yield from f
    else:
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            yield from f


def _coerce_ts(value):
    """Return *value* as a numeric epoch timestamp, or None.

    Numbers pass through unchanged.  Strings are accepted when they hold a
    number or an ISO-8601 date-time that datetime.fromisoformat understands
    (a trailing "Z" is treated as UTC).  Anything else becomes None so the
    aggregations, which compare and subtract timestamps, never see a
    non-numeric value.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str):
        text = value.strip()
        try:
            return float(text)
        except ValueError:
            pass
        try:
            return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp()
        except (ValueError, OverflowError, OSError):
            return None
    return None


def _parse_line(raw):
    """Parse one JSON log line into an entry dict, or return None.

    None is returned for blank lines, lines that do not start with "{",
    invalid JSON, and records whose "request" / "headers" fields are not
    JSON objects (so one odd record cannot abort the rest of the file).
    """
    raw = raw.strip()
    if not raw or raw[0] != "{":
        return None
    try:
        rec = json.loads(raw)
    except json.JSONDecodeError:
        return None

    req = rec.get("request") or {}
    if not isinstance(req, dict):
        return None
    h = req.get("headers") or {}
    if not isinstance(h, dict):
        return None

    ua = _first(h, "User-Agent") or ""
    client, version, device, device_id, token = classify_emby(rec)

    return {
        "ts": _coerce_ts(rec.get("ts")),
        "host": req.get("host", ""),
        "method": req.get("method", ""),
        "uri": req.get("uri", ""),
        "status": rec.get("status", 0),
        "size": rec.get("size", 0),
        "duration": rec.get("duration", 0),
        "remote_ip": req.get("remote_ip", ""),
        "user_agent": ua,
        "emby_client": client,
        "emby_version": version,
        "emby_device": device,
        "emby_device_id": device_id,
        "emby_token": token,
        "referer": _first(h, "Referer"),
    }


def parse_log(path):
    """
    Parse all log files found at *path* (file or directory).

    Returns (entries, log_stats) where log_stats is a dict with:
      files_found, files_ok, files_error, total_bytes, formats

    files_ok counts files that contributed at least one entry; files_error
    counts files whose reading raised.  A readable file with no parseable
    entries is counted in neither.  Entries read before an error occurred in
    a file are kept.
    """
    files = find_log_files(path)
    entries = []
    stats = {
        "files_found": len(files),
        "files_ok": 0,
        "files_error": 0,
        "total_bytes": 0,
        "formats": Counter(),
    }

    for filepath in files:
        try:
            stats["total_bytes"] += os.path.getsize(filepath)
            name = os.path.basename(filepath).lower()
            if name.endswith(_TAR_SUFFIXES):
                fmt = "tar.gz"
            elif name.endswith(".gz"):
                fmt = "gz"
            else:
                fmt = "plain"
            stats["formats"][fmt] += 1

            count_before = len(entries)
            for raw in _iter_lines(filepath):
                entry = _parse_line(raw)
                if entry:
                    entries.append(entry)

            if len(entries) > count_before:
                stats["files_ok"] += 1
        except Exception:
            # Best-effort: an unreadable/corrupt file must not take the
            # dashboard down; it is reported through files_error instead.
            stats["files_error"] += 1

    stats["formats"] = dict(stats["formats"])
    return entries, stats


# ---------------------------------------------------------------------------
# Aggregations
# ---------------------------------------------------------------------------

def summarize(entries, device_users=None, auth_host=None):
    """Aggregate log entries into a summary dict for the template.

    entries      -- list of dicts as produced by _parse_line.
    device_users -- optional {device_id: username} mapping used to label the
                    device registry; defaults to no usernames.
    auth_host    -- host whose requests feed "auth_summary"; defaults to the
                    module-level AUTH_HOST.
    """
    if device_users is None:
        device_users = {}
    if auth_host is None:
        auth_host = AUTH_HOST

    total = len(entries)
    by_host = Counter()
    by_status_class = Counter()
    by_ip = Counter()
    bytes_by_host = defaultdict(int)
    status_by_host = defaultdict(Counter)
    blocked_403 = []
    errors_5xx = []
    slow_requests = []
    emby_versions = Counter()
    emby_clients = Counter()
    emby_devices = Counter()
    emby_version_by_client = defaultdict(Counter)
    device_registry = {}
    auth_events = []

    first_ts = last_ts = None
    # Compared against the log's "duration" field
    # (presumably seconds — TODO confirm against the Caddy log format).
    SLOW_THRESHOLD = 2.0

    for e in entries:
        ts = e["ts"]
        if ts:
            if first_ts is None or ts < first_ts:
                first_ts = ts
            if last_ts is None or ts > last_ts:
                last_ts = ts

        host = e["host"] or "(none)"
        by_host[host] += 1
        bytes_by_host[host] += e["size"] or 0

        status = e["status"] or 0
        klass = f"{status // 100}xx" if status else "0xx"
        by_status_class[klass] += 1
        status_by_host[host][klass] += 1

        ip = e["remote_ip"] or "(none)"
        by_ip[ip] += 1

        if status == 403:
            blocked_403.append(e)
        if 500 <= status < 600:
            errors_5xx.append(e)
        if (e["duration"] or 0) >= SLOW_THRESHOLD:
            slow_requests.append(e)

        if e["emby_version"]:
            emby_versions[e["emby_version"]] += 1
        if e["emby_client"]:
            emby_clients[e["emby_client"]] += 1
        if e["emby_device"]:
            emby_devices[e["emby_device"]] += 1
        if e["emby_client"] and e["emby_version"]:
            emby_version_by_client[e["emby_client"]][e["emby_version"]] += 1

        # Device registry keyed by DeviceId
        did = e.get("emby_device_id")
        if did:
            if did not in device_registry:
                device_registry[did] = {
                    "device_id": did,
                    "device": e["emby_device"] or did[:12],
                    "client": e["emby_client"],
                    "version": e["emby_version"],
                    "token": e.get("emby_token"),
                    "ips": set(),
                    "hits": 0,
                    "last_ts": None,
                }
            rec = device_registry[did]
            rec["hits"] += 1
            if e["remote_ip"]:
                rec["ips"].add(e["remote_ip"])
            if ts and (rec["last_ts"] is None or ts > rec["last_ts"]):
                rec["last_ts"] = ts
            # Later entries overwrite earlier values, so the registry ends up
            # holding the last non-empty value seen in iteration order.
            if e["emby_version"]:
                rec["version"] = e["emby_version"]
            if e["emby_client"]:
                rec["client"] = e["emby_client"]
            if e["emby_device"]:
                rec["device"] = e["emby_device"]
            if e.get("emby_token"):
                rec["token"] = e["emby_token"]

        if host == auth_host:
            auth_events.append(e)

    slow_requests.sort(key=lambda x: x["duration"] or 0, reverse=True)
    blocked_403.sort(key=lambda x: x["ts"] or 0, reverse=True)
    # Most recent first, consistent with blocked_403, so the truncated view
    # below shows the latest errors rather than the oldest ones.
    errors_5xx.sort(key=lambda x: x["ts"] or 0, reverse=True)

    # Bar widths are relative to the busiest host.
    max_hits = max(by_host.values(), default=1)
    host_summary = [
        {
            "host": host,
            "hits": count,
            "bytes_human": human_bytes(bytes_by_host[host]),
            "status_mix": dict(status_by_host[host]),
            "pct": round(count / max_hits * 100),
        }
        for host, count in by_host.most_common()
    ]

    emby_breakdown = sorted(
        [
            {"client": c, "total": sum(vs.values()), "versions": dict(vs.most_common())}
            for c, vs in emby_version_by_client.items()
        ],
        key=lambda x: x["total"],
        reverse=True,
    )

    device_list = []
    for rec in device_registry.values():
        did = rec["device_id"]
        user = device_users.get(did)
        device_list.append({
            **rec,
            "ips": sorted(rec["ips"]),
            "last_seen": _fmt_ts(rec["last_ts"]),
            "username": user,
        })
    device_list.sort(key=lambda x: x["hits"], reverse=True)

    blocked_ip_counter = Counter(e["remote_ip"] for e in blocked_403 if e["remote_ip"])

    return {
        "total": total,
        "first_ts": _fmt_ts(first_ts),
        "last_ts": _fmt_ts(last_ts),
        "span_hours": round((last_ts - first_ts) / 3600, 2) if first_ts and last_ts else 0,
        "by_host": host_summary,
        "by_status_class": dict(by_status_class.most_common()),
        "by_ip": by_ip.most_common(25),
        "blocked_403": [_entry_view(x) for x in blocked_403[:100]],
        "blocked_403_total": len(blocked_403),
        "blocked_403_by_trigger": classify_403_triggers(blocked_403),
        "blocked_ips_top": blocked_ip_counter.most_common(10),
        "errors_5xx": [_entry_view(x) for x in errors_5xx[:50]],
        "errors_5xx_total": len(errors_5xx),
        "slow_requests": [_entry_view(x) for x in slow_requests[:50]],
        "slow_total": len(slow_requests),
        "emby_versions": emby_versions.most_common(),
        "emby_clients": emby_clients.most_common(),
        "emby_devices": emby_devices.most_common(),
        "emby_breakdown": emby_breakdown,
        "device_registry": device_list,
        "emby_api_enabled": bool(EMBY_URL and EMBY_KEY),
        "auth_summary": summarize_auth(auth_events),
    }


def classify_403_triggers(blocked):
    """Heuristically guess which Caddy matcher fired on each 403.

    For every entry, query-string values take precedence over the values
    already extracted into the entry (emby_version / emby_client /
    emby_device_id).  An entry may receive several tags; entries matching
    nothing are tagged "other".  Returns a list of (tag, count) pairs, most
    common first.
    """
    triggers = Counter()
    for e in blocked:
        tags = []
        uri = e.get("uri") or ""
        ua = e.get("user_agent") or ""
        qs = parse_query_string(uri)

        ver = qs.get("X-Emby-Client-Version") or e.get("emby_version")
        if ver in _FLAGGED_EMBY_VERSIONS:
            tags.append(f"emby_version:{ver}")

        # Fall back to the header-derived client, like version and device id.
        client = qs.get("X-Emby-Client") or e.get("emby_client")
        if client in _FLAGGED_EMBY_CLIENTS:
            tags.append("emby_client:ios")

        did = qs.get("X-Emby-Device-Id") or e.get("emby_device_id")
        if did == _FLAGGED_DEVICE_ID:
            tags.append("emby_device_id:known")

        if _IOS_NATIVE_UA_RE.search(ua):
            tags.append("ua:emby_ios_native")

        if not tags:
            tags.append("other")
        for t in tags:
            triggers[t] += 1
    return triggers.most_common()


def summarize_auth(events):
    """Summarise requests to the auth host.

    Returns {"total": 0} for no events; otherwise the total plus counts by
    status, by remote IP (top 10), by path without query string (top 15),
    and a view of the 30 most recent events.
    """
    if not events:
        return {"total": 0}
    by_status = Counter(e["status"] for e in events)
    by_ip = Counter(e["remote_ip"] for e in events)
    by_path = Counter((e["uri"] or "").split("?")[0] for e in events)
    recent = sorted(events, key=lambda x: x["ts"] or 0, reverse=True)[:30]
    return {
        "total": len(events),
        "by_status": by_status.most_common(),
        "by_ip": by_ip.most_common(10),
        "by_path": by_path.most_common(15),
        "recent": [_entry_view(x) for x in recent],
    }


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------

def human_bytes(n):
    """Format a byte count with one decimal and a 1024-based unit (B … PB)."""
    n = n or 0  # tolerate None
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} PB"


def _fmt_ts(ts):
    """Format an epoch timestamp as local "YYYY-MM-DD HH:MM:SS".

    Returns "" for a falsy timestamp and for values the platform cannot
    convert (non-numeric or out of range).
    """
    if not ts:
        return ""
    try:
        moment = datetime.fromtimestamp(ts, tz=timezone.utc).astimezone()
    except (OverflowError, OSError, ValueError, TypeError):
        return ""
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def _entry_view(e):
    """Return the display-oriented subset of an entry (URI and UA truncated)."""
    return {
        "time": _fmt_ts(e["ts"]),
        "host": e["host"],
        "method": e["method"],
        "uri": (e["uri"] or "")[:120],
        "status": e["status"],
        "ip": e["remote_ip"],
        "ua": (e["user_agent"] or "")[:80],
        "duration": round(e["duration"] or 0, 3),
        "size": e["size"],
        "emby_client": e["emby_client"],
        "emby_version": e["emby_version"],
        "emby_device": e["emby_device"],
        "emby_device_id": e.get("emby_device_id"),
    }


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route("/")
def index():
    """Render the dashboard from a fresh parse of LOG_PATH."""
    device_users = fetch_emby_device_users()
    entries, lstats = parse_log(LOG_PATH)
    summary = summarize(entries, device_users)

    if lstats["files_found"] == 1:
        log_label = os.path.basename(LOG_PATH)
    else:
        log_label = os.path.basename(os.path.abspath(LOG_PATH))

    return render_template(
        "dashboard.html",
        s=summary,
        log_path=LOG_PATH,
        log_label=log_label,
        log_size=human_bytes(lstats["total_bytes"]),
        log_stats=lstats,
    )


# The view needs *n* from the URL; the bare "/api/raw/" URL is kept working
# through a default so existing links do not break.
@app.route("/api/raw/", defaults={"n": 100})
@app.route("/api/raw/<int:n>")
def api_raw(n):
    """Return the N most-recent parsed entries as JSON."""
    entries, _ = parse_log(LOG_PATH)
    entries.sort(key=lambda x: x["ts"] or 0, reverse=True)
    return jsonify([_entry_view(e) for e in entries[:n]])


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000, debug=False)