Files
sublogue/server/automations/engine.py
T
2026-01-19 02:10:08 +13:00

132 lines
4.3 KiB
Python

from __future__ import annotations
import threading
from typing import Dict, List, Optional
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from logging_utils import get_logger
from automations.actions import enumerate_srt_files, remove_lines_matching_patterns
from automations.models import AutomationRule
logger = get_logger(__name__)
class AutomationEngine:
"""Scheduler for automation rules."""
def __init__(self, db_manager):
self.db_manager = db_manager
self._scheduler = BackgroundScheduler(daemon=True)
self._lock = threading.Lock()
self._started = False
def start(self):
with self._lock:
if self._started:
return
self._scheduler.start()
self._started = True
self.reload_rules()
logger.info("Automation scheduler started")
def shutdown(self):
with self._lock:
if not self._started:
return
self._scheduler.shutdown(wait=False)
self._started = False
logger.info("Automation scheduler stopped")
def reload_rules(self):
"""Reload all automation rules from storage."""
with self._lock:
self._scheduler.remove_all_jobs()
rules = self._load_rules()
for rule in rules:
if not rule.enabled:
continue
self._schedule_rule(rule)
def run_rule_now(self, rule_id: str, dry_run: bool = False) -> dict:
rule = self._get_rule(rule_id)
if not rule:
return {"success": False, "error": "Rule not found"}
return self._execute_rule(rule, dry_run=dry_run)
def _load_rules(self) -> List[AutomationRule]:
rules_raw = self.db_manager.get_automation_rules()
return [AutomationRule.from_dict(r) for r in rules_raw]
def _get_rule(self, rule_id: str) -> Optional[AutomationRule]:
raw = self.db_manager.get_automation_rule(rule_id)
return AutomationRule.from_dict(raw) if raw else None
def _schedule_rule(self, rule: AutomationRule):
try:
trigger = CronTrigger.from_crontab(rule.schedule)
except ValueError as e:
logger.error("Invalid cron schedule for rule %s: %s", rule.id, e)
return
self._scheduler.add_job(
self._run_rule_job,
trigger=trigger,
args=[rule.id],
id=f"automation:{rule.id}",
replace_existing=True,
misfire_grace_time=300,
max_instances=1,
)
def _run_rule_job(self, rule_id: str):
rule = self._get_rule(rule_id)
if not rule or not rule.enabled:
return
self._execute_rule(rule, dry_run=False)
def _execute_rule(self, rule: AutomationRule, dry_run: bool) -> dict:
files = enumerate_srt_files(rule.target_folders)
modified = 0
total_removed = 0
errors: List[str] = []
for file_path in files:
try:
did_modify, removed_lines = remove_lines_matching_patterns(
str(file_path),
rule.patterns,
dry_run=dry_run,
)
if did_modify:
modified += 1
total_removed += removed_lines
self.db_manager.add_automation_log(
rule_id=rule.id,
file_path=str(file_path),
modified=did_modify,
removed_lines=removed_lines,
dry_run=dry_run,
error_message=None,
)
except Exception as e:
errors.append(f"{file_path}: {e}")
self.db_manager.add_automation_log(
rule_id=rule.id,
file_path=str(file_path),
modified=False,
removed_lines=0,
dry_run=dry_run,
error_message=str(e),
)
return {
"success": True,
"rule_id": rule.id,
"files_scanned": len(files),
"files_modified": modified,
"removed_lines": total_removed,
"dry_run": dry_run,
"errors": errors,
}