# embycovers/app.py — Emby thumbnail/cover generator (FastAPI).
import asyncio
import io
import os
import hashlib
import base64
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageColor, ImageOps, UnidentifiedImageError
app = FastAPI(title="Emby Thumbnail Generator")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# Emby server connection settings, overridable via environment variables.
# NOTE(review): a real-looking API key is committed as the default here —
# rotate it and drop the fallback so the key must come from the environment.
EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096")
EMBY_API_KEY = os.environ.get("EMBY_API_KEY", "b9af54b630f6448289ab96422add567a")
# On-disk cache for rendered art; cache keys fold in RENDER_VERSION, so
# bumping that string invalidates everything previously rendered.
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
RENDER_VERSION = "series-banner-v10"
# Output geometry: 16:9 thumbnails and 2:3 primary (poster) covers.
THUMB_WIDTH = 800
THUMB_HEIGHT = 450
PRIMARY_WIDTH = 1000
PRIMARY_HEIGHT = 1500
# User-adjustable zoom bounds for the primary-cover backdrop crop.
PRIMARY_MIN_ZOOM = 0.6
PRIMARY_MAX_ZOOM = 2.6
HTTP_TIMEOUT = 30.0
# Shared process-wide httpx client, created lazily by get_http_client().
http_client: httpx.AsyncClient | None = None
# Airing-snapshot cache TTL (seconds) and the day windows used to decide
# whether a show counts as having a "new season".
AIRING_LOOKUP_CACHE_TTL = 900
NEW_SEASON_MIN_AGE_DAYS = 7
NEW_SEASON_MAX_AGE_DAYS = 21
SEASON_INFERENCE_LOOKBACK_DAYS = 180
# Per-week-offset snapshot cache; the lock serializes rebuilds.
airing_lookup_cache: dict[int, dict] = {}
airing_lookup_lock = asyncio.Lock()
# ── Studio logo file map ─────────────────────────────────────────────────────
STUDIOS_DIR = Path("static/studios")
# Maps studio key → filename (relative to STUDIOS_DIR)
STUDIO_FILES: dict[str, str] = {
    "hulu": "hulu.png",
    "hbo": "hbo.png",
    "disney": "disney.png",
}
def make_studio_logo(studio: str, max_height: int = 52) -> Image.Image | None:
    """Return the studio's logo as an RGBA image at most `max_height` px tall.

    Unknown studio keys and missing files yield None; a logo already short
    enough is returned at its native size, otherwise it is scaled down with
    its aspect ratio preserved.
    """
    mapped_name = STUDIO_FILES.get(studio)
    if not mapped_name:
        return None
    logo_path = STUDIOS_DIR / mapped_name
    if not logo_path.exists():
        return None
    logo = Image.open(logo_path).convert("RGBA")
    if logo.height <= max_height:
        return logo
    factor = max_height / logo.height
    return logo.resize((int(logo.width * factor), max_height), Image.LANCZOS)
def load_image_from_bytes(image_bytes: bytes, mode: str = "RGB") -> Image.Image:
    """Decode image bytes eagerly and return a detached PIL image in `mode`.

    EXIF orientation is applied, the lazy decoder is forced to completion via
    load(), and the result is copied so later resize/filter passes never see a
    partially decoded buffer (some Emby assets produced blocky artifacts when
    the lazy decoder output was processed directly).
    """
    buffer = io.BytesIO(image_bytes)
    with Image.open(buffer) as decoded:
        oriented = ImageOps.exif_transpose(decoded)
        oriented.load()
        return oriented.convert(mode).copy()
# --- Emby API helpers ---
def get_http_client() -> httpx.AsyncClient:
    """Return the process-wide async HTTP client, creating it on first use."""
    global http_client
    client = http_client
    if client is None:
        client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)
        http_client = client
    return client
@app.on_event("startup")
async def startup_event():
    # Eagerly build the shared httpx client so the first request doesn't pay
    # the construction cost.
    # NOTE(review): @app.on_event is deprecated in newer FastAPI — presumably a
    # lifespan handler is the eventual replacement; confirm installed version.
    get_http_client()
@app.on_event("shutdown")
async def shutdown_event():
    """Close the shared HTTP client so pooled connections are released."""
    global http_client
    client, http_client = http_client, None
    if client is not None:
        await client.aclose()
async def emby_get(path: str, params: dict | None = None) -> dict:
    """GET a JSON payload from the Emby API at `path`.

    The caller's ``params`` dict is copied before the API key is injected;
    the original implementation wrote ``api_key`` straight into the caller's
    dictionary, mutating the argument as a side effect.

    Raises httpx.HTTPStatusError on non-2xx responses.
    """
    client = get_http_client()
    url = f"{EMBY_URL}{path}"
    p = dict(params) if params else {}
    p["api_key"] = EMBY_API_KEY
    r = await client.get(url, params=p)
    r.raise_for_status()
    return r.json()
async def emby_get_image(
    item_id: str,
    image_type: str = "Primary",
    index: int | None = None,
    *,
    max_width: int | None = None,
    max_height: int | None = None,
    quality: int | None = None,
) -> bytes:
    """Fetch raw image bytes for an Emby item.

    Delegates to emby_get_image_with_type and discards the content type, so
    the two request paths can no longer drift apart (the original duplicated
    the URL/param-building logic verbatim).

    Raises httpx.HTTPStatusError on non-2xx responses.
    """
    content, _content_type = await emby_get_image_with_type(
        item_id,
        image_type,
        index,
        max_width=max_width,
        max_height=max_height,
        quality=quality,
    )
    return content
async def emby_get_image_with_type(
    item_id: str,
    image_type: str = "Primary",
    index: int | None = None,
    *,
    max_width: int | None = None,
    max_height: int | None = None,
    quality: int | None = None,
) -> tuple[bytes, str]:
    """Fetch an Emby item image plus its MIME type.

    Optional Index / maxWidth / maxHeight / quality parameters are forwarded
    to Emby only when provided. Returns (image bytes, content type); the
    content type falls back to "image/jpeg" when the header is absent, and
    any ";charset=..." suffix is stripped.
    Raises httpx.HTTPStatusError on non-2xx responses.
    """
    client = get_http_client()
    url = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}"
    params: dict[str, str | int] = {"api_key": EMBY_API_KEY}
    if index is not None:
        params["Index"] = index
    if max_width is not None:
        params["maxWidth"] = max_width
    if max_height is not None:
        params["maxHeight"] = max_height
    if quality is not None:
        params["quality"] = quality
    r = await client.get(url, params=params)
    r.raise_for_status()
    # Drop any ";charset=..." suffix; default when the header is missing/empty.
    content_type = (r.headers.get("content-type", "image/jpeg") or "image/jpeg").split(";")[0]
    return r.content, content_type
async def emby_get_image_optional(item_id: str, image_type: str, index: int | None = None) -> bytes | None:
    """Fetch an item image, returning None when Emby has no such image.

    A 404 or 400 status is treated as "image absent", as is a response whose
    content type is present but not image/* (e.g. an HTML error page).
    Other HTTP errors still raise.
    """
    query: dict[str, str | int] = {"api_key": EMBY_API_KEY}
    if index is not None:
        query["Index"] = index
    response = await get_http_client().get(
        f"{EMBY_URL}/Items/{item_id}/Images/{image_type}", params=query
    )
    if response.status_code in (404, 400):
        return None
    response.raise_for_status()
    declared = response.headers.get("content-type", "").lower()
    if declared and not declared.startswith("image/"):
        return None
    return response.content
async def emby_upload_image(item_id: str, image_bytes: bytes, image_type: str = "Thumb"):
    """Re-encode an image as JPEG and upload it to Emby for the given item.

    The JPEG payload is sent base64-encoded in the request body while the
    Content-Type header says image/jpeg — NOTE(review): presumably this is
    what Emby's image-upload endpoint expects; confirm against the Emby API
    docs before changing.

    Returns the Emby response status code on success.
    Raises HTTPException(502) carrying Emby's error text when the upload is
    rejected.
    """
    upload_buf = io.BytesIO()
    # Normalize to RGB JPEG regardless of the source format/alpha channel.
    upload_image = load_image_from_bytes(image_bytes, mode="RGB")
    upload_image.save(upload_buf, format="JPEG", quality=92, optimize=True)
    upload_b64 = base64.b64encode(upload_buf.getvalue())
    client = get_http_client()
    url = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}"
    r = await client.post(
        url,
        params={"api_key": EMBY_API_KEY},
        content=upload_b64,
        headers={
            "Content-Type": "image/jpeg",
            "X-Emby-Token": EMBY_API_KEY,
        },
    )
    try:
        r.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Surface Emby's own error body to the caller when it has one.
        detail = r.text.strip() or str(exc)
        raise HTTPException(
            status_code=502,
            detail=f"Emby upload failed ({r.status_code}): {detail}",
        ) from exc
    return r.status_code
def parse_emby_datetime(value: str | None) -> datetime | None:
if not value:
return None
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def to_emby_iso(value: datetime) -> str:
    """Format an aware datetime as the Z-suffixed UTC ISO string Emby expects."""
    utc_value = value.astimezone(timezone.utc)
    return utc_value.isoformat().replace("+00:00", "Z")
async def emby_get_all(path: str, params: dict | None = None, *, page_size: int = 200) -> list[dict]:
    """Page through an Emby list endpoint and return every item.

    Requests `page_size` items at a time, advancing StartIndex until either
    an empty batch arrives or the reported TotalRecordCount is reached.
    """
    collected: list[dict] = []
    base_query = dict(params or {})
    offset = 0
    while True:
        page = await emby_get(path, {
            **base_query,
            "StartIndex": str(offset),
            "Limit": str(page_size),
        })
        batch = page.get("Items", [])
        collected.extend(batch)
        # Emby may omit the total; fall back to "what we've seen so far".
        total = int(page.get("TotalRecordCount", offset + len(batch)))
        offset += len(batch)
        if not batch or offset >= total:
            break
    return collected
def get_week_bounds(now: datetime, week_offset: int) -> tuple[datetime, datetime]:
    """Return the [start, end) of the Monday-based week `week_offset` weeks from `now`."""
    monday = now - timedelta(days=now.weekday())
    start = monday.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(weeks=week_offset)
    return start, start + timedelta(days=7)
async def build_airing_snapshot(week_offset: int = 0) -> dict:
    """Build the "currently airing" snapshot for the week at `week_offset`.

    Fires five Emby queries concurrently (all series; recent S2+ premieres;
    a long lookback window used to infer season starts; the selected week's
    episodes; upcoming episodes), then joins them per series to decide which
    shows are airing and which are eligible for "New Season" treatment.
    Only the series query is fatal on error; the others degrade to [].
    """
    now = datetime.now(timezone.utc)
    week_start, week_end = get_week_bounds(now, week_offset)
    # Browsing past weeks shifts the "recent premiere" window back in time.
    week_shift_days = max(0, -week_offset) * 7
    eligible_min_days = NEW_SEASON_MIN_AGE_DAYS + week_shift_days
    eligible_max_days = NEW_SEASON_MAX_AGE_DAYS + week_shift_days
    recent_window_start = now - timedelta(days=eligible_max_days)
    recent_window_end = now - timedelta(days=eligible_min_days)
    inference_window_start = now - timedelta(days=max(SEASON_INFERENCE_LOOKBACK_DAYS, eligible_max_days + 35))
    # All series in the library (posters only, for the UI grid).
    series_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Series",
        "Recursive": "true",
        "SortBy": "SortName",
        "SortOrder": "Ascending",
        "ImageTypeLimit": "1",
        "EnableImageTypes": "Primary",
    })
    # Episodes that premiered inside the "new season" eligibility window.
    recent_episodes_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "SortBy": "PremiereDate",
        "SortOrder": "Descending",
        "MinPremiereDate": to_emby_iso(recent_window_start),
        "MaxPremiereDate": to_emby_iso(recent_window_end),
    })
    # Wider lookback used to infer a season's start when E1 wasn't captured.
    recent_season_activity_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(inference_window_start),
        "MaxPremiereDate": to_emby_iso(now + timedelta(days=1)),
    })
    # Episodes airing inside the selected week's bounds.
    selected_week_episodes_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(week_start),
        "MaxPremiereDate": to_emby_iso(week_end),
    })
    # Future (and just-aired, within 12h) episodes from Emby's Upcoming feed.
    upcoming_episodes_task = emby_get_all("/Shows/Upcoming", {
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(now - timedelta(hours=12)),
    })
    series_items, recent_episodes, recent_season_activity, selected_week_episodes, upcoming_episodes = await asyncio.gather(
        series_task,
        recent_episodes_task,
        recent_season_activity_task,
        selected_week_episodes_task,
        upcoming_episodes_task,
        return_exceptions=True,
    )
    # The series list is required; auxiliary feeds fall back to empty lists.
    if isinstance(series_items, Exception):
        raise series_items
    if isinstance(recent_episodes, Exception):
        recent_episodes = []
    if isinstance(recent_season_activity, Exception):
        recent_season_activity = []
    if isinstance(selected_week_episodes, Exception):
        selected_week_episodes = []
    if isinstance(upcoming_episodes, Exception):
        upcoming_episodes = []
    # Latest S2+ season premiere (episode 1) per series inside the window.
    recent_premieres_by_series: dict[str, dict] = {}
    for episode in recent_episodes:
        series_id = episode.get("SeriesId")
        season_number = int(episode.get("ParentIndexNumber") or 0)
        episode_number = int(episode.get("IndexNumber") or 0)
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or season_number < 2 or episode_number != 1 or premiere_at is None:
            continue
        existing = recent_premieres_by_series.get(series_id)
        if existing is None or premiere_at > existing["premiere_at"]:
            recent_premieres_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_id": episode.get("Id"),
                "episode_name": episode.get("Name") or "",
            }
    # Earliest episode seen per (series, season) — used to infer season start.
    earliest_seen_by_series_season: dict[tuple[str, int], dict] = {}
    for episode in recent_season_activity:
        series_id = episode.get("SeriesId")
        season_number = int(episode.get("ParentIndexNumber") or 0)
        episode_number = int(episode.get("IndexNumber") or 0)
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or season_number < 2 or premiere_at is None:
            continue
        key = (series_id, season_number)
        existing = earliest_seen_by_series_season.get(key)
        if existing is None or premiere_at < existing["premiere_at"]:
            earliest_seen_by_series_season[key] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }
    # First episode airing in the selected week, per series.
    selected_week_by_series: dict[str, dict] = {}
    for episode in selected_week_episodes:
        series_id = episode.get("SeriesId")
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or premiere_at is None:
            continue
        existing = selected_week_by_series.get(series_id)
        if existing is None or premiere_at < existing["premiere_at"]:
            season_number = int(episode.get("ParentIndexNumber") or 0)
            episode_number = int(episode.get("IndexNumber") or 0)
            selected_week_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }
    # Soonest upcoming episode, per series.
    upcoming_by_series: dict[str, dict] = {}
    for episode in upcoming_episodes:
        series_id = episode.get("SeriesId")
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or premiere_at is None:
            continue
        existing = upcoming_by_series.get(series_id)
        if existing is None or premiere_at < existing["premiere_at"]:
            season_number = int(episode.get("ParentIndexNumber") or 0)
            episode_number = int(episode.get("IndexNumber") or 0)
            upcoming_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }
    # Join the per-series signals into one row per airing series.
    results: list[dict] = []
    for series in series_items:
        series_id = series.get("Id")
        if not series_id:
            continue
        recent_premiere = recent_premieres_by_series.get(series_id)
        selected_week_episode = selected_week_by_series.get(series_id)
        upcoming = upcoming_by_series.get(series_id)
        status = (series.get("Status") or "").strip()
        # A series counts as airing if Emby says "Continuing" or any of the
        # episode-derived signals fired for it.
        is_current_airing = (
            status.lower() == "continuing"
            or selected_week_episode is not None
            or upcoming is not None
            or recent_premiere is not None
        )
        if not is_current_airing:
            continue
        air_days = series.get("AirDays") or []
        if isinstance(air_days, str):
            air_days = [part.strip() for part in air_days.split(",") if part.strip()]
        # Highest season number among the available signals (0 = unknown).
        active_season_number = max(
            recent_premiere["season_number"] if recent_premiere else 0,
            selected_week_episode["season_number"] if selected_week_episode else 0,
            upcoming["season_number"] if upcoming else 0,
        )
        # No captured premiere? Infer the season start from the earliest
        # episode seen for that (series, season) in the lookback window.
        inferred_recent_premiere = None
        if recent_premiere is None and active_season_number >= 2 and (selected_week_episode is not None or upcoming is not None):
            inferred_recent_premiere = earliest_seen_by_series_season.get((series_id, active_season_number))
        season_start = recent_premiere or inferred_recent_premiere
        days_since_premiere = None
        if season_start is not None:
            days_since_premiere = max(0, int((now - season_start["premiere_at"]).total_seconds() // 86400))
        has_current_airing_signal = status.lower() == "continuing" and (
            selected_week_episode is not None or upcoming is not None
        )
        # "New season" eligibility: continuing, airing now, and the season
        # premiere falls inside the configured age window.
        has_recent_season_start_signal = (
            status.lower() == "continuing"
            and (selected_week_episode is not None or upcoming is not None)
            and days_since_premiere is not None
            and eligible_min_days <= days_since_premiere <= eligible_max_days
        )
        eligible_new_season = has_current_airing_signal or has_recent_season_start_signal
        season_premiere_inferred = recent_premiere is None and inferred_recent_premiere is not None
        eligibility_reason = (
            "current_airing"
            if has_current_airing_signal
            else ("recent_season_start" if has_recent_season_start_signal else None)
        )
        next_air_at = upcoming["premiere_at"] if upcoming else None
        results.append({
            "id": series_id,
            "name": series.get("Name", ""),
            "year": series.get("ProductionYear"),
            "status": status or ("Continuing" if upcoming else "Unknown"),
            "air_days": air_days,
            "poster_url": f"/api/poster/{series_id}?w=180&h=270&q=84",
            "selected_week_air_at": to_emby_iso(selected_week_episode["premiere_at"]) if selected_week_episode else None,
            "selected_week_episode_label": (
                f"S{selected_week_episode['season_number']}E{selected_week_episode['episode_number']} · {selected_week_episode['episode_name']}".strip(" ·")
                if selected_week_episode and selected_week_episode["season_number"] and selected_week_episode["episode_number"]
                else (selected_week_episode["episode_name"] if selected_week_episode else None)
            ),
            "next_air_at": to_emby_iso(next_air_at) if next_air_at else None,
            "next_episode_label": (
                f"S{upcoming['season_number']}E{upcoming['episode_number']} · {upcoming['episode_name']}".strip(" ·")
                if upcoming and upcoming["season_number"] and upcoming["episode_number"]
                else (upcoming["episode_name"] if upcoming else None)
            ),
            "season_number": season_start["season_number"] if season_start else None,
            "season_premiere_at": to_emby_iso(season_start["premiere_at"]) if season_start else None,
            "season_premiere_episode": season_start["episode_name"] if season_start else None,
            "days_since_season_premiere": days_since_premiere,
            "season_premiere_inferred": season_premiere_inferred,
            "eligibility_reason": eligibility_reason,
            "eligible_new_season": eligible_new_season,
        })
    # Eligible shows first, then by soonest air date, then by name.
    results.sort(key=lambda item: (
        0 if item["eligible_new_season"] else 1,
        item["selected_week_air_at"] is None and item["next_air_at"] is None,
        item["selected_week_air_at"] or item["next_air_at"] or "",
        item["name"].lower(),
    ))
    return {
        "items": results,
        "generated_at": to_emby_iso(now),
        "week_start": to_emby_iso(week_start),
        "week_end": to_emby_iso(week_end),
        "week_offset": week_offset,
        "new_season_min_days": eligible_min_days,
        "new_season_max_days": eligible_max_days,
    }
async def get_airing_snapshot(force_refresh: bool = False, week_offset: int = 0) -> dict:
    """Return the airing snapshot for `week_offset`, served from a TTL cache.

    The fast path reads the cache without the lock; the lock only guards the
    rebuild so concurrent requests don't all recompute the snapshot.
    """
    def _fresh_entry() -> dict | None:
        entry = airing_lookup_cache.get(week_offset)
        if entry and entry["expires_at"] > time.time():
            return entry
        return None

    if not force_refresh:
        hit = _fresh_entry()
        if hit:
            return hit["data"]
    async with airing_lookup_lock:
        # Re-check under the lock: another task may have rebuilt it already.
        if not force_refresh:
            hit = _fresh_entry()
            if hit:
                return hit["data"]
        data = await build_airing_snapshot(week_offset=week_offset)
        airing_lookup_cache[week_offset] = {
            "expires_at": time.time() + AIRING_LOOKUP_CACHE_TTL,
            "data": data,
        }
    return data
# --- Image helpers ---
def cover_crop(img: Image.Image, width: int, height: int) -> Image.Image:
    """Scale and center-crop `img` so it exactly fills width x height."""
    target_size = (width, height)
    return ImageOps.fit(img, target_size, method=Image.LANCZOS, centering=(0.5, 0.5))
def cover_crop_positioned(
    img: Image.Image,
    width: int,
    height: int,
    *,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = 0.0,
) -> Image.Image:
    """Cover-crop with user-controlled zoom and pan.

    `pan_x` / `pan_y` are normalized in [-1, 1], where -1 means left/top and
    +1 means right/bottom. Zoom is clamped to [PRIMARY_MIN_ZOOM,
    PRIMARY_MAX_ZOOM]; values below 1.0 zoom *out* past the cover fit, and
    any uncovered canvas is filled by stretching the image's edge pixels.
    """
    pan_x = max(-1.0, min(1.0, float(pan_x)))
    pan_y = max(-1.0, min(1.0, float(pan_y)))
    zoom = max(PRIMARY_MIN_ZOOM, min(PRIMARY_MAX_ZOOM, float(zoom)))
    if zoom < 1.0:
        # Zoomed out: interpolate between "contain" (zoom == PRIMARY_MIN_ZOOM)
        # and "cover" (zoom == 1.0) scale factors.
        contain_scale = min(width / max(1, img.width), height / max(1, img.height))
        cover_scale = max(width / max(1, img.width), height / max(1, img.height))
        zoom_t = (zoom - PRIMARY_MIN_ZOOM) / max(0.0001, 1.0 - PRIMARY_MIN_ZOOM)
        zoom_t = max(0.0, min(1.0, zoom_t))
        scale = contain_scale + (cover_scale - contain_scale) * zoom_t
        fitted_w = max(1, int(round(img.width * scale)))
        fitted_h = max(1, int(round(img.height * scale)))
        fitted = img.resize((fitted_w, fitted_h), Image.LANCZOS)
        # Pan maps [-1, 1] onto the available slack in each axis: when the
        # fitted image overflows the canvas we shift it negative; when it
        # underfills we move it within the empty space.
        if fitted_w >= width:
            paste_x = -int(round((fitted_w - width) * ((pan_x + 1.0) / 2.0)))
        else:
            paste_x = int(round((width - fitted_w) * ((pan_x + 1.0) / 2.0)))
        if fitted_h >= height:
            paste_y = -int(round((fitted_h - height) * ((pan_y + 1.0) / 2.0)))
        else:
            paste_y = int(round((height - fitted_h) * ((pan_y + 1.0) / 2.0)))
        canvas = Image.new("RGB", (width, height))
        # Uncovered margins around the fitted image, per side.
        left_gap = max(0, paste_x)
        top_gap = max(0, paste_y)
        right_gap = max(0, width - (paste_x + fitted_w))
        bottom_gap = max(0, height - (paste_y + fitted_h))
        # Thin edge strip (≤12 px) that gets stretched to fill each gap, so
        # the letterbox area continues the image's edge colors.
        strip = max(1, min(12, fitted_w, fitted_h))
        if top_gap:
            top_strip = fitted.crop((0, 0, fitted_w, strip)).resize((fitted_w, top_gap), Image.LANCZOS)
            canvas.paste(top_strip, (paste_x, 0))
        if bottom_gap:
            bottom_strip = fitted.crop((0, fitted_h - strip, fitted_w, fitted_h)).resize((fitted_w, bottom_gap), Image.LANCZOS)
            canvas.paste(bottom_strip, (paste_x, paste_y + fitted_h))
        if left_gap:
            left_strip = fitted.crop((0, 0, strip, fitted_h)).resize((left_gap, fitted_h), Image.LANCZOS)
            canvas.paste(left_strip, (0, paste_y))
        if right_gap:
            right_strip = fitted.crop((fitted_w - strip, 0, fitted_w, fitted_h)).resize((right_gap, fitted_h), Image.LANCZOS)
            canvas.paste(right_strip, (paste_x + fitted_w, paste_y))
        # Corners: stretch the matching corner patch of the image.
        if top_gap and left_gap:
            canvas.paste(fitted.crop((0, 0, strip, strip)).resize((left_gap, top_gap), Image.LANCZOS), (0, 0))
        if top_gap and right_gap:
            canvas.paste(
                fitted.crop((fitted_w - strip, 0, fitted_w, strip)).resize((right_gap, top_gap), Image.LANCZOS),
                (paste_x + fitted_w, 0),
            )
        if bottom_gap and left_gap:
            canvas.paste(
                fitted.crop((0, fitted_h - strip, strip, fitted_h)).resize((left_gap, bottom_gap), Image.LANCZOS),
                (0, paste_y + fitted_h),
            )
        if bottom_gap and right_gap:
            canvas.paste(
                fitted.crop((fitted_w - strip, fitted_h - strip, fitted_w, fitted_h)).resize((right_gap, bottom_gap), Image.LANCZOS),
                (paste_x + fitted_w, paste_y + fitted_h),
            )
        canvas.paste(fitted, (paste_x, paste_y))
        return canvas
    # Zoom >= 1.0: pick the aspect-matching crop window, shrink it by zoom,
    # position it with pan, then scale up to the target size.
    target_ratio = width / max(1, height)
    source_ratio = img.width / max(1, img.height)
    if source_ratio > target_ratio:
        crop_h = img.height
        crop_w = int(round(crop_h * target_ratio))
    else:
        crop_w = img.width
        crop_h = int(round(crop_w / target_ratio))
    crop_w = max(1, min(img.width, int(round(crop_w / zoom))))
    crop_h = max(1, min(img.height, int(round(crop_h / zoom))))
    max_left = max(0, img.width - crop_w)
    max_top = max(0, img.height - crop_h)
    left = int(round((max_left / 2) * (pan_x + 1.0)))
    top = int(round((max_top / 2) * (pan_y + 1.0)))
    cropped = img.crop((left, top, left + crop_w, top + crop_h))
    return cropped.resize((width, height), Image.LANCZOS)
def build_tall_backdrop_background(
    img: Image.Image,
    width: int,
    height: int,
    *,
    dim_factor: float,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = -0.16,
) -> Image.Image:
    """Fill a tall canvas from `img` with zoom/pan, then darken it.

    Every channel value is scaled by `dim_factor` so foreground art placed on
    top stays readable. The default pan_y biases the crop slightly upward.
    """
    positioned = cover_crop_positioned(img, width, height, zoom=zoom, pan_x=pan_x, pan_y=pan_y)
    dimmed = positioned.point(lambda value: int(value * dim_factor))
    return dimmed
def get_font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont:
    """Load a sans-serif system font (Windows first, then Linux).

    Falls back to PIL's built-in default font when nothing is installed.
    """
    candidates = [
        # Windows: (regular, bold)
        ("C:/Windows/Fonts/arial.ttf", "C:/Windows/Fonts/arialbd.ttf"),
        ("C:/Windows/Fonts/calibri.ttf", "C:/Windows/Fonts/calibrib.ttf"),
        ("C:/Windows/Fonts/verdana.ttf", "C:/Windows/Fonts/verdanab.ttf"),
        # Linux
        ("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
         "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"),
        ("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
         "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf"),
    ]
    for regular_path, bold_path in candidates:
        chosen = bold_path if bold else regular_path
        if os.path.exists(chosen):
            return ImageFont.truetype(chosen, size)
    return ImageFont.load_default(size)
def get_arial_bold_font(size: int) -> ImageFont.FreeTypeFont:
    """Load Arial Bold or a metric-compatible substitute at `size`."""
    for candidate in (
        "C:/Windows/Fonts/arialbd.ttf",
        "/usr/share/fonts/truetype/msttcorefonts/Arial_Bold.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    ):
        if os.path.exists(candidate):
            return ImageFont.truetype(candidate, size)
    return get_font(size, bold=True)
def get_badge_font(size: int) -> ImageFont.FreeTypeFont:
    """Load an extra-heavy font for banner/badge text, heaviest first."""
    for candidate in (
        "C:/Windows/Fonts/ariblk.ttf",
        "C:/Windows/Fonts/arialbd.ttf",
        "C:/Windows/Fonts/seguisb.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    ):
        if os.path.exists(candidate):
            return ImageFont.truetype(candidate, size)
    return get_arial_bold_font(size)
def measure_tracked_text(text: str, font: ImageFont.FreeTypeFont, tracking: int) -> tuple[int, int]:
    """Measure text drawn glyph-by-glyph with `tracking` px between glyphs.

    Width is the sum of glyph bbox widths plus tracking between them. Height
    spans from min(0, highest glyph top) to the lowest glyph bottom, matching
    how draw_tracked_text positions each glyph. Empty text measures (0, 0).
    """
    if not text:
        return 0, 0
    glyph_widths: list[int] = []
    highest_top = 0
    lowest_bottom = 0
    for glyph in text:
        left, top, right, bottom = font.getbbox(glyph)
        glyph_widths.append(right - left)
        highest_top = min(highest_top, top)
        lowest_bottom = max(lowest_bottom, bottom)
    total_width = sum(glyph_widths) + tracking * (len(glyph_widths) - 1)
    return total_width, lowest_bottom - highest_top
def draw_tracked_text(
    draw: ImageDraw.ImageDraw,
    position: tuple[int, int],
    text: str,
    font: ImageFont.FreeTypeFont,
    fill: tuple[int, int, int],
    tracking: int,
) -> None:
    """Draw text one glyph at a time with `tracking` px of letter-spacing.

    Each glyph is offset by its own bbox origin so it renders flush against
    the running cursor regardless of the font's internal bearings.
    """
    cursor_x, top_y = position
    for glyph in text:
        left, top, right, _bottom = font.getbbox(glyph)
        draw.text((cursor_x - left, top_y - top), glyph, font=font, fill=fill)
        cursor_x += (right - left) + tracking
def wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> list:
    """Greedily wrap `text` into lines no wider than `max_width` pixels.

    Width is taken from the font's bbox. A single word wider than max_width
    still becomes its own (overlong) line.
    """
    lines: list[str] = []
    pending = ""
    for word in text.split():
        candidate = f"{pending} {word}".strip()
        if font.getbbox(candidate)[2] <= max_width:
            pending = candidate
            continue
        if pending:
            lines.append(pending)
        pending = word
    if pending:
        lines.append(pending)
    return lines
def build_cache_key(
    item_id: str,
    bg_mode: str = "backdrop",
    backdrop_index: int = 0,
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> str:
    """Derive a deterministic cache key from every render-affecting input.

    RENDER_VERSION is folded in so bumping it invalidates stale art; floats
    are rounded to 4 decimals so tiny float noise doesn't fragment the cache.
    The key is the md5 hex digest of the fingerprint dict's repr.
    """
    fingerprint = {
        "render_version": RENDER_VERSION,
        "item_id": item_id,
        "bg_mode": bg_mode,
        "backdrop_index": int(backdrop_index),
        "custom_bg": custom_bg,
        "text_color": text_color,
        "logo_align": logo_align,
        "logo_scale": round(float(logo_scale), 4),
        "darkness": round(float(darkness), 4),
        "studio": studio,
        "studio_position": studio_position,
        "new_episodes_tag": bool(new_episodes_tag),
        "season_finale_tag": bool(season_finale_tag),
        "logo_index": logo_index,
        "primary_zoom": round(float(primary_zoom), 4),
        "primary_pan_x": round(float(primary_pan_x), 4),
        "primary_pan_y": round(float(primary_pan_y), 4),
    }
    digest = hashlib.md5(repr(fingerprint).encode("utf-8"))
    return digest.hexdigest()
def get_thumb_cache_path(cache_key: str) -> Path:
    """Cache file path for a rendered 16:9 thumbnail."""
    return CACHE_DIR / (cache_key + ".png")
def get_primary_cache_path(cache_key: str) -> Path:
    """Cache file path for a rendered 2:3 primary cover."""
    return CACHE_DIR / (cache_key + ".primary.png")
def generate_thumbnail(
    poster_bytes: bytes,
    title: str,
    bg_mode: str = "backdrop",
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    width: int = THUMB_WIDTH,
    height: int = THUMB_HEIGHT,
    logo_bytes: bytes | None = None,
    backdrop_bytes: bytes | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> bytes:
    """Compose a cover image and return it as PNG bytes.

    Pipeline: background (backdrop / solid color / blurred poster fallback)
    → darkening + directional vignette tuned to the logo position → series
    logo (or wrapped title text fallback) with drop shadow → optional studio
    logo → optional full-width "New Season" / "Season Finale" banner.
    `logo_index` only participates in cache keys upstream; it is not read
    here.
    """
    # darkness is 0.0 (no dimming) → 1.0 (very dark); scale bg and vignette together
    d = max(0.0, min(1.0, darkness))
    bg_dim = 1.0 - (d * 0.6)  # backdrop brightness: 1.0 → 0.4
    vig_max = int(d * 220)  # vignette peak alpha: 0 → 220
    # Tall (poster-like) canvases get different padding/sizing ratios below.
    is_tall_layout = height >= int(width * 1.3)
    # --- Build background ---
    if backdrop_bytes and bg_mode != "solid":
        backdrop = load_image_from_bytes(backdrop_bytes, mode="RGB")
        if is_tall_layout:
            bg = build_tall_backdrop_background(
                backdrop,
                width,
                height,
                dim_factor=bg_dim,
                zoom=primary_zoom,
                pan_x=primary_pan_x,
                pan_y=primary_pan_y,
            )
        else:
            bg = cover_crop(backdrop, width, height)
            bg = bg.point(lambda p: int(p * bg_dim))
    elif bg_mode == "solid" and custom_bg:
        try:
            bg_color = ImageColor.getrgb(custom_bg)
        except ValueError:
            # Invalid color string → dark navy fallback.
            bg_color = (15, 15, 25)
        bg = Image.new("RGB", (width, height), bg_color)
    else:
        # Blurred poster fallback
        bg = load_image_from_bytes(poster_bytes, mode="RGB")
        bg = cover_crop(bg, width, height)
        bg = bg.filter(ImageFilter.GaussianBlur(radius=20))
        bg = bg.point(lambda p: int(p * bg_dim * 0.85))
    # --- Vignette overlay tuned per logo position ---
    overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0))
    ov = ImageDraw.Draw(overlay)

    def h_gradient(from_left: bool, strength: int = 200):
        # Horizontal gradient: darkest at the chosen edge, eased with t^0.55.
        s = int(strength * d)
        for x in range(width):
            t = 1.0 - (x / max(1, width - 1))
            a = int(s * (t ** 0.55))
            px = x if from_left else width - 1 - x
            ov.line([(px, 0), (px, height)], fill=(0, 0, 0, a))

    def v_gradient_bottom(strength: int = 180):
        # Vertical gradient: darkest at the bottom edge, eased with t^0.55.
        s = int(strength * d)
        for y in range(height):
            t = 1.0 - (y / max(1, height - 1))
            a = int(s * (t ** 0.55))
            ov.line([(0, height - 1 - y), (width, height - 1 - y)], fill=(0, 0, 0, a))

    # Base layer: uniform darkening across the whole frame on every layout.
    # This ensures no naturally bright backdrop area is left fully exposed.
    base_alpha = int(vig_max * 0.45)
    ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, base_alpha))
    # Directional layer on top for contrast where the logo sits.
    if logo_align == "center":
        ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, int(vig_max * 0.25)))
    elif logo_align == "left":
        h_gradient(from_left=True, strength=160)
    elif logo_align == "bottom-center":
        v_gradient_bottom(strength=150)
    elif logo_align == "bottom-left":
        h_gradient(from_left=True, strength=140)
        v_gradient_bottom(strength=140)
    elif logo_align == "bottom-right":
        h_gradient(from_left=False, strength=140)
        v_gradient_bottom(strength=140)
    bg = bg.convert("RGBA")
    bg = Image.alpha_composite(bg, overlay)
    bg = bg.convert("RGB")
    txt_color = ImageColor.getrgb(text_color)
    # Layout padding and the logo's bounding box, both relative to canvas size.
    pad = max(52, int(height * (0.11 if is_tall_layout else 0.08)))
    scale = max(0.4, min(2.5, logo_scale))
    logo_max_w = int(width * 0.46 * scale)
    logo_max_h = int(height * (0.18 if is_tall_layout else 0.40) * scale)
    logo_box = None
    studio_box = None

    def get_series_banner() -> dict | None:
        # Full-width banner across the top; finale (black) wins over
        # new-season (red) when both flags are set.
        if season_finale_tag:
            banner_text = "Season Finale"
            banner_fill = "#111111"
        elif new_episodes_tag:
            banner_text = "New Season"
            banner_fill = "#E50914"
        else:
            return None
        banner_font_size = max(72 if is_tall_layout else 58, int(height * (0.048 if is_tall_layout else 0.115)))
        banner_font = get_badge_font(banner_font_size)
        text_bbox = banner_font.getbbox(banner_text)
        text_w = text_bbox[2] - text_bbox[0]
        text_h = text_bbox[3] - text_bbox[1]
        banner_x = 0
        banner_y = 0
        banner_w = width
        vertical_pad = max(12 if is_tall_layout else 12, int(banner_font_size * (0.28 if is_tall_layout else 0.30)))
        banner_h = max(text_h + vertical_pad * 2, int(height * (0.105 if is_tall_layout else 0.17)))
        # Center the text inside the banner, compensating for bbox offsets.
        text_x = (banner_w - text_w) // 2 - text_bbox[0]
        text_y = (banner_h - text_h) // 2 - text_bbox[1]
        return {
            "text": banner_text,
            "fill": ImageColor.getrgb(banner_fill),
            "font": banner_font,
            "x": banner_x,
            "y": banner_y,
            "w": banner_w,
            "h": banner_h,
            "text_x": text_x,
            "text_y": text_y,
        }

    banner = get_series_banner()

    def logo_position(lw: int, lh: int):
        # Top-left corner for a lw x lh box at the requested alignment.
        if logo_align == "center":
            return (width - lw) // 2, (height - lh) // 2
        elif logo_align == "left":
            return pad, (height - lh) // 2
        elif logo_align == "bottom-center":
            return (width - lw) // 2, height - lh - pad
        elif logo_align == "bottom-left":
            return pad, height - lh - pad
        elif logo_align == "bottom-right":
            return width - lw - pad, height - lh - pad
        return (width - lw) // 2, (height - lh) // 2

    # --- Logo image (preferred) ---
    if logo_bytes:
        try:
            logo = load_image_from_bytes(logo_bytes, mode="RGBA")
        except (UnidentifiedImageError, OSError, ValueError):
            logo = None
        # Undecodable logo → fall through to the text fallback branch below.
        if logo is None:
            logo_bytes = None
    if logo_bytes:
        ratio = min(logo_max_w / logo.width, logo_max_h / logo.height)
        lw = int(logo.width * ratio)
        lh = int(logo.height * ratio)
        logo = logo.resize((lw, lh), Image.LANCZOS)
        logo_x, logo_y = logo_position(lw, lh)
        # Drop shadow — blur only a tight crop around the logo, not the whole frame,
        # to avoid a visible dark haze spreading across the backdrop.
        blur_r = 5
        # NOTE: rebinds the outer layout `pad`; safe because the logo position
        # has already been computed and `pad` isn't read again on this branch.
        pad = blur_r * 3
        _, _, _, a = logo.split()
        black = Image.new("RGB", (lw, lh), (0, 0, 0))
        shadow_logo = Image.merge("RGBA", (*black.split()[:3], a))
        shadow_crop = Image.new("RGBA", (lw + pad * 2, lh + pad * 2), (0, 0, 0, 0))
        shadow_crop.paste(shadow_logo, (pad + 4, pad + 4), shadow_logo)
        shadow_crop = shadow_crop.filter(ImageFilter.GaussianBlur(radius=blur_r))
        bg = bg.convert("RGBA")
        bg.alpha_composite(shadow_crop, (logo_x - pad, logo_y - pad))
        bg = bg.convert("RGB")
        bg.paste(logo, (logo_x, logo_y), logo)
        logo_box = (logo_x, logo_y, lw, lh)
    else:
        # --- Text fallback ---
        draw = ImageDraw.Draw(bg)
        title_font = get_font(56, bold=True)
        line_h = 66
        lines = wrap_text(title.upper(), title_font, logo_max_w)
        total_h = len(lines) * line_h
        tx, ty = logo_position(logo_max_w, total_h)
        for line in lines:
            # Hard 3px offset shadow behind each line for contrast.
            draw.text((tx + 3, ty + 3), line, font=title_font, fill=(0, 0, 0))
            draw.text((tx, ty), line, font=title_font, fill=txt_color)
            ty += line_h
        logo_box = (tx, logo_position(logo_max_w, total_h)[1], logo_max_w, total_h)
    # --- Studio logo overlay ---
    if studio and studio != "none":
        slogo = make_studio_logo(studio, max_height=46)
        if slogo:
            s_pad = 22
            sw, sh = slogo.size
            # Push top-anchored positions below the banner when one is shown.
            top_y = s_pad if not banner else banner["y"] + banner["h"] + 10
            positions = {
                "top-left": (s_pad, top_y),
                "top-right": (width - sw - s_pad, top_y),
                "bottom-left": (s_pad, height - sh - s_pad),
                "bottom-right": (width - sw - s_pad, height - sh - s_pad),
            }
            sx, sy = positions.get(studio_position, positions["bottom-right"])
            bg.paste(slogo, (sx, sy), slogo)
            studio_box = (sx, sy, sw, sh)
    # Banner is drawn last so it sits on top of everything else.
    if banner:
        draw = ImageDraw.Draw(bg)
        draw.rectangle(
            [(banner["x"], banner["y"]), (banner["x"] + banner["w"], banner["y"] + banner["h"])],
            fill=banner["fill"],
        )
        draw.text((banner["text_x"], banner["text_y"]), banner["text"], font=banner["font"], fill=(255, 255, 255))
    buf = io.BytesIO()
    bg.save(buf, format="PNG")
    buf.seek(0)
    return buf.getvalue()
def generate_primary_cover(
    poster_bytes: bytes,
    title: str,
    bg_mode: str = "backdrop",
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    logo_bytes: bytes | None = None,
    backdrop_bytes: bytes | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> bytes:
    """Render a portrait "Primary" cover as PNG bytes.

    Thin wrapper around generate_thumbnail(): forwards every compositing
    option unchanged but forces the poster canvas size
    (PRIMARY_WIDTH x PRIMARY_HEIGHT) instead of the 16:9 thumbnail size.
    """
    # Collect the pass-through options once so the delegation reads cleanly.
    options = dict(
        bg_mode=bg_mode,
        custom_bg=custom_bg,
        text_color=text_color,
        logo_align=logo_align,
        logo_scale=logo_scale,
        darkness=darkness,
        studio=studio,
        studio_position=studio_position,
        new_episodes_tag=new_episodes_tag,
        season_finale_tag=season_finale_tag,
        logo_index=logo_index,
        logo_bytes=logo_bytes,
        backdrop_bytes=backdrop_bytes,
        primary_zoom=primary_zoom,
        primary_pan_x=primary_pan_x,
        primary_pan_y=primary_pan_y,
    )
    return generate_thumbnail(
        poster_bytes,
        title,
        width=PRIMARY_WIDTH,
        height=PRIMARY_HEIGHT,
        **options,
    )
# --- API Routes ---
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Serve the main cover-editor page."""
    page = templates.TemplateResponse(request, "index.html")
    return page
@app.get("/airing", response_class=HTMLResponse)
async def airing_page(request: Request):
    """Serve the "currently airing" dashboard page."""
    page = templates.TemplateResponse(request, "airing.html")
    return page
@app.get("/api/airing")
async def get_airing_titles(
    page: int = Query(1, ge=1),
    limit: int = Query(12, ge=1, le=48),
    eligible_only: bool = False,
    refresh: bool = False,
    week_offset: int = Query(0, ge=-8, le=4),
):
    """Return one page of airing series from the cached weekly snapshot.

    ``eligible_only`` restricts the list to series inside the "New Season"
    window; ``refresh`` forces the snapshot to be rebuilt.
    """
    snapshot = await get_airing_snapshot(force_refresh=refresh, week_offset=week_offset)
    pool = snapshot["items"]
    if eligible_only:
        pool = [entry for entry in pool if entry["eligible_new_season"]]
    offset = (page - 1) * limit
    window = pool[offset:offset + limit]
    total = len(pool)
    return {
        "items": window,
        "page": page,
        "limit": limit,
        "total": total,
        "has_more": offset + limit < total,
        "generated_at": snapshot["generated_at"],
        "week_start": snapshot["week_start"],
        "week_end": snapshot["week_end"],
        "week_offset": snapshot["week_offset"],
        "new_season_min_days": snapshot["new_season_min_days"],
        "new_season_max_days": snapshot["new_season_max_days"],
    }
@app.post("/api/airing/apply-new-season")
async def apply_new_season_banner(request: Request):
    """Generate and upload a "New Season" Thumb (and optionally Primary) for one series.

    Expects a JSON body with ``item_id``; optional keys: ``title``,
    ``generate_primary`` (default True), ``week_offset``, ``refresh``.
    Raises 404 if the series is missing from the airing snapshot and 400 if
    it falls outside the New Season eligibility window.
    """
    body = await request.json()
    item_id = body["item_id"]
    generate_primary = bool(body.get("generate_primary", True))
    week_offset = int(body.get("week_offset", 0))
    snapshot = await get_airing_snapshot(force_refresh=bool(body.get("refresh", False)), week_offset=week_offset)
    item = next((entry for entry in snapshot["items"] if entry["id"] == item_id), None)
    if item is None:
        raise HTTPException(status_code=404, detail="Series was not found in the current airing snapshot.")
    # Eligible outright, or a continuing show that has a known air date
    # (either in the selected week or an upcoming one).
    can_apply_new_season = bool(item["eligible_new_season"]) or (
        (item.get("status") or "").strip().lower() == "continuing"
        and (item.get("selected_week_air_at") or item.get("next_air_at"))
    )
    if not can_apply_new_season:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Series is outside the New Season window. "
                f"It must be between {snapshot['new_season_min_days']} and {snapshot['new_season_max_days']} days "
                f"from the season premiere."
            ),
        )
    title = body.get("title") or item["name"]
    # This one-click preset uses fixed rendering options, so the cache key
    # effectively varies only by item_id.
    cache_key = build_cache_key(
        item_id=item_id,
        bg_mode="backdrop",
        backdrop_index=0,
        custom_bg=None,
        text_color="#FFFFFF",
        logo_align="bottom-center",
        logo_scale=1.3,
        darkness=0.0,
        studio="none",
        studio_position="bottom-right",
        new_episodes_tag=True,
        season_finale_tag=False,
        logo_index=0,
    )
    thumb_cache_path = get_thumb_cache_path(cache_key)
    primary_cache_path = get_primary_cache_path(cache_key)
    # Render only when something is missing from the disk cache. Note: if only
    # the primary is missing, the thumb is re-rendered (and overwritten) too.
    if not thumb_cache_path.exists() or (generate_primary and not primary_cache_path.exists()):
        poster_bytes = await emby_get_image(item_id, "Primary")
        logo_bytes = await emby_get_image_optional(item_id, "Logo", 0)
        backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", 0)
        thumb_bytes = generate_thumbnail(
            poster_bytes,
            title,
            bg_mode="backdrop",
            text_color="#FFFFFF",
            logo_align="bottom-center",
            logo_scale=1.3,
            darkness=0.0,
            studio="none",
            studio_position="bottom-right",
            new_episodes_tag=True,
            season_finale_tag=False,
            logo_index=0,
            logo_bytes=logo_bytes,
            backdrop_bytes=backdrop_bytes,
        )
        thumb_cache_path.write_bytes(thumb_bytes)
        if generate_primary:
            primary_bytes = generate_primary_cover(
                poster_bytes,
                title,
                bg_mode="backdrop",
                text_color="#FFFFFF",
                logo_align="bottom-center",
                logo_scale=1.3,
                darkness=0.0,
                studio="none",
                studio_position="bottom-right",
                new_episodes_tag=True,
                season_finale_tag=False,
                logo_index=0,
                logo_bytes=logo_bytes,
                backdrop_bytes=backdrop_bytes,
            )
            primary_cache_path.write_bytes(primary_bytes)
    # Upload from the cache file so the cached-hit and freshly-rendered paths
    # behave identically.
    thumb_status = await emby_upload_image(item_id, thumb_cache_path.read_bytes(), "Thumb")
    primary_status = None
    primary_error = None
    if generate_primary:
        # Best-effort: a failed Primary upload is reported, not raised, so the
        # Thumb upload above still counts as applied.
        try:
            primary_status = await emby_upload_image(item_id, primary_cache_path.read_bytes(), "Primary")
        except Exception as exc:
            primary_error = str(exc)
    return {
        "status": "applied",
        "thumb_code": thumb_status,
        "primary_code": primary_status,
        "primary_attempted": generate_primary,
        "primary_error": primary_error,
    }
@app.get("/api/search")
async def search_items(
    q: str = Query(..., min_length=1),
    type: str = "Movie,Series",
    start: int = Query(0, ge=0),
    limit: int = Query(12, ge=1, le=24),
):
    """Search Emby by name and return lightweight rows for the picker UI."""
    query_params = {
        "SearchTerm": q,
        "IncludeItemTypes": type,
        "Recursive": "true",
        "Fields": "ProductionYear",
        "StartIndex": str(start),
        "Limit": str(limit),
        "ImageTypeLimit": "1",
        "EnableImageTypes": "Primary,Logo,Backdrop",
    }
    data = await emby_get("/Items", query_params)
    results = []
    for entry in data.get("Items", []):
        tags = entry.get("ImageTags", {})
        backdrop_tags = entry.get("BackdropImageTags", [])
        results.append({
            "id": entry["Id"],
            "name": entry.get("Name", ""),
            "year": entry.get("ProductionYear", ""),
            "type": entry.get("Type", ""),
            "has_logo": "Logo" in tags,
            "backdrop_count": len(backdrop_tags),
            "poster_url": f"/api/poster/{entry['Id']}?w=72&h=108&q=72",
        })
    # Fall back to a best-effort total when Emby omits TotalRecordCount.
    total = int(data.get("TotalRecordCount", start + len(results)))
    return {
        "items": results,
        "start": start,
        "limit": limit,
        "total": total,
        "has_more": start + len(results) < total,
    }
@app.get("/api/images/{item_id}")
async def get_image_info(item_id: str):
    """Returns accurate logo and backdrop counts by querying the Images endpoint directly."""
    images = await emby_get(f"/Items/{item_id}/Images")
    # Bucket the records we care about by image type; everything else is ignored.
    buckets: dict[str, list] = {"Logo": [], "Backdrop": [], "Primary": []}
    for record in images:
        kind = record.get("ImageType")
        if kind not in buckets:
            continue
        info = {
            "type": kind,
            "index": int(record.get("ImageIndex", 0) or 0),
            "width": record.get("Width"),
            "height": record.get("Height"),
        }
        # Logos and primaries also get a preview URL for the UI.
        if kind == "Logo":
            info["url"] = f"/api/item-image/{item_id}?type=Logo&index={info['index']}&w=220&h=96&q=90"
        elif kind == "Primary":
            info["url"] = f"/api/item-image/{item_id}?type=Primary&index={info['index']}&w=120&h=180&q=80"
        buckets[kind].append(info)
    for group in buckets.values():
        group.sort(key=lambda entry: entry["index"])
    return {
        "backdrop_count": len(buckets["Backdrop"]),
        "has_logo": bool(buckets["Logo"]),
        "logo_count": len(buckets["Logo"]),
        "primary_count": len(buckets["Primary"]),
        "logos": buckets["Logo"],
        "backdrops": buckets["Backdrop"],
        "primaries": buckets["Primary"],
    }
@app.get("/api/item-image/{item_id}")
async def get_item_image(
    item_id: str,
    type: str = Query("Primary"),
    index: int | None = Query(None, ge=0),
    w: int = Query(120, ge=24, le=1200),
    h: int = Query(180, ge=24, le=1200),
    q: int = Query(80, ge=30, le=100),
):
    """Proxy any Emby image (by type and index), resized, with day-long caching."""
    payload, media_type = await emby_get_image_with_type(
        item_id,
        type,
        index=index,
        max_width=w,
        max_height=h,
        quality=q,
    )
    cache_headers = {"Cache-Control": "public, max-age=86400"}
    return Response(content=payload, media_type=media_type, headers=cache_headers)
@app.get("/api/poster/{item_id}")
async def get_poster(
    item_id: str,
    w: int = Query(72, ge=24, le=400),
    h: int = Query(108, ge=24, le=600),
    q: int = Query(72, ge=30, le=100),
):
    """Proxy the item's Primary poster as a small JPEG thumbnail."""
    payload = await emby_get_image(item_id, "Primary", max_width=w, max_height=h, quality=q)
    cache_headers = {"Cache-Control": "public, max-age=86400"}
    return Response(content=payload, media_type="image/jpeg", headers=cache_headers)
@app.get("/api/cache/{cache_key}/primary")
async def get_cached_primary_preview(cache_key: str):
    """Serve a previously rendered Primary cover straight from the on-disk cache."""
    # Keys must be non-empty lowercase-hex once normalized; this also blocks
    # path-traversal attempts before the key is turned into a file path.
    hex_digits = set("0123456789abcdef")
    if not cache_key or not set(cache_key.lower()) <= hex_digits:
        raise HTTPException(status_code=400, detail="Invalid cache key.")
    path = get_primary_cache_path(cache_key)
    if not path.exists():
        raise HTTPException(status_code=404, detail="Primary preview not found.")
    return Response(
        content=path.read_bytes(),
        media_type="image/png",
        headers={"Cache-Control": "no-store"},
    )
@app.post("/api/generate")
async def generate(request: Request):
    """Render a 16:9 thumbnail (and optionally a portrait Primary) from JSON options.

    Streams the thumbnail PNG back to the caller. The Primary cover, when
    requested, is only written to the on-disk cache; its outcome is signalled
    via the X-Primary-Generated and X-Cache-Key response headers so the UI
    can fetch it separately.
    """
    body = await request.json()
    item_id = body["item_id"]
    title = body["title"]
    bg_mode = body.get("bg_mode", "backdrop")
    custom_bg = body.get("custom_bg", None)
    text_color = body.get("text_color", "#FFFFFF")
    logo_align = body.get("logo_align", "bottom-center")
    logo_scale = float(body.get("logo_scale", 1.3))
    darkness = float(body.get("darkness", 0.0))
    studio = body.get("studio", "none")
    studio_position = body.get("studio_position", "bottom-right")
    new_episodes_tag = bool(body.get("new_episodes_tag", False))
    season_finale_tag = bool(body.get("season_finale_tag", False))
    generate_primary = bool(body.get("generate_primary", False))
    # logo_index stays None (auto) unless the client explicitly sent one.
    logo_index_raw = body.get("logo_index", None)
    logo_index = int(logo_index_raw) if logo_index_raw is not None else None
    backdrop_index = int(body.get("backdrop_index", 0))
    primary_zoom = float(body.get("primary_zoom", 1.0))
    primary_pan_x = float(body.get("primary_pan_x", 0.0))
    primary_pan_y = float(body.get("primary_pan_y", -0.16))
    # Fetch source art from Emby; logo and backdrop are optional and may be None.
    poster_bytes = await emby_get_image(item_id, "Primary")
    logo_bytes = await emby_get_image_optional(item_id, "Logo", logo_index)
    backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", backdrop_index)
    thumb_bytes = generate_thumbnail(
        poster_bytes, title,
        bg_mode=bg_mode,
        custom_bg=custom_bg,
        text_color=text_color,
        logo_align=logo_align,
        logo_scale=logo_scale,
        darkness=darkness,
        studio=studio,
        studio_position=studio_position,
        new_episodes_tag=new_episodes_tag,
        season_finale_tag=season_finale_tag,
        logo_index=logo_index,
        logo_bytes=logo_bytes,
        backdrop_bytes=backdrop_bytes,
        primary_zoom=primary_zoom,
        primary_pan_x=primary_pan_x,
        primary_pan_y=primary_pan_y,
    )
    # The cache key must cover every option that affects the rendered output,
    # so /api/apply can reuse these exact files later.
    cache_key = build_cache_key(
        item_id=item_id,
        bg_mode=bg_mode,
        backdrop_index=backdrop_index,
        custom_bg=custom_bg,
        text_color=text_color,
        logo_align=logo_align,
        logo_scale=logo_scale,
        darkness=darkness,
        studio=studio,
        studio_position=studio_position,
        new_episodes_tag=new_episodes_tag,
        season_finale_tag=season_finale_tag,
        logo_index=logo_index,
        primary_zoom=primary_zoom,
        primary_pan_x=primary_pan_x,
        primary_pan_y=primary_pan_y,
    )
    thumb_cache_path = get_thumb_cache_path(cache_key)
    thumb_cache_path.write_bytes(thumb_bytes)
    primary_generated = False
    if generate_primary:
        # Best-effort: a failed Primary render must not break the thumbnail
        # response; the header below reports the outcome instead.
        try:
            primary_bytes = generate_primary_cover(
                poster_bytes,
                title,
                bg_mode=bg_mode,
                custom_bg=custom_bg,
                text_color=text_color,
                logo_align=logo_align,
                logo_scale=logo_scale,
                darkness=darkness,
                studio=studio,
                studio_position=studio_position,
                new_episodes_tag=new_episodes_tag,
                season_finale_tag=season_finale_tag,
                logo_index=logo_index,
                logo_bytes=logo_bytes,
                backdrop_bytes=backdrop_bytes,
                primary_zoom=primary_zoom,
                primary_pan_x=primary_pan_x,
                primary_pan_y=primary_pan_y,
            )
            get_primary_cache_path(cache_key).write_bytes(primary_bytes)
            primary_generated = True
        except Exception:
            primary_generated = False
    return StreamingResponse(
        io.BytesIO(thumb_bytes),
        media_type="image/png",
        headers={
            "X-Primary-Generated": "1" if primary_generated else "0",
            "X-Cache-Key": cache_key,
        },
    )
@app.post("/api/apply")
async def apply_to_emby(request: Request):
    """Upload the rendered Thumb (and optionally Primary) images to Emby.

    Reuses the on-disk cache written by /api/generate when the cache key
    matches; otherwise re-fetches the source art and re-renders before
    uploading. The Thumb upload must succeed; the Primary upload is
    best-effort and its error, if any, is returned in the JSON body.
    """
    body = await request.json()
    item_id = body["item_id"]
    title = body.get("title", "")
    bg_mode = body.get("bg_mode", "backdrop")
    backdrop_index = int(body.get("backdrop_index", 0))
    custom_bg = body.get("custom_bg", None)
    text_color = body.get("text_color", "#FFFFFF")
    logo_align = body.get("logo_align", "bottom-center")
    logo_scale = float(body.get("logo_scale", 1.3))
    darkness = float(body.get("darkness", 0.0))
    studio = body.get("studio", "none")
    studio_position = body.get("studio_position", "bottom-right")
    new_episodes_tag = bool(body.get("new_episodes_tag", False))
    season_finale_tag = bool(body.get("season_finale_tag", False))
    generate_primary = bool(body.get("generate_primary", False))
    # logo_index stays None (auto) unless the client explicitly sent one.
    logo_index_raw = body.get("logo_index", None)
    logo_index = int(logo_index_raw) if logo_index_raw is not None else None
    primary_zoom = float(body.get("primary_zoom", 1.0))
    primary_pan_x = float(body.get("primary_pan_x", 0.0))
    primary_pan_y = float(body.get("primary_pan_y", -0.16))
    # Must be built from the same options as /api/generate so a prior preview
    # render is found in the cache.
    cache_key = build_cache_key(
        item_id=item_id,
        bg_mode=bg_mode,
        backdrop_index=backdrop_index,
        custom_bg=custom_bg,
        text_color=text_color,
        logo_align=logo_align,
        logo_scale=logo_scale,
        darkness=darkness,
        studio=studio,
        studio_position=studio_position,
        new_episodes_tag=new_episodes_tag,
        season_finale_tag=season_finale_tag,
        logo_index=logo_index,
        primary_zoom=primary_zoom,
        primary_pan_x=primary_pan_x,
        primary_pan_y=primary_pan_y,
    )
    thumb_cache_path = get_thumb_cache_path(cache_key)
    primary_cache_path = get_primary_cache_path(cache_key)
    # Cache miss: re-fetch source art and render the thumb (and, best-effort,
    # the primary while the source bytes are already in hand).
    if not thumb_cache_path.exists():
        poster_bytes = await emby_get_image(item_id, "Primary")
        logo_bytes = await emby_get_image_optional(item_id, "Logo", logo_index)
        backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", backdrop_index)
        thumb_bytes = generate_thumbnail(
            poster_bytes,
            title or "",
            bg_mode=bg_mode,
            custom_bg=custom_bg,
            text_color=text_color,
            logo_align=logo_align,
            logo_scale=logo_scale,
            darkness=darkness,
            studio=studio,
            studio_position=studio_position,
            new_episodes_tag=new_episodes_tag,
            season_finale_tag=season_finale_tag,
            logo_index=logo_index,
            logo_bytes=logo_bytes,
            backdrop_bytes=backdrop_bytes,
            primary_zoom=primary_zoom,
            primary_pan_x=primary_pan_x,
            primary_pan_y=primary_pan_y,
        )
        thumb_cache_path.write_bytes(thumb_bytes)
        if generate_primary and not primary_cache_path.exists():
            # Deliberate best-effort: a failed primary render here is retried
            # in the upload block below, which reports the error.
            try:
                primary_cache_path.write_bytes(
                    generate_primary_cover(
                        poster_bytes,
                        title or "",
                        bg_mode=bg_mode,
                        custom_bg=custom_bg,
                        text_color=text_color,
                        logo_align=logo_align,
                        logo_scale=logo_scale,
                        darkness=darkness,
                        studio=studio,
                        studio_position=studio_position,
                        new_episodes_tag=new_episodes_tag,
                        season_finale_tag=season_finale_tag,
                        logo_index=logo_index,
                        logo_bytes=logo_bytes,
                        backdrop_bytes=backdrop_bytes,
                        primary_zoom=primary_zoom,
                        primary_pan_x=primary_pan_x,
                        primary_pan_y=primary_pan_y,
                    )
                )
            except Exception:
                pass
    # Upload from the cache file so cached-hit and freshly-rendered paths match.
    thumb_bytes = thumb_cache_path.read_bytes()
    thumb_status = await emby_upload_image(item_id, thumb_bytes, "Thumb")
    primary_status = None
    primary_error = None
    if generate_primary:
        try:
            if primary_cache_path.exists():
                primary_bytes = primary_cache_path.read_bytes()
            else:
                # No cached primary (e.g. the earlier render failed or only the
                # thumb was cached): fetch sources and render it now.
                poster_bytes = await emby_get_image(item_id, "Primary")
                logo_bytes = await emby_get_image_optional(item_id, "Logo", logo_index)
                backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", backdrop_index)
                primary_bytes = generate_primary_cover(
                    poster_bytes,
                    title or "",
                    bg_mode=bg_mode,
                    custom_bg=custom_bg,
                    text_color=text_color,
                    logo_align=logo_align,
                    logo_scale=logo_scale,
                    darkness=darkness,
                    studio=studio,
                    studio_position=studio_position,
                    new_episodes_tag=new_episodes_tag,
                    season_finale_tag=season_finale_tag,
                    logo_index=logo_index,
                    logo_bytes=logo_bytes,
                    backdrop_bytes=backdrop_bytes,
                    primary_zoom=primary_zoom,
                    primary_pan_x=primary_pan_x,
                    primary_pan_y=primary_pan_y,
                )
                primary_cache_path.write_bytes(primary_bytes)
            primary_status = await emby_upload_image(item_id, primary_bytes, "Primary")
        except Exception as exc:
            # Report the primary failure instead of failing the whole request;
            # the Thumb upload above has already succeeded.
            primary_error = str(exc)
    return {
        "status": "applied",
        "thumb_code": thumb_status,
        "primary_code": primary_status,
        "primary_attempted": generate_primary,
        "primary_error": primary_error,
    }
@app.get("/api/config")
async def get_config():
    """Expose the Emby connection settings the frontend needs."""
    return {"emby_url": EMBY_URL, "connected": bool(EMBY_API_KEY)}
if __name__ == "__main__":
    # Local/dev entry point; in production run the app via an ASGI server.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8500)