# Source listing: embyhomescreenedit/test.py
# Snapshot: 2026-04-25 22:57:08 +12:00 (1018 lines, 28 KiB, Python)

import itertools
import sys
import threading
import time
from typing import Dict, List, Optional, Tuple
import requests
# ============================================================
# CONFIG
# ============================================================
# NOTE(review): the API keys below are committed as string literals, so
# anyone who can read this file can use them. Consider loading them from
# the environment instead and rotating the values shown here.
# Base URL of the Radarr instance; "/api/v3/..." routes are appended to it.
RADARR_URL = "http://10.0.0.2:7878"
# Sent as the X-Api-Key header on every Radarr request (see radarr_headers).
RADARR_API_KEY = "d393acb157a44dc2b0e2aede96278ad5"
# Use ONE of these only
# If both are set, get_tmdb_auth prefers the bearer token.
TMDB_V3_API_KEY = "0dd6070b3573f3f54ba4c5530e36662b"
TMDB_BEARER_TOKEN = ""
# Optional
# When blank, OMDb lookups are skipped and only the TMDb score is available.
OMDB_API_KEY = ""
# Radarr settings
# Resolved to a profile ID by name (case-insensitive) during preflight.
QUALITY_PROFILE_NAME = "720p ENG"
# Must equal one of the root folder paths reported by Radarr (exact string match).
ROOT_FOLDER_PATH = r"D:\Movies"
# Safety
# Movies are only added when DRY_RUN is False AND CONFIRM_ADD is True.
DRY_RUN = True
CONFIRM_ADD = False
# Upper bound on how many ranked candidates are kept for adding.
MAX_TO_ADD = 10
# Minimum weighted score; source scores are scaled to 0-100 before weighting.
MIN_COMBINED_SCORE = 64
# Minimum IMDb rating; multiplied by 10 before comparison and only enforced
# when an IMDb score is available.
MIN_IMDB_SCORE = 6.3
# Minimum number of rating sources a candidate must have.
MIN_SOURCES_REQUIRED = 1
# Classic / comfort tuning
# Inclusive release-year window; candidates with a known year outside it are rejected.
RELEASE_YEAR_FROM = 1980
RELEASE_YEAR_TO = 2012
# Candidates with fewer TMDb votes than this are rejected.
MIN_TMDB_VOTES = 250
# Number of "similar" result pages requested per seed movie.
MAX_SIMILAR_PAGES_PER_SEED = 2
# Only this many pooled candidates (best seed_hits/popularity first) are scored.
MAX_CANDIDATES_TO_SCORE = 250
EXCLUDE_ANIMATION = True
# NOTE(review): currently has no effect on its own — should_hard_reject applies
# the RELEASE_YEAR_FROM/TO window unconditionally, which already covers this case.
EXCLUDE_TOO_MODERN = True
TMDB_BASE = "https://api.themoviedb.org/3"
# Curated seed list
# Each title is searched on TMDb and the first search result becomes the seed
# (see find_seed_movie_ids); "similar" titles are then pulled for every seed.
SEED_MOVIES = [
"The Birdcage",
"When Harry Met Sally",
"You've Got Mail",
"Sleepless in Seattle",
"Notting Hill",
"Moonstruck",
"Father of the Bride",
"My Best Friend's Wedding",
"The First Wives Club",
"Under the Tuscan Sun",
"Julie & Julia",
"Something's Gotta Give",
"Must Love Dogs",
"While You Were Sleeping",
"Practical Magic",
]
# Shared HTTP session used by request_json for all outgoing calls.
session = requests.Session()
# Identify this script to the remote services via the User-Agent header.
session.headers.update({"User-Agent": "radarr-classic-feelgood-script/4.0"})
# ============================================================
# CLI STYLING
# ============================================================
class C:
    """ANSI escape sequences used to colour terminal output."""

    # Resets all attributes; appended after every coloured span.
    RESET = "\x1b[0m"
    BOLD = "\x1b[1m"
    # Bright foreground colours.
    GREY = "\x1b[90m"
    RED = "\x1b[91m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    BLUE = "\x1b[94m"
    MAGENTA = "\x1b[95m"
    CYAN = "\x1b[96m"
def colour(text: str, code: str) -> str:
    """Wrap *text* in the ANSI *code* and terminate it with the reset sequence."""
    return "{}{}{}".format(code, text, C.RESET)
def info(text: str) -> None:
    """Print an "[INFO]" line in cyan."""
    line = f"[INFO] {text}"
    print(colour(line, C.CYAN))
def ok(text: str) -> None:
    """Print an "[OK]" line in green."""
    line = f"[OK] {text}"
    print(colour(line, C.GREEN))
def warn(text: str) -> None:
    """Print a "[WARN]" line in yellow."""
    line = f"[WARN] {text}"
    print(colour(line, C.YELLOW))
def err(text: str) -> None:
    """Print an "[ERR]" line in red."""
    line = f"[ERR] {text}"
    print(colour(line, C.RED))
def section(title: str) -> None:
    """Print a blank line followed by *title* framed by two 72-character rules."""
    rule = colour("=" * 72, C.BLUE)
    heading = colour(title, C.BOLD + C.BLUE)
    print()
    print(rule)
    print(heading)
    print(rule)
class Spinner:
    """Context manager that animates a one-line spinner on stdout.

    Entering the context starts a daemon thread that repeatedly redraws
    "<frame> <message>" on the current line; leaving it stops the thread and
    blanks the line. Do not nest spinners or print while one is active: every
    writer shares the same terminal line.
    """

    # The original frame list had been reduced to ten empty strings (the
    # glyphs were lost to an encoding problem), so nothing was animated.
    # These are the ten braille spinner frames, written as escapes so the
    # source stays pure ASCII.
    FRAMES = "\u280b\u2819\u2839\u2838\u283c\u2834\u2826\u2827\u2807\u280f"
    # Used when stdout cannot encode the braille frames.
    ASCII_FRAMES = "|/-\\"
    # Seconds between redraws.
    INTERVAL = 0.09

    def __init__(self, message: str):
        self.message = message
        self.frames = itertools.cycle(self._pick_frames())
        self.running = False
        self.thread: Optional[threading.Thread] = None
        # Lets __exit__ wake the worker immediately instead of waiting out a sleep.
        self._stop = threading.Event()

    @classmethod
    def _pick_frames(cls) -> str:
        """Return the braille frames if stdout can encode them, else ASCII ones."""
        encoding = getattr(sys.stdout, "encoding", None) or "ascii"
        try:
            cls.FRAMES.encode(encoding)
        except (UnicodeEncodeError, LookupError):
            return cls.ASCII_FRAMES
        return cls.FRAMES

    def _spin(self):
        """Worker loop: redraw the spinner until stopped, then blank the line."""
        while self.running:
            frame = next(self.frames)
            sys.stdout.write(f"\r{colour(frame, C.MAGENTA)} {self.message}")
            sys.stdout.flush()
            self._stop.wait(self.INTERVAL)
        # Overwrite the spinner text with spaces and return to column 0.
        sys.stdout.write("\r" + " " * (len(self.message) + 6) + "\r")
        sys.stdout.flush()

    def __enter__(self):
        self._stop.clear()
        self.running = True
        self.thread = threading.Thread(target=self._spin, daemon=True)
        self.thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.running = False
        self._stop.set()
        if self.thread:
            self.thread.join()
# ============================================================
# HEURISTICS
# ============================================================
# Points added per TMDb genre name by genre_bias_score.
POSITIVE_GENRES = {
"Romance": 18,
"Comedy": 16,
"Drama": 10,
"Music": 4,
}
# Points (negative) applied per TMDb genre name by genre_bias_score.
NEGATIVE_GENRES = {
"Animation": -30,
"Horror": -25,
"Science Fiction": -20,
"Action": -16,
"Thriller": -14,
"Crime": -10,
"War": -18,
"Western": -14,
"Documentary": -18,
"Fantasy": -8,
"Family": -6,
}
# Each term found in the lowercased overview adds 2 points.
# NOTE(review): terms are matched as plain substrings, not whole words.
POSITIVE_TERMS = [
"wedding", "love", "romance", "relationship", "bride",
"best friend", "friendship", "family", "holiday", "food",
"small town", "bookstore", "restaurant", "writer", "divorce",
"second chance", "starting over", "feel-good", "mother",
"daughter", "sisters", "chosen family", "comedy of manners",
]
# Each term found in the lowercased overview subtracts 2 points.
# NOTE(review): substring matching means e.g. "war" also hits "award" or "toward".
NEGATIVE_TERMS = [
"war", "serial killer", "murder spree", "mercenary",
"zombie", "post-apocalyptic", "gang", "assassin", "combat",
"superhero", "multiverse", "alien invasion", "dystopian",
"dragon", "animated adventure",
]
# Each term found in the lowercased overview adds 1 point in classic_bonus.
SEED_BONUS_TERMS = [
"ensemble", "relationship", "family", "comedy", "romantic",
"wedding", "friendship", "identity", "midlife", "second chance"
]
# ============================================================
# ERRORS / UTILITIES
# ============================================================
class ApiError(Exception):
    """Raised for configuration problems and failed or malformed API calls."""
def is_placeholder(value: str) -> bool:
    """Return True when *value* is empty or still looks like template text.

    Args:
        value: A configured credential or setting; ``None`` is treated as empty.

    Returns:
        True if the value is falsy or contains one of the known placeholder
        markers (compared case-insensitively), otherwise False.
    """
    if not value:
        return True
    # Markers are stored upper-case because the value is upper-cased before
    # comparison; the previous lowercase "abc123" entry could never match.
    markers = (
        "PUT_YOUR",
        "YOUR_",
        "REPLACE_ME",
        "CHANGE_ME",
        "PASTE_YOUR",
        "ABC123",
    )
    upper = value.upper()
    return any(marker in upper for marker in markers)
def safe_response_text(response: requests.Response) -> str:
    """Return a short (max 700 chars) printable summary of a response body.

    The decoded JSON is preferred; if decoding fails for any reason the raw
    text is used, and "<empty>" is returned when there is no text at all.
    """
    limit = 700
    try:
        rendered = str(response.json())[:limit]
    except Exception:
        rendered = None
    if rendered is not None:
        return rendered
    body = (response.text or "").strip()
    if not body:
        return "<empty>"
    return body[:limit]
def fail(message: str) -> None:
    """Abort the current operation by raising ApiError(message); never returns."""
    error = ApiError(message)
    raise error
def request_json(
    method: str,
    url: str,
    *,
    service_name: str,
    headers: Optional[Dict] = None,
    params: Optional[Dict] = None,
    json_data: Optional[Dict] = None,
) -> Dict:
    """Send one HTTP request through the shared session and decode its JSON body.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        url: Absolute URL to call.
        service_name: Label used in error messages; "TMDb", "Radarr" and
            "OMDb" additionally get a service-specific hint on HTTP 401.
        headers: Optional extra request headers.
        params: Optional query-string parameters.
        json_data: Optional JSON request body.

    Returns:
        The decoded JSON body. Annotated as ``Dict``, but callers of
        list-returning endpoints receive the list unchanged.

    Raises:
        ApiError: If the request cannot be sent, the response status is not
            OK, or the body is not valid JSON. The underlying exception, if
            any, is attached as ``__cause__``.
    """
    try:
        response = session.request(
            method=method,
            url=url,
            headers=headers,
            params=params,
            json=json_data,
            timeout=30,
        )
    except requests.RequestException as e:
        # Raise directly (rather than via fail()) so the cause is chained and
        # it is explicit that execution never continues past this point.
        raise ApiError(
            f"{service_name} request failed before a response was received.\n{e}"
        ) from e

    if not response.ok:
        status = response.status_code
        msg = [
            f"{service_name} request failed.",
            f"HTTP: {status}",
            f"Method: {method}",
            f"URL: {response.url}",
            f"Response: {safe_response_text(response)}",
        ]
        if status == 401:
            msg.append("Authentication failed.")
            if service_name == "TMDb":
                msg.append(
                    "TMDb expects either:\n"
                    "- TMDB_BEARER_TOKEN as Authorization: Bearer <token>\n"
                    "- or TMDB_V3_API_KEY as api_key query parameter"
                )
            elif service_name == "Radarr":
                msg.append("Check RADARR_URL and RADARR_API_KEY.")
            elif service_name == "OMDb":
                msg.append("Check OMDB_API_KEY.")
        if status == 404:
            msg.append("Endpoint not found. Check the URL and API route.")
        if status == 429:
            msg.append("Rate limited. Slow the script down.")
        if 500 <= status <= 599:
            msg.append("Remote server-side error.")
        raise ApiError("\n".join(msg))

    try:
        return response.json()
    except ValueError as e:
        raise ApiError(
            f"{service_name} returned non-JSON content.\n"
            f"HTTP: {response.status_code}\n"
            f"URL: {response.url}\n"
            f"Response: {safe_response_text(response)}"
        ) from e
def require_basic_config() -> None:
    """Validate the module-level settings before any network call is made.

    Collects every blocking problem and raises a single ApiError listing them
    all. Non-blocking observations (both TMDb credentials set, no OMDb key)
    are only reported as warnings.
    """
    issues = []
    has_v3_key = bool(TMDB_V3_API_KEY)
    has_bearer = bool(TMDB_BEARER_TOKEN)

    if is_placeholder(RADARR_API_KEY):
        issues.append("RADARR_API_KEY is missing or still a placeholder.")
    if not (has_v3_key or has_bearer):
        issues.append("Set either TMDB_V3_API_KEY or TMDB_BEARER_TOKEN.")
    if has_v3_key and has_bearer:
        warn("Both TMDb credentials are present. Bearer token will be preferred.")
    if not ROOT_FOLDER_PATH:
        issues.append("ROOT_FOLDER_PATH is blank.")
    if not QUALITY_PROFILE_NAME.strip():
        issues.append("QUALITY_PROFILE_NAME is blank.")

    if issues:
        fail("Configuration error(s):\n- " + "\n- ".join(issues))

    if not OMDB_API_KEY:
        warn("OMDb key not set. Script will run in TMDb-only mode.")
# ============================================================
# AUTH HELPERS
# ============================================================
def radarr_headers() -> Dict[str, str]:
    """Build the headers sent with every Radarr request (API key + JSON content type)."""
    headers: Dict[str, str] = {}
    headers["X-Api-Key"] = RADARR_API_KEY
    headers["Content-Type"] = "application/json"
    return headers
def get_tmdb_auth() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return ``(headers, query_params)`` carrying the configured TMDb credential.

    The bearer token wins when both credentials are set; exactly one of the
    two returned dicts is non-empty. Raises ApiError (via fail) when neither
    credential is configured.
    """
    headers: Dict[str, str] = {}
    query: Dict[str, str] = {}
    if TMDB_BEARER_TOKEN:
        headers["Authorization"] = f"Bearer {TMDB_BEARER_TOKEN}"
    elif TMDB_V3_API_KEY:
        query["api_key"] = TMDB_V3_API_KEY
    else:
        fail("TMDb auth missing. Set TMDB_BEARER_TOKEN or TMDB_V3_API_KEY.")
    return headers, query
# ============================================================
# RADARR
# ============================================================
def get_existing_radarr_movies() -> List[Dict]:
    """Fetch the movie list from Radarr (GET /api/v3/movie)."""
    return request_json(
        "GET",
        RADARR_URL + "/api/v3/movie",
        service_name="Radarr",
        headers=radarr_headers(),
    )
def get_radarr_rootfolders() -> List[Dict]:
    """Fetch the configured root folders from Radarr (GET /api/v3/rootfolder)."""
    return request_json(
        "GET",
        RADARR_URL + "/api/v3/rootfolder",
        service_name="Radarr",
        headers=radarr_headers(),
    )
def get_radarr_quality_profiles() -> List[Dict]:
    """Fetch the quality profiles from Radarr (GET /api/v3/qualityprofile)."""
    return request_json(
        "GET",
        RADARR_URL + "/api/v3/qualityprofile",
        service_name="Radarr",
        headers=radarr_headers(),
    )
def resolve_quality_profile_id(profile_name: str) -> int:
    """Translate a Radarr quality profile name into its numeric ID.

    Matching ignores case and surrounding whitespace on both sides. An exact
    name match always wins; otherwise a partial (substring) match is accepted
    only when it is unique, and a warning is printed.

    Args:
        profile_name: The configured profile name.

    Returns:
        The ID of the matching profile.

    Raises:
        ApiError: If no profile matches, or if several profiles match
            partially and none matches exactly.
    """
    # Strip the wanted name too: Radarr's names are stripped below, so an
    # unstripped configured value could never match exactly.
    wanted = profile_name.strip().lower()
    profiles = get_radarr_quality_profiles()

    partial_matches = []
    for profile in profiles:
        name = str(profile.get("name", "")).strip().lower()
        if name == wanted:
            return int(profile["id"])
        if wanted in name:
            partial_matches.append(profile)

    if len(partial_matches) == 1:
        warn(f"Quality profile exact match not found. Using partial match: {partial_matches[0].get('name')}")
        return int(partial_matches[0]["id"])

    available = ", ".join(sorted(str(p.get("name")) for p in profiles))
    if partial_matches:
        # Several candidates: say so instead of claiming nothing was found.
        candidates = ", ".join(sorted(str(p.get("name")) for p in partial_matches))
        raise ApiError(
            f"Radarr quality profile '{profile_name}' is ambiguous.\n"
            f"Partial matches: {candidates}\n"
            f"Available profiles: {available}"
        )
    raise ApiError(
        f"Could not find Radarr quality profile '{profile_name}'.\n"
        f"Available profiles: {available}"
    )
def get_existing_ids(existing_movies: List[Dict]) -> Tuple[set, set]:
    """Collect the TMDb and IMDb IDs present in a list of Radarr movie dicts.

    Returns:
        ``(tmdb_ids, imdb_ids)``: a set of ints and a set of stripped strings.
        Movies with a missing or falsy ID contribute nothing to that set.
    """
    tmdb_ids = {
        int(entry.get("tmdbId"))
        for entry in existing_movies
        if entry.get("tmdbId")
    }
    imdb_ids = {
        str(entry.get("imdbId")).strip()
        for entry in existing_movies
        if entry.get("imdbId")
    }
    return tmdb_ids, imdb_ids
def radarr_lookup_tmdb(tmdb_id: int) -> Dict:
    """Ask Radarr for its metadata on a TMDb ID (GET /api/v3/movie/lookup/tmdb)."""
    query = {"tmdbId": tmdb_id}
    endpoint = RADARR_URL + "/api/v3/movie/lookup/tmdb"
    return request_json(
        "GET",
        endpoint,
        service_name="Radarr",
        headers=radarr_headers(),
        params=query,
    )
def add_movie_to_radarr(movie_payload: Dict) -> Dict:
    """Create a movie in Radarr by POSTing *movie_payload* to /api/v3/movie."""
    endpoint = RADARR_URL + "/api/v3/movie"
    return request_json(
        "POST",
        endpoint,
        service_name="Radarr",
        headers=radarr_headers(),
        json_data=movie_payload,
    )
# ============================================================
# TMDB
# ============================================================
def tmdb_search_movie(title: str, year: Optional[int] = None) -> List[Dict]:
    """Search TMDb for *title* and return the first page of results.

    Args:
        title: Free-text movie title.
        year: Optional release year; only sent when truthy.

    Returns:
        The "results" list of the response, or an empty list if absent.
    """
    auth_headers, auth_params = get_tmdb_auth()
    query = dict(auth_params)
    query.update(
        {
            "query": title,
            "language": "en-US",
            "include_adult": "false",
            "page": 1,
        }
    )
    if year:
        query["year"] = year
    payload = request_json(
        "GET",
        TMDB_BASE + "/search/movie",
        service_name="TMDb",
        headers=auth_headers,
        params=query,
    )
    return payload.get("results", [])
def tmdb_movie_details(tmdb_id: int) -> Dict:
    """Fetch the en-US detail record of a TMDb movie (GET /movie/{id})."""
    auth_headers, auth_params = get_tmdb_auth()
    query = dict(auth_params)
    query["language"] = "en-US"
    return request_json(
        "GET",
        f"{TMDB_BASE}/movie/{tmdb_id}",
        service_name="TMDb",
        headers=auth_headers,
        params=query,
    )
def tmdb_similar_movies(tmdb_id: int, page: int = 1) -> List[Dict]:
    """Return one page of TMDb's "similar" list for a movie.

    Returns the "results" list of the response, or an empty list if absent.
    """
    auth_headers, auth_params = get_tmdb_auth()
    query = dict(auth_params)
    query.update({"language": "en-US", "page": page})
    payload = request_json(
        "GET",
        f"{TMDB_BASE}/movie/{tmdb_id}/similar",
        service_name="TMDb",
        headers=auth_headers,
        params=query,
    )
    return payload.get("results", [])
def find_seed_movie_ids(seed_titles: List[str]) -> List[Dict]:
    """Resolve each seed title to a TMDb movie using the first search result.

    Titles without any search result are reported with a warning and skipped.

    Returns:
        One dict per resolved seed with the keys "seed_title", "tmdb_id",
        "matched_title" and "release_date".
    """
    resolved: List[Dict] = []
    for wanted in seed_titles:
        with Spinner(f"Finding seed: {wanted}"):
            hits = tmdb_search_movie(wanted)
        if hits:
            top = hits[0]
            resolved.append({
                "seed_title": wanted,
                "tmdb_id": top["id"],
                "matched_title": top.get("title", wanted),
                "release_date": top.get("release_date", ""),
            })
            ok(f"Seed matched: {wanted} -> {top.get('title')} ({(top.get('release_date') or '')[:4]})")
            # Brief pause between successful lookups.
            time.sleep(0.05)
        else:
            warn(f"Seed not found: {wanted}")
    return resolved
def gather_candidates_from_seeds(seed_matches: List[Dict]) -> Dict[int, Dict]:
pool: Dict[int, Dict] = {}
for seed in seed_matches:
seed_id = seed["tmdb_id"]
seed_title = seed["matched_title"]
for page in range(1, MAX_SIMILAR_PAGES_PER_SEED + 1):
with Spinner(f"Pulling similar titles for {seed_title} (page {page})"):
results = tmdb_similar_movies(seed_id, page=page)
for item in results:
tmdb_id = item.get("id")
if not tmdb_id:
continue
if tmdb_id not in pool:
pool[tmdb_id] = {
"tmdb_id": tmdb_id,
"title": item.get("title"),
"release_date": item.get("release_date", ""),
"seed_hits": 0,
"seed_titles": set(),
"popularity": item.get("popularity", 0),
}
pool[tmdb_id]["seed_hits"] += 1
pool[tmdb_id]["seed_titles"].add(seed_title)
info(f"Collected {len(results)} similar titles from {seed_title} page {page}")
time.sleep(0.05)
return pool
# ============================================================
# OMDB
# ============================================================
def omdb_by_imdb(imdb_id: str) -> Optional[Dict]:
    """Look up a movie on OMDb by its IMDb ID.

    Returns:
        The OMDb record, or None when *imdb_id* is empty, no OMDb key is
        configured, or OMDb answers with ``"Response": "False"``.
    """
    if not (imdb_id and OMDB_API_KEY):
        return None
    record = request_json(
        "GET",
        "https://www.omdbapi.com/",
        service_name="OMDb",
        params={
            "apikey": OMDB_API_KEY,
            "i": imdb_id,
            "plot": "short",
            "r": "json",
        },
    )
    return None if record.get("Response") == "False" else record
# ============================================================
# SCORING
# ============================================================
def parse_percent(value: str) -> Optional[float]:
    """Parse a percentage string such as "85%" into a float; None if unparsable."""
    try:
        digits = value.replace("%", "").strip()
        number = float(digits)
    except Exception:
        return None
    return number
def parse_imdb_to_100(value: str) -> Optional[float]:
    """Convert a rating string such as "7.5" to a float multiplied by 10; None if unparsable."""
    try:
        rating = float(value.strip())
    except Exception:
        return None
    return rating * 10.0
def parse_metacritic(value: str) -> Optional[float]:
    """Parse a score such as "67/100" (the part before "/") into a float; None if unparsable."""
    try:
        numerator, _, _ = value.partition("/")
        return float(numerator.strip())
    except Exception:
        return None
def extract_omdb_scores(omdb: Optional[Dict]) -> Dict[str, float]:
    """Pull the rating sources this script understands out of an OMDb record.

    Returns:
        A dict with any of the keys "IMDb", "RottenTomatoes" and
        "Metacritic", each holding the value produced by the matching
        parse_* helper. Missing, "N/A" or unparsable ratings are omitted.
    """
    if not omdb:
        return {}

    found: Dict[str, float] = {}

    rating = omdb.get("imdbRating")
    if rating and rating != "N/A":
        imdb = parse_imdb_to_100(rating)
        if imdb is not None:
            found["IMDb"] = imdb

    for entry in omdb.get("Ratings", []):
        source = entry.get("Source")
        raw = entry.get("Value", "")
        if source == "Rotten Tomatoes":
            key, parsed = "RottenTomatoes", parse_percent(raw)
        elif source == "Metacritic":
            key, parsed = "Metacritic", parse_metacritic(raw)
        else:
            # Other sources are ignored here.
            continue
        if parsed is not None:
            found[key] = parsed
    return found
def extract_tmdb_score(tmdb_details: Dict) -> Dict[str, float]:
    """Return ``{"TMDb": vote_average * 10}``, or an empty dict when no average is present."""
    average = tmdb_details.get("vote_average")
    if average is None:
        return {}
    return {"TMDb": float(average) * 10.0}
def weighted_combined_score(all_scores: Dict[str, float]) -> Optional[float]:
    """Blend the per-source scores into one weighted average.

    Only the sources "IMDb", "RottenTomatoes", "Metacritic" and "TMDb" carry
    weight; the weights of the sources actually present are renormalised by
    dividing by their sum.

    Returns:
        The weighted average rounded to one decimal, or None when no
        weighted source is present.
    """
    source_weights = {
        "IMDb": 0.40,
        "RottenTomatoes": 0.28,
        "Metacritic": 0.20,
        "TMDb": 0.12,
    }
    if not all_scores:
        return None

    weighted_sum = 0.0
    weight_sum = 0.0
    for name, score in all_scores.items():
        share = source_weights.get(name, 0.0)
        if share <= 0:
            continue
        weighted_sum += score * share
        weight_sum += share

    if weight_sum == 0:
        return None
    return round(weighted_sum / weight_sum, 1)
def genre_bias_score(genre_names: List[str], overview: str) -> int:
    """Score how well a movie's genres and overview fit the target style.

    Each genre contributes its POSITIVE_GENRES / NEGATIVE_GENRES value; each
    POSITIVE_TERMS entry found in the lowercased overview adds 2 and each
    NEGATIVE_TERMS entry subtracts 2.

    NOTE(review): terms are matched as substrings, so short terms can match
    inside longer, unrelated words.
    """
    haystack = (overview or "").lower()
    genre_points = sum(
        POSITIVE_GENRES.get(name, 0) + NEGATIVE_GENRES.get(name, 0)
        for name in genre_names
    )
    liked = sum(1 for term in POSITIVE_TERMS if term in haystack)
    disliked = sum(1 for term in NEGATIVE_TERMS if term in haystack)
    return genre_points + 2 * liked - 2 * disliked
def classic_bonus(release_year: Optional[int], seed_hits: int, overview: str) -> int:
    """Compute the bonus for release era, seed overlap and overview wording.

    Year: +2 before 1985, +10 for 1985-2008, +4 for 2009-2012, -8 otherwise;
    nothing when the year is unknown. Seed overlap: 5 per hit, capped at 20.
    Overview: +1 per SEED_BONUS_TERMS entry found in the lowercased text.
    """
    if release_year is None:
        era_points = 0
    elif release_year < 1985:
        era_points = 2
    elif release_year <= 2008:
        era_points = 10
    elif 2009 <= release_year <= 2012:
        era_points = 4
    else:
        era_points = -8

    overlap_points = min(seed_hits * 5, 20)

    haystack = (overview or "").lower()
    term_points = sum(1 for term in SEED_BONUS_TERMS if term in haystack)

    return era_points + overlap_points + term_points
def extract_year(date_str: str) -> Optional[int]:
    """Return the leading four characters of *date_str* as an int, or None if unusable."""
    try:
        usable = bool(date_str) and len(date_str) >= 4
        return int(date_str[:4]) if usable else None
    except Exception:
        return None
def should_hard_reject(details: Dict) -> bool:
    """Decide whether a TMDb detail record is excluded before any scoring.

    A movie is rejected when it is animated (and EXCLUDE_ANIMATION is set),
    when its known release year lies outside RELEASE_YEAR_FROM..RELEASE_YEAR_TO,
    or when it has fewer than MIN_TMDB_VOTES votes. An unknown release year
    never causes rejection.
    """
    genres = [entry["name"] for entry in details.get("genres", [])]
    year = extract_year(details.get("release_date", ""))
    year_known = year is not None

    if EXCLUDE_ANIMATION and "Animation" in genres:
        return True
    # NOTE(review): this flag check is subsumed by the unconditional window
    # check right below, so EXCLUDE_TOO_MODERN does not change the outcome.
    if EXCLUDE_TOO_MODERN and year_known and year > RELEASE_YEAR_TO:
        return True
    if year_known and not (RELEASE_YEAR_FROM <= year <= RELEASE_YEAR_TO):
        return True
    return details.get("vote_count", 0) < MIN_TMDB_VOTES
def should_keep_candidate(
    combined: Optional[float],
    imdb_score_100: Optional[float],
    num_sources: int,
    total_style_score: int,
) -> bool:
    """Apply the quality thresholds to a scored candidate.

    A candidate is kept only if it has a combined score of at least
    MIN_COMBINED_SCORE, at least MIN_SOURCES_REQUIRED rating sources, an IMDb
    score (when available) of at least MIN_IMDB_SCORE * 10, and a total style
    score of at least 16.
    """
    if combined is None or num_sources < MIN_SOURCES_REQUIRED:
        return False
    if combined < MIN_COMBINED_SCORE:
        return False
    imdb_too_low = (
        imdb_score_100 is not None
        and imdb_score_100 < (MIN_IMDB_SCORE * 10)
    )
    return not (imdb_too_low or total_style_score < 16)
# ============================================================
# PAYLOAD BUILDING
# ============================================================
def build_radarr_payload(lookup_obj: Dict, quality_profile_id: int) -> Dict:
    """Turn a Radarr lookup result into the body for the "add movie" request.

    Works on a shallow copy of *lookup_obj*. The movie is monitored, placed
    under ROOT_FOLDER_PATH with the given quality profile, and no search is
    started after adding. "minimumAvailability" defaults to "released" when
    the lookup result does not already provide it.
    """
    payload = {**lookup_obj}
    payload.update({
        "qualityProfileId": quality_profile_id,
        "rootFolderPath": ROOT_FOLDER_PATH,
        "monitored": True,
        # User requested NO search after add
        "searchForMovie": False,
        "addOptions": {"searchForMovie": False},
    })
    if "minimumAvailability" not in payload:
        payload["minimumAvailability"] = "released"
    return payload
# ============================================================
# SAFETY TESTS
# ============================================================
def test_radarr() -> int:
    """Preflight check for Radarr: connectivity, root folder and quality profile.

    Returns:
        The ID of the quality profile named by QUALITY_PROFILE_NAME.

    Raises:
        ApiError: If Radarr is unreachable, ROOT_FOLDER_PATH is not one of
            Radarr's root folders, or the quality profile cannot be resolved.
    """
    with Spinner("Checking Radarr connection"):
        library = get_existing_radarr_movies()
    ok(f"Radarr OK - existing library count: {len(library)}")

    with Spinner("Checking Radarr root folders"):
        folders = get_radarr_rootfolders()
    known_paths = set()
    for folder in folders:
        path = folder.get('path')
        if path:
            known_paths.add(path)
    if ROOT_FOLDER_PATH not in known_paths:
        fail(
            f"ROOT_FOLDER_PATH '{ROOT_FOLDER_PATH}' does not exist in Radarr.\n"
            f"Available root folders: {sorted(known_paths)}"
        )
    ok(f"Root folder exists: {ROOT_FOLDER_PATH}")

    with Spinner(f"Resolving quality profile '{QUALITY_PROFILE_NAME}'"):
        profile_id = resolve_quality_profile_id(QUALITY_PROFILE_NAME)
    ok(f"Quality profile resolved: {QUALITY_PROFILE_NAME} -> ID {profile_id}")
    return profile_id
def test_tmdb() -> None:
    """Preflight check for TMDb: run one test search with the configured credential."""
    if TMDB_BEARER_TOKEN:
        auth_mode = "Bearer token"
    else:
        auth_mode = "v3 api_key"
    with Spinner(f"Testing TMDb using {auth_mode}"):
        hits = tmdb_search_movie("The Birdcage")
    ok(f"TMDb OK - search returned {len(hits)} result(s) for test query")
def test_omdb() -> None:
    """Preflight check for OMDb: look up one fixed IMDb ID, or skip without a key.

    Raises:
        ApiError: If a key is configured but the test lookup returns no data.
    """
    if OMDB_API_KEY:
        with Spinner("Testing OMDb"):
            sample = omdb_by_imdb("tt0133093")
        if not sample:
            fail("OMDb test failed. No movie data returned for test lookup.")
        ok(f"OMDb OK - test title: {sample.get('Title', '<unknown>')}")
    else:
        warn("OMDb skipped - no API key configured")
def run_preflight() -> int:
    """Run all configuration and connectivity checks in order.

    Returns:
        The Radarr quality profile ID resolved by test_radarr.
    """
    section("PRE-FLIGHT CHECKS")
    require_basic_config()
    profile_id = test_radarr()
    for check in (test_tmdb, test_omdb):
        check()
    ok("Preflight checks passed")
    return profile_id
# ============================================================
# MAIN
# ============================================================
def _score_candidate(
    item: Dict,
    tmdb_id: int,
    existing_imdb_ids: set,
    progress: str,
) -> Optional[Dict]:
    """Fetch, filter and score one pooled candidate; return its ranking row or None."""
    # A title of None would otherwise be shown literally as "None".
    label = item.get("title") or "candidate"
    with Spinner(f"Scoring {label} ({progress})"):
        details = tmdb_movie_details(tmdb_id)
        if should_hard_reject(details):
            return None
        imdb_id = details.get("imdb_id")
        if imdb_id and imdb_id in existing_imdb_ids:
            return None
        omdb = omdb_by_imdb(imdb_id) if imdb_id else None

    omdb_scores = extract_omdb_scores(omdb)
    all_scores: Dict[str, float] = {}
    all_scores.update(omdb_scores)
    all_scores.update(extract_tmdb_score(details))
    combined = weighted_combined_score(all_scores)

    genre_names = [g["name"] for g in details.get("genres", [])]
    overview = details.get("overview", "")
    bias = genre_bias_score(genre_names, overview)
    year = extract_year(details.get("release_date", ""))
    style_bonus = classic_bonus(year, item["seed_hits"], overview)
    total_style_score = bias + style_bonus

    if not should_keep_candidate(
        combined=combined,
        imdb_score_100=omdb_scores.get("IMDb"),
        num_sources=len(all_scores),
        total_style_score=total_style_score,
    ):
        return None

    return {
        "tmdb_id": tmdb_id,
        "imdb_id": imdb_id,
        "title": details.get("title"),
        "year": (details.get("release_date") or "????")[:4],
        "combined_score": combined,
        "genre_bias": bias,
        "style_bonus": style_bonus,
        "total_style_score": total_style_score,
        "seed_hits": item["seed_hits"],
        "seed_titles": sorted(item["seed_titles"]),
        "genres": genre_names,
        "scores": all_scores,
        "popularity": details.get("popularity", 0),
    }


def _print_suggestions(chosen: List[Dict]) -> None:
    """Print one numbered summary line per chosen movie."""
    for i, movie in enumerate(chosen, start=1):
        print(
            colour(f"{i:>2}. ", C.BOLD + C.GREEN) +
            colour(f"{movie['title']} ({movie['year']})", C.BOLD) +
            f" | combined={movie['combined_score']}" +
            f" | style={movie['total_style_score']}" +
            f" | seed_hits={movie['seed_hits']}" +
            f" | genres={', '.join(movie['genres'])}" +
            f" | similar_to={', '.join(movie['seed_titles'])}"
        )


def _add_chosen_movies(chosen: List[Dict], quality_profile_id: int) -> int:
    """Add each chosen movie to Radarr, reporting failures; return the number added."""
    added = 0
    for movie in chosen:
        try:
            with Spinner(f"Adding {movie['title']}"):
                lookup = radarr_lookup_tmdb(movie["tmdb_id"])
                payload = build_radarr_payload(lookup, quality_profile_id)
                result = add_movie_to_radarr(payload)
        except ApiError as e:
            err(f"Failed to add {movie['title']}:\n{e}\n")
        except Exception as e:
            err(f"Failed to add {movie['title']}: {e}\n")
        else:
            added += 1
            ok(f"Added: {result.get('title')} ({result.get('year')})")
    return added


def main() -> None:
    """Run the whole pipeline: preflight, gather, score, suggest and optionally add.

    Nothing is added to Radarr unless DRY_RUN is False and CONFIRM_ADD is True.

    Raises:
        ApiError: On preflight failure, when no seed can be resolved, or when
            gathering candidates fails. Errors for a single candidate while
            scoring or adding are reported and skipped instead.
    """
    quality_profile_id = run_preflight()

    section("LIBRARY SCAN")
    with Spinner("Reading existing Radarr library"):
        existing_movies = get_existing_radarr_movies()
    existing_tmdb_ids, existing_imdb_ids = get_existing_ids(existing_movies)
    ok(f"Existing movies in Radarr: {len(existing_movies)}")
    info(f"DRY_RUN: {DRY_RUN}")
    info(f"CONFIRM_ADD: {CONFIRM_ADD}")
    info(f"Quality Profile: {QUALITY_PROFILE_NAME}")
    info("Search after add: disabled")

    section("SEED RESOLUTION")
    seed_matches = find_seed_movie_ids(SEED_MOVIES)
    if not seed_matches:
        fail("No seed movies could be resolved from TMDb.")

    section("CANDIDATE GATHERING")
    # No outer Spinner here: gather_candidates_from_seeds runs its own spinner
    # per page and prints progress, so wrapping it made two spinner threads
    # (plus the prints) overwrite each other on the same terminal line.
    candidate_pool = gather_candidates_from_seeds(seed_matches)
    candidate_pool = {
        tmdb_id: entry
        for tmdb_id, entry in candidate_pool.items()
        if tmdb_id not in existing_tmdb_ids
    }
    ok(f"Candidate pool from seed similarities: {len(candidate_pool)}")

    preliminary = sorted(
        candidate_pool.values(),
        key=lambda x: (x["seed_hits"], x["popularity"]),
        reverse=True,
    )[:MAX_CANDIDATES_TO_SCORE]

    section("SCORING CANDIDATES")
    ranked: List[Dict] = []
    for index, item in enumerate(preliminary, start=1):
        tmdb_id = int(item["tmdb_id"])
        try:
            row = _score_candidate(
                item, tmdb_id, existing_imdb_ids, f"{index}/{len(preliminary)}"
            )
        except ApiError as e:
            err(f"Skipping TMDb ID {tmdb_id} due to API error:\n{e}\n")
        except Exception as e:
            # Deliberately broad: one malformed record must not end the run.
            err(f"Skipping TMDb ID {tmdb_id} due to unexpected error: {e}\n")
        else:
            if row is not None:
                ranked.append(row)

    ranked.sort(
        key=lambda x: (
            x["seed_hits"],
            x["total_style_score"],
            x["combined_score"],
            x["popularity"],
        ),
        reverse=True,
    )
    chosen = ranked[:MAX_TO_ADD]

    section("SUGGESTED ADDITIONS")
    if not chosen:
        warn("No candidates passed the filters.")
        return
    _print_suggestions(chosen)

    if DRY_RUN or not CONFIRM_ADD:
        section("SAFE MODE")
        warn("No movies will be added.")
        if DRY_RUN:
            warn("DRY_RUN is True")
        if not CONFIRM_ADD:
            warn("CONFIRM_ADD is False")
        info("To actually add movies, edit the script and set:")
        print(" DRY_RUN = False")
        print(" CONFIRM_ADD = True")
        return

    section("ADDING TO RADARR")
    added = _add_chosen_movies(chosen, quality_profile_id)

    section("DONE")
    ok(f"Added {added} movie(s)")
# Script entry point: run main() and exit with status 1 on any failure or Ctrl-C.
if __name__ == "__main__":
    exit_code = 0
    try:
        main()
    except ApiError as api_error:
        print()
        err("FATAL ERROR")
        print(api_error)
        exit_code = 1
    except KeyboardInterrupt:
        print()
        warn("Stopped by user.")
        exit_code = 1
    except Exception as unexpected:
        print()
        err("UNEXPECTED ERROR")
        print(repr(unexpected))
        exit_code = 1
    if exit_code:
        sys.exit(exit_code)