From 58c565cd963c55921d9239ef61c36203f8716fb6 Mon Sep 17 00:00:00 2001 From: Dasemu Date: Fri, 16 Jan 2026 15:11:30 +0100 Subject: [PATCH] feat(core): add database, models, queue and settings system - Add SQLAlchemy database setup with session management - Add Job model with status, priority and progress tracking - Add QueueManager with priority queue and deduplication - Add SystemSettings model for database-backed configuration - Add SettingsService with caching and defaults - Add SystemMonitor for CPU/RAM/GPU resource monitoring - Add LanguageCode utilities (moved from root) --- backend/core/database.py | 38 ++- backend/core/language_code.py | 198 +++++++++++ backend/core/models.py | 53 ++- backend/core/queue_manager.py | 194 ++++++++++- backend/core/settings_model.py | 74 +++++ backend/core/settings_service.py | 541 +++++++++++++++++++++++++++++++ backend/core/system_monitor.py | 294 +++++++++++++++++ 7 files changed, 1370 insertions(+), 22 deletions(-) create mode 100644 backend/core/language_code.py create mode 100644 backend/core/settings_model.py create mode 100644 backend/core/settings_service.py create mode 100644 backend/core/system_monitor.py diff --git a/backend/core/database.py b/backend/core/database.py index 3e2c8de..7e53d84 100644 --- a/backend/core/database.py +++ b/backend/core/database.py @@ -57,7 +57,7 @@ class Database: settings.database_url, connect_args=connect_args, poolclass=poolclass, - echo=settings.debug, + echo=False, ) @event.listens_for(engine, "connect") @@ -85,7 +85,7 @@ class Database: pool_size=10, max_overflow=20, pool_pre_ping=True, # Verify connections before using - echo=settings.debug, + echo=False, ) elif settings.database_type in (DatabaseType.MARIADB, DatabaseType.MYSQL): @@ -107,18 +107,26 @@ class Database: pool_size=10, max_overflow=20, pool_pre_ping=True, - echo=settings.debug, + echo=False, ) else: raise ValueError(f"Unsupported database type: {settings.database_type}") + # Disable SQLAlchemy INFO logs for cleaner output + logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING) + logging.getLogger('sqlalchemy.pool').setLevel(logging.WARNING) + logging.getLogger('sqlalchemy.dialects').setLevel(logging.WARNING) + logging.getLogger('sqlalchemy.orm').setLevel(logging.WARNING) + return engine def _ensure_tables_exist(self): """Check if tables exist and create them if they don't.""" # Import models to register them with Base.metadata from backend.core import models # noqa: F401 + from backend.core import settings_model # noqa: F401 + from backend.scanning import models as scanning_models # noqa: F401 from sqlalchemy import inspect inspector = inspect(self.engine) @@ -135,6 +143,8 @@ class Database: """Create all database tables.""" # Import models to register them with Base.metadata from backend.core import models # noqa: F401 + from backend.core import settings_model # noqa: F401 + from backend.scanning import models as scanning_models # noqa: F401 logger.info("Creating database tables...") Base.metadata.create_all(bind=self.engine, checkfirst=True) @@ -150,6 +160,28 @@ class Database: logger.error(f"Failed to create tables. Existing tables: {created_tables}") raise RuntimeError("Failed to create database tables") + def init_db(self): + """ + Initialize database. + + Ensures tables exist and are up to date. + Safe to call multiple times. 
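+
+        Example (illustrative, using the module-level ``database`` instance):
+            database.init_db()  # first call creates any missing tables
+            database.init_db()  # later calls are harmless no-ops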
+ """ + logger.info("Initializing database...") + self._ensure_tables_exist() + logger.info("Database initialization complete") + + def reset_db(self): + """ + Reset database (drop and recreate all tables). + + WARNING: This deletes ALL data! + """ + logger.warning("Resetting database - ALL DATA WILL BE LOST") + self.drop_tables() + self.create_tables() + logger.info("Database reset complete") + def drop_tables(self): """Drop all database tables (use with caution!).""" logger.warning("Dropping all database tables...") diff --git a/backend/core/language_code.py b/backend/core/language_code.py new file mode 100644 index 0000000..93b55fb --- /dev/null +++ b/backend/core/language_code.py @@ -0,0 +1,198 @@ +from enum import Enum + +class LanguageCode(Enum): + # ISO 639-1, ISO 639-2/T, ISO 639-2/B, English Name, Native Name + AFAR = ("aa", "aar", "aar", "Afar", "Afar") + AFRIKAANS = ("af", "afr", "afr", "Afrikaans", "Afrikaans") + AMHARIC = ("am", "amh", "amh", "Amharic", "አማርኛ") + ARABIC = ("ar", "ara", "ara", "Arabic", "العربية") + ASSAMESE = ("as", "asm", "asm", "Assamese", "অসমীয়া") + AZERBAIJANI = ("az", "aze", "aze", "Azerbaijani", "Azərbaycanca") + BASHKIR = ("ba", "bak", "bak", "Bashkir", "Башҡортса") + BELARUSIAN = ("be", "bel", "bel", "Belarusian", "Беларуская") + BULGARIAN = ("bg", "bul", "bul", "Bulgarian", "Български") + BENGALI = ("bn", "ben", "ben", "Bengali", "বাংলা") + TIBETAN = ("bo", "bod", "tib", "Tibetan", "བོད་ཡིག") + BRETON = ("br", "bre", "bre", "Breton", "Brezhoneg") + BOSNIAN = ("bs", "bos", "bos", "Bosnian", "Bosanski") + CATALAN = ("ca", "cat", "cat", "Catalan", "Català") + CZECH = ("cs", "ces", "cze", "Czech", "Čeština") + WELSH = ("cy", "cym", "wel", "Welsh", "Cymraeg") + DANISH = ("da", "dan", "dan", "Danish", "Dansk") + GERMAN = ("de", "deu", "ger", "German", "Deutsch") + GREEK = ("el", "ell", "gre", "Greek", "Ελληνικά") + ENGLISH = ("en", "eng", "eng", "English", "English") + SPANISH = ("es", "spa", "spa", "Spanish", "Español") + ESTONIAN = ("et", "est", "est", "Estonian", "Eesti") + BASQUE = ("eu", "eus", "baq", "Basque", "Euskara") + PERSIAN = ("fa", "fas", "per", "Persian", "فارسی") + FINNISH = ("fi", "fin", "fin", "Finnish", "Suomi") + FAROESE = ("fo", "fao", "fao", "Faroese", "Føroyskt") + FRENCH = ("fr", "fra", "fre", "French", "Français") + GALICIAN = ("gl", "glg", "glg", "Galician", "Galego") + GUJARATI = ("gu", "guj", "guj", "Gujarati", "ગુજરાતી") + HAUSA = ("ha", "hau", "hau", "Hausa", "Hausa") + HAWAIIAN = ("haw", "haw", "haw", "Hawaiian", "ʻŌlelo Hawaiʻi") + HEBREW = ("he", "heb", "heb", "Hebrew", "עברית") + HINDI = ("hi", "hin", "hin", "Hindi", "हिन्दी") + CROATIAN = ("hr", "hrv", "hrv", "Croatian", "Hrvatski") + HAITIAN_CREOLE = ("ht", "hat", "hat", "Haitian Creole", "Kreyòl Ayisyen") + HUNGARIAN = ("hu", "hun", "hun", "Hungarian", "Magyar") + ARMENIAN = ("hy", "hye", "arm", "Armenian", "Հայերեն") + INDONESIAN = ("id", "ind", "ind", "Indonesian", "Bahasa Indonesia") + ICELANDIC = ("is", "isl", "ice", "Icelandic", "Íslenska") + ITALIAN = ("it", "ita", "ita", "Italian", "Italiano") + JAPANESE = ("ja", "jpn", "jpn", "Japanese", "日本語") + JAVANESE = ("jw", "jav", "jav", "Javanese", "ꦧꦱꦗꦮ") + GEORGIAN = ("ka", "kat", "geo", "Georgian", "ქართული") + KAZAKH = ("kk", "kaz", "kaz", "Kazakh", "Қазақша") + KHMER = ("km", "khm", "khm", "Khmer", "ភាសាខ្មែរ") + KANNADA = ("kn", "kan", "kan", "Kannada", "ಕನ್ನಡ") + KOREAN = ("ko", "kor", "kor", "Korean", "한국어") + LATIN = ("la", "lat", "lat", "Latin", "Latina") + LUXEMBOURGISH = ("lb", "ltz", "ltz", "Luxembourgish", 
"Lëtzebuergesch") + LINGALA = ("ln", "lin", "lin", "Lingala", "Lingála") + LAO = ("lo", "lao", "lao", "Lao", "ພາສາລາວ") + LITHUANIAN = ("lt", "lit", "lit", "Lithuanian", "Lietuvių") + LATVIAN = ("lv", "lav", "lav", "Latvian", "Latviešu") + MALAGASY = ("mg", "mlg", "mlg", "Malagasy", "Malagasy") + MAORI = ("mi", "mri", "mao", "Maori", "Te Reo Māori") + MACEDONIAN = ("mk", "mkd", "mac", "Macedonian", "Македонски") + MALAYALAM = ("ml", "mal", "mal", "Malayalam", "മലയാളം") + MONGOLIAN = ("mn", "mon", "mon", "Mongolian", "Монгол") + MARATHI = ("mr", "mar", "mar", "Marathi", "मराठी") + MALAY = ("ms", "msa", "may", "Malay", "Bahasa Melayu") + MALTESE = ("mt", "mlt", "mlt", "Maltese", "Malti") + BURMESE = ("my", "mya", "bur", "Burmese", "မြန်မာစာ") + NEPALI = ("ne", "nep", "nep", "Nepali", "नेपाली") + DUTCH = ("nl", "nld", "dut", "Dutch", "Nederlands") + NORWEGIAN_NYNORSK = ("nn", "nno", "nno", "Norwegian Nynorsk", "Nynorsk") + NORWEGIAN = ("no", "nor", "nor", "Norwegian", "Norsk") + OCCITAN = ("oc", "oci", "oci", "Occitan", "Occitan") + PUNJABI = ("pa", "pan", "pan", "Punjabi", "ਪੰਜਾਬੀ") + POLISH = ("pl", "pol", "pol", "Polish", "Polski") + PASHTO = ("ps", "pus", "pus", "Pashto", "پښتو") + PORTUGUESE = ("pt", "por", "por", "Portuguese", "Português") + ROMANIAN = ("ro", "ron", "rum", "Romanian", "Română") + RUSSIAN = ("ru", "rus", "rus", "Russian", "Русский") + SANSKRIT = ("sa", "san", "san", "Sanskrit", "संस्कृतम्") + SINDHI = ("sd", "snd", "snd", "Sindhi", "سنڌي") + SINHALA = ("si", "sin", "sin", "Sinhala", "සිංහල") + SLOVAK = ("sk", "slk", "slo", "Slovak", "Slovenčina") + SLOVENE = ("sl", "slv", "slv", "Slovene", "Slovenščina") + SHONA = ("sn", "sna", "sna", "Shona", "ChiShona") + SOMALI = ("so", "som", "som", "Somali", "Soomaaliga") + ALBANIAN = ("sq", "sqi", "alb", "Albanian", "Shqip") + SERBIAN = ("sr", "srp", "srp", "Serbian", "Српски") + SUNDANESE = ("su", "sun", "sun", "Sundanese", "Basa Sunda") + SWEDISH = ("sv", "swe", "swe", "Swedish", "Svenska") + SWAHILI = ("sw", "swa", "swa", "Swahili", "Kiswahili") + TAMIL = ("ta", "tam", "tam", "Tamil", "தமிழ்") + TELUGU = ("te", "tel", "tel", "Telugu", "తెలుగు") + TAJIK = ("tg", "tgk", "tgk", "Tajik", "Тоҷикӣ") + THAI = ("th", "tha", "tha", "Thai", "ไทย") + TURKMEN = ("tk", "tuk", "tuk", "Turkmen", "Türkmençe") + TAGALOG = ("tl", "tgl", "tgl", "Tagalog", "Tagalog") + TURKISH = ("tr", "tur", "tur", "Turkish", "Türkçe") + TATAR = ("tt", "tat", "tat", "Tatar", "Татарча") + UKRAINIAN = ("uk", "ukr", "ukr", "Ukrainian", "Українська") + URDU = ("ur", "urd", "urd", "Urdu", "اردو") + UZBEK = ("uz", "uzb", "uzb", "Uzbek", "Oʻzbek") + VIETNAMESE = ("vi", "vie", "vie", "Vietnamese", "Tiếng Việt") + YIDDISH = ("yi", "yid", "yid", "Yiddish", "ייִדיש") + YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá") + CHINESE = ("zh", "zho", "chi", "Chinese", "中文") + CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語") + NONE = (None, None, None, None, None) # For no language + # und for Undetermined aka unknown language https://www.loc.gov/standards/iso639-2/faq.html#25 + + def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native): + self.iso_639_1 = iso_639_1 + self.iso_639_2_t = iso_639_2_t + self.iso_639_2_b = iso_639_2_b + self.name_en = name_en + self.name_native = name_native + + @staticmethod + def from_iso_639_1(code): + for lang in LanguageCode: + if lang.iso_639_1 == code: + return lang + return LanguageCode.NONE + + @staticmethod + def from_iso_639_2(code): + for lang in LanguageCode: + if lang.iso_639_2_t == code or lang.iso_639_2_b == 
code:
+                return lang
+        return LanguageCode.NONE
+
+    @staticmethod
+    def from_name(name: str):
+        """Convert a language name (either English or native) to LanguageCode enum."""
+        for lang in LanguageCode:
+            if lang.name_en is not None and (lang.name_en.lower() == name.lower() or lang.name_native.lower() == name.lower()):
+                return lang
+        return LanguageCode.NONE
+
+
+    @staticmethod
+    def from_string(value: str):
+        """
+        Convert a string to a LanguageCode instance. Matches on ISO codes, English name, or native name.
+        """
+        if value is None:
+            return LanguageCode.NONE
+        value = value.strip().lower()
+        for lang in LanguageCode:
+            if lang is LanguageCode.NONE:
+                continue
+            elif (
+                value == lang.iso_639_1
+                or value == lang.iso_639_2_t
+                or value == lang.iso_639_2_b
+                or value == lang.name_en.lower()
+                or value == lang.name_native.lower()
+            ):
+                return lang
+        return LanguageCode.NONE
+
+    @staticmethod
+    def is_valid_language(language: str):
+        """Check whether the given string maps to a known language."""
+        return LanguageCode.from_string(language) is not LanguageCode.NONE
+
+    def to_iso_639_1(self):
+        return self.iso_639_1
+
+    def to_iso_639_2_t(self):
+        return self.iso_639_2_t
+
+    def to_iso_639_2_b(self):
+        return self.iso_639_2_b
+
+    def to_name(self, in_english=True):
+        return self.name_en if in_english else self.name_native
+
+    def __str__(self):
+        if self.name_en is None:
+            return "Unknown"
+        return self.name_en
+
+    def __bool__(self):
+        return self.iso_639_1 is not None
+
+    def __eq__(self, other):
+        """
+        Compare the LanguageCode instance to another object.
+        Explicitly handle comparison to None.
+        """
+        if other is None:
+            # If compared to None, return False unless self is None
+            return self.iso_639_1 is None
+        if isinstance(other, str):  # Allow comparison with a string
+            return self is LanguageCode.from_string(other)
+        if isinstance(other, LanguageCode):
+            # Normal comparison for LanguageCode instances
+            return self.iso_639_1 == other.iso_639_1
+        # Otherwise, defer to the default equality
+        return NotImplemented
+
+    # Defining __eq__ sets __hash__ to None; restore Enum's hash so members stay usable in sets/dicts
+    __hash__ = Enum.__hash__
diff --git a/backend/core/models.py b/backend/core/models.py
index 0c022c6..ab4c817 100644
--- a/backend/core/models.py
+++ b/backend/core/models.py
@@ -1,6 +1,6 @@
 """Database models for TranscriptorIO."""
 import uuid
-from datetime import datetime
+from datetime import datetime, timezone
 from enum import Enum
 from typing import Optional
 
@@ -12,6 +12,12 @@
 from sqlalchemy.sql import func
 
 from backend.core.database import Base
 
 
+class JobType(str, Enum):
+    """Job type classification."""
+    TRANSCRIPTION = "transcription"  # Regular transcription/translation job
+    LANGUAGE_DETECTION = "language_detection"  # Language detection only
+
+
 class JobStatus(str, Enum):
     """Job status states."""
     QUEUED = "queued"
@@ -24,7 +30,9 @@ class JobStatus(str, Enum):
 class JobStage(str, Enum):
     """Job processing stages."""
     PENDING = "pending"
+    LOADING_MODEL = "loading_model"
     DETECTING_LANGUAGE = "detecting_language"
+    LANGUAGE_DETECTION = "language_detection"  # Alias for backward compatibility
     EXTRACTING_AUDIO = "extracting_audio"
     TRANSCRIBING = "transcribing"
     TRANSLATING = "translating"
@@ -50,6 +58,14 @@ class Job(Base):
     file_path = Column(String(1024), nullable=False, index=True)
     file_name = Column(String(512), nullable=False)
 
+    # Job classification
+    job_type = Column(
+        SQLEnum(JobType),
+        nullable=False,
+        default=JobType.TRANSCRIPTION,
+        index=True
+    )
+
     # Job status
     status = Column(
         SQLEnum(JobStatus),
@@ -126,15 +142,25 @@ class Job(Base):
     @property
     def can_retry(self) -> bool:
-        """Check if job can be retried."""
-        return self.status == 
JobStatus.FAILED and self.retry_count < self.max_retries + """Check if job can be retried. Always allow retry for failed jobs.""" + return self.status == JobStatus.FAILED def to_dict(self) -> dict: """Convert job to dictionary for API responses.""" + def format_datetime(dt): + """Format datetime as ISO string with UTC timezone.""" + if not dt: + return None + # If timezone-naive, assume UTC + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.isoformat() + return { "id": self.id, "file_path": self.file_path, "file_name": self.file_name, + "job_type": self.job_type.value if self.job_type else "transcription", "status": self.status.value, "priority": self.priority, "source_lang": self.source_lang, @@ -144,9 +170,9 @@ class Job(Base): "progress": self.progress, "current_stage": self.current_stage.value if self.current_stage else None, "eta_seconds": self.eta_seconds, - "created_at": self.created_at.isoformat() if self.created_at else None, - "started_at": self.started_at.isoformat() if self.started_at else None, - "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "created_at": format_datetime(self.created_at), + "started_at": format_datetime(self.started_at), + "completed_at": format_datetime(self.completed_at), "output_path": self.output_path, "segments_count": self.segments_count, "error": self.error, @@ -168,13 +194,13 @@ class Job(Base): def mark_started(self, worker_id: str): """Mark job as started.""" self.status = JobStatus.PROCESSING - self.started_at = datetime.utcnow() + self.started_at = datetime.now(timezone.utc) self.worker_id = worker_id def mark_completed(self, output_path: str, segments_count: int, srt_content: Optional[str] = None): """Mark job as completed.""" self.status = JobStatus.COMPLETED - self.completed_at = datetime.utcnow() + self.completed_at = datetime.now(timezone.utc) self.output_path = output_path self.segments_count = segments_count self.srt_content = srt_content @@ -182,19 +208,24 @@ class Job(Base): self.current_stage = JobStage.FINALIZING if self.started_at: - self.processing_time_seconds = (self.completed_at - self.started_at).total_seconds() + # Handle both timezone-aware and timezone-naive datetimes + started = self.started_at + if started.tzinfo is None: + # Convert naive datetime to UTC timezone-aware + started = started.replace(tzinfo=timezone.utc) + self.processing_time_seconds = (self.completed_at - started).total_seconds() def mark_failed(self, error: str): """Mark job as failed.""" self.status = JobStatus.FAILED - self.completed_at = datetime.utcnow() + self.completed_at = datetime.now(timezone.utc) self.error = error self.retry_count += 1 def mark_cancelled(self): """Mark job as cancelled.""" self.status = JobStatus.CANCELLED - self.completed_at = datetime.utcnow() + self.completed_at = datetime.now(timezone.utc) # Create indexes for common queries diff --git a/backend/core/queue_manager.py b/backend/core/queue_manager.py index 6e4e2cc..4a105f1 100644 --- a/backend/core/queue_manager.py +++ b/backend/core/queue_manager.py @@ -1,6 +1,6 @@ """Queue manager for persistent job queuing.""" import logging -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import List, Optional, Dict from sqlalchemy import and_, or_ from sqlalchemy.orm import Session @@ -39,6 +39,7 @@ class QueueManager: priority: int = 0, bazarr_callback_url: Optional[str] = None, is_manual_request: bool = False, + job_type: Optional['JobType'] = None, ) -> Optional[Job]: """ Add 
a new job to the queue. @@ -53,20 +54,29 @@ class QueueManager: priority: Job priority (higher = processed first) bazarr_callback_url: Callback URL for Bazarr provider mode is_manual_request: Whether this is a manual request (higher priority) + job_type: Type of job (transcription or language_detection) Returns: Job object if created, None if duplicate exists """ + from backend.core.models import JobType + + if job_type is None: + job_type = JobType.TRANSCRIPTION with self.db.get_session() as session: # Check for existing job existing = self._find_existing_job(session, file_path, target_lang) if existing: - logger.info(f"Job already exists for {file_name}: {existing.id} [{existing.status.value}]") + logger.warning( + f"Duplicate job detected for {file_name}: " + f"Existing job {existing.id} [{existing.status.value}] " + f"target={target_lang}, path={file_path}" + ) # If existing job failed and can retry, reset it if existing.can_retry: - logger.info(f"Resetting failed job {existing.id} for retry") + logger.info(f"Auto-retrying failed job {existing.id}") existing.status = JobStatus.QUEUED existing.error = None existing.current_stage = JobStage.PENDING @@ -74,12 +84,14 @@ class QueueManager: session.commit() return existing + logger.info(f"Job {existing.id} cannot be auto-retried (status: {existing.status.value})") return None # Create new job job = Job( file_path=file_path, file_name=file_name, + job_type=job_type, source_lang=source_lang, target_lang=target_lang, quality_preset=quality_preset, @@ -195,7 +207,11 @@ class QueueManager: job_id: str, output_path: str, segments_count: int, - srt_content: Optional[str] = None + srt_content: Optional[str] = None, + model_used: Optional[str] = None, + device_used: Optional[str] = None, + processing_time_seconds: Optional[float] = None, + detected_language: Optional[str] = None ) -> bool: """Mark a job as completed.""" with self.db.get_session() as session: @@ -206,6 +222,17 @@ class QueueManager: return False job.mark_completed(output_path, segments_count, srt_content) + + # Set optional metadata if provided + if model_used: + job.model_used = model_used + if device_used: + job.device_used = device_used + if processing_time_seconds is not None: + job.processing_time_seconds = processing_time_seconds + if detected_language: + job.source_lang = detected_language + session.commit() logger.info( @@ -227,7 +254,7 @@ class QueueManager: session.commit() logger.error( - f"Job {job_id} failed (attempt {job.retry_count}/{job.max_retries}): {error}" + f"Job {job_id} failed (attempt #{job.retry_count}): {error}" ) return True @@ -260,7 +287,7 @@ class QueueManager: failed = session.query(Job).filter(Job.status == JobStatus.FAILED).count() # Get today's stats - today = datetime.utcnow().date() + today = datetime.now(timezone.utc).date() completed_today = ( session.query(Job) .filter( @@ -321,6 +348,115 @@ class QueueManager: return jobs + def get_all_jobs( + self, + status_filter: Optional[JobStatus] = None, + limit: int = 50, + offset: int = 0 + ) -> List[Job]: + """ + Get all jobs with optional filtering. 
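+
+        Example (illustrative):
+            failed_page = queue_manager.get_all_jobs(
+                status_filter=JobStatus.FAILED, limit=20, offset=0
+            )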
+ + Args: + status_filter: Filter by status + limit: Maximum number of jobs to return + offset: Offset for pagination + + Returns: + List of Job objects (detached from session) + """ + with self.db.get_session() as session: + query = session.query(Job) + + if status_filter: + query = query.filter(Job.status == status_filter) + + jobs = ( + query + .order_by(Job.created_at.desc()) + .limit(limit) + .offset(offset) + .all() + ) + + # Expunge all jobs from session so they don't expire + for job in jobs: + session.expunge(job) + + return jobs + + def count_jobs(self, status_filter: Optional[JobStatus] = None) -> int: + """ + Count jobs with optional filtering. + + Args: + status_filter: Filter by status + + Returns: + Number of jobs + """ + with self.db.get_session() as session: + query = session.query(Job) + + if status_filter: + query = query.filter(Job.status == status_filter) + + return query.count() + + def retry_job(self, job_id: str) -> bool: + """ + Retry a failed job. + + Args: + job_id: Job ID to retry + + Returns: + True if job was reset to queued, False otherwise + """ + with self.db.get_session() as session: + job = session.query(Job).filter(Job.id == job_id).first() + + if not job: + logger.warning(f"Job {job_id} not found for retry") + return False + + if not job.can_retry: + logger.warning(f"Job {job_id} cannot be retried") + return False + + # Reset job to queued + job.status = JobStatus.QUEUED + job.error = None + job.current_stage = JobStage.PENDING + job.progress = 0.0 + job.worker_id = None + job.retry_count += 1 # Increment retry count for tracking + session.commit() + + logger.info(f"Job {job_id} reset for retry (attempt #{job.retry_count})") + return True + + def clear_completed_jobs(self) -> int: + """ + Clear all completed jobs. + + Returns: + Number of jobs cleared + """ + with self.db.get_session() as session: + deleted = ( + session.query(Job) + .filter(Job.status == JobStatus.COMPLETED) + .delete() + ) + + session.commit() + + if deleted > 0: + logger.info(f"Cleared {deleted} completed jobs") + + return deleted + def get_processing_jobs(self) -> List[Job]: """Get all currently processing jobs.""" return self.get_jobs(status=JobStatus.PROCESSING) @@ -350,7 +486,7 @@ class QueueManager: Number of jobs deleted """ with self.db.get_session() as session: - cutoff_date = datetime.utcnow() - timedelta(days=days) + cutoff_date = datetime.now(timezone.utc) - timedelta(days=days) deleted = ( session.query(Job) @@ -389,6 +525,48 @@ class QueueManager: return query.first() + def cleanup_orphaned_jobs(self) -> int: + """ + Clean up orphaned jobs after server restart. + + Jobs stuck in 'processing' state with no active worker are marked as failed. + This prevents jobs from being stuck forever after a restart. 
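+
+        Example (illustrative, typically run once during startup):
+            cleaned = queue_manager.cleanup_orphaned_jobs()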
+
+        Returns:
+            Number of jobs cleaned up
+        """
+        from datetime import datetime
+
+        with self.db.get_session() as session:
+            # Find all jobs in processing state
+            orphaned_jobs = session.query(Job).filter(
+                Job.status == JobStatus.PROCESSING
+            ).all()
+
+            cleaned_count = 0
+            for job in orphaned_jobs:
+                # Mark as failed with appropriate error message
+                job.status = JobStatus.FAILED
+                job.error = "Job interrupted by server restart"
+                job.completed_at = datetime.now(timezone.utc)
+                job.progress = 0.0
+                job.current_stage = JobStage.PENDING
+                job.worker_id = None
+
+                logger.warning(
+                    f"Cleaned up orphaned job {job.id} ({job.file_name}) - "
+                    f"was stuck in processing state"
+                )
+                cleaned_count += 1
+
+            session.commit()
+
+            if cleaned_count > 0:
+                logger.info(f"Cleaned up {cleaned_count} orphaned job(s) after restart")
+
+            return cleaned_count
+
 # Global queue manager instance
-queue_manager = QueueManager()
\ No newline at end of file
+queue_manager = QueueManager()
+
diff --git a/backend/core/settings_model.py b/backend/core/settings_model.py
new file mode 100644
index 0000000..7af3006
--- /dev/null
+++ b/backend/core/settings_model.py
@@ -0,0 +1,74 @@
+"""Database model for system settings."""
+from datetime import datetime
+from sqlalchemy import Column, Integer, String, Text, DateTime
+from sqlalchemy.sql import func
+
+from backend.core.database import Base
+
+
+class SystemSettings(Base):
+    """
+    System settings stored in database.
+
+    Replaces .env configuration for dynamic settings management through WebUI.
+    Settings are organized by category and support different value types.
+    """
+
+    __tablename__ = "system_settings"
+
+    # Primary key
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Setting identification
+    key = Column(String(255), nullable=False, unique=True, index=True)
+    value = Column(Text, nullable=True)  # Store as string, parse based on value_type
+
+    # Metadata
+    description = Column(Text, nullable=True)
+    category = Column(String(100), nullable=True, index=True)  # general, workers, transcription, scanner, bazarr
+    value_type = Column(String(50), nullable=True)  # string, integer, boolean, float, list
+
+    # Timestamps
+    created_at = Column(DateTime(timezone=True), server_default=func.now())
+    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
+
+    def __repr__(self):
+        """String representation."""
+        return f"<SystemSettings(key='{self.key}', category='{self.category}', value_type='{self.value_type}')>"
+
+    def to_dict(self) -> dict:
+        """Convert to dictionary for API responses."""
+        return {
+            "id": self.id,
+            "key": self.key,
+            "value": self.value,
+            "description": self.description,
+            "category": self.category,
+            "value_type": self.value_type,
+            "created_at": self.created_at.isoformat() if self.created_at else None,
+            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
+        }
+
+    def get_parsed_value(self):
+        """
+        Parse value based on value_type.
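+
+        Example (illustrative):
+            s = SystemSettings(key="api_port", value="8000", value_type="integer")
+            s.get_parsed_value()  # -> 8000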
+ + Returns: + Parsed value in appropriate Python type + """ + if self.value is None: + return None + + if self.value_type == "boolean": + return self.value.lower() in ("true", "1", "yes", "on") + elif self.value_type == "integer": + return int(self.value) + elif self.value_type == "float": + return float(self.value) + elif self.value_type == "list": + # Comma-separated values + return [v.strip() for v in self.value.split(",") if v.strip()] + else: # string or unknown + return self.value + + diff --git a/backend/core/settings_service.py b/backend/core/settings_service.py new file mode 100644 index 0000000..4070505 --- /dev/null +++ b/backend/core/settings_service.py @@ -0,0 +1,541 @@ +"""Settings service for database-backed configuration.""" +import logging +from typing import Optional, Dict, Any, List +from sqlalchemy.exc import IntegrityError + +from backend.core.database import database +from backend.core.settings_model import SystemSettings + +logger = logging.getLogger(__name__) + + +class SettingsService: + """ + Service for managing system settings in database. + + Provides caching and type-safe access to settings. + Settings are organized by category: general, workers, transcription, scanner, bazarr + """ + + def __init__(self): + """Initialize settings service.""" + self._cache: Dict[str, Any] = {} + self._cache_valid = False + + def get(self, key: str, default: Any = None) -> Any: + """ + Get setting value by key. + + Args: + key: Setting key + default: Default value if not found + + Returns: + Parsed setting value or default + """ + # Refresh cache if needed + if not self._cache_valid: + self._load_cache() + + return self._cache.get(key, default) + + def set(self, key: str, value: Any, description: str = None, category: str = None, value_type: str = None) -> bool: + """ + Set setting value. + + Args: + key: Setting key + value: Setting value (will be converted to string) + description: Optional description + category: Optional category + value_type: Optional type (string, integer, boolean, float, list) + + Returns: + True if successful + """ + with database.get_session() as session: + setting = session.query(SystemSettings).filter(SystemSettings.key == key).first() + + if setting: + # Update existing + setting.value = str(value) if value is not None else None + if description: + setting.description = description + if category: + setting.category = category + if value_type: + setting.value_type = value_type + else: + # Create new + setting = SystemSettings( + key=key, + value=str(value) if value is not None else None, + description=description, + category=category, + value_type=value_type or "string" + ) + session.add(setting) + + session.commit() + + # Invalidate cache + self._cache_valid = False + + logger.info(f"Setting updated: {key}={value}") + return True + + def get_by_category(self, category: str) -> List[SystemSettings]: + """ + Get all settings in a category. + + Args: + category: Category name + + Returns: + List of SystemSettings objects + """ + with database.get_session() as session: + settings = session.query(SystemSettings).filter( + SystemSettings.category == category + ).all() + + # Detach from session + for setting in settings: + session.expunge(setting) + + return settings + + def get_all(self) -> List[SystemSettings]: + """ + Get all settings. 
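+
+        Example (illustrative):
+            for s in settings_service.get_all():
+                print(s.key, s.get_parsed_value())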
+ + Returns: + List of SystemSettings objects + """ + with database.get_session() as session: + settings = session.query(SystemSettings).all() + + # Detach from session + for setting in settings: + session.expunge(setting) + + return settings + + def delete(self, key: str) -> bool: + """ + Delete a setting. + + Args: + key: Setting key + + Returns: + True if deleted, False if not found + """ + with database.get_session() as session: + setting = session.query(SystemSettings).filter(SystemSettings.key == key).first() + + if not setting: + return False + + session.delete(setting) + session.commit() + + # Invalidate cache + self._cache_valid = False + + logger.info(f"Setting deleted: {key}") + return True + + def bulk_update(self, settings: Dict[str, Any]) -> bool: + """ + Update multiple settings at once. + + Args: + settings: Dictionary of key-value pairs + + Returns: + True if successful + """ + with database.get_session() as session: + for key, value in settings.items(): + setting = session.query(SystemSettings).filter(SystemSettings.key == key).first() + + if setting: + setting.value = str(value) if value is not None else None + else: + logger.warning(f"Setting not found for bulk update: {key}") + + session.commit() + + # Invalidate cache + self._cache_valid = False + + logger.info(f"Bulk updated {len(settings)} settings") + return True + + def init_default_settings(self): + """ + Initialize default settings if they don't exist. + Called on first run or after database reset. + """ + defaults = self._get_default_settings() + + with database.get_session() as session: + for key, config in defaults.items(): + existing = session.query(SystemSettings).filter(SystemSettings.key == key).first() + + if not existing: + setting = SystemSettings( + key=key, + value=str(config["value"]) if config["value"] is not None else None, + description=config.get("description"), + category=config.get("category"), + value_type=config.get("value_type", "string") + ) + session.add(setting) + logger.info(f"Created default setting: {key}") + + session.commit() + + # Invalidate cache + self._cache_valid = False + + logger.info("Default settings initialized") + + def _load_cache(self): + """Load all settings into cache.""" + with database.get_session() as session: + settings = session.query(SystemSettings).all() + + self._cache = {} + for setting in settings: + self._cache[setting.key] = setting.get_parsed_value() + + self._cache_valid = True + + def _get_default_settings(self) -> Dict[str, Dict]: + """ + Get default settings configuration. + + All settings have sensible defaults. Configuration is managed + through the Web UI Settings page or the Settings API. 
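+
+        Example (illustrative, reading one of these defaults through the service):
+            settings_service.get("whisper_model", "medium")  # -> "medium" until overridden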
+ + Returns: + Dictionary of setting configurations + """ + return { + # === General === + "operation_mode": { + "value": "standalone", + "description": "Operation mode: standalone, provider, or standalone,provider", + "category": "general", + "value_type": "string" + }, + "library_paths": { + "value": "", + "description": "Comma-separated library paths to scan", + "category": "general", + "value_type": "list" + }, + "api_host": { + "value": "0.0.0.0", + "description": "API server host", + "category": "general", + "value_type": "string" + }, + "api_port": { + "value": "8000", + "description": "API server port", + "category": "general", + "value_type": "integer" + }, + "debug": { + "value": "false", + "description": "Enable debug mode", + "category": "general", + "value_type": "boolean" + }, + "setup_completed": { + "value": "false", + "description": "Whether setup wizard has been completed", + "category": "general", + "value_type": "boolean" + }, + + # === Workers === + "worker_cpu_count": { + "value": "0", + "description": "Number of CPU workers to start on boot", + "category": "workers", + "value_type": "integer" + }, + "worker_gpu_count": { + "value": "0", + "description": "Number of GPU workers to start on boot", + "category": "workers", + "value_type": "integer" + }, + "concurrent_transcriptions": { + "value": "2", + "description": "Maximum concurrent transcriptions", + "category": "workers", + "value_type": "integer" + }, + "worker_healthcheck_interval": { + "value": "60", + "description": "Worker health check interval (seconds)", + "category": "workers", + "value_type": "integer" + }, + "worker_auto_restart": { + "value": "true", + "description": "Auto-restart failed workers", + "category": "workers", + "value_type": "boolean" + }, + "clear_vram_on_complete": { + "value": "true", + "description": "Clear VRAM after job completion", + "category": "workers", + "value_type": "boolean" + }, + + # === Whisper/Transcription === + "whisper_model": { + "value": "medium", + "description": "Whisper model: tiny, base, small, medium, large-v3, large-v3-turbo", + "category": "transcription", + "value_type": "string" + }, + "model_path": { + "value": "./models", + "description": "Path to store Whisper models", + "category": "transcription", + "value_type": "string" + }, + "transcribe_device": { + "value": "cpu", + "description": "Device for transcription (cpu, cuda, gpu)", + "category": "transcription", + "value_type": "string" + }, + "cpu_compute_type": { + "value": "auto", + "description": "CPU compute type (auto, int8, float32)", + "category": "transcription", + "value_type": "string" + }, + "gpu_compute_type": { + "value": "auto", + "description": "GPU compute type (auto, float16, float32, int8_float16, int8)", + "category": "transcription", + "value_type": "string" + }, + "whisper_threads": { + "value": "4", + "description": "Number of CPU threads for Whisper", + "category": "transcription", + "value_type": "integer" + }, + "transcribe_or_translate": { + "value": "transcribe", + "description": "Default mode: transcribe or translate", + "category": "transcription", + "value_type": "string" + }, + "word_level_highlight": { + "value": "false", + "description": "Enable word-level highlighting in subtitles", + "category": "transcription", + "value_type": "boolean" + }, + "detect_language_length": { + "value": "30", + "description": "Seconds of audio to use for language detection", + "category": "transcription", + "value_type": "integer" + }, + "detect_language_offset": { + "value": "0", + 
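+                # illustrative: a value of "120" would start the detection sample two minutes in, past intros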
"description": "Offset in seconds for language detection sample", + "category": "transcription", + "value_type": "integer" + }, + + # === Subtitle Settings === + "subtitle_language_name": { + "value": "", + "description": "Custom subtitle language name", + "category": "subtitles", + "value_type": "string" + }, + "subtitle_language_naming_type": { + "value": "ISO_639_2_B", + "description": "Language naming: ISO_639_1, ISO_639_2_T, ISO_639_2_B, NAME, NATIVE", + "category": "subtitles", + "value_type": "string" + }, + "custom_regroup": { + "value": "cm_sl=84_sl=42++++++1", + "description": "Custom regrouping algorithm for subtitles", + "category": "subtitles", + "value_type": "string" + }, + + # === Skip Configuration === + "skip_if_external_subtitles_exist": { + "value": "false", + "description": "Skip if any external subtitle exists", + "category": "skip", + "value_type": "boolean" + }, + "skip_if_target_subtitles_exist": { + "value": "true", + "description": "Skip if target language subtitle already exists", + "category": "skip", + "value_type": "boolean" + }, + "skip_if_internal_subtitles_language": { + "value": "", + "description": "Skip if internal subtitle in this language exists", + "category": "skip", + "value_type": "string" + }, + "skip_subtitle_languages": { + "value": "", + "description": "Pipe-separated language codes to skip", + "category": "skip", + "value_type": "list" + }, + "skip_if_audio_languages": { + "value": "", + "description": "Skip if audio track is in these languages", + "category": "skip", + "value_type": "list" + }, + "skip_unknown_language": { + "value": "false", + "description": "Skip files with unknown audio language", + "category": "skip", + "value_type": "boolean" + }, + "skip_only_subgen_subtitles": { + "value": "false", + "description": "Only skip SubGen-generated subtitles", + "category": "skip", + "value_type": "boolean" + }, + + # === Scanner === + "scanner_enabled": { + "value": "true", + "description": "Enable library scanner", + "category": "scanner", + "value_type": "boolean" + }, + "scanner_cron": { + "value": "0 2 * * *", + "description": "Cron expression for scheduled scans", + "category": "scanner", + "value_type": "string" + }, + "watcher_enabled": { + "value": "false", + "description": "Enable real-time file watcher", + "category": "scanner", + "value_type": "boolean" + }, + "auto_scan_enabled": { + "value": "false", + "description": "Enable automatic scheduled scanning", + "category": "scanner", + "value_type": "boolean" + }, + "scan_interval_minutes": { + "value": "30", + "description": "Scan interval in minutes", + "category": "scanner", + "value_type": "integer" + }, + + # === Bazarr Provider === + "bazarr_provider_enabled": { + "value": "false", + "description": "Enable Bazarr provider mode", + "category": "bazarr", + "value_type": "boolean" + }, + "bazarr_url": { + "value": "http://bazarr:6767", + "description": "Bazarr server URL", + "category": "bazarr", + "value_type": "string" + }, + "bazarr_api_key": { + "value": "", + "description": "Bazarr API key", + "category": "bazarr", + "value_type": "string" + }, + "provider_timeout_seconds": { + "value": "600", + "description": "Provider request timeout in seconds", + "category": "bazarr", + "value_type": "integer" + }, + "provider_callback_enabled": { + "value": "true", + "description": "Enable callback to Bazarr on completion", + "category": "bazarr", + "value_type": "boolean" + }, + "provider_polling_interval": { + "value": "30", + "description": "Polling interval for Bazarr jobs", + 
"category": "bazarr", + "value_type": "integer" + }, + + # === Advanced === + "force_detected_language_to": { + "value": "", + "description": "Force detected language to specific code", + "category": "advanced", + "value_type": "string" + }, + "preferred_audio_languages": { + "value": "eng", + "description": "Pipe-separated preferred audio languages", + "category": "advanced", + "value_type": "list" + }, + "use_path_mapping": { + "value": "false", + "description": "Enable path mapping for network shares", + "category": "advanced", + "value_type": "boolean" + }, + "path_mapping_from": { + "value": "/tv", + "description": "Path mapping source", + "category": "advanced", + "value_type": "string" + }, + "path_mapping_to": { + "value": "/Volumes/TV", + "description": "Path mapping destination", + "category": "advanced", + "value_type": "string" + }, + "lrc_for_audio_files": { + "value": "true", + "description": "Generate LRC files for audio-only files", + "category": "advanced", + "value_type": "boolean" + }, + } + + +# Global settings service instance +settings_service = SettingsService() + diff --git a/backend/core/system_monitor.py b/backend/core/system_monitor.py new file mode 100644 index 0000000..1027a06 --- /dev/null +++ b/backend/core/system_monitor.py @@ -0,0 +1,294 @@ +"""System resource monitoring service.""" +import logging +import platform +from typing import Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Try to import psutil (CPU/RAM monitoring) +try: + import psutil + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + logger.warning("psutil not installed. CPU/RAM monitoring will be unavailable.") + +# Try to import pynvml (NVIDIA GPU monitoring) +try: + import pynvml + pynvml.nvmlInit() + NVML_AVAILABLE = True +except Exception as e: + NVML_AVAILABLE = False + logger.debug(f"pynvml not available: {e}. GPU monitoring will be unavailable.") + + +class SystemMonitor: + """Monitor system resources: CPU, RAM, GPU, VRAM.""" + + def __init__(self): + """Initialize system monitor.""" + self.gpu_count = 0 + + if NVML_AVAILABLE: + try: + self.gpu_count = pynvml.nvmlDeviceGetCount() + logger.info(f"Detected {self.gpu_count} NVIDIA GPU(s)") + except Exception as e: + logger.warning(f"Could not get GPU count: {e}") + self.gpu_count = 0 + + def get_cpu_info(self) -> Dict[str, any]: + """ + Get CPU usage information. + + Returns: + Dictionary with CPU stats + """ + if not PSUTIL_AVAILABLE: + return { + "available": False, + "error": "psutil not installed" + } + + try: + cpu_percent = psutil.cpu_percent(interval=0.1, percpu=False) + cpu_count = psutil.cpu_count(logical=True) + cpu_count_physical = psutil.cpu_count(logical=False) + + # Get per-core usage + cpu_percent_per_core = psutil.cpu_percent(interval=0.1, percpu=True) + + # Get CPU frequency + cpu_freq = psutil.cpu_freq() + freq_current = cpu_freq.current if cpu_freq else None + freq_max = cpu_freq.max if cpu_freq else None + + return { + "available": True, + "usage_percent": round(cpu_percent, 1), + "count_logical": cpu_count, + "count_physical": cpu_count_physical, + "per_core_usage": [round(p, 1) for p in cpu_percent_per_core], + "frequency_mhz": round(freq_current, 0) if freq_current else None, + "frequency_max_mhz": round(freq_max, 0) if freq_max else None, + } + except Exception as e: + logger.error(f"Error getting CPU info: {e}") + return { + "available": False, + "error": str(e) + } + + def get_memory_info(self) -> Dict[str, any]: + """ + Get RAM usage information. 
+ + Returns: + Dictionary with memory stats + """ + if not PSUTIL_AVAILABLE: + return { + "available": False, + "error": "psutil not installed" + } + + try: + mem = psutil.virtual_memory() + + return { + "available": True, + "total_gb": round(mem.total / (1024**3), 2), + "used_gb": round(mem.used / (1024**3), 2), + "free_gb": round(mem.available / (1024**3), 2), + "usage_percent": round(mem.percent, 1), + "total_bytes": mem.total, + "used_bytes": mem.used, + "available_bytes": mem.available, + } + except Exception as e: + logger.error(f"Error getting memory info: {e}") + return { + "available": False, + "error": str(e) + } + + def get_swap_info(self) -> Dict[str, any]: + """ + Get swap memory information. + + Returns: + Dictionary with swap stats + """ + if not PSUTIL_AVAILABLE: + return { + "available": False, + "error": "psutil not installed" + } + + try: + swap = psutil.swap_memory() + + return { + "available": True, + "total_gb": round(swap.total / (1024**3), 2), + "used_gb": round(swap.used / (1024**3), 2), + "free_gb": round(swap.free / (1024**3), 2), + "usage_percent": round(swap.percent, 1), + "total_bytes": swap.total, + "used_bytes": swap.used, + "free_bytes": swap.free, + } + except Exception as e: + logger.error(f"Error getting swap info: {e}") + return { + "available": False, + "error": str(e) + } + + def get_gpu_info(self, device_id: int = 0) -> Dict[str, any]: + """ + Get GPU information for a specific device. + + Args: + device_id: GPU device ID (default: 0) + + Returns: + Dictionary with GPU stats + """ + if not NVML_AVAILABLE: + return { + "available": False, + "device_id": device_id, + "error": "pynvml not available or no NVIDIA GPUs detected" + } + + if device_id >= self.gpu_count: + return { + "available": False, + "device_id": device_id, + "error": f"GPU device {device_id} not found. Only {self.gpu_count} GPU(s) available." 
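+                # valid device IDs run from 0 to self.gpu_count - 1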
+ } + + try: + handle = pynvml.nvmlDeviceGetHandleByIndex(device_id) + + # Get GPU name + name = pynvml.nvmlDeviceGetName(handle) + if isinstance(name, bytes): + name = name.decode('utf-8') + + # Get memory info + mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle) + + # Get utilization + util = pynvml.nvmlDeviceGetUtilizationRates(handle) + + # Get temperature + try: + temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU) + except Exception: + temp = None + + # Get power usage + try: + power_usage = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000.0 # Convert mW to W + power_limit = pynvml.nvmlDeviceGetPowerManagementLimit(handle) / 1000.0 + except Exception: + power_usage = None + power_limit = None + + # Get fan speed + try: + fan_speed = pynvml.nvmlDeviceGetFanSpeed(handle) + except Exception: + fan_speed = None + + return { + "available": True, + "device_id": device_id, + "name": name, + "memory": { + "total_gb": round(mem_info.total / (1024**3), 2), + "used_gb": round(mem_info.used / (1024**3), 2), + "free_gb": round(mem_info.free / (1024**3), 2), + "usage_percent": round((mem_info.used / mem_info.total) * 100, 1), + "total_bytes": mem_info.total, + "used_bytes": mem_info.used, + "free_bytes": mem_info.free, + }, + "utilization": { + "gpu_percent": util.gpu, + "memory_percent": util.memory, + }, + "temperature_c": temp, + "power": { + "usage_watts": round(power_usage, 1) if power_usage else None, + "limit_watts": round(power_limit, 1) if power_limit else None, + "usage_percent": round((power_usage / power_limit) * 100, 1) if (power_usage and power_limit) else None, + }, + "fan_speed_percent": fan_speed, + } + except Exception as e: + logger.error(f"Error getting GPU {device_id} info: {e}") + return { + "available": False, + "device_id": device_id, + "error": str(e) + } + + def get_all_gpus_info(self) -> List[Dict[str, any]]: + """ + Get information for all available GPUs. + + Returns: + List of GPU info dictionaries + """ + if not NVML_AVAILABLE or self.gpu_count == 0: + return [] + + return [self.get_gpu_info(i) for i in range(self.gpu_count)] + + def get_system_info(self) -> Dict[str, any]: + """ + Get general system information. + + Returns: + Dictionary with system info + """ + return { + "platform": platform.system(), + "platform_release": platform.release(), + "platform_version": platform.version(), + "architecture": platform.machine(), + "processor": platform.processor(), + "python_version": platform.python_version(), + } + + def get_all_resources(self) -> Dict[str, any]: + """ + Get all system resources in a single call. + + Returns: + Comprehensive system resource dictionary + """ + return { + "system": self.get_system_info(), + "cpu": self.get_cpu_info(), + "memory": self.get_memory_info(), + "swap": self.get_swap_info(), + "gpus": self.get_all_gpus_info(), + "gpu_count": self.gpu_count, + } + + def __del__(self): + """Cleanup NVML on destruction.""" + if NVML_AVAILABLE: + try: + pynvml.nvmlShutdown() + except Exception: + pass + + +# Global system monitor instance +system_monitor = SystemMonitor()