1582 lines
58 KiB
Python
1582 lines
58 KiB
Python
import asyncio
|
|
import io
|
|
import os
|
|
import hashlib
|
|
import base64
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from fastapi import FastAPI, HTTPException, Query, Request
|
|
from fastapi.responses import HTMLResponse, Response, StreamingResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageColor, ImageOps, UnidentifiedImageError
|
|
|
|
# FastAPI application plus its static-file mount and Jinja2 template loader.
# Both directories are given as relative paths.
app = FastAPI(title="Emby Thumbnail Generator")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Emby connection settings; each can be overridden through the environment.
EMBY_URL = os.environ.get("EMBY_URL", "http://10.0.0.2:8096")
# NOTE(review): a concrete API key is committed here as the fallback default.
# Consider rotating it and requiring EMBY_API_KEY to be set explicitly.
EMBY_API_KEY = os.environ.get("EMBY_API_KEY", "b9af54b630f6448289ab96422add567a")
# On-disk cache directory for rendered images, created at import time.
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
# Hashed into every cache key by build_cache_key, so changing this string
# yields different keys for otherwise identical render settings.
RENDER_VERSION = "series-banner-v10"

# Default output size used by generate_thumbnail (pixels).
THUMB_WIDTH = 800
THUMB_HEIGHT = 450
# Not referenced in this part of the file; presumably the portrait ("primary")
# render size in pixels — TODO confirm against the callers.
PRIMARY_WIDTH = 1000
PRIMARY_HEIGHT = 1500
# Zoom range that cover_crop_positioned clamps its `zoom` argument to.
PRIMARY_MIN_ZOOM = 0.6
PRIMARY_MAX_ZOOM = 2.6
# Timeout passed to the shared httpx.AsyncClient (see get_http_client).
HTTP_TIMEOUT = 30.0
# Shared HTTP client, created by get_http_client and closed on shutdown.
http_client: httpx.AsyncClient | None = None
# Lifetime, in seconds, of an entry in airing_lookup_cache.
AIRING_LOOKUP_CACHE_TTL = 900
# Day-count bounds and look-back window used by build_airing_snapshot.
NEW_SEASON_MIN_AGE_DAYS = 7
NEW_SEASON_MAX_AGE_DAYS = 21
SEASON_INFERENCE_LOOKBACK_DAYS = 180
# week_offset -> {"expires_at": epoch seconds, "data": snapshot dict}.
airing_lookup_cache: dict[int, dict] = {}
# Held by get_airing_snapshot while it rebuilds a cache entry.
airing_lookup_lock = asyncio.Lock()
|
|
|
|
# ── Studio logo file map ─────────────────────────────────────────────────────
# Directory that holds the studio logo image files.
STUDIOS_DIR = Path("static/studios")
# Maps studio key → filename (relative to STUDIOS_DIR).
# make_studio_logo returns None for any key that is not listed here.
STUDIO_FILES: dict[str, str] = {
    "hulu": "hulu.png",
    "hbo": "hbo.png",
    "disney": "disney.png",
}
|
|
|
|
|
|
def make_studio_logo(studio: str, max_height: int = 52) -> Image.Image | None:
    """Load a studio logo from disk and scale it to max_height, preserving aspect ratio.

    Args:
        studio: Key into ``STUDIO_FILES``.
        max_height: Maximum logo height in pixels; smaller logos are returned
            at their native size.

    Returns:
        The logo as an RGBA image, or ``None`` when the key is unknown or the
        mapped file does not exist under ``STUDIOS_DIR``.
    """
    filename = STUDIO_FILES.get(studio)
    if not filename:
        return None
    path = STUDIOS_DIR / filename
    if not path.exists():
        return None

    # Decode inside a context manager so the file handle is released even if
    # decoding fails; convert() returns an image independent of the file.
    with Image.open(path) as source:
        logo = source.convert("RGBA")

    if logo.height > max_height:
        scale = max_height / logo.height
        # Clamp to 1px: a very tall, narrow logo would otherwise round down to
        # a zero width, which resize() rejects.
        scaled_width = max(1, int(logo.width * scale))
        logo = logo.resize((scaled_width, max_height), Image.LANCZOS)
    return logo
|
|
|
|
|
|
def load_image_from_bytes(image_bytes: bytes, mode: str = "RGB") -> Image.Image:
    """Decode raw image bytes into a fully loaded, in-memory PIL image.

    Some Emby-delivered assets appear to produce blocky or partial artifacts
    when the lazily decoded image is processed directly. To get a stable
    object before any resize/filter step, the image is EXIF-transposed,
    force-loaded, converted to ``mode`` and finally copied.
    """
    buffer = io.BytesIO(image_bytes)
    with Image.open(buffer) as opened:
        oriented = ImageOps.exif_transpose(opened)
        oriented.load()
        converted = oriented.convert(mode)
        return converted.copy()
|
|
|
|
|
|
# --- Emby API helpers ---
|
|
|
|
def get_http_client() -> httpx.AsyncClient:
    """Return the shared ``httpx.AsyncClient``, creating it on first use."""
    global http_client
    if http_client is not None:
        return http_client
    http_client = httpx.AsyncClient(timeout=HTTP_TIMEOUT)
    return http_client
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event() -> None:
    """Create the shared HTTP client when the application starts."""
    get_http_client()
|
|
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Close the shared HTTP client, if one was created, and forget it."""
    global http_client
    if http_client is None:
        return
    await http_client.aclose()
    http_client = None
|
|
|
|
async def emby_get(path: str, params: dict | None = None) -> dict:
    """GET a JSON resource from the Emby server.

    Args:
        path: Request path appended to ``EMBY_URL`` (e.g. ``"/Items"``).
        params: Optional query parameters. The mapping is only read; the API
            key is added to a private copy.

    Returns:
        The decoded JSON body.

    Raises:
        httpx.HTTPStatusError: If Emby answers with a 4xx/5xx status.
    """
    # Build a fresh dict so the caller's ``params`` is never mutated and the
    # API key cannot leak into a dict the caller reuses or logs.
    query = {**(params or {}), "api_key": EMBY_API_KEY}
    response = await get_http_client().get(f"{EMBY_URL}{path}", params=query)
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
async def emby_get_image(
    item_id: str,
    image_type: str = "Primary",
    index: int | None = None,
    *,
    max_width: int | None = None,
    max_height: int | None = None,
    quality: int | None = None,
) -> bytes:
    """Download an item's image from Emby and return the raw bytes.

    Only the optional arguments that are not ``None`` are sent as query
    parameters. Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    optional_params = {
        "Index": index,
        "maxWidth": max_width,
        "maxHeight": max_height,
        "quality": quality,
    }
    params: dict[str, str | int] = {"api_key": EMBY_API_KEY}
    params.update(
        {name: value for name, value in optional_params.items() if value is not None}
    )

    endpoint = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}"
    response = await get_http_client().get(endpoint, params=params)
    response.raise_for_status()
    return response.content
|
|
|
|
|
|
async def emby_get_image_with_type(
    item_id: str,
    image_type: str = "Primary",
    index: int | None = None,
    *,
    max_width: int | None = None,
    max_height: int | None = None,
    quality: int | None = None,
) -> tuple[bytes, str]:
    """Download an item's image from Emby along with its media type.

    Returns ``(content, content_type)`` where ``content_type`` is the
    response's Content-Type header without any ``;`` parameters, falling back
    to ``"image/jpeg"`` when the header is missing or empty. Raises
    ``httpx.HTTPStatusError`` on a 4xx/5xx response.
    """
    params: dict[str, str | int] = {"api_key": EMBY_API_KEY}
    for name, value in (
        ("Index", index),
        ("maxWidth", max_width),
        ("maxHeight", max_height),
        ("quality", quality),
    ):
        if value is not None:
            params[name] = value

    endpoint = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}"
    response = await get_http_client().get(endpoint, params=params)
    response.raise_for_status()

    header_value = response.headers.get("content-type", "image/jpeg") or "image/jpeg"
    media_type = header_value.partition(";")[0]
    return response.content, media_type
|
|
|
|
|
|
async def emby_get_image_optional(item_id: str, image_type: str, index: int | None = None) -> bytes | None:
    """Fetch an image that may not exist for this item.

    Returns the image bytes, or ``None`` when Emby answers 404/400 or replies
    with a non-empty Content-Type that is not ``image/*``. Any other error
    status raises ``httpx.HTTPStatusError``.
    """
    params: dict[str, str | int] = {"api_key": EMBY_API_KEY}
    if index is not None:
        params["Index"] = index

    endpoint = f"{EMBY_URL}/Items/{item_id}/Images/{image_type}"
    response = await get_http_client().get(endpoint, params=params)
    if response.status_code in (404, 400):
        return None
    response.raise_for_status()

    content_type = response.headers.get("content-type", "").lower()
    # A missing/empty header is given the benefit of the doubt.
    looks_like_image = not content_type or content_type.startswith("image/")
    return response.content if looks_like_image else None
|
|
|
|
|
|
async def emby_upload_image(item_id: str, image_bytes: bytes, image_type: str = "Thumb"):
    """Re-encode an image as JPEG and POST it to Emby as the item's image.

    The image is decoded, saved as a quality-92 optimized JPEG, and sent
    base64-encoded in the request body. Returns the HTTP status code on
    success; an error status from Emby is re-raised as ``HTTPException`` 502
    carrying Emby's response text.
    """
    jpeg_buffer = io.BytesIO()
    decoded = load_image_from_bytes(image_bytes, mode="RGB")
    decoded.save(jpeg_buffer, format="JPEG", quality=92, optimize=True)
    encoded_body = base64.b64encode(jpeg_buffer.getvalue())

    response = await get_http_client().post(
        f"{EMBY_URL}/Items/{item_id}/Images/{image_type}",
        params={"api_key": EMBY_API_KEY},
        content=encoded_body,
        headers={
            "Content-Type": "image/jpeg",
            "X-Emby-Token": EMBY_API_KEY,
        },
    )
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Prefer Emby's own message; fall back to the httpx description.
        detail = response.text.strip() or str(exc)
        raise HTTPException(
            status_code=502,
            detail=f"Emby upload failed ({response.status_code}): {detail}",
        ) from exc
    return response.status_code
|
|
|
|
|
|
def parse_emby_datetime(value: str | None) -> datetime | None:
|
|
if not value:
|
|
return None
|
|
try:
|
|
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
return None
|
|
if parsed.tzinfo is None:
|
|
parsed = parsed.replace(tzinfo=timezone.utc)
|
|
return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def to_emby_iso(value: datetime) -> str:
    """Format a datetime as an ISO-8601 UTC string with a ``Z`` suffix."""
    utc_value = value.astimezone(timezone.utc)
    stamp = utc_value.isoformat()
    return stamp.replace("+00:00", "Z")
|
|
|
|
|
|
async def emby_get_all(path: str, params: dict | None = None, *, page_size: int = 200) -> list[dict]:
    """Fetch every item of a paged Emby listing.

    Requests ``page_size`` items at a time via ``StartIndex``/``Limit`` and
    concatenates each page's ``Items``. Paging stops on an empty page or once
    the running count reaches the response's ``TotalRecordCount`` (when that
    field is absent, the number fetched so far is used instead).
    """
    collected: list[dict] = []
    shared_params = dict(params or {})
    offset = 0
    while True:
        query = dict(shared_params, StartIndex=str(offset), Limit=str(page_size))
        page = await emby_get(path, query)
        batch = page.get("Items", [])
        collected.extend(batch)

        fetched_through = offset + len(batch)
        total = int(page.get("TotalRecordCount", fetched_through))
        if not batch or fetched_through >= total:
            return collected
        offset = fetched_through
|
|
|
|
|
|
def get_week_bounds(now: datetime, week_offset: int) -> tuple[datetime, datetime]:
    """Return the ``[start, end)`` bounds of a Monday-based week.

    The week containing ``now`` is located first (Monday 00:00:00 in ``now``'s
    own timezone), then shifted by ``week_offset`` whole weeks. ``end`` is
    exactly seven days after ``start``.
    """
    monday = now - timedelta(days=now.weekday())
    monday_midnight = monday.replace(hour=0, minute=0, second=0, microsecond=0)
    start = monday_midnight + timedelta(weeks=week_offset)
    end = start + timedelta(days=7)
    return start, end
|
|
|
|
|
|
def _earliest_episode_by_series(episodes: list[dict]) -> dict[str, dict]:
    """Index episodes by ``SeriesId``, keeping the earliest premiere per series."""
    earliest: dict[str, dict] = {}
    for episode in episodes:
        series_id = episode.get("SeriesId")
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or premiere_at is None:
            continue
        existing = earliest.get(series_id)
        if existing is None or premiere_at < existing["premiere_at"]:
            earliest[series_id] = {
                "premiere_at": premiere_at,
                "season_number": int(episode.get("ParentIndexNumber") or 0),
                "episode_number": int(episode.get("IndexNumber") or 0),
                "episode_name": episode.get("Name") or "",
            }
    return earliest


def _episode_label(entry: dict | None) -> str | None:
    """Format an indexed episode as ``S<n>E<m> · <name>``; fall back to the bare name."""
    if entry is None:
        return None
    if entry["season_number"] and entry["episode_number"]:
        return f"S{entry['season_number']}E{entry['episode_number']} · {entry['episode_name']}".strip(" ·")
    return entry["episode_name"]


async def build_airing_snapshot(week_offset: int = 0) -> dict:
    """Build the "currently airing" overview for one calendar week.

    Five Emby listings are fetched concurrently (all series, recent episodes,
    a longer look-back of episodes, the selected week's episodes, and upcoming
    episodes) and combined into one record per series that shows any airing
    signal.

    Args:
        week_offset: Week to report on relative to the current one
            (0 = this week, negative = past weeks). For past weeks the
            new-season age window is widened by 7 days per week.

    Returns:
        A dict with ``items`` (the sorted per-series records) plus the
        generation time, week bounds, offset and the age window that was used.

    Raises:
        Exception: Whatever the series listing raised. Failures of the four
            episode listings are tolerated and treated as empty results.
    """
    now = datetime.now(timezone.utc)
    week_start, week_end = get_week_bounds(now, week_offset)
    week_shift_days = max(0, -week_offset) * 7
    eligible_min_days = NEW_SEASON_MIN_AGE_DAYS + week_shift_days
    eligible_max_days = NEW_SEASON_MAX_AGE_DAYS + week_shift_days
    recent_window_start = now - timedelta(days=eligible_max_days)
    recent_window_end = now - timedelta(days=eligible_min_days)
    inference_window_start = now - timedelta(days=max(SEASON_INFERENCE_LOOKBACK_DAYS, eligible_max_days + 35))

    def episodes_between(sort_order: str, min_premiere: datetime, max_premiere: datetime):
        """Create (but do not start) a paged episode query for a premiere-date window."""
        return emby_get_all("/Items", {
            "IncludeItemTypes": "Episode",
            "Recursive": "true",
            "SortBy": "PremiereDate",
            "SortOrder": sort_order,
            "MinPremiereDate": to_emby_iso(min_premiere),
            "MaxPremiereDate": to_emby_iso(max_premiere),
        })

    series_task = emby_get_all("/Items", {
        "IncludeItemTypes": "Series",
        "Recursive": "true",
        "SortBy": "SortName",
        "SortOrder": "Ascending",
        "ImageTypeLimit": "1",
        "EnableImageTypes": "Primary",
    })
    recent_episodes_task = episodes_between("Descending", recent_window_start, recent_window_end)
    recent_season_activity_task = episodes_between("Ascending", inference_window_start, now + timedelta(days=1))
    selected_week_episodes_task = episodes_between("Ascending", week_start, week_end)
    upcoming_episodes_task = emby_get_all("/Shows/Upcoming", {
        "SortBy": "PremiereDate",
        "SortOrder": "Ascending",
        "MinPremiereDate": to_emby_iso(now - timedelta(hours=12)),
    })

    fetched = await asyncio.gather(
        series_task,
        recent_episodes_task,
        recent_season_activity_task,
        selected_week_episodes_task,
        upcoming_episodes_task,
        return_exceptions=True,
    )

    # The series list is mandatory; every episode listing is best-effort.
    series_items = fetched[0]
    if isinstance(series_items, Exception):
        raise series_items
    recent_episodes, recent_season_activity, selected_week_episodes, upcoming_episodes = (
        [] if isinstance(result, Exception) else result for result in fetched[1:]
    )

    # Latest episode 1 of a season >= 2 per series inside the recent window.
    recent_premieres_by_series: dict[str, dict] = {}
    for episode in recent_episodes:
        series_id = episode.get("SeriesId")
        season_number = int(episode.get("ParentIndexNumber") or 0)
        episode_number = int(episode.get("IndexNumber") or 0)
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or season_number < 2 or episode_number != 1 or premiere_at is None:
            continue
        existing = recent_premieres_by_series.get(series_id)
        if existing is None or premiere_at > existing["premiere_at"]:
            recent_premieres_by_series[series_id] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_id": episode.get("Id"),
                "episode_name": episode.get("Name") or "",
            }

    # Earliest episode seen per (series, season >= 2) in the look-back window;
    # used below to infer a season start when no episode 1 was found.
    earliest_seen_by_series_season: dict[tuple[str, int], dict] = {}
    for episode in recent_season_activity:
        series_id = episode.get("SeriesId")
        season_number = int(episode.get("ParentIndexNumber") or 0)
        episode_number = int(episode.get("IndexNumber") or 0)
        premiere_at = parse_emby_datetime(episode.get("PremiereDate"))
        if not series_id or season_number < 2 or premiere_at is None:
            continue
        key = (series_id, season_number)
        existing = earliest_seen_by_series_season.get(key)
        if existing is None or premiere_at < existing["premiere_at"]:
            earliest_seen_by_series_season[key] = {
                "premiere_at": premiere_at,
                "season_number": season_number,
                "episode_number": episode_number,
                "episode_name": episode.get("Name") or "",
            }

    selected_week_by_series = _earliest_episode_by_series(selected_week_episodes)
    upcoming_by_series = _earliest_episode_by_series(upcoming_episodes)

    results: list[dict] = []
    for series in series_items:
        series_id = series.get("Id")
        if not series_id:
            continue
        recent_premiere = recent_premieres_by_series.get(series_id)
        selected_week_episode = selected_week_by_series.get(series_id)
        upcoming = upcoming_by_series.get(series_id)
        status = (series.get("Status") or "").strip()
        is_continuing = status.lower() == "continuing"
        has_scheduled_episode = selected_week_episode is not None or upcoming is not None

        if not (is_continuing or has_scheduled_episode or recent_premiere is not None):
            continue

        air_days = series.get("AirDays") or []
        if isinstance(air_days, str):
            air_days = [part.strip() for part in air_days.split(",") if part.strip()]

        active_season_number = max(
            recent_premiere["season_number"] if recent_premiere else 0,
            selected_week_episode["season_number"] if selected_week_episode else 0,
            upcoming["season_number"] if upcoming else 0,
        )
        inferred_recent_premiere = None
        if recent_premiere is None and active_season_number >= 2 and has_scheduled_episode:
            inferred_recent_premiere = earliest_seen_by_series_season.get((series_id, active_season_number))

        season_start = recent_premiere or inferred_recent_premiere
        days_since_premiere = None
        if season_start is not None:
            days_since_premiere = max(0, int((now - season_start["premiere_at"]).total_seconds() // 86400))

        has_current_airing_signal = is_continuing and has_scheduled_episode
        # NOTE(review): this signal requires everything the one above requires,
        # so it can never be true on its own and "recent_season_start" is
        # currently never reported. Confirm whether that is intended.
        has_recent_season_start_signal = (
            is_continuing
            and has_scheduled_episode
            and days_since_premiere is not None
            and eligible_min_days <= days_since_premiere <= eligible_max_days
        )
        eligible_new_season = has_current_airing_signal or has_recent_season_start_signal
        season_premiere_inferred = recent_premiere is None and inferred_recent_premiere is not None
        if has_current_airing_signal:
            eligibility_reason = "current_airing"
        elif has_recent_season_start_signal:
            eligibility_reason = "recent_season_start"
        else:
            eligibility_reason = None

        next_air_at = upcoming["premiere_at"] if upcoming else None
        results.append({
            "id": series_id,
            "name": series.get("Name", ""),
            "year": series.get("ProductionYear"),
            "status": status or ("Continuing" if upcoming else "Unknown"),
            "air_days": air_days,
            "poster_url": f"/api/poster/{series_id}?w=180&h=270&q=84",
            "selected_week_air_at": to_emby_iso(selected_week_episode["premiere_at"]) if selected_week_episode else None,
            "selected_week_episode_label": _episode_label(selected_week_episode),
            "next_air_at": to_emby_iso(next_air_at) if next_air_at else None,
            "next_episode_label": _episode_label(upcoming),
            "season_number": season_start["season_number"] if season_start else None,
            "season_premiere_at": to_emby_iso(season_start["premiere_at"]) if season_start else None,
            "season_premiere_episode": season_start["episode_name"] if season_start else None,
            "days_since_season_premiere": days_since_premiere,
            "season_premiere_inferred": season_premiere_inferred,
            "eligibility_reason": eligibility_reason,
            "eligible_new_season": eligible_new_season,
        })

    # Eligible series first, then series with an air date (earliest first),
    # then alphabetical by name.
    results.sort(key=lambda item: (
        0 if item["eligible_new_season"] else 1,
        item["selected_week_air_at"] is None and item["next_air_at"] is None,
        item["selected_week_air_at"] or item["next_air_at"] or "",
        item["name"].lower(),
    ))
    return {
        "items": results,
        "generated_at": to_emby_iso(now),
        "week_start": to_emby_iso(week_start),
        "week_end": to_emby_iso(week_end),
        "week_offset": week_offset,
        "new_season_min_days": eligible_min_days,
        "new_season_max_days": eligible_max_days,
    }
|
|
|
|
|
|
async def get_airing_snapshot(force_refresh: bool = False, week_offset: int = 0) -> dict:
    """Return the airing snapshot for ``week_offset``, using the in-memory cache.

    A cached entry is served while it has not expired, unless
    ``force_refresh`` is set. Otherwise the snapshot is rebuilt while holding
    ``airing_lookup_lock``; the cache is checked a second time after the lock
    is acquired, before rebuilding.
    """

    def unexpired_entry() -> dict | None:
        if force_refresh:
            return None
        entry = airing_lookup_cache.get(week_offset)
        if entry and entry["expires_at"] > time.time():
            return entry
        return None

    hit = unexpired_entry()
    if hit is not None:
        return hit["data"]

    async with airing_lookup_lock:
        hit = unexpired_entry()
        if hit is not None:
            return hit["data"]
        snapshot = await build_airing_snapshot(week_offset=week_offset)
        airing_lookup_cache[week_offset] = {
            "expires_at": time.time() + AIRING_LOOKUP_CACHE_TTL,
            "data": snapshot,
        }
        return snapshot
|
|
|
|
|
|
# --- Image helpers ---
|
|
|
|
def cover_crop(img: Image.Image, width: int, height: int) -> Image.Image:
    """Return ``img`` scaled and center-cropped to exactly ``width`` x ``height``."""
    target_size = (width, height)
    return ImageOps.fit(img, target_size, method=Image.LANCZOS, centering=(0.5, 0.5))
|
|
|
|
|
|
def cover_crop_positioned(
    img: Image.Image,
    width: int,
    height: int,
    *,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = 0.0,
) -> Image.Image:
    """Cover-crop with user-controlled zoom and pan.

    `pan_x` / `pan_y` are normalized in [-1, 1], where -1 means left/top and
    +1 means right/bottom. `zoom` is clamped to
    [PRIMARY_MIN_ZOOM, PRIMARY_MAX_ZOOM].

    For zoom >= 1 a window of the source is cropped and resized to the target.
    For zoom < 1 the image is scaled somewhere between "cover" and "contain",
    placed on a canvas, and any uncovered margin is filled by stretching thin
    strips taken from the image's own edges.
    """

    def clamp(value: float, low: float, high: float) -> float:
        return max(low, min(high, value))

    pan_x = clamp(float(pan_x), -1.0, 1.0)
    pan_y = clamp(float(pan_y), -1.0, 1.0)
    zoom = clamp(float(zoom), PRIMARY_MIN_ZOOM, PRIMARY_MAX_ZOOM)

    if zoom < 1.0:
        width_ratio = width / max(1, img.width)
        height_ratio = height / max(1, img.height)
        contain_scale = min(width_ratio, height_ratio)
        cover_scale = max(width_ratio, height_ratio)
        # 0.0 at PRIMARY_MIN_ZOOM (pure contain) up to 1.0 at zoom 1 (pure cover).
        blend = (zoom - PRIMARY_MIN_ZOOM) / max(0.0001, 1.0 - PRIMARY_MIN_ZOOM)
        blend = clamp(blend, 0.0, 1.0)
        scale = contain_scale + (cover_scale - contain_scale) * blend

        fitted_w = max(1, int(round(img.width * scale)))
        fitted_h = max(1, int(round(img.height * scale)))
        fitted = img.resize((fitted_w, fitted_h), Image.LANCZOS)

        def axis_offset(fitted_len: int, target_len: int, pan: float) -> int:
            """Paste offset on one axis: negative when overflowing, positive when letterboxed."""
            fraction = (pan + 1.0) / 2.0
            if fitted_len >= target_len:
                return -int(round((fitted_len - target_len) * fraction))
            return int(round((target_len - fitted_len) * fraction))

        paste_x = axis_offset(fitted_w, width, pan_x)
        paste_y = axis_offset(fitted_h, height, pan_y)
        right_edge = paste_x + fitted_w
        bottom_edge = paste_y + fitted_h

        canvas = Image.new("RGB", (width, height))
        left_gap = max(0, paste_x)
        top_gap = max(0, paste_y)
        right_gap = max(0, width - right_edge)
        bottom_gap = max(0, height - bottom_edge)
        strip = max(1, min(12, fitted_w, fitted_h))

        def stretch_onto_canvas(box, size, at) -> None:
            """Crop ``box`` from the fitted image, stretch it to ``size`` and paste at ``at``."""
            canvas.paste(fitted.crop(box).resize(size, Image.LANCZOS), at)

        # Edge margins first ...
        if top_gap:
            stretch_onto_canvas((0, 0, fitted_w, strip), (fitted_w, top_gap), (paste_x, 0))
        if bottom_gap:
            stretch_onto_canvas((0, fitted_h - strip, fitted_w, fitted_h), (fitted_w, bottom_gap), (paste_x, bottom_edge))
        if left_gap:
            stretch_onto_canvas((0, 0, strip, fitted_h), (left_gap, fitted_h), (0, paste_y))
        if right_gap:
            stretch_onto_canvas((fitted_w - strip, 0, fitted_w, fitted_h), (right_gap, fitted_h), (right_edge, paste_y))
        # ... then the four corners ...
        if top_gap and left_gap:
            stretch_onto_canvas((0, 0, strip, strip), (left_gap, top_gap), (0, 0))
        if top_gap and right_gap:
            stretch_onto_canvas((fitted_w - strip, 0, fitted_w, strip), (right_gap, top_gap), (right_edge, 0))
        if bottom_gap and left_gap:
            stretch_onto_canvas((0, fitted_h - strip, strip, fitted_h), (left_gap, bottom_gap), (0, bottom_edge))
        if bottom_gap and right_gap:
            stretch_onto_canvas(
                (fitted_w - strip, fitted_h - strip, fitted_w, fitted_h),
                (right_gap, bottom_gap),
                (right_edge, bottom_edge),
            )

        # ... and the image itself last, on top of everything.
        canvas.paste(fitted, (paste_x, paste_y))
        return canvas

    target_ratio = width / max(1, height)
    source_ratio = img.width / max(1, img.height)

    # Largest window with the target aspect ratio that fits inside the source.
    if source_ratio > target_ratio:
        window_h = img.height
        window_w = int(round(window_h * target_ratio))
    else:
        window_w = img.width
        window_h = int(round(window_w / target_ratio))

    # Zooming in shrinks the window; it never exceeds the source or drops below 1px.
    window_w = max(1, min(img.width, int(round(window_w / zoom))))
    window_h = max(1, min(img.height, int(round(window_h / zoom))))

    slack_x = max(0, img.width - window_w)
    slack_y = max(0, img.height - window_h)
    left = int(round((slack_x / 2) * (pan_x + 1.0)))
    top = int(round((slack_y / 2) * (pan_y + 1.0)))

    window = img.crop((left, top, left + window_w, top + window_h))
    return window.resize((width, height), Image.LANCZOS)
|
|
|
|
|
|
def build_tall_backdrop_background(
    img: Image.Image,
    width: int,
    height: int,
    *,
    dim_factor: float,
    zoom: float = 1.0,
    pan_x: float = 0.0,
    pan_y: float = -0.16,
) -> Image.Image:
    """Position-crop a backdrop to ``width`` x ``height`` and dim it.

    Every channel value of the cropped image is multiplied by ``dim_factor``
    and truncated to an int.
    """
    positioned = cover_crop_positioned(img, width, height, zoom=zoom, pan_x=pan_x, pan_y=pan_y)

    def dim(channel_value):
        return int(channel_value * dim_factor)

    return positioned.point(dim)
|
|
|
|
|
|
def get_font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont:
    """Return the first available system font at ``size``.

    Well-known Windows and Linux font locations are probed in order, using
    the bold or regular file of each family depending on ``bold``. When none
    of them exists, Pillow's built-in default font is returned instead.

    Args:
        size: Font size passed to Pillow.
        bold: Pick the bold variant of each candidate family.
    """
    # (bold file, regular file) per family, in priority order.
    font_families = [
        # Windows
        ("C:/Windows/Fonts/arialbd.ttf", "C:/Windows/Fonts/arial.ttf"),
        ("C:/Windows/Fonts/calibrib.ttf", "C:/Windows/Fonts/calibri.ttf"),
        ("C:/Windows/Fonts/verdanab.ttf", "C:/Windows/Fonts/verdana.ttf"),
        # Linux
        (
            "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        ),
        (
            "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
            "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        ),
    ]
    for bold_path, regular_path in font_families:
        fp = bold_path if bold else regular_path
        if os.path.exists(fp):
            return ImageFont.truetype(fp, size)
    try:
        return ImageFont.load_default(size)
    except TypeError:
        # Older Pillow releases do not accept a size for the default font;
        # fall back to the fixed-size built-in font instead of crashing.
        return ImageFont.load_default()
|
|
|
|
|
|
def get_arial_bold_font(size: int) -> ImageFont.FreeTypeFont:
    """Return Arial Bold (or the first available bold stand-in) at ``size``.

    Falls back to ``get_font(size, bold=True)`` when none of the candidate
    files exists.
    """
    candidates = (
        "C:/Windows/Fonts/arialbd.ttf",
        "/usr/share/fonts/truetype/msttcorefonts/Arial_Bold.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    )
    found = next((fp for fp in candidates if os.path.exists(fp)), None)
    if found is None:
        return get_font(size, bold=True)
    return ImageFont.truetype(found, size)
|
|
|
|
|
|
def get_badge_font(size: int) -> ImageFont.FreeTypeFont:
    """Return the heavy font used for badges/banners at ``size``.

    The first existing candidate file wins; when none exists the result of
    ``get_arial_bold_font(size)`` is returned.
    """
    candidates = (
        "C:/Windows/Fonts/ariblk.ttf",
        "C:/Windows/Fonts/arialbd.ttf",
        "C:/Windows/Fonts/seguisb.ttf",
        "/usr/share/fonts/truetype/liberation2/LiberationSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    )
    found = next((fp for fp in candidates if os.path.exists(fp)), None)
    if found is None:
        return get_arial_bold_font(size)
    return ImageFont.truetype(found, size)
|
|
|
|
|
|
def measure_tracked_text(text: str, font: ImageFont.FreeTypeFont, tracking: int) -> tuple[int, int]:
    """Measure ``text`` when drawn glyph by glyph with extra letter spacing.

    Returns ``(width, height)``: the summed per-glyph bounding-box widths plus
    ``tracking`` between neighbouring glyphs, and the vertical extent spanned
    by all glyph boxes together with the zero line. Empty text measures
    ``(0, 0)``.
    """
    boxes = [font.getbbox(ch) for ch in text]
    if not boxes:
        return 0, 0

    glyph_widths = [box[2] - box[0] for box in boxes]
    top = min(0, min(box[1] for box in boxes))
    bottom = max(0, max(box[3] for box in boxes))

    width = sum(glyph_widths) + tracking * (len(glyph_widths) - 1)
    return width, bottom - top
|
|
|
|
|
|
def draw_tracked_text(
    draw: ImageDraw.ImageDraw,
    position: tuple[int, int],
    text: str,
    font: ImageFont.FreeTypeFont,
    fill: tuple[int, int, int],
    tracking: int,
) -> None:
    """Draw ``text`` one glyph at a time with ``tracking`` extra pixels between glyphs.

    Each glyph is shifted by its own bounding-box origin so that its box
    starts at the running cursor position.
    """
    cursor_x, origin_y = position
    for glyph in text:
        left, top, right, _bottom = font.getbbox(glyph)
        draw.text((cursor_x - left, origin_y - top), glyph, font=font, fill=fill)
        cursor_x += (right - left) + tracking
|
|
|
|
|
|
def wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> list:
    """Greedily wrap ``text`` into lines no wider than ``max_width`` pixels.

    Words are split on whitespace and never broken; a single word wider than
    ``max_width`` ends up on a line of its own.
    """
    lines = []
    current = ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        fits = font.getbbox(candidate)[2] <= max_width
        if fits:
            current = candidate
            continue
        # The word does not fit: close the running line and start a new one.
        if current:
            lines.append(current)
        current = word
    if current:
        lines.append(current)
    return lines
|
|
|
|
|
|
def build_cache_key(
    item_id: str,
    bg_mode: str = "backdrop",
    backdrop_index: int = 0,
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> str:
    """Derive a hex cache key from every setting that affects a render.

    The settings (plus ``RENDER_VERSION``) are normalised — floats rounded to
    four decimals, flags coerced to ``bool`` — and the MD5 of the resulting
    dict's ``repr`` is returned.
    """

    def rounded(value) -> float:
        return round(float(value), 4)

    # Key order matters: the hash is taken over repr(payload).
    payload = {
        "render_version": RENDER_VERSION,
        "item_id": item_id,
        "bg_mode": bg_mode,
        "backdrop_index": int(backdrop_index),
        "custom_bg": custom_bg,
        "text_color": text_color,
        "logo_align": logo_align,
        "logo_scale": rounded(logo_scale),
        "darkness": rounded(darkness),
        "studio": studio,
        "studio_position": studio_position,
        "new_episodes_tag": bool(new_episodes_tag),
        "season_finale_tag": bool(season_finale_tag),
        "logo_index": logo_index,
        "primary_zoom": rounded(primary_zoom),
        "primary_pan_x": rounded(primary_pan_x),
        "primary_pan_y": rounded(primary_pan_y),
    }
    serialized = repr(payload).encode("utf-8")
    return hashlib.md5(serialized).hexdigest()
|
|
|
|
|
|
def get_thumb_cache_path(cache_key: str) -> Path:
    """Return the cache file path ``<CACHE_DIR>/<cache_key>.png``."""
    filename = f"{cache_key}.png"
    return CACHE_DIR.joinpath(filename)
|
|
|
|
|
|
def get_primary_cache_path(cache_key: str) -> Path:
    """Return the cache file path ``<CACHE_DIR>/<cache_key>.primary.png``."""
    filename = f"{cache_key}.primary.png"
    return CACHE_DIR.joinpath(filename)
|
|
|
|
|
|
def generate_thumbnail(
|
|
poster_bytes: bytes,
|
|
title: str,
|
|
bg_mode: str = "backdrop",
|
|
custom_bg: str = None,
|
|
text_color: str = "#FFFFFF",
|
|
logo_align: str = "bottom-center",
|
|
logo_scale: float = 1.3,
|
|
darkness: float = 0.0,
|
|
studio: str = "none",
|
|
studio_position: str = "bottom-right",
|
|
new_episodes_tag: bool = False,
|
|
season_finale_tag: bool = False,
|
|
logo_index: int | None = None,
|
|
width: int = THUMB_WIDTH,
|
|
height: int = THUMB_HEIGHT,
|
|
logo_bytes: bytes = None,
|
|
backdrop_bytes: bytes = None,
|
|
primary_zoom: float = 1.0,
|
|
primary_pan_x: float = 0.0,
|
|
primary_pan_y: float = -0.16,
|
|
) -> bytes:
|
|
|
|
# darkness is 0.0 (no dimming) → 1.0 (very dark); scale bg and vignette together
|
|
d = max(0.0, min(1.0, darkness))
|
|
bg_dim = 1.0 - (d * 0.6) # backdrop brightness: 1.0 → 0.4
|
|
vig_max = int(d * 220) # vignette peak alpha: 0 → 220
|
|
is_tall_layout = height >= int(width * 1.3)
|
|
|
|
# --- Build background ---
|
|
if backdrop_bytes and bg_mode != "solid":
|
|
backdrop = load_image_from_bytes(backdrop_bytes, mode="RGB")
|
|
if is_tall_layout:
|
|
bg = build_tall_backdrop_background(
|
|
backdrop,
|
|
width,
|
|
height,
|
|
dim_factor=bg_dim,
|
|
zoom=primary_zoom,
|
|
pan_x=primary_pan_x,
|
|
pan_y=primary_pan_y,
|
|
)
|
|
else:
|
|
bg = cover_crop(backdrop, width, height)
|
|
bg = bg.point(lambda p: int(p * bg_dim))
|
|
elif bg_mode == "solid" and custom_bg:
|
|
try:
|
|
bg_color = ImageColor.getrgb(custom_bg)
|
|
except ValueError:
|
|
bg_color = (15, 15, 25)
|
|
bg = Image.new("RGB", (width, height), bg_color)
|
|
else:
|
|
# Blurred poster fallback
|
|
bg = load_image_from_bytes(poster_bytes, mode="RGB")
|
|
bg = cover_crop(bg, width, height)
|
|
bg = bg.filter(ImageFilter.GaussianBlur(radius=20))
|
|
bg = bg.point(lambda p: int(p * bg_dim * 0.85))
|
|
|
|
# --- Vignette overlay tuned per logo position ---
|
|
overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0))
|
|
ov = ImageDraw.Draw(overlay)
|
|
|
|
def h_gradient(from_left: bool, strength: int = 200):
|
|
s = int(strength * d)
|
|
for x in range(width):
|
|
t = 1.0 - (x / max(1, width - 1))
|
|
a = int(s * (t ** 0.55))
|
|
px = x if from_left else width - 1 - x
|
|
ov.line([(px, 0), (px, height)], fill=(0, 0, 0, a))
|
|
|
|
def v_gradient_bottom(strength: int = 180):
|
|
s = int(strength * d)
|
|
for y in range(height):
|
|
t = 1.0 - (y / max(1, height - 1))
|
|
a = int(s * (t ** 0.55))
|
|
ov.line([(0, height - 1 - y), (width, height - 1 - y)], fill=(0, 0, 0, a))
|
|
|
|
# Base layer: uniform darkening across the whole frame on every layout.
|
|
# This ensures no naturally bright backdrop area is left fully exposed.
|
|
base_alpha = int(vig_max * 0.45)
|
|
ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, base_alpha))
|
|
|
|
# Directional layer on top for contrast where the logo sits.
|
|
if logo_align == "center":
|
|
ov.rectangle([(0, 0), (width, height)], fill=(0, 0, 0, int(vig_max * 0.25)))
|
|
elif logo_align == "left":
|
|
h_gradient(from_left=True, strength=160)
|
|
elif logo_align == "bottom-center":
|
|
v_gradient_bottom(strength=150)
|
|
elif logo_align == "bottom-left":
|
|
h_gradient(from_left=True, strength=140)
|
|
v_gradient_bottom(strength=140)
|
|
elif logo_align == "bottom-right":
|
|
h_gradient(from_left=False, strength=140)
|
|
v_gradient_bottom(strength=140)
|
|
|
|
bg = bg.convert("RGBA")
|
|
bg = Image.alpha_composite(bg, overlay)
|
|
bg = bg.convert("RGB")
|
|
|
|
txt_color = ImageColor.getrgb(text_color)
|
|
pad = max(52, int(height * (0.11 if is_tall_layout else 0.08)))
|
|
scale = max(0.4, min(2.5, logo_scale))
|
|
logo_max_w = int(width * 0.46 * scale)
|
|
logo_max_h = int(height * (0.18 if is_tall_layout else 0.40) * scale)
|
|
logo_box = None
|
|
studio_box = None
|
|
|
|
def get_series_banner() -> dict | None:
|
|
if season_finale_tag:
|
|
banner_text = "Season Finale"
|
|
banner_fill = "#111111"
|
|
elif new_episodes_tag:
|
|
banner_text = "New Season"
|
|
banner_fill = "#E50914"
|
|
else:
|
|
return None
|
|
banner_font_size = max(72 if is_tall_layout else 58, int(height * (0.048 if is_tall_layout else 0.115)))
|
|
banner_font = get_badge_font(banner_font_size)
|
|
text_bbox = banner_font.getbbox(banner_text)
|
|
text_w = text_bbox[2] - text_bbox[0]
|
|
text_h = text_bbox[3] - text_bbox[1]
|
|
banner_x = 0
|
|
banner_y = 0
|
|
banner_w = width
|
|
vertical_pad = max(12 if is_tall_layout else 12, int(banner_font_size * (0.28 if is_tall_layout else 0.30)))
|
|
banner_h = max(text_h + vertical_pad * 2, int(height * (0.105 if is_tall_layout else 0.17)))
|
|
text_x = (banner_w - text_w) // 2 - text_bbox[0]
|
|
text_y = (banner_h - text_h) // 2 - text_bbox[1]
|
|
return {
|
|
"text": banner_text,
|
|
"fill": ImageColor.getrgb(banner_fill),
|
|
"font": banner_font,
|
|
"x": banner_x,
|
|
"y": banner_y,
|
|
"w": banner_w,
|
|
"h": banner_h,
|
|
"text_x": text_x,
|
|
"text_y": text_y,
|
|
}
|
|
|
|
banner = get_series_banner()
|
|
|
|
def logo_position(lw: int, lh: int):
|
|
if logo_align == "center":
|
|
return (width - lw) // 2, (height - lh) // 2
|
|
elif logo_align == "left":
|
|
return pad, (height - lh) // 2
|
|
elif logo_align == "bottom-center":
|
|
return (width - lw) // 2, height - lh - pad
|
|
elif logo_align == "bottom-left":
|
|
return pad, height - lh - pad
|
|
elif logo_align == "bottom-right":
|
|
return width - lw - pad, height - lh - pad
|
|
return (width - lw) // 2, (height - lh) // 2
|
|
|
|
# --- Logo image (preferred) ---
|
|
if logo_bytes:
|
|
try:
|
|
logo = load_image_from_bytes(logo_bytes, mode="RGBA")
|
|
except (UnidentifiedImageError, OSError, ValueError):
|
|
logo = None
|
|
if logo is None:
|
|
logo_bytes = None
|
|
if logo_bytes:
|
|
ratio = min(logo_max_w / logo.width, logo_max_h / logo.height)
|
|
lw = int(logo.width * ratio)
|
|
lh = int(logo.height * ratio)
|
|
logo = logo.resize((lw, lh), Image.LANCZOS)
|
|
logo_x, logo_y = logo_position(lw, lh)
|
|
|
|
# Drop shadow — blur only a tight crop around the logo, not the whole frame,
|
|
# to avoid a visible dark haze spreading across the backdrop.
|
|
blur_r = 5
|
|
pad = blur_r * 3
|
|
_, _, _, a = logo.split()
|
|
black = Image.new("RGB", (lw, lh), (0, 0, 0))
|
|
shadow_logo = Image.merge("RGBA", (*black.split()[:3], a))
|
|
shadow_crop = Image.new("RGBA", (lw + pad * 2, lh + pad * 2), (0, 0, 0, 0))
|
|
shadow_crop.paste(shadow_logo, (pad + 4, pad + 4), shadow_logo)
|
|
shadow_crop = shadow_crop.filter(ImageFilter.GaussianBlur(radius=blur_r))
|
|
bg = bg.convert("RGBA")
|
|
bg.alpha_composite(shadow_crop, (logo_x - pad, logo_y - pad))
|
|
bg = bg.convert("RGB")
|
|
|
|
bg.paste(logo, (logo_x, logo_y), logo)
|
|
logo_box = (logo_x, logo_y, lw, lh)
|
|
|
|
else:
|
|
# --- Text fallback ---
|
|
draw = ImageDraw.Draw(bg)
|
|
title_font = get_font(56, bold=True)
|
|
line_h = 66
|
|
lines = wrap_text(title.upper(), title_font, logo_max_w)
|
|
total_h = len(lines) * line_h
|
|
tx, ty = logo_position(logo_max_w, total_h)
|
|
for line in lines:
|
|
draw.text((tx + 3, ty + 3), line, font=title_font, fill=(0, 0, 0))
|
|
draw.text((tx, ty), line, font=title_font, fill=txt_color)
|
|
ty += line_h
|
|
logo_box = (tx, logo_position(logo_max_w, total_h)[1], logo_max_w, total_h)
|
|
|
|
# --- Studio logo overlay ---
|
|
if studio and studio != "none":
|
|
slogo = make_studio_logo(studio, max_height=46)
|
|
if slogo:
|
|
s_pad = 22
|
|
sw, sh = slogo.size
|
|
top_y = s_pad if not banner else banner["y"] + banner["h"] + 10
|
|
positions = {
|
|
"top-left": (s_pad, top_y),
|
|
"top-right": (width - sw - s_pad, top_y),
|
|
"bottom-left": (s_pad, height - sh - s_pad),
|
|
"bottom-right": (width - sw - s_pad, height - sh - s_pad),
|
|
}
|
|
sx, sy = positions.get(studio_position, positions["bottom-right"])
|
|
bg.paste(slogo, (sx, sy), slogo)
|
|
studio_box = (sx, sy, sw, sh)
|
|
|
|
if banner:
|
|
draw = ImageDraw.Draw(bg)
|
|
draw.rectangle(
|
|
[(banner["x"], banner["y"]), (banner["x"] + banner["w"], banner["y"] + banner["h"])],
|
|
fill=banner["fill"],
|
|
)
|
|
draw.text((banner["text_x"], banner["text_y"]), banner["text"], font=banner["font"], fill=(255, 255, 255))
|
|
|
|
buf = io.BytesIO()
|
|
bg.save(buf, format="PNG")
|
|
buf.seek(0)
|
|
return buf.getvalue()
|
|
|
|
|
|
def generate_primary_cover(
    poster_bytes: bytes,
    title: str,
    bg_mode: str = "backdrop",
    custom_bg: str | None = None,
    text_color: str = "#FFFFFF",
    logo_align: str = "bottom-center",
    logo_scale: float = 1.3,
    darkness: float = 0.0,
    studio: str = "none",
    studio_position: str = "bottom-right",
    new_episodes_tag: bool = False,
    season_finale_tag: bool = False,
    logo_index: int | None = None,
    logo_bytes: bytes | None = None,
    backdrop_bytes: bytes | None = None,
    primary_zoom: float = 1.0,
    primary_pan_x: float = 0.0,
    primary_pan_y: float = -0.16,
) -> bytes:
    """Render the "Primary" cover by delegating to ``generate_thumbnail``.

    Every argument is forwarded unchanged; the only thing this wrapper adds is
    the canvas size, which is pinned to ``PRIMARY_WIDTH`` x ``PRIMARY_HEIGHT``.

    Returns:
        Whatever ``generate_thumbnail`` returns for that canvas size.
    """
    # Collected in one mapping so the forwarded option set is visible at a glance.
    forwarded = {
        "bg_mode": bg_mode,
        "custom_bg": custom_bg,
        "text_color": text_color,
        "logo_align": logo_align,
        "logo_scale": logo_scale,
        "darkness": darkness,
        "studio": studio,
        "studio_position": studio_position,
        "new_episodes_tag": new_episodes_tag,
        "season_finale_tag": season_finale_tag,
        "logo_index": logo_index,
        "width": PRIMARY_WIDTH,
        "height": PRIMARY_HEIGHT,
        "logo_bytes": logo_bytes,
        "backdrop_bytes": backdrop_bytes,
        "primary_zoom": primary_zoom,
        "primary_pan_x": primary_pan_x,
        "primary_pan_y": primary_pan_y,
    }
    return generate_thumbnail(poster_bytes, title, **forwarded)
|
|
|
|
|
|
# --- API Routes ---
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Serve the ``index.html`` template at the site root."""
    landing_page = templates.TemplateResponse(request, "index.html")
    return landing_page
|
|
|
|
|
|
@app.get("/airing", response_class=HTMLResponse)
async def airing_page(request: Request):
    """Serve the ``airing.html`` template at ``/airing``."""
    airing_view = templates.TemplateResponse(request, "airing.html")
    return airing_view
|
|
|
|
|
|
@app.get("/api/airing")
async def get_airing_titles(
    page: int = Query(1, ge=1),
    limit: int = Query(12, ge=1, le=48),
    eligible_only: bool = False,
    refresh: bool = False,
    week_offset: int = Query(0, ge=-8, le=4),
):
    """Return one page of the airing snapshot for the requested week.

    Args:
        page: 1-based page number.
        limit: Page size.
        eligible_only: When true, keep only entries whose
            ``eligible_new_season`` value is truthy.
        refresh: Passed to ``get_airing_snapshot`` as ``force_refresh``.
        week_offset: Passed through to ``get_airing_snapshot``.

    Returns:
        A dict with the page of items, paging information, and the snapshot's
        week/window metadata copied through unchanged.
    """
    snapshot = await get_airing_snapshot(force_refresh=refresh, week_offset=week_offset)

    candidates = snapshot["items"]
    if eligible_only:
        candidates = [entry for entry in candidates if entry["eligible_new_season"]]

    first = (page - 1) * limit
    past_last = first + limit
    match_count = len(candidates)

    payload = {
        "items": candidates[first:past_last],
        "page": page,
        "limit": limit,
        "total": match_count,
        "has_more": past_last < match_count,
    }
    # Snapshot metadata is relayed verbatim, in the same key order as before.
    for field in (
        "generated_at",
        "week_start",
        "week_end",
        "week_offset",
        "new_season_min_days",
        "new_season_max_days",
    ):
        payload[field] = snapshot[field]
    return payload
|
|
|
|
|
|
@app.post("/api/airing/apply-new-season")
async def apply_new_season_banner(request: Request):
    """Render "New Season" banner artwork for an airing series and upload it to Emby.

    JSON body fields:
        item_id: Required. Id of the series; must appear in the airing snapshot.
        generate_primary: Optional, default true. Also render/upload the Primary image.
        week_offset: Optional integer, default 0. Passed to ``get_airing_snapshot``.
        refresh: Optional, default false. Passed as ``force_refresh``.
        title: Optional. Falls back to the snapshot entry's ``name``.

    Returns:
        A dict with ``status``, the Thumb upload result (``thumb_code``), the
        Primary upload result (``primary_code``), whether a Primary upload was
        attempted, and the Primary error text (if any).

    Raises:
        HTTPException(400): The body is not valid JSON / not an object,
            ``item_id`` is missing, ``week_offset`` is not an integer, or the
            series does not qualify for the banner.
        HTTPException(404): The series is not in the airing snapshot.
    """
    # Reject bad input explicitly instead of letting KeyError/ValueError bubble
    # up as an opaque 500.
    try:
        body = await request.json()
        item_id = body["item_id"]
        week_offset = int(body.get("week_offset", 0))
    except KeyError as exc:
        raise HTTPException(status_code=400, detail=f"Missing required field: {exc.args[0]}") from exc
    except (TypeError, ValueError) as exc:
        # Malformed JSON (JSONDecodeError is a ValueError), a non-object body,
        # or a non-numeric week_offset.
        raise HTTPException(status_code=400, detail=f"Invalid request body: {exc}") from exc
    generate_primary = bool(body.get("generate_primary", True))

    snapshot = await get_airing_snapshot(force_refresh=bool(body.get("refresh", False)), week_offset=week_offset)
    item = next((entry for entry in snapshot["items"] if entry["id"] == item_id), None)
    if item is None:
        raise HTTPException(status_code=404, detail="Series was not found in the current airing snapshot.")

    # Eligible either via the snapshot's own flag, or because the series is
    # still "continuing" and has an air date for the selected/next week.
    can_apply_new_season = bool(item["eligible_new_season"]) or (
        (item.get("status") or "").strip().lower() == "continuing"
        and (item.get("selected_week_air_at") or item.get("next_air_at"))
    )
    if not can_apply_new_season:
        raise HTTPException(
            status_code=400,
            detail=(
                "Series is outside the New Season window. "
                f"It must be between {snapshot['new_season_min_days']} and {snapshot['new_season_max_days']} days "
                "from the season premiere."
            ),
        )

    title = body.get("title") or item["name"]

    # Fixed look for this flow; defined once so the cache key and both renders
    # cannot drift apart.
    banner_options = {
        "bg_mode": "backdrop",
        "text_color": "#FFFFFF",
        "logo_align": "bottom-center",
        "logo_scale": 1.3,
        "darkness": 0.0,
        "studio": "none",
        "studio_position": "bottom-right",
        "new_episodes_tag": True,
        "season_finale_tag": False,
        "logo_index": 0,
    }
    cache_key = build_cache_key(item_id=item_id, backdrop_index=0, custom_bg=None, **banner_options)
    thumb_cache_path = get_thumb_cache_path(cache_key)
    primary_cache_path = get_primary_cache_path(cache_key)

    needs_render = not thumb_cache_path.exists() or (generate_primary and not primary_cache_path.exists())
    if needs_render:
        poster_bytes = await emby_get_image(item_id, "Primary")
        logo_bytes = await emby_get_image_optional(item_id, "Logo", 0)
        backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", 0)

        thumb_bytes = generate_thumbnail(
            poster_bytes,
            title,
            logo_bytes=logo_bytes,
            backdrop_bytes=backdrop_bytes,
            **banner_options,
        )
        thumb_cache_path.write_bytes(thumb_bytes)

        if generate_primary:
            primary_bytes = generate_primary_cover(
                poster_bytes,
                title,
                logo_bytes=logo_bytes,
                backdrop_bytes=backdrop_bytes,
                **banner_options,
            )
            primary_cache_path.write_bytes(primary_bytes)

    thumb_status = await emby_upload_image(item_id, thumb_cache_path.read_bytes(), "Thumb")

    primary_status = None
    primary_error = None
    if generate_primary:
        # Best effort: the Thumb is already applied, so a Primary failure is
        # reported in the response rather than failing the whole request.
        try:
            primary_status = await emby_upload_image(item_id, primary_cache_path.read_bytes(), "Primary")
        except Exception as exc:
            primary_error = str(exc)

    return {
        "status": "applied",
        "thumb_code": thumb_status,
        "primary_code": primary_status,
        "primary_attempted": generate_primary,
        "primary_error": primary_error,
    }
|
|
|
|
|
|
@app.get("/api/search")
async def search_items(
    q: str = Query(..., min_length=1),
    type: str = "Movie,Series",
    start: int = Query(0, ge=0),
    limit: int = Query(12, ge=1, le=24),
):
    """Search Emby's ``/Items`` endpoint and return a trimmed result page.

    Args:
        q: Search term (at least one character).
        type: Value sent as ``IncludeItemTypes``. The name shadows the builtin
            but is kept because it is the public query parameter.
        start: Index of the first result to return.
        limit: Maximum number of results.

    Returns:
        A dict with the summarized ``items`` plus ``start``, ``limit``,
        ``total`` and ``has_more``.
    """
    emby_query = {
        "SearchTerm": q,
        "IncludeItemTypes": type,
        "Recursive": "true",
        "Fields": "ProductionYear",
        "StartIndex": str(start),
        "Limit": str(limit),
        "ImageTypeLimit": "1",
        "EnableImageTypes": "Primary,Logo,Backdrop",
    }
    data = await emby_get("/Items", emby_query)

    def summarize(raw: dict) -> dict:
        """Reduce one Emby item to the fields this endpoint returns."""
        tags = raw.get("ImageTags", {})
        backdrop_tags = raw.get("BackdropImageTags", [])
        return {
            "id": raw["Id"],
            "name": raw.get("Name", ""),
            "year": raw.get("ProductionYear", ""),
            "type": raw.get("Type", ""),
            "has_logo": "Logo" in tags,
            "backdrop_count": len(backdrop_tags),
            "poster_url": f"/api/poster/{raw['Id']}?w=72&h=108&q=72",
        }

    items = [summarize(raw) for raw in data.get("Items", [])]
    returned = len(items)
    # Without a reported total, assume this page is the end of the results.
    total = int(data.get("TotalRecordCount", start + returned))
    return {
        "items": items,
        "start": start,
        "limit": limit,
        "total": total,
        "has_more": start + returned < total,
    }
|
|
|
|
|
|
@app.get("/api/images/{item_id}")
async def get_image_info(item_id: str):
    """List an item's Logo, Backdrop and Primary images from Emby's Images endpoint.

    Each returned entry carries ``type``, ``index``, ``width`` and ``height``;
    Logo and Primary entries also get a preview ``url`` pointing at
    ``/api/item-image``. Images of any other type are ignored. Each list is
    sorted by image index.
    """
    images = await emby_get(f"/Items/{item_id}/Images")

    logos, backdrops, primaries = [], [], []
    # (image type, destination list, preview query string or None for no URL)
    routing = (
        ("Logo", logos, "w=220&h=96&q=90"),
        ("Backdrop", backdrops, None),
        ("Primary", primaries, "w=120&h=180&q=80"),
    )

    for img in images:
        kind = img.get("ImageType")
        entry = {
            "type": kind,
            # A missing or null ImageIndex is treated as 0.
            "index": int(img.get("ImageIndex", 0) or 0),
            "width": img.get("Width"),
            "height": img.get("Height"),
        }
        for label, bucket, preview in routing:
            if kind != label:
                continue
            if preview is not None:
                entry["url"] = f"/api/item-image/{item_id}?type={label}&index={entry['index']}&{preview}"
            bucket.append(entry)
            break

    for _, bucket, _ in routing:
        bucket.sort(key=lambda image: image["index"])

    return {
        "backdrop_count": len(backdrops),
        "has_logo": bool(logos),
        "logo_count": len(logos),
        "primary_count": len(primaries),
        "logos": logos,
        "backdrops": backdrops,
        "primaries": primaries,
    }
|
|
|
|
|
|
@app.get("/api/item-image/{item_id}")
async def get_item_image(
    item_id: str,
    type: str = Query("Primary"),
    index: int | None = Query(None, ge=0),
    w: int = Query(120, ge=24, le=1200),
    h: int = Query(180, ge=24, le=1200),
    q: int = Query(80, ge=30, le=100),
):
    """Proxy one image of an Emby item, bounded by the requested size and quality.

    The bytes and the content type both come from ``emby_get_image_with_type``;
    the response is marked publicly cacheable for one day.
    """
    payload, mime_type = await emby_get_image_with_type(
        item_id,
        type,
        index=index,
        max_width=w,
        max_height=h,
        quality=q,
    )
    one_day_cache = {"Cache-Control": "public, max-age=86400"}
    return Response(content=payload, media_type=mime_type, headers=one_day_cache)
|
|
|
|
|
|
@app.get("/api/poster/{item_id}")
async def get_poster(
    item_id: str,
    w: int = Query(72, ge=24, le=400),
    h: int = Query(108, ge=24, le=600),
    q: int = Query(72, ge=30, le=100),
):
    """Proxy an item's Primary image, bounded by the requested size and quality.

    The response is marked publicly cacheable for one day.
    """
    poster = await emby_get_image(item_id, "Primary", max_width=w, max_height=h, quality=q)
    one_day_cache = {"Cache-Control": "public, max-age=86400"}
    # NOTE(review): the media type is fixed to image/jpeg regardless of what
    # Emby returned - confirm emby_get_image always yields JPEG.
    return Response(content=poster, media_type="image/jpeg", headers=one_day_cache)
|
|
|
|
|
|
@app.get("/api/cache/{cache_key}/primary")
async def get_cached_primary_preview(cache_key: str):
    """Return the cached Primary render for ``cache_key`` as an uncached PNG response.

    Raises:
        HTTPException(400): ``cache_key`` is empty or contains a character that
            is not a hexadecimal digit (case-insensitive).
        HTTPException(404): No Primary render is cached under that key.
    """
    hex_digits = set("0123456789abcdef")
    # Only hex keys are accepted before the key is turned into a cache path.
    if not cache_key or not set(cache_key.lower()) <= hex_digits:
        raise HTTPException(status_code=400, detail="Invalid cache key.")

    cache_path = get_primary_cache_path(cache_key)
    if cache_path.exists():
        return Response(
            content=cache_path.read_bytes(),
            media_type="image/png",
            headers={"Cache-Control": "no-store"},
        )
    raise HTTPException(status_code=404, detail="Primary preview not found.")
|
|
|
|
|
|
@app.post("/api/generate")
async def generate(request: Request):
    """Render a thumbnail (and optionally a Primary cover) and return the thumbnail PNG.

    JSON body fields:
        item_id, title: Required.
        bg_mode, custom_bg, text_color, logo_align, logo_scale, darkness,
        studio, studio_position, new_episodes_tag, season_finale_tag,
        logo_index, primary_zoom, primary_pan_x, primary_pan_y: Optional
            render options forwarded to ``generate_thumbnail``.
        backdrop_index: Optional integer, default 0. Which backdrop to fetch.
        generate_primary: Optional, default false. Also render the Primary cover.

    Both renders are written to the cache under the key from
    ``build_cache_key``; that key is returned in the ``X-Cache-Key`` header and
    ``X-Primary-Generated`` is "1" only when the Primary render succeeded.

    Raises:
        HTTPException(400): The body is not valid JSON / not an object, a
            required field is missing, or a numeric field cannot be converted.
    """
    # Parse and coerce everything up front so bad input yields a 400 instead of
    # an unhandled KeyError/ValueError (500).
    try:
        body = await request.json()
        item_id = body["item_id"]
        title = body["title"]
        logo_index_raw = body.get("logo_index", None)
        # Options shared by the cache key and both renders.
        options = {
            "bg_mode": body.get("bg_mode", "backdrop"),
            "custom_bg": body.get("custom_bg", None),
            "text_color": body.get("text_color", "#FFFFFF"),
            "logo_align": body.get("logo_align", "bottom-center"),
            "logo_scale": float(body.get("logo_scale", 1.3)),
            "darkness": float(body.get("darkness", 0.0)),
            "studio": body.get("studio", "none"),
            "studio_position": body.get("studio_position", "bottom-right"),
            "new_episodes_tag": bool(body.get("new_episodes_tag", False)),
            "season_finale_tag": bool(body.get("season_finale_tag", False)),
            "logo_index": int(logo_index_raw) if logo_index_raw is not None else None,
            "primary_zoom": float(body.get("primary_zoom", 1.0)),
            "primary_pan_x": float(body.get("primary_pan_x", 0.0)),
            "primary_pan_y": float(body.get("primary_pan_y", -0.16)),
        }
        backdrop_index = int(body.get("backdrop_index", 0))
        generate_primary = bool(body.get("generate_primary", False))
    except KeyError as exc:
        raise HTTPException(status_code=400, detail=f"Missing required field: {exc.args[0]}") from exc
    except (TypeError, ValueError) as exc:
        # Malformed JSON (JSONDecodeError is a ValueError), a non-object body,
        # or a value that cannot be converted to int/float.
        raise HTTPException(status_code=400, detail=f"Invalid request body: {exc}") from exc

    poster_bytes = await emby_get_image(item_id, "Primary")
    logo_bytes = await emby_get_image_optional(item_id, "Logo", options["logo_index"])
    backdrop_bytes = await emby_get_image_optional(item_id, "Backdrop", backdrop_index)

    thumb_bytes = generate_thumbnail(
        poster_bytes,
        title,
        logo_bytes=logo_bytes,
        backdrop_bytes=backdrop_bytes,
        **options,
    )

    cache_key = build_cache_key(item_id=item_id, backdrop_index=backdrop_index, **options)
    get_thumb_cache_path(cache_key).write_bytes(thumb_bytes)

    primary_generated = False
    if generate_primary:
        # Best effort: the thumbnail is still returned if the Primary render
        # fails; the outcome is reported through the X-Primary-Generated header.
        try:
            primary_bytes = generate_primary_cover(
                poster_bytes,
                title,
                logo_bytes=logo_bytes,
                backdrop_bytes=backdrop_bytes,
                **options,
            )
            get_primary_cache_path(cache_key).write_bytes(primary_bytes)
            primary_generated = True
        except Exception:
            primary_generated = False

    return StreamingResponse(
        io.BytesIO(thumb_bytes),
        media_type="image/png",
        headers={
            "X-Primary-Generated": "1" if primary_generated else "0",
            "X-Cache-Key": cache_key,
        },
    )
|
|
|
|
|
|
@app.post("/api/apply")
async def apply_to_emby(request: Request):
    """Upload the rendered Thumb (and optionally Primary) image for an item to Emby.

    Cached renders are reused when present under the key from
    ``build_cache_key``; otherwise the image is rendered, cached, and uploaded.

    JSON body fields:
        item_id: Required.
        title: Optional, default "".
        bg_mode, custom_bg, text_color, logo_align, logo_scale, darkness,
        studio, studio_position, new_episodes_tag, season_finale_tag,
        logo_index, primary_zoom, primary_pan_x, primary_pan_y: Optional
            render options.
        backdrop_index: Optional integer, default 0.
        generate_primary: Optional, default false. Also upload a Primary image.

    Returns:
        A dict with ``status``, the Thumb upload result (``thumb_code``), the
        Primary upload result (``primary_code``), whether a Primary upload was
        attempted, and the Primary error text (if any).

    Raises:
        HTTPException(400): The body is not valid JSON / not an object,
            ``item_id`` is missing, or a numeric field cannot be converted.
    """
    # Parse and coerce everything up front so bad input yields a 400 instead of
    # an unhandled KeyError/ValueError (500).
    try:
        body = await request.json()
        item_id = body["item_id"]
        title = body.get("title", "") or ""
        logo_index_raw = body.get("logo_index", None)
        logo_index = int(logo_index_raw) if logo_index_raw is not None else None
        backdrop_index = int(body.get("backdrop_index", 0))
        # Options shared by the cache key and both renders.
        options = {
            "bg_mode": body.get("bg_mode", "backdrop"),
            "custom_bg": body.get("custom_bg", None),
            "text_color": body.get("text_color", "#FFFFFF"),
            "logo_align": body.get("logo_align", "bottom-center"),
            "logo_scale": float(body.get("logo_scale", 1.3)),
            "darkness": float(body.get("darkness", 0.0)),
            "studio": body.get("studio", "none"),
            "studio_position": body.get("studio_position", "bottom-right"),
            "new_episodes_tag": bool(body.get("new_episodes_tag", False)),
            "season_finale_tag": bool(body.get("season_finale_tag", False)),
            "logo_index": logo_index,
            "primary_zoom": float(body.get("primary_zoom", 1.0)),
            "primary_pan_x": float(body.get("primary_pan_x", 0.0)),
            "primary_pan_y": float(body.get("primary_pan_y", -0.16)),
        }
        generate_primary = bool(body.get("generate_primary", False))
    except KeyError as exc:
        raise HTTPException(status_code=400, detail=f"Missing required field: {exc.args[0]}") from exc
    except (TypeError, ValueError) as exc:
        # Malformed JSON (JSONDecodeError is a ValueError), a non-object body,
        # or a value that cannot be converted to int/float.
        raise HTTPException(status_code=400, detail=f"Invalid request body: {exc}") from exc

    cache_key = build_cache_key(item_id=item_id, backdrop_index=backdrop_index, **options)
    thumb_cache_path = get_thumb_cache_path(cache_key)
    primary_cache_path = get_primary_cache_path(cache_key)

    source_images = None  # (poster, logo, backdrop) once fetched

    async def load_source_images():
        """Fetch the poster/logo/backdrop from Emby at most once per request."""
        nonlocal source_images
        if source_images is None:
            source_images = (
                await emby_get_image(item_id, "Primary"),
                await emby_get_image_optional(item_id, "Logo", logo_index),
                await emby_get_image_optional(item_id, "Backdrop", backdrop_index),
            )
        return source_images

    if thumb_cache_path.exists():
        thumb_bytes = thumb_cache_path.read_bytes()
    else:
        poster_bytes, logo_bytes, backdrop_bytes = await load_source_images()
        thumb_bytes = generate_thumbnail(
            poster_bytes,
            title,
            logo_bytes=logo_bytes,
            backdrop_bytes=backdrop_bytes,
            **options,
        )
        thumb_cache_path.write_bytes(thumb_bytes)

    thumb_status = await emby_upload_image(item_id, thumb_bytes, "Thumb")

    primary_status = None
    primary_error = None
    if generate_primary:
        # Single Primary path: render only when not cached, reusing any images
        # already fetched for the Thumb. Failures are best effort (the Thumb is
        # already applied) but are reported in the response, never swallowed.
        try:
            if primary_cache_path.exists():
                primary_bytes = primary_cache_path.read_bytes()
            else:
                poster_bytes, logo_bytes, backdrop_bytes = await load_source_images()
                primary_bytes = generate_primary_cover(
                    poster_bytes,
                    title,
                    logo_bytes=logo_bytes,
                    backdrop_bytes=backdrop_bytes,
                    **options,
                )
                primary_cache_path.write_bytes(primary_bytes)
            primary_status = await emby_upload_image(item_id, primary_bytes, "Primary")
        except Exception as exc:
            primary_error = str(exc)

    return {
        "status": "applied",
        "thumb_code": thumb_status,
        "primary_code": primary_status,
        "primary_attempted": generate_primary,
        "primary_error": primary_error,
    }
|
|
|
|
|
|
@app.get("/api/config")
async def get_config():
    """Report the configured Emby URL and whether an API key is set.

    ``connected`` only reflects that ``EMBY_API_KEY`` is non-empty; no request
    is made to Emby here.
    """
    has_api_key = bool(EMBY_API_KEY)
    return {"emby_url": EMBY_URL, "connected": has_api_key}
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8500.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8500)
|