Files
caddybuddy/app.py
T

556 lines
19 KiB
Python
Raw Normal View History

2026-04-18 07:15:39 +12:00
"""
CaddyBuddy — Caddy Log Dashboard
Reads a Caddy JSON access log and surfaces useful insights.
Run: python app.py
Open: http://127.0.0.1:5000
Optional Emby API integration (resolves device IDs to usernames):
EMBY_URL=http://localhost:8096 EMBY_KEY=<admin_api_key> python app.py
"""
import gzip
import glob
import io
import json
import re
import os
import tarfile
from collections import Counter, defaultdict
from datetime import datetime, timezone
from urllib.parse import unquote_plus, parse_qs, urlparse
from flask import Flask, render_template, jsonify
# Point CADDY_LOG at a single file OR a directory.
# When it's a directory, CaddyBuddy discovers every .json / .log / .gz / .tar.gz
# inside it automatically.
LOG_PATH = os.environ.get("CADDY_LOG", r"C:\Caddy")

# Optional Emby API integration (resolves device IDs to usernames).
# EMBY_URL is the base URL of the Emby server.
EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096")
# The admin API key must come from the environment.  No default is provided:
# a credential embedded in source code is exposed to everyone who can read the
# repository.  While the key is empty the integration stays disabled.
EMBY_KEY = os.environ.get("EMBY_KEY", "")
# Flask application object; the @app.route decorators further down register
# their view functions on it, and the __main__ guard starts it.
app = Flask(__name__)
# ---------------------------------------------------------------------------
# Header / URI parsing helpers
# ---------------------------------------------------------------------------
def _first(headers, key):
    """
    Return the first non-empty value of header *key*, or None.

    Caddy stores header values as lists, so the first truthy element of the
    list is returned; a scalar value is returned as-is when it is truthy.

    HTTP field names are case-insensitive, and Go's canonical header form
    changes their capitalisation (``X-MediaBrowser-Token`` becomes
    ``X-Mediabrowser-Token``).  When the exact spelling is not present the
    lookup therefore falls back to a case-insensitive match.
    """
    if not headers:
        return None
    vals = headers.get(key)
    if vals is None:
        # Exact spelling missing: look for the same name in any capitalisation.
        wanted = key.lower()
        for name, candidate in headers.items():
            if isinstance(name, str) and name.lower() == wanted:
                vals = candidate
                break
    if isinstance(vals, list):
        return next((v for v in vals if v), None)
    return vals or None
# Patterns for the MediaBrowser/Emby authorization header, whose value looks like
#   MediaBrowser Client="x", Device="y", DeviceId="z", Version="v", Token="t"
# Every pattern is compiled case-insensitively and captures the field's value
# (without the optional surrounding quotes) as group 1.
_RE = re.IGNORECASE
EMBY_CLIENT_RE = re.compile(r'Client="?([^,"\n]+)"?', flags=_RE)
# The captured version has to start with a digit.
EMBY_VERSION_RE = re.compile(r'Version="?([0-9][^,"\s]*)"?', flags=_RE)
# The negative lookahead keeps "DeviceId" from being read as the device name.
EMBY_DEVICE_RE = re.compile(r'Device(?!Id)"?[=\s]"?([^,"\n]+)"?', flags=_RE)
EMBY_DEVICE_ID_RE = re.compile(r'DeviceId"?[=\s]"?([^,"\s&]+)"?', flags=_RE)
# A token is only accepted when it is a run of at least 24 hex digits.
EMBY_TOKEN_RE = re.compile(r'Token"?[=\s]"?([a-fA-F0-9]{24,})"?', flags=_RE)
def _parse_auth_header(val):
    """
    Split a MediaBrowser/Emby authorization header value into its fields.

    Returns the tuple (client, version, device, device_id, token); a field
    that is not present in the header is None, and an empty or missing
    header yields five Nones.
    """
    if not val:
        return None, None, None, None, None

    extracted = []
    for pattern in (
        EMBY_CLIENT_RE,
        EMBY_VERSION_RE,
        EMBY_DEVICE_RE,
        EMBY_DEVICE_ID_RE,
        EMBY_TOKEN_RE,
    ):
        found = pattern.search(val)
        extracted.append(found.group(1).strip() if found else None)
    client, version, device, device_id, token = extracted

    # Device names are sometimes URL-encoded in headers
    if device:
        device = unquote_plus(device)
    return client, version, device, device_id, token
def parse_query_string(uri):
    """
    Flatten the query string of *uri* into a {key: first_value} dict.

    Blank values are dropped.  An empty dict is returned when the URI is
    empty, has no "?" or cannot be parsed.
    """
    if not uri or "?" not in uri:
        return {}
    try:
        parsed = parse_qs(urlparse(uri).query, keep_blank_values=False)
    except Exception:
        return {}
    flat = {}
    for name, values in parsed.items():
        if values:
            flat[name] = values[0]
    return flat
def classify_emby(entry):
    """
    Best-effort identification of an Emby client from a log entry.

    Sources are consulted in this order: the combined X-Emby-Authorization
    header, the individual X-Emby-* headers, and finally the query string
    of the request URI (only when the headers produced no identifying
    field at all).

    Returns (client, version, device, device_id, token).
    All five fields are None if this is not an Emby request, i.e. when no
    client, version, device or device id could be found.
    """
    req = entry.get("request", {}) or {}
    headers = req.get("headers", {}) or {}
    uri = req.get("uri", "")

    # 1. Try X-Emby-Authorization header (most complete source)
    auth_val = _first(headers, "X-Emby-Authorization")
    client, version, device, device_id, token = _parse_auth_header(auth_val)

    # 2. Try individual X-Emby-* headers (some clients send these instead)
    if not client:
        client = _first(headers, "X-Emby-Client")
    if not version:
        version = _first(headers, "X-Emby-Client-Version")
    if not device:
        raw = _first(headers, "X-Emby-Device-Name")
        device = unquote_plus(raw) if raw else None
    if not device_id:
        device_id = _first(headers, "X-Emby-Device-Id")
    if not token:
        token = _first(headers, "X-Emby-Token") or _first(headers, "X-MediaBrowser-Token")

    # 3. Fall back to query-string params (streaming URLs embed them)
    if not any([client, version, device, device_id]):
        qs = parse_query_string(uri)
        # parse_query_string() relies on parse_qs, which has already
        # percent-decoded the values and turned "+" into spaces.  They are
        # used verbatim: decoding a second time would corrupt names that
        # legitimately contain "+" or a "%xx" sequence.
        if not client:
            client = qs.get("X-Emby-Client", client)
        if not version:
            version = qs.get("X-Emby-Client-Version")
        if not device:
            device = qs.get("X-Emby-Device-Name", device)
        if not device_id:
            device_id = qs.get("X-Emby-Device-Id")
        if not token:
            token = qs.get("X-Emby-Token")

    if not any([client, version, device, device_id]):
        return None, None, None, None, None
    return client, version, device, device_id, token
# ---------------------------------------------------------------------------
# Emby API integration (optional)
# ---------------------------------------------------------------------------
def fetch_emby_device_users():
    """
    Query the Emby /Devices endpoint and build a {device_id: user_name} map.

    The device id is taken from "Id" (falling back to "DeviceId") and the
    user from "LastUserName" (falling back to "UserName"); items missing
    either value are skipped.

    An empty dict is returned when EMBY_URL or EMBY_KEY is not set, and also
    when the request or the decoding of its response fails for any reason.
    """
    if not (EMBY_URL and EMBY_KEY):
        return {}
    try:
        import urllib.request

        endpoint = f"{EMBY_URL.rstrip('/')}/emby/Devices?api_key={EMBY_KEY}"
        request = urllib.request.Request(endpoint, headers={"Accept": "application/json"})
        with urllib.request.urlopen(request, timeout=4) as resp:
            payload = json.loads(resp.read())

        users = {}
        for device in payload.get("Items", []):
            device_id = device.get("Id") or device.get("DeviceId")
            user_name = device.get("LastUserName") or device.get("UserName")
            if device_id and user_name:
                users[device_id] = user_name
        return users
    except Exception:
        # Best effort: the dashboard still renders without user names.
        return {}
# ---------------------------------------------------------------------------
# Log ingestion — file discovery + multi-format streaming
# ---------------------------------------------------------------------------
# Filename patterns recognised as log files.  Matches are gathered into a set,
# so a file hit by several patterns (e.g. a .tar.gz that also matches *.gz) is
# listed only once and the order of the patterns does not matter.
_LOG_GLOBS = ["*.tar.gz", "*.tgz", "*.json.gz", "*.log.gz", "*.gz", "*.json", "*.log"]


def find_log_files(path):
    """
    Given a file path or a directory, return a sorted list of log file paths.

    A file path is returned as a one-element list.  For a directory, the
    directory itself and one level of sub-directories are searched for the
    patterns in _LOG_GLOBS.  Anything else yields an empty list.

    Files are sorted oldest-first by modification time (ties broken by
    path) so merged data is in chronological order.  A file that disappears
    while being listed, e.g. because of log rotation, is left out.
    """
    if os.path.isfile(path):
        return [path]
    if not os.path.isdir(path):
        return []

    # Escape glob metacharacters in the directory name itself; otherwise a
    # path such as "logs[1]" is read as a pattern and matches nothing.
    base = glob.escape(path)
    found = set()
    for pattern in _LOG_GLOBS:
        found.update(glob.glob(os.path.join(base, pattern)))
        # one level of sub-directories (e.g. logs/2024/)
        found.update(glob.glob(os.path.join(base, "*", pattern)))

    dated = []
    for candidate in found:
        try:
            dated.append((os.path.getmtime(candidate), candidate))
        except OSError:
            # Removed between the directory scan and the stat call.
            continue
    dated.sort()
    return [candidate for _, candidate in dated]
def _iter_lines(filepath):
    """
    Stream the text lines of a log file, whatever its compression format.

    The format is chosen from the file name: .tar.gz / .tgz archives (every
    regular member is read, and a member that is itself gzip data is
    decompressed first), single .gz files, and plain text for anything else.
    Text is decoded as UTF-8 with undecodable bytes replaced.
    """
    lowered = os.path.basename(filepath).lower()

    if lowered.endswith((".tar.gz", ".tgz")):
        with tarfile.open(filepath, "r:gz") as archive:
            for info in archive.getmembers():
                handle = archive.extractfile(info) if info.isfile() else None
                if handle is None:
                    continue
                payload = handle.read()
                # A gzip stream starts with the magic bytes 1f 8b.
                if payload[:2] == b"\x1f\x8b":
                    payload = gzip.decompress(payload)
                text = io.TextIOWrapper(io.BytesIO(payload), encoding="utf-8", errors="replace")
                yield from text
        return

    if lowered.endswith(".gz"):
        with gzip.open(filepath, "rt", encoding="utf-8", errors="replace") as stream:
            yield from stream
        return

    with open(filepath, "r", encoding="utf-8", errors="replace") as stream:
        yield from stream
def _parse_line(raw):
    """
    Turn one raw log line into an entry dict.

    Returns None for blank lines, lines that do not start with "{" and lines
    that are not valid JSON.  Otherwise the request fields, the response
    status / size / duration and the Emby identification are collected into
    a flat dict.
    """
    text = raw.strip()
    if not text.startswith("{"):
        return None
    try:
        record = json.loads(text)
    except json.JSONDecodeError:
        return None

    request = record.get("request", {}) or {}
    header_map = request.get("headers", {}) or {}
    user_agent = _first(header_map, "User-Agent") or ""
    client, version, device, device_id, token = classify_emby(record)

    entry = {
        "ts": record.get("ts"),
        "host": request.get("host", ""),
        "method": request.get("method", ""),
        "uri": request.get("uri", ""),
        "status": record.get("status", 0),
        "size": record.get("size", 0),
        "duration": record.get("duration", 0),
        "remote_ip": request.get("remote_ip", ""),
        "user_agent": user_agent,
        "emby_client": client,
        "emby_version": version,
        "emby_device": device,
        "emby_device_id": device_id,
        "emby_token": token,
        "referer": _first(header_map, "Referer"),
    }
    return entry
def parse_log(path):
    """
    Read every log file found at *path* (a file or a directory).

    Returns (entries, log_stats).  log_stats holds files_found, files_ok
    (files that contributed at least one entry), files_error (files whose
    processing raised), total_bytes (on-disk size) and formats (a count per
    detected format: "tar.gz", "gz" or "plain").
    """
    files = find_log_files(path)
    entries = []
    format_counts = Counter()
    stats = {
        "files_found": len(files),
        "files_ok": 0,
        "files_error": 0,
        "total_bytes": 0,
        "formats": format_counts,
    }

    for filepath in files:
        try:
            stats["total_bytes"] += os.path.getsize(filepath)

            lowered = os.path.basename(filepath).lower()
            if lowered.endswith((".tar.gz", ".tgz")):
                fmt = "tar.gz"
            elif lowered.endswith(".gz"):
                fmt = "gz"
            else:
                fmt = "plain"
            format_counts[fmt] += 1

            already_parsed = len(entries)
            for line in _iter_lines(filepath):
                parsed = _parse_line(line)
                if parsed:
                    entries.append(parsed)
            if len(entries) > already_parsed:
                stats["files_ok"] += 1
        except Exception:
            # Entries collected before the failure are kept.
            stats["files_error"] += 1

    stats["formats"] = dict(format_counts)
    return entries, stats
# ---------------------------------------------------------------------------
# Aggregations
# ---------------------------------------------------------------------------
def summarize(entries, device_users=None, *, slow_threshold=2.0,
              auth_host="auth.mattcohen.net"):
    """
    Aggregate log entries into a summary dict for the template.

    Args:
        entries: entry dicts as produced by _parse_line.
        device_users: optional {device_id: user_name} map used to attach a
            username to each device in the registry.
        slow_threshold: a request whose duration is at least this value is
            listed as slow (presumably seconds; confirm against the log
            format in use).
        auth_host: requests for this host are collected into the
            "auth_summary" section.

    Returns:
        A dict with the overall total and time span, per-host, per-IP and
        per-status-class breakdowns, capped lists of 403, 5xx and slow
        requests (with their uncapped totals), Emby client / version /
        device statistics, the device registry and the auth summary.
    """
    if device_users is None:
        device_users = {}

    total = len(entries)
    by_host = Counter()
    by_status_class = Counter()
    by_ip = Counter()
    bytes_by_host = defaultdict(int)
    status_by_host = defaultdict(Counter)
    blocked_403 = []
    errors_5xx = []
    slow_requests = []
    emby_versions = Counter()
    emby_clients = Counter()
    emby_devices = Counter()
    emby_version_by_client = defaultdict(Counter)
    device_registry = {}
    auth_events = []
    first_ts = last_ts = None

    for e in entries:
        ts = e["ts"]
        if ts:
            if first_ts is None or ts < first_ts:
                first_ts = ts
            if last_ts is None or ts > last_ts:
                last_ts = ts

        host = e["host"] or "(none)"
        by_host[host] += 1
        bytes_by_host[host] += e["size"] or 0

        status = e["status"] or 0
        # Group by the hundreds digit; a missing status is filed under "0xx".
        klass = f"{status // 100}xx" if status else "0xx"
        by_status_class[klass] += 1
        status_by_host[host][klass] += 1

        ip = e["remote_ip"] or "(none)"
        by_ip[ip] += 1

        if status == 403:
            blocked_403.append(e)
        if 500 <= status < 600:
            errors_5xx.append(e)
        if (e["duration"] or 0) >= slow_threshold:
            slow_requests.append(e)

        if e["emby_version"]:
            emby_versions[e["emby_version"]] += 1
        if e["emby_client"]:
            emby_clients[e["emby_client"]] += 1
        if e["emby_device"]:
            emby_devices[e["emby_device"]] += 1
        if e["emby_client"] and e["emby_version"]:
            emby_version_by_client[e["emby_client"]][e["emby_version"]] += 1

        # Device registry keyed by DeviceId
        did = e.get("emby_device_id")
        if did:
            if did not in device_registry:
                device_registry[did] = {
                    "device_id": did,
                    # Fall back to a shortened id while no name is known.
                    "device": e["emby_device"] or did[:12],
                    "client": e["emby_client"],
                    "version": e["emby_version"],
                    "token": e.get("emby_token"),
                    "ips": set(),
                    "hits": 0,
                    "last_ts": None,
                }
            rec = device_registry[did]
            rec["hits"] += 1
            if e["remote_ip"]:
                rec["ips"].add(e["remote_ip"])
            if ts and (rec["last_ts"] is None or ts > rec["last_ts"]):
                rec["last_ts"] = ts
            # Every later entry that carries a value overwrites the stored
            # one, so the registry ends up with the last value seen.
            if e["emby_version"]:
                rec["version"] = e["emby_version"]
            if e["emby_client"]:
                rec["client"] = e["emby_client"]
            if e["emby_device"]:
                rec["device"] = e["emby_device"]
            if e.get("emby_token"):
                rec["token"] = e["emby_token"]

        if host == auth_host:
            auth_events.append(e)

    slow_requests.sort(key=lambda x: x["duration"] or 0, reverse=True)
    blocked_403.sort(key=lambda x: x["ts"] or 0, reverse=True)

    # Bar widths are relative to the busiest host.
    max_hits = max(by_host.values(), default=1)
    host_summary = [
        {
            "host": host,
            "hits": count,
            "bytes_human": human_bytes(bytes_by_host[host]),
            "status_mix": dict(status_by_host[host]),
            "pct": round(count / max_hits * 100),
        }
        for host, count in by_host.most_common()
    ]

    emby_breakdown = sorted(
        [
            {"client": c, "total": sum(vs.values()), "versions": dict(vs.most_common())}
            for c, vs in emby_version_by_client.items()
        ],
        key=lambda x: x["total"], reverse=True,
    )

    device_list = []
    for rec in device_registry.values():
        did = rec["device_id"]
        user = device_users.get(did)
        device_list.append({
            **rec,
            # Replace the set with a sorted list for the view.
            "ips": sorted(rec["ips"]),
            "last_seen": _fmt_ts(rec["last_ts"]),
            "username": user,
        })
    device_list.sort(key=lambda x: x["hits"], reverse=True)

    blocked_ip_counter = Counter(e["remote_ip"] for e in blocked_403 if e["remote_ip"])

    return {
        "total": total,
        "first_ts": _fmt_ts(first_ts),
        "last_ts": _fmt_ts(last_ts),
        "span_hours": round((last_ts - first_ts) / 3600, 2) if first_ts and last_ts else 0,
        "by_host": host_summary,
        "by_status_class": dict(by_status_class.most_common()),
        "by_ip": by_ip.most_common(25),
        "blocked_403": [_entry_view(x) for x in blocked_403[:100]],
        "blocked_403_total": len(blocked_403),
        "blocked_403_by_trigger": classify_403_triggers(blocked_403),
        "blocked_ips_top": blocked_ip_counter.most_common(10),
        "errors_5xx": [_entry_view(x) for x in errors_5xx[:50]],
        "errors_5xx_total": len(errors_5xx),
        "slow_requests": [_entry_view(x) for x in slow_requests[:50]],
        "slow_total": len(slow_requests),
        "emby_versions": emby_versions.most_common(),
        "emby_clients": emby_clients.most_common(),
        "emby_devices": emby_devices.most_common(),
        "emby_breakdown": emby_breakdown,
        "device_registry": device_list,
        "emby_api_enabled": bool(EMBY_URL and EMBY_KEY),
        "auth_summary": summarize_auth(auth_events),
    }
def classify_403_triggers(blocked):
    """
    Guess, per blocked request, which Caddy matcher produced the 403.

    Each entry is tagged with every heuristic it satisfies (known client
    version, iOS client, known device id, native iOS user agent) or with
    "other" when none applies.  Returns the tag counts as a list of
    (tag, count) pairs, most frequent first.
    """
    tally = Counter()
    for entry in blocked:
        query = parse_query_string(entry.get("uri") or "")
        agent = entry.get("user_agent") or ""
        labels = []

        version = query.get("X-Emby-Client-Version") or entry.get("emby_version")
        if version in ("2.2.51", "3.5.52"):
            labels.append(f"emby_version:{version}")

        if query.get("X-Emby-Client") in ("Emby for iOS", "Emby+for+iOS"):
            labels.append("emby_client:ios")

        device_id = query.get("X-Emby-Device-Id") or entry.get("emby_device_id")
        if device_id == "9F318B1F-6E72-4962-BE37-7F8843EA497A":
            labels.append("emby_device_id:known")

        if re.search(r"Emby/[\d.]+ CFNetwork/.* Darwin/", agent):
            labels.append("ua:emby_ios_native")

        tally.update(labels or ["other"])
    return tally.most_common()
def summarize_auth(events):
    """
    Summarise the requests collected for the auth host.

    Returns {"total": 0} when there are no events.  Otherwise the result
    holds the total, counts per status, the 10 busiest IPs, the 15 most
    requested paths (query string removed) and the 30 newest events.
    """
    if not events:
        return {"total": 0}

    status_counts = Counter()
    ip_counts = Counter()
    path_counts = Counter()
    for event in events:
        status_counts[event["status"]] += 1
        ip_counts[event["remote_ip"]] += 1
        path_counts[(event["uri"] or "").split("?")[0]] += 1

    newest_first = sorted(events, key=lambda ev: ev["ts"] or 0, reverse=True)
    return {
        "total": len(events),
        "by_status": status_counts.most_common(),
        "by_ip": ip_counts.most_common(10),
        "by_path": path_counts.most_common(15),
        "recent": [_entry_view(ev) for ev in newest_first[:30]],
    }
# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------
def human_bytes(n):
    """
    Format a byte count with one decimal and a unit from B up to PB.

    The value is divided by 1024 until it drops below 1024; anything that
    is still larger after TB is reported in PB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = n
    position = 0
    while position < len(units):
        if value < 1024:
            return f"{value:.1f} {units[position]}"
        value = value / 1024
        position += 1
    return f"{value:.1f} PB"
def _fmt_ts(ts):
    """
    Render an epoch timestamp as "YYYY-MM-DD HH:MM:SS" in the local time zone.

    A falsy timestamp (None, 0) gives an empty string.
    """
    if ts:
        moment = datetime.fromtimestamp(ts, tz=timezone.utc)
        return moment.astimezone().strftime("%Y-%m-%d %H:%M:%S")
    return ""
def _entry_view(e):
    """
    Build the display version of an entry for the template and the JSON API.

    The timestamp is formatted, the URI is cut to 120 characters, the user
    agent to 80 characters and the duration is rounded to three decimals.
    """
    shown_uri = (e["uri"] or "")[:120]
    shown_agent = (e["user_agent"] or "")[:80]
    return dict(
        time=_fmt_ts(e["ts"]),
        host=e["host"],
        method=e["method"],
        uri=shown_uri,
        status=e["status"],
        ip=e["remote_ip"],
        ua=shown_agent,
        duration=round(e["duration"] or 0, 3),
        size=e["size"],
        emby_client=e["emby_client"],
        emby_version=e["emby_version"],
        emby_device=e["emby_device"],
        emby_device_id=e.get("emby_device_id"),
    )
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def index():
    """
    Render the dashboard page.

    The Emby device/user map is fetched and the logs at LOG_PATH are parsed
    on every request; the resulting summary is passed to dashboard.html.
    """
    device_users = fetch_emby_device_users()
    entries, lstats = parse_log(LOG_PATH)

    # With several files the label is the final component of the absolute path.
    single_file = lstats["files_found"] == 1
    label_source = LOG_PATH if single_file else os.path.abspath(LOG_PATH)

    context = {
        "s": summarize(entries, device_users),
        "log_path": LOG_PATH,
        "log_label": os.path.basename(label_source),
        "log_size": human_bytes(lstats["total_bytes"]),
        "log_stats": lstats,
    }
    return render_template("dashboard.html", **context)
@app.route("/api/raw/<int:n>")
def api_raw(n):
    """Serve the *n* newest parsed entries, in display form, as a JSON list."""
    parsed, _stats = parse_log(LOG_PATH)
    newest_first = sorted(parsed, key=lambda item: item["ts"] or 0, reverse=True)
    return jsonify(list(map(_entry_view, newest_first[:n])))
# Script entry point: start Flask's built-in server bound to 127.0.0.1:5000
# with debug mode off.
if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000, debug=False)