import asyncio
import io
import os
import hashlib
import base64
import time
import urllib.parse
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass
from pathlib import Path

import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageColor, ImageOps, UnidentifiedImageError

app = FastAPI(title="Emby Thumbnail Generator")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Emby connection settings; both values can be overridden via the environment.
EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096")
# NOTE(review): the fallback below looks like a real API key committed to
# source control. It is kept so existing deployments keep working, but it
# should presumably be rotated and supplied only through EMBY_API_KEY.
EMBY_API_KEY = os.environ.get("EMBY_API_KEY", "b9af54b630f6448289ab96422add567a")

# On-disk caches; created at import time, relative to the working directory.
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
IMPORT_CACHE_DIR = CACHE_DIR / "imports"
IMPORT_CACHE_DIR.mkdir(exist_ok=True)

# Mixed into render cache keys so cached renders are keyed by renderer version.
RENDER_VERSION = "series-banner-v10"
THUMB_WIDTH = 800
THUMB_HEIGHT = 450
PRIMARY_WIDTH = 1000
PRIMARY_HEIGHT = 1500
PRIMARY_MIN_ZOOM = 1.0
PRIMARY_MAX_ZOOM = 2.6
REMOTE_IMAGE_MAX_BYTES = 18 * 1024 * 1024

# Output canvas size and Emby image type for each artwork-editor target mode.
EDITOR_TARGETS = {
    "poster": {"width": 1000, "height": 1500, "emby_type": "Primary", "label": "Poster"},
    "thumb": {"width": 1920, "height": 1080, "emby_type": "Thumb", "label": "Thumb"},
    "backdrop": {"width": 1920, "height": 1080, "emby_type": "Backdrop", "label": "Backdrop"},
}

HTTP_TIMEOUT = 30.0
# Shared client, created lazily by get_http_client() and closed on shutdown.
http_client: httpx.AsyncClient | None = None

# Airing-snapshot cache settings (TTL in seconds, ages/lookback in days).
AIRING_LOOKUP_CACHE_TTL = 900
NEW_SEASON_MIN_AGE_DAYS = 7
NEW_SEASON_MAX_AGE_DAYS = 21
SEASON_INFERENCE_LOOKBACK_DAYS = 180
# Keyed by week offset; each entry holds "expires_at" and "data".
airing_lookup_cache: dict[int, dict] = {}
airing_lookup_lock = asyncio.Lock()

# ── Studio logo file map ─────────────────────────────────────────────────────
STUDIOS_DIR = Path("static/studios")
# Maps studio key → filename (relative to STUDIOS_DIR)
STUDIO_FILES: dict[str, str] = {
    "hulu": "hulu.png",
    "hbo": "hbo.png",
    "disney": "disney.png",
}


def make_studio_logo(studio: str, max_height: int = 52) -> Image.Image | None:
    """Load a studio logo from disk and scale it to max_height, preserving aspect ratio.

    Returns None when the studio key is unknown or its file does not exist.
    Logos that are already at most ``max_height`` tall are returned unscaled.
    """
    filename = STUDIO_FILES.get(studio)
    if not filename:
        return None
    path = STUDIOS_DIR / filename
    if not path.exists():
        return None
    # convert() yields an independent in-memory image, so the file handle can
    # be released as soon as the conversion is done.
    with Image.open(path) as raw:
        img = raw.convert("RGBA")
    if img.height > max_height:
        scale = max_height / img.height
        # Clamp to 1px so a very narrow logo never produces a zero-width resize.
        scaled_width = max(1, int(img.width * scale))
        img = img.resize((scaled_width, max_height), Image.LANCZOS)
    return img


def load_image_from_bytes(image_bytes: bytes, mode: str = "RGB") -> Image.Image:
    """Fully decode image bytes into an in-memory PIL image.

    Some Emby-delivered assets appear to trigger blocky/partial artifacts when
    we process the lazy decoder output directly. Forcing a full load and copy
    gives us a stable image object before resize/filter operations.
    """
    with Image.open(io.BytesIO(image_bytes)) as img:
        # Apply the EXIF orientation before anything else touches the pixels.
        img = ImageOps.exif_transpose(img)
        img.load()
        return img.convert(mode).copy()


def encode_jpeg_bytes(img: Image.Image, *, quality: int = 95) -> bytes:
    """Encode ``img`` as an optimized, progressive RGB JPEG and return the bytes."""
    buf = io.BytesIO()
    img.convert("RGB").save(buf, format="JPEG", quality=quality, optimize=True, progressive=True)
    return buf.getvalue()


def validate_image_bytes(image_bytes: bytes, *, max_pixels: int = 24_000_000) -> tuple[int, int, str]:
    """Check that ``image_bytes`` is a decodable image of acceptable size.

    Returns ``(width, height, format)``; ``format`` falls back to ``"image"``
    when Pillow reports none.

    Raises:
        HTTPException(400): the bytes are not a valid image, or the image has
            more than ``max_pixels`` pixels (including images Pillow itself
            rejects as decompression bombs).
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            img.verify()
            width, height = img.size
            fmt = img.format or "image"
    except Image.DecompressionBombError as exc:
        # Pillow refuses extremely large images outright; report that as the
        # same client error as our own pixel limit instead of a server error.
        raise HTTPException(status_code=400, detail="Image is too large to process safely.") from exc
    except (UnidentifiedImageError, OSError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="The downloaded file is not a valid image.") from exc
    if width * height > max_pixels:
        raise HTTPException(status_code=400, detail="Image is too large to process safely.")
    return width, height, fmt


def get_editor_target(target_mode: str) -> dict:
    """Return the EDITOR_TARGETS entry for ``target_mode`` or raise HTTP 400."""
    target = EDITOR_TARGETS.get(target_mode)
    if target is None:
        raise HTTPException(status_code=400, detail="Invalid artwork target mode.")
    return target


def get_import_cache_path(import_id: str) -> Path:
    """Resolve an imported-image id to its file inside IMPORT_CACHE_DIR.

    The id must be a non-empty string of hexadecimal digits, which also keeps
    path separators out of the file name.

    Raises:
        HTTPException(400): the id is missing, not a string, or not hex.
        HTTPException(404): no cached file exists for the id.
    """
    # The id comes from a request body, so it may be any JSON type.
    if (
        not isinstance(import_id, str)
        or not import_id
        or any(ch not in "0123456789abcdef" for ch in import_id.lower())
    ):
        raise HTTPException(status_code=400, detail="Invalid imported image id.")
    path = IMPORT_CACHE_DIR / f"{import_id}.img"
    if not path.exists():
        raise HTTPException(status_code=404, detail="Imported image was not found in cache.")
    return path


async def resolve_editor_source_bytes(body: dict) -> bytes:
    """Fetch the source image bytes described by an editor request body.

    ``source_kind`` selects the origin: ``"import"`` reads the cached import
    named by ``import_id``; ``"emby"`` (the default) downloads
    ``source_type``/``source_index`` of ``item_id`` from Emby.

    Raises:
        HTTPException(400): unknown source kind, missing item id, invalid
            image type, or a ``source_index`` that is not an integer.
    """
    source_kind = body.get("source_kind", "emby")
    item_id = body.get("item_id")
    if source_kind == "import":
        return get_import_cache_path(body.get("import_id", "")).read_bytes()
    if source_kind != "emby":
        raise HTTPException(status_code=400, detail="Invalid source image kind.")
    if not item_id:
        raise HTTPException(status_code=400, detail="Missing item id.")
    image_type = body.get("source_type", "Primary")
    if image_type not in {"Primary", "Thumb", "Backdrop"}:
        raise HTTPException(status_code=400, detail="Invalid Emby source image type.")
    index_raw = body.get("source_index", None)
    try:
        index = int(index_raw) if index_raw is not None else None
    except (TypeError, ValueError) as exc:
        # A malformed index is a client error, not an internal failure.
        raise HTTPException(status_code=400, detail="Invalid Emby source image index.") from exc
    return await emby_get_image(item_id, image_type, index=index)


def image_to_data_url(image_bytes: bytes, media_type: str = "image/jpeg") -> str:
    """Return ``image_bytes`` as a base64 ``data:`` URL with the given media type."""
    return f"data:{media_type};base64,{base64.b64encode(image_bytes).decode('ascii')}"


def fit_image_positioned(
    img: Image.Image,
    width: int,
    height: int,
    *,
    fit_mode: str = "cover",
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = 0.0,
) -> tuple[Image.Image, tuple[int, int]]:
    """Scale ``img`` for a ``width`` x ``height`` canvas and compute its paste offset.

    ``fit_mode`` is ``"cover"`` (fill the canvas) or ``"contain"`` (fit
    inside it); any other value is treated as ``"cover"``. ``zoom`` is
    clamped to [0.25, 5.0] and ``pan_x`` / ``pan_y`` to [-1, 1].

    Returns the resized image and the ``(x, y)`` position at which to paste it.
    """
    fit_mode = fit_mode if fit_mode in {"cover", "contain"} else "cover"
    pan_x = max(-1.0, min(1.0, float(pan_x)))
    pan_y = max(-1.0, min(1.0, float(pan_y)))
    zoom = max(0.25, min(5.0, float(zoom)))
    scale_fn = max if fit_mode == "cover" else min
    scale = scale_fn(width / max(1, img.width), height / max(1, img.height)) * zoom
    fitted_w = max(1, int(round(img.width * scale)))
    fitted_h = max(1, int(round(img.height * scale)))
    fitted = img.resize((fitted_w, fitted_h), Image.LANCZOS)
    # Pan maps [-1, 1] onto [0, 1] of the available slack. When the image
    # overflows the canvas the offset is negative (part is cropped away);
    # otherwise it is the positive gap before the image.
    if fitted_w >= width:
        x = -int(round((fitted_w - width) * ((pan_x + 1.0) / 2.0)))
    else:
        x = int(round((width - fitted_w) * ((pan_x + 1.0) / 2.0)))
    if fitted_h >= height:
        y = -int(round((fitted_h - height) * ((pan_y + 1.0) / 2.0)))
    else:
        y = int(round((height - fitted_h) * ((pan_y + 1.0) / 2.0)))
    return fitted, (x, y)
def build_editor_fill(
    img: Image.Image,
    width: int,
    height: int,
    *,
    fill_mode: str,
    fill_color: str | None,
) -> Image.Image:
    """Build the RGB background canvas placed behind the fitted artwork.

    ``fill_mode`` of ``"solid"`` yields a flat colour (``fill_color``, with a
    dark default when it is missing or unparsable); ``"mirror"`` yields a
    mirrored, slightly darkened cover-crop of ``img``; anything else yields a
    blurred, darkened cover-crop.
    """
    if fill_mode == "solid":
        try:
            color = ImageColor.getrgb(fill_color or "#111318")
        except ValueError:
            color = (17, 19, 24)
        return Image.new("RGB", (width, height), color)
    base = cover_crop(img, width, height)
    if fill_mode == "mirror":
        base = ImageOps.mirror(base)
        overlay = Image.new("RGBA", (width, height), (0, 0, 0, 54))
        return Image.alpha_composite(base.convert("RGBA"), overlay).convert("RGB")
    # Blur radius scales with the canvas but never drops below 18px.
    base = base.filter(ImageFilter.GaussianBlur(radius=max(18, int(min(width, height) * 0.035))))
    overlay = Image.new("RGBA", (width, height), (0, 0, 0, 70))
    return Image.alpha_composite(base.convert("RGBA"), overlay).convert("RGB")


def render_artwork_editor_image(
    source_bytes: bytes,
    *,
    target_mode: str = "poster",
    fit_mode: str = "cover",
    fill_mode: str = "blur",
    fill_color: str | None = "#101318",
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = 0.0,
    quality: int = 95,
) -> bytes:
    """Render the artwork-editor result as JPEG bytes.

    The source is decoded, a fill canvas of the target size is built, and the
    fitted source is pasted on top of it. ``quality`` is clamped to [80, 98].

    Raises:
        HTTPException(400): ``target_mode`` is not a known editor target.
    """
    target = get_editor_target(target_mode)
    width = target["width"]
    height = target["height"]
    source = load_image_from_bytes(source_bytes, mode="RGB")
    canvas = build_editor_fill(source, width, height, fill_mode=fill_mode, fill_color=fill_color)
    fitted, (x, y) = fit_image_positioned(
        source,
        width,
        height,
        fit_mode=fit_mode,
        zoom=zoom,
        pan_x=pan_x,
        pan_y=pan_y,
    )
    canvas.paste(fitted, (x, y))
    return encode_jpeg_bytes(canvas, quality=max(80, min(98, int(quality))))


def build_editor_cache_key(body: dict) -> str:
    """Return a stable hex digest identifying one artwork-editor render request.

    Missing fields use the same defaults as the renderer; zoom and pan are
    rounded to 4 decimals so tiny float differences map to the same key.
    """
    payload = {
        "version": "artwork-editor-v1",
        "item_id": body.get("item_id"),
        "source_kind": body.get("source_kind", "emby"),
        "source_type": body.get("source_type", "Primary"),
        "source_index": body.get("source_index"),
        "import_id": body.get("import_id"),
        "target_mode": body.get("target_mode", "poster"),
        "fit_mode": body.get("fit_mode", "cover"),
        "fill_mode": body.get("fill_mode", "blur"),
        "fill_color": body.get("fill_color", "#101318"),
        "zoom": round(float(body.get("zoom", 1.0)), 4),
        "pan_x": round(float(body.get("pan_x", 0.0)), 4),
        "pan_y": round(float(body.get("pan_y", 0.0)), 4),
        "quality": int(body.get("quality", 95)),
    }
    # md5 is used only to derive a cache key, not for security.
    return hashlib.md5(repr(payload).encode("utf-8")).hexdigest()


# --- Emby API helpers ---

def get_http_client() -> httpx.AsyncClient:
    """Return the shared AsyncClient, creating it on first use."""
    global http_client
    if http_client is None:
        http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)
    return http_client


@app.on_event("startup")
async def startup_event() -> None:
    """Create the shared HTTP client when the application starts."""
    get_http_client()


@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Close and forget the shared HTTP client when the application stops."""
    global http_client
    if http_client is not None:
        await http_client.aclose()
        http_client = None


async def emby_get(path: str, params: dict | None = None) -> dict:
    """GET ``path`` from the Emby server and return the decoded JSON body.

    The API key is added to a copy of ``params``; the caller's dict is never
    modified, so it can be reused without carrying the key around.

    Raises:
        httpx.HTTPStatusError: Emby answered with an error status.
    """
    client = get_http_client()
    url = f"{EMBY_URL}{path}"
    query = {**(params or {}), "api_key": EMBY_API_KEY}
    r = await client.get(url, params=query)
    r.raise_for_status()
    return r.json()


async def _emby_fetch_image(
    item_id: str,
    image_type: str,
    index: int | None,
    max_width: int | None,
    max_height: int | None,
    quality: int | None,
) -> httpx.Response:
    """Request one item image from Emby and return the successful response."""
    params: dict[str, str | int] = {"api_key": EMBY_API_KEY}
    optional = {
        "Index": index,
        "maxWidth": max_width,
        "maxHeight": max_height,
        "quality": quality,
    }
    # Only send the options the caller actually set.
    params.update({key: value for key, value in optional.items() if value is not None})
    r = await get_http_client().get(f"{EMBY_URL}/Items/{item_id}/Images/{image_type}", params=params)
    r.raise_for_status()
    return r


async def emby_get_image(
    item_id: str,
    image_type: str = "Primary",
    index: int | None = None,
    *,
    max_width: int | None = None,
    max_height: int | None = None,
    quality: int | None = None,
) -> bytes:
    """Download an item image from Emby and return its raw bytes.

    Raises:
        httpx.HTTPStatusError: Emby answered with an error status.
    """
    r = await _emby_fetch_image(item_id, image_type, index, max_width, max_height, quality)
    return r.content


async def emby_get_image_with_type(
    item_id: str,
    image_type: str = "Primary",
    index: int | None = None,
    *,
    max_width: int | None = None,
    max_height: int | None = None,
    quality: int | None = None,
) -> tuple[bytes, str]:
    """Download an item image from Emby and return ``(bytes, media_type)``.

    The media type is the response Content-Type without parameters, falling
    back to ``"image/jpeg"`` when the header is missing or empty.

    Raises:
        httpx.HTTPStatusError: Emby answered with an error status.
    """
    r = await _emby_fetch_image(item_id, image_type, index, max_width, max_height, quality)
    content_type = (r.headers.get("content-type", "image/jpeg") or "image/jpeg").split(";")[0]
    return r.content, content_type
f"{EMBY_URL}/Items/{item_id}/Images/{image_type}" params: dict[str, str | int] = {"api_key": EMBY_API_KEY} if index is not None: params["Index"] = index if max_width is not None: params["maxWidth"] = max_width if max_height is not None: params["maxHeight"] = max_height if quality is not None: params["quality"] = quality r = await client.get(url, params=params) r.raise_for_status() content_type = (r.headers.get("content-type", "image/jpeg") or "image/jpeg").split(";")[0] return r.content, content_type async def emby_get_image_optional(item_id: str, image_type: str, index: int | None = None) -> bytes | None: """Returns image bytes or None if the image type doesn't exist for this item.""" client = get_http_client() url = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}" params: dict[str, str | int] = {"api_key": EMBY_API_KEY} if index is not None: params["Index"] = index r = await client.get(url, params=params) if r.status_code in (404, 400): return None r.raise_for_status() content_type = r.headers.get("content-type", "").lower() if content_type and not content_type.startswith("image/"): return None return r.content async def emby_upload_image(item_id: str, image_bytes: bytes, image_type: str = "Thumb"): upload_buf = io.BytesIO() upload_image = load_image_from_bytes(image_bytes, mode="RGB") upload_image.save(upload_buf, format="JPEG", quality=92, optimize=True) upload_b64 = base64.b64encode(upload_buf.getvalue()) client = get_http_client() url = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}" r = await client.post( url, params={"api_key": EMBY_API_KEY}, content=upload_b64, headers={ "Content-Type": "image/jpeg", "X-Emby-Token": EMBY_API_KEY, }, ) try: r.raise_for_status() except httpx.HTTPStatusError as exc: detail = r.text.strip() or str(exc) raise HTTPException( status_code=502, detail=f"Emby upload failed ({r.status_code}): {detail}", ) from exc return r.status_code @dataclass class ArtworkSearchResult: id: str title: str thumbnail_url: str image_url: str 
class ImageSearchProvider:
    """Interface for artwork search backends; subclasses implement ``search``."""

    key = "base"
    label = "Base"

    async def search(self, query: str, limit: int = 12) -> list[ArtworkSearchResult]:
        """Return up to ``limit`` results for ``query``."""
        raise NotImplementedError


class WikimediaImageSearchProvider(ImageSearchProvider):
    """Searches the File namespace of Wikimedia Commons through its web API."""

    key = "wikimedia"
    label = "Wikimedia Commons"

    async def search(self, query: str, limit: int = 12) -> list[ArtworkSearchResult]:
        """Search Commons for ``query`` and return the image results.

        ``limit`` is clamped to [1, 24]. Pages without image info, without a
        URL, or whose MIME type is not ``image/*`` are skipped.

        Raises:
            httpx.HTTPStatusError: the Commons API answered with an error status.
        """
        client = get_http_client()
        params = {
            "action": "query",
            "generator": "search",
            "gsrnamespace": "6",
            "gsrsearch": query,
            "gsrlimit": str(max(1, min(24, limit))),
            "prop": "imageinfo",
            "iiprop": "url|size|mime",
            "iiurlwidth": "420",
            "format": "json",
            "origin": "*",
        }
        r = await client.get("https://commons.wikimedia.org/w/api.php", params=params)
        r.raise_for_status()
        pages = (r.json().get("query") or {}).get("pages") or {}
        results: list[ArtworkSearchResult] = []
        for page in pages.values():
            info = (page.get("imageinfo") or [{}])[0]
            mime = info.get("mime", "")
            if not mime.startswith("image/"):
                continue
            image_url = info.get("url")
            if not image_url:
                continue
            title = (page.get("title") or "Artwork").replace("File:", "", 1)
            results.append(ArtworkSearchResult(
                # The id is derived from the image URL (md5, not security-related).
                id=hashlib.md5(image_url.encode("utf-8")).hexdigest(),
                title=title,
                thumbnail_url=info.get("thumburl") or image_url,
                image_url=image_url,
                source_url=info.get("descriptionurl") or image_url,
                provider=self.key,
                width=info.get("width"),
                height=info.get("height"),
            ))
        return results


# Registry of available search providers, keyed by provider key.
IMAGE_SEARCH_PROVIDERS: dict[str, ImageSearchProvider] = {
    WikimediaImageSearchProvider.key: WikimediaImageSearchProvider(),
}


def _normalize_iso_fraction(text: str) -> str:
    """Rewrite the fractional-seconds part of ``text`` to exactly six digits."""
    head, dot, tail = text.partition(".")
    if not dot:
        return text
    digit_count = len(tail) - len(tail.lstrip("0123456789"))
    if digit_count == 0:
        return text
    fraction = tail[:digit_count][:6].ljust(6, "0")
    return f"{head}.{fraction}{tail[digit_count:]}"


def parse_emby_datetime(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

    A trailing ``Z`` is accepted, and timestamps without an offset are
    treated as UTC. Fractional seconds of any length are accepted: older
    Python versions only parse 3 or 6 fractional digits, so the fraction is
    normalised to microsecond precision (extra digits are truncated) before
    parsing.

    Returns None for empty or unparsable input.
    """
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(_normalize_iso_fraction(value.replace("Z", "+00:00")))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def to_emby_iso(value: datetime) -> str:
    """Format ``value`` as a UTC ISO-8601 string with a ``Z`` suffix."""
    return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
async def emby_get_all(path: str, params: dict | None = None, *, page_size: int = 200) -> list[dict]:
    """Fetch every item of a paged Emby listing and return them as one list.

    Pages are requested with ``StartIndex``/``Limit`` until a page comes back
    empty or the running count reaches the reported ``TotalRecordCount``.
    """
    items: list[dict] = []
    start_index = 0
    base_params = dict(params or {})
    while True:
        page = await emby_get(path, {
            **base_params,
            "StartIndex": str(start_index),
            "Limit": str(page_size),
        })
        batch = page.get("Items", [])
        items.extend(batch)
        # Without a reported total, treat what we have so far as the total,
        # which ends the loop after this page.
        total = int(page.get("TotalRecordCount", start_index + len(batch)))
        if not batch or start_index + len(batch) >= total:
            break
        start_index += len(batch)
    return items


def get_week_bounds(now: datetime, week_offset: int) -> tuple[datetime, datetime]:
    """Return ``(start, end)`` of the week containing ``now``, shifted by ``week_offset`` weeks.

    The week starts on Monday at 00:00 (in ``now``'s own timezone) and the
    end is exactly seven days after the start.
    """
    week_start = (now - timedelta(days=now.weekday())).replace(hour=0, minute=0, second=0, microsecond=0)
    week_start = week_start + timedelta(weeks=week_offset)
    return week_start, week_start + timedelta(days=7)


async def build_airing_snapshot(week_offset: int = 0) -> dict:
    """Query Emby and build the "currently airing" overview for one week.

    Five listings are fetched concurrently: all series, recent episodes in the
    new-season age window, episodes in the inference lookback window, episodes
    premiering in the selected week, and upcoming episodes. A failure of the
    series listing is re-raised; a failure of any episode listing is treated
    as an empty list.

    Returns a dict with ``items`` (one entry per airing series, sorted with
    eligible series first), plus ``generated_at``, ``week_start``,
    ``week_end``, ``week_offset``, ``new_season_min_days`` and
    ``new_season_max_days``.
    """
    now = datetime.now(timezone.utc)
    week_start, week_end = get_week_bounds(now, week_offset)
    # Looking at a past week (negative offset) widens the age window by the
    # same number of days; future weeks do not shift it.
    week_shift_days = max(0, -week_offset) * 7
    eligible_min_days = NEW_SEASON_MIN_AGE_DAYS + week_shift_days
    eligible_max_days = NEW_SEASON_MAX_AGE_DAYS + week_shift_days
    recent_window_start = now - timedelta(days=eligible_max_days)
    recent_window_end = now - timedelta(days=eligible_min_days)
    inference_window_start = now - timedelta(days=max(SEASON_INFERENCE_LOOKBACK_DAYS, eligible_max_days + 35))
    series_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Series",
        "Recursive": "true",
        "SortBy": "SortName",
        "SortOrder": "Ascending",
        "ImageTypeLimit": "1",
        "EnableImageTypes": "Primary",
    })
    recent_episodes_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "SortBy": "PremiereDate",
        "SortOrder": "Descending",
        "MinPremiereDate": to_emby_iso(recent_window_start),
        "MaxPremiereDate": to_emby_iso(recent_window_end),
    })
    recent_season_activity_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(inference_window_start),
        "MaxPremiereDate": to_emby_iso(now + timedelta(days=1)),
    })
    selected_week_episodes_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Episode",
        "Recursive": "true",
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(week_start),
        "MaxPremiereDate": to_emby_iso(week_end),
    })
    upcoming_episodes_task = emby_get_all("/Shows/Upcoming", {
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(now - timedelta(hours=12)),
    })
    series_items, recent_episodes, recent_season_activity, selected_week_episodes, upcoming_episodes = await asyncio.gather(
        series_task,
        recent_episodes_task,
        recent_season_activity_task,
        selected_week_episodes_task,
        upcoming_episodes_task,
        return_exceptions=True,
    )
    # The series list is mandatory; every episode listing is best-effort.
    if isinstance(series_items, Exception):
        raise series_items
    if isinstance(recent_episodes, Exception):
        recent_episodes = []
    if isinstance(recent_season_activity, Exception):
        recent_season_activity = []
    if isinstance(selected_week_episodes, Exception):
        selected_week_episodes = []
    if isinstance(upcoming_episodes, Exception):
        upcoming_episodes = []

    # Latest episode 1 of a season >= 2 per series, from the recent window.
    recent_premieres_by_series: dict[str, dict] = {}
    for episode in recent_episodes:
        series_id = episode.get("SeriesId")
        season_number = int(episode.get("ParentIndexNumber") or 0)
        episode_number = int(episode.get("IndexNumber") or 0)
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or season_number < 2 or episode_number != 1 or premiere_at is None:
            continue
        existing = recent_premieres_by_series.get(series_id)
        if existing is None or premiere_at > existing["premiere_at"]:
            recent_premieres_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_id": episode.get("Id"),
                "episode_name": episode.get("Name") or "",
            }

    # Earliest episode seen per (series, season >= 2) in the lookback window;
    # used below to infer a season start when no episode 1 was found.
    earliest_seen_by_series_season: dict[tuple[str, int], dict] = {}
    for episode in recent_season_activity:
        series_id = episode.get("SeriesId")
        season_number = int(episode.get("ParentIndexNumber") or 0)
        episode_number = int(episode.get("IndexNumber") or 0)
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or season_number < 2 or premiere_at is None:
            continue
        key = (series_id, season_number)
        existing = earliest_seen_by_series_season.get(key)
        if existing is None or premiere_at < existing["premiere_at"]:
            earliest_seen_by_series_season[key] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }

    # Earliest episode per series premiering inside the selected week.
    selected_week_by_series: dict[str, dict] = {}
    for episode in selected_week_episodes:
        series_id = episode.get("SeriesId")
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or premiere_at is None:
            continue
        existing = selected_week_by_series.get(series_id)
        if existing is None or premiere_at < existing["premiere_at"]:
            season_number = int(episode.get("ParentIndexNumber") or 0)
            episode_number = int(episode.get("IndexNumber") or 0)
            selected_week_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }

    # Earliest upcoming episode per series.
    upcoming_by_series: dict[str, dict] = {}
    for episode in upcoming_episodes:
        series_id = episode.get("SeriesId")
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or premiere_at is None:
            continue
        existing = upcoming_by_series.get(series_id)
        if existing is None or premiere_at < existing["premiere_at"]:
            season_number = int(episode.get("ParentIndexNumber") or 0)
            episode_number = int(episode.get("IndexNumber") or 0)
            upcoming_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }

    results: list[dict] = []
    for series in series_items:
        series_id = series.get("Id")
        if not series_id:
            continue
        recent_premiere = recent_premieres_by_series.get(series_id)
        selected_week_episode = selected_week_by_series.get(series_id)
        upcoming = upcoming_by_series.get(series_id)
        status = (series.get("Status") or "").strip()
        # A series is listed when it is marked continuing or has any episode signal.
        is_current_airing = (
            status.lower() == "continuing"
            or selected_week_episode is not None
            or upcoming is not None
            or recent_premiere is not None
        )
        if not is_current_airing:
            continue
        # AirDays may arrive as a list or as a comma-separated string.
        air_days = series.get("AirDays") or []
        if isinstance(air_days, str):
            air_days = [part.strip() for part in air_days.split(",") if part.strip()]
        active_season_number = max(
            recent_premiere["season_number"] if recent_premiere else 0,
            selected_week_episode["season_number"] if selected_week_episode else 0,
            upcoming["season_number"] if upcoming else 0,
        )
        inferred_recent_premiere = None
        if recent_premiere is None and active_season_number >= 2 and (selected_week_episode is not None or upcoming is not None):
            inferred_recent_premiere = earliest_seen_by_series_season.get((series_id, active_season_number))
        season_start = recent_premiere or inferred_recent_premiere
        days_since_premiere = None
        if season_start is not None:
            # Whole days since the season start, never negative.
            days_since_premiere = max(0, int((now - season_start["premiere_at"]).total_seconds() // 86400))
        has_current_airing_signal = status.lower() == "continuing" and (
            selected_week_episode is not None or upcoming is not None
        )
        # NOTE(review): this condition implies has_current_airing_signal, so as
        # written it never changes eligible_new_season and the
        # "recent_season_start" reason below is never selected — confirm intent.
        has_recent_season_start_signal = (
            status.lower() == "continuing"
            and (selected_week_episode is not None or upcoming is not None)
            and days_since_premiere is not None
            and eligible_min_days <= days_since_premiere <= eligible_max_days
        )
        eligible_new_season = has_current_airing_signal or has_recent_season_start_signal
        season_premiere_inferred = recent_premiere is None and inferred_recent_premiere is not None
        eligibility_reason = (
            "current_airing"
            if has_current_airing_signal
            else ("recent_season_start" if has_recent_season_start_signal else None)
        )
        next_air_at = upcoming["premiere_at"] if upcoming else None
        results.append({
            "id": series_id,
            "name": series.get("Name", ""),
            "year": series.get("ProductionYear"),
            "status": status or ("Continuing" if upcoming else "Unknown"),
            "air_days": air_days,
            "poster_url": f"/api/poster/{series_id}?w=180&h=270&q=84",
            "selected_week_air_at": to_emby_iso(selected_week_episode["premiere_at"]) if selected_week_episode else None,
            # "SxEy · name" when both numbers are known, otherwise just the name.
            "selected_week_episode_label": (
                f"S{selected_week_episode['season_number']}E{selected_week_episode['episode_number']} · {selected_week_episode['episode_name']}".strip(" ·")
                if selected_week_episode and selected_week_episode["season_number"] and selected_week_episode["episode_number"]
                else (selected_week_episode["episode_name"] if selected_week_episode else None)
            ),
            "next_air_at": to_emby_iso(next_air_at) if next_air_at else None,
            "next_episode_label": (
                f"S{upcoming['season_number']}E{upcoming['episode_number']} · {upcoming['episode_name']}".strip(" ·")
                if upcoming and upcoming["season_number"] and upcoming["episode_number"]
                else (upcoming["episode_name"] if upcoming else None)
            ),
            "season_number": season_start["season_number"] if season_start else None,
            "season_premiere_at": to_emby_iso(season_start["premiere_at"]) if season_start else None,
            "season_premiere_episode": season_start["episode_name"] if season_start else None,
            "days_since_season_premiere": days_since_premiere,
            "season_premiere_inferred": season_premiere_inferred,
            "eligibility_reason": eligibility_reason,
            "eligible_new_season": eligible_new_season,
        })

    # Eligible series first, then series with a known air time (earliest
    # first), then alphabetically by name.
    results.sort(key=lambda item: (
        0 if item["eligible_new_season"] else 1,
        item["selected_week_air_at"] is None and item["next_air_at"] is None,
        item["selected_week_air_at"] or item["next_air_at"] or "",
        item["name"].lower(),
    ))
    return {
        "items": results,
        "generated_at": to_emby_iso(now),
        "week_start": to_emby_iso(week_start),
        "week_end": to_emby_iso(week_end),
        "week_offset": week_offset,
        "new_season_min_days": eligible_min_days,
        "new_season_max_days": eligible_max_days,
    }
async def get_airing_snapshot(force_refresh: bool = False, week_offset: int = 0) -> dict:
    """Return the airing snapshot for ``week_offset``, using the in-memory cache.

    A cached snapshot is served while it has not expired, unless
    ``force_refresh`` is set. Rebuilds happen under ``airing_lookup_lock`` and
    the cache is checked again after the lock is acquired, so a snapshot built
    by another waiter in the meantime is reused.
    """

    def unexpired_entry() -> dict | None:
        # A forced refresh never accepts a cached entry.
        if force_refresh:
            return None
        entry = airing_lookup_cache.get(week_offset)
        if entry and entry["expires_at"] > time.time():
            return entry
        return None

    hit = unexpired_entry()
    if hit is not None:
        return hit["data"]

    async with airing_lookup_lock:
        hit = unexpired_entry()
        if hit is not None:
            return hit["data"]
        snapshot = await build_airing_snapshot(week_offset=week_offset)
        airing_lookup_cache[week_offset] = {
            "expires_at": time.time() + AIRING_LOOKUP_CACHE_TTL,
            "data": snapshot,
        }
        return snapshot
# --- Image helpers ---

def cover_crop(img: Image.Image, width: int, height: int) -> Image.Image:
    """Resize and centre-crop ``img`` so it fills exactly ``width`` x ``height``."""
    target_size = (width, height)
    return ImageOps.fit(img, target_size, method=Image.LANCZOS, centering=(0.5, 0.5))


def cover_crop_positioned(
    img: Image.Image,
    width: int,
    height: int,
    *,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = 0.0,
) -> Image.Image:
    """Cover-crop ``img`` to ``width`` x ``height`` with adjustable zoom and pan.

    ``zoom`` is clamped to [PRIMARY_MIN_ZOOM, PRIMARY_MAX_ZOOM]. ``pan_x`` and
    ``pan_y`` are clamped to [-1, 1]; -1 selects the left/top edge of the
    source and +1 the right/bottom edge.
    """

    def clamp_unit(value: float) -> float:
        return max(-1.0, min(1.0, float(value)))

    pan_x = clamp_unit(pan_x)
    pan_y = clamp_unit(pan_y)
    zoom = max(PRIMARY_MIN_ZOOM, min(PRIMARY_MAX_ZOOM, float(zoom)))

    src_w, src_h = img.width, img.height
    target_ratio = width / max(1, height)
    source_ratio = src_w / max(1, src_h)

    # Largest window with the target aspect ratio that fits inside the source.
    if source_ratio > target_ratio:
        window_h = src_h
        window_w = int(round(window_h * target_ratio))
    else:
        window_w = src_w
        window_h = int(round(window_w / target_ratio))

    # Zooming in shrinks that window; keep it inside the source and >= 1px.
    window_w = max(1, min(src_w, int(round(window_w / zoom))))
    window_h = max(1, min(src_h, int(round(window_h / zoom))))

    # Pan picks the window's position within the remaining slack.
    slack_x = max(0, src_w - window_w)
    slack_y = max(0, src_h - window_h)
    left = int(round((slack_x / 2) * (pan_x + 1.0)))
    top = int(round((slack_y / 2) * (pan_y + 1.0)))

    box = (left, top, left + window_w, top + window_h)
    return img.crop(box).resize((width, height), Image.LANCZOS)


def build_tall_backdrop_background(
    img: Image.Image,
    width: int,
    height: int,
    *,
    dim_factor: float,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = -0.16,
) -> Image.Image:
    """Return a positioned cover-crop of ``img`` with every channel value scaled by ``dim_factor``."""

    def dim(channel_value: int) -> int:
        return int(channel_value * dim_factor)

    framed = cover_crop_positioned(img, width, height, zoom=zoom, pan_x=pan_x, pan_y=pan_y)
    return framed.point(dim)
def _load_first_available_font(candidates: list[str], size: int) -> ImageFont.FreeTypeFont | None:
    """Return the first font in ``candidates`` that exists and loads, else None."""
    for candidate in candidates:
        if not os.path.exists(candidate):
            continue
        try:
            return ImageFont.truetype(candidate, size)
        except OSError:
            # An unreadable or corrupt font file should not abort rendering;
            # fall through to the next candidate.
            continue
    return None


def get_font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont:
    """Return a regular or bold sans-serif font at ``size``.

    Known Windows and Linux font locations are tried in order; when none can
    be loaded, Pillow's built-in default font is returned.
    """
    font_paths = [
        # Windows
        "C:/Windows/Fonts/arialbd.ttf" if bold else "C:/Windows/Fonts/arial.ttf",
        "C:/Windows/Fonts/calibrib.ttf" if bold else "C:/Windows/Fonts/calibri.ttf",
        "C:/Windows/Fonts/verdanab.ttf" if bold else "C:/Windows/Fonts/verdana.ttf",
        # Linux
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
    ]
    font = _load_first_available_font(font_paths, size)
    return font if font is not None else ImageFont.load_default(size)


def get_arial_bold_font(size: int) -> ImageFont.FreeTypeFont:
    """Return Arial Bold (or a close substitute) at ``size``, falling back to ``get_font(bold=True)``."""
    font_paths = [
        "C:/Windows/Fonts/arialbd.ttf",
        "/usr/share/fonts/truetype/msttcorefonts/Arial_Bold.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    ]
    font = _load_first_available_font(font_paths, size)
    return font if font is not None else get_font(size, bold=True)


def get_badge_font(size: int) -> ImageFont.FreeTypeFont:
    """Return the heavy font used for badges at ``size``, falling back to ``get_arial_bold_font``."""
    font_paths = [
        "C:/Windows/Fonts/ariblk.ttf",
        "C:/Windows/Fonts/arialbd.ttf",
        "C:/Windows/Fonts/seguisb.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    ]
    font = _load_first_available_font(font_paths, size)
    return font if font is not None else get_arial_bold_font(size)


def measure_tracked_text(text: str, font: ImageFont.FreeTypeFont, tracking: int) -> tuple[int, int]:
    """Measure ``text`` when drawn one character at a time with extra spacing.

    The width is the sum of each character's bounding-box width plus
    ``tracking`` pixels between consecutive characters. The height spans the
    lowest bounding-box top to the highest bottom, with both extremes
    starting from 0. Returns ``(0, 0)`` for empty text.
    """
    if not text:
        return 0, 0
    boxes = [font.getbbox(ch) for ch in text]
    top = min(0, *(box[1] for box in boxes))
    bottom = max(0, *(box[3] for box in boxes))
    width = sum(box[2] - box[0] for box in boxes) + tracking * (len(boxes) - 1)
    height = bottom - top
    return width, height
def draw_tracked_text(
    draw: ImageDraw.ImageDraw,
    position: tuple[int, int],
    text: str,
    font: ImageFont.FreeTypeFont,
    fill: tuple[int, int, int],
    tracking: int,
) -> None:
    """Draw ``text`` character by character, adding ``tracking`` pixels after each one.

    ``position`` is where the first character's bounding box starts; each
    glyph is offset by its own bounding-box origin so the boxes line up.
    """
    cursor_x, baseline_y = position
    for glyph in text:
        left, top, right, _bottom = font.getbbox(glyph)
        draw.text((cursor_x - left, baseline_y - top), glyph, font=font, fill=fill)
        cursor_x += (right - left) + tracking


def wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> list:
    """Greedily wrap ``text`` into lines whose measured right edge fits ``max_width``.

    Words are never split: a single word wider than ``max_width`` is placed
    on a line of its own.
    """
    wrapped = []
    pending = ""
    for token in text.split():
        candidate = f"{pending} {token}" if pending else token
        if font.getbbox(candidate)[2] > max_width:
            # The token does not fit; close the current line and start a new one.
            if pending:
                wrapped.append(pending)
            pending = token
        else:
            pending = candidate
    if pending:
        wrapped.append(pending)
    return wrapped


def build_cache_key(
    item_id: str,
    bg_mode: str = "backdrop",
    backdrop_index: int = 0,
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> str:
    """Return a hex digest that identifies one combination of render settings.

    ``RENDER_VERSION`` is part of the hashed payload, and float settings are
    rounded to 4 decimals before hashing.
    """

    def four_places(number: float) -> float:
        return round(float(number), 4)

    payload = {
        "render_version": RENDER_VERSION,
        "item_id": item_id,
        "bg_mode": bg_mode,
        "backdrop_index": int(backdrop_index),
        "custom_bg": custom_bg,
        "text_color": text_color,
        "logo_align": logo_align,
        "logo_scale": four_places(logo_scale),
        "darkness": four_places(darkness),
        "studio": studio,
        "studio_position": studio_position,
        "new_episodes_tag": bool(new_episodes_tag),
        "season_finale_tag": bool(season_finale_tag),
        "logo_index": logo_index,
        "primary_zoom": four_places(primary_zoom),
        "primary_pan_x": four_places(primary_pan_x),
        "primary_pan_y": four_places(primary_pan_y),
    }
    serialized = repr(payload).encode("utf-8")
    return hashlib.md5(serialized).hexdigest()


def get_thumb_cache_path(cache_key: str) -> Path:
    """Return the cache file path for a rendered thumb with this key."""
    return CACHE_DIR.joinpath(f"{cache_key}.png")


def get_primary_cache_path(cache_key: str) -> Path:
    """Return the cache file path for a rendered primary image with this key."""
    return CACHE_DIR.joinpath(f"{cache_key}.primary.png")
def generate_thumbnail(
    poster_bytes: bytes,
    title: str,
    bg_mode: str = "backdrop",
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    width: int = THUMB_WIDTH,
    height: int = THUMB_HEIGHT,
    logo_bytes: bytes | None = None,
    backdrop_bytes: bytes | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> bytes:
    """Render a title card and return it as PNG bytes.

    The frame is built in layers: background (backdrop, solid colour, or the
    poster as a fallback), a darkening overlay, the title logo (or the title
    drawn as text when no usable logo is supplied), an optional studio logo,
    and an optional full-width banner along the top edge.

    Args:
        poster_bytes: Encoded poster image; used as the background when no
            backdrop is used and no solid colour applies.
        title: Title drawn (upper-cased) when there is no usable logo image.
        bg_mode: ``"solid"`` selects ``custom_bg``; any other value prefers
            ``backdrop_bytes`` when present.
        custom_bg: Colour string for solid mode; an unparseable value falls
            back to ``(15, 15, 25)``.
        text_color: Colour string for the text fallback; an unparseable value
            falls back to white.
        logo_align: One of ``center``, ``left``, ``bottom-center``,
            ``bottom-left``, ``bottom-right``; anything else is centred.
        logo_scale: Logo size multiplier, clamped to 0.4–2.5.
        darkness: Dimming amount, clamped to 0.0–1.0.
        studio: Key passed to ``make_studio_logo``; ``"none"``/empty skips it.
        studio_position: Corner for the studio logo; unknown values use
            ``bottom-right``.
        new_episodes_tag: Draw the "New Season" banner.
        season_finale_tag: Draw the "Season Finale" banner (takes precedence
            over ``new_episodes_tag``).
        logo_index: Not read by the renderer; accepted so callers can pass the
            same option set they use for cache keys.
        width: Output width in pixels.
        height: Output height in pixels; a frame at least 1.3x taller than
            wide uses the tall (poster) layout.
        logo_bytes: Encoded logo image, or ``None``. Undecodable bytes are
            treated as "no logo".
        backdrop_bytes: Encoded backdrop image, or ``None``.
        primary_zoom: Forwarded to ``build_tall_backdrop_background``.
        primary_pan_x: Forwarded to ``build_tall_backdrop_background``.
        primary_pan_y: Forwarded to ``build_tall_backdrop_background``.

    Returns:
        The rendered image encoded as PNG.
    """
    # darkness is 0.0 (no dimming) → 1.0 (very dark); scale bg and vignette together
    d = max(0.0, min(1.0, darkness))
    bg_dim = 1.0 - (d * 0.6)  # backdrop brightness: 1.0 → 0.4
    vig_max = int(d * 220)  # vignette peak alpha: 0 → 220
    is_tall_layout = height >= int(width * 1.3)

    # --- Build background ---
    if backdrop_bytes and bg_mode != "solid":
        backdrop = load_image_from_bytes(backdrop_bytes, mode="RGB")
        if is_tall_layout:
            bg = build_tall_backdrop_background(
                backdrop,
                width,
                height,
                dim_factor=bg_dim,
                zoom=primary_zoom,
                pan_x=primary_pan_x,
                pan_y=primary_pan_y,
            )
        else:
            bg = cover_crop(backdrop, width, height)
            bg = bg.point(lambda p: int(p * bg_dim))
    elif bg_mode == "solid" and custom_bg:
        try:
            bg_color = ImageColor.getrgb(custom_bg)
        except ValueError:
            bg_color = (15, 15, 25)
        bg = Image.new("RGB", (width, height), bg_color)
    else:
        poster = load_image_from_bytes(poster_bytes, mode="RGB")
        if is_tall_layout:
            bg = build_tall_backdrop_background(
                poster,
                width,
                height,
                dim_factor=bg_dim,
                zoom=primary_zoom,
                pan_x=primary_pan_x,
                pan_y=primary_pan_y,
            )
        else:
            # Blurred poster fallback for landscape thumbs.
            bg = cover_crop(poster, width, height)
            bg = bg.filter(ImageFilter.GaussianBlur(radius=20))
            bg = bg.point(lambda p: int(p * bg_dim * 0.85))

    # --- Vignette overlay tuned per logo position ---
    # NOTE(review): ImageDraw writes pixels on an RGBA image instead of
    # blending them, so every layer drawn below REPLACES the alpha under it.
    # The full-frame gradients therefore overwrite the base layer (and, for
    # the two-gradient corners, the vertical gradient overwrites the
    # horizontal one). Left as-is because changing it alters every rendered
    # image; confirm the intended look before compositing layers separately.
    overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0))
    ov = ImageDraw.Draw(overlay)

    def h_gradient(from_left: bool, strength: int = 200):
        """Draw a horizontal fade, darkest at the chosen side."""
        s = int(strength * d)
        for x in range(width):
            t = 1.0 - (x / max(1, width - 1))
            a = int(s * (t ** 0.55))
            px = x if from_left else width - 1 - x
            ov.line([(px, 0), (px, height)], fill=(0, 0, 0, a))

    def v_gradient_bottom(strength: int = 180):
        """Draw a vertical fade, darkest along the bottom edge."""
        s = int(strength * d)
        for y in range(height):
            t = 1.0 - (y / max(1, height - 1))
            a = int(s * (t ** 0.55))
            ov.line([(0, height - 1 - y), (width, height - 1 - y)], fill=(0, 0, 0, a))

    # Base layer: uniform darkening across the whole frame on every layout.
    base_alpha = int(vig_max * 0.45)
    ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, base_alpha))

    # Directional layer on top for contrast where the logo sits.
    if logo_align == "center":
        ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, int(vig_max * 0.25)))
    elif logo_align == "left":
        h_gradient(from_left=True, strength=160)
    elif logo_align == "bottom-center":
        v_gradient_bottom(strength=150)
    elif logo_align == "bottom-left":
        h_gradient(from_left=True, strength=140)
        v_gradient_bottom(strength=140)
    elif logo_align == "bottom-right":
        h_gradient(from_left=False, strength=140)
        v_gradient_bottom(strength=140)

    bg = bg.convert("RGBA")
    bg = Image.alpha_composite(bg, overlay)
    bg = bg.convert("RGB")

    # An unparseable text colour used to abort the whole render, even when a
    # logo image made the colour irrelevant; fall back like custom_bg does.
    try:
        txt_color = ImageColor.getrgb(text_color)
    except (ValueError, TypeError):
        txt_color = (255, 255, 255)

    pad = max(52, int(height * (0.11 if is_tall_layout else 0.08)))
    scale = max(0.4, min(2.5, logo_scale))
    logo_max_w = int(width * 0.46 * scale)
    logo_max_h = int(height * (0.18 if is_tall_layout else 0.40) * scale)

    def get_series_banner() -> dict | None:
        """Return geometry, font and colours for the top banner, or None."""
        if season_finale_tag:
            banner_text = "Season Finale"
            banner_fill = "#111111"
        elif new_episodes_tag:
            banner_text = "New Season"
            banner_fill = "#E50914"
        else:
            return None

        banner_font_size = max(
            72 if is_tall_layout else 58,
            int(height * (0.048 if is_tall_layout else 0.115)),
        )
        banner_font = get_badge_font(banner_font_size)
        text_bbox = banner_font.getbbox(banner_text)
        text_w = text_bbox[2] - text_bbox[0]
        text_h = text_bbox[3] - text_bbox[1]
        banner_w = width
        vertical_pad = max(12, int(banner_font_size * (0.28 if is_tall_layout else 0.30)))
        banner_h = max(
            text_h + vertical_pad * 2,
            int(height * (0.105 if is_tall_layout else 0.17)),
        )
        # Subtract the bbox origin so the glyphs, not the font box, are centred.
        return {
            "text": banner_text,
            "fill": ImageColor.getrgb(banner_fill),
            "font": banner_font,
            "x": 0,
            "y": 0,
            "w": banner_w,
            "h": banner_h,
            "text_x": (banner_w - text_w) // 2 - text_bbox[0],
            "text_y": (banner_h - text_h) // 2 - text_bbox[1],
        }

    banner = get_series_banner()

    def logo_position(lw: int, lh: int):
        """Return the top-left corner for a box of (lw, lh) under logo_align."""
        if logo_align == "left":
            return pad, (height - lh) // 2
        if logo_align == "bottom-center":
            return (width - lw) // 2, height - lh - pad
        if logo_align == "bottom-left":
            return pad, height - lh - pad
        if logo_align == "bottom-right":
            return width - lw - pad, height - lh - pad
        # "center" and any unknown alignment.
        return (width - lw) // 2, (height - lh) // 2

    # --- Logo image (preferred) ---
    logo = None
    if logo_bytes:
        try:
            logo = load_image_from_bytes(logo_bytes, mode="RGBA")
        except (UnidentifiedImageError, OSError, ValueError):
            logo = None  # undecodable logo → use the text fallback below

    if logo is not None:
        ratio = min(logo_max_w / logo.width, logo_max_h / logo.height)
        # Clamp to 1px: an extreme aspect ratio could round a side down to 0,
        # which resize() rejects.
        lw = max(1, int(logo.width * ratio))
        lh = max(1, int(logo.height * ratio))
        logo = logo.resize((lw, lh), Image.LANCZOS)
        logo_x, logo_y = logo_position(lw, lh)

        # Drop shadow — blur only a tight crop around the logo, not the whole frame,
        # to avoid a visible dark haze spreading across the backdrop.
        blur_r = 5
        # Kept separate from `pad`, which logo_position() reads from this scope.
        shadow_pad = blur_r * 3
        alpha = logo.getchannel("A")
        black = Image.new("RGB", (lw, lh), (0, 0, 0))
        shadow_logo = Image.merge("RGBA", (*black.split(), alpha))
        shadow_crop = Image.new(
            "RGBA", (lw + shadow_pad * 2, lh + shadow_pad * 2), (0, 0, 0, 0)
        )
        shadow_crop.paste(shadow_logo, (shadow_pad + 4, shadow_pad + 4), shadow_logo)
        shadow_crop = shadow_crop.filter(ImageFilter.GaussianBlur(radius=blur_r))

        # Place the shadow on a transparent full-frame layer: paste() clips at
        # the frame edges, so a logo scaled wider than the frame (negative
        # offset) cannot fail the composite.
        shadow_layer = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        shadow_layer.paste(shadow_crop, (logo_x - shadow_pad, logo_y - shadow_pad))
        bg = Image.alpha_composite(bg.convert("RGBA"), shadow_layer).convert("RGB")
        bg.paste(logo, (logo_x, logo_y), logo)
    else:
        # --- Text fallback ---
        draw = ImageDraw.Draw(bg)
        title_font = get_font(56, bold=True)
        line_h = 66
        lines = wrap_text(title.upper(), title_font, logo_max_w)
        total_h = len(lines) * line_h
        tx, ty = logo_position(logo_max_w, total_h)
        for line in lines:
            draw.text((tx + 3, ty + 3), line, font=title_font, fill=(0, 0, 0))
            draw.text((tx, ty), line, font=title_font, fill=txt_color)
            ty += line_h

    # --- Studio logo overlay ---
    if studio and studio != "none":
        slogo = make_studio_logo(studio, max_height=46)
        if slogo:
            s_pad = 22
            sw, sh = slogo.size
            # Top corners sit below the banner when one is drawn.
            top_y = s_pad if not banner else banner["y"] + banner["h"] + 10
            positions = {
                "top-left": (s_pad, top_y),
                "top-right": (width - sw - s_pad, top_y),
                "bottom-left": (s_pad, height - sh - s_pad),
                "bottom-right": (width - sw - s_pad, height - sh - s_pad),
            }
            sx, sy = positions.get(studio_position, positions["bottom-right"])
            bg.paste(slogo, (sx, sy), slogo)

    if banner:
        draw = ImageDraw.Draw(bg)
        draw.rectangle(
            [
                (banner["x"], banner["y"]),
                (banner["x"] + banner["w"], banner["y"] + banner["h"]),
            ],
            fill=banner["fill"],
        )
        draw.text(
            (banner["text_x"], banner["text_y"]),
            banner["text"],
            font=banner["font"],
            fill=(255, 255, 255),
        )

    buf = io.BytesIO()
    bg.save(buf, format="PNG")
    return buf.getvalue()


def generate_primary_cover(
    poster_bytes: bytes,
    title: str,
    bg_mode: str = "backdrop",
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    logo_bytes: bytes | None = None,
    backdrop_bytes: bytes | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> bytes:
    """Render the poster-sized variant of the title card.

    Same renderer as ``generate_thumbnail`` with the frame fixed to
    ``PRIMARY_WIDTH`` x ``PRIMARY_HEIGHT``; every other option is forwarded
    unchanged. Returns PNG bytes.
    """
    return generate_thumbnail(
        poster_bytes,
        title,
        bg_mode=bg_mode,
        custom_bg=custom_bg,
        text_color=text_color,
        logo_align=logo_align,
        logo_scale=logo_scale,
        darkness=darkness,
        studio=studio,
        studio_position=studio_position,
        new_episodes_tag=new_episodes_tag,
        season_finale_tag=season_finale_tag,
        logo_index=logo_index,
        width=PRIMARY_WIDTH,
        height=PRIMARY_HEIGHT,
        logo_bytes=logo_bytes,
        backdrop_bytes=backdrop_bytes,
        primary_zoom=primary_zoom,
        primary_pan_x=primary_pan_x,
        primary_pan_y=primary_pan_y,
    )


# --- API Routes ---

@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Serve the main page template."""
    return templates.TemplateResponse(request, "index.html")


@app.get("/airing", response_class=HTMLResponse)
async def airing_page(request: Request):
    """Serve the airing page template."""
    return templates.TemplateResponse(request, "airing.html")


@app.get("/api/airing")
async def get_airing_titles(
    page: int = Query(1, ge=1),
    limit: int = Query(12, ge=1, le=48),
    eligible_only: bool = False,
    refresh: bool = False,
    week_offset: int = Query(0, ge=-8, le=4),
):
    """Return one page of the airing snapshot, optionally only eligible items."""
    snapshot = await get_airing_snapshot(force_refresh=refresh, week_offset=week_offset)
    items = snapshot["items"]
    if eligible_only:
        items = [item for item in items if item["eligible_new_season"]]

    start = (page - 1) * limit
    total = len(items)
    return {
        "items": items[start:start + limit],
        "page": page,
        "limit": limit,
        "total": total,
        "has_more": start + limit < total,
        "generated_at": snapshot["generated_at"],
        "week_start": snapshot["week_start"],
        "week_end": snapshot["week_end"],
        "week_offset": snapshot["week_offset"],
        "new_season_min_days": snapshot["new_season_min_days"],
        "new_season_max_days": snapshot["new_season_max_days"],
    }


@app.post("/api/airing/apply-new-season")
async def apply_new_season_banner(request: Request):
    """Render the "New Season" artwork for a series and upload it to Emby.

    Body fields: ``item_id`` (required), ``title``, ``generate_primary``
    (default true), ``week_offset`` (default 0), ``refresh``.

    Raises:
        HTTPException: 400 for a malformed body or a series outside the
            allowed window; 404 when the series is not in the snapshot.
    """
    # Malformed requests are the client's fault: answer 400 rather than
    # letting KeyError/ValueError surface as a 500.
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    item_id = body.get("item_id")
    if not item_id:
        raise HTTPException(status_code=400, detail="Missing item id.")
    try:
        week_offset = int(body.get("week_offset", 0))
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="week_offset must be an integer.") from exc
    generate_primary = bool(body.get("generate_primary", True))

    snapshot = await get_airing_snapshot(
        force_refresh=bool(body.get("refresh", False)),
        week_offset=week_offset,
    )
    item = next((entry for entry in snapshot["items"] if entry["id"] == item_id), None)
    if item is None:
        raise HTTPException(status_code=404, detail="Series was not found in the current airing snapshot.")

    # Eligible outright, or a continuing series with an air date on record.
    is_continuing = (item.get("status") or "").strip().lower() == "continuing"
    has_air_date = item.get("selected_week_air_at") or item.get("next_air_at")
    can_apply_new_season = bool(item["eligible_new_season"]) or (is_continuing and has_air_date)
    if not can_apply_new_season:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Series is outside the New Season window. "
                f"It must be between {snapshot['new_season_min_days']} and {snapshot['new_season_max_days']} days "
                f"from the season premiere."
            ),
        )

    title = body.get("title") or item["name"]

    # One fixed look for this action; shared by the cache key and both renders
    # so the three can never drift apart.
    preset = {
        "bg_mode": "backdrop",
        "text_color": "#FFFFFF",
        "logo_align": "bottom-center",
        "logo_scale": 1.3,
        "darkness": 0.0,
        "studio": "none",
        "studio_position": "bottom-right",
        "new_episodes_tag": True,
        "season_finale_tag": False,
        "logo_index": 0,
    }
    cache_key = build_cache_key(item_id=item_id, backdrop_index=0, custom_bg=None, **preset)
    thumb_cache_path = get_thumb_cache_path(cache_key)
    primary_cache_path = get_primary_cache_path(cache_key)

    needs_render = not thumb_cache_path.exists() or (
        generate_primary and not primary_cache_path.exists()
    )
    if needs_render:
        poster_bytes = await emby_get_image(item_id, "Primary")
        logo_bytes = await emby_get_image_optional(item_id, "Logo", 0)
        backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", 0)

        thumb_bytes = generate_thumbnail(
            poster_bytes,
            title,
            logo_bytes=logo_bytes,
            backdrop_bytes=backdrop_bytes,
            **preset,
        )
        thumb_cache_path.write_bytes(thumb_bytes)

        if generate_primary:
            primary_bytes = generate_primary_cover(
                poster_bytes,
                title,
                logo_bytes=logo_bytes,
                backdrop_bytes=backdrop_bytes,
                **preset,
            )
            primary_cache_path.write_bytes(primary_bytes)

    thumb_status = await emby_upload_image(item_id, thumb_cache_path.read_bytes(), "Thumb")

    primary_status = None
    primary_error = None
    if generate_primary:
        # Best-effort: the thumb is already applied, so a failed primary
        # upload is reported in the response instead of failing the request.
        try:
            primary_status = await emby_upload_image(item_id, primary_cache_path.read_bytes(), "Primary")
        except Exception as exc:
            primary_error = str(exc)

    return {
        "status": "applied",
        "thumb_code": thumb_status,
        "primary_code": primary_status,
        "primary_attempted": generate_primary,
        "primary_error": primary_error,
    }


@app.get("/api/search")
async def search_items(
    q: str = Query(..., min_length=1),
    type: str = "Movie,Series",
    start: int = Query(0, ge=0),
    limit: int = Query(12, ge=1, le=24),
):
    """Search Emby items by name and return a compact, paginated listing."""
    data = await emby_get("/Items", {
        "SearchTerm": q,
        "IncludeItemTypes": type,
        "Recursive": "true",
        "Fields": "ProductionYear",
        "StartIndex": str(start),
        "Limit": str(limit),
        "ImageTypeLimit": "1",
        "EnableImageTypes": "Primary,Logo,Backdrop",
    })

    items = []
    for item in data.get("Items", []):
        # `or` also covers keys that are present but null.
        image_tags = item.get("ImageTags") or {}
        backdrop_tags = item.get("BackdropImageTags") or []
        items.append({
            "id": item["Id"],
            "name": item.get("Name", ""),
            "year": item.get("ProductionYear", ""),
            "type": item.get("Type", ""),
            "has_logo": "Logo" in image_tags,
            "backdrop_count": len(backdrop_tags),
            "poster_url": f"/api/poster/{item['Id']}?w=72&h=108&q=72",
        })

    total = int(data.get("TotalRecordCount", start + len(items)))
    return {
        "items": items,
        "start": start,
        "limit": limit,
        "total": total,
        "has_more": start + len(items) < total,
    }


@app.get("/api/images/{item_id}")
async def get_image_info(item_id: str):
    """Returns accurate logo and backdrop counts by querying the Images endpoint directly."""
    images = await emby_get(f"/Items/{item_id}/Images")

    logos = []
    backdrops = []
    primaries = []
    for img in images:
        image_type = img.get("ImageType")
        entry = {
            "type": image_type,
            "index": int(img.get("ImageIndex", 0) or 0),
            "width": img.get("Width"),
            "height": img.get("Height"),
        }
        if image_type == "Logo":
            entry["url"] = f"/api/item-image/{item_id}?type=Logo&index={entry['index']}&w=220&h=96&q=90"
            logos.append(entry)
        elif image_type == "Backdrop":
            backdrops.append(entry)
        elif image_type == "Primary":
            entry["url"] = f"/api/item-image/{item_id}?type=Primary&index={entry['index']}&w=120&h=180&q=80"
            primaries.append(entry)

    for group in (logos, backdrops, primaries):
        group.sort(key=lambda entry: entry["index"])

    return {
        "backdrop_count": len(backdrops),
        "has_logo": bool(logos),
        "logo_count": len(logos),
        "primary_count": len(primaries),
        "logos": logos,
        "backdrops": backdrops,
        "primaries": primaries,
    }


@app.get("/api/item-image/{item_id}")
async def get_item_image(
    item_id: str,
    type: str = Query("Primary"),
    index: int | None = Query(None, ge=0),
    w: int = Query(120, ge=24, le=1200),
    h: int = Query(180, ge=24, le=1200),
    q: int = Query(80, ge=30, le=100),
):
    """Proxy one Emby item image at the requested size and quality."""
    img_bytes, content_type = await emby_get_image_with_type(
        item_id,
        type,
        index=index,
        max_width=w,
        max_height=h,
        quality=q,
    )
    return Response(
        content=img_bytes,
        media_type=content_type,
        headers={"Cache-Control": "public, max-age=86400"},
    )


@app.get("/api/poster/{item_id}")
async def get_poster(
    item_id: str,
    w: int = Query(72, ge=24, le=400),
    h: int = Query(108, ge=24, le=600),
    q: int = Query(72, ge=30, le=100),
):
    """Proxy an item's Primary image at the requested size and quality."""
    img_bytes = await emby_get_image(
        item_id,
        "Primary",
        max_width=w,
        max_height=h,
        quality=q,
    )
    return Response(
        content=img_bytes,
        media_type="image/jpeg",
        headers={"Cache-Control": "public, max-age=86400"},
    )


@app.get("/api/cache/{cache_key}/primary")
async def get_cached_primary_preview(cache_key: str):
    """Serve a previously rendered primary image from the cache.

    Raises:
        HTTPException: 400 when the key is not hexadecimal; 404 when nothing
            is cached under it.
    """
    # The key becomes part of a file path, so only hex digits are accepted.
    if not cache_key or any(ch not in "0123456789abcdef" for ch in cache_key.lower()):
        raise HTTPException(status_code=400, detail="Invalid cache key.")

    cache_path = get_primary_cache_path(cache_key)
    if not cache_path.exists():
        raise HTTPException(status_code=404, detail="Primary preview not found.")

    return Response(
        content=cache_path.read_bytes(),
        media_type="image/png",
        headers={"Cache-Control": "no-store"},
    )


@app.get("/api/artwork/providers")
async def list_artwork_search_providers():
    """List the configured image search providers and editor targets."""
    providers = [
        {"key": provider.key, "label": provider.label}
        for provider in IMAGE_SEARCH_PROVIDERS.values()
    ]
    targets = [
        {
            "key": key,
            "label": value["label"],
            "width": value["width"],
            "height": value["height"],
            "emby_type": value["emby_type"],
        }
        for key, value in EDITOR_TARGETS.items()
    ]
    return {"providers": providers, "targets": targets}
_HEX_DIGITS = frozenset("0123456789abcdef")


def _is_hex_cache_key(value: object) -> bool:
    """Return True when value is a non-empty string of hexadecimal digits."""
    return (
        isinstance(value, str)
        and bool(value)
        and all(ch in _HEX_DIGITS for ch in value.lower())
    )


async def _read_json_object(request: Request) -> dict:
    """Parse the request body as a JSON object, answering 400 otherwise."""
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")
    return body


def _editor_render_kwargs(body: dict) -> dict:
    """Extract the keyword arguments for render_artwork_editor_image.

    Numeric fields are converted here so that a bad value is reported as a
    400 instead of escaping as an unhandled ValueError.
    """
    try:
        return {
            "target_mode": body.get("target_mode", "poster"),
            "fit_mode": body.get("fit_mode", "cover"),
            "fill_mode": body.get("fill_mode", "blur"),
            "fill_color": body.get("fill_color", "#101318"),
            "zoom": float(body.get("zoom", 1.0)),
            "pan_x": float(body.get("pan_x", 0.0)),
            "pan_y": float(body.get("pan_y", 0.0)),
            "quality": int(body.get("quality", 95)),
        }
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="Invalid numeric editor option.") from exc


def _parse_render_request(body: dict, *, require_title: bool) -> dict:
    """Parse the body shared by /api/generate and /api/apply.

    Returns a dict with ``item_id``, ``title``, ``backdrop_index``,
    ``generate_primary`` and ``options``. ``options`` holds exactly the
    keyword arguments that build_cache_key, generate_thumbnail and
    generate_primary_cover have in common, so all three receive the same
    values.

    Raises:
        HTTPException: 400 when a required field is missing or a numeric
            field cannot be converted.
    """
    try:
        item_id = body["item_id"]
        title = body["title"] if require_title else body.get("title", "")
        logo_index_raw = body.get("logo_index")
        backdrop_index = int(body.get("backdrop_index", 0))
        options = {
            "bg_mode": body.get("bg_mode", "backdrop"),
            "custom_bg": body.get("custom_bg"),
            "text_color": body.get("text_color", "#FFFFFF"),
            "logo_align": body.get("logo_align", "bottom-center"),
            "logo_scale": float(body.get("logo_scale", 1.3)),
            "darkness": float(body.get("darkness", 0.0)),
            "studio": body.get("studio", "none"),
            "studio_position": body.get("studio_position", "bottom-right"),
            "new_episodes_tag": bool(body.get("new_episodes_tag", False)),
            "season_finale_tag": bool(body.get("season_finale_tag", False)),
            "logo_index": int(logo_index_raw) if logo_index_raw is not None else None,
            "primary_zoom": float(body.get("primary_zoom", 1.0)),
            "primary_pan_x": float(body.get("primary_pan_x", 0.0)),
            "primary_pan_y": float(body.get("primary_pan_y", -0.16)),
        }
    except KeyError as exc:
        raise HTTPException(status_code=400, detail=f"Missing required field: {exc.args[0]}") from exc
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail="Invalid numeric render option.") from exc

    return {
        "item_id": item_id,
        "title": title,
        "backdrop_index": backdrop_index,
        "generate_primary": bool(body.get("generate_primary", False)),
        "options": options,
    }


@app.get("/api/artwork/search")
async def search_external_artwork(
    q: str = Query(..., min_length=2),
    provider: str = Query("wikimedia"),
    limit: int = Query(12, ge=1, le=24),
):
    """Search an external image provider and return its results.

    Raises:
        HTTPException: 400 for an unknown provider; 502 when the provider
            request fails.
    """
    search_provider = IMAGE_SEARCH_PROVIDERS.get(provider)
    if search_provider is None:
        raise HTTPException(status_code=400, detail="Unknown image search provider.")

    try:
        results = await search_provider.search(q, limit=limit)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Image search failed: {exc}") from exc

    return {
        "provider": provider,
        "items": [result.__dict__ for result in results],
    }


@app.post("/api/artwork/import")
async def import_remote_artwork(request: Request):
    """Download a remote image, validate it and store it in the import cache.

    The stored file is named after the SHA-256 of its bytes, which is also
    the id returned to the caller.

    Raises:
        HTTPException: 400 for a missing/unsupported URL, a non-image
            response, an oversized download or undecodable image data; 502
            when the download itself fails.
    """
    body = await _read_json_object(request)
    image_url = body.get("image_url", "")
    if not image_url or not isinstance(image_url, str):
        raise HTTPException(status_code=400, detail="Missing image URL.")

    parsed = urllib.parse.urlparse(image_url)
    if parsed.scheme not in {"http", "https"}:
        raise HTTPException(status_code=400, detail="Only http and https image URLs are supported.")

    # NOTE(security): the URL is caller-supplied and fetched from the server
    # with redirects followed; nothing here restricts loopback or private
    # hosts. Review before exposing this endpoint to untrusted clients.
    client = get_http_client()
    chunks: list[bytes] = []
    received = 0
    try:
        # Stream so the size cap is enforced while downloading, instead of
        # buffering an arbitrarily large body and checking afterwards.
        async with client.stream("GET", image_url, follow_redirects=True) as r:
            r.raise_for_status()
            content_type = r.headers.get("content-type", "").split(";")[0].strip().lower()
            if content_type and not content_type.startswith("image/"):
                raise HTTPException(status_code=400, detail="Remote URL did not return an image.")
            async for chunk in r.aiter_bytes():
                received += len(chunk)
                if received > REMOTE_IMAGE_MAX_BYTES:
                    raise HTTPException(status_code=400, detail="Remote image is too large.")
                chunks.append(chunk)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Could not import remote image: {exc}") from exc

    image_bytes = b"".join(chunks)
    width, height, fmt = validate_image_bytes(image_bytes)

    import_id = hashlib.sha256(image_bytes).hexdigest()
    import_path = IMPORT_CACHE_DIR / f"{import_id}.img"
    if not import_path.exists():
        import_path.write_bytes(image_bytes)

    return {
        "id": import_id,
        "width": width,
        "height": height,
        "format": fmt,
        "preview_url": f"/api/artwork/import/{import_id}",
    }


@app.get("/api/artwork/import/{import_id}")
async def get_imported_artwork(import_id: str):
    """Serve a previously imported image with its real content type.

    Raises:
        HTTPException: 404 when no import exists under ``import_id``.
    """
    try:
        image_bytes = get_import_cache_path(import_id).read_bytes()
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Imported image was not found.") from exc

    # Imports keep the remote file's bytes untouched, so they are not
    # necessarily JPEG; report what the data actually is.
    media_type = "image/jpeg"
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            media_type = Image.MIME.get(img.format or "", media_type)
    except (UnidentifiedImageError, OSError, ValueError):
        pass  # unreadable header: keep the previous default

    return Response(
        content=image_bytes,
        media_type=media_type,
        headers={"Cache-Control": "public, max-age=86400"},
    )


@app.post("/api/artwork/export")
async def export_artwork(request: Request):
    """Render the editor image for the posted settings and return it as JPEG.

    The render is cached under the editor cache key, which is echoed in the
    ``X-Editor-Cache-Key`` response header.
    """
    body = await _read_json_object(request)
    # Validate cheap inputs before fetching or rendering anything.
    target = get_editor_target(body.get("target_mode", "poster"))
    render_kwargs = _editor_render_kwargs(body)

    source_bytes = await resolve_editor_source_bytes(body)
    cache_key = build_editor_cache_key(body)
    cache_path = CACHE_DIR / f"{cache_key}.editor.jpg"
    if not cache_path.exists():
        cache_path.write_bytes(render_artwork_editor_image(source_bytes, **render_kwargs))

    return Response(
        content=cache_path.read_bytes(),
        media_type="image/jpeg",
        headers={
            "Cache-Control": "no-store",
            "X-Editor-Cache-Key": cache_key,
            "X-Editor-Width": str(target["width"]),
            "X-Editor-Height": str(target["height"]),
            "X-Editor-Emby-Type": target["emby_type"],
        },
    )


@app.get("/api/artwork/export/{cache_key}")
async def get_exported_artwork(cache_key: str):
    """Serve a cached editor export.

    Raises:
        HTTPException: 400 when the key is not hexadecimal; 404 when nothing
            is cached under it.
    """
    if not _is_hex_cache_key(cache_key):
        raise HTTPException(status_code=400, detail="Invalid export cache key.")

    cache_path = CACHE_DIR / f"{cache_key}.editor.jpg"
    if not cache_path.exists():
        raise HTTPException(status_code=404, detail="Export was not found in cache.")

    return Response(
        content=cache_path.read_bytes(),
        media_type="image/jpeg",
        headers={"Cache-Control": "no-store"},
    )


@app.post("/api/artwork/apply")
async def apply_artwork_export(request: Request):
    """Upload an editor export to Emby, rendering it first when not cached.

    Raises:
        HTTPException: 400 for a missing item id, an invalid target mode, a
            non-hexadecimal ``cache_key`` or bad numeric options.
    """
    body = await _read_json_object(request)
    item_id = body.get("item_id")
    if not item_id:
        raise HTTPException(status_code=400, detail="Missing item id.")
    target = get_editor_target(body.get("target_mode", "poster"))

    # A caller-supplied key ends up in a file path that is both read and
    # written below, so it gets the same hex-only rule as the GET route.
    supplied_key = body.get("cache_key")
    if supplied_key:
        if not _is_hex_cache_key(supplied_key):
            raise HTTPException(status_code=400, detail="Invalid export cache key.")
        cache_key = supplied_key
    else:
        cache_key = build_editor_cache_key(body)

    cache_path = CACHE_DIR / f"{cache_key}.editor.jpg"
    if cache_path.exists():
        image_bytes = cache_path.read_bytes()
    else:
        render_kwargs = _editor_render_kwargs(body)
        source_bytes = await resolve_editor_source_bytes(body)
        image_bytes = render_artwork_editor_image(source_bytes, **render_kwargs)
        cache_path.write_bytes(image_bytes)

    status = await emby_upload_image(item_id, image_bytes, target["emby_type"])
    return {
        "status": "applied",
        "emby_type": target["emby_type"],
        "code": status,
        "cache_key": cache_key,
    }


@app.post("/api/generate")
async def generate(request: Request):
    """Render a thumbnail preview, cache it and return it as PNG.

    When ``generate_primary`` is set, the poster-sized variant is rendered
    and cached as well; whether that succeeded is reported in the
    ``X-Primary-Generated`` header. ``X-Cache-Key`` carries the cache key.
    """
    body = await _read_json_object(request)
    req = _parse_render_request(body, require_title=True)
    item_id = req["item_id"]
    title = req["title"]
    backdrop_index = req["backdrop_index"]
    options = req["options"]

    poster_bytes = await emby_get_image(item_id, "Primary")
    logo_bytes = await emby_get_image_optional(item_id, "Logo", options["logo_index"])
    backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", backdrop_index)

    thumb_bytes = generate_thumbnail(
        poster_bytes,
        title,
        logo_bytes=logo_bytes,
        backdrop_bytes=backdrop_bytes,
        **options,
    )

    cache_key = build_cache_key(item_id=item_id, backdrop_index=backdrop_index, **options)
    get_thumb_cache_path(cache_key).write_bytes(thumb_bytes)

    primary_generated = False
    if req["generate_primary"]:
        # Best-effort: the thumbnail preview is still returned if the poster
        # render fails; the header tells the client which case occurred.
        try:
            primary_bytes = generate_primary_cover(
                poster_bytes,
                title,
                logo_bytes=logo_bytes,
                backdrop_bytes=backdrop_bytes,
                **options,
            )
            get_primary_cache_path(cache_key).write_bytes(primary_bytes)
            primary_generated = True
        except Exception:
            primary_generated = False

    return StreamingResponse(
        io.BytesIO(thumb_bytes),
        media_type="image/png",
        headers={
            "X-Primary-Generated": "1" if primary_generated else "0",
            "X-Cache-Key": cache_key,
        },
    )


@app.post("/api/apply")
async def apply_to_emby(request: Request):
    """Upload the rendered thumbnail (and optionally the poster) to Emby.

    Cached renders are reused; anything missing is rendered on demand. A
    failure while producing or uploading the poster does not fail the
    request: it is reported in ``primary_error``.
    """
    body = await _read_json_object(request)
    req = _parse_render_request(body, require_title=False)
    item_id = req["item_id"]
    title = req["title"] or ""
    backdrop_index = req["backdrop_index"]
    generate_primary = req["generate_primary"]
    options = req["options"]

    cache_key = build_cache_key(item_id=item_id, backdrop_index=backdrop_index, **options)
    thumb_cache_path = get_thumb_cache_path(cache_key)
    primary_cache_path = get_primary_cache_path(cache_key)

    sources = None

    async def load_sources():
        """Fetch poster, logo and backdrop from Emby at most once per request."""
        nonlocal sources
        if sources is None:
            poster = await emby_get_image(item_id, "Primary")
            logo = await emby_get_image_optional(item_id, "Logo", options["logo_index"])
            backdrop = await emby_get_image_optional(item_id, "Backdrop", backdrop_index)
            sources = (poster, logo, backdrop)
        return sources

    if thumb_cache_path.exists():
        thumb_bytes = thumb_cache_path.read_bytes()
    else:
        poster_bytes, logo_bytes, backdrop_bytes = await load_sources()
        thumb_bytes = generate_thumbnail(
            poster_bytes,
            title,
            logo_bytes=logo_bytes,
            backdrop_bytes=backdrop_bytes,
            **options,
        )
        thumb_cache_path.write_bytes(thumb_bytes)

    thumb_status = await emby_upload_image(item_id, thumb_bytes, "Thumb")

    primary_status = None
    primary_error = None
    if generate_primary:
        # Single guarded attempt: render (if not cached) and upload, with any
        # failure surfaced to the caller rather than silently discarded.
        try:
            if primary_cache_path.exists():
                primary_bytes = primary_cache_path.read_bytes()
            else:
                poster_bytes, logo_bytes, backdrop_bytes = await load_sources()
                primary_bytes = generate_primary_cover(
                    poster_bytes,
                    title,
                    logo_bytes=logo_bytes,
                    backdrop_bytes=backdrop_bytes,
                    **options,
                )
                primary_cache_path.write_bytes(primary_bytes)
            primary_status = await emby_upload_image(item_id, primary_bytes, "Primary")
        except Exception as exc:
            primary_error = str(exc)

    return {
        "status": "applied",
        "thumb_code": thumb_status,
        "primary_code": primary_status,
        "primary_attempted": generate_primary,
        "primary_error": primary_error,
    }


@app.get("/api/config")
async def get_config():
    """Report the configured Emby URL and whether an API key is set."""
    return {
        "emby_url": EMBY_URL,
        "connected": bool(EMBY_API_KEY),
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8500)