Transcriptarr/backend/scanning/language_detector.py
Commit d28c4caa6a by Dasemu: feat(scanning): add library scanner with rules engine
- Add ScanRule model with configurable conditions
- Add FileAnalyzer for ffprobe-based media analysis
- Add LibraryScanner with manual, scheduled and watcher modes
- Add LanguageDetector for audio language detection
- Support rule-based filtering with priority evaluation
2026-01-16 16:54:41 +01:00

"""Language detection service using Whisper."""
import logging
from typing import Optional, Tuple
from pathlib import Path

from backend.scanning.detected_languages import DetectedLanguage
from backend.core.language_code import LanguageCode

logger = logging.getLogger(__name__)


class LanguageDetector:
    """
    Service for detecting audio language in media files.

    Uses Whisper's language detection on a small audio sample.
    Results are cached in the database to avoid re-detection.
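
    Example (illustrative sketch; the media path below is hypothetical):

        lang, confidence = LanguageDetector.detect_language("/media/show/episode.mkv")
        if lang:
            print(f"Audio language: {lang} ({confidence}%)")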
"""
@staticmethod
def detect_language(file_path: str, sample_duration: int = 30) -> Tuple[Optional[LanguageCode], Optional[int]]:
"""
Detect language of audio in a media file.
First checks cache, then uses Whisper if needed.
Args:
file_path: Path to media file
sample_duration: Seconds of audio to analyze (default: 30)
Returns:
Tuple of (LanguageCode, confidence_percentage) or (None, None)
"""
# Check cache first
cached = LanguageDetector._get_cached_language(file_path)
if cached:
logger.info(f"Using cached language for {Path(file_path).name}: {cached}")
# When returning from cache, we don't have confidence stored, use 100%
return cached, 100

        # Detect using Whisper
        try:
            detected_lang, confidence = LanguageDetector._detect_with_whisper(
                file_path, sample_duration
            )
            if detected_lang:
                # Cache the result
                LanguageDetector._cache_language(file_path, detected_lang, confidence)
                logger.info(
                    f"Detected language for {Path(file_path).name}: "
                    f"{detected_lang} (confidence: {confidence}%)"
                )
                return detected_lang, confidence
            return None, None
        except Exception as e:
            logger.error(f"Language detection failed for {file_path}: {e}")
            return None, None

    @staticmethod
    def _get_cached_language(file_path: str) -> Optional[LanguageCode]:
        """
        Get cached detected language from database.

        Args:
            file_path: Path to media file

        Returns:
            LanguageCode if cached, None otherwise
        """
        from backend.core.database import database

        with database.get_session() as session:
            cached = session.query(DetectedLanguage).filter(
                DetectedLanguage.file_path == file_path
            ).first()
            if cached:
                return LanguageCode.from_string(cached.detected_language)
            return None

    @staticmethod
    def _cache_language(
        file_path: str,
        language: LanguageCode,
        confidence: Optional[int] = None
    ):
        """
        Cache detected language in database.

        Args:
            file_path: Path to media file
            language: Detected language code
            confidence: Detection confidence (0-100)
        """
        from backend.core.database import database

        with database.get_session() as session:
            # Check if entry exists
            existing = session.query(DetectedLanguage).filter(
                DetectedLanguage.file_path == file_path
            ).first()
            # "und" is the ISO 639 code for an undetermined language
            lang_code = language.to_iso_639_1() if language else "und"
            if existing:
                # Update existing
                existing.detected_language = lang_code
                existing.detection_confidence = confidence
            else:
                # Create new
                detected = DetectedLanguage(
                    file_path=file_path,
                    detected_language=lang_code,
                    detection_confidence=confidence
                )
                session.add(detected)
            session.commit()
            logger.debug(f"Cached language detection: {file_path} -> {lang_code}")

    @staticmethod
    def _detect_with_whisper(
        file_path: str,
        sample_duration: int = 30
    ) -> Tuple[Optional[LanguageCode], Optional[int]]:
        """
        Detect language using Whisper model.

        Args:
            file_path: Path to media file
            sample_duration: Seconds of audio to analyze

        Returns:
            Tuple of (LanguageCode, confidence_percentage) or (None, None)
        """
        try:
            from backend.transcription.transcriber import WhisperTranscriber, WHISPER_AVAILABLE
            from backend.transcription.audio_utils import extract_audio_segment

            if not WHISPER_AVAILABLE:
                logger.error("Whisper not available - cannot detect language")
                return None, None

            # Get file duration first to extract from the middle
            import ffmpeg
            try:
                probe = ffmpeg.probe(file_path)
                duration = float(probe['format']['duration'])
                # Extract from the middle of the file for better detection
                # (beginning might have intro music, credits, etc.)
                start_time = max(0, (duration / 2) - (sample_duration / 2))
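                # Illustrative arithmetic with hypothetical numbers: a 40-minute
                # file (2400 s) with a 30 s sample starts at 2400/2 - 30/2 = 1185 s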
                logger.debug(
                    f"Extracting {sample_duration}s audio sample from middle of {file_path} "
                    f"(duration: {duration:.1f}s, sample start: {start_time:.1f}s)"
                )
            except Exception as e:
                logger.warning(f"Could not get file duration: {e}, using start of file")
                start_time = 0

            audio_data = extract_audio_segment(
                file_path,
                start_time=int(start_time),
                duration=sample_duration
            )
            if not audio_data:
                logger.warning(f"Failed to extract audio from {file_path}")
                return None, None

            # Save audio_data to a temporary file since stable-whisper doesn't accept BytesIO
            import tempfile
            import os

            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_audio:
                temp_audio.write(audio_data.read())
                temp_audio_path = temp_audio.name

            try:
                # Initialize the transcriber with the tiny model for fast detection
                transcriber = WhisperTranscriber(model_name="tiny")
                transcriber.load_model()

                # Detect language using Whisper
                logger.debug("Detecting language with Whisper...")
                # Use transcribe with language=None to trigger auto-detection;
                # this is more reliable than detect_language()
                result = transcriber.model.transcribe(
                    temp_audio_path,  # Use file path instead of BytesIO
                    language=None,  # Auto-detect
                    task="transcribe",
                    vad_filter=False,  # Don't filter, just detect
                    beam_size=1,  # Faster
                    best_of=1,  # Faster
                    temperature=0.0,  # Deterministic
                    condition_on_previous_text=False,
                    initial_prompt=None,
                )
                if result:
                    # stable-whisper/faster-whisper returns language info;
                    # try the different attributes that might contain the language code
                    lang_code_str = None
                    probability = 1.0

                    # Try to get language code (2-letter ISO 639-1)
                    if hasattr(result, 'language_code'):
                        lang_code_str = result.language_code
                    elif hasattr(result, 'language'):
                        # result.language might be full name like "japanese" or code like "ja"
                        lang = result.language
                        if len(lang) == 2:
                            # Already a code
                            lang_code_str = lang
                        else:
                            # Full name - need to map to code
                            # Common mappings
                            lang_map = {
                                'japanese': 'ja',
                                'english': 'en',
                                'spanish': 'es',
                                'french': 'fr',
                                'german': 'de',
                                'italian': 'it',
                                'portuguese': 'pt',
                                'russian': 'ru',
                                'chinese': 'zh',
                                'korean': 'ko',
                                'arabic': 'ar',
                                'hindi': 'hi',
                            }
                            lang_code_str = lang_map.get(lang.lower())

                    # Get language probability if available
                    if hasattr(result, 'language_probability'):
                        probability = result.language_probability

                    if lang_code_str:
                        confidence = int(probability * 100)
                        language = LanguageCode.from_iso_639_1(lang_code_str)
                        logger.info(
                            f"Whisper detected language: {lang_code_str} "
                            f"(confidence: {confidence}%)"
                        )
                        return language, confidence
                    else:
                        logger.warning(f"Could not extract language code from result: {result}")
                        return None, None
            finally:
                # Clean up temporary file
                try:
                    os.unlink(temp_audio_path)
                except Exception as e:
                    logger.warning(f"Failed to delete temporary audio file: {e}")

        except Exception as e:
            logger.error(f"Whisper language detection error: {e}", exc_info=True)
            return None, None

    @staticmethod
    def clear_cache(file_path: Optional[str] = None):
        """
        Clear language detection cache.

        Args:
            file_path: Specific file to clear, or None to clear all
        """
        from backend.core.database import database

        with database.get_session() as session:
            if file_path:
                session.query(DetectedLanguage).filter(
                    DetectedLanguage.file_path == file_path
                ).delete()
                logger.info(f"Cleared language cache for {file_path}")
            else:
                count = session.query(DetectedLanguage).delete()
                logger.info(f"Cleared all language cache ({count} entries)")
            session.commit()


# Global instance
language_detector = LanguageDetector()
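

# Illustrative usage sketch: exercises detect_language() through the module-level
# instance above. The media path is hypothetical, and Whisper and ffmpeg must be
# available unless a cached result already exists for the file.
if __name__ == "__main__":
    lang, confidence = language_detector.detect_language("/media/movies/example.mkv")
    if lang:
        print(f"Detected {lang} with {confidence}% confidence")
    else:
        print("Language could not be detected")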