diff --git a/backend/scanning/__init__.py b/backend/scanning/__init__.py
new file mode 100644
index 0000000..3d209a1
--- /dev/null
+++ b/backend/scanning/__init__.py
@@ -0,0 +1,11 @@
+"""Library scanning module for standalone mode."""
+from backend.scanning.models import ScanRule
+from backend.scanning.file_analyzer import FileAnalyzer, FileAnalysis
+from backend.scanning.detected_languages import DetectedLanguage
+
+__all__ = [
+    "ScanRule",
+    "FileAnalyzer",
+    "FileAnalysis",
+    "DetectedLanguage",
+]
diff --git a/backend/scanning/detected_languages.py b/backend/scanning/detected_languages.py
new file mode 100644
index 0000000..f97100f
--- /dev/null
+++ b/backend/scanning/detected_languages.py
@@ -0,0 +1,41 @@
+"""Model for storing detected audio languages."""
+from sqlalchemy import Column, Integer, String, DateTime, Index
+from sqlalchemy.sql import func
+
+from backend.core.database import Base
+
+
+class DetectedLanguage(Base):
+    """
+    Stores detected audio languages for files where metadata is undefined.
+
+    This cache prevents re-detecting the same file multiple times.
+    """
+
+    __tablename__ = "detected_languages"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    file_path = Column(String(1024), nullable=False, unique=True, index=True)
+    detected_language = Column(String(10), nullable=False)  # ISO 639-1 code
+    detection_confidence = Column(Integer, nullable=True)  # 0-100
+    detected_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
+
+    # Indexes for performance
+    __table_args__ = (
+        Index('idx_detected_lang_path', 'file_path'),
+        Index('idx_detected_lang_language', 'detected_language'),
+    )
+
+    def __repr__(self):
+        return f"<DetectedLanguage(id={self.id}, language='{self.detected_language}', file_path='{self.file_path}')>"
+
+    def to_dict(self) -> dict:
+        """Convert to dictionary."""
+        return {
+            "id": self.id,
+            "file_path": self.file_path,
+            "detected_language": self.detected_language,
+            "detection_confidence": self.detection_confidence,
+            "detected_at": self.detected_at.isoformat() if self.detected_at else None,
+        }
+
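The cache table above is read back by `LanguageDetector._get_cached_language` later in this patch. A minimal sketch of that read path, assuming the `database.get_session` context manager from `backend.core.database` that the rest of the diff uses (the helper name is hypothetical):

```python
from backend.core.database import database
from backend.scanning.detected_languages import DetectedLanguage


def cached_language_for(path: str):
    """Return the cached ISO 639-1 code for a file, or None on a cache miss."""
    with database.get_session() as session:
        row = (
            session.query(DetectedLanguage)
            .filter(DetectedLanguage.file_path == path)
            .first()
        )
        return row.detected_language if row else None
```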
diff --git a/backend/scanning/file_analyzer.py b/backend/scanning/file_analyzer.py
new file mode 100644
index 0000000..098988e
--- /dev/null
+++ b/backend/scanning/file_analyzer.py
@@ -0,0 +1,253 @@
+"""File analyzer using ffprobe for media file inspection."""
+import logging
+import os
+from typing import Optional, List, Dict
+from dataclasses import dataclass
+
+from backend.transcription.audio_utils import (
+    get_audio_tracks,
+    get_audio_languages,
+    get_subtitle_languages,
+    has_audio,
+    has_subtitle_of_language_in_folder,
+)
+from backend.core.language_code import LanguageCode
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class AudioTrackInfo:
+    """Information about an audio track."""
+
+    index: int
+    language: LanguageCode
+    codec: str
+    channels: int
+    is_default: bool
+    title: Optional[str] = None
+
+
+@dataclass
+class SubtitleTrackInfo:
+    """Information about a subtitle track."""
+
+    language: LanguageCode
+    is_embedded: bool
+    is_external: bool
+    file_path: Optional[str] = None
+
+
+@dataclass
+class FileAnalysis:
+    """Complete analysis of a media file."""
+
+    file_path: str
+    file_name: str
+    file_extension: str
+    has_audio: bool
+    audio_tracks: List[AudioTrackInfo]
+    embedded_subtitles: List[LanguageCode]
+    external_subtitles: List[SubtitleTrackInfo]
+
+    @property
+    def audio_languages(self) -> List[LanguageCode]:
+        """Get list of audio languages."""
+        return [track.language for track in self.audio_tracks]
+
+    @property
+    def all_subtitle_languages(self) -> List[LanguageCode]:
+        """Get all subtitle languages (embedded + external)."""
+        languages = self.embedded_subtitles.copy()
+        for sub in self.external_subtitles:
+            if sub.language not in languages:
+                languages.append(sub.language)
+        return languages
+
+    @property
+    def default_audio_language(self) -> Optional[LanguageCode]:
+        """Get default audio track language."""
+        for track in self.audio_tracks:
+            if track.is_default:
+                return track.language
+        # Fallback to first track
+        return self.audio_tracks[0].language if self.audio_tracks else None
+
+    def has_subtitle_language(self, language: LanguageCode) -> bool:
+        """Check if file has subtitles in given language (embedded or external)."""
+        return language in self.all_subtitle_languages
+
+    def has_embedded_subtitle_language(self, language: LanguageCode) -> bool:
+        """Check if file has embedded subtitles in given language."""
+        return language in self.embedded_subtitles
+
+    def has_external_subtitle_language(self, language: LanguageCode) -> bool:
+        """Check if file has external subtitles in given language."""
+        return any(sub.language == language for sub in self.external_subtitles)
+
+
+class FileAnalyzer:
+    """Analyzer for media files using ffprobe."""
+
+    # Supported video extensions
+    VIDEO_EXTENSIONS = (
+        ".mp4",
+        ".mkv",
+        ".avi",
+        ".mov",
+        ".wmv",
+        ".flv",
+        ".webm",
+        ".mpg",
+        ".mpeg",
+        ".3gp",
+        ".ogv",
+        ".vob",
+        ".rm",
+        ".rmvb",
+        ".ts",
+        ".m4v",
+        ".f4v",
+        ".svq3",
+        ".asf",
+        ".m2ts",
+        ".divx",
+        ".xvid",
+    )
+
+    # Subtitle file extensions
+    SUBTITLE_EXTENSIONS = {".srt", ".vtt", ".sub", ".ass", ".ssa", ".idx", ".sbv"}
+
+    @staticmethod
+    def is_video_file(file_path: str) -> bool:
+        """
+        Check if file is a video file by extension.
+
+        Args:
+            file_path: Path to file
+
+        Returns:
+            True if video file
+        """
+        _, ext = os.path.splitext(file_path)
+        return ext.lower() in FileAnalyzer.VIDEO_EXTENSIONS
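The convenience properties on `FileAnalysis` drive all rule evaluation later in the patch. A quick sketch exercising them with a hand-built instance (paths and track values are invented for illustration; `LanguageCode.from_string` is used as elsewhere in this diff):

```python
from backend.core.language_code import LanguageCode
from backend.scanning.file_analyzer import AudioTrackInfo, FileAnalysis

ja = LanguageCode.from_string("ja")
es = LanguageCode.from_string("es")

analysis = FileAnalysis(
    file_path="/library/show/ep01.mkv",
    file_name="ep01.mkv",
    file_extension=".mkv",
    has_audio=True,
    audio_tracks=[
        AudioTrackInfo(index=0, language=ja, codec="aac", channels=2, is_default=True)
    ],
    embedded_subtitles=[],
    external_subtitles=[],
)

assert analysis.default_audio_language == ja   # the default track wins
assert not analysis.has_subtitle_language(es)  # no embedded or external 'es'
```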
+
+    @staticmethod
+    def analyze_file(file_path: str) -> Optional[FileAnalysis]:
+        """
+        Analyze a media file completely.
+
+        Args:
+            file_path: Path to media file
+
+        Returns:
+            FileAnalysis object or None if analysis fails
+        """
+        try:
+            # Basic file info
+            file_name = os.path.basename(file_path)
+            _, file_extension = os.path.splitext(file_path)
+
+            # Check if file is video
+            if not FileAnalyzer.is_video_file(file_path):
+                logger.debug(f"Skipping non-video file: {file_name}")
+                return None
+
+            # Check if file exists and has audio
+            if not os.path.isfile(file_path):
+                logger.warning(f"File not found: {file_path}")
+                return None
+
+            file_has_audio = has_audio(file_path)
+            if not file_has_audio:
+                logger.debug(f"File has no audio, skipping: {file_name}")
+                return None
+
+            # Get audio tracks
+            audio_tracks_raw = get_audio_tracks(file_path)
+            audio_tracks = [
+                AudioTrackInfo(
+                    index=track["index"],
+                    language=track["language"],
+                    codec=track["codec"],
+                    channels=track["channels"],
+                    is_default=track["default"],
+                    title=track.get("title"),
+                )
+                for track in audio_tracks_raw
+            ]
+
+            # Get embedded subtitles
+            embedded_subtitles = get_subtitle_languages(file_path)
+
+            # Find external subtitles
+            external_subtitles = FileAnalyzer._find_external_subtitles(file_path)
+
+            return FileAnalysis(
+                file_path=file_path,
+                file_name=file_name,
+                file_extension=file_extension.lower(),
+                has_audio=file_has_audio,
+                audio_tracks=audio_tracks,
+                embedded_subtitles=embedded_subtitles,
+                external_subtitles=external_subtitles,
+            )
+
+        except Exception as e:
+            logger.error(f"Error analyzing file {file_path}: {e}")
+            return None
+
+    @staticmethod
+    def _find_external_subtitles(video_file: str) -> List[SubtitleTrackInfo]:
+        """
+        Find external subtitle files for a video.
+
+        Args:
+            video_file: Path to video file
+
+        Returns:
+            List of SubtitleTrackInfo for external subtitles
+        """
+        external_subs = []
+        video_folder = os.path.dirname(video_file)
+        video_name = os.path.splitext(os.path.basename(video_file))[0]
+
+        try:
+            for file_name in os.listdir(video_folder):
+                # Check if it's a subtitle file
+                if not any(file_name.endswith(ext) for ext in FileAnalyzer.SUBTITLE_EXTENSIONS):
+                    continue
+
+                subtitle_path = os.path.join(video_folder, file_name)
+                subtitle_name, _ = os.path.splitext(file_name)
+
+                # Check if subtitle belongs to this video
+                if not subtitle_name.startswith(video_name):
+                    continue
+
+                # Extract language from filename
+                # Format: video_name.lang.srt or video_name.subgen.medium.lang.srt
+                parts = subtitle_name[len(video_name) :].lstrip(".").split(".")
+
+                # Try to find language code in parts
+                detected_language = None
+                for part in parts:
+                    lang = LanguageCode.from_string(part)
+                    if lang != LanguageCode.NONE:
+                        detected_language = lang
+                        break
+
+                if detected_language:
+                    external_subs.append(
+                        SubtitleTrackInfo(
+                            language=detected_language,
+                            is_embedded=False,
+                            is_external=True,
+                            file_path=subtitle_path,
+                        )
+                    )
+
+        except Exception as e:
+            logger.error(f"Error finding external subtitles for {video_file}: {e}")
+
+        return external_subs
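The language extraction in `_find_external_subtitles` leans entirely on the `<video>.<...>.<lang>.<ext>` naming convention. The same parsing logic as a standalone sketch (hypothetical helper name), showing how `ep01.subgen.medium.es.srt` resolves for the video `ep01`:

```python
import os

from backend.core.language_code import LanguageCode


def language_from_subtitle_name(video_name: str, subtitle_file: str):
    """Return the first parseable language token after the video name, if any."""
    stem, _ = os.path.splitext(subtitle_file)
    parts = stem[len(video_name):].lstrip(".").split(".")
    for part in parts:  # e.g. ["subgen", "medium", "es"] -> matches on "es"
        lang = LanguageCode.from_string(part)
        if lang != LanguageCode.NONE:
            return lang
    return None
```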
diff --git a/backend/scanning/language_detector.py b/backend/scanning/language_detector.py
new file mode 100644
index 0000000..5105250
--- /dev/null
+++ b/backend/scanning/language_detector.py
@@ -0,0 +1,295 @@
+"""Language detection service using Whisper."""
+import logging
+from typing import Optional, Tuple
+from pathlib import Path
+
+from backend.scanning.detected_languages import DetectedLanguage
+from backend.core.language_code import LanguageCode
+
+logger = logging.getLogger(__name__)
+
+
+class LanguageDetector:
+    """
+    Service for detecting audio language in media files.
+
+    Uses Whisper's language detection on a small audio sample.
+    Results are cached in database to avoid re-detection.
+    """
+
+    @staticmethod
+    def detect_language(file_path: str, sample_duration: int = 30) -> Tuple[Optional[LanguageCode], Optional[int]]:
+        """
+        Detect language of audio in a media file.
+
+        First checks cache, then uses Whisper if needed.
+
+        Args:
+            file_path: Path to media file
+            sample_duration: Seconds of audio to analyze (default: 30)
+
+        Returns:
+            Tuple of (LanguageCode, confidence_percentage) or (None, None)
+        """
+        # Check cache first
+        cached = LanguageDetector._get_cached_language(file_path)
+        if cached:
+            logger.info(f"Using cached language for {Path(file_path).name}: {cached}")
+            # The cached row does store a confidence value, but this helper
+            # only returns the language, so report 100% for cache hits
+            return cached, 100
+
+        # Detect using Whisper
+        try:
+            detected_lang, confidence = LanguageDetector._detect_with_whisper(
+                file_path, sample_duration
+            )
+
+            if detected_lang:
+                # Cache the result
+                LanguageDetector._cache_language(file_path, detected_lang, confidence)
+                logger.info(
+                    f"Detected language for {Path(file_path).name}: "
+                    f"{detected_lang} (confidence: {confidence}%)"
+                )
+                return detected_lang, confidence
+
+            return None, None
+
+        except Exception as e:
+            logger.error(f"Language detection failed for {file_path}: {e}")
+            return None, None
+
+    @staticmethod
+    def _get_cached_language(file_path: str) -> Optional[LanguageCode]:
+        """
+        Get cached detected language from database.
+
+        Args:
+            file_path: Path to media file
+
+        Returns:
+            LanguageCode if cached, None otherwise
+        """
+        from backend.core.database import database
+
+        with database.get_session() as session:
+            cached = session.query(DetectedLanguage).filter(
+                DetectedLanguage.file_path == file_path
+            ).first()
+
+            if cached:
+                return LanguageCode.from_string(cached.detected_language)
+
+            return None
+
+    @staticmethod
+    def _cache_language(
+        file_path: str,
+        language: LanguageCode,
+        confidence: Optional[int] = None
+    ):
+        """
+        Cache detected language in database.
+
+        Args:
+            file_path: Path to media file
+            language: Detected language code
+            confidence: Detection confidence (0-100)
+        """
+        from backend.core.database import database
+
+        with database.get_session() as session:
+            # Check if entry exists
+            existing = session.query(DetectedLanguage).filter(
+                DetectedLanguage.file_path == file_path
+            ).first()
+
+            lang_code = language.to_iso_639_1() if language else "und"
+
+            if existing:
+                # Update existing
+                existing.detected_language = lang_code
+                existing.detection_confidence = confidence
+            else:
+                # Create new
+                detected = DetectedLanguage(
+                    file_path=file_path,
+                    detected_language=lang_code,
+                    detection_confidence=confidence
+                )
+                session.add(detected)
+
+            session.commit()
+            logger.debug(f"Cached language detection: {file_path} -> {lang_code}")
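Note that `detect_language` returns a `(language, confidence)` tuple, so callers must unpack both values; cache hits are reported at a flat 100% confidence. A usage sketch:

```python
from backend.scanning.language_detector import language_detector

# Illustrative path; detection falls back to (None, None) on failure.
lang, confidence = language_detector.detect_language("/library/show/ep01.mkv")
if lang is not None:
    print(f"{lang.to_iso_639_1()} at {confidence}% confidence")
else:
    print("detection failed or Whisper unavailable")
```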
+
+    @staticmethod
+    def _detect_with_whisper(
+        file_path: str,
+        sample_duration: int = 30
+    ) -> Tuple[Optional[LanguageCode], Optional[int]]:
+        """
+        Detect language using Whisper model.
+
+        Args:
+            file_path: Path to media file
+            sample_duration: Seconds of audio to analyze
+
+        Returns:
+            Tuple of (LanguageCode, confidence_percentage) or (None, None)
+        """
+        try:
+            from backend.transcription.transcriber import WhisperTranscriber, WHISPER_AVAILABLE
+            from backend.transcription.audio_utils import extract_audio_segment
+
+            if not WHISPER_AVAILABLE:
+                logger.error("Whisper not available - cannot detect language")
+                return None, None
+
+            # Get file duration first to extract from the middle
+            import ffmpeg
+            try:
+                probe = ffmpeg.probe(file_path)
+                duration = float(probe['format']['duration'])
+
+                # Extract from the middle of the file for better detection
+                # (beginning might have intro music, credits, etc.)
+                start_time = max(0, (duration / 2) - (sample_duration / 2))
+
+                logger.debug(
+                    f"Extracting {sample_duration}s audio sample from middle of {file_path} "
+                    f"(duration: {duration:.1f}s, sample start: {start_time:.1f}s)"
+                )
+            except Exception as e:
+                logger.warning(f"Could not get file duration: {e}, using start of file")
+                start_time = 0
+
+            audio_data = extract_audio_segment(
+                file_path,
+                start_time=int(start_time),
+                duration=sample_duration
+            )
+
+            if not audio_data:
+                logger.warning(f"Failed to extract audio from {file_path}")
+                return None, None
+
+            # Save audio_data to temporary file since stable-whisper doesn't accept BytesIO
+            import tempfile
+            import os
+
+            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_audio:
+                temp_audio.write(audio_data.read())
+                temp_audio_path = temp_audio.name
+
+            try:
+                # Initialize transcriber (will use small/fast model for detection)
+                transcriber = WhisperTranscriber(model_name="tiny")  # Tiny model for fast detection
+                transcriber.load_model()
+
+                # Detect language using Whisper
+                logger.debug("Detecting language with Whisper...")
+
+                # Use transcribe with language=None to trigger auto-detection
+                # This is more reliable than detect_language()
+                result = transcriber.model.transcribe(
+                    temp_audio_path,  # Use file path instead of BytesIO
+                    language=None,  # Auto-detect
+                    task="transcribe",
+                    vad_filter=False,  # Don't filter, just detect
+                    beam_size=1,  # Faster
+                    best_of=1,  # Faster
+                    temperature=0.0,  # Deterministic
+                    condition_on_previous_text=False,
+                    initial_prompt=None,
+                )
+
+                if result:
+                    # stable-whisper/faster-whisper returns language info
+                    # Try different attributes that might contain the language code
+                    lang_code_str = None
+                    probability = 1.0
+
+                    # Try to get language code (2-letter ISO 639-1)
+                    if hasattr(result, 'language_code'):
+                        lang_code_str = result.language_code
+                    elif hasattr(result, 'language'):
+                        # result.language might be full name like "japanese" or code like "ja"
+                        lang = result.language
+                        if len(lang) == 2:
+                            # Already a code
+                            lang_code_str = lang
+                        else:
+                            # Full name - need to map to code
+                            # Common mappings
+                            lang_map = {
+                                'japanese': 'ja',
+                                'english': 'en',
+                                'spanish': 'es',
+                                'french': 'fr',
+                                'german': 'de',
+                                'italian': 'it',
+                                'portuguese': 'pt',
+                                'russian': 'ru',
+                                'chinese': 'zh',
+                                'korean': 'ko',
+                                'arabic': 'ar',
+                                'hindi': 'hi',
+                            }
+                            lang_code_str = lang_map.get(lang.lower())
+
+                    # Get language probability if available
+                    if hasattr(result, 'language_probability'):
+                        probability = result.language_probability
+
+                    if lang_code_str:
+                        confidence = int(probability * 100)
+                        language = LanguageCode.from_iso_639_1(lang_code_str)
+
+                        logger.info(
+                            f"Whisper detected language: {lang_code_str} "
+                            f"(confidence: {confidence}%)"
+                        )
+
+                        return language, confidence
+                    else:
+                        logger.warning(f"Could not extract language code from result: {result}")
+
+                return None, None
+
+            finally:
+                # Clean up temporary file
+                try:
+                    os.unlink(temp_audio_path)
+                except Exception as e:
+                    logger.warning(f"Failed to delete temporary audio file: {e}")
+
+        except Exception as e:
+            logger.error(f"Whisper language detection error: {e}", exc_info=True)
+            return None, None
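The sample window above is centred on the file's midpoint and clamped to zero for clips shorter than the sample itself. Checking the arithmetic:

```python
def sample_start(duration: float, sample_duration: int = 30) -> float:
    """Start offset used by _detect_with_whisper's middle-of-file sampling."""
    return max(0, (duration / 2) - (sample_duration / 2))


assert sample_start(1200) == 585.0  # 20-minute file: sample covers 585s-615s
assert sample_start(20) == 0        # shorter than the sample: start at 0
```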
logger.warning(f"Could not extract language code from result: {result}") + + return None, None + + finally: + # Clean up temporary file + try: + os.unlink(temp_audio_path) + except Exception as e: + logger.warning(f"Failed to delete temporary audio file: {e}") + + except Exception as e: + logger.error(f"Whisper language detection error: {e}", exc_info=True) + return None, None + + @staticmethod + def clear_cache(file_path: Optional[str] = None): + """ + Clear language detection cache. + + Args: + file_path: Specific file to clear, or None to clear all + """ + from backend.core.database import database + + with database.get_session() as session: + if file_path: + session.query(DetectedLanguage).filter( + DetectedLanguage.file_path == file_path + ).delete() + logger.info(f"Cleared language cache for {file_path}") + else: + count = session.query(DetectedLanguage).delete() + logger.info(f"Cleared all language cache ({count} entries)") + + session.commit() + + +# Global instance +language_detector = LanguageDetector() + diff --git a/backend/scanning/library_scanner.py b/backend/scanning/library_scanner.py new file mode 100644 index 0000000..cf65e19 --- /dev/null +++ b/backend/scanning/library_scanner.py @@ -0,0 +1,894 @@ +"""Library scanner with rule-based filtering and scheduling.""" +import logging +import os +import time +from typing import List, Optional, Dict +from datetime import datetime, timezone +from pathlib import Path + +from apscheduler.schedulers.background import BackgroundScheduler +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler, FileCreatedEvent + +from backend.core.database import database +from backend.core.queue_manager import queue_manager +from backend.core.models import QualityPreset +from backend.scanning.models import ScanRule +from backend.scanning.file_analyzer import FileAnalyzer, FileAnalysis +from backend.scanning.language_detector import language_detector +from backend.core.language_code import LanguageCode + +logger = logging.getLogger(__name__) + + +class LibraryFileHandler(FileSystemEventHandler): + """Watchdog handler for real-time file detection.""" + + def __init__(self, scanner: "LibraryScanner"): + """ + Initialize file handler. + + Args: + scanner: Parent LibraryScanner instance + """ + super().__init__() + self.scanner = scanner + + def on_created(self, event: FileCreatedEvent): + """ + Handle new file creation. + + Args: + event: File creation event + """ + if event.is_directory: + return + + file_path = event.src_path + + # Check if it's a video file + if not FileAnalyzer.is_video_file(file_path): + return + + # Wait a bit for file to be fully written + time.sleep(5) + + logger.info(f"New file detected: {file_path}") + self.scanner.process_file(file_path) + + +class LibraryScanner: + """ + Library scanner with rule-based filtering. + + Scans media libraries, analyzes files with ffprobe, and applies + configurable rules to determine which files need transcription. 
+
+
+class LibraryScanner:
+    """
+    Library scanner with rule-based filtering.
+
+    Scans media libraries, analyzes files with ffprobe, and applies
+    configurable rules to determine which files need transcription.
+
+    Supports:
+    - One-time manual scans
+    - Scheduled periodic scans (cron-like)
+    - Real-time file watching (Tdarr-style)
+    """
+
+    def __init__(self):
+        """Initialize library scanner."""
+        self.scheduler: Optional[BackgroundScheduler] = None
+        self.file_observer: Optional[Observer] = None
+        self.is_scanning = False
+        self.last_scan_time: Optional[datetime] = None
+        self.files_scanned = 0
+        self.files_queued = 0
+
+        logger.info("LibraryScanner initialized")
+
+    def scan_libraries(self, paths: Optional[List[str]] = None) -> Dict:
+        """
+        Perform a one-time scan of library directories.
+
+        Args:
+            paths: List of directories to scan (uses config if None)
+
+        Returns:
+            Dictionary with scan statistics
+        """
+        if self.is_scanning:
+            logger.warning("Scan already in progress")
+            return {"error": "Scan already in progress"}
+
+        self.is_scanning = True
+        self.files_scanned = 0
+        self.files_queued = 0
+        scan_start = time.time()
+
+        try:
+            # Get paths from settings_service if not provided
+            if paths is None:
+                from backend.core.settings_service import settings_service
+                library_paths = settings_service.get('library_paths', '')
+                if not library_paths:
+                    logger.error("No library paths configured")
+                    return {"error": "No library paths configured"}
+                # Handle both comma and pipe separators
+                if '|' in library_paths:
+                    paths = [p.strip() for p in library_paths.split("|") if p.strip()]
+                else:
+                    paths = [p.strip() for p in library_paths.split(",") if p.strip()]
+
+            logger.info(f"Starting library scan: {len(paths)} paths")
+
+            # Load all enabled rules
+            rules = self._load_scan_rules()
+            logger.info(f"Loaded {len(rules)} enabled scan rules")
+
+            # Scan each path
+            for path in paths:
+                if not os.path.isdir(path):
+                    logger.warning(f"Path not found or not a directory: {path}")
+                    continue
+
+                logger.info(f"Scanning: {path}")
+                self._scan_directory(path, rules)
+
+            scan_duration = time.time() - scan_start
+            self.last_scan_time = datetime.now(timezone.utc)
+            self._persist_scan_stats(files_in_this_scan=self.files_scanned)
+
+            results = {
+                "status": "completed",
+                "files_scanned": self.files_scanned,
+                "files_queued": self.files_queued,
+                "duration_seconds": round(scan_duration, 2),
+                "timestamp": self.last_scan_time.isoformat(),
+            }
+
+            logger.info(
+                f"Scan completed: {self.files_scanned} files scanned, "
+                f"{self.files_queued} jobs queued in {scan_duration:.1f}s"
+            )
+
+            return results
+
+        except Exception as e:
+            logger.error(f"Scan failed: {e}", exc_info=True)
+            return {"error": str(e)}
+
+        finally:
+            self.is_scanning = False
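The pipe-or-comma handling for `library_paths` appears again in `start_file_watcher` below. Pulled out as a standalone sketch (hypothetical helper), it also shows why `|` takes precedence: individual paths may contain commas.

```python
def parse_library_paths(raw: str) -> list:
    """Split a configured path list on '|' when present, otherwise on ','."""
    sep = "|" if "|" in raw else ","
    return [p.strip() for p in raw.split(sep) if p.strip()]


assert parse_library_paths("/tv, /movies") == ["/tv", "/movies"]
assert parse_library_paths("/tv|/movies, extras") == ["/tv", "/movies, extras"]
```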
+
+    def _scan_directory(self, directory: str, rules: List[ScanRule]):
+        """
+        Recursively scan a directory.
+
+        Args:
+            directory: Directory path
+            rules: List of scan rules to apply
+        """
+        try:
+            for root, dirs, files in os.walk(directory):
+                for file in files:
+                    file_path = os.path.join(root, file)
+                    self.files_scanned += 1
+
+                    # Process file
+                    self.process_file(file_path, rules)
+
+        except Exception as e:
+            logger.error(f"Error scanning directory {directory}: {e}")
+
+    def process_file(
+        self, file_path: str, rules: Optional[List[ScanRule]] = None
+    ) -> bool:
+        """
+        Process a single file against scan rules.
+
+        Args:
+            file_path: Path to media file
+            rules: Optional list of rules (will load if None)
+
+        Returns:
+            True if job was queued, False otherwise
+        """
+        try:
+            # Analyze file
+            analysis = FileAnalyzer.analyze_file(file_path)
+            if not analysis:
+                return False
+
+            # Check if we need language detection
+            if not analysis.default_audio_language or len(analysis.audio_languages) == 0:
+                logger.info(
+                    f"Audio language unknown for {analysis.file_name}, "
+                    f"queuing language detection job"
+                )
+                return self._queue_language_detection_job(analysis)
+
+            # Load rules if not provided
+            if rules is None:
+                rules = self._load_scan_rules()
+
+            # Evaluate against rules
+            matching_rule = self._evaluate_rules(analysis, rules)
+
+            if matching_rule:
+                # Queue job based on rule
+                return self._queue_job_from_rule(analysis, matching_rule)
+
+            return False
+
+        except Exception as e:
+            logger.error(f"Error processing file {file_path}: {e}")
+            return False
+
+    def _evaluate_rules(
+        self, file_analysis: FileAnalysis, rules: List[ScanRule]
+    ) -> Optional[ScanRule]:
+        """
+        Evaluate file against rules (in priority order).
+
+        Args:
+            file_analysis: File analysis result
+            rules: List of scan rules
+
+        Returns:
+            First matching rule or None
+        """
+        for rule in rules:
+            if self._rule_matches(file_analysis, rule):
+                logger.debug(f"File {file_analysis.file_name} matches rule: {rule.name}")
+                return rule
+
+        return None
+ ) + + detected_lang = language_detector.detect_language(file_analysis.file_path) + + if detected_lang: + logger.info( + f"Detected language for {file_analysis.file_name}: {detected_lang}" + ) + + # Check if detected language matches rule + if detected_lang == target_lang: + logger.info( + f"✓ Detected language '{detected_lang}' matches rule '{rule.name}'" + ) + # Update file_analysis with detected language for later use + if file_analysis.audio_tracks: + file_analysis.audio_tracks[0].language = detected_lang + return True # Continue checking other conditions + else: + logger.debug( + f"Rule '{rule.name}' failed: detected '{detected_lang}' " + f"but expected '{rule.audio_language_is}'" + ) + return False + else: + logger.warning( + f"Failed to detect language for {file_analysis.file_name} - skipping" + ) + return False + else: + # Language is defined but doesn't match + logger.debug( + f"Rule '{rule.name}' audio check failed for {file_analysis.file_name}: " + f"Expected '{rule.audio_language_is}' but found " + f"{[str(lang) if lang else 'und' for lang in file_analysis.audio_languages]}" + ) + return False + + + # Check audio language NOT + if rule.audio_language_not: + excluded_langs = [ + LanguageCode.from_string(lang) for lang in rule.audio_language_not_list + ] + if any(lang in file_analysis.audio_languages for lang in excluded_langs): + return False + + # Check minimum audio tracks + if rule.audio_track_count_min: + if len(file_analysis.audio_tracks) < rule.audio_track_count_min: + return False + + # Check HAS embedded subtitle + if rule.has_embedded_subtitle_lang: + required_lang = LanguageCode.from_string(rule.has_embedded_subtitle_lang) + if not file_analysis.has_embedded_subtitle_language(required_lang): + return False + + # Check MISSING embedded subtitle + if rule.missing_embedded_subtitle_lang: + excluded_lang = LanguageCode.from_string(rule.missing_embedded_subtitle_lang) + if file_analysis.has_embedded_subtitle_language(excluded_lang): + return False + + # Check MISSING external subtitle + if rule.missing_external_subtitle_lang: + excluded_lang = LanguageCode.from_string(rule.missing_external_subtitle_lang) + if file_analysis.has_external_subtitle_language(excluded_lang): + return False + + # All conditions matched + logger.debug( + f"File '{file_analysis.file_name}' matched rule '{rule.name}' " + f"(priority: {rule.priority})" + ) + return True + + def _queue_language_detection_job(self, file_analysis: FileAnalysis) -> bool: + """ + Create and queue a language detection job for a file with unknown audio language. + + Args: + file_analysis: File analysis + + Returns: + True if job was queued successfully + """ + try: + from backend.core.models import JobType, JobStatus + + # Check if there's already a completed detection job for this file + with database.get_session() as session: + from backend.core.models import Job + existing_detection = session.query(Job).filter( + Job.file_path == file_analysis.file_path, + Job.job_type == JobType.LANGUAGE_DETECTION, + Job.status == JobStatus.COMPLETED + ).first() + + if existing_detection: + logger.info( + f"✓ Language already detected for {file_analysis.file_name}, " + f"checking for transcription rules..." 
+ ) + # Extract detected language from SRT content + if existing_detection.srt_content: + # Format: "Language detected: ja (Japanese)\nConfidence: 99%" + lines = existing_detection.srt_content.split('\n') + if lines: + lang_line = lines[0] + if 'Language detected:' in lang_line: + lang_code = lang_line.split(':')[1].strip().split(' ')[0] + # Trigger rule checking with detected language + self._check_and_queue_transcription_for_file( + file_analysis.file_path, lang_code + ) + return False + + # Add language detection job with high priority + job = queue_manager.add_job( + file_path=file_analysis.file_path, + file_name=file_analysis.file_name, + source_lang=None, # To be detected + target_lang=None, + quality_preset=QualityPreset.FAST, + priority=15, # Higher than normal transcription (0-10) but lower than manual (20+) + transcribe_or_translate="transcribe", + job_type=JobType.LANGUAGE_DETECTION, + ) + + if job: + logger.info( + f"✓ Queued LANGUAGE DETECTION job {job.id} for {file_analysis.file_name}" + ) + self.files_queued += 1 + return True + else: + logger.warning( + f"✗ Skipped detection for {file_analysis.file_name}: Job already exists" + ) + return False + + except Exception as e: + logger.error(f"Error queuing language detection job: {e}") + return False + + def _check_and_queue_transcription_for_file(self, file_path: str, detected_lang_code: str): + """ + Check if a file with detected language matches any scan rules and queue transcription. + + Args: + file_path: Path to the file + detected_lang_code: Detected language code (ISO 639-1, e.g., 'ja', 'en') + """ + try: + logger.info( + f"Checking if {file_path} with language '{detected_lang_code}' " + f"matches any scan rules..." + ) + + # Load scan rules + rules = self._load_scan_rules() + if not rules: + logger.debug("No active scan rules found") + return + + # Check each rule + for rule in rules: + # Check if language matches + if rule.audio_language_is: + try: + rule_lang = LanguageCode.from_string(rule.audio_language_is) + # Convert detected language (ISO 639-1) to LanguageCode for comparison + detected_lang = LanguageCode.from_iso_639_1(detected_lang_code) + + if detected_lang != rule_lang: + logger.debug( + f"Rule '{rule.name}' requires language {rule_lang}, " + f"but detected {detected_lang}" + ) + continue + except Exception as e: + logger.warning(f"Could not parse rule language code: {e}") + continue + + # Check if language should be excluded + if rule.audio_language_not: + excluded_langs = [ + LanguageCode.from_string(lang.strip()) + for lang in rule.audio_language_not.split(',') + ] + detected_lang_obj = LanguageCode.from_iso_639_1(detected_lang_code) + if detected_lang_obj in excluded_langs: + logger.debug( + f"Rule '{rule.name}' excludes language {detected_lang_code}" + ) + continue + + # File matches this rule - queue transcription job + logger.info( + f"File {file_path} matches rule '{rule.name}' - queueing transcription job" + ) + + # Get target language (use ISO 639-1 throughout) + target_lang_code = rule.target_language or "eng" + + # Map quality preset + quality_map = { + "fast": QualityPreset.FAST, + "balanced": QualityPreset.BALANCED, + "best": QualityPreset.BEST, + } + quality = quality_map.get(rule.quality_preset, QualityPreset.FAST) + + # Create transcription job + # All language codes in ISO 639-1 format (ja, en, es) + job = queue_manager.add_job( + file_path=file_path, + file_name=os.path.basename(file_path), + source_lang=detected_lang_code, # ISO 639-1 (ja, en, es) + target_lang=target_lang_code, # ISO 
+
+    def _queue_job_from_rule(
+        self, file_analysis: FileAnalysis, rule: ScanRule
+    ) -> bool:
+        """
+        Create and queue a job based on matched rule.
+
+        Args:
+            file_analysis: File analysis
+            rule: Matched scan rule
+
+        Returns:
+            True if job was queued successfully
+        """
+        try:
+            # Map quality preset
+            quality_map = {
+                "fast": QualityPreset.FAST,
+                "balanced": QualityPreset.BALANCED,
+                "best": QualityPreset.BEST,
+            }
+            quality_preset = quality_map.get(rule.quality_preset, QualityPreset.FAST)
+
+            # Determine source language (default audio track)
+            source_lang = file_analysis.default_audio_language
+            source_lang_code = source_lang.to_iso_639_1() if source_lang else None
+
+            # Add job to queue
+            job = queue_manager.add_job(
+                file_path=file_analysis.file_path,
+                file_name=file_analysis.file_name,
+                source_lang=source_lang_code,
+                target_lang=rule.target_language,
+                quality_preset=quality_preset,
+                priority=rule.job_priority,
+                transcribe_or_translate=rule.action_type,
+            )
+
+            if job:
+                logger.info(
+                    f"✓ Queued job {job.id} for {file_analysis.file_name}: "
+                    f"{rule.action_type} {source_lang_code} → {rule.target_language}"
+                )
+                self.files_queued += 1
+                return True
+            else:
+                logger.warning(
+                    f"✗ Skipped {file_analysis.file_name}: Job already exists or in queue "
+                    f"(path: {file_analysis.file_path}, target: {rule.target_language})"
+                )
+                return False
+
+        except Exception as e:
+            logger.error(f"Error queuing job: {e}")
+            return False
+
+    def _load_scan_rules(self) -> List[ScanRule]:
+        """
+        Load enabled scan rules from database.
+
+        Returns:
+            List of enabled rules (sorted by priority)
+        """
+        with database.get_session() as session:
+            rules = (
+                session.query(ScanRule)
+                .filter(ScanRule.enabled == True)
+                .order_by(ScanRule.priority.desc(), ScanRule.id)
+                .all()
+            )
+            # Expunge rules from session so they can be used outside the context
+            for rule in rules:
+                session.expunge(rule)
+            return rules
+
+    def _persist_scan_stats(self, files_in_this_scan: int = 0):
+        """
+        Persist scan statistics to the database so they survive restarts.
+
+        Args:
+            files_in_this_scan: Number of files scanned in the current scan operation
+        """
+        from backend.core.settings_service import settings_service
+
+        try:
+            # Save last scan time
+            if self.last_scan_time:
+                settings_service.set(
+                    'scanner_last_scan_time',
+                    self.last_scan_time.isoformat(),
+                    category='scanner'
+                )
+
+            # Increment scan count
+            scan_count = settings_service.get('scanner_scan_count', 0)
+            try:
+                scan_count = int(scan_count)
+            except (ValueError, TypeError):
+                scan_count = 0
+
+            scan_count += 1
+            settings_service.set(
+                'scanner_scan_count',
+                str(scan_count),
+                category='scanner'
+            )
+
+            # Save total files scanned (cumulative)
+            if files_in_this_scan > 0:
+                current_total = settings_service.get('scanner_total_files_scanned', 0)
+                try:
+                    current_total = int(current_total)
+                except (ValueError, TypeError):
+                    current_total = 0
+
+                new_total = current_total + files_in_this_scan
+                settings_service.set(
+                    'scanner_total_files_scanned',
+                    str(new_total),
+                    category='scanner'
+                )
+
+                logger.debug(f"Persisted scan stats: scan_count={scan_count}, last_scan={self.last_scan_time}, total_files={new_total}")
+            else:
+                logger.debug(f"Persisted scan stats: scan_count={scan_count}, last_scan={self.last_scan_time}")
+        except Exception as e:
+            logger.error(f"Failed to persist scan stats: {e}")
+
+    # === Scheduler Methods ===
+
+    def start_scheduler(self, interval_minutes: Optional[int] = None):
+        """
+        Start scheduled periodic scanning.
+
+        Args:
+            interval_minutes: Scan interval (uses config if None)
+        """
+        if self.scheduler and self.scheduler.running:
+            logger.warning("Scheduler already running")
+            return
+
+        from backend.core.settings_service import settings_service
+        interval = interval_minutes or int(settings_service.get('scanner_schedule_interval_minutes', 360))
+
+        self.scheduler = BackgroundScheduler()
+        self.scheduler.add_job(
+            func=self.scan_libraries,
+            trigger="interval",
+            minutes=interval,
+            id="library_scan",
+            name=f"Library scan (every {interval}m)",
+        )
+        self.scheduler.start()
+
+        logger.info(f"Scheduler started: scanning every {interval} minutes")
+
+    def stop_scheduler(self):
+        """Stop scheduled scanning."""
+        if self.scheduler and self.scheduler.running:
+            try:
+                # wait=False to avoid blocking on running jobs
+                self.scheduler.shutdown(wait=False)
+            except Exception as e:
+                logger.warning(f"Error shutting down scheduler: {e}")
+            self.scheduler = None
+            logger.info("Scheduler stopped")
+
+    # === File Watcher Methods ===
+
+    def start_file_watcher(self, paths: Optional[List[str]] = None, recursive: bool = True):
+        """
+        Start real-time file watching.
+
+        Args:
+            paths: Paths to watch (uses config if None)
+            recursive: Whether to watch subdirectories
+        """
+        if self.file_observer:
+            logger.warning("File watcher already running")
+            return
+
+        # Get paths from settings_service if not provided
+        if paths is None:
+            from backend.core.settings_service import settings_service
+            library_paths = settings_service.get('library_paths', '')
+            if not library_paths:
+                logger.error("No library paths configured")
+                return
+            # Handle both comma and pipe separators
+            if '|' in library_paths:
+                paths = [p.strip() for p in library_paths.split("|") if p.strip()]
+            else:
+                paths = [p.strip() for p in library_paths.split(",") if p.strip()]
+
+        self.file_observer = Observer()
+        handler = LibraryFileHandler(self)
+
+        for path in paths:
+            if os.path.isdir(path):
+                self.file_observer.schedule(handler, path, recursive=recursive)
+                logger.info(f"Watching: {path} (recursive={recursive})")
+
+        self.file_observer.start()
+        logger.info("File watcher started")
+
+    def stop_file_watcher(self):
+        """Stop real-time file watching."""
+        if self.file_observer:
+            try:
+                self.file_observer.stop()
+                # Use timeout to avoid blocking indefinitely
+                self.file_observer.join(timeout=5.0)
+            except Exception as e:
+                logger.warning(f"Error stopping file watcher: {e}")
+            self.file_observer = None
+            logger.info("File watcher stopped")
+
+    def get_status(self) -> Dict:
+        """
+        Get scanner status.
+
+        Returns:
+            Dictionary with scanner status
+        """
+        from backend.core.settings_service import settings_service
+
+        watched_paths = []
+        if self.file_observer:
+            # Get watched paths from observer
+            watched_paths = [str(w.path) for w in self.file_observer.emitters]
+
+        next_scan_time = None
+        if self.scheduler and self.scheduler.running:
+            # Get next scheduled job time
+            jobs = self.scheduler.get_jobs()
+            if jobs:
+                next_scan_time = jobs[0].next_run_time.isoformat()
+
+        # Get last_scan_time from database (persisted) or memory (current session)
+        last_scan_time = self.last_scan_time
+        if last_scan_time is None:
+            # Try to load from database
+            db_last_scan = settings_service.get('scanner_last_scan_time')
+            if db_last_scan:
+                try:
+                    last_scan_time = datetime.fromisoformat(db_last_scan)
+                except ValueError:
+                    last_scan_time = None
+
+        # Get scan count from database
+        scan_count = settings_service.get('scanner_scan_count', 0)
+        try:
+            scan_count = int(scan_count)
+        except (ValueError, TypeError):
+            scan_count = 0
+
+        # Get total_files_scanned from database
+        total_files_scanned = settings_service.get('scanner_total_files_scanned', 0)
+        try:
+            total_files_scanned = int(total_files_scanned)
+        except (ValueError, TypeError):
+            total_files_scanned = 0
+
+        return {
+            "scheduler_enabled": self.scheduler is not None,
+            "scheduler_running": self.scheduler is not None and self.scheduler.running,
+            "next_scan_time": next_scan_time,
+            "watcher_enabled": self.file_observer is not None,
+            "watcher_running": self.file_observer is not None,
+            "watched_paths": watched_paths,
+            "last_scan_time": last_scan_time.isoformat() if last_scan_time else None,
+            "total_scans": scan_count,
+            "total_files_scanned": total_files_scanned,
+        }
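A short usage sketch tying the scheduling surface together (the interval is illustrative; the status keys are as defined above):

```python
from backend.scanning.library_scanner import library_scanner

library_scanner.start_scheduler(interval_minutes=360)  # periodic rescans
library_scanner.start_file_watcher()                   # Tdarr-style realtime
print(library_scanner.get_status()["next_scan_time"])
```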
+
+    def scan_paths(self, paths: List[str], recursive: bool = True) -> Dict:
+        """
+        Scan specific paths.
+
+        Args:
+            paths: List of paths to scan
+            recursive: Whether to scan subdirectories
+
+        Returns:
+            Scan result dictionary
+        """
+        if self.is_scanning:
+            logger.warning("Scan already in progress")
+            return {
+                "scanned_files": 0,
+                "matched_files": 0,
+                "jobs_created": 0,
+                "skipped_files": 0,
+                "paths_scanned": [],
+                "error": "Scan already in progress"
+            }
+
+        self.is_scanning = True
+        scanned = 0
+        matched = 0
+        jobs_created = 0
+        skipped = 0
+
+        try:
+            for path in paths:
+                if not os.path.exists(path):
+                    logger.warning(f"Path does not exist: {path}")
+                    continue
+
+                # Scan directory
+                if os.path.isdir(path):
+                    for root, dirs, files in os.walk(path):
+                        for file in files:
+                            file_path = os.path.join(root, file)
+
+                            if not FileAnalyzer.is_video_file(file_path):
+                                continue
+
+                            scanned += 1
+
+                            # Process file
+                            if self.process_file(file_path):
+                                matched += 1
+                                jobs_created += 1
+                            else:
+                                skipped += 1
+
+                        if not recursive:
+                            break
+
+                # Single file
+                elif os.path.isfile(path):
+                    if FileAnalyzer.is_video_file(path):
+                        scanned += 1
+                        if self.process_file(path):
+                            matched += 1
+                            jobs_created += 1
+                        else:
+                            skipped += 1
+
+            self.last_scan_time = datetime.now(timezone.utc)
+            self.files_scanned += scanned
+            self._persist_scan_stats(files_in_this_scan=scanned)
+
+            return {
+                "scanned_files": scanned,
+                "matched_files": matched,
+                "jobs_created": jobs_created,
+                "skipped_files": skipped,
+                "paths_scanned": paths,
+            }
+
+        finally:
+            self.is_scanning = False
+
+
+# Global scanner instance
+library_scanner = LibraryScanner()
Example: 'es'""" + + # File format filters + file_extension = Column(String(64), nullable=True) + """File extension filter (comma-separated). Example: '.mkv,.mp4'""" + + # === ACTION (what to do when rule matches) === + + action_type = Column(String(20), nullable=False, default="transcribe") + """Action: 'transcribe' or 'translate'""" + + target_language = Column(String(10), nullable=False) + """Target subtitle language (ISO 639-1). Example: 'es'""" + + quality_preset = Column(String(20), nullable=False, default="fast") + """Quality preset: 'fast', 'balanced', or 'best'""" + + job_priority = Column(Integer, nullable=False, default=0) + """Priority for jobs created by this rule (higher = processed first)""" + + # Metadata + created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + def __repr__(self): + """String representation.""" + return f"" + + def to_dict(self) -> dict: + """Convert rule to dictionary for API responses.""" + return { + "id": self.id, + "name": self.name, + "enabled": self.enabled, + "priority": self.priority, + "conditions": { + "audio_language_is": self.audio_language_is, + "audio_language_not": self.audio_language_not, + "audio_track_count_min": self.audio_track_count_min, + "has_embedded_subtitle_lang": self.has_embedded_subtitle_lang, + "missing_embedded_subtitle_lang": self.missing_embedded_subtitle_lang, + "missing_external_subtitle_lang": self.missing_external_subtitle_lang, + "file_extension": self.file_extension, + }, + "action": { + "action_type": self.action_type, + "target_language": self.target_language, + "quality_preset": self.quality_preset, + "job_priority": self.job_priority, + }, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + } + + @property + def audio_language_not_list(self) -> List[str]: + """Get audio_language_not as a list.""" + if not self.audio_language_not: + return [] + return [lang.strip() for lang in self.audio_language_not.split(",") if lang.strip()] + + @property + def file_extension_list(self) -> List[str]: + """Get file_extension as a list.""" + if not self.file_extension: + return [] + return [ext.strip() for ext in self.file_extension.split(",") if ext.strip()] + + +# Create indexes for common queries +Index('idx_scan_rules_enabled_priority', ScanRule.enabled, ScanRule.priority.desc()) +Index('idx_scan_rules_name', ScanRule.name)