# Source listing metadata: 1018 lines, 28 KiB, Python
import itertools
import os
import sys
import threading
import time
from typing import Any, Dict, List, NoReturn, Optional, Tuple

import requests


# ============================================================
# CONFIG
# ============================================================

# NOTE(review): real-looking credentials are committed in this file. Each
# connection setting below can be overridden by an environment variable of the
# same name; the literals remain only as backward-compatible fallbacks and
# should be rotated and removed.
RADARR_URL = os.environ.get("RADARR_URL", "http://10.0.0.2:7878")
RADARR_API_KEY = os.environ.get("RADARR_API_KEY", "d393acb157a44dc2b0e2aede96278ad5")

# Use ONE of these only
TMDB_V3_API_KEY = os.environ.get("TMDB_V3_API_KEY", "0dd6070b3573f3f54ba4c5530e36662b")
TMDB_BEARER_TOKEN = os.environ.get("TMDB_BEARER_TOKEN", "")

# Optional
OMDB_API_KEY = os.environ.get("OMDB_API_KEY", "")

# Radarr settings
QUALITY_PROFILE_NAME = "720p ENG"
ROOT_FOLDER_PATH = r"D:\Movies"

# Safety: movies are only added when DRY_RUN is False AND CONFIRM_ADD is True.
DRY_RUN = True
CONFIRM_ADD = False

MAX_TO_ADD = 10
MIN_COMBINED_SCORE = 64
MIN_IMDB_SCORE = 6.3
MIN_SOURCES_REQUIRED = 1

# Classic / comfort tuning
RELEASE_YEAR_FROM = 1980
RELEASE_YEAR_TO = 2012
MIN_TMDB_VOTES = 250
MAX_SIMILAR_PAGES_PER_SEED = 2
MAX_CANDIDATES_TO_SCORE = 250

EXCLUDE_ANIMATION = True
EXCLUDE_TOO_MODERN = True

TMDB_BASE = "https://api.themoviedb.org/3"

# Curated seed list
SEED_MOVIES = [
    "The Birdcage",
    "When Harry Met Sally",
    "You've Got Mail",
    "Sleepless in Seattle",
    "Notting Hill",
    "Moonstruck",
    "Father of the Bride",
    "My Best Friend's Wedding",
    "The First Wives Club",
    "Under the Tuscan Sun",
    "Julie & Julia",
    "Something's Gotta Give",
    "Must Love Dogs",
    "While You Were Sleeping",
    "Practical Magic",
]

# One shared HTTP session so every request carries the same User-Agent.
session = requests.Session()
session.headers.update({"User-Agent": "radarr-classic-feelgood-script/4.0"})


# ============================================================
# CLI STYLING
# ============================================================


class C:
    """ANSI escape sequences used to style terminal output."""

    RESET = "\033[0m"
    BOLD = "\033[1m"

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"
    GREY = "\033[90m"


def colour(text: str, code: str) -> str:
    """Wrap *text* in the escape sequence *code* and append the reset code."""
    return f"{code}{text}{C.RESET}"


def info(text: str) -> None:
    """Print *text* as a cyan ``[INFO]`` line."""
    print(colour(f"[INFO] {text}", C.CYAN))


def ok(text: str) -> None:
    """Print *text* as a green ``[OK]`` line."""
    print(colour(f"[OK] {text}", C.GREEN))


def warn(text: str) -> None:
    """Print *text* as a yellow ``[WARN]`` line."""
    print(colour(f"[WARN] {text}", C.YELLOW))


def err(text: str) -> None:
    """Print *text* as a red ``[ERR]`` line."""
    print(colour(f"[ERR] {text}", C.RED))


def section(title: str) -> None:
    """Print a blank line followed by *title* framed by two 72-character rules."""
    rule = colour("=" * 72, C.BLUE)
    print()
    print(rule)
    print(colour(title, C.BOLD + C.BLUE))
    print(rule)


class Spinner:
    """Context manager that animates a spinner on stdout while its body runs.

    A daemon thread redraws ``<frame> <message>`` on the current line roughly
    every 0.09 s. Leaving the ``with`` block stops the thread, waits for it and
    blanks the line. Exceptions raised in the body are not suppressed.
    """

    def __init__(self, message: str):
        self.message = message
        self.frames = itertools.cycle(["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"])
        self.running = False
        self.thread: Optional[threading.Thread] = None

    def _spin(self) -> None:
        """Redraw the spinner until ``running`` is cleared, then erase the line."""
        while self.running:
            frame = next(self.frames)
            sys.stdout.write(f"\r{colour(frame, C.MAGENTA)} {self.message}")
            sys.stdout.flush()
            time.sleep(0.09)
        # Overwrite frame + space + message (plus slack) with blanks.
        sys.stdout.write("\r" + " " * (len(self.message) + 6) + "\r")
        sys.stdout.flush()

    def __enter__(self):
        self.running = True
        self.thread = threading.Thread(target=self._spin, daemon=True)
        self.thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.running = False
        if self.thread:
            # Join so the line is cleared before the caller prints anything.
            self.thread.join()
            self.thread = None


# ============================================================
# HEURISTICS
# ============================================================

# Points added per matching genre name.
POSITIVE_GENRES = {
    "Romance": 18,
    "Comedy": 16,
    "Drama": 10,
    "Music": 4,
}

# Points (negative) added per matching genre name.
NEGATIVE_GENRES = {
    "Animation": -30,
    "Horror": -25,
    "Science Fiction": -20,
    "Action": -16,
    "Thriller": -14,
    "Crime": -10,
    "War": -18,
    "Western": -14,
    "Documentary": -18,
    "Fantasy": -8,
    "Family": -6,
}

# Lower-case phrases looked for in a movie overview; each hit raises the score.
POSITIVE_TERMS = [
    "wedding", "love", "romance", "relationship", "bride",
    "best friend", "friendship", "family", "holiday", "food",
    "small town", "bookstore", "restaurant", "writer", "divorce",
    "second chance", "starting over", "feel-good", "mother",
    "daughter", "sisters", "chosen family", "comedy of manners",
]

# Lower-case phrases looked for in a movie overview; each hit lowers the score.
NEGATIVE_TERMS = [
    "war", "serial killer", "murder spree", "mercenary",
    "zombie", "post-apocalyptic", "gang", "assassin", "combat",
    "superhero", "multiverse", "alien invasion", "dystopian",
    "dragon", "animated adventure",
]

# Lower-case phrases that each add one point to the "classic" bonus.
SEED_BONUS_TERMS = [
    "ensemble", "relationship", "family", "comedy", "romantic",
    "wedding", "friendship", "identity", "midlife", "second chance",
]


# ============================================================
# ERRORS / UTILITIES
# ============================================================


class ApiError(Exception):
    """Raised for configuration problems and failed or malformed API calls."""


def is_placeholder(value: str) -> bool:
    """Return True when *value* is empty or looks like an unfilled template.

    The check is a case-insensitive substring match against known placeholder
    markers. All markers are stored upper-case because the value is upper-cased
    before comparison (a lower-case marker could never match).
    """
    if not value:
        return True

    markers = (
        "PUT_YOUR",
        "YOUR_",
        "REPLACE_ME",
        "CHANGE_ME",
        "PASTE_YOUR",
        "ABC123",
    )
    upper = value.upper()
    return any(marker in upper for marker in markers)


def safe_response_text(response: requests.Response) -> str:
    """Return at most 700 characters describing a response body for error output.

    The JSON-decoded body is preferred; when the body is not valid JSON the
    stripped raw text is used, or ``"<empty>"`` when there is none.
    """
    try:
        return str(response.json())[:700]
    except ValueError:
        # Same exception type request_json treats as "not JSON".
        text = (response.text or "").strip()
        return text[:700] if text else "<empty>"


def fail(message: str) -> NoReturn:
    """Raise :class:`ApiError` carrying *message*; never returns."""
    raise ApiError(message)


def request_json(
    method: str,
    url: str,
    *,
    service_name: str,
    headers: Optional[Dict] = None,
    params: Optional[Dict] = None,
    json_data: Optional[Dict] = None,
) -> Any:
    """Send one HTTP request through the shared session and return decoded JSON.

    Args:
        method: HTTP verb passed to ``session.request``.
        url: Absolute request URL.
        service_name: Label used in error text; "TMDb", "Radarr" and "OMDb"
            additionally select a credential hint on HTTP 401.
        headers: Optional extra request headers.
        params: Optional query-string parameters.
        json_data: Optional JSON request body.

    Returns:
        Whatever the response body decodes to (callers in this file receive
        both objects and lists).

    Raises:
        ApiError: on a transport failure, a non-OK status, or a non-JSON body.
    """
    try:
        response = session.request(
            method=method,
            url=url,
            headers=headers,
            params=params,
            json=json_data,
            timeout=30,
        )
    except requests.RequestException as e:
        # Raised directly (not via fail) so the original cause stays chained.
        raise ApiError(
            f"{service_name} request failed before a response was received.\n{e}"
        ) from e

    if not response.ok:
        details = safe_response_text(response)
        msg = [
            f"{service_name} request failed.",
            f"HTTP: {response.status_code}",
            f"Method: {method}",
            f"URL: {response.url}",
            f"Response: {details}",
        ]

        if response.status_code == 401:
            msg.append("Authentication failed.")
            if service_name == "TMDb":
                msg.append(
                    "TMDb expects either:\n"
                    "- TMDB_BEARER_TOKEN as Authorization: Bearer <token>\n"
                    "- or TMDB_V3_API_KEY as api_key query parameter"
                )
            elif service_name == "Radarr":
                msg.append("Check RADARR_URL and RADARR_API_KEY.")
            elif service_name == "OMDb":
                msg.append("Check OMDB_API_KEY.")

        if response.status_code == 404:
            msg.append("Endpoint not found. Check the URL and API route.")
        if response.status_code == 429:
            msg.append("Rate limited. Slow the script down.")
        if 500 <= response.status_code <= 599:
            msg.append("Remote server-side error.")

        fail("\n".join(msg))

    try:
        return response.json()
    except ValueError:
        fail(
            f"{service_name} returned non-JSON content.\n"
            f"HTTP: {response.status_code}\n"
            f"URL: {response.url}\n"
            f"Response: {safe_response_text(response)}"
        )


def require_basic_config() -> None:
    """Validate the module-level settings before any network call is made.

    Collects every blocking problem and reports them together. Non-blocking
    situations (both TMDb credentials set, no OMDb key) only produce warnings.

    Raises:
        ApiError: listing all configuration problems found.
    """
    problems = []

    if not (RADARR_URL or "").strip():
        problems.append("RADARR_URL is blank.")

    if is_placeholder(RADARR_API_KEY):
        problems.append("RADARR_API_KEY is missing or still a placeholder.")

    if not TMDB_V3_API_KEY and not TMDB_BEARER_TOKEN:
        problems.append("Set either TMDB_V3_API_KEY or TMDB_BEARER_TOKEN.")

    if TMDB_V3_API_KEY and TMDB_BEARER_TOKEN:
        warn("Both TMDb credentials are present. Bearer token will be preferred.")

    # A whitespace-only path is as unusable as an empty one.
    if not (ROOT_FOLDER_PATH or "").strip():
        problems.append("ROOT_FOLDER_PATH is blank.")

    if not QUALITY_PROFILE_NAME.strip():
        problems.append("QUALITY_PROFILE_NAME is blank.")

    if problems:
        fail("Configuration error(s):\n- " + "\n- ".join(problems))

    if not OMDB_API_KEY:
        warn("OMDb key not set. Script will run in TMDb-only mode.")


# ============================================================
# AUTH HELPERS
# ============================================================


def radarr_headers() -> Dict[str, str]:
    """Return the headers sent with every Radarr request (API key + JSON type)."""
    return {
        "X-Api-Key": RADARR_API_KEY,
        "Content-Type": "application/json",
    }


def get_tmdb_auth() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return ``(headers, query_params)`` carrying the configured TMDb credential.

    The bearer token wins when both credentials are set; it goes into an
    ``Authorization`` header, whereas the v3 key goes into the ``api_key``
    query parameter.

    Raises:
        ApiError: when neither credential is configured.
    """
    if TMDB_BEARER_TOKEN:
        return {"Authorization": f"Bearer {TMDB_BEARER_TOKEN}"}, {}

    if TMDB_V3_API_KEY:
        return {}, {"api_key": TMDB_V3_API_KEY}

    fail("TMDb auth missing. Set TMDB_BEARER_TOKEN or TMDB_V3_API_KEY.")
    return {}, {}  # unreachable: fail() always raises


# ============================================================
# RADARR
# ============================================================


def get_existing_radarr_movies() -> List[Dict]:
    """GET ``/api/v3/movie`` from Radarr and return the decoded response."""
    url = f"{RADARR_URL}/api/v3/movie"
    return request_json("GET", url, service_name="Radarr", headers=radarr_headers())


def get_radarr_rootfolders() -> List[Dict]:
    """GET ``/api/v3/rootfolder`` from Radarr and return the decoded response."""
    url = f"{RADARR_URL}/api/v3/rootfolder"
    return request_json("GET", url, service_name="Radarr", headers=radarr_headers())


def get_radarr_quality_profiles() -> List[Dict]:
    """GET ``/api/v3/qualityprofile`` from Radarr and return the decoded response."""
    url = f"{RADARR_URL}/api/v3/qualityprofile"
    return request_json("GET", url, service_name="Radarr", headers=radarr_headers())


def resolve_quality_profile_id(profile_name: str) -> int:
    """Map a quality profile name to its Radarr ID.

    Matching ignores case and surrounding whitespace on both sides. An exact
    name match wins immediately; otherwise a single substring match is accepted
    with a warning.

    Raises:
        ApiError: when there is no match, or several partial matches.
    """
    profiles = get_radarr_quality_profiles()
    wanted = profile_name.strip().lower()

    partial_matches = []
    for profile in profiles:
        name = str(profile.get("name", "")).strip().lower()
        if name == wanted:
            return int(profile["id"])
        if wanted in name:
            partial_matches.append(profile)

    if len(partial_matches) == 1:
        warn(f"Quality profile exact match not found. Using partial match: {partial_matches[0].get('name')}")
        return int(partial_matches[0]["id"])

    available = ", ".join(sorted(str(p.get("name")) for p in profiles))
    message = (
        f"Could not find Radarr quality profile '{profile_name}'.\n"
        f"Available profiles: {available}"
    )
    if partial_matches:
        # Several candidates contain the name: say so instead of "not found" only.
        ambiguous = ", ".join(sorted(str(p.get("name")) for p in partial_matches))
        message += f"\nAmbiguous partial matches: {ambiguous}"
    fail(message)
    return 0  # unreachable: fail() always raises


def get_existing_ids(existing_movies: List[Dict]) -> Tuple[set, set]:
    """Collect the TMDb and IMDb identifiers present in a Radarr movie list.

    Args:
        existing_movies: Movie dicts; the ``tmdbId`` and ``imdbId`` keys are read.

    Returns:
        ``(tmdb_ids, imdb_ids)``: a set of ints and a set of whitespace-stripped
        strings. Missing or falsy identifiers are skipped.
    """
    tmdb_ids = {int(m["tmdbId"]) for m in existing_movies if m.get("tmdbId")}
    imdb_ids = {str(m["imdbId"]).strip() for m in existing_movies if m.get("imdbId")}
    return tmdb_ids, imdb_ids


def radarr_lookup_tmdb(tmdb_id: int) -> Dict:
    """Ask Radarr's ``/api/v3/movie/lookup/tmdb`` endpoint about one TMDb ID."""
    url = f"{RADARR_URL}/api/v3/movie/lookup/tmdb"
    return request_json(
        "GET",
        url,
        service_name="Radarr",
        headers=radarr_headers(),
        params={"tmdbId": tmdb_id},
    )


def add_movie_to_radarr(movie_payload: Dict) -> Dict:
    """POST *movie_payload* to Radarr's ``/api/v3/movie`` and return the reply."""
    url = f"{RADARR_URL}/api/v3/movie"
    return request_json(
        "POST",
        url,
        service_name="Radarr",
        headers=radarr_headers(),
        json_data=movie_payload,
    )


# ============================================================
# TMDB
# ============================================================


def tmdb_search_movie(title: str, year: Optional[int] = None) -> List[Dict]:
    """Search TMDb for *title* and return the first page of results.

    Args:
        title: Text sent as the ``query`` parameter.
        year: Optional value for the ``year`` filter; omitted when falsy.

    Returns:
        The ``results`` list of the response, or an empty list when absent.
    """
    auth_headers, auth_params = get_tmdb_auth()
    url = f"{TMDB_BASE}/search/movie"

    params = {
        **auth_params,
        "query": title,
        "language": "en-US",
        "include_adult": "false",
        "page": 1,
    }
    if year:
        params["year"] = year

    data = request_json(
        "GET",
        url,
        service_name="TMDb",
        headers=auth_headers,
        params=params,
    )
    return data.get("results", [])


def tmdb_movie_details(tmdb_id: int) -> Dict:
    """Fetch the en-US detail record of one TMDb movie."""
    auth_headers, auth_params = get_tmdb_auth()
    url = f"{TMDB_BASE}/movie/{tmdb_id}"

    params = {
        **auth_params,
        "language": "en-US",
    }

    return request_json(
        "GET",
        url,
        service_name="TMDb",
        headers=auth_headers,
        params=params,
    )


def tmdb_similar_movies(tmdb_id: int, page: int = 1) -> List[Dict]:
    """Return one page of TMDb's "similar" list for *tmdb_id*.

    Returns:
        The ``results`` list of the response, or an empty list when absent.
    """
    auth_headers, auth_params = get_tmdb_auth()
    url = f"{TMDB_BASE}/movie/{tmdb_id}/similar"

    params = {
        **auth_params,
        "language": "en-US",
        "page": page,
    }

    data = request_json(
        "GET",
        url,
        service_name="TMDb",
        headers=auth_headers,
        params=params,
    )
    return data.get("results", [])


def find_seed_movie_ids(seed_titles: List[str]) -> List[Dict]:
    """Resolve each seed title to a TMDb movie using the first search hit.

    Titles with no search results are reported with a warning and skipped.

    Returns:
        One dict per resolved seed with the keys ``seed_title``, ``tmdb_id``,
        ``matched_title`` and ``release_date``.
    """
    seeds: List[Dict] = []

    for title in seed_titles:
        # The spinner covers only the network call so the messages printed
        # afterwards do not interleave with it.
        with Spinner(f"Finding seed: {title}"):
            results = tmdb_search_movie(title)

        if not results:
            warn(f"Seed not found: {title}")
            continue

        best = results[0]
        seeds.append({
            "seed_title": title,
            "tmdb_id": best["id"],
            "matched_title": best.get("title", title),
            "release_date": best.get("release_date", ""),
        })
        ok(f"Seed matched: {title} -> {best.get('title')} ({(best.get('release_date') or '')[:4]})")
        time.sleep(0.05)  # short pause between successive searches

    return seeds


def gather_candidates_from_seeds(seed_matches: List[Dict]) -> Dict[int, Dict]:
    """Build a candidate pool from TMDb "similar" lists of every seed.

    For each seed, pages 1..MAX_SIMILAR_PAGES_PER_SEED are fetched. A movie's
    ``seed_hits`` counter is incremented every time it appears in a result
    page, and the seed's matched title is recorded in ``seed_titles``.

    Args:
        seed_matches: Dicts providing ``tmdb_id`` and ``matched_title``.

    Returns:
        Mapping of TMDb ID to a dict with ``tmdb_id``, ``title``,
        ``release_date``, ``seed_hits``, ``seed_titles`` (a set) and
        ``popularity``.
    """
    pool: Dict[int, Dict] = {}

    for seed in seed_matches:
        seed_id = seed["tmdb_id"]
        seed_title = seed["matched_title"]

        for page in range(1, MAX_SIMILAR_PAGES_PER_SEED + 1):
            with Spinner(f"Pulling similar titles for {seed_title} (page {page})"):
                results = tmdb_similar_movies(seed_id, page=page)

            for item in results:
                tmdb_id = item.get("id")
                if not tmdb_id:
                    continue

                if tmdb_id not in pool:
                    pool[tmdb_id] = {
                        "tmdb_id": tmdb_id,
                        "title": item.get("title"),
                        "release_date": item.get("release_date", ""),
                        "seed_hits": 0,
                        "seed_titles": set(),
                        "popularity": item.get("popularity", 0),
                    }

                pool[tmdb_id]["seed_hits"] += 1
                pool[tmdb_id]["seed_titles"].add(seed_title)

            info(f"Collected {len(results)} similar titles from {seed_title} page {page}")
            time.sleep(0.05)  # short pause between successive page requests

    return pool


# ============================================================
# OMDB
# ============================================================


def omdb_by_imdb(imdb_id: str) -> Optional[Dict]:
    """Look one title up on OMDb by IMDb ID.

    Returns:
        The decoded response, or ``None`` when *imdb_id* is empty, no OMDb key
        is configured, or the response carries ``"Response": "False"``.
    """
    if not imdb_id or not OMDB_API_KEY:
        return None

    url = "https://www.omdbapi.com/"
    params = {
        "apikey": OMDB_API_KEY,
        "i": imdb_id,
        "plot": "short",
        "r": "json",
    }

    data = request_json(
        "GET",
        url,
        service_name="OMDb",
        params=params,
    )

    if data.get("Response") == "False":
        return None

    return data


# ============================================================
# SCORING
# ============================================================


def parse_percent(value: str) -> Optional[float]:
    """Convert text such as ``"85%"`` to ``85.0``; return ``None`` if unparsable."""
    try:
        return float(value.replace("%", "").strip())
    except (AttributeError, TypeError, ValueError):
        # Non-string input or text that is not a number.
        return None


def parse_imdb_to_100(value: str) -> Optional[float]:
    """Scale a numeric rating string by ten (``"7.5"`` -> ``75.0``).

    Returns ``None`` when *value* is not a string holding a number.
    """
    try:
        return float(value.strip()) * 10.0
    except (AttributeError, TypeError, ValueError):
        return None


def parse_metacritic(value: str) -> Optional[float]:
    """Return the number before the first ``/`` (``"75/100"`` -> ``75.0``).

    Returns ``None`` when *value* is not a string or that part is not numeric.
    """
    try:
        numerator = value.split("/")[0].strip()
        return float(numerator)
    except (AttributeError, TypeError, ValueError):
        return None


def extract_omdb_scores(omdb: Optional[Dict]) -> Dict[str, float]:
    """Pull the usable ratings out of an OMDb record, each on a 0-100 scale.

    Reads ``imdbRating`` and the ``Ratings`` entries whose ``Source`` is
    "Rotten Tomatoes" or "Metacritic". Values that cannot be parsed are left
    out.

    Returns:
        Dict with any of the keys ``IMDb``, ``RottenTomatoes``, ``Metacritic``;
        empty when *omdb* is falsy.
    """
    scores: Dict[str, float] = {}

    if not omdb:
        return scores

    imdb_rating = omdb.get("imdbRating")
    if imdb_rating and imdb_rating != "N/A":
        parsed = parse_imdb_to_100(imdb_rating)
        if parsed is not None:
            scores["IMDb"] = parsed

    # "or []" guards against an explicit null, not just a missing key.
    for item in omdb.get("Ratings") or []:
        source = item.get("Source")
        value = item.get("Value", "")

        if source == "Rotten Tomatoes":
            parsed = parse_percent(value)
            if parsed is not None:
                scores["RottenTomatoes"] = parsed
        elif source == "Metacritic":
            parsed = parse_metacritic(value)
            if parsed is not None:
                scores["Metacritic"] = parsed

    return scores


def extract_tmdb_score(tmdb_details: Dict) -> Dict[str, float]:
    """Return ``{"TMDb": vote_average * 10}``, or ``{}`` when there is no average."""
    vote_average = tmdb_details.get("vote_average")
    if vote_average is None:
        return {}
    return {"TMDb": float(vote_average) * 10.0}


def weighted_combined_score(all_scores: Dict[str, float]) -> Optional[float]:
    """Blend the available ratings into one weighted average.

    Only the sources IMDb, RottenTomatoes, Metacritic and TMDb carry weight.
    The sum is divided by the weight actually used, so missing sources do not
    drag the result down.

    Returns:
        The average rounded to one decimal, or ``None`` when no weighted
        source is present.
    """
    if not all_scores:
        return None

    weights = {
        "IMDb": 0.40,
        "RottenTomatoes": 0.28,
        "Metacritic": 0.20,
        "TMDb": 0.12,
    }

    total = 0.0
    total_weight = 0.0

    for source, value in all_scores.items():
        weight = weights.get(source, 0.0)
        if weight > 0:
            total += value * weight
            total_weight += weight

    if total_weight == 0:
        return None

    return round(total / total_weight, 1)


def genre_bias_score(genre_names: List[str], overview: str) -> int:
    """Score how well a movie's genres and overview fit the target taste.

    Each genre contributes its POSITIVE_GENRES / NEGATIVE_GENRES value. Every
    POSITIVE_TERMS phrase found in the lower-cased overview adds 2, every
    NEGATIVE_TERMS phrase subtracts 2.
    """
    score = 0

    for genre in genre_names:
        score += POSITIVE_GENRES.get(genre, 0)
        score += NEGATIVE_GENRES.get(genre, 0)

    text = (overview or "").lower()

    # NOTE(review): this is plain substring matching, so "war" also matches
    # "award" or "toward". Left as is to keep existing scores stable.
    score += 2 * sum(1 for term in POSITIVE_TERMS if term in text)
    score -= 2 * sum(1 for term in NEGATIVE_TERMS if term in text)

    return score


def classic_bonus(release_year: Optional[int], seed_hits: int, overview: str) -> int:
    """Return the bonus for release era, seed overlap and overview wording.

    Era: 1985-2008 gives +10, 2009-2012 gives +4, earlier than 1985 gives +2,
    later than 2012 gives -8; an unknown year gives nothing. Seed overlap adds
    5 per hit, capped at 20. Each SEED_BONUS_TERMS phrase found in the
    lower-cased overview adds 1.
    """
    bonus = 0

    if release_year is not None:
        if 1985 <= release_year <= 2008:
            bonus += 10
        elif 2009 <= release_year <= 2012:
            bonus += 4
        elif release_year < 1985:
            bonus += 2
        else:
            bonus -= 8

    bonus += min(seed_hits * 5, 20)

    text = (overview or "").lower()
    bonus += sum(1 for term in SEED_BONUS_TERMS if term in text)

    return bonus


def extract_year(date_str: str) -> Optional[int]:
    """Return the leading four characters of *date_str* as an int.

    Returns ``None`` when the value is empty, shorter than four characters,
    not sliceable, or does not start with a number.
    """
    if not date_str:
        return None
    try:
        if len(date_str) < 4:
            return None
        return int(date_str[:4])
    except (TypeError, ValueError):
        return None


def should_hard_reject(details: Dict) -> bool:
    """Return True when a TMDb detail record fails a non-negotiable filter.

    Rejects animation (if EXCLUDE_ANIMATION), releases before
    RELEASE_YEAR_FROM, releases after RELEASE_YEAR_TO (only if
    EXCLUDE_TOO_MODERN), and titles with fewer than MIN_TMDB_VOTES votes.
    Records without a parsable release year skip the year checks.
    """
    genre_names = {g.get("name") for g in details.get("genres") or []}
    year = extract_year(details.get("release_date", ""))

    if EXCLUDE_ANIMATION and "Animation" in genre_names:
        return True

    if year is not None:
        if year < RELEASE_YEAR_FROM:
            return True
        # The upper bound is what EXCLUDE_TOO_MODERN toggles. Previously an
        # unconditional range check made the flag ineffective.
        if EXCLUDE_TOO_MODERN and year > RELEASE_YEAR_TO:
            return True

    # "or 0" treats an explicit null vote count like a missing one.
    if (details.get("vote_count") or 0) < MIN_TMDB_VOTES:
        return True

    return False


def should_keep_candidate(
    combined: Optional[float],
    imdb_score_100: Optional[float],
    num_sources: int,
    total_style_score: int,
    min_style_score: int = 16,
) -> bool:
    """Decide whether a scored candidate passes every quality threshold.

    Args:
        combined: Weighted rating on a 0-100 scale, or None if unavailable.
        imdb_score_100: IMDb rating on a 0-100 scale; the IMDb floor is
            skipped when this is None.
        num_sources: Number of rating sources that contributed.
        total_style_score: Genre bias plus classic bonus.
        min_style_score: Lowest acceptable style score (defaults to 16).

    Returns:
        True only when a combined score exists and every threshold is met.
    """
    if combined is None:
        return False
    if num_sources < MIN_SOURCES_REQUIRED:
        return False
    if combined < MIN_COMBINED_SCORE:
        return False
    # MIN_IMDB_SCORE is on the 0-10 scale; the score here is 0-100.
    if imdb_score_100 is not None and imdb_score_100 < (MIN_IMDB_SCORE * 10):
        return False
    if total_style_score < min_style_score:
        return False
    return True


# ============================================================
# PAYLOAD BUILDING
# ============================================================


def build_radarr_payload(lookup_obj: Dict, quality_profile_id: int) -> Dict:
    """Turn a Radarr lookup result into the body used to add the movie.

    Works on a shallow copy of *lookup_obj*. Sets the quality profile, the
    configured root folder and ``monitored``, disables searching after the add,
    and defaults ``minimumAvailability`` to "released" when the lookup did not
    provide one.
    """
    payload = dict(lookup_obj)
    payload["qualityProfileId"] = quality_profile_id
    payload["rootFolderPath"] = ROOT_FOLDER_PATH
    payload["monitored"] = True

    # User requested NO search after add
    payload["searchForMovie"] = False
    payload["addOptions"] = {"searchForMovie": False}

    payload.setdefault("minimumAvailability", "released")
    return payload


# ============================================================
# SAFETY TESTS
# ============================================================


def test_radarr() -> int:
    """Check the Radarr connection, root folder and quality profile.

    Returns:
        The ID of the quality profile named by QUALITY_PROFILE_NAME.

    Raises:
        ApiError: when Radarr is unreachable, the configured root folder is not
            among Radarr's root folders, or the profile cannot be resolved.
    """
    with Spinner("Checking Radarr connection"):
        movies = get_existing_radarr_movies()
    ok(f"Radarr OK - existing library count: {len(movies)}")

    with Spinner("Checking Radarr root folders"):
        rootfolders = get_radarr_rootfolders()

    root_paths = {rf.get('path') for rf in rootfolders if rf.get('path')}
    # Compare without trailing separators so "D:\Movies" and "D:\Movies\"
    # are treated as the same folder.
    wanted = ROOT_FOLDER_PATH.rstrip("\\/")
    if not any(str(path).rstrip("\\/") == wanted for path in root_paths):
        fail(
            f"ROOT_FOLDER_PATH '{ROOT_FOLDER_PATH}' does not exist in Radarr.\n"
            f"Available root folders: {sorted(root_paths)}"
        )
    ok(f"Root folder exists: {ROOT_FOLDER_PATH}")

    with Spinner(f"Resolving quality profile '{QUALITY_PROFILE_NAME}'"):
        quality_profile_id = resolve_quality_profile_id(QUALITY_PROFILE_NAME)

    ok(f"Quality profile resolved: {QUALITY_PROFILE_NAME} -> ID {quality_profile_id}")
    return quality_profile_id


def test_tmdb() -> None:
    """Run one TMDb search to confirm the configured credential works."""
    auth_mode = "Bearer token" if TMDB_BEARER_TOKEN else "v3 api_key"
    with Spinner(f"Testing TMDb using {auth_mode}"):
        results = tmdb_search_movie("The Birdcage")
    ok(f"TMDb OK - search returned {len(results)} result(s) for test query")


def test_omdb() -> None:
    """Run one OMDb lookup to confirm the key works; skipped when no key is set.

    Raises:
        ApiError: when the test lookup returns no movie data.
    """
    if not OMDB_API_KEY:
        warn("OMDb skipped - no API key configured")
        return

    with Spinner("Testing OMDb"):
        sample = omdb_by_imdb("tt0133093")

    if not sample:
        fail("OMDb test failed. No movie data returned for test lookup.")
    ok(f"OMDb OK - test title: {sample.get('Title', '<unknown>')}")


def run_preflight() -> int:
    """Validate the configuration and probe Radarr, TMDb and OMDb in turn.

    Returns:
        The resolved Radarr quality profile ID.

    Raises:
        ApiError: from the first check that fails.
    """
    section("PRE-FLIGHT CHECKS")
    require_basic_config()
    quality_profile_id = test_radarr()
    test_tmdb()
    test_omdb()
    ok("Preflight checks passed")
    return quality_profile_id


# ============================================================
# MAIN
# ============================================================


def _score_candidate(item: Dict, existing_imdb_ids: set) -> Optional[Dict]:
    """Fetch, filter and score one pooled candidate; return None if rejected."""
    tmdb_id = int(item["tmdb_id"])
    details = tmdb_movie_details(tmdb_id)

    if should_hard_reject(details):
        return None

    imdb_id = details.get("imdb_id")
    if imdb_id and imdb_id in existing_imdb_ids:
        return None

    omdb = omdb_by_imdb(imdb_id) if imdb_id else None

    omdb_scores = extract_omdb_scores(omdb)
    tmdb_scores = extract_tmdb_score(details)

    all_scores: Dict[str, float] = {}
    all_scores.update(omdb_scores)
    all_scores.update(tmdb_scores)

    combined = weighted_combined_score(all_scores)
    imdb_score_100 = omdb_scores.get("IMDb")
    genre_names = [g["name"] for g in details.get("genres", [])]
    overview = details.get("overview", "")
    bias = genre_bias_score(genre_names, overview)
    year = extract_year(details.get("release_date", ""))
    style_bonus = classic_bonus(year, item["seed_hits"], overview)
    total_style_score = bias + style_bonus

    if not should_keep_candidate(
        combined=combined,
        imdb_score_100=imdb_score_100,
        num_sources=len(all_scores),
        total_style_score=total_style_score,
    ):
        return None

    return {
        "tmdb_id": tmdb_id,
        "imdb_id": imdb_id,
        "title": details.get("title"),
        "year": (details.get("release_date") or "????")[:4],
        "combined_score": combined,
        "genre_bias": bias,
        "style_bonus": style_bonus,
        "total_style_score": total_style_score,
        "seed_hits": item["seed_hits"],
        "seed_titles": sorted(item["seed_titles"]),
        "genres": genre_names,
        "scores": all_scores,
        "popularity": details.get("popularity", 0),
    }


def _print_suggestions(chosen: List[Dict]) -> None:
    """Print one numbered summary line per chosen movie."""
    for i, movie in enumerate(chosen, start=1):
        print(
            colour(f"{i:>2}. ", C.BOLD + C.GREEN) +
            colour(f"{movie['title']} ({movie['year']})", C.BOLD) +
            f" | combined={movie['combined_score']}" +
            f" | style={movie['total_style_score']}" +
            f" | seed_hits={movie['seed_hits']}" +
            f" | genres={', '.join(movie['genres'])}" +
            f" | similar_to={', '.join(movie['seed_titles'])}"
        )


def _print_safe_mode_notice() -> None:
    """Explain why nothing is being added and how to enable adding."""
    section("SAFE MODE")
    warn("No movies will be added.")
    if DRY_RUN:
        warn("DRY_RUN is True")
    if not CONFIRM_ADD:
        warn("CONFIRM_ADD is False")
    info("To actually add movies, edit the script and set:")
    print(" DRY_RUN = False")
    print(" CONFIRM_ADD = True")


def _add_chosen_movies(chosen: List[Dict], quality_profile_id: int) -> int:
    """Add each chosen movie to Radarr; return how many were added."""
    added = 0
    for movie in chosen:
        try:
            with Spinner(f"Adding {movie['title']}"):
                lookup = radarr_lookup_tmdb(movie["tmdb_id"])
                payload = build_radarr_payload(lookup, quality_profile_id)
                result = add_movie_to_radarr(payload)

            added += 1
            ok(f"Added: {result.get('title')} ({result.get('year')})")
        except ApiError as e:
            err(f"Failed to add {movie['title']}:\n{e}\n")
        except Exception as e:
            # One bad movie must not stop the remaining adds.
            err(f"Failed to add {movie['title']}: {e}\n")
    return added


def main() -> None:
    """Run the whole pipeline: preflight, gather, score, suggest, optionally add.

    Movies are only sent to Radarr when DRY_RUN is False and CONFIRM_ADD is
    True; otherwise the suggestions are printed and the run ends.

    Raises:
        ApiError: on a failed preflight, library scan, seed resolution or
            candidate gathering. Failures on individual candidates or adds
            are reported and skipped.
    """
    quality_profile_id = run_preflight()

    section("LIBRARY SCAN")
    with Spinner("Reading existing Radarr library"):
        existing_movies = get_existing_radarr_movies()
    existing_tmdb_ids, existing_imdb_ids = get_existing_ids(existing_movies)

    ok(f"Existing movies in Radarr: {len(existing_movies)}")
    info(f"DRY_RUN: {DRY_RUN}")
    info(f"CONFIRM_ADD: {CONFIRM_ADD}")
    info(f"Quality Profile: {QUALITY_PROFILE_NAME}")
    info("Search after add: disabled")

    section("SEED RESOLUTION")
    seed_matches = find_seed_movie_ids(SEED_MOVIES)

    if not seed_matches:
        fail("No seed movies could be resolved from TMDb.")

    section("CANDIDATE GATHERING")
    # No outer Spinner here: gather_candidates_from_seeds runs its own spinner
    # per page and prints progress, and two spinner threads would fight over
    # the same terminal line.
    candidate_pool = gather_candidates_from_seeds(seed_matches)

    # Drop everything Radarr already has.
    candidate_pool = {
        tmdb_id: entry
        for tmdb_id, entry in candidate_pool.items()
        if tmdb_id not in existing_tmdb_ids
    }

    ok(f"Candidate pool from seed similarities: {len(candidate_pool)}")

    # Score only the most promising part of the pool to bound API calls.
    preliminary = sorted(
        candidate_pool.values(),
        key=lambda x: (x["seed_hits"], x["popularity"]),
        reverse=True
    )[:MAX_CANDIDATES_TO_SCORE]

    section("SCORING CANDIDATES")

    ranked: List[Dict] = []
    for index, item in enumerate(preliminary, start=1):
        tmdb_id = int(item["tmdb_id"])

        try:
            with Spinner(f"Scoring {item.get('title', 'candidate')} ({index}/{len(preliminary)})"):
                entry = _score_candidate(item, existing_imdb_ids)
        except ApiError as e:
            err(f"Skipping TMDb ID {tmdb_id} due to API error:\n{e}\n")
        except Exception as e:
            # One malformed record must not abort the whole scoring pass.
            err(f"Skipping TMDb ID {tmdb_id} due to unexpected error: {e}\n")
        else:
            if entry is not None:
                ranked.append(entry)

    ranked.sort(
        key=lambda x: (
            x["seed_hits"],
            x["total_style_score"],
            x["combined_score"],
            x["popularity"],
        ),
        reverse=True
    )

    chosen = ranked[:MAX_TO_ADD]

    section("SUGGESTED ADDITIONS")

    if not chosen:
        warn("No candidates passed the filters.")
        return

    _print_suggestions(chosen)

    if DRY_RUN or not CONFIRM_ADD:
        _print_safe_mode_notice()
        return

    section("ADDING TO RADARR")
    added = _add_chosen_movies(chosen, quality_profile_id)

    section("DONE")
    ok(f"Added {added} movie(s)")


if __name__ == "__main__":
    # Script entry point: every failure path prints a short report and exits
    # with status 1 instead of showing a traceback.
    try:
        main()
    except ApiError as e:
        print()
        err("FATAL ERROR")
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        print()
        warn("Stopped by user.")
        sys.exit(1)
    except Exception as e:
        print()
        err("UNEXPECTED ERROR")
        print(repr(e))
        sys.exit(1)