This commit is contained in:
ponzischeme89
2026-01-29 15:17:26 +13:00
8 changed files with 533 additions and 53 deletions
+60
View File
@@ -479,6 +479,8 @@ def get_settings():
settings["strip_keywords"] = True
if "clean_subtitle_content" not in settings:
settings["clean_subtitle_content"] = True
if "clean_subtitle_garbage" not in settings:
settings["clean_subtitle_garbage"] = False
if "clean_subtitle_force_remove" not in settings:
settings["clean_subtitle_force_remove"] = ["YTS", "OpenSubtitles"]
if "omdb_enabled" not in settings:
@@ -537,6 +539,8 @@ def update_settings():
db.set_setting("strip_keywords", bool(data["strip_keywords"]))
if "clean_subtitle_content" in data:
db.set_setting("clean_subtitle_content", bool(data["clean_subtitle_content"]))
if "clean_subtitle_garbage" in data:
db.set_setting("clean_subtitle_garbage", bool(data["clean_subtitle_garbage"]))
if "clean_subtitle_force_remove" in data:
db.set_setting(
"clean_subtitle_force_remove",
@@ -1534,6 +1538,57 @@ def search_title():
# ============ PROCESSING ENDPOINTS ============
@app.route('/api/clean/preview', methods=['POST'])
def preview_clean():
    """Preview cleaning changes for a subtitle file without writing.

    Expects a JSON body with a "file" key naming the subtitle path.
    Returns a 400 when no file is given or when both cleaning options
    are disabled in settings; otherwise runs a dry-run clean and
    returns the preview payload.
    """
    try:
        payload = request.json
        target = payload.get("file")
        if not target:
            return jsonify({
                "success": False,
                "error": "No file specified"
            }), 400

        remove_watermarks = _get_bool_setting("clean_subtitle_content", True)
        remove_garbage = _get_bool_setting("clean_subtitle_garbage", False)
        if not (remove_watermarks or remove_garbage):
            return jsonify({
                "success": False,
                "error": "Cleaning is disabled in settings"
            }), 400

        # Reuse the shared processor when available; otherwise build a
        # throwaway one with the configured preferred metadata source.
        if processor:
            proc = processor
        else:
            proc = SubtitleProcessor(
                omdb_client,
                tmdb_client,
                tvmaze_client,
                wikipedia_client,
                preferred_source=_get_str_setting("preferred_source", "omdb"),
            )

        result = proc.preview_clean_file(
            target,
            clean_subtitle_content=remove_watermarks,
            clean_subtitle_garbage=remove_garbage,
        )
        if not result.get("success"):
            return jsonify(result), 400
        return jsonify({
            "success": True,
            "preview": result
        })
    except Exception as e:
        logger.error(f"Preview clean error: {e}")
        return jsonify({
            "success": False,
            "error": str(e)
        }), 500
@app.route('/api/process', methods=['POST'])
def process_files():
"""Process selected files to add plot summaries"""
@@ -1575,6 +1630,7 @@ def process_files():
# Load clean_subtitle_content setting (default True for ad removal)
clean_subtitle_content = _get_bool_setting("clean_subtitle_content", True)
clean_subtitle_garbage = _get_bool_setting("clean_subtitle_garbage", False)
# Load default insertion position and preferred source
default_insertion_position = _get_str_setting("insertion_position", "start")
@@ -1603,6 +1659,7 @@ def process_files():
result = processor_instance.clean_file(
file_path,
clean_subtitle_content=clean_subtitle_content,
clean_subtitle_garbage=clean_subtitle_garbage,
)
result["clean_only"] = True
else:
@@ -1614,6 +1671,7 @@ def process_files():
format_options=effective_format,
strip_keywords=strip_keywords,
clean_subtitle_content=clean_subtitle_content,
clean_subtitle_garbage=clean_subtitle_garbage,
insertion_position=insertion_position or default_insertion_position,
preferred_source=preferred_source or default_preferred_source,
language=language,
@@ -1742,6 +1800,7 @@ def process_batch():
# Load clean_subtitle_content setting (default True for ad removal)
clean_subtitle_content = _get_bool_setting("clean_subtitle_content", True)
clean_subtitle_garbage = _get_bool_setting("clean_subtitle_garbage", False)
default_insertion_position = _get_str_setting("insertion_position", "start")
default_preferred_source = _get_str_setting("preferred_source", "omdb")
@@ -1776,6 +1835,7 @@ def process_batch():
format_options=effective_format,
strip_keywords=strip_keywords,
clean_subtitle_content=clean_subtitle_content,
clean_subtitle_garbage=clean_subtitle_garbage,
insertion_position=insertion_position or default_insertion_position,
preferred_source=preferred_source or default_preferred_source,
language=language,
+106 -11
View File
@@ -253,6 +253,13 @@ class KeywordStripper:
r"^\s*$",
]
# -----------------------------
# OCR / GARBAGE LINE DETECTION
# -----------------------------
GARBAGE_MUSIC_LINE = r"^[\s\[\]\(\)\{\}_\-\.\~\*]*(?:[♪♫♬♩]+)[\s\[\]\(\)\{\}_\-\.\~\*]*$"
GARBAGE_TIMECODE = r"\d{1,2}:\d{2}:\d{2}[,\.]\d{3}"
# -----------------------------
# COMPILED REGEX CACHE
# -----------------------------
@@ -296,6 +303,8 @@ class KeywordStripper:
"subtitle_force_remove": [
re.compile(p, re.IGNORECASE) for p in combined_force_remove
],
"garbage_music_line": re.compile(cls.GARBAGE_MUSIC_LINE),
"garbage_timecode": re.compile(cls.GARBAGE_TIMECODE),
}
return cls._compiled
@@ -430,26 +439,74 @@ class KeywordStripper:
Returns:
Cleaned text with ads removed, or empty string if nothing remains
"""
return self._clean_subtitle_text(text, remove_watermarks=True, remove_garbage=False)
def clean_subtitle_text_with_options(
    self,
    text: str,
    remove_watermarks: bool = True,
    remove_garbage: bool = False,
) -> str:
    """Clean subtitle text with explicit control over each pass.

    Args:
        text: Raw subtitle text (may span multiple lines).
        remove_watermarks: Strip known ad/watermark patterns.
        remove_garbage: Drop OCR artifacts (music-only lines, stray
            timecodes, duplicated lines).

    Returns:
        The cleaned text, or an empty string if nothing remains.
    """
    cleaned = self._clean_subtitle_text(
        text,
        remove_watermarks=remove_watermarks,
        remove_garbage=remove_garbage,
    )
    return cleaned
def _is_timecode_line(self, line: str) -> bool:
    """Return True when *line* consists only of OCR timecode residue.

    A line qualifies when it contains a timecode pattern and, once the
    timecode plus surrounding digits/punctuation are removed, no real
    dialogue characters remain.
    """
    patterns = self._compile()
    timecode_rx = patterns["garbage_timecode"]
    if timecode_rx.search(line) is None:
        return False
    # Strip the timecode(s), then any leftover digits/punctuation that
    # typically surround them; genuine dialogue would leave text behind.
    leftover = timecode_rx.sub("", line)
    leftover = re.sub(r"[\s0-9:\->,\.\[\]]+", "", leftover)
    return not leftover
def _is_music_line(self, line: str) -> bool:
    """Return True for lines made up only of music notes and decoration."""
    music_rx = self._compile()["garbage_music_line"]
    return music_rx.match(line.strip()) is not None
def _normalize_line(self, line: str) -> str:
return re.sub(r"\s+", " ", line.strip()).lower()
def _clean_subtitle_text(
self,
text: str,
remove_watermarks: bool = True,
remove_garbage: bool = False,
) -> str:
rx = self._compile()
original = text
# Process line by line to handle multi-line subtitles
lines = text.split('\n')
cleaned_lines = []
seen_lines = set()
for line in lines:
if remove_garbage:
if self._is_music_line(line) or self._is_timecode_line(line):
continue
cleaned_line = line
# Remove watermark patterns
for pattern in rx["subtitle_watermarks"]:
cleaned_line = pattern.sub("", cleaned_line)
if remove_watermarks:
for pattern in rx["subtitle_watermarks"]:
cleaned_line = pattern.sub("", cleaned_line)
# Clean up resulting whitespace
cleaned_line = re.sub(r'\s+', ' ', cleaned_line).strip()
# Only keep lines that have content after cleaning
if cleaned_line:
cleaned_lines.append(cleaned_line)
if not cleaned_line:
continue
if remove_garbage:
normalized = self._normalize_line(cleaned_line)
if normalized and normalized in seen_lines:
continue
seen_lines.add(normalized)
cleaned_lines.append(cleaned_line)
result = '\n'.join(cleaned_lines)
@@ -463,7 +520,12 @@ class KeywordStripper:
return result
def clean_subtitle_blocks(self, blocks: List[dict]) -> List[dict]:
def clean_subtitle_blocks(
self,
blocks: List[dict],
remove_watermarks: bool = True,
remove_garbage: bool = False,
) -> List[dict]:
"""
Clean a list of subtitle blocks, removing ads and watermarks.
@@ -485,13 +547,17 @@ class KeywordStripper:
text = block.get("text", "")
# Check if entire block should be removed
if self.should_remove_subtitle_block(text):
if remove_watermarks and self.should_remove_subtitle_block(text):
removed_count += 1
logger.debug("Removing ad block: '%s'", text[:50])
continue
# Clean the text
cleaned_text = self.clean_subtitle_text(text)
cleaned_text = self._clean_subtitle_text(
text,
remove_watermarks=remove_watermarks,
remove_garbage=remove_garbage,
)
# Skip if cleaning resulted in empty text
if not cleaned_text.strip():
@@ -526,6 +592,27 @@ class KeywordStripper:
detected.append(keyword)
return detected
def detect_garbage_labels(self, block_texts: List[str]) -> List[str]:
    """Detect OCR/garbage patterns in subtitle blocks.

    Scans each block's lines for three categories of OCR garbage:
    music-only lines, stray timecodes, and duplicated lines (duplicates
    are tracked per block, not across blocks).

    Args:
        block_texts: Raw text of each subtitle block.

    Returns:
        Sorted list of human-readable labels describing what was found.
    """
    labels = set()
    for text in block_texts:
        seen = set()  # normalized lines seen within THIS block only
        for line in text.split("\n"):
            if self._is_music_line(line):
                labels.add("Music-only lines")
            if self._is_timecode_line(line):
                labels.add("OCR timecodes")
            normalized = self._normalize_line(line)
            if normalized:
                if normalized in seen:
                    labels.add("Duplicate lines")
                else:
                    seen.add(normalized)
            if len(labels) >= 3:
                # All three possible labels found: stop the whole scan.
                # (The original `break` only exited the inner loop, so
                # every remaining block was still scanned pointlessly.)
                return sorted(labels)
    return sorted(labels)
def set_force_remove_keywords(self, keywords: List[str]) -> None:
"""Set custom force-remove keywords and refresh regex cache."""
type(self)._custom_force_remove_keywords = [
@@ -556,9 +643,17 @@ def clean_filename(filename: str, preserve_year: bool = True) -> dict:
return get_stripper().clean_filename(filename, preserve_year)
def clean_subtitle_content(text: str) -> str:
"""Clean watermarks and ads from subtitle text."""
return get_stripper().clean_subtitle_text(text)
def clean_subtitle_content(
    text: str,
    remove_watermarks: bool = True,
    remove_garbage: bool = False,
) -> str:
    """Clean watermarks/ads and optional OCR garbage from subtitle text."""
    stripper = get_stripper()
    return stripper.clean_subtitle_text_with_options(
        text,
        remove_watermarks=remove_watermarks,
        remove_garbage=remove_garbage,
    )
def should_remove_subtitle(text: str) -> bool:
+153 -40
View File
@@ -1267,6 +1267,7 @@ class SubtitleProcessor:
format_options: SubtitleFormatOptions = None,
strip_keywords: bool = True,
clean_subtitle_content: bool = True,
clean_subtitle_garbage: bool = False,
insertion_position: str = "start",
preferred_source: str | None = None,
language: str | None = None,
@@ -1294,6 +1295,8 @@ class SubtitleProcessor:
IMPORTANT: This ONLY affects the title lookup, NOT the subtitle content or timing.
clean_subtitle_content: If True, remove embedded ads/watermarks (YTS, RARBG, etc.)
from inside subtitle text. This cleans the actual dialogue content.
clean_subtitle_garbage: If True, remove OCR garbage like timecodes, music-only
lines, and duplicate lines inside a subtitle block.
"""
file_path = Path(file_path)
@@ -1375,6 +1378,7 @@ class SubtitleProcessor:
subs = parse_srt(original)
stripper = get_stripper()
detected_keywords = stripper.detect_subtitle_watermarks(original)
detected_keywords += stripper.detect_garbage_labels([b.text for b in subs])
if not subs:
return self._fail("No valid subtitle blocks found")
@@ -1395,7 +1399,7 @@ class SubtitleProcessor:
# This removes things like "YTS", "RARBG", "OpenSubtitles" etc.
# from inside the actual subtitle text
# ─────────────────────────────────────────────────────────────
if clean_subtitle_content:
if clean_subtitle_content or clean_subtitle_garbage:
stripper = get_stripper()
original_count = len(clean_subs)
@@ -1406,7 +1410,11 @@ class SubtitleProcessor:
]
# Clean the content
cleaned_dicts = stripper.clean_subtitle_blocks(blocks_as_dicts)
cleaned_dicts = stripper.clean_subtitle_blocks(
blocks_as_dicts,
remove_watermarks=clean_subtitle_content,
remove_garbage=clean_subtitle_garbage,
)
# Convert back to SubtitleBlock
clean_subs = [
@@ -1414,10 +1422,10 @@ class SubtitleProcessor:
for d in cleaned_dicts
]
removed_ads = original_count - len(clean_subs)
if removed_ads > 0:
removed_blocks = original_count - len(clean_subs)
if removed_blocks > 0:
logger.info(
f"Removed {removed_ads} ad/watermark subtitle blocks from {file_path.name}"
f"Removed {removed_blocks} cleaned subtitle blocks from {file_path.name}"
)
if not clean_subs:
@@ -1546,6 +1554,7 @@ class SubtitleProcessor:
self,
file_path: str | Path,
clean_subtitle_content: bool = True,
clean_subtitle_garbage: bool = False,
) -> dict:
"""Clean ad/watermark content from a subtitle file without inserting plots."""
file_path = Path(file_path)
@@ -1557,6 +1566,7 @@ class SubtitleProcessor:
return self._fail("Subtitle file too large")
try:
stripper = get_stripper()
with file_lock(file_path, timeout=30.0):
original = file_path.read_text(encoding="utf-8", errors="ignore")
subs = parse_srt(original)
@@ -1565,40 +1575,16 @@ class SubtitleProcessor:
return self._fail("No valid subtitle blocks found")
original_blocks = subs
removed_count = 0
modified_count = 0
detected_keywords = stripper.detect_subtitle_watermarks(original)
detected_keywords += stripper.detect_garbage_labels(
[b.text for b in original_blocks]
)
if clean_subtitle_content:
cleaned_blocks: List[SubtitleBlock] = []
for block in original_blocks:
text = block.text
if stripper.should_remove_subtitle_block(text):
removed_count += 1
continue
cleaned_text = stripper.clean_subtitle_text(text)
if not cleaned_text.strip():
removed_count += 1
continue
if cleaned_text != text:
modified_count += 1
cleaned_blocks.append(
SubtitleBlock(
block.index,
block.start_time,
block.end_time,
cleaned_text,
)
)
else:
cleaned_blocks = list(original_blocks)
sanitized = sanitize_all_blocks(cleaned_blocks)
if len(sanitized) < len(cleaned_blocks):
removed_count += len(cleaned_blocks) - len(sanitized)
sanitized, removed_count, modified_count = self._clean_blocks_for_content(
original_blocks,
clean_subtitle_content=clean_subtitle_content,
clean_subtitle_garbage=clean_subtitle_garbage,
)
if not sanitized:
return self._fail("No dialogue subtitles found after cleaning")
@@ -1634,8 +1620,8 @@ class SubtitleProcessor:
tmp.replace(file_path)
summary = (
f"Removed {removed_count} ad blocks, modified {modified_count} blocks"
if clean_subtitle_content
f"Removed {removed_count} blocks, modified {modified_count} blocks"
if (clean_subtitle_content or clean_subtitle_garbage)
else "Cleaned subtitle content"
)
@@ -1652,6 +1638,133 @@ class SubtitleProcessor:
logger.error(f"Could not acquire lock for {file_path.name}: {e}")
return self._fail(f"File is being processed by another task: {e}")
def preview_clean_file(
    self,
    file_path: str | Path,
    clean_subtitle_content: bool = True,
    clean_subtitle_garbage: bool = False,
    max_changes: int = 80,
) -> dict:
    """Preview cleaning changes without modifying the file.

    Runs the same block-cleaning pass as clean_file() but never writes,
    returning a per-block list of removals/modifications capped at
    *max_changes* entries.

    Args:
        file_path: Subtitle file to analyze.
        clean_subtitle_content: Remove ad/watermark blocks and text.
        clean_subtitle_garbage: Remove OCR garbage (timecodes, music-only
            lines, duplicated lines).
        max_changes: Maximum number of change entries to include.

    Returns:
        On success, a dict with summary text, removed/modified counts,
        a truncation flag, and the change entries; a failure dict
        (via self._fail) otherwise.
    """
    file_path = Path(file_path)
    if not file_path.exists():
        return self._fail("File not found")
    if file_path.stat().st_size > self.MAX_SRT_BYTES:
        return self._fail("Subtitle file too large")
    try:
        with file_lock(file_path, timeout=10.0):
            original = file_path.read_text(encoding="utf-8", errors="ignore")
            subs = parse_srt(original)
            if not subs:
                return self._fail("No valid subtitle blocks found")

            sanitized, removed_count, modified_count = self._clean_blocks_for_content(
                subs,
                clean_subtitle_content=clean_subtitle_content,
                clean_subtitle_garbage=clean_subtitle_garbage,
            )

            # Index surviving blocks by (start, end) so each original
            # block can be matched to its cleaned counterpart; blocks
            # sharing the same timing are consumed in order via pop(0).
            cleaned_map = {}
            for block in sanitized:
                key = (block.start_time, block.end_time)
                cleaned_map.setdefault(key, []).append(block)

            changes = []
            for block in subs:
                key = (block.start_time, block.end_time)
                updated = None
                if key in cleaned_map and cleaned_map[key]:
                    updated = cleaned_map[key].pop(0)
                if updated is None:
                    changes.append({
                        "type": "removed",
                        "start_ms": block.start_time,
                        "end_ms": block.end_time,
                        # FIX: the two timecodes were concatenated with no
                        # separator; use the SRT-style arrow for readability.
                        "timecode": f"{_ms_to_timecode(block.start_time)} --> {_ms_to_timecode(block.end_time)}",
                        "before": block.text,
                        "after": "",
                    })
                elif updated.text != block.text:
                    changes.append({
                        "type": "modified",
                        "start_ms": block.start_time,
                        "end_ms": block.end_time,
                        "timecode": f"{_ms_to_timecode(block.start_time)} --> {_ms_to_timecode(block.end_time)}",
                        "before": block.text,
                        "after": updated.text,
                    })
                if len(changes) >= max_changes:
                    break

            return {
                "success": True,
                "summary": f"Removed {removed_count} blocks, modified {modified_count} blocks",
                "removed_blocks": removed_count,
                "modified_blocks": modified_count,
                "total_changed_blocks": removed_count + modified_count,
                "changes_truncated": (removed_count + modified_count) > len(changes),
                "changes": changes,
            }
    except FileLockError as e:
        logger.error(f"Could not acquire lock for {file_path.name}: {e}")
        return self._fail(f"File is being processed by another task: {e}")
def _clean_blocks_for_content(
    self,
    original_blocks: List[SubtitleBlock],
    clean_subtitle_content: bool,
    clean_subtitle_garbage: bool,
) -> tuple[List[SubtitleBlock], int, int]:
    """Apply ad/garbage cleaning to a list of subtitle blocks.

    Args:
        original_blocks: Parsed subtitle blocks to clean.
        clean_subtitle_content: Remove ad/watermark blocks and text.
        clean_subtitle_garbage: Remove OCR garbage lines within blocks.

    Returns:
        Tuple of (sanitized blocks, removed block count, modified
        block count).
    """
    stripper = get_stripper()
    any_cleaning = clean_subtitle_content or clean_subtitle_garbage
    kept: List[SubtitleBlock] = []
    removed = 0
    modified = 0

    for block in original_blocks:
        text = block.text

        # Whole-block ad removal applies only when watermark cleaning is on.
        if clean_subtitle_content and stripper.should_remove_subtitle_block(text):
            removed += 1
            continue

        if any_cleaning:
            new_text = stripper.clean_subtitle_text_with_options(
                text,
                remove_watermarks=clean_subtitle_content,
                remove_garbage=clean_subtitle_garbage,
            )
        else:
            new_text = text

        # Blocks emptied by cleaning count as removed.
        if not new_text.strip():
            removed += 1
            continue
        if new_text != text:
            modified += 1
        kept.append(
            SubtitleBlock(
                block.index,
                block.start_time,
                block.end_time,
                new_text,
            )
        )

    # Sanitization may drop further blocks; fold those into the count.
    sanitized = sanitize_all_blocks(kept)
    removed += max(0, len(kept) - len(sanitized))
    return sanitized, removed, modified
# ========================================================
# Metadata fetching
# ========================================================