import asyncio import io import os import hashlib import base64 import time from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone from pathlib import Path import httpx from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse, Response, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageColor, ImageOps, UnidentifiedImageError EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096") EMBY_API_KEY = os.environ.get("EMBY_API_KEY", "b9af54b630f6448289ab96422add567a") CACHE_DIR = Path("cache") CACHE_DIR.mkdir(exist_ok=True) IMPORT_CACHE_DIR = CACHE_DIR / "imports" IMPORT_CACHE_DIR.mkdir(exist_ok=True) EMBY_IMAGE_CACHE_DIR = CACHE_DIR / "emby_images" EMBY_IMAGE_CACHE_DIR.mkdir(exist_ok=True) RENDER_VERSION = "series-banner-v11" COLLECTION_RENDER_VERSION = "collection-cover-v1" THUMB_WIDTH = 800 THUMB_HEIGHT = 450 PRIMARY_WIDTH = 1000 PRIMARY_HEIGHT = 1500 PRIMARY_MIN_ZOOM = 1.0 PRIMARY_MAX_ZOOM = 2.6 UPLOAD_MAX_BYTES = 40 * 1024 * 1024 HTTP_TIMEOUT = 30.0 EMBY_SOURCE_CACHE_TTL = 300 EMBY_SOURCE_MISS_TTL = 90 http_client: httpx.AsyncClient | None = None AIRING_LOOKUP_CACHE_TTL = 900 NEW_SEASON_MIN_AGE_DAYS = 7 NEW_SEASON_MAX_AGE_DAYS = 21 SEASON_INFERENCE_LOOKBACK_DAYS = 180 airing_lookup_cache: dict[int, dict] = {} airing_lookup_lock = asyncio.Lock() # ── Studio logo file map ───────────────────────────────────────────────────── STUDIOS_DIR = Path("static/studios") # Maps studio key → filename (relative to STUDIOS_DIR) STUDIO_FILES: dict[str, str] = { "hulu": "hulu.png", "hbo": "hbo.png", "disney": "disney.png", } def make_studio_logo(studio: str, max_height: int = 52) -> Image.Image | None: """Load a studio logo from disk and scale it to max_height, preserving aspect ratio.""" filename = STUDIO_FILES.get(studio) if not filename: return None path = STUDIOS_DIR / filename if not path.exists(): return None img = Image.open(path).convert("RGBA") if img.height > max_height: scale = max_height / img.height img = img.resize((int(img.width * scale), max_height), Image.LANCZOS) return img def load_image_from_bytes(image_bytes: bytes, mode: str = "RGB") -> Image.Image: """Fully decode image bytes into an in-memory PIL image. Some Emby-delivered assets appear to trigger blocky/partial artifacts when we process the lazy decoder output directly. Forcing a full load and copy gives us a stable image object before resize/filter operations. """ with Image.open(io.BytesIO(image_bytes)) as img: img = ImageOps.exif_transpose(img) img.load() return img.convert(mode).copy() def encode_jpeg_bytes(img: Image.Image, *, quality: int = 95) -> bytes: buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=quality, optimize=True, progressive=True) return buf.getvalue() def validate_image_bytes(image_bytes: bytes, *, max_pixels: int = 24_000_000) -> tuple[int, int, str]: try: with Image.open(io.BytesIO(image_bytes)) as img: img.verify() width, height = img.size fmt = img.format or "image" except (UnidentifiedImageError, OSError, ValueError) as exc: raise HTTPException(status_code=400, detail="The downloaded file is not a valid image.") from exc if width * height > max_pixels: raise HTTPException(status_code=400, detail="Image is too large to process safely.") return width, height, fmt def get_upload_cache_path(upload_id: str) -> Path: if not upload_id or any(ch not in "0123456789abcdef" for ch in upload_id.lower()): raise HTTPException(status_code=400, detail="Invalid upload id.") path = IMPORT_CACHE_DIR / f"{upload_id}.img" if not path.exists(): raise HTTPException(status_code=404, detail="Uploaded background not found in cache.") return path def get_emby_image_cache_key(item_id: str, image_type: str, index: int | None = None) -> str: payload = f"{item_id}:{image_type}:{index if index is not None else 'default'}" return hashlib.md5(payload.encode("utf-8")).hexdigest() def get_emby_image_cache_path(item_id: str, image_type: str, index: int | None = None) -> Path: return EMBY_IMAGE_CACHE_DIR / f"{get_emby_image_cache_key(item_id, image_type, index)}.img" def get_emby_image_miss_path(item_id: str, image_type: str, index: int | None = None) -> Path: return EMBY_IMAGE_CACHE_DIR / f"{get_emby_image_cache_key(item_id, image_type, index)}.missing" def get_fresh_cached_bytes(path: Path, ttl_seconds: int) -> bytes | None: if not path.exists(): return None age = time.time() - path.stat().st_mtime if age > ttl_seconds: return None return path.read_bytes() async def get_cached_emby_source_image( item_id: str, image_type: str, index: int | None = None, *, optional: bool = False, ) -> bytes | None: cache_path = get_emby_image_cache_path(item_id, image_type, index) cached_bytes = get_fresh_cached_bytes(cache_path, EMBY_SOURCE_CACHE_TTL) if cached_bytes is not None: return cached_bytes miss_path = get_emby_image_miss_path(item_id, image_type, index) if optional and miss_path.exists() and (time.time() - miss_path.stat().st_mtime) <= EMBY_SOURCE_MISS_TTL: return None image_bytes = ( await emby_get_image_optional(item_id, image_type, index=index) if optional else await emby_get_image(item_id, image_type, index=index) ) if image_bytes is None: if optional: miss_path.touch() return None raise HTTPException(status_code=404, detail=f"Emby image {image_type} was not found.") cache_path.write_bytes(image_bytes) if miss_path.exists(): miss_path.unlink(missing_ok=True) return image_bytes # --- Emby API helpers --- @asynccontextmanager async def lifespan(app: FastAPI): get_http_client() try: yield finally: global http_client if http_client is not None: await http_client.aclose() http_client = None app = FastAPI(title="Emby Thumbnail Generator", lifespan=lifespan) app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") def get_http_client() -> httpx.AsyncClient: global http_client if http_client is None: http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT) return http_client async def emby_request(method: str, path: str, *, params: dict | None = None, **kwargs) -> httpx.Response: client = get_http_client() url = f"{EMBY_URL}{path}" p = dict(params or {}) p["api_key"] = EMBY_API_KEY try: return await client.request(method, url, params=p, **kwargs) except httpx.ReadTimeout as exc: raise HTTPException( status_code=504, detail=f"Emby timed out while requesting {path}. Try again in a moment.", ) from exc except httpx.ConnectTimeout as exc: raise HTTPException( status_code=504, detail=f"Timed out connecting to Emby at {EMBY_URL}.", ) from exc except httpx.ConnectError as exc: raise HTTPException( status_code=502, detail=f"Could not connect to Emby at {EMBY_URL}.", ) from exc except httpx.RequestError as exc: raise HTTPException( status_code=502, detail=f"Emby request failed while requesting {path}: {exc}", ) from exc def ensure_emby_success(response: httpx.Response, *, context: str) -> httpx.Response: try: response.raise_for_status() except httpx.HTTPStatusError as exc: detail = response.text.strip() or str(exc) raise HTTPException( status_code=502, detail=f"{context} failed ({response.status_code}): {detail}", ) from exc return response async def emby_get(path: str, params: dict = None) -> dict: response = await emby_request("GET", path, params=params) ensure_emby_success(response, context=f"Emby request to {path}") return response.json() async def emby_get_image( item_id: str, image_type: str = "Primary", index: int | None = None, *, max_width: int | None = None, max_height: int | None = None, quality: int | None = None, ) -> bytes: params: dict[str, str | int] = {"api_key": EMBY_API_KEY} if index is not None: params["Index"] = index if max_width is not None: params["maxWidth"] = max_width if max_height is not None: params["maxHeight"] = max_height if quality is not None: params["quality"] = quality response = await emby_request("GET", f"/Items/{item_id}/Images/{image_type}", params=params) ensure_emby_success(response, context=f"Emby image request for {image_type}") return response.content async def emby_get_image_with_type( item_id: str, image_type: str = "Primary", index: int | None = None, *, max_width: int | None = None, max_height: int | None = None, quality: int | None = None, ) -> tuple[bytes, str]: params: dict[str, str | int] = {"api_key": EMBY_API_KEY} if index is not None: params["Index"] = index if max_width is not None: params["maxWidth"] = max_width if max_height is not None: params["maxHeight"] = max_height if quality is not None: params["quality"] = quality response = await emby_request("GET", f"/Items/{item_id}/Images/{image_type}", params=params) ensure_emby_success(response, context=f"Emby image request for {image_type}") content_type = (response.headers.get("content-type", "image/jpeg") or "image/jpeg").split(";")[0] return response.content, content_type async def emby_get_image_optional(item_id: str, image_type: str, index: int | None = None) -> bytes | None: """Returns image bytes or None if the image type doesn't exist for this item.""" params: dict[str, str | int] = {"api_key": EMBY_API_KEY} if index is not None: params["Index"] = index response = await emby_request("GET", f"/Items/{item_id}/Images/{image_type}", params=params) if response.status_code in (404, 400): return None ensure_emby_success(response, context=f"Optional Emby image request for {image_type}") content_type = response.headers.get("content-type", "").lower() if content_type and not content_type.startswith("image/"): return None return response.content async def emby_upload_image(item_id: str, image_bytes: bytes, image_type: str = "Thumb"): upload_buf = io.BytesIO() upload_image = load_image_from_bytes(image_bytes, mode="RGB") upload_image.save(upload_buf, format="JPEG", quality=92, optimize=True) upload_b64 = base64.b64encode(upload_buf.getvalue()) response = await emby_request( "POST", f"/Items/{item_id}/Images/{image_type}", params={"api_key": EMBY_API_KEY}, content=upload_b64, headers={ "Content-Type": "image/jpeg", "X-Emby-Token": EMBY_API_KEY, }, ) ensure_emby_success(response, context=f"Emby upload for {image_type}") return response.status_code def parse_emby_datetime(value: str | None) -> datetime | None: if not value: return None try: parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone.utc) return parsed.astimezone(timezone.utc) def to_emby_iso(value: datetime) -> str: return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") async def emby_get_all(path: str, params: dict | None = None, *, page_size: int = 200) -> list[dict]: items: list[dict] = [] start_index = 0 base_params = dict(params or {}) while True: page = await emby_get(path, { **base_params, "StartIndex": str(start_index), "Limit": str(page_size), }) batch = page.get("Items", []) items.extend(batch) total = int(page.get("TotalRecordCount", start_index + len(batch))) if not batch or start_index + len(batch) >= total: break start_index += len(batch) return items def get_week_bounds(now: datetime, week_offset: int) -> tuple[datetime, datetime]: week_start = (now - timedelta(days=now.weekday())).replace(hour=0, minute=0, second=0, microsecond=0) week_start = week_start + timedelta(weeks=week_offset) return week_start, week_start + timedelta(days=7) async def build_airing_snapshot(week_offset: int = 0) -> dict: now = datetime.now(timezone.utc) week_start, week_end = get_week_bounds(now, week_offset) week_shift_days = max(0, -week_offset) * 7 eligible_min_days = NEW_SEASON_MIN_AGE_DAYS + week_shift_days eligible_max_days = NEW_SEASON_MAX_AGE_DAYS + week_shift_days recent_window_start = now - timedelta(days=eligible_max_days) recent_window_end = now - timedelta(days=eligible_min_days) inference_window_start = now - timedelta(days=max(SEASON_INFERENCE_LOOKBACK_DAYS, eligible_max_days + 35)) series_task = emby_get_all("/Items", { "IncludeItemTypes": "Series", "Recursive": "true", "SortBy": "SortName", "SortOrder": "Ascending", "ImageTypeLimit": "1", "EnableImageTypes": "Primary", }) recent_episodes_task = emby_get_all("/Items", { "IncludeItemTypes": "Episode", "Recursive": "true", "SortBy": "PremiereDate", "SortOrder": "Descending", "MinPremiereDate": to_emby_iso(recent_window_start), "MaxPremiereDate": to_emby_iso(recent_window_end), }) recent_season_activity_task = emby_get_all("/Items", { "IncludeItemTypes": "Episode", "Recursive": "true", "SortBy": "PremiereDate", "SortOrder": "Ascending", "MinPremiereDate": to_emby_iso(inference_window_start), "MaxPremiereDate": to_emby_iso(now + timedelta(days=1)), }) selected_week_episodes_task = emby_get_all("/Items", { "IncludeItemTypes": "Episode", "Recursive": "true", "SortBy": "PremiereDate", "SortOrder": "Ascending", "MinPremiereDate": to_emby_iso(week_start), "MaxPremiereDate": to_emby_iso(week_end), }) upcoming_episodes_task = emby_get_all("/Shows/Upcoming", { "SortBy": "PremiereDate", "SortOrder": "Ascending", "MinPremiereDate": to_emby_iso(now - timedelta(hours=12)), }) series_items, recent_episodes, recent_season_activity, selected_week_episodes, upcoming_episodes = await asyncio.gather( series_task, recent_episodes_task, recent_season_activity_task, selected_week_episodes_task, upcoming_episodes_task, return_exceptions=True, ) if isinstance(series_items, Exception): raise series_items if isinstance(recent_episodes, Exception): recent_episodes = [] if isinstance(recent_season_activity, Exception): recent_season_activity = [] if isinstance(selected_week_episodes, Exception): selected_week_episodes = [] if isinstance(upcoming_episodes, Exception): upcoming_episodes = [] recent_premieres_by_series: dict[str, dict] = {} for episode in recent_episodes: series_id = episode.get("SeriesId") season_number = int(episode.get("ParentIndexNumber") or 0) episode_number = int(episode.get("IndexNumber") or 0) premiere_at = parse_emby_datetime(episode.get("PremiereDate")) if not series_id or season_number < 2 or episode_number != 1 or premiere_at is None: continue existing = recent_premieres_by_series.get(series_id) if existing is None or premiere_at > existing["premiere_at"]: recent_premieres_by_series[series_id] = { "premiere_at": premiere_at, "season_number": season_number, "episode_id": episode.get("Id"), "episode_name": episode.get("Name") or "", } earliest_seen_by_series_season: dict[tuple[str, int], dict] = {} for episode in recent_season_activity: series_id = episode.get("SeriesId") season_number = int(episode.get("ParentIndexNumber") or 0) episode_number = int(episode.get("IndexNumber") or 0) premiere_at = parse_emby_datetime(episode.get("PremiereDate")) if not series_id or season_number < 2 or premiere_at is None: continue key = (series_id, season_number) existing = earliest_seen_by_series_season.get(key) if existing is None or premiere_at < existing["premiere_at"]: earliest_seen_by_series_season[key] = { "premiere_at": premiere_at, "season_number": season_number, "episode_number": episode_number, "episode_name": episode.get("Name") or "", } selected_week_by_series: dict[str, dict] = {} for episode in selected_week_episodes: series_id = episode.get("SeriesId") premiere_at = parse_emby_datetime(episode.get("PremiereDate")) if not series_id or premiere_at is None: continue existing = selected_week_by_series.get(series_id) if existing is None or premiere_at < existing["premiere_at"]: season_number = int(episode.get("ParentIndexNumber") or 0) episode_number = int(episode.get("IndexNumber") or 0) selected_week_by_series[series_id] = { "premiere_at": premiere_at, "season_number": season_number, "episode_number": episode_number, "episode_name": episode.get("Name") or "", } upcoming_by_series: dict[str, dict] = {} for episode in upcoming_episodes: series_id = episode.get("SeriesId") premiere_at = parse_emby_datetime(episode.get("PremiereDate")) if not series_id or premiere_at is None: continue existing = upcoming_by_series.get(series_id) if existing is None or premiere_at < existing["premiere_at"]: season_number = int(episode.get("ParentIndexNumber") or 0) episode_number = int(episode.get("IndexNumber") or 0) upcoming_by_series[series_id] = { "premiere_at": premiere_at, "season_number": season_number, "episode_number": episode_number, "episode_name": episode.get("Name") or "", } results: list[dict] = [] for series in series_items: series_id = series.get("Id") if not series_id: continue recent_premiere = recent_premieres_by_series.get(series_id) selected_week_episode = selected_week_by_series.get(series_id) upcoming = upcoming_by_series.get(series_id) status = (series.get("Status") or "").strip() is_current_airing = ( status.lower() == "continuing" or selected_week_episode is not None or upcoming is not None or recent_premiere is not None ) if not is_current_airing: continue air_days = series.get("AirDays") or [] if isinstance(air_days, str): air_days = [part.strip() for part in air_days.split(",") if part.strip()] active_season_number = max( recent_premiere["season_number"] if recent_premiere else 0, selected_week_episode["season_number"] if selected_week_episode else 0, upcoming["season_number"] if upcoming else 0, ) inferred_recent_premiere = None if recent_premiere is None and active_season_number >= 2 and (selected_week_episode is not None or upcoming is not None): inferred_recent_premiere = earliest_seen_by_series_season.get((series_id, active_season_number)) season_start = recent_premiere or inferred_recent_premiere days_since_premiere = None if season_start is not None: days_since_premiere = max(0, int((now - season_start["premiere_at"]).total_seconds() // 86400)) has_current_airing_signal = status.lower() == "continuing" and ( selected_week_episode is not None or upcoming is not None ) has_recent_season_start_signal = ( status.lower() == "continuing" and (selected_week_episode is not None or upcoming is not None) and days_since_premiere is not None and eligible_min_days <= days_since_premiere <= eligible_max_days ) eligible_new_season = has_current_airing_signal or has_recent_season_start_signal season_premiere_inferred = recent_premiere is None and inferred_recent_premiere is not None eligibility_reason = ( "current_airing" if has_current_airing_signal else ("recent_season_start" if has_recent_season_start_signal else None) ) next_air_at = upcoming["premiere_at"] if upcoming else None results.append({ "id": series_id, "name": series.get("Name", ""), "year": series.get("ProductionYear"), "status": status or ("Continuing" if upcoming else "Unknown"), "air_days": air_days, "poster_url": f"/api/poster/{series_id}?w=180&h=270&q=84", "selected_week_air_at": to_emby_iso(selected_week_episode["premiere_at"]) if selected_week_episode else None, "selected_week_episode_label": ( f"S{selected_week_episode['season_number']}E{selected_week_episode['episode_number']} · {selected_week_episode['episode_name']}".strip(" ·") if selected_week_episode and selected_week_episode["season_number"] and selected_week_episode["episode_number"] else (selected_week_episode["episode_name"] if selected_week_episode else None) ), "next_air_at": to_emby_iso(next_air_at) if next_air_at else None, "next_episode_label": ( f"S{upcoming['season_number']}E{upcoming['episode_number']} · {upcoming['episode_name']}".strip(" ·") if upcoming and upcoming["season_number"] and upcoming["episode_number"] else (upcoming["episode_name"] if upcoming else None) ), "season_number": season_start["season_number"] if season_start else None, "season_premiere_at": to_emby_iso(season_start["premiere_at"]) if season_start else None, "season_premiere_episode": season_start["episode_name"] if season_start else None, "days_since_season_premiere": days_since_premiere, "season_premiere_inferred": season_premiere_inferred, "eligibility_reason": eligibility_reason, "eligible_new_season": eligible_new_season, }) results.sort(key=lambda item: ( 0 if item["eligible_new_season"] else 1, item["selected_week_air_at"] is None and item["next_air_at"] is None, item["selected_week_air_at"] or item["next_air_at"] or "", item["name"].lower(), )) return { "items": results, "generated_at": to_emby_iso(now), "week_start": to_emby_iso(week_start), "week_end": to_emby_iso(week_end), "week_offset": week_offset, "new_season_min_days": eligible_min_days, "new_season_max_days": eligible_max_days, } async def get_airing_snapshot(force_refresh: bool = False, week_offset: int = 0) -> dict: cached = airing_lookup_cache.get(week_offset) if not force_refresh and cached and cached["expires_at"] > time.time(): return cached["data"] async with airing_lookup_lock: cached = airing_lookup_cache.get(week_offset) if not force_refresh and cached and cached["expires_at"] > time.time(): return cached["data"] data = await build_airing_snapshot(week_offset=week_offset) airing_lookup_cache[week_offset] = { "expires_at": time.time() + AIRING_LOOKUP_CACHE_TTL, "data": data, } return data # --- Image helpers --- def cover_crop(img: Image.Image, width: int, height: int) -> Image.Image: """Scale and center-crop image to exactly width x height.""" return ImageOps.fit( img, (width, height), method=Image.LANCZOS, centering=(0.5, 0.5), ) def cover_crop_positioned( img: Image.Image, width: int, height: int, *, zoom: float = 1.0, pan_x: float = 0.0, pan_y: float = 0.0, ) -> Image.Image: """Cover-crop with user-controlled zoom and pan. `pan_x` / `pan_y` are normalized in [-1, 1], where -1 means left/top and +1 means right/bottom. """ pan_x = max(-1.0, min(1.0, float(pan_x))) pan_y = max(-1.0, min(1.0, float(pan_y))) zoom = max(PRIMARY_MIN_ZOOM, min(PRIMARY_MAX_ZOOM, float(zoom))) target_ratio = width / max(1, height) source_ratio = img.width / max(1, img.height) if source_ratio > target_ratio: crop_h = img.height crop_w = int(round(crop_h * target_ratio)) else: crop_w = img.width crop_h = int(round(crop_w / target_ratio)) crop_w = max(1, min(img.width, int(round(crop_w / zoom)))) crop_h = max(1, min(img.height, int(round(crop_h / zoom)))) max_left = max(0, img.width - crop_w) max_top = max(0, img.height - crop_h) left = int(round((max_left / 2) * (pan_x + 1.0))) top = int(round((max_top / 2) * (pan_y + 1.0))) cropped = img.crop((left, top, left + crop_w, top + crop_h)) return cropped.resize((width, height), Image.LANCZOS) def build_tall_backdrop_background( img: Image.Image, width: int, height: int, *, dim_factor: float, zoom: float = 1.0, pan_x: float = 0.0, pan_y: float = -0.16, ) -> Image.Image: base = cover_crop_positioned(img, width, height, zoom=zoom, pan_x=pan_x, pan_y=pan_y) return base.point(lambda p: int(p * dim_factor)) def get_font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont: font_paths = [ # Windows "C:/Windows/Fonts/arialbd.ttf" if bold else "C:/Windows/Fonts/arial.ttf", "C:/Windows/Fonts/calibrib.ttf" if bold else "C:/Windows/Fonts/calibri.ttf", "C:/Windows/Fonts/verdanab.ttf" if bold else "C:/Windows/Fonts/verdana.ttf", # Linux "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf", ] for fp in font_paths: if os.path.exists(fp): return ImageFont.truetype(fp, size) return ImageFont.load_default(size) def get_arial_bold_font(size: int) -> ImageFont.FreeTypeFont: font_paths = [ "C:/Windows/Fonts/arialbd.ttf", "/usr/share/fonts/truetype/msttcorefonts/Arial_Bold.ttf", "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf", "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", ] for fp in font_paths: if os.path.exists(fp): return ImageFont.truetype(fp, size) return get_font(size, bold=True) def get_badge_font(size: int) -> ImageFont.FreeTypeFont: font_paths = [ "C:/Windows/Fonts/ariblk.ttf", "C:/Windows/Fonts/arialbd.ttf", "C:/Windows/Fonts/seguisb.ttf", "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", ] for fp in font_paths: if os.path.exists(fp): return ImageFont.truetype(fp, size) return get_arial_bold_font(size) def measure_tracked_text(text: str, font: ImageFont.FreeTypeFont, tracking: int) -> tuple[int, int]: widths = [] top = 0 bottom = 0 for ch in text: bbox = font.getbbox(ch) widths.append(bbox[2] - bbox[0]) top = min(top, bbox[1]) bottom = max(bottom, bbox[3]) if not widths: return 0, 0 width = sum(widths) + tracking * max(0, len(widths) - 1) height = bottom - top return width, height def draw_tracked_text( draw: ImageDraw.ImageDraw, position: tuple[int, int], text: str, font: ImageFont.FreeTypeFont, fill: tuple[int, int, int], tracking: int, ) -> None: x, y = position for ch in text: bbox = font.getbbox(ch) draw.text((x - bbox[0], y - bbox[1]), ch, font=font, fill=fill) x += (bbox[2] - bbox[0]) + tracking def wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> list: words = text.split() lines, current = [], "" for word in words: test = f"{current} {word}".strip() if font.getbbox(test)[2] <= max_width: current = test else: if current: lines.append(current) current = word if current: lines.append(current) return lines def parse_generator_request(body: dict) -> dict: logo_index_raw = body.get("logo_index", None) logo_index = int(logo_index_raw) if logo_index_raw is not None else None return { "item_id": body["item_id"], "title": body.get("title", ""), "bg_mode": body.get("bg_mode", "backdrop"), "backdrop_index": int(body.get("backdrop_index", 0)), "text_color": body.get("text_color", "#FFFFFF"), "logo_align": body.get("logo_align", "bottom-center"), "logo_scale": float(body.get("logo_scale", 1.3)), "darkness": float(body.get("darkness", 0.0)), "studio": body.get("studio", "none"), "studio_position": body.get("studio_position", "bottom-right"), "new_episodes_tag": bool(body.get("new_episodes_tag", False)), "season_finale_tag": bool(body.get("season_finale_tag", False)), "generate_primary": bool(body.get("generate_primary", False)), "logo_index": logo_index, "primary_zoom": float(body.get("primary_zoom", 1.0)), "primary_pan_x": float(body.get("primary_pan_x", 0.0)), "primary_pan_y": float(body.get("primary_pan_y", -0.16)), "thumb_zoom": float(body.get("thumb_zoom", 1.0)), "thumb_pan_x": float(body.get("thumb_pan_x", 0.0)), "thumb_pan_y": float(body.get("thumb_pan_y", 0.0)), "upload_bg_id": body.get("upload_bg_id") or None, } async def get_logo_bytes_for_item( item_id: str, preferred_index: int | None, *, fallback_to_first: bool = False, ) -> bytes | None: if preferred_index is not None and preferred_index < 0: return None if preferred_index is not None: logo_bytes = await get_cached_emby_source_image(item_id, "Logo", preferred_index, optional=True) if logo_bytes or not fallback_to_first or preferred_index == 0: return logo_bytes fallback_index = 0 if fallback_to_first else preferred_index if fallback_index is not None and fallback_index < 0: return None return await get_cached_emby_source_image(item_id, "Logo", fallback_index, optional=True) async def render_item_artwork(options: dict, *, fallback_logo_to_first: bool = False) -> tuple[bytes, bytes | None]: poster_bytes = await get_cached_emby_source_image(options["item_id"], "Primary") logo_bytes = await get_logo_bytes_for_item( options["item_id"], options["logo_index"], fallback_to_first=fallback_logo_to_first, ) if options["bg_mode"] == "upload" and options["upload_bg_id"]: upload_path = get_upload_cache_path(options["upload_bg_id"]) backdrop_bytes = upload_path.read_bytes() else: backdrop_bytes = await get_cached_emby_source_image( options["item_id"], "Backdrop", options["backdrop_index"], optional=True, ) effective_bg_mode = "backdrop" if options["bg_mode"] == "upload" else options["bg_mode"] thumb_bytes = generate_thumbnail( poster_bytes, options["title"], bg_mode=effective_bg_mode, text_color=options["text_color"], logo_align=options["logo_align"], logo_scale=options["logo_scale"], darkness=options["darkness"], studio=options["studio"], studio_position=options["studio_position"], new_episodes_tag=options["new_episodes_tag"], season_finale_tag=options["season_finale_tag"], logo_index=options["logo_index"], logo_bytes=logo_bytes, backdrop_bytes=backdrop_bytes, primary_zoom=options["primary_zoom"], primary_pan_x=options["primary_pan_x"], primary_pan_y=options["primary_pan_y"], thumb_zoom=options["thumb_zoom"], thumb_pan_x=options["thumb_pan_x"], thumb_pan_y=options["thumb_pan_y"], ) primary_bytes = None if options["generate_primary"]: try: primary_bytes = generate_primary_cover( poster_bytes, options["title"], bg_mode=effective_bg_mode, text_color=options["text_color"], logo_align=options["logo_align"], logo_scale=options["logo_scale"], darkness=options["darkness"], studio=options["studio"], studio_position=options["studio_position"], new_episodes_tag=options["new_episodes_tag"], season_finale_tag=options["season_finale_tag"], logo_index=options["logo_index"], logo_bytes=logo_bytes, backdrop_bytes=backdrop_bytes, primary_zoom=options["primary_zoom"], primary_pan_x=options["primary_pan_x"], primary_pan_y=options["primary_pan_y"], ) except Exception: primary_bytes = None return thumb_bytes, primary_bytes def get_generator_cache_key(options: dict) -> str: return build_cache_key( item_id=options["item_id"], bg_mode=options["bg_mode"], backdrop_index=options["backdrop_index"], text_color=options["text_color"], logo_align=options["logo_align"], logo_scale=options["logo_scale"], darkness=options["darkness"], studio=options["studio"], studio_position=options["studio_position"], new_episodes_tag=options["new_episodes_tag"], season_finale_tag=options["season_finale_tag"], logo_index=options["logo_index"], primary_zoom=options["primary_zoom"], primary_pan_x=options["primary_pan_x"], primary_pan_y=options["primary_pan_y"], thumb_zoom=options["thumb_zoom"], thumb_pan_x=options["thumb_pan_x"], thumb_pan_y=options["thumb_pan_y"], upload_bg_id=options["upload_bg_id"], ) def build_cache_key( item_id: str, bg_mode: str = "backdrop", backdrop_index: int = 0, text_color: str = "#FFFFFF", logo_align: str = "bottom-center", logo_scale: float = 1.3, darkness: float = 0.0, studio: str = "none", studio_position: str = "bottom-right", new_episodes_tag: bool = False, season_finale_tag: bool = False, logo_index: int | None = None, primary_zoom: float = 1.0, primary_pan_x: float = 0.0, primary_pan_y: float = -0.16, thumb_zoom: float = 1.0, thumb_pan_x: float = 0.0, thumb_pan_y: float = 0.0, upload_bg_id: str | None = None, ) -> str: payload = { "render_version": RENDER_VERSION, "item_id": item_id, "bg_mode": bg_mode, "backdrop_index": int(backdrop_index), "text_color": text_color, "logo_align": logo_align, "logo_scale": round(float(logo_scale), 4), "darkness": round(float(darkness), 4), "studio": studio, "studio_position": studio_position, "new_episodes_tag": bool(new_episodes_tag), "season_finale_tag": bool(season_finale_tag), "logo_index": logo_index, "primary_zoom": round(float(primary_zoom), 4), "primary_pan_x": round(float(primary_pan_x), 4), "primary_pan_y": round(float(primary_pan_y), 4), "thumb_zoom": round(float(thumb_zoom), 4), "thumb_pan_x": round(float(thumb_pan_x), 4), "thumb_pan_y": round(float(thumb_pan_y), 4), "upload_bg_id": upload_bg_id, } return hashlib.md5(repr(payload).encode("utf-8")).hexdigest() def get_thumb_cache_path(cache_key: str) -> Path: return CACHE_DIR / f"{cache_key}.png" def get_primary_cache_path(cache_key: str) -> Path: return CACHE_DIR / f"{cache_key}.primary.png" def generate_thumbnail( poster_bytes: bytes, title: str, bg_mode: str = "backdrop", text_color: str = "#FFFFFF", logo_align: str = "bottom-center", logo_scale: float = 1.3, darkness: float = 0.0, studio: str = "none", studio_position: str = "bottom-right", new_episodes_tag: bool = False, season_finale_tag: bool = False, logo_index: int | None = None, width: int = THUMB_WIDTH, height: int = THUMB_HEIGHT, logo_bytes: bytes = None, backdrop_bytes: bytes = None, primary_zoom: float = 1.0, primary_pan_x: float = 0.0, primary_pan_y: float = -0.16, thumb_zoom: float = 1.0, thumb_pan_x: float = 0.0, thumb_pan_y: float = 0.0, ) -> bytes: # darkness is 0.0 (no dimming) → 1.0 (very dark); scale bg and vignette together d = max(0.0, min(1.0, darkness)) bg_dim = 1.0 - (d * 0.6) # backdrop brightness: 1.0 → 0.4 vig_max = int(d * 220) # vignette peak alpha: 0 → 220 is_tall_layout = height >= int(width * 1.3) # --- Build background --- if backdrop_bytes: backdrop = load_image_from_bytes(backdrop_bytes, mode="RGB") if is_tall_layout: bg = build_tall_backdrop_background( backdrop, width, height, dim_factor=bg_dim, zoom=primary_zoom, pan_x=primary_pan_x, pan_y=primary_pan_y, ) else: bg = cover_crop_positioned(backdrop, width, height, zoom=thumb_zoom, pan_x=thumb_pan_x, pan_y=thumb_pan_y) bg = bg.point(lambda p: int(p * bg_dim)) else: poster = load_image_from_bytes(poster_bytes, mode="RGB") if is_tall_layout: bg = build_tall_backdrop_background( poster, width, height, dim_factor=bg_dim, zoom=primary_zoom, pan_x=primary_pan_x, pan_y=primary_pan_y, ) else: # Blurred poster fallback for landscape thumbs. bg = cover_crop_positioned(poster, width, height, zoom=thumb_zoom, pan_x=thumb_pan_x, pan_y=thumb_pan_y) bg = bg.filter(ImageFilter.GaussianBlur(radius=20)) bg = bg.point(lambda p: int(p * bg_dim * 0.85)) # --- Vignette overlay tuned per logo position --- overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0)) ov = ImageDraw.Draw(overlay) def h_gradient(from_left: bool, strength: int = 200): s = int(strength * d) for x in range(width): t = 1.0 - (x / max(1, width - 1)) a = int(s * (t ** 0.55)) px = x if from_left else width - 1 - x ov.line([(px, 0), (px, height)], fill=(0, 0, 0, a)) def v_gradient_bottom(strength: int = 180): s = int(strength * d) for y in range(height): t = 1.0 - (y / max(1, height - 1)) a = int(s * (t ** 0.55)) ov.line([(0, height - 1 - y), (width, height - 1 - y)], fill=(0, 0, 0, a)) # Base layer: uniform darkening across the whole frame on every layout. # This ensures no naturally bright backdrop area is left fully exposed. base_alpha = int(vig_max * 0.45) ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, base_alpha)) # Directional layer on top for contrast where the logo sits. if logo_align == "center": ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, int(vig_max * 0.25))) elif logo_align == "left": h_gradient(from_left=True, strength=160) elif logo_align == "bottom-center": v_gradient_bottom(strength=150) elif logo_align == "bottom-left": h_gradient(from_left=True, strength=140) v_gradient_bottom(strength=140) elif logo_align == "bottom-right": h_gradient(from_left=False, strength=140) v_gradient_bottom(strength=140) bg = bg.convert("RGBA") bg = Image.alpha_composite(bg, overlay) bg = bg.convert("RGB") txt_color = ImageColor.getrgb(text_color) pad = max(52, int(height * (0.11 if is_tall_layout else 0.08))) scale = max(0.4, min(2.5, logo_scale)) logo_max_w = int(width * 0.46 * scale) logo_max_h = int(height * (0.18 if is_tall_layout else 0.40) * scale) logo_box = None studio_box = None def get_series_banner() -> dict | None: if season_finale_tag: banner_text = "Season Finale" banner_fill = "#111111" elif new_episodes_tag: banner_text = "New Season" banner_fill = "#E50914" else: return None banner_font_size = max(72 if is_tall_layout else 58, int(height * (0.048 if is_tall_layout else 0.115))) banner_font = get_badge_font(banner_font_size) text_bbox = banner_font.getbbox(banner_text) text_w = text_bbox[2] - text_bbox[0] text_h = text_bbox[3] - text_bbox[1] banner_x = 0 banner_y = 0 banner_w = width vertical_pad = max(12 if is_tall_layout else 12, int(banner_font_size * (0.28 if is_tall_layout else 0.30))) banner_h = max(text_h + vertical_pad * 2, int(height * (0.105 if is_tall_layout else 0.17))) text_x = (banner_w - text_w) // 2 - text_bbox[0] text_y = (banner_h - text_h) // 2 - text_bbox[1] return { "text": banner_text, "fill": ImageColor.getrgb(banner_fill), "font": banner_font, "x": banner_x, "y": banner_y, "w": banner_w, "h": banner_h, "text_x": text_x, "text_y": text_y, } banner = get_series_banner() def logo_position(lw: int, lh: int): if logo_align == "center": return (width - lw) // 2, (height - lh) // 2 elif logo_align == "left": return pad, (height - lh) // 2 elif logo_align == "bottom-center": return (width - lw) // 2, height - lh - pad elif logo_align == "bottom-left": return pad, height - lh - pad elif logo_align == "bottom-right": return width - lw - pad, height - lh - pad return (width - lw) // 2, (height - lh) // 2 # --- Logo image (preferred) --- if logo_bytes: try: logo = load_image_from_bytes(logo_bytes, mode="RGBA") except (UnidentifiedImageError, OSError, ValueError): logo = None if logo is None: logo_bytes = None if logo_bytes: ratio = min(logo_max_w / logo.width, logo_max_h / logo.height) lw = int(logo.width * ratio) lh = int(logo.height * ratio) logo = logo.resize((lw, lh), Image.LANCZOS) logo_x, logo_y = logo_position(lw, lh) # Drop shadow — diffuse glow with no offset for a soft, professional look. # Blur only a tight crop around the logo to avoid dark haze on the backdrop. blur_r = 11 pad = blur_r * 3 _, _, _, a = logo.split() black = Image.new("RGB", (lw, lh), (0, 0, 0)) shadow_logo = Image.merge("RGBA", (*black.split()[:3], a)) shadow_crop = Image.new("RGBA", (lw + pad * 2, lh + pad * 2), (0, 0, 0, 0)) shadow_crop.paste(shadow_logo, (pad, pad), shadow_logo) shadow_crop = shadow_crop.filter(ImageFilter.GaussianBlur(radius=blur_r)) bg = bg.convert("RGBA") bg.alpha_composite(shadow_crop, (logo_x - pad, logo_y - pad)) bg = bg.convert("RGB") bg.paste(logo, (logo_x, logo_y), logo) logo_box = (logo_x, logo_y, lw, lh) else: # --- Text fallback --- draw = ImageDraw.Draw(bg) title_font = get_font(56, bold=True) line_h = 66 lines = wrap_text(title.upper(), title_font, logo_max_w) total_h = len(lines) * line_h tx, ty = logo_position(logo_max_w, total_h) for line in lines: draw.text((tx + 3, ty + 3), line, font=title_font, fill=(0, 0, 0)) draw.text((tx, ty), line, font=title_font, fill=txt_color) ty += line_h logo_box = (tx, logo_position(logo_max_w, total_h)[1], logo_max_w, total_h) # --- Studio logo overlay --- if studio and studio != "none": slogo = make_studio_logo(studio, max_height=46) if slogo: s_pad = 22 sw, sh = slogo.size top_y = s_pad if not banner else banner["y"] + banner["h"] + 10 positions = { "top-left": (s_pad, top_y), "top-right": (width - sw - s_pad, top_y), "bottom-left": (s_pad, height - sh - s_pad), "bottom-right": (width - sw - s_pad, height - sh - s_pad), } sx, sy = positions.get(studio_position, positions["bottom-right"]) bg.paste(slogo, (sx, sy), slogo) studio_box = (sx, sy, sw, sh) if banner: draw = ImageDraw.Draw(bg) draw.rectangle( [(banner["x"], banner["y"]), (banner["x"] + banner["w"], banner["y"] + banner["h"])], fill=banner["fill"], ) draw.text((banner["text_x"], banner["text_y"]), banner["text"], font=banner["font"], fill=(255, 255, 255)) buf = io.BytesIO() bg.save(buf, format="PNG") buf.seek(0) return buf.getvalue() def generate_primary_cover( poster_bytes: bytes, title: str, bg_mode: str = "backdrop", text_color: str = "#FFFFFF", logo_align: str = "bottom-center", logo_scale: float = 1.3, darkness: float = 0.0, studio: str = "none", studio_position: str = "bottom-right", new_episodes_tag: bool = False, season_finale_tag: bool = False, logo_index: int | None = None, logo_bytes: bytes | None = None, backdrop_bytes: bytes | None = None, primary_zoom: float = 1.0, primary_pan_x: float = 0.0, primary_pan_y: float = -0.16, ) -> bytes: # Use the same backdrop_bytes as the thumb (backdrop image or uploaded photo). # When none is provided, falls back to using poster_bytes as background. return generate_thumbnail( poster_bytes, title, bg_mode=bg_mode, text_color=text_color, logo_align=logo_align, logo_scale=logo_scale, darkness=darkness, studio=studio, studio_position=studio_position, new_episodes_tag=new_episodes_tag, season_finale_tag=season_finale_tag, logo_index=logo_index, width=PRIMARY_WIDTH, height=PRIMARY_HEIGHT, logo_bytes=logo_bytes, backdrop_bytes=backdrop_bytes, primary_zoom=primary_zoom, primary_pan_x=primary_pan_x, primary_pan_y=primary_pan_y, ) def normalize_collection_target_type(value: str | None) -> str: normalized = (value or "Thumb").strip().lower() if normalized == "primary": return "Primary" if normalized == "thumb": return "Thumb" raise HTTPException(status_code=400, detail="Invalid target type.") def normalize_collection_choice(value: str | None, allowed: set[str], default: str) -> str: normalized = (value or default).strip().lower() if normalized not in allowed: return default return normalized def normalize_collection_text(value: str | None) -> str: text = " ".join((value or "").split()) if not text: raise HTTPException(status_code=400, detail="Text is required.") return text[:140] def clamp_float(value: float, minimum: float, maximum: float) -> float: return max(minimum, min(maximum, float(value))) def get_collection_canvas_size(target_type: str) -> tuple[int, int]: return (PRIMARY_WIDTH, PRIMARY_HEIGHT) if target_type == "Primary" else (THUMB_WIDTH, THUMB_HEIGHT) def build_collection_cache_key( *, item_id: str, target_type: str, text: str, text_color: str, text_align: str, text_position: str, text_scale: float, darkness: float, zoom: float, pan_x: float, pan_y: float, upload_bg_id: str | None, ) -> str: payload = { "render_version": COLLECTION_RENDER_VERSION, "item_id": item_id, "target_type": target_type, "text": text, "text_color": text_color, "text_align": text_align, "text_position": text_position, "text_scale": round(float(text_scale), 4), "darkness": round(float(darkness), 4), "zoom": round(float(zoom), 4), "pan_x": round(float(pan_x), 4), "pan_y": round(float(pan_y), 4), "upload_bg_id": upload_bg_id, } return hashlib.md5(repr(payload).encode("utf-8")).hexdigest() def get_collection_cache_path(cache_key: str) -> Path: return CACHE_DIR / f"{cache_key}.collection.png" def build_collection_fallback_background(width: int, height: int) -> Image.Image: bg = Image.new("RGB", (width, height), "#121723") draw = ImageDraw.Draw(bg) for y in range(height): t = y / max(1, height - 1) r = int(18 + (28 * t)) g = int(24 + (18 * t)) b = int(35 + (48 * t)) draw.line([(0, y), (width, y)], fill=(r, g, b)) accent_radius = max(width, height) // 2 accent = Image.new("RGBA", (width, height), (0, 0, 0, 0)) accent_draw = ImageDraw.Draw(accent) accent_draw.ellipse( ( width - accent_radius, -accent_radius // 3, width + accent_radius // 2, accent_radius + accent_radius // 6, ), fill=(74, 111, 255, 50), ) accent_draw.ellipse( ( -accent_radius // 2, height - accent_radius, accent_radius, height + accent_radius // 3, ), fill=(52, 211, 153, 34), ) return Image.alpha_composite(bg.convert("RGBA"), accent).convert("RGB") def fit_collection_text_block( text: str, *, width: int, height: int, text_scale: float, ) -> tuple[ImageFont.FreeTypeFont, list[str], list[tuple[int, int, int, int]], int]: is_tall = height > width max_width = width - int(width * 0.12) max_height = int(height * (0.54 if is_tall else 0.42)) max_lines = 5 if is_tall else 3 base_size = int(height * (0.07 if is_tall else 0.12) * text_scale) min_size = 34 if is_tall else 24 size = max(base_size, min_size) while size >= min_size: font = get_arial_bold_font(size) lines = wrap_text(text, font, max_width) boxes = [font.getbbox(line) for line in lines] if lines else [] heights = [(box[3] - box[1]) for box in boxes] line_gap = max(8, int(size * 0.16)) total_height = sum(heights) + line_gap * max(0, len(lines) - 1) if lines and len(lines) <= max_lines and total_height <= max_height: return font, lines, boxes, line_gap size -= 4 font = get_arial_bold_font(min_size) lines = wrap_text(text, font, max_width) boxes = [font.getbbox(line) for line in lines] if lines else [] line_gap = max(8, int(min_size * 0.16)) return font, lines or [text], boxes or [font.getbbox(text)], line_gap def generate_collection_art( *, source_bytes: bytes | None, target_type: str, text: str, text_color: str, text_align: str, text_position: str, text_scale: float, darkness: float, zoom: float, pan_x: float, pan_y: float, ) -> bytes: width, height = get_collection_canvas_size(target_type) darkness = clamp_float(darkness, 0.0, 1.0) zoom = clamp_float(zoom, PRIMARY_MIN_ZOOM, PRIMARY_MAX_ZOOM) pan_x = clamp_float(pan_x, -1.0, 1.0) pan_y = clamp_float(pan_y, -1.0, 1.0) if source_bytes: source = load_image_from_bytes(source_bytes, mode="RGB") bg = cover_crop_positioned(source, width, height, zoom=zoom, pan_x=pan_x, pan_y=pan_y) else: bg = build_collection_fallback_background(width, height) if darkness > 0: bg = bg.point(lambda p: int(p * (1.0 - (darkness * 0.42)))) font, lines, boxes, line_gap = fit_collection_text_block( text, width=width, height=height, text_scale=clamp_float(text_scale, 0.65, 1.8), ) try: text_fill = ImageColor.getrgb(text_color) except ValueError: text_fill = (255, 255, 255) padding_x = max(34, int(width * 0.06)) padding_y = max(28, int(height * 0.05)) line_heights = [(box[3] - box[1]) for box in boxes] line_widths = [(box[2] - box[0]) for box in boxes] text_block_w = max(line_widths) if line_widths else 0 text_block_h = sum(line_heights) + line_gap * max(0, len(lines) - 1) if text_align == "left": text_x = padding_x elif text_align == "right": text_x = width - padding_x - text_block_w else: text_x = (width - text_block_w) // 2 if text_position == "top": text_y = padding_y elif text_position == "center": text_y = max(padding_y, (height - text_block_h) // 2) else: text_y = height - padding_y - text_block_h panel_pad_x = max(22, int(font.size * 0.45)) panel_pad_y = max(18, int(font.size * 0.34)) panel_left = max(14, text_x - panel_pad_x) panel_top = max(14, text_y - panel_pad_y) panel_right = min(width - 14, text_x + text_block_w + panel_pad_x) panel_bottom = min(height - 14, text_y + text_block_h + panel_pad_y) overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0)) overlay_draw = ImageDraw.Draw(overlay) overlay_draw.rounded_rectangle( [(panel_left, panel_top), (panel_right, panel_bottom)], radius=max(18, int(min(width, height) * 0.024)), fill=(0, 0, 0, 92 + int(darkness * 80)), outline=(255, 255, 255, 18), width=1, ) bg = Image.alpha_composite(bg.convert("RGBA"), overlay).convert("RGB") draw = ImageDraw.Draw(bg) stroke_width = max(2, int(font.size * 0.08)) current_y = text_y for line, box, line_height, line_width in zip(lines, boxes, line_heights, line_widths): if text_align == "left": line_x = padding_x - box[0] elif text_align == "right": line_x = width - padding_x - line_width - box[0] else: line_x = ((width - line_width) // 2) - box[0] draw.text( (line_x, current_y - box[1]), line, font=font, fill=text_fill, stroke_width=stroke_width, stroke_fill=(0, 0, 0), ) current_y += line_height + line_gap buf = io.BytesIO() bg.save(buf, format="PNG") buf.seek(0) return buf.getvalue() def parse_collection_render_request(body: dict) -> dict: try: item_id = str(body["item_id"]) except KeyError as exc: raise HTTPException(status_code=400, detail="Missing item id.") from exc return { "item_id": item_id, "target_type": normalize_collection_target_type(body.get("target_type", "Thumb")), "text": normalize_collection_text(body.get("text") or body.get("title")), "text_color": body.get("text_color", "#FFFFFF"), "text_align": normalize_collection_choice(body.get("text_align"), {"left", "center", "right"}, "center"), "text_position": normalize_collection_choice(body.get("text_position"), {"top", "center", "bottom"}, "bottom"), "text_scale": clamp_float(body.get("text_scale", 1.0), 0.65, 1.8), "darkness": clamp_float(body.get("darkness", 0.18), 0.0, 0.85), "zoom": clamp_float(body.get("zoom", 1.0), PRIMARY_MIN_ZOOM, PRIMARY_MAX_ZOOM), "pan_x": clamp_float(body.get("pan_x", 0.0), -1.0, 1.0), "pan_y": clamp_float(body.get("pan_y", 0.0), -1.0, 1.0), "upload_bg_id": body.get("upload_bg_id") or None, } async def get_collection_source_bytes(item_id: str, target_type: str, upload_bg_id: str | None) -> bytes | None: if upload_bg_id: return get_upload_cache_path(upload_bg_id).read_bytes() fallback_types = [target_type] if target_type == "Thumb": fallback_types.append("Primary") else: fallback_types.append("Thumb") seen: set[str] = set() for image_type in fallback_types: if image_type in seen: continue seen.add(image_type) image_bytes = await emby_get_image_optional(item_id, image_type) if image_bytes: return image_bytes return None async def render_collection_art_preview(options: dict) -> tuple[str, bytes]: cache_key = build_collection_cache_key( item_id=options["item_id"], target_type=options["target_type"], text=options["text"], text_color=options["text_color"], text_align=options["text_align"], text_position=options["text_position"], text_scale=options["text_scale"], darkness=options["darkness"], zoom=options["zoom"], pan_x=options["pan_x"], pan_y=options["pan_y"], upload_bg_id=options["upload_bg_id"], ) cache_path = get_collection_cache_path(cache_key) if cache_path.exists(): return cache_key, cache_path.read_bytes() source_bytes = await get_collection_source_bytes( options["item_id"], options["target_type"], options["upload_bg_id"], ) preview_bytes = generate_collection_art( source_bytes=source_bytes, target_type=options["target_type"], text=options["text"], text_color=options["text_color"], text_align=options["text_align"], text_position=options["text_position"], text_scale=options["text_scale"], darkness=options["darkness"], zoom=options["zoom"], pan_x=options["pan_x"], pan_y=options["pan_y"], ) cache_path.write_bytes(preview_bytes) return cache_key, preview_bytes # --- API Routes --- @app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse(request, "index.html") @app.get("/collections", response_class=HTMLResponse) async def collections_page(request: Request): return templates.TemplateResponse(request, "collections.html") @app.get("/airing", response_class=HTMLResponse) async def airing_page(request: Request): return templates.TemplateResponse(request, "airing.html") @app.get("/api/airing") async def get_airing_titles( page: int = Query(1, ge=1), limit: int = Query(12, ge=1, le=48), eligible_only: bool = False, refresh: bool = False, week_offset: int = Query(0, ge=-8, le=4), ): snapshot = await get_airing_snapshot(force_refresh=refresh, week_offset=week_offset) items = snapshot["items"] filtered = [item for item in items if item["eligible_new_season"]] if eligible_only else items start = (page - 1) * limit page_items = filtered[start:start + limit] total = len(filtered) return { "items": page_items, "page": page, "limit": limit, "total": total, "has_more": start + limit < total, "generated_at": snapshot["generated_at"], "week_start": snapshot["week_start"], "week_end": snapshot["week_end"], "week_offset": snapshot["week_offset"], "new_season_min_days": snapshot["new_season_min_days"], "new_season_max_days": snapshot["new_season_max_days"], } @app.post("/api/airing/apply-new-season") async def apply_new_season_banner(request: Request): body = await request.json() item_id = body["item_id"] generate_primary = bool(body.get("generate_primary", True)) week_offset = int(body.get("week_offset", 0)) snapshot = await get_airing_snapshot(force_refresh=bool(body.get("refresh", False)), week_offset=week_offset) item = next((entry for entry in snapshot["items"] if entry["id"] == item_id), None) if item is None: raise HTTPException(status_code=404, detail="Series was not found in the current airing snapshot.") can_apply_new_season = bool(item["eligible_new_season"]) or ( (item.get("status") or "").strip().lower() == "continuing" and (item.get("selected_week_air_at") or item.get("next_air_at")) ) if not can_apply_new_season: raise HTTPException( status_code=400, detail=( f"Series is outside the New Season window. " f"It must be between {snapshot['new_season_min_days']} and {snapshot['new_season_max_days']} days " f"from the season premiere." ), ) title = body.get("title") or item["name"] cache_key = build_cache_key( item_id=item_id, bg_mode="backdrop", backdrop_index=0, text_color="#FFFFFF", logo_align="bottom-center", logo_scale=1.3, darkness=0.0, studio="none", studio_position="bottom-right", new_episodes_tag=True, season_finale_tag=False, logo_index=0, ) thumb_cache_path = get_thumb_cache_path(cache_key) primary_cache_path = get_primary_cache_path(cache_key) if not thumb_cache_path.exists() or (generate_primary and not primary_cache_path.exists()): poster_bytes = await emby_get_image(item_id, "Primary") logo_bytes = await emby_get_image_optional(item_id, "Logo", 0) backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", 0) thumb_bytes = generate_thumbnail( poster_bytes, title, bg_mode="backdrop", text_color="#FFFFFF", logo_align="bottom-center", logo_scale=1.3, darkness=0.0, studio="none", studio_position="bottom-right", new_episodes_tag=True, season_finale_tag=False, logo_index=0, logo_bytes=logo_bytes, backdrop_bytes=backdrop_bytes, ) thumb_cache_path.write_bytes(thumb_bytes) if generate_primary: primary_bytes = generate_primary_cover( poster_bytes, title, bg_mode="backdrop", text_color="#FFFFFF", logo_align="bottom-center", logo_scale=1.3, darkness=0.0, studio="none", studio_position="bottom-right", new_episodes_tag=True, season_finale_tag=False, logo_index=0, logo_bytes=logo_bytes, ) primary_cache_path.write_bytes(primary_bytes) thumb_status = await emby_upload_image(item_id, thumb_cache_path.read_bytes(), "Thumb") primary_status = None primary_error = None if generate_primary: try: primary_status = await emby_upload_image(item_id, primary_cache_path.read_bytes(), "Primary") except Exception as exc: primary_error = str(exc) return { "status": "applied", "thumb_code": thumb_status, "primary_code": primary_status, "primary_attempted": options["generate_primary"], "primary_error": primary_error, } @app.get("/api/search") async def search_items( q: str = Query(..., min_length=1), type: str = "Movie,Series", start: int = Query(0, ge=0), limit: int = Query(12, ge=1, le=24), ): data = await emby_get("/Items", { "SearchTerm": q, "IncludeItemTypes": type, "Recursive": "true", "Fields": "ProductionYear", "StartIndex": str(start), "Limit": str(limit), "ImageTypeLimit": "1", "EnableImageTypes": "Primary,Logo,Backdrop", }) items = [] for item in data.get("Items", []): image_tags = item.get("ImageTags", {}) backdrop_count = item.get("BackdropImageTags", []) items.append({ "id": item["Id"], "name": item.get("Name", ""), "year": item.get("ProductionYear", ""), "type": item.get("Type", ""), "has_logo": "Logo" in image_tags, "backdrop_count": len(backdrop_count), "poster_url": f"/api/poster/{item['Id']}?w=72&h=108&q=72", }) total = int(data.get("TotalRecordCount", start + len(items))) return { "items": items, "start": start, "limit": limit, "total": total, "has_more": start + len(items) < total, } @app.get("/api/categories") async def get_categories(): data = await emby_get("/Genres", { "Recursive": "true", "SortBy": "SortName", "SortOrder": "Ascending", }) items = [] for item in data.get("Items", []): items.append({ "id": item.get("Id"), "name": item.get("Name", ""), "item_count": item.get("ItemCount") or item.get("ChildCount") or 0, }) items.sort(key=lambda entry: entry["name"].lower()) return {"items": items} @app.get("/api/collections") async def get_collections( q: str = Query(""), start: int = Query(0, ge=0), limit: int = Query(24, ge=1, le=60), ): params = { "IncludeItemTypes": "BoxSet", "Recursive": "true", "SortBy": "SortName", "SortOrder": "Ascending", "Fields": "ChildCount", "StartIndex": str(start), "Limit": str(limit), "ImageTypeLimit": "1", "EnableImageTypes": "Primary,Thumb", } if q.strip(): params["SearchTerm"] = q.strip() data = await emby_get("/Items", params) items = [] for item in data.get("Items", []): image_tags = item.get("ImageTags") or {} items.append({ "id": item["Id"], "name": item.get("Name", ""), "type": item.get("Type", ""), "child_count": item.get("ChildCount") or 0, "poster_url": f"/api/poster/{item['Id']}?w=72&h=108&q=72" if "Primary" in image_tags else None, }) total = int(data.get("TotalRecordCount", start + len(items))) return { "items": items, "start": start, "limit": limit, "total": total, "has_more": start + len(items) < total, } @app.get("/api/collections/{item_id}/artwork") async def get_collection_artwork(item_id: str): images = await emby_get(f"/Items/{item_id}/Images") artwork = { "Primary": {"exists": False, "url": None}, "Thumb": {"exists": False, "url": None}, } for image in images: image_type = image.get("ImageType") if image_type not in artwork or artwork[image_type]["exists"]: continue artwork[image_type] = { "exists": True, "url": f"/api/item-image/{item_id}?type={image_type}&w={300 if image_type == 'Primary' else 420}&h={450 if image_type == 'Primary' else 236}&q=88", } return { "primary": artwork["Primary"], "thumb": artwork["Thumb"], } @app.get("/api/images/{item_id}") async def get_image_info(item_id: str): """Returns accurate logo and backdrop counts by querying the Images endpoint directly.""" images = await emby_get(f"/Items/{item_id}/Images") logos = [] backdrops = [] primaries = [] for img in images: image_type = img.get("ImageType") entry = { "type": image_type, "index": int(img.get("ImageIndex", 0) or 0), "width": img.get("Width"), "height": img.get("Height"), } if image_type == "Logo": entry["url"] = f"/api/item-image/{item_id}?type=Logo&index={entry['index']}&w=220&h=96&q=90" logos.append(entry) elif image_type == "Backdrop": backdrops.append(entry) elif image_type == "Primary": entry["url"] = f"/api/item-image/{item_id}?type=Primary&index={entry['index']}&w=120&h=180&q=80" primaries.append(entry) logos.sort(key=lambda img: img["index"]) backdrops.sort(key=lambda img: img["index"]) primaries.sort(key=lambda img: img["index"]) return { "backdrop_count": len(backdrops), "has_logo": bool(logos), "logo_count": len(logos), "primary_count": len(primaries), "logos": logos, "backdrops": backdrops, "primaries": primaries, } @app.get("/api/item-image/{item_id}") async def get_item_image( item_id: str, type: str = Query("Primary"), index: int | None = Query(None, ge=0), w: int = Query(120, ge=24, le=1200), h: int = Query(180, ge=24, le=1200), q: int = Query(80, ge=30, le=100), ): img_bytes, content_type = await emby_get_image_with_type( item_id, type, index=index, max_width=w, max_height=h, quality=q, ) return Response( content=img_bytes, media_type=content_type, headers={"Cache-Control": "public, max-age=86400"}, ) @app.get("/api/poster/{item_id}") async def get_poster( item_id: str, w: int = Query(72, ge=24, le=400), h: int = Query(108, ge=24, le=600), q: int = Query(72, ge=30, le=100), ): img_bytes = await emby_get_image( item_id, "Primary", max_width=w, max_height=h, quality=q, ) return Response( content=img_bytes, media_type="image/jpeg", headers={"Cache-Control": "public, max-age=86400"}, ) @app.get("/api/cache/{cache_key}/primary") async def get_cached_primary_preview(cache_key: str): if not cache_key or any(ch not in "0123456789abcdef" for ch in cache_key.lower()): raise HTTPException(status_code=400, detail="Invalid cache key.") cache_path = get_primary_cache_path(cache_key) if not cache_path.exists(): raise HTTPException(status_code=404, detail="Primary preview not found.") return Response( content=cache_path.read_bytes(), media_type="image/png", headers={"Cache-Control": "no-store"}, ) @app.get("/api/cache/{cache_key}/collection") async def get_cached_collection_preview(cache_key: str): if not cache_key or any(ch not in "0123456789abcdef" for ch in cache_key.lower()): raise HTTPException(status_code=400, detail="Invalid cache key.") cache_path = get_collection_cache_path(cache_key) if not cache_path.exists(): raise HTTPException(status_code=404, detail="Collection preview not found.") return Response( content=cache_path.read_bytes(), media_type="image/png", headers={"Cache-Control": "no-store"}, ) @app.post("/api/upload-background") async def upload_background(request: Request): form = await request.form() file = form.get("file") if file is None: raise HTTPException(status_code=400, detail="Missing file field.") image_bytes = await file.read() if len(image_bytes) > UPLOAD_MAX_BYTES: raise HTTPException(status_code=400, detail="Uploaded file is too large (max 40 MB).") width, height, fmt = validate_image_bytes(image_bytes) upload_id = hashlib.sha256(image_bytes).hexdigest() upload_path = IMPORT_CACHE_DIR / f"{upload_id}.img" if not upload_path.exists(): upload_path.write_bytes(image_bytes) return {"upload_id": upload_id, "width": width, "height": height, "format": fmt} @app.post("/api/collections/generate") async def generate_collection_preview(request: Request): body = await request.json() options = parse_collection_render_request(body) cache_key, preview_bytes = await render_collection_art_preview(options) return StreamingResponse( io.BytesIO(preview_bytes), media_type="image/png", headers={"X-Cache-Key": cache_key}, ) @app.post("/api/collections/apply") async def apply_collection_art(request: Request): body = await request.json() options = parse_collection_render_request(body) _, preview_bytes = await render_collection_art_preview(options) status_code = await emby_upload_image(options["item_id"], preview_bytes, options["target_type"]) return { "status": "applied", "target_type": options["target_type"], "code": status_code, } @app.post("/api/generate") async def generate(request: Request): options = parse_generator_request(await request.json()) cache_key = get_generator_cache_key(options) thumb_cache_path = get_thumb_cache_path(cache_key) primary_cache_path = get_primary_cache_path(cache_key) thumb_cached = thumb_cache_path.exists() primary_cached = primary_cache_path.exists() if thumb_cached and (not options["generate_primary"] or primary_cached): return StreamingResponse( io.BytesIO(thumb_cache_path.read_bytes()), media_type="image/png", headers={ "X-Primary-Generated": "1" if (options["generate_primary"] and primary_cached) else "0", "X-Cache-Key": cache_key, }, ) thumb_bytes, primary_bytes = await render_item_artwork(options) thumb_cache_path.write_bytes(thumb_bytes) primary_generated = primary_bytes is not None if primary_generated: get_primary_cache_path(cache_key).write_bytes(primary_bytes) return StreamingResponse( io.BytesIO(thumb_bytes), media_type="image/png", headers={ "X-Primary-Generated": "1" if primary_generated else "0", "X-Cache-Key": cache_key, }, ) @app.post("/api/apply") async def apply_to_emby(request: Request): options = parse_generator_request(await request.json()) cache_key = get_generator_cache_key(options) thumb_cache_path = get_thumb_cache_path(cache_key) primary_cache_path = get_primary_cache_path(cache_key) if not thumb_cache_path.exists(): thumb_bytes, primary_bytes = await render_item_artwork(options) thumb_cache_path.write_bytes(thumb_bytes) if primary_bytes is not None and not primary_cache_path.exists(): primary_cache_path.write_bytes(primary_bytes) thumb_bytes = thumb_cache_path.read_bytes() thumb_status = await emby_upload_image(options["item_id"], thumb_bytes, "Thumb") primary_status = None primary_error = None if options["generate_primary"]: try: if primary_cache_path.exists(): primary_bytes = primary_cache_path.read_bytes() else: _, primary_bytes = await render_item_artwork(options) if primary_bytes is None: raise HTTPException(status_code=500, detail="Primary artwork could not be generated.") primary_cache_path.write_bytes(primary_bytes) primary_status = await emby_upload_image(options["item_id"], primary_bytes, "Primary") except Exception as exc: primary_error = str(exc) return { "status": "applied", "thumb_code": thumb_status, "primary_code": primary_status, "primary_attempted": options["generate_primary"], "primary_error": primary_error, } @app.post("/api/bulk-apply/category") async def bulk_apply_category(request: Request): body = await request.json() category_id = (body.get("category_id") or "").strip() category_name = (body.get("category_name") or "").strip() if not category_id: raise HTTPException(status_code=400, detail="Category is required.") base_options = parse_generator_request(body) items = await emby_get_all("/Items", { "GenreIds": category_id, "IncludeItemTypes": "Movie,Series", "Recursive": "true", "SortBy": "SortName", "SortOrder": "Ascending", "Fields": "ProductionYear", "ImageTypeLimit": "1", "EnableImageTypes": "Primary,Logo,Backdrop", }) eligible_items: list[dict] = [] skipped_without_logo: list[str] = [] skipped_without_primary: list[str] = [] for item in items: image_tags = item.get("ImageTags") or {} name = item.get("Name") or "Unknown" if "Primary" not in image_tags: skipped_without_primary.append(name) continue if "Logo" not in image_tags: skipped_without_logo.append(name) continue eligible_items.append(item) applied: list[str] = [] failed: list[dict] = [] for item in eligible_items: item_options = dict(base_options) item_options["item_id"] = item["Id"] item_options["title"] = item.get("Name", "") if item.get("Type") != "Series": item_options["new_episodes_tag"] = False item_options["season_finale_tag"] = False cache_key = get_generator_cache_key(item_options) thumb_cache_path = get_thumb_cache_path(cache_key) primary_cache_path = get_primary_cache_path(cache_key) try: thumb_bytes, primary_bytes = await render_item_artwork(item_options, fallback_logo_to_first=True) thumb_cache_path.write_bytes(thumb_bytes) await emby_upload_image(item["Id"], thumb_bytes, "Thumb") if item_options["generate_primary"] and primary_bytes is not None: primary_cache_path.write_bytes(primary_bytes) await emby_upload_image(item["Id"], primary_bytes, "Primary") applied.append(item.get("Name", "")) except Exception as exc: failed.append({ "name": item.get("Name", ""), "error": str(exc), }) return { "status": "completed", "category_id": category_id, "category_name": category_name, "matched_count": len(items), "eligible_count": len(eligible_items), "applied_count": len(applied), "skipped_without_logo_count": len(skipped_without_logo), "skipped_without_primary_count": len(skipped_without_primary), "failed_count": len(failed), "failed": failed[:12], "applied": applied[:12], } @app.get("/api/config") async def get_config(): return { "emby_url": EMBY_URL, "connected": bool(EMBY_API_KEY), } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8500)