"""Suggest classic feel-good movies for a Radarr library.

Pipeline (see ``main``): run pre-flight checks against Radarr, TMDb and
(optionally) OMDb, resolve the curated seed titles on TMDb, collect TMDb
"similar" titles for every seed, score each candidate from its ratings plus
genre/keyword heuristics, print the best suggestions and - only when both
safety switches allow it - add them to Radarr without triggering a search.
"""

import itertools
import os
import sys
import threading
import time
from typing import Any, Dict, List, NoReturn, Optional, Tuple

import requests

# ============================================================
# CONFIG
# ============================================================

# NOTE(security): API keys are embedded in this file. Each key below can be
# overridden with an environment variable of the same name so the secrets do
# not have to live in source control; the literals remain as defaults so
# existing setups keep working.
RADARR_URL = "http://10.0.0.2:7878"
RADARR_API_KEY = os.environ.get("RADARR_API_KEY", "d393acb157a44dc2b0e2aede96278ad5")

# Use ONE of these only
TMDB_V3_API_KEY = os.environ.get("TMDB_V3_API_KEY", "0dd6070b3573f3f54ba4c5530e36662b")
TMDB_BEARER_TOKEN = os.environ.get("TMDB_BEARER_TOKEN", "")

# Optional
OMDB_API_KEY = os.environ.get("OMDB_API_KEY", "")

# Radarr settings
QUALITY_PROFILE_NAME = "720p ENG"
ROOT_FOLDER_PATH = r"D:\Movies"

# Safety
DRY_RUN = True
CONFIRM_ADD = False
MAX_TO_ADD = 10
MIN_COMBINED_SCORE = 64
MIN_IMDB_SCORE = 6.3
MIN_SOURCES_REQUIRED = 1
# Minimum genre-bias + classic-bonus total a candidate needs to be kept.
MIN_STYLE_SCORE = 16

# Classic / comfort tuning
RELEASE_YEAR_FROM = 1980
RELEASE_YEAR_TO = 2012
MIN_TMDB_VOTES = 250
MAX_SIMILAR_PAGES_PER_SEED = 2
MAX_CANDIDATES_TO_SCORE = 250
EXCLUDE_ANIMATION = True
EXCLUDE_TOO_MODERN = True

TMDB_BASE = "https://api.themoviedb.org/3"

# Curated seed list
SEED_MOVIES = [
    "The Birdcage",
    "When Harry Met Sally",
    "You've Got Mail",
    "Sleepless in Seattle",
    "Notting Hill",
    "Moonstruck",
    "Father of the Bride",
    "My Best Friend's Wedding",
    "The First Wives Club",
    "Under the Tuscan Sun",
    "Julie & Julia",
    "Something's Gotta Give",
    "Must Love Dogs",
    "While You Were Sleeping",
    "Practical Magic",
]

# One shared HTTP session so connections are reused across all API calls.
session = requests.Session()
session.headers.update({"User-Agent": "radarr-classic-feelgood-script/4.0"})


# ============================================================
# CLI STYLING
# ============================================================

class C:
    """ANSI escape sequences used to colour console output."""

    RESET = "\033[0m"
    BOLD = "\033[1m"
    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    MAGENTA = "\033[95m"
    CYAN = "\033[96m"
    GREY = "\033[90m"


def colour(text: str, code: str) -> str:
    """Wrap *text* in the ANSI *code* and reset styling afterwards."""
    return f"{code}{text}{C.RESET}"


def info(text: str) -> None:
    """Print an informational message."""
    print(colour(f"[INFO] {text}", C.CYAN))


def ok(text: str) -> None:
    """Print a success message."""
    print(colour(f"[OK] {text}", C.GREEN))


def warn(text: str) -> None:
    """Print a warning message."""
    print(colour(f"[WARN] {text}", C.YELLOW))


def err(text: str) -> None:
    """Print an error message."""
    print(colour(f"[ERR] {text}", C.RED))


def section(title: str) -> None:
    """Print a blank line followed by a ruled section heading."""
    print()
    print(colour("=" * 72, C.BLUE))
    print(colour(title, C.BOLD + C.BLUE))
    print(colour("=" * 72, C.BLUE))


class Spinner:
    """Context manager that animates a spinner next to *message*.

    The animation runs on a daemon thread while the ``with`` body executes.
    Nothing is drawn when stdout is not a terminal, and ASCII frames are used
    when the stdout encoding cannot represent the braille frames.
    """

    _UNICODE_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏")
    _ASCII_FRAMES = ("|", "/", "-", "\\")

    def __init__(self, message: str):
        self.message = message
        self.frames = itertools.cycle(self._pick_frames())
        self.running = False
        self.thread: Optional[threading.Thread] = None

    @classmethod
    def _pick_frames(cls) -> Tuple[str, ...]:
        """Return the frame set that the stdout encoding can actually encode."""
        encoding = getattr(sys.stdout, "encoding", None) or "ascii"
        try:
            "".join(cls._UNICODE_FRAMES).encode(encoding)
        except (UnicodeEncodeError, LookupError):
            return cls._ASCII_FRAMES
        return cls._UNICODE_FRAMES

    @staticmethod
    def _stdout_is_tty() -> bool:
        """Return True when stdout reports being an interactive terminal."""
        isatty = getattr(sys.stdout, "isatty", None)
        return bool(callable(isatty) and isatty())

    def _spin(self):
        while self.running:
            frame = next(self.frames)
            sys.stdout.write(f"\r{colour(frame, C.MAGENTA)} {self.message}")
            sys.stdout.flush()
            time.sleep(0.09)
        # Blank out the spinner line so the next print starts on a clean line.
        sys.stdout.write("\r" + " " * (len(self.message) + 6) + "\r")
        sys.stdout.flush()

    def __enter__(self):
        # Carriage-return animation only makes sense on a real terminal;
        # in a pipe or log file it would just emit noise.
        if not self._stdout_is_tty():
            return self
        self.running = True
        self.thread = threading.Thread(target=self._spin, daemon=True)
        self.thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.running = False
        if self.thread:
            self.thread.join()
            self.thread = None


# ============================================================
# HEURISTICS
# ============================================================

POSITIVE_GENRES = {
    "Romance": 18,
    "Comedy": 16,
    "Drama": 10,
    "Music": 4,
}

NEGATIVE_GENRES = {
    "Animation": -30,
    "Horror": -25,
    "Science Fiction": -20,
    "Action": -16,
    "Thriller": -14,
    "Crime": -10,
    "War": -18,
    "Western": -14,
    "Documentary": -18,
    "Fantasy": -8,
    "Family": -6,
}

POSITIVE_TERMS = [
    "wedding",
    "love",
    "romance",
    "relationship",
    "bride",
    "best friend",
    "friendship",
    "family",
    "holiday",
    "food",
    "small town",
    "bookstore",
    "restaurant",
    "writer",
    "divorce",
    "second chance",
    "starting over",
    "feel-good",
    "mother",
    "daughter",
    "sisters",
    "chosen family",
    "comedy of manners",
]

NEGATIVE_TERMS = [
    "war",
    "serial killer",
    "murder spree",
    "mercenary",
    "zombie",
    "post-apocalyptic",
    "gang",
    "assassin",
    "combat",
    "superhero",
    "multiverse",
    "alien invasion",
    "dystopian",
    "dragon",
    "animated adventure",
]

SEED_BONUS_TERMS = [
    "ensemble",
    "relationship",
    "family",
    "comedy",
    "romantic",
    "wedding",
    "friendship",
    "identity",
    "midlife",
    "second chance",
]

# Relative weight of each rating source in the combined score.
_SCORE_WEIGHTS = {
    "IMDb": 0.40,
    "RottenTomatoes": 0.28,
    "Metacritic": 0.20,
    "TMDb": 0.12,
}


# ============================================================
# ERRORS / UTILITIES
# ============================================================

class ApiError(Exception):
    """Raised for configuration problems and failed API interactions."""


def is_placeholder(value: str) -> bool:
    """Return True when *value* is empty or looks like an unfilled template.

    Matching is case-insensitive: the value is upper-cased, so every marker
    must be upper-case as well.
    """
    if not value:
        return True
    markers = (
        "PUT_YOUR",
        "YOUR_",
        "REPLACE_ME",
        "CHANGE_ME",
        "PASTE_YOUR",
        "ABC123",
    )
    upper = value.upper()
    return any(marker in upper for marker in markers)


def safe_response_text(response: requests.Response) -> str:
    """Return at most 700 characters describing the body of *response*.

    The parsed JSON is preferred; the raw text is used when JSON parsing
    fails. This runs while an error is being reported, so it never raises.
    """
    try:
        data = response.json()
        return str(data)[:700]
    except Exception:  # deliberate best-effort: must not mask the real error
        text = (response.text or "").strip()
        return text[:700] if text else ""


def fail(message: str) -> NoReturn:
    """Raise ``ApiError`` with *message*; this function never returns."""
    raise ApiError(message)


def request_json(
    method: str,
    url: str,
    *,
    service_name: str,
    headers: Optional[Dict] = None,
    params: Optional[Dict] = None,
    json_data: Optional[Dict] = None,
) -> Any:
    """Perform an HTTP request and return the decoded JSON body.

    Args:
        method: HTTP verb, e.g. ``"GET"``.
        url: Absolute URL to call.
        service_name: Label used in error messages; ``"TMDb"``, ``"Radarr"``
            and ``"OMDb"`` get service-specific hints on HTTP 401.
        headers: Extra request headers.
        params: Query-string parameters.
        json_data: JSON request body.

    Returns:
        The decoded JSON document (a dict or a list, depending on endpoint).

    Raises:
        ApiError: On transport failure, a non-2xx/3xx status, or a body that
            is not valid JSON.
    """
    try:
        response = session.request(
            method=method,
            url=url,
            headers=headers,
            params=params,
            json=json_data,
            timeout=30,
        )
    except requests.RequestException as e:
        raise ApiError(
            f"{service_name} request failed before a response was received.\n{e}"
        ) from e

    if not response.ok:
        details = safe_response_text(response)
        msg = [
            f"{service_name} request failed.",
            f"HTTP: {response.status_code}",
            f"Method: {method}",
            f"URL: {response.url}",
            f"Response: {details}",
        ]
        if response.status_code == 401:
            msg.append("Authentication failed.")
            if service_name == "TMDb":
                msg.append(
                    "TMDb expects either:\n"
                    "- TMDB_BEARER_TOKEN as Authorization: Bearer \n"
                    "- or TMDB_V3_API_KEY as api_key query parameter"
                )
            elif service_name == "Radarr":
                msg.append("Check RADARR_URL and RADARR_API_KEY.")
            elif service_name == "OMDb":
                msg.append("Check OMDB_API_KEY.")
        if response.status_code == 404:
            msg.append("Endpoint not found. Check the URL and API route.")
        if response.status_code == 429:
            msg.append("Rate limited. Slow the script down.")
        if 500 <= response.status_code <= 599:
            msg.append("Remote server-side error.")
        fail("\n".join(msg))

    try:
        return response.json()
    except ValueError as e:
        raise ApiError(
            f"{service_name} returned non-JSON content.\n"
            f"HTTP: {response.status_code}\n"
            f"URL: {response.url}\n"
            f"Response: {safe_response_text(response)}"
        ) from e


def require_basic_config() -> None:
    """Validate the module-level configuration.

    Raises:
        ApiError: Listing every blocking configuration problem found.
    """
    problems = []
    if is_placeholder(RADARR_API_KEY):
        problems.append("RADARR_API_KEY is missing or still a placeholder.")
    if not TMDB_V3_API_KEY and not TMDB_BEARER_TOKEN:
        problems.append("Set either TMDB_V3_API_KEY or TMDB_BEARER_TOKEN.")
    if TMDB_V3_API_KEY and TMDB_BEARER_TOKEN:
        warn("Both TMDb credentials are present. Bearer token will be preferred.")
    if not ROOT_FOLDER_PATH:
        problems.append("ROOT_FOLDER_PATH is blank.")
    if not QUALITY_PROFILE_NAME.strip():
        problems.append("QUALITY_PROFILE_NAME is blank.")
    if problems:
        fail("Configuration error(s):\n- " + "\n- ".join(problems))
    if not OMDB_API_KEY:
        warn("OMDb key not set. Script will run in TMDb-only mode.")


# ============================================================
# AUTH HELPERS
# ============================================================

def radarr_headers() -> Dict[str, str]:
    """Return the headers sent with every Radarr request."""
    return {
        "X-Api-Key": RADARR_API_KEY,
        "Content-Type": "application/json",
    }


def get_tmdb_auth() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return ``(headers, query_params)`` carrying the TMDb credentials.

    The bearer token wins when both credentials are configured.

    Raises:
        ApiError: When neither credential is configured.
    """
    if TMDB_BEARER_TOKEN:
        return {"Authorization": f"Bearer {TMDB_BEARER_TOKEN}"}, {}
    if TMDB_V3_API_KEY:
        return {}, {"api_key": TMDB_V3_API_KEY}
    fail("TMDb auth missing. Set TMDB_BEARER_TOKEN or TMDB_V3_API_KEY.")


# ============================================================
# RADARR
# ============================================================

def _radarr_endpoint(path: str) -> str:
    """Build a Radarr v3 API URL, tolerating a trailing slash on RADARR_URL."""
    return f"{RADARR_URL.rstrip('/')}/api/v3/{path}"


def get_existing_radarr_movies() -> List[Dict]:
    """Return the response of Radarr's ``/api/v3/movie`` endpoint."""
    return request_json(
        "GET", _radarr_endpoint("movie"), service_name="Radarr", headers=radarr_headers()
    )


def get_radarr_rootfolders() -> List[Dict]:
    """Return the response of Radarr's ``/api/v3/rootfolder`` endpoint."""
    return request_json(
        "GET", _radarr_endpoint("rootfolder"), service_name="Radarr", headers=radarr_headers()
    )


def get_radarr_quality_profiles() -> List[Dict]:
    """Return the response of Radarr's ``/api/v3/qualityprofile`` endpoint."""
    return request_json(
        "GET", _radarr_endpoint("qualityprofile"), service_name="Radarr", headers=radarr_headers()
    )


def resolve_quality_profile_id(profile_name: str) -> int:
    """Return the id of the Radarr quality profile called *profile_name*.

    A case-insensitive exact name match is preferred. Otherwise a substring
    match is accepted, but only when it is unambiguous (exactly one profile).

    Raises:
        ApiError: When no profile, or more than one partial match, is found.
    """
    profiles = get_radarr_quality_profiles()
    wanted = profile_name.strip().lower()
    partial_matches = []

    for profile in profiles:
        name = str(profile.get("name", "")).strip().lower()
        if name == wanted:
            return int(profile["id"])
        if wanted in name:
            partial_matches.append(profile)

    if len(partial_matches) == 1:
        warn(
            "Quality profile exact match not found. "
            f"Using partial match: {partial_matches[0].get('name')}"
        )
        return int(partial_matches[0]["id"])

    available = ", ".join(sorted(str(p.get("name")) for p in profiles))
    fail(
        f"Could not find Radarr quality profile '{profile_name}'.\n"
        f"Available profiles: {available}"
    )


def get_existing_ids(existing_movies: List[Dict]) -> Tuple[set, set]:
    """Return ``(tmdb_ids, imdb_ids)`` collected from Radarr movie records.

    TMDb ids are stored as ints, IMDb ids as stripped strings; missing or
    falsy ids are skipped.
    """
    tmdb_ids = set()
    imdb_ids = set()
    for movie in existing_movies:
        tmdb_id = movie.get("tmdbId")
        imdb_id = movie.get("imdbId")
        if tmdb_id:
            tmdb_ids.add(int(tmdb_id))
        if imdb_id:
            imdb_ids.add(str(imdb_id).strip())
    return tmdb_ids, imdb_ids


def radarr_lookup_tmdb(tmdb_id: int) -> Dict:
    """Look a movie up in Radarr by its TMDb id."""
    return request_json(
        "GET",
        _radarr_endpoint("movie/lookup/tmdb"),
        service_name="Radarr",
        headers=radarr_headers(),
        params={"tmdbId": tmdb_id},
    )


def add_movie_to_radarr(movie_payload: Dict) -> Dict:
    """POST *movie_payload* to Radarr's movie endpoint and return the reply."""
    return request_json(
        "POST",
        _radarr_endpoint("movie"),
        service_name="Radarr",
        headers=radarr_headers(),
        json_data=movie_payload,
    )


# ============================================================
# TMDB
# ============================================================

def _tmdb_get(path: str, params: Dict[str, Any]) -> Any:
    """GET ``TMDB_BASE + path`` with the configured credentials merged in."""
    auth_headers, auth_params = get_tmdb_auth()
    return request_json(
        "GET",
        f"{TMDB_BASE}{path}",
        service_name="TMDb",
        headers=auth_headers,
        params={**auth_params, **params},
    )


def tmdb_search_movie(title: str, year: Optional[int] = None) -> List[Dict]:
    """Return the first page of TMDb search results for *title*.

    Args:
        title: Movie title to search for.
        year: Optional year passed to TMDb as the ``year`` parameter.
    """
    params: Dict[str, Any] = {
        "query": title,
        "language": "en-US",
        "include_adult": "false",
        "page": 1,
    }
    if year:
        params["year"] = year
    data = _tmdb_get("/search/movie", params)
    return data.get("results", [])


def tmdb_movie_details(tmdb_id: int) -> Dict:
    """Return the TMDb detail document for *tmdb_id*."""
    return _tmdb_get(f"/movie/{tmdb_id}", {"language": "en-US"})


def tmdb_similar_movies(tmdb_id: int, page: int = 1) -> List[Dict]:
    """Return one page of TMDb's "similar" results for *tmdb_id*."""
    data = _tmdb_get(
        f"/movie/{tmdb_id}/similar",
        {"language": "en-US", "page": page},
    )
    return data.get("results", [])


def find_seed_movie_ids(seed_titles: List[str]) -> List[Dict]:
    """Resolve each seed title to its top TMDb search hit.

    Titles without any search result are reported and skipped.

    Returns:
        One dict per resolved seed with the keys ``seed_title``, ``tmdb_id``,
        ``matched_title`` and ``release_date``.
    """
    seeds: List[Dict] = []
    for title in seed_titles:
        with Spinner(f"Finding seed: {title}"):
            results = tmdb_search_movie(title)
        if not results:
            warn(f"Seed not found: {title}")
            continue
        best = results[0]
        seeds.append({
            "seed_title": title,
            "tmdb_id": best["id"],
            "matched_title": best.get("title", title),
            "release_date": best.get("release_date", ""),
        })
        ok(f"Seed matched: {title} -> {best.get('title')} ({(best.get('release_date') or '')[:4]})")
        time.sleep(0.05)  # small pause between consecutive TMDb calls
    return seeds


def gather_candidates_from_seeds(seed_matches: List[Dict]) -> Dict[int, Dict]:
    """Collect TMDb "similar" titles for every seed into one pool.

    Returns:
        A mapping of TMDb id to a candidate record. ``seed_hits`` counts how
        many result entries referenced the candidate and ``seed_titles`` is
        the set of seed titles that produced it.
    """
    pool: Dict[int, Dict] = {}
    for seed in seed_matches:
        seed_id = seed["tmdb_id"]
        seed_title = seed["matched_title"]
        for page in range(1, MAX_SIMILAR_PAGES_PER_SEED + 1):
            with Spinner(f"Pulling similar titles for {seed_title} (page {page})"):
                results = tmdb_similar_movies(seed_id, page=page)
            for item in results:
                tmdb_id = item.get("id")
                if not tmdb_id:
                    continue
                if tmdb_id not in pool:
                    pool[tmdb_id] = {
                        "tmdb_id": tmdb_id,
                        "title": item.get("title"),
                        "release_date": item.get("release_date", ""),
                        "seed_hits": 0,
                        "seed_titles": set(),
                        "popularity": item.get("popularity", 0),
                    }
                pool[tmdb_id]["seed_hits"] += 1
                pool[tmdb_id]["seed_titles"].add(seed_title)
            info(f"Collected {len(results)} similar titles from {seed_title} page {page}")
            time.sleep(0.05)  # small pause between consecutive TMDb calls
    return pool


# ============================================================
# OMDB
# ============================================================

def omdb_by_imdb(imdb_id: str) -> Optional[Dict]:
    """Fetch the OMDb record for *imdb_id*.

    Returns:
        The OMDb document, or None when no id was given, no OMDb key is
        configured, or OMDb answered with ``Response == "False"``.
    """
    if not imdb_id:
        return None
    if not OMDB_API_KEY:
        return None
    url = "https://www.omdbapi.com/"
    params = {
        "apikey": OMDB_API_KEY,
        "i": imdb_id,
        "plot": "short",
        "r": "json",
    }
    data = request_json(
        "GET",
        url,
        service_name="OMDb",
        params=params,
    )
    if data.get("Response") == "False":
        return None
    return data


# ============================================================
# SCORING
# ============================================================

def parse_percent(value: str) -> Optional[float]:
    """Parse a value such as ``"87%"`` into ``87.0``; None if unparseable."""
    try:
        return float(value.replace("%", "").strip())
    except (AttributeError, TypeError, ValueError):
        return None


def parse_imdb_to_100(value: str) -> Optional[float]:
    """Scale an IMDb rating such as ``"7.4"`` to 0-100; None if unparseable."""
    try:
        return float(value.strip()) * 10.0
    except (AttributeError, TypeError, ValueError):
        return None


def parse_metacritic(value: str) -> Optional[float]:
    """Parse a value such as ``"68/100"`` into ``68.0``; None if unparseable."""
    try:
        raw = value.split("/")[0].strip()
        return float(raw)
    except (AttributeError, TypeError, ValueError):
        return None


def extract_omdb_scores(omdb: Optional[Dict]) -> Dict[str, float]:
    """Extract 0-100 scores from an OMDb record.

    Returns:
        A dict that may contain the keys ``IMDb``, ``RottenTomatoes`` and
        ``Metacritic``; sources that are absent or unparseable are omitted.
    """
    scores: Dict[str, float] = {}
    if not omdb:
        return scores

    imdb_rating = omdb.get("imdbRating")
    if imdb_rating and imdb_rating != "N/A":
        parsed = parse_imdb_to_100(imdb_rating)
        if parsed is not None:
            scores["IMDb"] = parsed

    # "or []" guards against an explicit null "Ratings" field.
    for item in omdb.get("Ratings") or []:
        source = item.get("Source")
        value = item.get("Value", "")
        if source == "Rotten Tomatoes":
            parsed = parse_percent(value)
            if parsed is not None:
                scores["RottenTomatoes"] = parsed
        elif source == "Metacritic":
            parsed = parse_metacritic(value)
            if parsed is not None:
                scores["Metacritic"] = parsed
    return scores


def extract_tmdb_score(tmdb_details: Dict) -> Dict[str, float]:
    """Return ``{"TMDb": vote_average * 10}`` or an empty dict if absent."""
    scores: Dict[str, float] = {}
    vote_average = tmdb_details.get("vote_average")
    if vote_average is not None:
        scores["TMDb"] = float(vote_average) * 10.0
    return scores


def weighted_combined_score(all_scores: Dict[str, float]) -> Optional[float]:
    """Return the weighted mean of the known sources, rounded to 1 decimal.

    Weights are renormalised over the sources actually present, so a missing
    source does not drag the result down. Returns None when no known source
    is present.
    """
    if not all_scores:
        return None
    total = 0.0
    total_weight = 0.0
    for source, value in all_scores.items():
        weight = _SCORE_WEIGHTS.get(source, 0.0)
        if weight > 0:
            total += value * weight
            total_weight += weight
    if total_weight == 0:
        return None
    return round(total / total_weight, 1)


def genre_bias_score(genre_names: List[str], overview: str) -> int:
    """Score how well genres and overview text fit the feel-good profile.

    Each genre contributes its POSITIVE_GENRES / NEGATIVE_GENRES value; each
    positive or negative term found in the lower-cased overview adds or
    subtracts 2.
    """
    score = 0
    for genre in genre_names:
        score += POSITIVE_GENRES.get(genre, 0)
        score += NEGATIVE_GENRES.get(genre, 0)
    text = (overview or "").lower()
    score += 2 * sum(1 for term in POSITIVE_TERMS if term in text)
    score -= 2 * sum(1 for term in NEGATIVE_TERMS if term in text)
    return score


def classic_bonus(release_year: Optional[int], seed_hits: int, overview: str) -> int:
    """Return the bonus for era, seed overlap and seed-like overview terms.

    Era: +10 for 1985-2008, +4 for 2009-2012, +2 before 1985, -8 after 2012.
    Seed overlap: 5 per hit, capped at 20. Overview: +1 per SEED_BONUS_TERMS
    entry found.
    """
    bonus = 0
    if release_year is not None:
        if 1985 <= release_year <= 2008:
            bonus += 10
        elif 2009 <= release_year <= 2012:
            bonus += 4
        elif release_year < 1985:
            bonus += 2
        else:
            bonus -= 8
    bonus += min(seed_hits * 5, 20)
    text = (overview or "").lower()
    bonus += sum(1 for term in SEED_BONUS_TERMS if term in text)
    return bonus


def extract_year(date_str: str) -> Optional[int]:
    """Return the leading four-digit year of *date_str*, or None."""
    if not date_str or len(date_str) < 4:
        return None
    try:
        return int(date_str[:4])
    except (TypeError, ValueError):
        return None


def should_hard_reject(details: Dict) -> bool:
    """Return True when a TMDb detail record fails a non-negotiable filter.

    Rejects animation (when EXCLUDE_ANIMATION), films released before
    RELEASE_YEAR_FROM, films released after RELEASE_YEAR_TO (only when
    EXCLUDE_TOO_MODERN, so the switch actually controls the upper bound),
    and films with fewer than MIN_TMDB_VOTES votes. A missing or null
    ``vote_count`` counts as zero votes.
    """
    genre_names = [g.get("name") for g in details.get("genres") or []]
    if EXCLUDE_ANIMATION and "Animation" in genre_names:
        return True

    year = extract_year(details.get("release_date") or "")
    if year is not None:
        if year < RELEASE_YEAR_FROM:
            return True
        if EXCLUDE_TOO_MODERN and year > RELEASE_YEAR_TO:
            return True

    return (details.get("vote_count") or 0) < MIN_TMDB_VOTES


def should_keep_candidate(
    combined: Optional[float],
    imdb_score_100: Optional[float],
    num_sources: int,
    total_style_score: int,
) -> bool:
    """Return True when a scored candidate clears every quality threshold.

    Args:
        combined: Weighted combined score (0-100) or None when unavailable.
        imdb_score_100: IMDb score on a 0-100 scale, or None when unknown;
            the IMDb floor is only enforced when a score is present.
        num_sources: Number of rating sources that contributed.
        total_style_score: Genre bias plus classic bonus.
    """
    if combined is None:
        return False
    if num_sources < MIN_SOURCES_REQUIRED:
        return False
    if combined < MIN_COMBINED_SCORE:
        return False
    if imdb_score_100 is not None and imdb_score_100 < (MIN_IMDB_SCORE * 10):
        return False
    return total_style_score >= MIN_STYLE_SCORE


# ============================================================
# PAYLOAD BUILDING
# ============================================================

def build_radarr_payload(lookup_obj: Dict, quality_profile_id: int) -> Dict:
    """Turn a Radarr lookup result into the payload used to add the movie.

    The lookup dict is shallow-copied, then the quality profile, root folder
    and monitoring flag are set and searching after the add is disabled.
    """
    payload = dict(lookup_obj)
    payload["qualityProfileId"] = quality_profile_id
    payload["rootFolderPath"] = ROOT_FOLDER_PATH
    payload["monitored"] = True
    # User requested NO search after add
    payload["searchForMovie"] = False
    payload["addOptions"] = {"searchForMovie": False}
    payload.setdefault("minimumAvailability", "released")
    return payload


# ============================================================
# SAFETY TESTS
# ============================================================

def test_radarr() -> int:
    """Verify Radarr connectivity, root folder and quality profile.

    Returns:
        The id of the resolved quality profile.

    Raises:
        ApiError: When Radarr is unreachable, ROOT_FOLDER_PATH is not among
            Radarr's root folders, or the quality profile cannot be resolved.
    """
    with Spinner("Checking Radarr connection"):
        movies = get_existing_radarr_movies()
    ok(f"Radarr OK - existing library count: {len(movies)}")

    with Spinner("Checking Radarr root folders"):
        rootfolders = get_radarr_rootfolders()
    root_paths = {rf.get('path') for rf in rootfolders if rf.get('path')}
    if ROOT_FOLDER_PATH not in root_paths:
        fail(
            f"ROOT_FOLDER_PATH '{ROOT_FOLDER_PATH}' does not exist in Radarr.\n"
            f"Available root folders: {sorted(root_paths)}"
        )
    ok(f"Root folder exists: {ROOT_FOLDER_PATH}")

    with Spinner(f"Resolving quality profile '{QUALITY_PROFILE_NAME}'"):
        quality_profile_id = resolve_quality_profile_id(QUALITY_PROFILE_NAME)
    ok(f"Quality profile resolved: {QUALITY_PROFILE_NAME} -> ID {quality_profile_id}")
    return quality_profile_id


def test_tmdb() -> None:
    """Run a sample TMDb search to verify the configured credentials."""
    auth_mode = "Bearer token" if TMDB_BEARER_TOKEN else "v3 api_key"
    with Spinner(f"Testing TMDb using {auth_mode}"):
        results = tmdb_search_movie("The Birdcage")
    ok(f"TMDb OK - search returned {len(results)} result(s) for test query")


def test_omdb() -> None:
    """Run a sample OMDb lookup; skipped with a warning when no key is set.

    Raises:
        ApiError: When a key is configured but the lookup returns no data.
    """
    if not OMDB_API_KEY:
        warn("OMDb skipped - no API key configured")
        return
    with Spinner("Testing OMDb"):
        sample = omdb_by_imdb("tt0133093")
    if not sample:
        fail("OMDb test failed. No movie data returned for test lookup.")
    ok(f"OMDb OK - test title: {sample.get('Title', '')}")


def run_preflight() -> int:
    """Run all configuration and connectivity checks.

    Returns:
        The Radarr quality profile id resolved during the checks.
    """
    section("PRE-FLIGHT CHECKS")
    require_basic_config()
    quality_profile_id = test_radarr()
    test_tmdb()
    test_omdb()
    ok("Preflight checks passed")
    return quality_profile_id


# ============================================================
# MAIN
# ============================================================

def _score_candidate(item: Dict, existing_imdb_ids: set) -> Optional[Dict]:
    """Fetch details for one pooled candidate and score it.

    Returns:
        The ranking record for the candidate, or None when it is hard
        rejected, already in the library by IMDb id, or below the thresholds.
    """
    tmdb_id = int(item["tmdb_id"])
    details = tmdb_movie_details(tmdb_id)
    if should_hard_reject(details):
        return None

    imdb_id = details.get("imdb_id")
    if imdb_id and imdb_id in existing_imdb_ids:
        return None

    omdb = omdb_by_imdb(imdb_id) if imdb_id else None
    omdb_scores = extract_omdb_scores(omdb)
    tmdb_scores = extract_tmdb_score(details)
    all_scores: Dict[str, float] = {}
    all_scores.update(omdb_scores)
    all_scores.update(tmdb_scores)
    combined = weighted_combined_score(all_scores)

    genre_names = [g["name"] for g in details.get("genres", [])]
    overview = details.get("overview", "")
    bias = genre_bias_score(genre_names, overview)
    year = extract_year(details.get("release_date", ""))
    style_bonus = classic_bonus(year, item["seed_hits"], overview)
    total_style_score = bias + style_bonus

    if not should_keep_candidate(
        combined=combined,
        imdb_score_100=omdb_scores.get("IMDb"),
        num_sources=len(all_scores),
        total_style_score=total_style_score,
    ):
        return None

    return {
        "tmdb_id": tmdb_id,
        "imdb_id": imdb_id,
        "title": details.get("title"),
        "year": (details.get("release_date") or "????")[:4],
        "combined_score": combined,
        "genre_bias": bias,
        "style_bonus": style_bonus,
        "total_style_score": total_style_score,
        "seed_hits": item["seed_hits"],
        "seed_titles": sorted(item["seed_titles"]),
        "genres": genre_names,
        "scores": all_scores,
        "popularity": details.get("popularity", 0),
    }


def main() -> None:
    """Run the full suggest-and-optionally-add workflow.

    Raises:
        ApiError: On fatal configuration or API problems. Failures for a
            single candidate or a single add are reported and skipped.
    """
    quality_profile_id = run_preflight()

    section("LIBRARY SCAN")
    with Spinner("Reading existing Radarr library"):
        existing_movies = get_existing_radarr_movies()
    existing_tmdb_ids, existing_imdb_ids = get_existing_ids(existing_movies)
    ok(f"Existing movies in Radarr: {len(existing_movies)}")
    info(f"DRY_RUN: {DRY_RUN}")
    info(f"CONFIRM_ADD: {CONFIRM_ADD}")
    info(f"Quality Profile: {QUALITY_PROFILE_NAME}")
    info("Search after add: disabled")

    section("SEED RESOLUTION")
    seed_matches = find_seed_movie_ids(SEED_MOVIES)
    if not seed_matches:
        fail("No seed movies could be resolved from TMDb.")

    section("CANDIDATE GATHERING")
    # No outer Spinner here: gather_candidates_from_seeds drives its own
    # spinners and progress lines, and two spinner threads writing to the
    # same terminal line would garble the output.
    candidate_pool = gather_candidates_from_seeds(seed_matches)
    candidate_pool = {
        tmdb_id: candidate
        for tmdb_id, candidate in candidate_pool.items()
        if tmdb_id not in existing_tmdb_ids
    }
    ok(f"Candidate pool from seed similarities: {len(candidate_pool)}")

    # Only the most promising candidates (by seed overlap, then popularity)
    # get the per-movie detail and rating lookups.
    preliminary = sorted(
        candidate_pool.values(),
        key=lambda x: (x["seed_hits"], x["popularity"]),
        reverse=True,
    )[:MAX_CANDIDATES_TO_SCORE]

    section("SCORING CANDIDATES")
    ranked: List[Dict] = []
    for index, item in enumerate(preliminary, start=1):
        tmdb_id = int(item["tmdb_id"])
        try:
            with Spinner(f"Scoring {item.get('title', 'candidate')} ({index}/{len(preliminary)})"):
                entry = _score_candidate(item, existing_imdb_ids)
            if entry is not None:
                ranked.append(entry)
        except ApiError as e:
            err(f"Skipping TMDb ID {tmdb_id} due to API error:\n{e}\n")
        except Exception as e:  # one bad record must not abort the whole run
            err(f"Skipping TMDb ID {tmdb_id} due to unexpected error: {e}\n")

    ranked.sort(
        key=lambda x: (
            x["seed_hits"],
            x["total_style_score"],
            x["combined_score"],
            x["popularity"],
        ),
        reverse=True,
    )
    chosen = ranked[:MAX_TO_ADD]

    section("SUGGESTED ADDITIONS")
    if not chosen:
        warn("No candidates passed the filters.")
        return

    for i, movie in enumerate(chosen, start=1):
        print(
            colour(f"{i:>2}. ", C.BOLD + C.GREEN)
            + colour(f"{movie['title']} ({movie['year']})", C.BOLD)
            + f" | combined={movie['combined_score']}"
            + f" | style={movie['total_style_score']}"
            + f" | seed_hits={movie['seed_hits']}"
            + f" | genres={', '.join(movie['genres'])}"
            + f" | similar_to={', '.join(movie['seed_titles'])}"
        )

    # Both switches must be flipped before anything is written to Radarr.
    if DRY_RUN or not CONFIRM_ADD:
        section("SAFE MODE")
        warn("No movies will be added.")
        if DRY_RUN:
            warn("DRY_RUN is True")
        if not CONFIRM_ADD:
            warn("CONFIRM_ADD is False")
        info("To actually add movies, edit the script and set:")
        print(" DRY_RUN = False")
        print(" CONFIRM_ADD = True")
        return

    section("ADDING TO RADARR")
    added = 0
    for movie in chosen:
        try:
            with Spinner(f"Adding {movie['title']}"):
                lookup = radarr_lookup_tmdb(movie["tmdb_id"])
                payload = build_radarr_payload(lookup, quality_profile_id)
                result = add_movie_to_radarr(payload)
            added += 1
            ok(f"Added: {result.get('title')} ({result.get('year')})")
        except ApiError as e:
            err(f"Failed to add {movie['title']}:\n{e}\n")
        except Exception as e:  # keep going with the remaining movies
            err(f"Failed to add {movie['title']}: {e}\n")

    section("DONE")
    ok(f"Added {added} movie(s)")


if __name__ == "__main__":
    try:
        main()
    except ApiError as e:
        print()
        err("FATAL ERROR")
        print(e)
        sys.exit(1)
    except KeyboardInterrupt:
        print()
        warn("Stopped by user.")
        sys.exit(1)
    except Exception as e:
        print()
        err("UNEXPECTED ERROR")
        print(repr(e))
        sys.exit(1)