1.1.1 - adds OCR garbage removal
This commit is contained in:
+106
-11
@@ -253,6 +253,13 @@ class KeywordStripper:
|
||||
r"^\s*$",
|
||||
]
|
||||
|
||||
# -----------------------------
|
||||
# OCR / GARBAGE LINE DETECTION
|
||||
# -----------------------------
|
||||
|
||||
GARBAGE_MUSIC_LINE = r"^[\s\[\]\(\)\{\}_\-\.\~\*]*(?:[♪♫♬♩]+)[\s\[\]\(\)\{\}_\-\.\~\*]*$"
|
||||
GARBAGE_TIMECODE = r"\d{1,2}:\d{2}:\d{2}[,\.]\d{3}"
|
||||
|
||||
# -----------------------------
|
||||
# COMPILED REGEX CACHE
|
||||
# -----------------------------
|
||||
@@ -296,6 +303,8 @@ class KeywordStripper:
|
||||
"subtitle_force_remove": [
|
||||
re.compile(p, re.IGNORECASE) for p in combined_force_remove
|
||||
],
|
||||
"garbage_music_line": re.compile(cls.GARBAGE_MUSIC_LINE),
|
||||
"garbage_timecode": re.compile(cls.GARBAGE_TIMECODE),
|
||||
}
|
||||
|
||||
return cls._compiled
|
||||
@@ -430,26 +439,74 @@ class KeywordStripper:
|
||||
Returns:
|
||||
Cleaned text with ads removed, or empty string if nothing remains
|
||||
"""
|
||||
return self._clean_subtitle_text(text, remove_watermarks=True, remove_garbage=False)
|
||||
|
||||
def clean_subtitle_text_with_options(
|
||||
self,
|
||||
text: str,
|
||||
remove_watermarks: bool = True,
|
||||
remove_garbage: bool = False,
|
||||
) -> str:
|
||||
return self._clean_subtitle_text(
|
||||
text,
|
||||
remove_watermarks=remove_watermarks,
|
||||
remove_garbage=remove_garbage,
|
||||
)
|
||||
|
||||
def _is_timecode_line(self, line: str) -> bool:
|
||||
rx = self._compile()
|
||||
if not rx["garbage_timecode"].search(line):
|
||||
return False
|
||||
stripped = rx["garbage_timecode"].sub("", line)
|
||||
stripped = re.sub(r"[\s0-9:\-–>,\.\[\]]+", "", stripped)
|
||||
return stripped == ""
|
||||
|
||||
def _is_music_line(self, line: str) -> bool:
|
||||
rx = self._compile()
|
||||
return bool(rx["garbage_music_line"].match(line.strip()))
|
||||
|
||||
def _normalize_line(self, line: str) -> str:
|
||||
return re.sub(r"\s+", " ", line.strip()).lower()
|
||||
|
||||
def _clean_subtitle_text(
|
||||
self,
|
||||
text: str,
|
||||
remove_watermarks: bool = True,
|
||||
remove_garbage: bool = False,
|
||||
) -> str:
|
||||
rx = self._compile()
|
||||
original = text
|
||||
|
||||
# Process line by line to handle multi-line subtitles
|
||||
lines = text.split('\n')
|
||||
cleaned_lines = []
|
||||
seen_lines = set()
|
||||
|
||||
for line in lines:
|
||||
if remove_garbage:
|
||||
if self._is_music_line(line) or self._is_timecode_line(line):
|
||||
continue
|
||||
|
||||
cleaned_line = line
|
||||
|
||||
# Remove watermark patterns
|
||||
for pattern in rx["subtitle_watermarks"]:
|
||||
cleaned_line = pattern.sub("", cleaned_line)
|
||||
if remove_watermarks:
|
||||
for pattern in rx["subtitle_watermarks"]:
|
||||
cleaned_line = pattern.sub("", cleaned_line)
|
||||
|
||||
# Clean up resulting whitespace
|
||||
cleaned_line = re.sub(r'\s+', ' ', cleaned_line).strip()
|
||||
|
||||
# Only keep lines that have content after cleaning
|
||||
if cleaned_line:
|
||||
cleaned_lines.append(cleaned_line)
|
||||
if not cleaned_line:
|
||||
continue
|
||||
|
||||
if remove_garbage:
|
||||
normalized = self._normalize_line(cleaned_line)
|
||||
if normalized and normalized in seen_lines:
|
||||
continue
|
||||
seen_lines.add(normalized)
|
||||
|
||||
cleaned_lines.append(cleaned_line)
|
||||
|
||||
result = '\n'.join(cleaned_lines)
|
||||
|
||||
@@ -463,7 +520,12 @@ class KeywordStripper:
|
||||
|
||||
return result
|
||||
|
||||
def clean_subtitle_blocks(self, blocks: List[dict]) -> List[dict]:
|
||||
def clean_subtitle_blocks(
|
||||
self,
|
||||
blocks: List[dict],
|
||||
remove_watermarks: bool = True,
|
||||
remove_garbage: bool = False,
|
||||
) -> List[dict]:
|
||||
"""
|
||||
Clean a list of subtitle blocks, removing ads and watermarks.
|
||||
|
||||
@@ -485,13 +547,17 @@ class KeywordStripper:
|
||||
text = block.get("text", "")
|
||||
|
||||
# Check if entire block should be removed
|
||||
if self.should_remove_subtitle_block(text):
|
||||
if remove_watermarks and self.should_remove_subtitle_block(text):
|
||||
removed_count += 1
|
||||
logger.debug("Removing ad block: '%s'", text[:50])
|
||||
continue
|
||||
|
||||
# Clean the text
|
||||
cleaned_text = self.clean_subtitle_text(text)
|
||||
cleaned_text = self._clean_subtitle_text(
|
||||
text,
|
||||
remove_watermarks=remove_watermarks,
|
||||
remove_garbage=remove_garbage,
|
||||
)
|
||||
|
||||
# Skip if cleaning resulted in empty text
|
||||
if not cleaned_text.strip():
|
||||
@@ -526,6 +592,27 @@ class KeywordStripper:
|
||||
detected.append(keyword)
|
||||
return detected
|
||||
|
||||
def detect_garbage_labels(self, block_texts: List[str]) -> List[str]:
|
||||
"""Detect OCR/garbage patterns in subtitle blocks."""
|
||||
labels = set()
|
||||
for text in block_texts:
|
||||
lines = text.split("\n")
|
||||
seen = set()
|
||||
for line in lines:
|
||||
if self._is_music_line(line):
|
||||
labels.add("Music-only lines")
|
||||
if self._is_timecode_line(line):
|
||||
labels.add("OCR timecodes")
|
||||
normalized = self._normalize_line(line)
|
||||
if normalized:
|
||||
if normalized in seen:
|
||||
labels.add("Duplicate lines")
|
||||
else:
|
||||
seen.add(normalized)
|
||||
if len(labels) >= 3:
|
||||
break
|
||||
return sorted(labels)
|
||||
|
||||
def set_force_remove_keywords(self, keywords: List[str]) -> None:
|
||||
"""Set custom force-remove keywords and refresh regex cache."""
|
||||
type(self)._custom_force_remove_keywords = [
|
||||
@@ -556,9 +643,17 @@ def clean_filename(filename: str, preserve_year: bool = True) -> dict:
|
||||
return get_stripper().clean_filename(filename, preserve_year)
|
||||
|
||||
|
||||
def clean_subtitle_content(text: str) -> str:
|
||||
"""Clean watermarks and ads from subtitle text."""
|
||||
return get_stripper().clean_subtitle_text(text)
|
||||
def clean_subtitle_content(
|
||||
text: str,
|
||||
remove_watermarks: bool = True,
|
||||
remove_garbage: bool = False,
|
||||
) -> str:
|
||||
"""Clean watermarks/ads and optional OCR garbage from subtitle text."""
|
||||
return get_stripper().clean_subtitle_text_with_options(
|
||||
text,
|
||||
remove_watermarks=remove_watermarks,
|
||||
remove_garbage=remove_garbage,
|
||||
)
|
||||
|
||||
|
||||
def should_remove_subtitle(text: str) -> bool:
|
||||
|
||||
@@ -1267,6 +1267,7 @@ class SubtitleProcessor:
|
||||
format_options: SubtitleFormatOptions = None,
|
||||
strip_keywords: bool = True,
|
||||
clean_subtitle_content: bool = True,
|
||||
clean_subtitle_garbage: bool = False,
|
||||
insertion_position: str = "start",
|
||||
preferred_source: str | None = None,
|
||||
language: str | None = None,
|
||||
@@ -1294,6 +1295,8 @@ class SubtitleProcessor:
|
||||
IMPORTANT: This ONLY affects the title lookup, NOT the subtitle content or timing.
|
||||
clean_subtitle_content: If True, remove embedded ads/watermarks (YTS, RARBG, etc.)
|
||||
from inside subtitle text. This cleans the actual dialogue content.
|
||||
clean_subtitle_garbage: If True, remove OCR garbage like timecodes, music-only
|
||||
lines, and duplicate lines inside a subtitle block.
|
||||
"""
|
||||
file_path = Path(file_path)
|
||||
|
||||
@@ -1375,6 +1378,7 @@ class SubtitleProcessor:
|
||||
subs = parse_srt(original)
|
||||
stripper = get_stripper()
|
||||
detected_keywords = stripper.detect_subtitle_watermarks(original)
|
||||
detected_keywords += stripper.detect_garbage_labels([b.text for b in subs])
|
||||
|
||||
if not subs:
|
||||
return self._fail("No valid subtitle blocks found")
|
||||
@@ -1395,7 +1399,7 @@ class SubtitleProcessor:
|
||||
# This removes things like "YTS", "RARBG", "OpenSubtitles" etc.
|
||||
# from inside the actual subtitle text
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
if clean_subtitle_content:
|
||||
if clean_subtitle_content or clean_subtitle_garbage:
|
||||
stripper = get_stripper()
|
||||
original_count = len(clean_subs)
|
||||
|
||||
@@ -1406,7 +1410,11 @@ class SubtitleProcessor:
|
||||
]
|
||||
|
||||
# Clean the content
|
||||
cleaned_dicts = stripper.clean_subtitle_blocks(blocks_as_dicts)
|
||||
cleaned_dicts = stripper.clean_subtitle_blocks(
|
||||
blocks_as_dicts,
|
||||
remove_watermarks=clean_subtitle_content,
|
||||
remove_garbage=clean_subtitle_garbage,
|
||||
)
|
||||
|
||||
# Convert back to SubtitleBlock
|
||||
clean_subs = [
|
||||
@@ -1414,10 +1422,10 @@ class SubtitleProcessor:
|
||||
for d in cleaned_dicts
|
||||
]
|
||||
|
||||
removed_ads = original_count - len(clean_subs)
|
||||
if removed_ads > 0:
|
||||
removed_blocks = original_count - len(clean_subs)
|
||||
if removed_blocks > 0:
|
||||
logger.info(
|
||||
f"Removed {removed_ads} ad/watermark subtitle blocks from {file_path.name}"
|
||||
f"Removed {removed_blocks} cleaned subtitle blocks from {file_path.name}"
|
||||
)
|
||||
|
||||
if not clean_subs:
|
||||
@@ -1546,6 +1554,7 @@ class SubtitleProcessor:
|
||||
self,
|
||||
file_path: str | Path,
|
||||
clean_subtitle_content: bool = True,
|
||||
clean_subtitle_garbage: bool = False,
|
||||
) -> dict:
|
||||
"""Clean ad/watermark content from a subtitle file without inserting plots."""
|
||||
file_path = Path(file_path)
|
||||
@@ -1557,6 +1566,7 @@ class SubtitleProcessor:
|
||||
return self._fail("Subtitle file too large")
|
||||
|
||||
try:
|
||||
stripper = get_stripper()
|
||||
with file_lock(file_path, timeout=30.0):
|
||||
original = file_path.read_text(encoding="utf-8", errors="ignore")
|
||||
subs = parse_srt(original)
|
||||
@@ -1565,40 +1575,16 @@ class SubtitleProcessor:
|
||||
return self._fail("No valid subtitle blocks found")
|
||||
|
||||
original_blocks = subs
|
||||
removed_count = 0
|
||||
modified_count = 0
|
||||
detected_keywords = stripper.detect_subtitle_watermarks(original)
|
||||
detected_keywords += stripper.detect_garbage_labels(
|
||||
[b.text for b in original_blocks]
|
||||
)
|
||||
|
||||
if clean_subtitle_content:
|
||||
cleaned_blocks: List[SubtitleBlock] = []
|
||||
|
||||
for block in original_blocks:
|
||||
text = block.text
|
||||
if stripper.should_remove_subtitle_block(text):
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
cleaned_text = stripper.clean_subtitle_text(text)
|
||||
if not cleaned_text.strip():
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
if cleaned_text != text:
|
||||
modified_count += 1
|
||||
|
||||
cleaned_blocks.append(
|
||||
SubtitleBlock(
|
||||
block.index,
|
||||
block.start_time,
|
||||
block.end_time,
|
||||
cleaned_text,
|
||||
)
|
||||
)
|
||||
else:
|
||||
cleaned_blocks = list(original_blocks)
|
||||
|
||||
sanitized = sanitize_all_blocks(cleaned_blocks)
|
||||
if len(sanitized) < len(cleaned_blocks):
|
||||
removed_count += len(cleaned_blocks) - len(sanitized)
|
||||
sanitized, removed_count, modified_count = self._clean_blocks_for_content(
|
||||
original_blocks,
|
||||
clean_subtitle_content=clean_subtitle_content,
|
||||
clean_subtitle_garbage=clean_subtitle_garbage,
|
||||
)
|
||||
|
||||
if not sanitized:
|
||||
return self._fail("No dialogue subtitles found after cleaning")
|
||||
@@ -1634,8 +1620,8 @@ class SubtitleProcessor:
|
||||
tmp.replace(file_path)
|
||||
|
||||
summary = (
|
||||
f"Removed {removed_count} ad blocks, modified {modified_count} blocks"
|
||||
if clean_subtitle_content
|
||||
f"Removed {removed_count} blocks, modified {modified_count} blocks"
|
||||
if (clean_subtitle_content or clean_subtitle_garbage)
|
||||
else "Cleaned subtitle content"
|
||||
)
|
||||
|
||||
@@ -1652,6 +1638,133 @@ class SubtitleProcessor:
|
||||
logger.error(f"Could not acquire lock for {file_path.name}: {e}")
|
||||
return self._fail(f"File is being processed by another task: {e}")
|
||||
|
||||
def preview_clean_file(
|
||||
self,
|
||||
file_path: str | Path,
|
||||
clean_subtitle_content: bool = True,
|
||||
clean_subtitle_garbage: bool = False,
|
||||
max_changes: int = 80,
|
||||
) -> dict:
|
||||
"""Preview cleaning changes without modifying the file."""
|
||||
file_path = Path(file_path)
|
||||
|
||||
if not file_path.exists():
|
||||
return self._fail("File not found")
|
||||
|
||||
if file_path.stat().st_size > self.MAX_SRT_BYTES:
|
||||
return self._fail("Subtitle file too large")
|
||||
|
||||
try:
|
||||
with file_lock(file_path, timeout=10.0):
|
||||
original = file_path.read_text(encoding="utf-8", errors="ignore")
|
||||
subs = parse_srt(original)
|
||||
|
||||
if not subs:
|
||||
return self._fail("No valid subtitle blocks found")
|
||||
|
||||
sanitized, removed_count, modified_count = self._clean_blocks_for_content(
|
||||
subs,
|
||||
clean_subtitle_content=clean_subtitle_content,
|
||||
clean_subtitle_garbage=clean_subtitle_garbage,
|
||||
)
|
||||
|
||||
cleaned_map = {}
|
||||
for block in sanitized:
|
||||
key = (block.start_time, block.end_time)
|
||||
cleaned_map.setdefault(key, []).append(block)
|
||||
|
||||
changes = []
|
||||
for block in subs:
|
||||
key = (block.start_time, block.end_time)
|
||||
updated = None
|
||||
if key in cleaned_map and cleaned_map[key]:
|
||||
updated = cleaned_map[key].pop(0)
|
||||
|
||||
if updated is None:
|
||||
changes.append({
|
||||
"type": "removed",
|
||||
"start_ms": block.start_time,
|
||||
"end_ms": block.end_time,
|
||||
"timecode": f"{_ms_to_timecode(block.start_time)} → {_ms_to_timecode(block.end_time)}",
|
||||
"before": block.text,
|
||||
"after": "",
|
||||
})
|
||||
elif updated.text != block.text:
|
||||
changes.append({
|
||||
"type": "modified",
|
||||
"start_ms": block.start_time,
|
||||
"end_ms": block.end_time,
|
||||
"timecode": f"{_ms_to_timecode(block.start_time)} → {_ms_to_timecode(block.end_time)}",
|
||||
"before": block.text,
|
||||
"after": updated.text,
|
||||
})
|
||||
|
||||
if len(changes) >= max_changes:
|
||||
break
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"summary": f"Removed {removed_count} blocks, modified {modified_count} blocks",
|
||||
"removed_blocks": removed_count,
|
||||
"modified_blocks": modified_count,
|
||||
"total_changed_blocks": removed_count + modified_count,
|
||||
"changes_truncated": (removed_count + modified_count) > len(changes),
|
||||
"changes": changes,
|
||||
}
|
||||
|
||||
except FileLockError as e:
|
||||
logger.error(f"Could not acquire lock for {file_path.name}: {e}")
|
||||
return self._fail(f"File is being processed by another task: {e}")
|
||||
|
||||
def _clean_blocks_for_content(
|
||||
self,
|
||||
original_blocks: List[SubtitleBlock],
|
||||
clean_subtitle_content: bool,
|
||||
clean_subtitle_garbage: bool,
|
||||
) -> tuple[List[SubtitleBlock], int, int]:
|
||||
stripper = get_stripper()
|
||||
cleaned_blocks: List[SubtitleBlock] = []
|
||||
removed_count = 0
|
||||
modified_count = 0
|
||||
|
||||
for block in original_blocks:
|
||||
text = block.text
|
||||
|
||||
if clean_subtitle_content and stripper.should_remove_subtitle_block(text):
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
if clean_subtitle_content or clean_subtitle_garbage:
|
||||
cleaned_text = stripper.clean_subtitle_text_with_options(
|
||||
text,
|
||||
remove_watermarks=clean_subtitle_content,
|
||||
remove_garbage=clean_subtitle_garbage,
|
||||
)
|
||||
else:
|
||||
cleaned_text = text
|
||||
|
||||
if not cleaned_text.strip():
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
if cleaned_text != text:
|
||||
modified_count += 1
|
||||
|
||||
cleaned_blocks.append(
|
||||
SubtitleBlock(
|
||||
block.index,
|
||||
block.start_time,
|
||||
block.end_time,
|
||||
cleaned_text,
|
||||
)
|
||||
)
|
||||
|
||||
sanitized = sanitize_all_blocks(cleaned_blocks)
|
||||
if len(sanitized) < len(cleaned_blocks):
|
||||
removed_count += len(cleaned_blocks) - len(sanitized)
|
||||
|
||||
return sanitized, removed_count, modified_count
|
||||
|
||||
# ========================================================
|
||||
# Metadata fetching
|
||||
# ========================================================
|
||||
|
||||
Reference in New Issue
Block a user