""" Database layer using SQLAlchemy with SQLite Handles persistent storage for settings, runs, and history """ from datetime import datetime from pathlib import Path from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Float, Text, ForeignKey, text, func from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship, scoped_session import json import logging from logging_utils import get_logger logger = get_logger(__name__) Base = declarative_base() class Settings(Base): """Settings table - stores application configuration""" __tablename__ = 'settings' id = Column(Integer, primary_key=True) key = Column(String(100), unique=True, nullable=False, index=True) value = Column(Text, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class ProcessingRun(Base): """Processing runs table - stores each batch processing session""" __tablename__ = 'processing_runs' id = Column(Integer, primary_key=True) started_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) completed_at = Column(DateTime) total_files = Column(Integer, default=0) successful_files = Column(Integer, default=0) failed_files = Column(Integer, default=0) duration_seconds = Column(Float) status = Column(String(50), default='in_progress') # in_progress, completed, failed # Relationship to file results file_results = relationship("FileResult", back_populates="run", cascade="all, delete-orphan") def __repr__(self): return f"" class FileResult(Base): """File results table - stores individual file processing results""" __tablename__ = 'file_results' id = Column(Integer, primary_key=True) run_id = Column(Integer, ForeignKey('processing_runs.id'), nullable=False, index=True) file_path = Column(String(500), nullable=False, index=True) file_name = Column(String(255), nullable=False) success = Column(Boolean, default=False) status = Column(String(100)) summary = Column(Text) error_message = Column(Text) processed_at = Column(DateTime, default=datetime.utcnow, nullable=False) duration = Column(Integer) # subtitle duration in seconds # Relationship back to run run = relationship("ProcessingRun", back_populates="file_results") def __repr__(self): return f"" class ScanHistory(Base): """Scan history table - stores directory scan history""" __tablename__ = 'scan_history' id = Column(Integer, primary_key=True) directory = Column(String(500), nullable=False, index=True) scanned_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) files_found = Column(Integer, default=0) files_with_plot = Column(Integer, default=0) scan_duration_ms = Column(Integer) def __repr__(self): return f"" class ScanFile(Base): """Scan files table - stores file details per scan""" __tablename__ = 'scan_files' id = Column(Integer, primary_key=True) scan_id = Column(Integer, ForeignKey('scan_history.id'), nullable=False, index=True) file_path = Column(String(500), nullable=False, index=True) file_name = Column(String(255), nullable=False) title = Column(String(255)) year = Column(String(10)) has_plot = Column(Boolean, default=False) plot_marker_count = Column(Integer, default=0) status = Column(String(100)) summary = Column(Text) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) scan = relationship("ScanHistory") def __repr__(self): return f"" class ScheduledScan(Base): """Scheduled scans table - stores scheduled scan jobs and results""" __tablename__ = 'scheduled_scans' id = Column(Integer, primary_key=True) directory = Column(String(500), nullable=False, index=True) scheduled_for = Column(DateTime, nullable=False, index=True) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) started_at = Column(DateTime) completed_at = Column(DateTime) status = Column(String(50), default='scheduled', index=True) # scheduled, running, completed, cancelled, failed files_found = Column(Integer, default=0) files_with_plot = Column(Integer, default=0) scan_duration_ms = Column(Integer) error_message = Column(Text) def __repr__(self): return f"" class ApiUsage(Base): """API usage tracking table - monitors API calls per integration""" __tablename__ = 'api_usage' id = Column(Integer, primary_key=True) provider = Column(String(50), nullable=False, index=True) # omdb, tmdb, tvmaze, wikipedia endpoint = Column(String(200)) # Specific endpoint called timestamp = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) success = Column(Boolean, default=True) response_time_ms = Column(Integer) call_count = Column(Integer, default=1) # Number of API calls in this batch def __repr__(self): return f"" class SuggestedMatch(Base): """Suggested matches table - stores auto-matched titles for files""" __tablename__ = 'suggested_matches' id = Column(Integer, primary_key=True) file_path = Column(String(500), nullable=False, unique=True, index=True) file_name = Column(String(255), nullable=False) matched_title = Column(String(255), nullable=False) matched_year = Column(String(10)) matched_imdb_id = Column(String(50)) match_data = Column(Text) # JSON blob with full match data created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class FolderRule(Base): """Folder-specific rules that override default settings""" __tablename__ = 'folder_rules' id = Column(Integer, primary_key=True) directory = Column(String(500), nullable=False, unique=True, index=True) preferred_source = Column(String(50)) insertion_position = Column(String(50)) language = Column(String(20)) subtitle_title_bold = Column(Boolean) subtitle_plot_italic = Column(Boolean) subtitle_show_director = Column(Boolean) subtitle_show_actors = Column(Boolean) subtitle_show_released = Column(Boolean) subtitle_show_genre = Column(Boolean) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class AutomationRule(Base): """Automation rules for scheduled tasks""" __tablename__ = 'automation_rules' id = Column(String(64), primary_key=True) name = Column(String(255), nullable=False) schedule = Column(String(100), nullable=False) enabled = Column(Boolean, default=True, nullable=False) patterns = Column(Text, nullable=False) # JSON list target_folders = Column(Text, nullable=False) # JSON list created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class AutomationLog(Base): """Automation run log entries""" __tablename__ = 'automation_logs' id = Column(Integer, primary_key=True) rule_id = Column(String(64), nullable=False, index=True) file_path = Column(String(500), nullable=False) modified = Column(Boolean, default=False) removed_lines = Column(Integer, default=0) dry_run = Column(Boolean, default=False) error_message = Column(Text) run_at = Column(DateTime, default=datetime.utcnow, nullable=False) def __repr__(self): return f"" class DatabaseManager: """Manages database connections and operations""" def __init__(self, db_path="sublogue.db"): """Initialize database connection""" self.db_path = Path(db_path) self.engine = create_engine(f'sqlite:///{self.db_path}', echo=False) self.Session = scoped_session(sessionmaker(bind=self.engine)) # Create tables if they don't exist Base.metadata.create_all(self.engine) self._ensure_scan_files_schema() logger.info(f"Database initialized at {self.db_path}") def get_session(self): """Get a new database session""" return self.Session() def _ensure_scan_files_schema(self): """Ensure scan_files table has newer columns in existing databases.""" session = self.get_session() try: columns = session.execute(text("PRAGMA table_info(scan_files)")).fetchall() if not columns: return existing = {row[1] for row in columns} # column name is index 1 if "title" not in existing: session.execute(text("ALTER TABLE scan_files ADD COLUMN title VARCHAR(255)")) if "year" not in existing: session.execute(text("ALTER TABLE scan_files ADD COLUMN year VARCHAR(10)")) session.commit() except Exception as e: session.rollback() logger.error(f"Error migrating scan_files schema: {e}") finally: session.close() def close_session(self): """Close the session""" self.Session.remove() # ============ SETTINGS OPERATIONS ============ def get_setting(self, key, default=None): """Get a setting value""" session = self.get_session() try: setting = session.query(Settings).filter_by(key=key).first() if setting: try: return json.loads(setting.value) except json.JSONDecodeError: return setting.value return default finally: session.close() def set_setting(self, key, value): """Set a setting value""" session = self.get_session() try: setting = session.query(Settings).filter_by(key=key).first() # Convert value to JSON string json_value = json.dumps(value) if not isinstance(value, str) else value if setting: setting.value = json_value setting.updated_at = datetime.utcnow() else: setting = Settings(key=key, value=json_value) session.add(setting) session.commit() logger.info(f"Setting updated: {key}") except Exception as e: session.rollback() logger.error(f"Error setting value: {e}") raise finally: session.close() def get_all_settings(self): """Get all settings as a dictionary""" session = self.get_session() try: settings = session.query(Settings).all() result = {} for setting in settings: try: result[setting.key] = json.loads(setting.value) except json.JSONDecodeError: result[setting.key] = setting.value return result finally: session.close() def update_settings(self, settings_dict): """Update multiple settings at once""" for key, value in settings_dict.items(): self.set_setting(key, value) # ============ PROCESSING RUN OPERATIONS ============ def create_run(self, total_files): """Create a new processing run""" session = self.get_session() try: run = ProcessingRun( total_files=total_files, status='in_progress' ) session.add(run) session.commit() run_id = run.id logger.info(f"Created processing run {run_id}") return run_id except Exception as e: session.rollback() logger.error(f"Error creating run: {e}") raise finally: session.close() def complete_run(self, run_id, successful_files, failed_files): """Mark a run as completed""" session = self.get_session() try: run = session.query(ProcessingRun).get(run_id) if run: run.completed_at = datetime.utcnow() run.successful_files = successful_files run.failed_files = failed_files run.status = 'completed' if run.started_at: duration = (run.completed_at - run.started_at).total_seconds() run.duration_seconds = duration session.commit() logger.info(f"Completed run {run_id}") except Exception as e: session.rollback() logger.error(f"Error completing run: {e}") raise finally: session.close() def add_file_result(self, run_id, file_path, success, status, summary="", error_message="", duration=40): """Add a file processing result""" session = self.get_session() try: result = FileResult( run_id=run_id, file_path=file_path, file_name=Path(file_path).name, success=success, status=status, summary=summary, error_message=error_message, duration=duration ) session.add(result) session.commit() except Exception as e: session.rollback() logger.error(f"Error adding file result: {e}") raise finally: session.close() def get_run_history(self, limit=50): """Get processing run history""" session = self.get_session() try: runs = session.query(ProcessingRun).order_by( ProcessingRun.started_at.desc() ).limit(limit).all() result = [] for run in runs: result.append({ 'id': run.id, 'started_at': run.started_at.isoformat() if run.started_at else None, 'completed_at': run.completed_at.isoformat() if run.completed_at else None, 'total_files': run.total_files, 'successful_files': run.successful_files, 'failed_files': run.failed_files, 'duration_seconds': run.duration_seconds, 'status': run.status }) return result finally: session.close() def get_run_details(self, run_id): """Get detailed information about a specific run""" session = self.get_session() try: run = session.query(ProcessingRun).get(run_id) if not run: return None file_results = [] for result in run.file_results: file_results.append({ 'id': result.id, 'file_path': result.file_path, 'file_name': result.file_name, 'success': result.success, 'status': result.status, 'summary': result.summary, 'error_message': result.error_message, 'processed_at': result.processed_at.isoformat() if result.processed_at else None, 'duration': result.duration }) return { 'id': run.id, 'started_at': run.started_at.isoformat() if run.started_at else None, 'completed_at': run.completed_at.isoformat() if run.completed_at else None, 'total_files': run.total_files, 'successful_files': run.successful_files, 'failed_files': run.failed_files, 'duration_seconds': run.duration_seconds, 'status': run.status, 'file_results': file_results } finally: session.close() # ============ SCAN HISTORY OPERATIONS ============ def add_scan_history(self, directory, files_found, files_with_plot, scan_duration_ms): """Add a scan history entry""" session = self.get_session() try: scan = ScanHistory( directory=directory, files_found=files_found, files_with_plot=files_with_plot, scan_duration_ms=scan_duration_ms ) session.add(scan) session.commit() logger.info(f"Scan history saved for {directory}") return scan.id except Exception as e: session.rollback() logger.error(f"Error saving scan history: {e}") raise finally: session.close() def get_scan_history(self, limit=50): """Get scan history""" session = self.get_session() try: scans = session.query(ScanHistory).order_by( ScanHistory.scanned_at.desc() ).limit(limit).all() result = [] for scan in scans: result.append({ 'id': scan.id, 'directory': scan.directory, 'scanned_at': scan.scanned_at.isoformat() if scan.scanned_at else None, 'files_found': scan.files_found, 'files_with_plot': scan.files_with_plot, 'scan_duration_ms': scan.scan_duration_ms }) return result finally: session.close() def add_scan_files(self, scan_id, files): """Persist file details for a scan""" session = self.get_session() try: for file_info in files: session.add(ScanFile( scan_id=scan_id, file_path=file_info.get("path"), file_name=file_info.get("name"), title=file_info.get("title"), year=file_info.get("year"), has_plot=bool(file_info.get("has_plot")), plot_marker_count=int(file_info.get("plot_marker_count") or 0), status=file_info.get("status"), summary=file_info.get("summary", "") )) session.commit() except Exception as e: session.rollback() logger.error(f"Error saving scan files: {e}") raise finally: session.close() def get_scan_files(self, scan_id): """Get all files for a scan""" session = self.get_session() try: files = session.query(ScanFile).filter_by(scan_id=scan_id).all() return [ { "path": f.file_path, "name": f.file_name, "title": f.title, "year": f.year, "has_plot": f.has_plot, "plot_marker_count": f.plot_marker_count, "status": f.status, "summary": f.summary } for f in files ] finally: session.close() def get_latest_scan_files(self, limit=500, offset=0): """Get latest scan entry per file path, optionally paged by created_at.""" session = self.get_session() try: latest_subquery = session.query( ScanFile.file_path, func.max(ScanFile.created_at).label("max_created_at") ).group_by(ScanFile.file_path).subquery() query = session.query(ScanFile).join( latest_subquery, (ScanFile.file_path == latest_subquery.c.file_path) & (ScanFile.created_at == latest_subquery.c.max_created_at) ).order_by(ScanFile.created_at.desc()) if offset: query = query.offset(offset) if limit is not None: query = query.limit(limit) files = query.all() return [ { "path": file_entry.file_path, "name": file_entry.file_name, "title": file_entry.title, "year": file_entry.year, "has_plot": file_entry.has_plot, "plot_marker_count": file_entry.plot_marker_count, "status": file_entry.status, "summary": file_entry.summary } for file_entry in files ] finally: session.close() def get_latest_scan_files_count(self): """Count distinct latest scan entries per file path.""" session = self.get_session() try: latest_subquery = session.query( ScanFile.file_path, func.max(ScanFile.created_at).label("max_created_at") ).group_by(ScanFile.file_path).subquery() return session.query(ScanFile).join( latest_subquery, (ScanFile.file_path == latest_subquery.c.file_path) & (ScanFile.created_at == latest_subquery.c.max_created_at) ).count() finally: session.close() def get_latest_file_results(self): """Get latest processing result per file""" session = self.get_session() try: results = session.query(FileResult).order_by(FileResult.processed_at.desc()).all() latest = {} for result in results: if result.file_path not in latest: latest[result.file_path] = { "status": result.status, "error_message": result.error_message, "processed_at": result.processed_at.isoformat() if result.processed_at else None } return latest finally: session.close() # ============ SCHEDULED SCAN OPERATIONS ============ def create_scheduled_scan(self, directory, scheduled_for): """Create a scheduled scan entry""" session = self.get_session() try: scheduled = ScheduledScan( directory=directory, scheduled_for=scheduled_for, status='scheduled' ) session.add(scheduled) session.commit() return scheduled.id except Exception as e: session.rollback() logger.error(f"Error creating scheduled scan: {e}") raise finally: session.close() def get_scheduled_scans(self, limit=50, status=None): """Get scheduled scans""" session = self.get_session() try: query = session.query(ScheduledScan) if status: query = query.filter_by(status=status) scans = query.order_by(ScheduledScan.scheduled_for.desc()).limit(limit).all() result = [] for scan in scans: result.append({ 'id': scan.id, 'directory': scan.directory, 'scheduled_for': scan.scheduled_for.isoformat() if scan.scheduled_for else None, 'created_at': scan.created_at.isoformat() if scan.created_at else None, 'started_at': scan.started_at.isoformat() if scan.started_at else None, 'completed_at': scan.completed_at.isoformat() if scan.completed_at else None, 'status': scan.status, 'files_found': scan.files_found, 'files_with_plot': scan.files_with_plot, 'scan_duration_ms': scan.scan_duration_ms, 'error_message': scan.error_message }) return result finally: session.close() def get_scheduled_scan(self, scan_id): """Get a single scheduled scan by id""" session = self.get_session() try: scan = session.query(ScheduledScan).get(scan_id) if not scan: return None return { 'id': scan.id, 'directory': scan.directory, 'scheduled_for': scan.scheduled_for.isoformat() if scan.scheduled_for else None, 'created_at': scan.created_at.isoformat() if scan.created_at else None, 'started_at': scan.started_at.isoformat() if scan.started_at else None, 'completed_at': scan.completed_at.isoformat() if scan.completed_at else None, 'status': scan.status, 'files_found': scan.files_found, 'files_with_plot': scan.files_with_plot, 'scan_duration_ms': scan.scan_duration_ms, 'error_message': scan.error_message } finally: session.close() def get_due_scheduled_scans(self, now): """Get scheduled scans that are due to run""" session = self.get_session() try: scans = session.query(ScheduledScan).filter( ScheduledScan.status == 'scheduled', ScheduledScan.scheduled_for <= now ).order_by(ScheduledScan.scheduled_for.asc()).all() return [scan.id for scan in scans] finally: session.close() def mark_scheduled_scan_running(self, scan_id, started_at=None): """Mark a scheduled scan as running""" session = self.get_session() try: scan = session.query(ScheduledScan).get(scan_id) if not scan or scan.status != 'scheduled': return False scan.status = 'running' scan.started_at = started_at or datetime.utcnow() session.commit() return True except Exception as e: session.rollback() logger.error(f"Error marking scheduled scan running: {e}") return False finally: session.close() def complete_scheduled_scan(self, scan_id, files_found, files_with_plot, scan_duration_ms, completed_at=None): """Mark a scheduled scan as completed""" session = self.get_session() try: scan = session.query(ScheduledScan).get(scan_id) if not scan: return False scan.status = 'completed' scan.completed_at = completed_at or datetime.utcnow() scan.files_found = files_found scan.files_with_plot = files_with_plot scan.scan_duration_ms = scan_duration_ms session.commit() return True except Exception as e: session.rollback() logger.error(f"Error completing scheduled scan: {e}") return False finally: session.close() def fail_scheduled_scan(self, scan_id, error_message, completed_at=None): """Mark a scheduled scan as failed""" session = self.get_session() try: scan = session.query(ScheduledScan).get(scan_id) if not scan: return False scan.status = 'failed' scan.completed_at = completed_at or datetime.utcnow() scan.error_message = error_message session.commit() return True except Exception as e: session.rollback() logger.error(f"Error failing scheduled scan: {e}") return False finally: session.close() def cancel_scheduled_scan(self, scan_id): """Cancel a scheduled scan""" session = self.get_session() try: scan = session.query(ScheduledScan).get(scan_id) if not scan or scan.status != 'scheduled': return False scan.status = 'cancelled' session.commit() return True except Exception as e: session.rollback() logger.error(f"Error cancelling scheduled scan: {e}") return False finally: session.close() # ============ API USAGE OPERATIONS ============ def track_api_call(self, provider, endpoint=None, success=True, response_time_ms=None, call_count=1): """Track API call(s) for usage monitoring Args: provider: API provider name (omdb, tmdb, tvmaze) endpoint: Specific endpoint called success: Whether the call(s) succeeded response_time_ms: Total response time in milliseconds call_count: Number of API calls made (for batched operations) """ session = self.get_session() try: usage = ApiUsage( provider=provider, endpoint=endpoint, success=success, response_time_ms=response_time_ms, call_count=call_count ) session.add(usage) session.commit() logger.debug(f"Tracked {call_count} API call(s) to {provider}") except Exception as e: session.rollback() logger.error(f"Error tracking API call: {e}") finally: session.close() def get_api_usage_24h(self, provider): """Get API call count for the last 24 hours (sums call_count field)""" session = self.get_session() try: from datetime import timedelta from sqlalchemy import func cutoff_time = datetime.utcnow() - timedelta(hours=24) # Sum the call_count column to get total API calls result = session.query(func.coalesce(func.sum(ApiUsage.call_count), 0)).filter( ApiUsage.provider == provider, ApiUsage.timestamp >= cutoff_time ).scalar() return result or 0 finally: session.close() def check_api_limit(self, provider, limit=1000): """Check if API usage is under the limit""" current_usage = self.get_api_usage_24h(provider) return { 'under_limit': current_usage < limit, 'current_usage': current_usage, 'limit': limit, 'remaining': max(0, limit - current_usage) } def get_usage_stats(self, provider): """Get detailed usage statistics for a provider""" session = self.get_session() try: from datetime import timedelta cutoff_time = datetime.utcnow() - timedelta(hours=24) # Get 24h usage records usage_24h = session.query(ApiUsage).filter( ApiUsage.provider == provider, ApiUsage.timestamp >= cutoff_time ).all() # Sum call_count for accurate totals total_calls = sum(u.call_count or 1 for u in usage_24h) successful_calls = sum(u.call_count or 1 for u in usage_24h if u.success) failed_calls = total_calls - successful_calls # Calculate average response time (weighted by call count) weighted_times = [(u.response_time_ms or 0) * (u.call_count or 1) for u in usage_24h if u.response_time_ms is not None] avg_response_time = sum(weighted_times) / total_calls if total_calls > 0 and weighted_times else 0 # Find oldest call in 24h window to calculate reset time oldest_call = min([u.timestamp for u in usage_24h]) if usage_24h else datetime.utcnow() reset_time = oldest_call + timedelta(hours=24) return { 'provider': provider, 'total_calls_24h': total_calls, 'successful_calls': successful_calls, 'failed_calls': failed_calls, 'avg_response_time_ms': round(avg_response_time, 2), 'reset_time': reset_time.isoformat(), 'limit': 1000, 'remaining': max(0, 1000 - total_calls) } finally: session.close() def get_all_usage_stats(self): """Get usage statistics for all providers""" providers = ['omdb', 'tmdb', 'tvmaze', 'wikipedia'] return {provider: self.get_usage_stats(provider) for provider in providers} # ============ SUGGESTED MATCHES OPERATIONS ============ def save_suggested_match(self, file_path, file_name, match_data): """Save or update a suggested match for a file""" session = self.get_session() try: # Check if match already exists existing = session.query(SuggestedMatch).filter_by(file_path=file_path).first() if existing: # Update existing match existing.matched_title = match_data.get('title', '') existing.matched_year = match_data.get('year', '') existing.matched_imdb_id = match_data.get('imdb_id', '') existing.match_data = json.dumps(match_data) existing.updated_at = datetime.utcnow() else: # Create new match new_match = SuggestedMatch( file_path=file_path, file_name=file_name, matched_title=match_data.get('title', ''), matched_year=match_data.get('year', ''), matched_imdb_id=match_data.get('imdb_id', ''), match_data=json.dumps(match_data) ) session.add(new_match) session.commit() return True except Exception as e: session.rollback() logger.error(f"Error saving suggested match: {e}") return False finally: session.close() def get_suggested_match(self, file_path): """Get suggested match for a specific file""" session = self.get_session() try: match = session.query(SuggestedMatch).filter_by(file_path=file_path).first() if match: return { 'file_path': match.file_path, 'file_name': match.file_name, 'matched_title': match.matched_title, 'matched_year': match.matched_year, 'matched_imdb_id': match.matched_imdb_id, 'match_data': json.loads(match.match_data) if match.match_data else {}, 'created_at': match.created_at.isoformat(), 'updated_at': match.updated_at.isoformat() } return None finally: session.close() def get_suggested_matches_for_directory(self, directory): """Get all suggested matches for files in a directory""" session = self.get_session() try: matches = session.query(SuggestedMatch).filter( SuggestedMatch.file_path.like(f"{directory}%") ).all() return { match.file_path: json.loads(match.match_data) if match.match_data else {} for match in matches } finally: session.close() def delete_suggested_match(self, file_path): """Delete a suggested match for a file""" session = self.get_session() try: match = session.query(SuggestedMatch).filter_by(file_path=file_path).first() if match: session.delete(match) session.commit() return True return False except Exception as e: session.rollback() logger.error(f"Error deleting suggested match: {e}") return False finally: session.close() def clear_all_suggested_matches(self): """Clear all suggested matches""" session = self.get_session() try: session.query(SuggestedMatch).delete() session.commit() return True except Exception as e: session.rollback() logger.error(f"Error clearing suggested matches: {e}") return False finally: session.close() # ============ FOLDER RULES OPERATIONS ============ def get_folder_rule(self, directory): """Get a folder rule for a specific directory""" session = self.get_session() try: rule = session.query(FolderRule).filter_by(directory=directory).first() if not rule: return None return { "directory": rule.directory, "preferred_source": rule.preferred_source, "insertion_position": rule.insertion_position, "language": rule.language, "subtitle_title_bold": rule.subtitle_title_bold, "subtitle_plot_italic": rule.subtitle_plot_italic, "subtitle_show_director": rule.subtitle_show_director, "subtitle_show_actors": rule.subtitle_show_actors, "subtitle_show_released": rule.subtitle_show_released, "subtitle_show_genre": rule.subtitle_show_genre, } finally: session.close() def get_all_folder_rules(self): """Get all folder rules""" session = self.get_session() try: rules = session.query(FolderRule).order_by(FolderRule.directory.asc()).all() return [ { "directory": rule.directory, "preferred_source": rule.preferred_source, "insertion_position": rule.insertion_position, "language": rule.language, "subtitle_title_bold": rule.subtitle_title_bold, "subtitle_plot_italic": rule.subtitle_plot_italic, "subtitle_show_director": rule.subtitle_show_director, "subtitle_show_actors": rule.subtitle_show_actors, "subtitle_show_released": rule.subtitle_show_released, "subtitle_show_genre": rule.subtitle_show_genre, } for rule in rules ] finally: session.close() def upsert_folder_rule(self, directory, rule_data): """Create or update a folder rule""" session = self.get_session() try: rule = session.query(FolderRule).filter_by(directory=directory).first() if not rule: rule = FolderRule(directory=directory) session.add(rule) rule.preferred_source = rule_data.get("preferred_source") rule.insertion_position = rule_data.get("insertion_position") rule.language = rule_data.get("language") rule.subtitle_title_bold = rule_data.get("subtitle_title_bold") rule.subtitle_plot_italic = rule_data.get("subtitle_plot_italic") rule.subtitle_show_director = rule_data.get("subtitle_show_director") rule.subtitle_show_actors = rule_data.get("subtitle_show_actors") rule.subtitle_show_released = rule_data.get("subtitle_show_released") rule.subtitle_show_genre = rule_data.get("subtitle_show_genre") session.commit() return True except Exception as e: session.rollback() logger.error(f"Error saving folder rule: {e}") return False finally: session.close() def delete_folder_rule(self, directory): """Delete a folder rule for a directory""" session = self.get_session() try: rule = session.query(FolderRule).filter_by(directory=directory).first() if rule: session.delete(rule) session.commit() return True return False except Exception as e: session.rollback() logger.error(f"Error deleting folder rule: {e}") return False finally: session.close() # ============ AUTOMATION RULES OPERATIONS ============ def get_automation_rules(self): """Get all automation rules""" session = self.get_session() try: rules = session.query(AutomationRule).order_by(AutomationRule.created_at.asc()).all() result = [] for rule in rules: result.append({ "id": rule.id, "name": rule.name, "schedule": rule.schedule, "enabled": rule.enabled, "patterns": json.loads(rule.patterns) if rule.patterns else [], "target_folders": json.loads(rule.target_folders) if rule.target_folders else [], "created_at": rule.created_at.isoformat() if rule.created_at else None, "updated_at": rule.updated_at.isoformat() if rule.updated_at else None }) return result finally: session.close() def get_automation_rule(self, rule_id): """Get a single automation rule""" session = self.get_session() try: rule = session.query(AutomationRule).filter_by(id=rule_id).first() if not rule: return None return { "id": rule.id, "name": rule.name, "schedule": rule.schedule, "enabled": rule.enabled, "patterns": json.loads(rule.patterns) if rule.patterns else [], "target_folders": json.loads(rule.target_folders) if rule.target_folders else [], "created_at": rule.created_at.isoformat() if rule.created_at else None, "updated_at": rule.updated_at.isoformat() if rule.updated_at else None } finally: session.close() def upsert_automation_rule(self, rule_data): """Create or update an automation rule""" session = self.get_session() try: rule_id = rule_data["id"] rule = session.query(AutomationRule).filter_by(id=rule_id).first() if not rule: rule = AutomationRule(id=rule_id) session.add(rule) rule.name = rule_data["name"] rule.schedule = rule_data["schedule"] rule.enabled = bool(rule_data.get("enabled", True)) rule.patterns = json.dumps(rule_data.get("patterns", [])) rule.target_folders = json.dumps(rule_data.get("target_folders", [])) session.commit() return True except Exception as e: session.rollback() logger.error(f"Error saving automation rule: {e}") return False finally: session.close() def delete_automation_rule(self, rule_id): """Delete an automation rule""" session = self.get_session() try: rule = session.query(AutomationRule).filter_by(id=rule_id).first() if rule: session.delete(rule) session.commit() return True return False except Exception as e: session.rollback() logger.error(f"Error deleting automation rule: {e}") return False finally: session.close() def add_automation_log(self, rule_id, file_path, modified, removed_lines, dry_run=False, error_message=None): """Add an automation log entry""" session = self.get_session() try: entry = AutomationLog( rule_id=rule_id, file_path=file_path, modified=bool(modified), removed_lines=int(removed_lines or 0), dry_run=bool(dry_run), error_message=error_message ) session.add(entry) session.commit() except Exception as e: session.rollback() logger.error(f"Error saving automation log: {e}") raise finally: session.close() def get_automation_logs(self, rule_id=None, limit=100): """Get automation log entries, optionally filtered by rule_id""" session = self.get_session() try: query = session.query(AutomationLog).order_by(AutomationLog.run_at.desc()) if rule_id: query = query.filter(AutomationLog.rule_id == rule_id) logs = query.limit(limit).all() return [ { "id": log.id, "rule_id": log.rule_id, "file_path": log.file_path, "modified": log.modified, "removed_lines": log.removed_lines, "dry_run": log.dry_run, "error_message": log.error_message, "run_at": log.run_at.isoformat() if log.run_at else None, } for log in logs ] finally: session.close() # ============ MAINTENANCE OPERATIONS ============ def clear_settings(self, keep_api_keys=False): """Clear all settings, optionally keeping API keys""" session = self.get_session() try: keep_values = {} if keep_api_keys: for key in ("omdb_api_key", "tmdb_api_key", "api_key"): setting = session.query(Settings).filter_by(key=key).first() if setting: keep_values[key] = setting.value session.query(Settings).delete() session.commit() if keep_values: for key, value in keep_values.items(): session.add(Settings(key=key, value=value)) session.commit() return True except Exception as e: session.rollback() logger.error(f"Error clearing settings: {e}") return False finally: session.close() def clear_history_and_logs(self): """Clear processing runs, scan history, scheduled scans, and API usage logs""" session = self.get_session() try: session.query(FileResult).delete() session.query(ProcessingRun).delete() session.query(ScanHistory).delete() session.query(ScheduledScan).delete() session.query(ApiUsage).delete() session.commit() return True except Exception as e: session.rollback() logger.error(f"Error clearing history and logs: {e}") return False finally: session.close() # ============ STATISTICS ============ def get_statistics(self): """Get overall statistics""" session = self.get_session() try: total_runs = session.query(ProcessingRun).count() completed_runs = session.query(ProcessingRun).filter_by(status='completed').count() total_files_processed = session.query(FileResult).count() successful_files = session.query(FileResult).filter_by(success=True).count() return { 'total_runs': total_runs, 'completed_runs': completed_runs, 'total_files_processed': total_files_processed, 'successful_files': successful_files, 'failed_files': total_files_processed - successful_files } finally: session.close()