"""
|
|
CaddyBuddy — Caddy Log Dashboard
|
|
Reads a Caddy JSON access log and surfaces useful insights.
|
|
|
|
Run: python app.py
|
|
Open: http://127.0.0.1:5000
|
|
|
|
Optional Emby API integration (resolves device IDs to usernames):
|
|
EMBY_URL=http://localhost:8096 EMBY_KEY=<admin_api_key> python app.py
|
|
"""
|
|
|
|
import gzip
|
|
import glob
|
|
import io
|
|
import json
|
|
import re
|
|
import os
|
|
import tarfile
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import unquote_plus, parse_qs, urlparse
|
|
from flask import Flask, render_template, jsonify
|
|
|
|
# Point CADDY_LOG at a single file OR a directory.
|
|
# When it's a directory, CaddyBuddy discovers every .json / .log / .gz / .tar.gz
|
|
# inside it automatically.
|
|
LOG_PATH = os.environ.get("CADDY_LOG", r"C:\Caddy")
|
|
EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096")
|
|
EMBY_KEY = os.environ.get("EMBY_KEY", "b9af54b630f6448289ab96422add567a")
|
|
|
|
app = Flask(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Header / URI parsing helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _first(headers, key):
|
|
"""Caddy stores header values as lists; return the first non-empty value."""
|
|
if not headers:
|
|
return None
|
|
vals = headers.get(key)
|
|
if isinstance(vals, list):
|
|
return next((v for v in vals if v), None)
|
|
return vals or None
|
|
|
|
|
|
# Emby auth-header regexes
|
|
# Header format: MediaBrowser Client="x", Device="y", DeviceId="z", Version="v", Token="t"
|
|
_RE = re.IGNORECASE
|
|
EMBY_CLIENT_RE = re.compile(r'Client="?([^,"\n]+)"?', _RE)
|
|
EMBY_VERSION_RE = re.compile(r'Version="?([0-9][^,"\s]*)"?', _RE)
|
|
EMBY_DEVICE_RE = re.compile(r'Device(?!Id)"?[=\s]"?([^,"\n]+)"?', _RE)
|
|
EMBY_DEVICE_ID_RE = re.compile(r'DeviceId"?[=\s]"?([^,"\s&]+)"?', _RE)
|
|
EMBY_TOKEN_RE = re.compile(r'Token"?[=\s]"?([a-fA-F0-9]{24,})"?', _RE)
|
|
|
|
|
|
def _parse_auth_header(val):
|
|
"""
|
|
Parse a MediaBrowser/Emby authorization header value.
|
|
Returns (client, version, device, device_id, token).
|
|
"""
|
|
if not val:
|
|
return None, None, None, None, None
|
|
|
|
def _g(m):
|
|
return m.group(1).strip() if m else None
|
|
|
|
client = _g(EMBY_CLIENT_RE.search(val))
|
|
version = _g(EMBY_VERSION_RE.search(val))
|
|
device = _g(EMBY_DEVICE_RE.search(val))
|
|
device_id = _g(EMBY_DEVICE_ID_RE.search(val))
|
|
token = _g(EMBY_TOKEN_RE.search(val))
|
|
|
|
# Device names are sometimes URL-encoded in headers
|
|
if device:
|
|
device = unquote_plus(device)
|
|
|
|
return client, version, device, device_id, token
|
|
|
|
|
|
def parse_query_string(uri):
|
|
"""Return a flat {key: first_value} dict from a URI's query string."""
|
|
if not uri or "?" not in uri:
|
|
return {}
|
|
try:
|
|
qs = parse_qs(urlparse(uri).query, keep_blank_values=False)
|
|
return {k: v[0] for k, v in qs.items() if v}
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def classify_emby(entry):
|
|
"""
|
|
Best-effort identification of an Emby client from a log entry.
|
|
Returns (client, version, device, device_id, token).
|
|
All fields may be None if this is not an Emby request.
|
|
"""
|
|
req = entry.get("request", {}) or {}
|
|
headers = req.get("headers", {}) or {}
|
|
uri = req.get("uri", "")
|
|
|
|
# 1. Try X-Emby-Authorization header (most complete source)
|
|
auth_val = _first(headers, "X-Emby-Authorization")
|
|
client, version, device, device_id, token = _parse_auth_header(auth_val)
|
|
|
|
# 2. Try individual X-Emby-* headers (some clients send these instead)
|
|
if not client:
|
|
client = _first(headers, "X-Emby-Client")
|
|
if not version:
|
|
version = _first(headers, "X-Emby-Client-Version")
|
|
if not device:
|
|
raw = _first(headers, "X-Emby-Device-Name")
|
|
device = unquote_plus(raw) if raw else None
|
|
if not device_id:
|
|
device_id = _first(headers, "X-Emby-Device-Id")
|
|
if not token:
|
|
token = _first(headers, "X-Emby-Token") or _first(headers, "X-MediaBrowser-Token")
|
|
|
|
# 3. Fall back to query-string params (streaming URLs embed them)
|
|
if not any([client, version, device, device_id]):
|
|
qs = parse_query_string(uri)
|
|
if not client and "X-Emby-Client" in qs:
|
|
client = unquote_plus(qs["X-Emby-Client"])
|
|
if not version:
|
|
version = qs.get("X-Emby-Client-Version")
|
|
if not device and "X-Emby-Device-Name" in qs:
|
|
device = unquote_plus(qs["X-Emby-Device-Name"])
|
|
if not device_id:
|
|
device_id = qs.get("X-Emby-Device-Id")
|
|
if not token:
|
|
token = qs.get("X-Emby-Token")
|
|
|
|
if not any([client, version, device, device_id]):
|
|
return None, None, None, None, None
|
|
|
|
return client, version, device, device_id, token
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Emby API integration (optional)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_emby_device_users():
|
|
"""
|
|
Call the Emby /Devices endpoint and return {device_id: last_user_name}.
|
|
Returns an empty dict if the API is not configured or unreachable.
|
|
"""
|
|
if not EMBY_URL or not EMBY_KEY:
|
|
return {}
|
|
try:
|
|
import urllib.request
|
|
url = f"{EMBY_URL.rstrip('/')}/emby/Devices?api_key={EMBY_KEY}"
|
|
req = urllib.request.Request(url, headers={"Accept": "application/json"})
|
|
with urllib.request.urlopen(req, timeout=4) as resp:
|
|
data = json.loads(resp.read())
|
|
result = {}
|
|
for item in data.get("Items", []):
|
|
did = item.get("Id") or item.get("DeviceId")
|
|
user = item.get("LastUserName") or item.get("UserName")
|
|
if did and user:
|
|
result[did] = user
|
|
return result
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Log ingestion — file discovery + multi-format streaming
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Filename suffixes we recognise, in priority order (most-specific first so
|
|
# .tar.gz isn't accidentally matched by the plain .gz rule).
|
|
_LOG_GLOBS = ["*.tar.gz", "*.tgz", "*.json.gz", "*.log.gz", "*.gz", "*.json", "*.log"]
|
|
|
|
|
|
def find_log_files(path):
|
|
"""
|
|
Given a file path or a directory, return a sorted list of log file paths.
|
|
Files are sorted oldest-first by modification time so merged data is in
|
|
chronological order.
|
|
"""
|
|
if os.path.isfile(path):
|
|
return [path]
|
|
if not os.path.isdir(path):
|
|
return []
|
|
|
|
found = set()
|
|
for pattern in _LOG_GLOBS:
|
|
found.update(glob.glob(os.path.join(path, pattern)))
|
|
# one level of sub-directories (e.g. logs/2024/)
|
|
found.update(glob.glob(os.path.join(path, "*", pattern)))
|
|
|
|
return sorted(found, key=os.path.getmtime)
|
|
|
|
|
|
def _iter_lines(filepath):
|
|
"""
|
|
Yield raw text lines from a log file regardless of compression format.
|
|
Handles: plain text, .gz, .tar.gz / .tgz (any members inside the archive).
|
|
"""
|
|
name = os.path.basename(filepath).lower()
|
|
|
|
if name.endswith(".tar.gz") or name.endswith(".tgz"):
|
|
with tarfile.open(filepath, "r:gz") as tar:
|
|
for member in tar.getmembers():
|
|
if not member.isfile():
|
|
continue
|
|
fobj = tar.extractfile(member)
|
|
if fobj is None:
|
|
continue
|
|
# Member itself might be gzip-compressed
|
|
raw = fobj.read()
|
|
if raw[:2] == b"\x1f\x8b":
|
|
raw = gzip.decompress(raw)
|
|
yield from io.TextIOWrapper(io.BytesIO(raw), encoding="utf-8", errors="replace")
|
|
elif name.endswith(".gz"):
|
|
with gzip.open(filepath, "rt", encoding="utf-8", errors="replace") as f:
|
|
yield from f
|
|
else:
|
|
with open(filepath, "r", encoding="utf-8", errors="replace") as f:
|
|
yield from f
|
|
|
|
|
|
def _parse_line(raw):
|
|
"""Parse one JSON log line into an entry dict, or return None."""
|
|
raw = raw.strip()
|
|
if not raw or raw[0] != "{":
|
|
return None
|
|
try:
|
|
rec = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
req = rec.get("request", {}) or {}
|
|
host = req.get("host", "")
|
|
h = req.get("headers", {}) or {}
|
|
ua = _first(h, "User-Agent") or ""
|
|
|
|
client, version, device, device_id, token = classify_emby(rec)
|
|
|
|
return {
|
|
"ts": rec.get("ts"),
|
|
"host": host,
|
|
"method": req.get("method", ""),
|
|
"uri": req.get("uri", ""),
|
|
"status": rec.get("status", 0),
|
|
"size": rec.get("size", 0),
|
|
"duration": rec.get("duration", 0),
|
|
"remote_ip": req.get("remote_ip", ""),
|
|
"user_agent": ua,
|
|
"emby_client": client,
|
|
"emby_version": version,
|
|
"emby_device": device,
|
|
"emby_device_id": device_id,
|
|
"emby_token": token,
|
|
"referer": _first(h, "Referer"),
|
|
}
|
|
|
|
|
|
def parse_log(path):
|
|
"""
|
|
Parse all log files found at *path* (file or directory).
|
|
Returns (entries, log_stats) where log_stats is a dict with:
|
|
files_found, files_ok, files_error, total_bytes, formats
|
|
"""
|
|
files = find_log_files(path)
|
|
entries = []
|
|
stats = {"files_found": len(files), "files_ok": 0, "files_error": 0,
|
|
"total_bytes": 0, "formats": Counter()}
|
|
|
|
for filepath in files:
|
|
try:
|
|
stats["total_bytes"] += os.path.getsize(filepath)
|
|
name = os.path.basename(filepath).lower()
|
|
if name.endswith(".tar.gz") or name.endswith(".tgz"): fmt = "tar.gz"
|
|
elif name.endswith(".gz"): fmt = "gz"
|
|
else: fmt = "plain"
|
|
stats["formats"][fmt] += 1
|
|
|
|
count_before = len(entries)
|
|
for raw in _iter_lines(filepath):
|
|
entry = _parse_line(raw)
|
|
if entry:
|
|
entries.append(entry)
|
|
|
|
if len(entries) > count_before:
|
|
stats["files_ok"] += 1
|
|
except Exception:
|
|
stats["files_error"] += 1
|
|
|
|
stats["formats"] = dict(stats["formats"])
|
|
return entries, stats
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Aggregations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def summarize(entries, device_users=None):
|
|
"""Aggregate log entries into a summary dict for the template."""
|
|
if device_users is None:
|
|
device_users = {}
|
|
|
|
total = len(entries)
|
|
by_host = Counter()
|
|
by_status_class = Counter()
|
|
by_ip = Counter()
|
|
bytes_by_host = defaultdict(int)
|
|
status_by_host = defaultdict(Counter)
|
|
|
|
blocked_403 = []
|
|
errors_5xx = []
|
|
slow_requests = []
|
|
|
|
emby_versions = Counter()
|
|
emby_clients = Counter()
|
|
emby_devices = Counter()
|
|
emby_version_by_client = defaultdict(Counter)
|
|
device_registry = {}
|
|
|
|
auth_events = []
|
|
first_ts = last_ts = None
|
|
SLOW_THRESHOLD = 2.0
|
|
|
|
for e in entries:
|
|
ts = e["ts"]
|
|
if ts:
|
|
if first_ts is None or ts < first_ts: first_ts = ts
|
|
if last_ts is None or ts > last_ts: last_ts = ts
|
|
|
|
host = e["host"] or "(none)"
|
|
by_host[host] += 1
|
|
bytes_by_host[host] += e["size"] or 0
|
|
|
|
status = e["status"] or 0
|
|
klass = f"{status // 100}xx" if status else "0xx"
|
|
by_status_class[klass] += 1
|
|
status_by_host[host][klass] += 1
|
|
|
|
ip = e["remote_ip"] or "(none)"
|
|
by_ip[ip] += 1
|
|
|
|
if status == 403: blocked_403.append(e)
|
|
if 500 <= status < 600: errors_5xx.append(e)
|
|
if (e["duration"] or 0) >= SLOW_THRESHOLD: slow_requests.append(e)
|
|
|
|
if e["emby_version"]: emby_versions[e["emby_version"]] += 1
|
|
if e["emby_client"]: emby_clients[e["emby_client"]] += 1
|
|
if e["emby_device"]: emby_devices[e["emby_device"]] += 1
|
|
if e["emby_client"] and e["emby_version"]:
|
|
emby_version_by_client[e["emby_client"]][e["emby_version"]] += 1
|
|
|
|
# Device registry keyed by DeviceId
|
|
did = e.get("emby_device_id")
|
|
if did:
|
|
if did not in device_registry:
|
|
device_registry[did] = {
|
|
"device_id": did,
|
|
"device": e["emby_device"] or did[:12],
|
|
"client": e["emby_client"],
|
|
"version": e["emby_version"],
|
|
"token": e.get("emby_token"),
|
|
"ips": set(),
|
|
"hits": 0,
|
|
"last_ts": None,
|
|
}
|
|
rec = device_registry[did]
|
|
rec["hits"] += 1
|
|
if e["remote_ip"]:
|
|
rec["ips"].add(e["remote_ip"])
|
|
if ts and (rec["last_ts"] is None or ts > rec["last_ts"]):
|
|
rec["last_ts"] = ts
|
|
if e["emby_version"]: rec["version"] = e["emby_version"]
|
|
if e["emby_client"]: rec["client"] = e["emby_client"]
|
|
if e["emby_device"]: rec["device"] = e["emby_device"]
|
|
if e.get("emby_token"): rec["token"] = e["emby_token"]
|
|
|
|
if host == "auth.mattcohen.net":
|
|
auth_events.append(e)
|
|
|
|
slow_requests.sort(key=lambda x: x["duration"] or 0, reverse=True)
|
|
blocked_403.sort(key=lambda x: x["ts"] or 0, reverse=True)
|
|
|
|
max_hits = max(by_host.values(), default=1)
|
|
host_summary = [
|
|
{
|
|
"host": host,
|
|
"hits": count,
|
|
"bytes_human": human_bytes(bytes_by_host[host]),
|
|
"status_mix": dict(status_by_host[host]),
|
|
"pct": round(count / max_hits * 100),
|
|
}
|
|
for host, count in by_host.most_common()
|
|
]
|
|
|
|
emby_breakdown = sorted(
|
|
[
|
|
{"client": c, "total": sum(vs.values()), "versions": dict(vs.most_common())}
|
|
for c, vs in emby_version_by_client.items()
|
|
],
|
|
key=lambda x: x["total"], reverse=True,
|
|
)
|
|
|
|
device_list = []
|
|
for rec in device_registry.values():
|
|
did = rec["device_id"]
|
|
user = device_users.get(did)
|
|
device_list.append({
|
|
**rec,
|
|
"ips": sorted(rec["ips"]),
|
|
"last_seen": _fmt_ts(rec["last_ts"]),
|
|
"username": user,
|
|
})
|
|
device_list.sort(key=lambda x: x["hits"], reverse=True)
|
|
|
|
blocked_ip_counter = Counter(e["remote_ip"] for e in blocked_403 if e["remote_ip"])
|
|
|
|
return {
|
|
"total": total,
|
|
"first_ts": _fmt_ts(first_ts),
|
|
"last_ts": _fmt_ts(last_ts),
|
|
"span_hours": round((last_ts - first_ts) / 3600, 2) if first_ts and last_ts else 0,
|
|
"by_host": host_summary,
|
|
"by_status_class": dict(by_status_class.most_common()),
|
|
"by_ip": by_ip.most_common(25),
|
|
"blocked_403": [_entry_view(x) for x in blocked_403[:100]],
|
|
"blocked_403_total": len(blocked_403),
|
|
"blocked_403_by_trigger": classify_403_triggers(blocked_403),
|
|
"blocked_ips_top": blocked_ip_counter.most_common(10),
|
|
"errors_5xx": [_entry_view(x) for x in errors_5xx[:50]],
|
|
"errors_5xx_total": len(errors_5xx),
|
|
"slow_requests": [_entry_view(x) for x in slow_requests[:50]],
|
|
"slow_total": len(slow_requests),
|
|
"emby_versions": emby_versions.most_common(),
|
|
"emby_clients": emby_clients.most_common(),
|
|
"emby_devices": emby_devices.most_common(),
|
|
"emby_breakdown": emby_breakdown,
|
|
"device_registry": device_list,
|
|
"emby_api_enabled": bool(EMBY_URL and EMBY_KEY),
|
|
"auth_summary": summarize_auth(auth_events),
|
|
}
|
|
|
|
|
|
def classify_403_triggers(blocked):
|
|
"""Heuristically guess which Caddy matcher fired on each 403."""
|
|
triggers = Counter()
|
|
for e in blocked:
|
|
tags = []
|
|
uri = e.get("uri") or ""
|
|
ua = e.get("user_agent") or ""
|
|
qs = parse_query_string(uri)
|
|
|
|
ver = qs.get("X-Emby-Client-Version") or e.get("emby_version")
|
|
if ver in ("2.2.51", "3.5.52"):
|
|
tags.append(f"emby_version:{ver}")
|
|
if qs.get("X-Emby-Client") in ("Emby for iOS", "Emby+for+iOS"):
|
|
tags.append("emby_client:ios")
|
|
did = qs.get("X-Emby-Device-Id") or e.get("emby_device_id")
|
|
if did == "9F318B1F-6E72-4962-BE37-7F8843EA497A":
|
|
tags.append("emby_device_id:known")
|
|
if re.search(r"Emby/[\d.]+ CFNetwork/.* Darwin/", ua):
|
|
tags.append("ua:emby_ios_native")
|
|
if not tags:
|
|
tags.append("other")
|
|
for t in tags:
|
|
triggers[t] += 1
|
|
return triggers.most_common()
|
|
|
|
|
|
def summarize_auth(events):
|
|
if not events:
|
|
return {"total": 0}
|
|
by_status = Counter(e["status"] for e in events)
|
|
by_ip = Counter(e["remote_ip"] for e in events)
|
|
by_path = Counter((e["uri"] or "").split("?")[0] for e in events)
|
|
recent = sorted(events, key=lambda x: x["ts"] or 0, reverse=True)[:30]
|
|
return {
|
|
"total": len(events),
|
|
"by_status": by_status.most_common(),
|
|
"by_ip": by_ip.most_common(10),
|
|
"by_path": by_path.most_common(15),
|
|
"recent": [_entry_view(x) for x in recent],
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Formatting helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def human_bytes(n):
|
|
for unit in ["B", "KB", "MB", "GB", "TB"]:
|
|
if n < 1024:
|
|
return f"{n:.1f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} PB"
|
|
|
|
|
|
def _fmt_ts(ts):
|
|
if not ts:
|
|
return ""
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def _entry_view(e):
|
|
return {
|
|
"time": _fmt_ts(e["ts"]),
|
|
"host": e["host"],
|
|
"method": e["method"],
|
|
"uri": (e["uri"] or "")[:120],
|
|
"status": e["status"],
|
|
"ip": e["remote_ip"],
|
|
"ua": (e["user_agent"] or "")[:80],
|
|
"duration": round(e["duration"] or 0, 3),
|
|
"size": e["size"],
|
|
"emby_client": e["emby_client"],
|
|
"emby_version": e["emby_version"],
|
|
"emby_device": e["emby_device"],
|
|
"emby_device_id": e.get("emby_device_id"),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/")
|
|
def index():
|
|
device_users = fetch_emby_device_users()
|
|
entries, lstats = parse_log(LOG_PATH)
|
|
summary = summarize(entries, device_users)
|
|
if lstats["files_found"] == 1:
|
|
log_label = os.path.basename(LOG_PATH)
|
|
else:
|
|
log_label = os.path.basename(os.path.abspath(LOG_PATH))
|
|
return render_template(
|
|
"dashboard.html",
|
|
s=summary,
|
|
log_path=LOG_PATH,
|
|
log_label=log_label,
|
|
log_size=human_bytes(lstats["total_bytes"]),
|
|
log_stats=lstats,
|
|
)
|
|
|
|
|
|
@app.route("/api/raw/<int:n>")
|
|
def api_raw(n):
|
|
"""Return the N most-recent parsed entries as JSON."""
|
|
entries, _ = parse_log(LOG_PATH)
|
|
entries.sort(key=lambda x: x["ts"] or 0, reverse=True)
|
|
return jsonify([_entry_view(e) for e in entries[:n]])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="127.0.0.1", port=5000, debug=False)
|