# sublogue/server/core/subtitle_processor.py
from __future__ import annotations
import re
import logging
from logging_utils import get_logger
import textwrap
import time
import os
from pathlib import Path
from dataclasses import dataclass
from typing import List, Tuple, Optional
from contextlib import contextmanager
# Import keyword stripper for filename cleaning
from core.keyword_stripper import get_stripper
# Cross-platform file locking: probe for the platform-specific modules and
# record availability flags (fcntl on POSIX, msvcrt on Windows).
try:
    import fcntl
    _HAS_FCNTL = True
except ImportError:
    _HAS_FCNTL = False
try:
    import msvcrt
    _HAS_MSVCRT = True
except ImportError:
    _HAS_MSVCRT = False
# Module-wide logger for all subtitle-processing operations.
logger = get_logger("SubtitleProcessor")
# ============================================================
# Sentinel tag for deterministic detection
# ============================================================
# This tag is used to mark Sublogue-generated blocks.
# It's deliberately unusual to never appear in real dialogue.
SUBLOGUE_SENTINEL = "{SUBLOGUE}"
# Regex pattern to match ALL Sublogue-style tokens for sanitization
# Matches: {SUBLOGUE}, {SUBLOGUE:anything}, {SUBLOGUE:123}, etc.
SUBLOGUE_TOKEN_PATTERN = re.compile(r"\{SUBLOGUE(?::[^}]*)?\}", re.IGNORECASE)
# ============================================================
# Reading time configuration (WPM-based timing model)
# ============================================================
# These constants define the reading speed model for subtitle timing.
# Based on research showing average reading speeds of 150-250 WPM,
# with 160 WPM being comfortable for on-screen subtitle consumption.
READING_WPM = 160 # Words per minute - comfortable reading pace
MIN_DURATION_SECONDS = 1.2 # Minimum time any subtitle should display
MAX_DURATION_SECONDS = 6.0 # Maximum time before text becomes stale
# ============================================================
# File locking for concurrency protection
# ============================================================
# Prevents race conditions where two tasks both pass _has_plot_fast()
# before either writes, causing duplicate plot insertions.
class FileLockError(Exception):
    """Raised when a file lock cannot be acquired."""
    pass
@contextmanager
def file_lock(path: Path, timeout: float = 10.0):
    """
    Cross-platform file lock context manager.

    Uses a separate ``.lock`` file created with ``O_CREAT | O_EXCL``, which
    is atomic on every platform, so neither fcntl nor msvcrt is needed here.
    This ensures atomic operations on subtitle files.

    Stale locks (mtime older than 60 seconds) are removed and the acquisition
    is retried; otherwise we poll every 100ms until ``timeout`` elapses.

    Args:
        path: Path to the file to lock.
        timeout: Maximum time to wait for the lock (seconds).
    Raises:
        FileLockError: If the lock cannot be acquired within ``timeout``.
    """
    lock_path = path.with_suffix(path.suffix + ".lock")
    lock_file = None
    start_time = time.monotonic()
    try:
        # Acquisition loop: create the lock file exclusively, or wait.
        while True:
            try:
                # O_CREAT | O_EXCL fails atomically if the file already exists.
                fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                lock_file = os.fdopen(fd, 'w')
                # Record owner PID and acquisition time for debugging/staleness.
                lock_file.write(f"{os.getpid()}\n{time.time()}")
                lock_file.flush()
                break
            except FileExistsError:
                # Lock file exists - check if stale (older than 60 seconds)
                try:
                    lock_stat = lock_path.stat()
                    if time.time() - lock_stat.st_mtime > 60:
                        # Stale lock - remove it and retry immediately.
                        logger.warning(f"Removing stale lock file: {lock_path}")
                        lock_path.unlink(missing_ok=True)
                        continue
                except OSError:
                    # Lock may have vanished between open() and stat(); fall
                    # through to the timeout check and retry.
                    pass
                if time.monotonic() - start_time > timeout:
                    raise FileLockError(
                        f"Could not acquire lock on {path} within {timeout}s"
                    )
                # Wait and retry
                time.sleep(0.1)
        logger.debug(f"Acquired lock: {lock_path}")
        yield
    finally:
        # FIX: only remove the lock file when WE created it. Previously a
        # timeout path (lock_file is None) could unlink a lock that belongs
        # to another process, defeating the mutual exclusion.
        if lock_file:
            lock_file.close()
            try:
                lock_path.unlink(missing_ok=True)
                logger.debug(f"Released lock: {lock_path}")
            except OSError as e:
                logger.warning(f"Failed to remove lock file {lock_path}: {e}")
# ============================================================
# Data structures
# ============================================================
@dataclass(slots=True)
class SubtitleBlock:
    """One parsed SRT cue: sequence index, start/end in ms, and cue text."""
    index: int       # Sequential SRT cue number (0 when absent in the source)
    start_time: int  # Start timestamp in milliseconds
    end_time: int    # End timestamp in milliseconds
    text: str        # Cue text; may span multiple lines
# ============================================================
# Timecode helpers
# ============================================================
# Matches an SRT timing line: "HH:MM:SS,mmm --> HH:MM:SS,mmm".
_TIMECODE_RE = re.compile(
    r"(?P<start>\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(?P<end>\d{2}:\d{2}:\d{2},\d{3})"
)
def _timecode_to_ms(tc: str) -> int:
h, m, rest = tc.split(":")
s, ms = rest.split(",")
return ((int(h) * 3600 + int(m) * 60 + int(s)) * 1000) + int(ms)
def _ms_to_timecode(ms: int) -> str:
ms = max(ms, 0)
s, ms = divmod(ms, 1000)
h, s = divmod(s, 3600)
m, s = divmod(s, 60)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
# ============================================================
# Reading time calculation (WPM-based timing)
# ============================================================
def count_words(text: str) -> int:
    """
    Count words in *text* for reading-time calculation.

    Sublogue markers are stripped first so they never inflate the count.

    Args:
        text: Text to count words in.
    Returns:
        Number of whitespace-separated words.
    """
    # str.split() with no argument already discards empty tokens.
    stripped = SUBLOGUE_TOKEN_PATTERN.sub("", text)
    return len(stripped.split())
def calculate_reading_duration_ms(
    text: str,
    wpm: float = READING_WPM,
    min_seconds: float = MIN_DURATION_SECONDS,
    max_seconds: float = MAX_DURATION_SECONDS
) -> int:
    """
    Compute how long a subtitle should stay on screen from its word count.

    Reading time = words / (wpm / 60), then clamped to
    [min_seconds, max_seconds] so blocks neither flash by nor go stale.

    Args:
        text: The subtitle text to time.
        wpm: Reading speed in words per minute (default: 160).
        min_seconds: Lower bound on display time (default: 1.2s).
        max_seconds: Upper bound on display time (default: 6.0s).
    Returns:
        Display duration in milliseconds.
    """
    word_count = count_words(text)
    words_per_second = wpm / 60.0
    # Non-positive WPM would divide by zero; fall back to the minimum.
    if words_per_second > 0:
        raw_duration_seconds = word_count / words_per_second
    else:
        raw_duration_seconds = min_seconds
    # Clamp: max(min(...)) so min_seconds wins if the bounds ever cross.
    clamped_duration_seconds = max(min(raw_duration_seconds, max_seconds), min_seconds)
    duration_ms = int(clamped_duration_seconds * 1000)
    logger.debug(
        f"Reading time: {word_count} words @ {wpm} WPM = "
        f"{raw_duration_seconds:.2f}s → clamped to {clamped_duration_seconds:.2f}s ({duration_ms}ms)"
    )
    return duration_ms
def split_text_into_readable_chunks(
    text: str,
    max_duration_ms: int,
    wpm: float = READING_WPM,
    min_chunk_duration_ms: int = int(MIN_DURATION_SECONDS * 1000),
    max_chunk_duration_ms: int = int(MAX_DURATION_SECONDS * 1000),
) -> List[str]:
    """
    Split long text into multiple chunks, each sized for comfortable reading.

    Strategy:
        1. Compute how many words fit in ``max_chunk_duration_ms`` at ``wpm``.
        2. Split at sentence boundaries when possible.
        3. When the chunk budget (``max_duration_ms // min_chunk_duration_ms``)
           is nearly exhausted, dump all remaining sentences into a final chunk.

    Args:
        text: Full text to split.
        max_duration_ms: Total time available for all chunks.
        wpm: Words per minute reading speed.
        min_chunk_duration_ms: Minimum display time per chunk.
        max_chunk_duration_ms: Maximum display time per chunk.
    Returns:
        List of text chunks, each sized for comfortable reading.
    """
    if not text.strip():
        return []
    # Max words per chunk = reading rate (words/s) * max seconds per chunk.
    words_per_second = wpm / 60.0
    max_words_per_chunk = int(words_per_second * (max_chunk_duration_ms / 1000.0))
    # Chunk budget: how many minimum-length chunks fit in the window.
    max_chunks = max(1, int(max_duration_ms / min_chunk_duration_ms))
    # Short text needs no splitting at all.
    if count_words(text) <= max_words_per_chunk:
        return [text.strip()]
    # Split into sentences for natural breaks.
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    chunks: List[str] = []
    current_chunk_words: List[str] = []
    current_word_count = 0
    for sent_idx, sentence in enumerate(sentences):
        sentence_words = sentence.split()
        sentence_word_count = len(sentence_words)
        # Flush the current chunk if this sentence would overflow it.
        if current_word_count + sentence_word_count > max_words_per_chunk and current_chunk_words:
            chunks.append(" ".join(current_chunk_words))
            current_chunk_words = []
            current_word_count = 0
            # Budget nearly exhausted: put everything left into one final chunk.
            if len(chunks) >= max_chunks - 1:
                # FIX: slice by the loop position instead of sentences.index(),
                # which returns the FIRST occurrence and mis-slices (dropping or
                # duplicating text) when the same sentence appears twice.
                chunks.append(" ".join(sentences[sent_idx:]))
                break
        current_chunk_words.extend(sentence_words)
        current_word_count += sentence_word_count
    # Flush the trailing chunk if there is room in the budget.
    if current_chunk_words and len(chunks) < max_chunks:
        chunks.append(" ".join(current_chunk_words))
    # Hard cap at the budget.
    if len(chunks) > max_chunks:
        chunks = chunks[:max_chunks]
    return chunks if chunks else [text.strip()]
# ============================================================
# Sanitization - remove internal markers from output
# ============================================================
def sanitize_subtitle_text(text: str) -> str:
    """
    Strip every internal Sublogue marker from subtitle text.

    Defensive final pass so no internal placeholders or tokens ever leak
    into the written SRT. Removes {SUBLOGUE}, {SUBLOGUE:*} variants, and
    any future Sublogue-style tokens matched by SUBLOGUE_TOKEN_PATTERN.

    Args:
        text: Raw subtitle text possibly containing markers.
    Returns:
        Clean text with all Sublogue markers removed.
    """
    without_tokens = SUBLOGUE_TOKEN_PATTERN.sub("", text)
    # Removing tokens can leave runs of blank lines; collapse them.
    collapsed = re.sub(r'\n{3,}', '\n\n', without_tokens)
    return collapsed.strip()
def sanitize_all_blocks(blocks: List[SubtitleBlock]) -> List[SubtitleBlock]:
    """
    Run sanitize_subtitle_text() over every block before final output.

    This is the last defense against internal markers appearing in the
    written SRT file. Blocks whose text becomes empty after sanitization
    are dropped entirely.

    Args:
        blocks: Subtitle blocks to sanitize.
    Returns:
        New list of new SubtitleBlock instances (input is not mutated).
    """
    cleaned: List[SubtitleBlock] = []
    for original in blocks:
        new_text = sanitize_subtitle_text(original.text)
        if not new_text:
            continue
        cleaned.append(
            SubtitleBlock(
                index=original.index,
                start_time=original.start_time,
                end_time=original.end_time,
                text=new_text,
            )
        )
    return cleaned
# ============================================================
# Parsing / formatting
# ============================================================
def parse_srt(content: str) -> List[SubtitleBlock]:
    """
    Parse SRT content into subtitle blocks.

    Handles BOM, inconsistent line endings, and malformed blocks.
    Blocks with a bad timecode are skipped with a warning; blocks whose
    text is empty are skipped silently. A missing/non-numeric cue index
    defaults to 0.

    Args:
        content: Raw SRT file content.
    Returns:
        List of parsed subtitle blocks.
    """
    # Strip BOM if present.
    content = content.lstrip("\ufeff")
    # Normalize line endings (handle \r\n, \n, \r).
    content = content.replace("\r\n", "\n").replace("\r", "\n")
    lines = content.splitlines()
    blocks: List[SubtitleBlock] = []
    i = 0
    n = len(lines)
    while i < n:
        line = lines[i].strip()
        # Skip empty lines between blocks.
        if not line:
            i += 1
            continue
        # Anchor parsing on the timecode line, not the index line —
        # this tolerates missing or garbage index lines.
        m = _TIMECODE_RE.search(line)
        if not m:
            i += 1
            continue
        try:
            start = _timecode_to_ms(m.group("start"))
            end = _timecode_to_ms(m.group("end"))
        except (ValueError, AttributeError):
            # FIX: dropped the unused `as e` binding from this handler.
            logger.warning(f"Malformed timecode at line {i+1}: {line}")
            i += 1
            continue
        # The SRT cue number, if present, is on the line just above.
        index = 0
        if i > 0 and lines[i - 1].strip().isdigit():
            try:
                index = int(lines[i - 1].strip())
            except ValueError:
                pass
        # Collect subtitle text lines until a blank line or next timecode.
        i += 1
        text_lines: List[str] = []
        while i < n and lines[i].strip() and not _TIMECODE_RE.search(lines[i]):
            text_lines.append(lines[i])
            i += 1
        text = "\n".join(text_lines).strip()
        # Skip empty subtitle blocks.
        if text:
            blocks.append(
                SubtitleBlock(
                    index=index,
                    start_time=start,
                    end_time=end,
                    text=text,
                )
            )
        # Skip trailing empty lines.
        while i < n and not lines[i].strip():
            i += 1
    return blocks
def format_srt(subs: List[SubtitleBlock]) -> str:
    """Serialize subtitle blocks back into SRT text (one trailing newline)."""
    pieces: List[str] = []
    for block in subs:
        pieces.append(str(block.index))
        start_tc = _ms_to_timecode(block.start_time)
        end_tc = _ms_to_timecode(block.end_time)
        pieces.append(f"{start_tc} --> {end_tc}")
        pieces.extend(block.text.splitlines())
        # Blank separator line between cues.
        pieces.append("")
    return "\n".join(pieces).rstrip() + "\n"
# ============================================================
# Subtitle construction helpers
# ============================================================
# TV-safe display constraints
TV_LINE_WIDTH = 55 # Wider lines = fewer line breaks (most TVs handle 55+ chars)
TV_MAX_LINES = 2 # Max 2 lines per subtitle block for readability
def wrap_for_tv(text: str, width: int = TV_LINE_WIDTH, max_lines: int = TV_MAX_LINES) -> str:
    """
    Wrap text for TV display, truncating to a line limit.

    Args:
        text: Text to wrap.
        width: Max characters per line (default 55 for modern TVs).
        max_lines: Max lines to display (default 2).
    Returns:
        Wrapped text; if it exceeds ``max_lines``, it is cut there and the
        last visible line gets an ellipsis.
    """
    wrapped = textwrap.wrap(text, width=width)
    if len(wrapped) <= max_lines:
        return "\n".join(wrapped)
    visible = wrapped[:max_lines]
    if visible:
        visible[-1] = visible[-1].rstrip() + "..."
    return "\n".join(visible)
def chunk_plot_for_display(
    plot: str,
    available_duration_ms: int,
    min_chunk_duration_ms: int = 3000,
    width: int = TV_LINE_WIDTH,
    max_lines: int = TV_MAX_LINES,
) -> List[str]:
    """
    Split a long plot into multiple chunks for sequential display.

    Each chunk is designed to:
    - Fit within ``max_lines`` lines of ``width`` characters
    - Be readable within its time slot
    - Break at sentence boundaries when possible

    Args:
        plot: Full plot text.
        available_duration_ms: Total time available for plot display.
        min_chunk_duration_ms: Minimum display time per chunk.
        width: Characters per line.
        max_lines: Lines per chunk.
    Returns:
        List of wrapped plot text chunks (at most
        ``available_duration_ms // min_chunk_duration_ms`` of them).
    """
    # A chunk may hold at most one full screen of text.
    max_chars_per_chunk = width * max_lines
    # How many chunks fit if each shows for at least the minimum time.
    max_chunks = max(1, available_duration_ms // min_chunk_duration_ms)
    # Short plot: single chunk, no multi-chunk bookkeeping.
    if len(plot) <= max_chars_per_chunk:
        return [wrap_for_tv(plot, width, max_lines)]
    # Split into sentences for natural breaks.
    sentences = re.split(r'(?<=[.!?])\s+', plot)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # Tentatively append the sentence to the running chunk.
        test_chunk = (current_chunk + " " + sentence).strip() if current_chunk else sentence
        if len(test_chunk) <= max_chars_per_chunk:
            current_chunk = test_chunk
        else:
            # Flush the running chunk before starting a new one.
            if current_chunk:
                chunks.append(wrap_for_tv(current_chunk, width, max_lines))
            if len(sentence) <= max_chars_per_chunk:
                current_chunk = sentence
            else:
                # Sentence alone exceeds a full screen: hard-truncate it.
                current_chunk = sentence[:max_chars_per_chunk - 3] + "..."
            # Stop once the budget leaves room for only the pending chunk.
            if len(chunks) >= max_chunks - 1 and current_chunk:
                break
    # Flush the final pending chunk.
    if current_chunk:
        chunks.append(wrap_for_tv(current_chunk, width, max_lines))
    # Hard cap at the time budget.
    if len(chunks) > max_chunks:
        chunks = chunks[:max_chunks]
    # NOTE(review): this appends "..." to the last chunk even when NO text
    # was dropped (the multi-chunk path always lands here) — presumably
    # intentional as a "continues" cue, but verify against callers.
    if chunks:
        last = chunks[-1]
        if not last.endswith("..."):
            chunks[-1] = last.rstrip() + "..."
    return chunks if chunks else [wrap_for_tv(plot[:max_chars_per_chunk - 3] + "...", width, max_lines)]
def _merge_small_trailing_chunks(
    chunks: List[str],
    min_words_for_separate_block: int = 6,
    max_chars_per_chunk: int = TV_LINE_WIDTH * TV_MAX_LINES,
) -> List[str]:
    """
    Fold tiny trailing chunks into their predecessor when they fit.

    Prevents a handful of leftover words (e.g. "from day one.") from
    becoming their own subtitle block.

    Args:
        chunks: List of text chunks.
        min_words_for_separate_block: Word count below which a chunk is
            considered too small to stand alone.
        max_chars_per_chunk: Maximum characters allowed per merged chunk.
    Returns:
        New list with small trailing chunks merged where possible.
    """
    if len(chunks) <= 1:
        return chunks
    merged_list = list(chunks)  # Never mutate the caller's list.
    # Walk right-to-left so each small chunk can fold into its predecessor.
    for pos in range(len(merged_list) - 1, 0, -1):
        current_words = count_words(merged_list[pos])
        if current_words >= min_words_for_separate_block:
            continue
        candidate = f"{merged_list[pos - 1]} {merged_list[pos]}"
        # Merge only when the combined text still fits one screen.
        if len(candidate) <= max_chars_per_chunk:
            merged_list[pos - 1] = candidate
            merged_list.pop(pos)
            logger.debug(
                f"Merged small chunk ({current_words} words) with previous chunk"
            )
    return merged_list
def _split_plot_into_display_chunks(
    plot: str,
    width: int = TV_LINE_WIDTH,
    max_lines: int = TV_MAX_LINES,
) -> List[str]:
    """
    Split plot text into display-friendly chunks that preserve ALL text.

    Unlike wrap_for_tv() which truncates, this function ensures every word
    of the plot is included across multiple subtitle blocks if needed.

    Strategy:
    1. Split at sentence boundaries first (natural reading breaks)
    2. Each chunk must fit within TV display constraints (width * max_lines chars)
    3. If a sentence is too long, split at word boundaries
    4. NEVER truncate or add ellipsis - preserve complete plot text

    Args:
        plot: Full plot text to split.
        width: Max characters per line.
        max_lines: Max lines per subtitle block.
    Returns:
        List of text chunks, each fitting TV constraints, preserving ALL text.
        (A single word longer than one screen still becomes its own
        oversized chunk — text preservation wins over the size limit.)
    """
    if not plot or not plot.strip():
        return []
    max_chars_per_chunk = width * max_lines
    # Split into sentences for natural breaks
    # Match periods, exclamation, question marks followed by space
    sentences = re.split(r'(?<=[.!?])\s+', plot.strip())
    chunks: List[str] = []
    current_chunk = ""
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        # Tentatively append the sentence to the running chunk.
        test_chunk = f"{current_chunk} {sentence}".strip() if current_chunk else sentence
        if len(test_chunk) <= max_chars_per_chunk:
            # Fits - add to current chunk
            current_chunk = test_chunk
        else:
            # Doesn't fit - save current chunk and start new one
            if current_chunk:
                chunks.append(current_chunk)
            # Check if sentence itself fits in one chunk
            if len(sentence) <= max_chars_per_chunk:
                current_chunk = sentence
            else:
                # Sentence is too long - split at word boundaries
                words = sentence.split()
                current_chunk = ""
                for word in words:
                    test_word = f"{current_chunk} {word}".strip() if current_chunk else word
                    if len(test_word) <= max_chars_per_chunk:
                        current_chunk = test_word
                    else:
                        # Save current chunk and start fresh with this word
                        if current_chunk:
                            chunks.append(current_chunk)
                        current_chunk = word
    # Don't forget the last chunk
    if current_chunk:
        chunks.append(current_chunk)
    # Sanity check: compare word SETS (order/duplicate-insensitive) and only
    # warn — chunking is best-effort and must not fail the pipeline.
    original_words = set(plot.split())
    chunk_words = set(" ".join(chunks).split())
    if original_words != chunk_words:
        logger.warning(
            f"Plot chunking may have lost words! "
            f"Original: {len(original_words)}, Chunks: {len(chunk_words)}"
        )
    logger.debug(f"Split plot into {len(chunks)} chunks: {[len(c) for c in chunks]} chars each")
    return chunks
def strip_existing_plot_blocks(blocks: List[SubtitleBlock]) -> List[SubtitleBlock]:
    """
    Remove any existing Sublogue-generated blocks from the subtitle list.

    This ensures idempotency - running the operation twice won't duplicate
    plot blocks. Returns only the dialogue/original subtitle blocks.

    Detection strategy (DETERMINISTIC):
    Primary: the {SUBLOGUE} sentinel tag - definitive match.
    Fallbacks for legacy files without the sentinel:
        1. "Generated by Sublogue" signature text
        2. Zero-duration blocks (metadata-only)
        3. Blocks containing metadata markers (IMDb, star/clock emoji)

    Args:
        blocks: Parsed subtitle blocks.
    Returns:
        Blocks with all Sublogue-generated content removed.
    """
    # FIX: this tuple previously contained empty strings (the emoji were
    # lost to an encoding mangle). `"" in s` is True for EVERY string, so
    # every dialogue block was being stripped. Restored to the star/clock
    # emoji used in the generated info line — TODO confirm original glyphs.
    metadata_markers = ("imdb:", "⭐", "⏱")
    cleaned_blocks: List[SubtitleBlock] = []
    for block in blocks:
        text = block.text
        text_lower = text.lower()
        # PRIMARY: Deterministic sentinel detection - definitive match.
        if SUBLOGUE_SENTINEL in text or SUBLOGUE_TOKEN_PATTERN.search(text):
            logger.debug(f"Stripping Sublogue block (sentinel) at index {block.index}")
            continue
        # FALLBACK: legacy signature text.
        if "generated by sublogue" in text_lower:
            logger.debug(f"Stripping Sublogue block (legacy) at index {block.index}")
            continue
        # FALLBACK: zero-duration blocks are metadata-only.
        if block.start_time == 0 and block.end_time == 0:
            logger.debug(f"Stripping zero-duration block at index {block.index}")
            continue
        # FALLBACK: metadata markers that never appear in dialogue.
        if any(marker in text_lower for marker in metadata_markers):
            logger.debug(f"Stripping metadata block at index {block.index}")
            continue
        # This is a real subtitle - keep it.
        cleaned_blocks.append(block)
    # FIX: repaired garbled log format (counts were run together).
    logger.info(f"Stripped plot blocks: {len(blocks)} -> {len(cleaned_blocks)} blocks")
    return cleaned_blocks
@dataclass
class SubtitleFormatOptions:
    """Configuration options for subtitle formatting."""
    title_bold: bool = True  # Wrap title in <b> tags
    plot_italic: bool = True  # Wrap plot text in <i> tags
    show_director: bool = False  # Include director in header
    show_actors: bool = False  # Include actors in header
    show_released: bool = False  # Include release date in header
    show_genre: bool = False  # Include genre in header
# Default formatting options (used when none provided)
DEFAULT_FORMAT_OPTIONS = SubtitleFormatOptions()
def build_intro_blocks(
movie: dict,
plot: str,
first_subtitle_start_ms: int,
min_safe_gap_ms: int = 500, # Minimum gap before first subtitle
format_options: SubtitleFormatOptions = None,
) -> List[SubtitleBlock]:
"""
Build intro blocks that appear BEFORE the first real subtitle.
Uses WPM-based timing for comfortable reading speed.
╔══════════════════════════════════════════════════════════════════════════╗
║ SAFETY GUARANTEE: This function NEVER modifies existing subtitle timing ║
║ All intro blocks are placed in the gap BEFORE the first subtitle ║
║ STRICT INVARIANT: intro_end < first_subtitle_start_ms (ALWAYS) ║
╚══════════════════════════════════════════════════════════════════════════╝
Header Format:
──────────────
<b>Title</b> (Year)
⭐ IMDb: <rating> 🍅 RT: <percent>% ⏱ <runtime>
[Optional: Director, Actors, Released, Genre]
Plot Format:
────────────
Plot: <i>plot text here...</i>
- Always shows both IMDb and RT ratings
- If RT is unavailable, explicitly shows "RT: N/A" (never silently omits)
- Runtime is shown in minutes
- Formatting (bold/italic) is configurable via format_options
Timing Strategy (WPM-based):
────────────────────────────
Duration is calculated based on word count and reading speed:
- Base reading speed: 160 WPM
- Minimum duration: 1.2 seconds (prevents flash-frames)
- Maximum duration: 6.0 seconds (prevents stale text)
Formula: duration = max(min(words / (WPM/60), MAX), MIN)
The key principle: We find space BEFORE the first subtitle, never shift it.
If we can't fit content safely, we DON'T INSERT rather than risk overlap.
Args:
movie: Movie metadata dict with title, year, imdb_rating, rotten_tomatoes, runtime,
director, actors, released, genre
plot: Plot text to inject
first_subtitle_start_ms: Start time of first real subtitle in milliseconds
min_safe_gap_ms: Minimum gap to maintain before first subtitle
format_options: Formatting configuration (bold, italic, extra fields)
Returns:
List of intro subtitle blocks with safe timing that won't overlap.
Returns EMPTY list if insufficient gap exists.
"""
if format_options is None:
format_options = DEFAULT_FORMAT_OPTIONS
title = movie.get("title", "Unknown Title")
year = movie.get("year", "")
# ─────────────────────────────────────────────────────────────────────────
# Extract and validate all metadata fields
# ─────────────────────────────────────────────────────────────────────────
imdb_rating = movie.get("imdb_rating") or movie.get("imdbRating") or "N/A"
if not imdb_rating or imdb_rating in ("", "N/A", None):
imdb_rating = "N/A"
rt_rating = movie.get("rotten_tomatoes") or movie.get("rottenTomatoes") or "N/A"
if not rt_rating or rt_rating in ("", "N/A", None):
rt_rating = "N/A"
runtime_raw = movie.get("runtime") or movie.get("Runtime") or "N/A"
if runtime_raw and runtime_raw != "N/A":
runtime_match = re.search(r'(\d+)', str(runtime_raw))
runtime = f"{runtime_match.group(1)} min" if runtime_match else runtime_raw
else:
runtime = "N/A"
# Additional metadata fields
director = movie.get("director") or movie.get("Director") or "N/A"
actors = movie.get("actors") or movie.get("Actors") or "N/A"
released = movie.get("released") or movie.get("Released") or "N/A"
genre = movie.get("genre") or movie.get("Genre") or "N/A"
# ─────────────────────────────────────────────────────────────────────────
# Build header with formatting
# ─────────────────────────────────────────────────────────────────────────
# Title line - optionally bold
title_display = f"<b>{title}</b>" if format_options.title_bold else title
title_line = f"{title_display} ({year})"
# Ratings/info line
info_line = f"⭐ IMDb: {imdb_rating} 🍅 RT: {rt_rating}{runtime}"
# Build optional extra info lines
extra_lines = []
if format_options.show_director and director != "N/A":
extra_lines.append(f"🎬 Director: {director}")
if format_options.show_actors and actors != "N/A":
# Truncate long actor lists to first 3
actor_list = actors.split(", ")
if len(actor_list) > 3:
actors_display = ", ".join(actor_list[:3]) + "..."
else:
actors_display = actors
extra_lines.append(f"🎭 Cast: {actors_display}")
if format_options.show_released and released != "N/A":
extra_lines.append(f"📅 Released: {released}")
if format_options.show_genre and genre != "N/A":
extra_lines.append(f"🎞 Genre: {genre}")
# Combine header text
header_parts = [
SUBLOGUE_SENTINEL,
title_line,
info_line,
]
if extra_lines:
header_parts.extend(extra_lines)
header_parts.append("") # Empty line before attribution
header_parts.append("— Generated by Sublogue")
header_text = "\n".join(header_parts)
# ─────────────────────────────────────────────────────────────────────────
# Build plot text with "Plot:" prefix and optional italic formatting
# ─────────────────────────────────────────────────────────────────────────
wrapped_plot = wrap_for_tv(plot)
if format_options.plot_italic:
plot_display = f"<i>{wrapped_plot}</i>"
else:
plot_display = wrapped_plot
plot_text = f"{SUBLOGUE_SENTINEL}\nPlot: {plot_display}"
# ─────────────────────────────────────────────────────────────────────────
# Calculate available time window BEFORE first subtitle
# This is the safe zone where we can insert content without overlap
# ─────────────────────────────────────────────────────────────────────────
available_time_ms = first_subtitle_start_ms - min_safe_gap_ms
logger.info(
f"Timing analysis: First subtitle at {first_subtitle_start_ms}ms, "
f"available window: {available_time_ms}ms (with {min_safe_gap_ms}ms safety gap)"
)
# ─────────────────────────────────────────────────────────────────────────
# STRICT NON-OVERLAP: If we can't guarantee intro_end < first_subtitle_start,
# return empty list rather than risk overlap
# ─────────────────────────────────────────────────────────────────────────
min_required_ms = int(MIN_DURATION_SECONDS * 1000) + min_safe_gap_ms
if first_subtitle_start_ms < min_required_ms:
logger.warning(
f"[NO INSERT] Insufficient gap: First subtitle at {first_subtitle_start_ms}ms, "
f"need at least {min_required_ms}ms. Skipping intro blocks."
)
return []
# Helper to validate end time is strictly before first subtitle
def safe_end_time(proposed_end: int) -> int:
"""Ensure end time is strictly less than first subtitle start."""
max_allowed = first_subtitle_start_ms - 1 # At least 1ms gap
return min(proposed_end, max_allowed)
# ─────────────────────────────────────────────────────────────────────────
# FIX #3: Use WPM-based timing instead of arbitrary durations
# Calculate reading time for header based on word count
# ─────────────────────────────────────────────────────────────────────────
header_duration_ms = calculate_reading_duration_ms(header_text)
plot_duration_ms = calculate_reading_duration_ms(wrapped_plot)
total_ideal_duration_ms = header_duration_ms + plot_duration_ms
logger.info(
f"WPM timing: Header needs {header_duration_ms}ms, Plot needs {plot_duration_ms}ms, "
f"Total ideal: {total_ideal_duration_ms}ms, Available: {available_time_ms}ms"
)
blocks: List[SubtitleBlock] = []
# ─────────────────────────────────────────────────────────────────────────
# Split plot into TV-friendly chunks FIRST, then determine timing
# Each chunk should fit on screen (2 lines max) while preserving ALL text
# ─────────────────────────────────────────────────────────────────────────
plot_chunks = _split_plot_into_display_chunks(plot)
# Helper to format a plot chunk with optional italic and "Plot:" prefix
def format_plot_chunk(chunk_text: str, is_first_chunk: bool) -> str:
"""Format a plot chunk with appropriate styling."""
wrapped = wrap_for_tv(chunk_text)
if format_options.plot_italic:
styled = f"<i>{wrapped}</i>"
else:
styled = wrapped
# Only add "Plot:" prefix to the first chunk
if is_first_chunk:
return f"{SUBLOGUE_SENTINEL}\nPlot: {styled}"
else:
return f"{SUBLOGUE_SENTINEL}\n{styled}"
# Calculate total time needed for all plot chunks
total_plot_duration_ms = sum(
calculate_reading_duration_ms(chunk) for chunk in plot_chunks
)
total_needed_ms = header_duration_ms + total_plot_duration_ms
logger.info(
f"Plot split into {len(plot_chunks)} chunks, "
f"total plot duration: {total_plot_duration_ms}ms, "
f"total needed: {total_needed_ms}ms, available: {available_time_ms}ms"
)
# ─────────────────────────────────────────────────────────────────────────
# Case 1: Enough time for header + ALL plot chunks at ideal reading speed
# ─────────────────────────────────────────────────────────────────────────
if available_time_ms >= total_needed_ms:
# We have enough time - display everything at comfortable reading pace
blocks.append(SubtitleBlock(1, 0, header_duration_ms, header_text))
# Try to merge small trailing chunks with the previous one
merged_chunks = _merge_small_trailing_chunks(plot_chunks)
current_ms = header_duration_ms
for i, chunk in enumerate(merged_chunks):
chunk_start = current_ms
# Each chunk gets exactly its reading duration - no padding
chunk_duration = calculate_reading_duration_ms(chunk)
chunk_end = safe_end_time(chunk_start + chunk_duration)
chunk_text = format_plot_chunk(chunk, is_first_chunk=(i == 0))
blocks.append(SubtitleBlock(len(blocks) + 1, chunk_start, chunk_end, chunk_text))
current_ms = chunk_end
logger.info(
f"[CASE 1] Full intro: Header [0-{header_duration_ms}ms], "
f"{len(merged_chunks)} plot chunk(s) at ideal pace"
)
# ─────────────────────────────────────────────────────────────────────────
# Case 2: Enough time for header + plot chunks, but need to compress timing
# ─────────────────────────────────────────────────────────────────────────
elif available_time_ms >= header_duration_ms + int(MIN_DURATION_SECONDS * 1000) * len(plot_chunks):
# We can fit everything, but need to speed up the pace
# Try to merge small trailing chunks first
merged_chunks = _merge_small_trailing_chunks(plot_chunks)
# Header gets minimum of ideal time or proportional share
header_end_ms = min(header_duration_ms, max(int(MIN_DURATION_SECONDS * 1000), available_time_ms // (len(merged_chunks) + 1)))
plot_available_ms = available_time_ms - header_end_ms
blocks.append(SubtitleBlock(1, 0, header_end_ms, header_text))
# Distribute time across chunks proportionally to their word count
total_words = sum(count_words(chunk) for chunk in merged_chunks)
current_ms = header_end_ms
for i, chunk in enumerate(merged_chunks):
chunk_start = current_ms
# Proportional time based on word count
chunk_words = count_words(chunk)
if total_words > 0:
proportion = chunk_words / total_words
chunk_duration = max(int(plot_available_ms * proportion), int(MIN_DURATION_SECONDS * 1000))
else:
chunk_duration = plot_available_ms // len(merged_chunks)
chunk_end = safe_end_time(chunk_start + chunk_duration)
chunk_text = format_plot_chunk(chunk, is_first_chunk=(i == 0))
blocks.append(SubtitleBlock(len(blocks) + 1, chunk_start, chunk_end, chunk_text))
current_ms = chunk_end
logger.info(
f"[CASE 2] Compressed intro: Header [0-{header_end_ms}ms], "
f"{len(merged_chunks)} plot chunk(s) compressed"
)
# ─────────────────────────────────────────────────────────────────────────
# Case 3: Limited time - show header + as many chunks as fit
# ─────────────────────────────────────────────────────────────────────────
elif available_time_ms >= header_duration_ms + int(MIN_DURATION_SECONDS * 1000):
# Try to merge small trailing chunks first
merged_chunks = _merge_small_trailing_chunks(plot_chunks)
header_end_ms = min(header_duration_ms, available_time_ms // 2)
blocks.append(SubtitleBlock(1, 0, header_end_ms, header_text))
# Fit as many chunks as we can
current_ms = header_end_ms
chunks_added = 0
for i, chunk in enumerate(merged_chunks):
remaining_ms = (first_subtitle_start_ms - min_safe_gap_ms) - current_ms
min_needed = int(MIN_DURATION_SECONDS * 1000)
if remaining_ms < min_needed:
break
chunk_start = current_ms
chunk_duration = calculate_reading_duration_ms(chunk)
# Don't exceed remaining time, but respect minimum
chunk_duration = max(min(chunk_duration, remaining_ms), min_needed)
chunk_end = safe_end_time(chunk_start + chunk_duration)
# Check if this is the last chunk we can fit
is_last_fitting = (i == len(merged_chunks) - 1) or (remaining_ms - chunk_duration < min_needed)
# If we're truncating and can't fit remaining chunks, combine them
if is_last_fitting and i < len(merged_chunks) - 1:
remaining_text = " ".join(merged_chunks[i:])
chunk = remaining_text # Will be formatted below
chunk_text = format_plot_chunk(chunk, is_first_chunk=(chunks_added == 0))
blocks.append(SubtitleBlock(len(blocks) + 1, chunk_start, chunk_end, chunk_text))
current_ms = chunk_end
chunks_added += 1
if is_last_fitting:
break
logger.info(
f"[CASE 3] Partial intro: Header [0-{header_end_ms}ms], "
f"{chunks_added}/{len(merged_chunks)} plot chunk(s)"
)
# ─────────────────────────────────────────────────────────────────────────
# Case 4: Only enough time for brief header (no plot)
# ─────────────────────────────────────────────────────────────────────────
elif available_time_ms >= int(MIN_DURATION_SECONDS * 1000):
block_end_ms = safe_end_time(first_subtitle_start_ms - min_safe_gap_ms)
# If we can fit a brief header, include title + ratings/runtime
brief_text = (
f"{SUBLOGUE_SENTINEL}\n"
f"{title_line}\n"
f"{info_line}\n"
f"- Generated by Sublogue"
)
blocks.append(SubtitleBlock(1, 0, block_end_ms, brief_text))
logger.info(f"[CASE 4] Brief header only: [0-{block_end_ms}ms]")
# Validate: last block must end before first subtitle
if blocks:
assert blocks[-1].end_time < first_subtitle_start_ms, "Intro would overlap first subtitle!"
return blocks
def build_outro_blocks(
    movie: dict,
    plot: str,
    last_subtitle_end_ms: int,
    min_safe_gap_ms: int = 500,
    format_options: Optional[SubtitleFormatOptions] = None,
) -> List[SubtitleBlock]:
    """
    Build outro blocks that appear AFTER the last real subtitle.

    This avoids any overlap by placing new blocks after the final subtitle
    end time; existing subtitle timing is never touched.

    Args:
        movie: Metadata dict (title, year, ratings, runtime, credits).
        plot: Plot summary text to display.
        last_subtitle_end_ms: End time (ms) of the final original subtitle.
        min_safe_gap_ms: Safety buffer (ms) between the last subtitle and
            the first outro block.
        format_options: Formatting configuration; DEFAULT_FORMAT_OPTIONS
            is used when None.

    Returns:
        Sequentially-timed SubtitleBlock list (header first, then plot
        chunks), durations computed via the WPM reading model.
    """
    if format_options is None:
        format_options = DEFAULT_FORMAT_OPTIONS
    title = movie.get("title", "Unknown Title")
    year = movie.get("year", "")
    # The "or" chains normalize missing/empty/None values straight to "N/A",
    # covering both snake_case and OMDb-style camelCase keys.
    imdb_rating = movie.get("imdb_rating") or movie.get("imdbRating") or "N/A"
    rt_rating = movie.get("rotten_tomatoes") or movie.get("rottenTomatoes") or "N/A"
    runtime_raw = movie.get("runtime") or movie.get("Runtime") or "N/A"
    if runtime_raw and runtime_raw != "N/A":
        # Normalize runtime to "<minutes> min" when a number is present
        runtime_match = re.search(r'(\d+)', str(runtime_raw))
        runtime = f"{runtime_match.group(1)} min" if runtime_match else runtime_raw
    else:
        runtime = "N/A"
    director = movie.get("director") or movie.get("Director") or "N/A"
    actors = movie.get("actors") or movie.get("Actors") or "N/A"
    released = movie.get("released") or movie.get("Released") or "N/A"
    genre = movie.get("genre") or movie.get("Genre") or "N/A"
    title_display = f"<b>{title}</b>" if format_options.title_bold else title
    title_line = f"{title_display} ({year})"
    info_line = f"? IMDb: {imdb_rating} ?? RT: {rt_rating} ? {runtime}"
    # Optional extra header lines, gated by the formatting options
    extra_lines = []
    if format_options.show_director and director != "N/A":
        extra_lines.append(f"?? Director: {director}")
    if format_options.show_actors and actors != "N/A":
        actor_list = actors.split(", ")
        if len(actor_list) > 3:
            # Cap the cast list at three names to keep the header compact
            actors_display = ", ".join(actor_list[:3]) + "..."
        else:
            actors_display = actors
        extra_lines.append(f"?? Cast: {actors_display}")
    if format_options.show_released and released != "N/A":
        extra_lines.append(f"?? Released: {released}")
    if format_options.show_genre and genre != "N/A":
        extra_lines.append(f"?? Genre: {genre}")
    header_parts = [
        SUBLOGUE_SENTINEL,
        title_line,
        info_line,
    ]
    if extra_lines:
        header_parts.extend(extra_lines)
    header_parts.append("")
    header_parts.append("- Generated by Sublogue")
    header_text = "\n".join(header_parts)
    plot_chunks = _split_plot_into_display_chunks(plot)

    def format_plot_chunk(chunk_text: str, is_first_chunk: bool) -> str:
        """Wrap a plot chunk for TV display; only the first gets "Plot: "."""
        wrapped = wrap_for_tv(chunk_text)
        if format_options.plot_italic:
            wrapped = f"<i>{wrapped}</i>"
        prefix = "Plot: " if is_first_chunk else ""
        return f"{SUBLOGUE_SENTINEL}\n{prefix}{wrapped}"

    blocks: List[SubtitleBlock] = []
    # Start after the safety gap so we can never overlap the last subtitle
    current_ms = last_subtitle_end_ms + min_safe_gap_ms
    header_duration_ms = calculate_reading_duration_ms(header_text)
    header_end_ms = current_ms + header_duration_ms
    blocks.append(SubtitleBlock(1, current_ms, header_end_ms, header_text))
    current_ms = header_end_ms
    # Each plot chunk runs back-to-back, timed by its own reading duration
    for i, chunk in enumerate(plot_chunks):
        chunk_text = format_plot_chunk(chunk, is_first_chunk=(i == 0))
        chunk_duration_ms = calculate_reading_duration_ms(chunk)
        chunk_end_ms = current_ms + chunk_duration_ms
        blocks.append(SubtitleBlock(len(blocks) + 1, current_ms, chunk_end_ms, chunk_text))
        current_ms = chunk_end_ms
    return blocks
# ============================================================
# Processor
# ============================================================
class SubtitleProcessor:
"""
Subtitle processor supporting OMDb, TMDb, and TVmaze.
Safety Guarantees:
──────────────────
- Header = subtitle 01 (inserted, not shifted)
- Plot = subtitle 02 (inserted, not shifted)
- Original subtitles start at 03+ with ORIGINAL TIMING PRESERVED
- No subtitle data is lost or truncated
- Timestamps are NEVER modified on existing blocks
- Internal markers ({SUBLOGUE}) are NEVER present in final output
Processing Flow:
────────────────
1. Parse SRT file into blocks
2. Strip any existing Sublogue metadata (idempotency)
3. Analyze timing gap before first subtitle
4. Build intro blocks that fit in available gap (WPM-based timing)
5. Prepend intro blocks (renumber indices only)
6. SANITIZE: Remove all internal markers from output
7. Write atomically via temp file
"""
MAX_SRT_BYTES = 5 * 1024 * 1024
PLOT_SCAN_LINES = 40
def __init__(self, omdb_client=None, tmdb_client=None, tvmaze_client=None, wikipedia_client=None, preferred_source="omdb"):
self.omdb_client = omdb_client
self.tmdb_client = tmdb_client
self.tvmaze_client = tvmaze_client
self.wikipedia_client = wikipedia_client
self.preferred_source = preferred_source
async def process_file(
self,
file_path: str | Path,
duration: int = 40,
force_reprocess: bool = False,
title_override: dict = None,
format_options: SubtitleFormatOptions = None,
strip_keywords: bool = True,
clean_subtitle_content: bool = True,
clean_subtitle_garbage: bool = False,
insertion_position: str = "start",
preferred_source: str | None = None,
language: str | None = None,
) -> dict:
"""
Process a subtitle file to add plot information.
Concurrency Safety:
───────────────────
Uses file locking to prevent race conditions where multiple tasks
could both pass _has_plot_fast() before either writes, causing
duplicate plot insertions.
Args:
file_path: Path to the SRT file
duration: Duration in seconds for the plot display (legacy, now uses WPM)
force_reprocess: If True, reprocess even if plot exists
title_override: Dict with title metadata to use instead of auto-detection
Expected keys: title, year, plot, imdb_rating, rotten_tomatoes, runtime,
media_type, director, actors, released, genre
format_options: Subtitle formatting configuration (bold, italic, extra fields)
strip_keywords: If True, remove torrent/release tags from filename before API lookup.
This improves matching accuracy by cleaning names like
"Movie.2024.1080p.BluRay.x264-GROUP""Movie (2024)"
IMPORTANT: This ONLY affects the title lookup, NOT the subtitle content or timing.
clean_subtitle_content: If True, remove embedded ads/watermarks (YTS, RARBG, etc.)
from inside subtitle text. This cleans the actual dialogue content.
clean_subtitle_garbage: If True, remove OCR garbage like timecodes, music-only
lines, and duplicate lines inside a subtitle block.
"""
file_path = Path(file_path)
if not file_path.exists():
return self._fail("File not found")
if file_path.stat().st_size > self.MAX_SRT_BYTES:
return self._fail("Subtitle file too large")
# If title_override is provided, use it directly instead of fetching
# (Do metadata fetch BEFORE acquiring lock to minimize lock hold time)
if title_override:
logger.info("Using provided title override for %s: %s", file_path.name, title_override.get("title"))
movie = dict(title_override)
# If extra fields are missing, try to enrich from OMDb using IMDb ID.
missing_fields = ["director", "actors", "released", "genre"]
has_missing = any(
not movie.get(field) or movie.get(field) == "N/A"
for field in missing_fields
)
imdb_id = movie.get("imdb_id") or movie.get("imdbID")
if has_missing and imdb_id and self.omdb_client:
try:
enrichment = await self.omdb_client.fetch_summary_by_imdb_id(imdb_id)
if enrichment:
for field in missing_fields:
if not movie.get(field) or movie.get(field) == "N/A":
movie[field] = enrichment.get(field, movie.get(field))
except Exception as e:
logger.warning("Failed to enrich metadata for %s: %s", imdb_id, e)
else:
raw_name = file_path.stem
movie_name, year = self.extract_title_and_year(raw_name, strip_keywords=strip_keywords)
season, episode = get_stripper().extract_season_episode(raw_name)
is_series = season is not None or episode is not None
logger.info("Resolved movie name: '%s''%s' (year=%s, strip_keywords=%s)", raw_name, movie_name, year, strip_keywords)
movie = await self._fetch_summary(
movie_name,
year=year,
is_series=is_series,
season=season,
episode=episode,
preferred_source=preferred_source,
language=language,
)
if not movie:
return self._fail("No metadata found")
plot = movie.get("plot", "").strip()
if not plot:
return self._fail("Empty plot")
# ─────────────────────────────────────────────────────────────────────
# CRITICAL SECTION: Acquire file lock to prevent concurrent processing
# This ensures only one task can check + modify the file at a time
# ─────────────────────────────────────────────────────────────────────
try:
with file_lock(file_path, timeout=30.0):
# Re-check for existing plot INSIDE the lock
# This is the key to preventing duplicates: check-then-write is atomic
if self._has_plot_fast(file_path) and not force_reprocess and not title_override:
logger.info("Skipping %s (plot already exists - checked under lock)", file_path.name)
existing_plot = self._extract_existing_plot(file_path)
existing_metadata = self._extract_existing_metadata(file_path)
return {
"success": True,
"status": "Skipped",
"summary": existing_plot,
**existing_metadata
}
# ─────────────────────────────────────────────────────────────
# PHASE 1: Parse the original subtitle file
# ─────────────────────────────────────────────────────────────
original = file_path.read_text(encoding="utf-8", errors="ignore")
subs = parse_srt(original)
stripper = get_stripper()
detected_keywords = stripper.detect_subtitle_watermarks(original)
detected_keywords += stripper.detect_garbage_labels([b.text for b in subs])
if not subs:
return self._fail("No valid subtitle blocks found")
logger.info(f"Parsed {len(subs)} subtitle blocks from {file_path.name}")
# ─────────────────────────────────────────────────────────────
# PHASE 2: Strip any existing Sublogue-generated blocks (idempotency)
# This ensures running the operation multiple times doesn't duplicate
# ─────────────────────────────────────────────────────────────
clean_subs = strip_existing_plot_blocks(subs)
if not clean_subs:
return self._fail("No dialogue subtitles found after cleaning")
# ─────────────────────────────────────────────────────────────
# PHASE 2.5: Clean embedded ads/watermarks from subtitle content
# This removes things like "YTS", "RARBG", "OpenSubtitles" etc.
# from inside the actual subtitle text
# ─────────────────────────────────────────────────────────────
if clean_subtitle_content or clean_subtitle_garbage:
stripper = get_stripper()
original_count = len(clean_subs)
# Convert SubtitleBlock to dict format for cleaning
blocks_as_dicts = [
{"index": b.index, "start_time": b.start_time, "end_time": b.end_time, "text": b.text}
for b in clean_subs
]
# Clean the content
cleaned_dicts = stripper.clean_subtitle_blocks(
blocks_as_dicts,
remove_watermarks=clean_subtitle_content,
remove_garbage=clean_subtitle_garbage,
)
# Convert back to SubtitleBlock
clean_subs = [
SubtitleBlock(d["index"], d["start_time"], d["end_time"], d["text"])
for d in cleaned_dicts
]
removed_blocks = original_count - len(clean_subs)
if removed_blocks > 0:
logger.info(
f"Removed {removed_blocks} cleaned subtitle blocks from {file_path.name}"
)
if not clean_subs:
return self._fail("No dialogue subtitles found after ad removal")
# ─────────────────────────────────────────────────────────────
# PHASE 3: Analyze timing - find safe insertion window
#
# CRITICAL SAFETY GUARANTEE:
# ══════════════════════════════════════════════════════════════
# We NEVER modify the timing of any existing subtitle.
# The original subtitles are treated as IMMUTABLE.
#
# Our intro blocks are inserted into the gap BEFORE first subtitle.
# If no gap exists, we return empty list (no intro blocks).
# ══════════════════════════════════════════════════════════════
# ─────────────────────────────────────────────────────────────
first_subtitle_start_ms = clean_subs[0].start_time
last_original_timing = clean_subs[-1].end_time
logger.info(
f"Original subtitle timing: First={_ms_to_timecode(first_subtitle_start_ms)} ({first_subtitle_start_ms}ms), "
f"Last={_ms_to_timecode(last_original_timing)} ({last_original_timing}ms)"
)
# ─────────────────────────────────────────────────────────────
# PHASE 4: Build intro blocks that fit in the available gap
# These will NEVER overlap with or shift existing subtitles
# Returns EMPTY list if insufficient gap
# ─────────────────────────────────────────────────────────────
if insertion_position == "end":
intro_blocks = build_outro_blocks(
movie,
plot,
last_subtitle_end_ms=last_original_timing,
min_safe_gap_ms=500, # 500ms safety buffer after last subtitle
format_options=format_options,
)
else:
intro_blocks = build_intro_blocks(
movie,
plot,
first_subtitle_start_ms=first_subtitle_start_ms,
min_safe_gap_ms=500, # 500ms safety buffer before first subtitle
format_options=format_options,
)
# ─────────────────────────────────────────────────────────────
if insertion_position != "end" and not intro_blocks:
return {
"success": False,
"error": "Insufficient gap before first subtitle",
"status": "Insufficient Gap",
"summary": ""
}
# PHASE 5: Combine intro + original subtitles
#
# NOTE: We're ONLY renumbering indices (1, 2, 3...), NOT timestamps!
# The start_time and end_time of clean_subs are PRESERVED EXACTLY.
# ─────────────────────────────────────────────────────────────
final = clean_subs + intro_blocks if insertion_position == "end" else intro_blocks + clean_subs
# Renumber all blocks sequentially (index only, timing unchanged)
renumbered = [
SubtitleBlock(i + 1, b.start_time, b.end_time, b.text)
for i, b in enumerate(final)
]
# Verify timing preservation (sanity check)
num_intro = len(intro_blocks)
if len(renumbered) > num_intro:
preserved_first = renumbered[0] if insertion_position == "end" else renumbered[num_intro]
if preserved_first.start_time != first_subtitle_start_ms:
logger.error(
f"TIMING CORRUPTION DETECTED! Original first subtitle was at "
f"{first_subtitle_start_ms}ms but is now at {preserved_first.start_time}ms"
)
return self._fail("Internal error: timing corruption detected")
logger.info(
f"✓ Timing preserved: First original subtitle still at "
f"{_ms_to_timecode(preserved_first.start_time)}"
)
# ─────────────────────────────────────────────────────────────
# PHASE 6: SANITIZE - Remove all internal markers before output
# FIX #2: This ensures {SUBLOGUE} tokens NEVER appear in final SRT
# ─────────────────────────────────────────────────────────────
sanitized = sanitize_all_blocks(renumbered)
logger.info(
f"Sanitized {len(renumbered)} blocks → {len(sanitized)} clean blocks "
f"(removed internal markers)"
)
# ─────────────────────────────────────────────────────────────
# PHASE 7: Write output atomically (temp file + rename)
# ─────────────────────────────────────────────────────────────
tmp = file_path.with_suffix(".srt.tmp")
tmp.write_text(format_srt(sanitized), encoding="utf-8")
tmp.replace(file_path)
logger.info(
f"Successfully wrote {len(sanitized)} blocks to {file_path.name} "
f"({num_intro} intro + {len(clean_subs)} original)"
)
return {
"success": True,
"status": "Processed",
"summary": plot,
"title": movie.get("title"),
"year": movie.get("year"),
"imdb_rating": movie.get("imdb_rating") or movie.get("imdbRating"),
"rotten_tomatoes": movie.get("rotten_tomatoes") or movie.get("rottenTomatoes"),
"runtime": movie.get("runtime"),
"media_type": movie.get("media_type")
}
except FileLockError as e:
logger.error(f"Could not acquire lock for {file_path.name}: {e}")
return self._fail(f"File is being processed by another task: {e}")
def clean_file(
self,
file_path: str | Path,
clean_subtitle_content: bool = True,
clean_subtitle_garbage: bool = False,
) -> dict:
"""Clean ad/watermark content from a subtitle file without inserting plots."""
file_path = Path(file_path)
if not file_path.exists():
return self._fail("File not found")
if file_path.stat().st_size > self.MAX_SRT_BYTES:
return self._fail("Subtitle file too large")
try:
stripper = get_stripper()
with file_lock(file_path, timeout=30.0):
original = file_path.read_text(encoding="utf-8", errors="ignore")
subs = parse_srt(original)
if not subs:
return self._fail("No valid subtitle blocks found")
original_blocks = subs
detected_keywords = stripper.detect_subtitle_watermarks(original)
detected_keywords += stripper.detect_garbage_labels(
[b.text for b in original_blocks]
)
sanitized, removed_count, modified_count = self._clean_blocks_for_content(
original_blocks,
clean_subtitle_content=clean_subtitle_content,
clean_subtitle_garbage=clean_subtitle_garbage,
)
if not sanitized:
return self._fail("No dialogue subtitles found after cleaning")
renumbered = [
SubtitleBlock(i + 1, b.start_time, b.end_time, b.text)
for i, b in enumerate(sanitized)
]
changed = len(renumbered) != len(original_blocks)
if not changed:
for updated, original_block in zip(renumbered, original_blocks):
if (
updated.start_time != original_block.start_time
or updated.end_time != original_block.end_time
or updated.text != original_block.text
):
changed = True
break
if not changed:
return {
"success": True,
"status": "Skipped",
"summary": "No changes needed",
"removed_blocks": 0,
"modified_blocks": 0,
"clean_keywords": detected_keywords,
}
tmp = file_path.with_suffix(".srt.tmp")
tmp.write_text(format_srt(renumbered), encoding="utf-8")
tmp.replace(file_path)
summary = (
f"Removed {removed_count} blocks, modified {modified_count} blocks"
if (clean_subtitle_content or clean_subtitle_garbage)
else "Cleaned subtitle content"
)
return {
"success": True,
"status": "Cleaned",
"summary": summary,
"removed_blocks": removed_count,
"modified_blocks": modified_count,
"clean_keywords": detected_keywords,
}
except FileLockError as e:
logger.error(f"Could not acquire lock for {file_path.name}: {e}")
return self._fail(f"File is being processed by another task: {e}")
def preview_clean_file(
self,
file_path: str | Path,
clean_subtitle_content: bool = True,
clean_subtitle_garbage: bool = False,
max_changes: int = 80,
) -> dict:
"""Preview cleaning changes without modifying the file."""
file_path = Path(file_path)
if not file_path.exists():
return self._fail("File not found")
if file_path.stat().st_size > self.MAX_SRT_BYTES:
return self._fail("Subtitle file too large")
try:
with file_lock(file_path, timeout=10.0):
original = file_path.read_text(encoding="utf-8", errors="ignore")
subs = parse_srt(original)
if not subs:
return self._fail("No valid subtitle blocks found")
sanitized, removed_count, modified_count = self._clean_blocks_for_content(
subs,
clean_subtitle_content=clean_subtitle_content,
clean_subtitle_garbage=clean_subtitle_garbage,
)
cleaned_map = {}
for block in sanitized:
key = (block.start_time, block.end_time)
cleaned_map.setdefault(key, []).append(block)
changes = []
for block in subs:
key = (block.start_time, block.end_time)
updated = None
if key in cleaned_map and cleaned_map[key]:
updated = cleaned_map[key].pop(0)
if updated is None:
changes.append({
"type": "removed",
"start_ms": block.start_time,
"end_ms": block.end_time,
"timecode": f"{_ms_to_timecode(block.start_time)}{_ms_to_timecode(block.end_time)}",
"before": block.text,
"after": "",
})
elif updated.text != block.text:
changes.append({
"type": "modified",
"start_ms": block.start_time,
"end_ms": block.end_time,
"timecode": f"{_ms_to_timecode(block.start_time)}{_ms_to_timecode(block.end_time)}",
"before": block.text,
"after": updated.text,
})
if len(changes) >= max_changes:
break
return {
"success": True,
"summary": f"Removed {removed_count} blocks, modified {modified_count} blocks",
"removed_blocks": removed_count,
"modified_blocks": modified_count,
"total_changed_blocks": removed_count + modified_count,
"changes_truncated": (removed_count + modified_count) > len(changes),
"changes": changes,
}
except FileLockError as e:
logger.error(f"Could not acquire lock for {file_path.name}: {e}")
return self._fail(f"File is being processed by another task: {e}")
def _clean_blocks_for_content(
self,
original_blocks: List[SubtitleBlock],
clean_subtitle_content: bool,
clean_subtitle_garbage: bool,
) -> tuple[List[SubtitleBlock], int, int]:
stripper = get_stripper()
cleaned_blocks: List[SubtitleBlock] = []
removed_count = 0
modified_count = 0
for block in original_blocks:
text = block.text
if clean_subtitle_content and stripper.should_remove_subtitle_block(text):
removed_count += 1
continue
if clean_subtitle_content or clean_subtitle_garbage:
cleaned_text = stripper.clean_subtitle_text_with_options(
text,
remove_watermarks=clean_subtitle_content,
remove_garbage=clean_subtitle_garbage,
)
else:
cleaned_text = text
if not cleaned_text.strip():
removed_count += 1
continue
if cleaned_text != text:
modified_count += 1
cleaned_blocks.append(
SubtitleBlock(
block.index,
block.start_time,
block.end_time,
cleaned_text,
)
)
sanitized = sanitize_all_blocks(cleaned_blocks)
if len(sanitized) < len(cleaned_blocks):
removed_count += len(cleaned_blocks) - len(sanitized)
return sanitized, removed_count, modified_count
# ========================================================
# Metadata fetching
# ========================================================
async def _fetch_summary(
self,
movie_name: str,
year: Optional[str] = None,
is_series: bool = False,
season: Optional[int] = None,
episode: Optional[int] = None,
preferred_source: str | None = None,
language: str | None = None,
) -> Optional[dict]:
"""
Fetch metadata from configured sources with fallback.
Priority:
1. Preferred source (omdb, tmdb, tvmaze, wikipedia)
2. Fallback to other source if preferred fails
Year validation ensures we don't match wrong movies (e.g., "Eternity 2025"
shouldn't match "From Here to Eternity 1953").
"""
source_preference = preferred_source or self.preferred_source
logger.info("Fetching metadata for '%s' (year=%s, source=%s)", movie_name, year, source_preference)
result = None
omdb_type = "series" if is_series else "movie"
tmdb_type = "tv" if is_series else "movie"
# Try preferred source first
if source_preference == "wikipedia" and self.wikipedia_client:
result = await self.wikipedia_client.fetch_summary(
movie_name,
year=year,
is_series=is_series,
season=season,
episode=episode,
)
if result:
logger.info("Found metadata via Wikipedia: %s (%s)", result.get("title"), result.get("year"))
return result
if source_preference == "tvmaze" and self.tvmaze_client and is_series:
result = await self.tvmaze_client.fetch_summary(
movie_name,
year=year,
season=season,
episode=episode,
)
if result:
logger.info("Found metadata via TVmaze: %s (%s)", result.get("title"), result.get("year"))
return result
elif source_preference == "tmdb" and self.tmdb_client:
result = await self.tmdb_client.fetch_summary(
movie_name,
media_type=tmdb_type,
year=year,
season=season,
episode=episode,
language=language,
)
if result:
logger.info("Found metadata via TMDb: %s (%s)", result.get("title"), result.get("year"))
return result
elif source_preference == "omdb" and self.omdb_client:
result = await self.omdb_client.fetch_summary(
movie_name,
media_type=omdb_type,
year=year,
season=season,
episode=episode,
)
if result:
logger.info("Found metadata via OMDb: %s (%s)", result.get("title"), result.get("year"))
return result
# Fallback to other source
if not result and self.omdb_client and source_preference != "omdb":
result = await self.omdb_client.fetch_summary(
movie_name,
media_type=omdb_type,
year=year,
season=season,
episode=episode,
)
if result:
logger.info("Found metadata via OMDb (fallback): %s (%s)", result.get("title"), result.get("year"))
return result
if not result and self.tmdb_client and source_preference != "tmdb":
result = await self.tmdb_client.fetch_summary(
movie_name,
media_type=tmdb_type,
year=year,
season=season,
episode=episode,
language=language,
)
if result:
logger.info("Found metadata via TMDb (fallback): %s (%s)", result.get("title"), result.get("year"))
return result
if not result and self.wikipedia_client and source_preference != "wikipedia":
result = await self.wikipedia_client.fetch_summary(
movie_name,
year=year,
is_series=is_series,
season=season,
episode=episode,
)
if result:
logger.info("Found metadata via Wikipedia (fallback): %s (%s)", result.get("title"), result.get("year"))
return result
if not result and self.tvmaze_client and source_preference != "tvmaze" and is_series:
result = await self.tvmaze_client.fetch_summary(
movie_name,
year=year,
season=season,
episode=episode,
)
if result:
logger.info("Found metadata via TVmaze (fallback): %s (%s)", result.get("title"), result.get("year"))
return result
logger.warning("No metadata found for '%s' (year=%s) from any source", movie_name, year)
return None
# ========================================================
# Helpers
# ========================================================
def _has_plot_fast(self, path: Path) -> bool:
"""
Fast check for existing Sublogue content using sentinel tag.
Detection strategy (deterministic):
────────────────────────────────────
Primary: Look for {SUBLOGUE} sentinel - definitive match
Fallback: Look for legacy "generated by sublogue" signature
"""
try:
with path.open("r", encoding="utf-8", errors="ignore") as f:
for i, line in enumerate(f):
if i >= self.PLOT_SCAN_LINES:
break
# Primary: Sentinel tag (deterministic)
if SUBLOGUE_SENTINEL in line:
return True
# Fallback: Legacy signature
if "generated by sublogue" in line.lower():
return True
except OSError:
pass
return False
def _extract_existing_plot(self, path: Path) -> str:
"""Extract the plot summary from a file that already has one"""
try:
content = path.read_text(encoding="utf-8", errors="ignore")
blocks = parse_srt(content)
# The plot is typically in block 2 (index 1)
# Block 1 is the header with title/rating/runtime
if len(blocks) >= 2:
plot_text = blocks[1].text
# Remove any "Generated by Sublogue" footer if present
plot_text = plot_text.split("Generated by Sublogue")[0].strip()
# Also remove any lingering sentinel tags
plot_text = SUBLOGUE_TOKEN_PATTERN.sub("", plot_text).strip()
return plot_text
except Exception as e:
logger.warning("Failed to extract existing plot from %s: %s", path.name, e)
return ""
def _extract_existing_metadata(self, path: Path) -> dict:
"""Extract metadata (title, rating, runtime, year) from a file that already has a plot"""
try:
content = path.read_text(encoding="utf-8", errors="ignore")
blocks = parse_srt(content)
# The header is in block 1 (index 0)
# Format: {title} ({year})
# ⭐ IMDb: {rating} 🍅 RT: {percent}% ⏱ {runtime}
if len(blocks) >= 1:
header_text = blocks[0].text
lines = header_text.split('\n')
metadata = {
"title": None,
"year": None,
"imdb_rating": None,
"rotten_tomatoes": None,
"runtime": None
}
# Parse first line for title and year
if len(lines) > 0:
first_line = lines[0]
# Extract year from parentheses
year_match = re.search(r'\((\d{4})\)', first_line)
if year_match:
metadata["year"] = year_match.group(1)
# Title is everything before the year
metadata["title"] = first_line[:year_match.start()].strip()
else:
metadata["title"] = first_line.strip()
# Parse second line for ratings and runtime
if len(lines) > 1:
second_line = lines[1]
# Extract IMDb rating
rating_match = re.search(r'IMDb:\s*([^\s]+)', second_line)
if rating_match:
metadata["imdb_rating"] = rating_match.group(1)
# Extract Rotten Tomatoes rating
rt_match = re.search(r'RT:\s*([^\s]+)', second_line)
if rt_match:
metadata["rotten_tomatoes"] = rt_match.group(1)
# Extract runtime
runtime_match = re.search(r'\s*(.+?)(?:\s{2,}|$)', second_line)
if runtime_match:
metadata["runtime"] = runtime_match.group(1).strip()
return metadata
except Exception as e:
logger.warning("Failed to extract metadata from %s: %s", path.name, e)
return {
"title": None,
"year": None,
"imdb_rating": None,
"rotten_tomatoes": None,
"runtime": None
}
@staticmethod
def extract_title_and_year(name: str, strip_keywords: bool = True) -> Tuple[str, Optional[str]]:
"""
Extract movie/show title and year from filename.
When strip_keywords=True (default), uses the KeywordStripper to remove
torrent/release tags like quality indicators (1080p, BluRay), codecs (x264, HEVC),
release groups (YTS, RARBG), and subtitle ads (OpenSubtitles).
This ONLY affects what title is searched for on OMDb/TMDb/TVmaze/Wikipedia.
It does NOT modify the subtitle file content or timing in any way.
Examples:
"Eternity (2025).en" -> ("Eternity", "2025")
"The.Matrix.1999.BluRay" -> ("The Matrix", "1999")
"Movie.2024.1080p.BluRay.x264-YTS" -> ("Movie", "2024")
"Some Movie" -> ("Some Movie", None)
Args:
name: Filename (without extension) to parse
strip_keywords: If True, use KeywordStripper for comprehensive cleaning
Returns:
Tuple of (cleaned_title, year_or_none)
"""
if strip_keywords:
# Use the comprehensive KeywordStripper
stripper = get_stripper()
result = stripper.clean_filename(name, preserve_year=True)
cleaned_title = result["cleaned_title"]
year = result["year"]
# The KeywordStripper appends year in format "Title (year)"
# We need to separate them for API lookup
if year and f"({year})" in cleaned_title:
cleaned_title = cleaned_title.replace(f"({year})", "").strip()
logger.debug(
f"KeywordStripper: '{name}' → title='{cleaned_title}', year={year}"
)
return cleaned_title, year
# Fallback: Basic cleaning without KeywordStripper
# Try to find year in parentheses first: "Movie (2025)"
paren_match = re.search(r'\((\d{4})\)', name)
if paren_match:
year = paren_match.group(1)
# Remove the (year) from name
name = name[:paren_match.start()] + name[paren_match.end():]
else:
# Try to find standalone year: "Movie.2025.BluRay"
year_match = re.search(r'\b((?:19|20)\d{2})\b', name)
year = year_match.group(1) if year_match else None
# Remove year from name
if year:
name = re.sub(r'\b' + year + r'\b', '', name)
# Clean up the title
name = re.sub(r"\b(en|eng|english|ita|it|fr|es|de|multi)\b", "", name, flags=re.I)
name = re.sub(r"[._\-]+", " ", name)
name = " ".join(name.split()).strip()
return name, year
@staticmethod
def clean_movie_name(name: str) -> str:
"""Legacy method - returns just the title without year."""
title, _ = SubtitleProcessor.extract_title_and_year(name)
return title
@staticmethod
def _fail(msg: str) -> dict:
return {"success": False, "error": msg, "status": "Error", "summary": ""}