feat(core): add database, models, queue and settings system
- Add SQLAlchemy database setup with session management
- Add Job model with status, priority and progress tracking
- Add QueueManager with priority queue and deduplication
- Add SystemSettings model for database-backed configuration
- Add SettingsService with caching and defaults
- Add SystemMonitor for CPU/RAM/GPU resource monitoring
- Add LanguageCode utilities (moved from root)
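
For reviewers, a minimal usage sketch of how these pieces are intended to fit together. The module path and the exact enqueue parameters shown are assumptions inferred from the diff below, not part of this commit:

# Hypothetical usage sketch, not part of this commit.
# add_job deduplicates on (file_path, target_lang) and returns None
# when a non-retryable duplicate already exists (see the diff below).
from backend.core.queue_manager import queue_manager  # assumed module path

job = queue_manager.add_job(
    file_path="/media/movie.mkv",
    file_name="movie.mkv",
    target_lang="en",
    priority=10,             # higher = processed first
    is_manual_request=True,  # manual requests get higher priority
)
if job is None:
    print("Duplicate job already queued or processing")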
@@ -1,6 +1,6 @@
 """Queue manager for persistent job queuing."""
 import logging
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import List, Optional, Dict
 from sqlalchemy import and_, or_
 from sqlalchemy.orm import Session
@@ -39,6 +39,7 @@ class QueueManager:
         priority: int = 0,
         bazarr_callback_url: Optional[str] = None,
         is_manual_request: bool = False,
+        job_type: Optional['JobType'] = None,
     ) -> Optional[Job]:
         """
         Add a new job to the queue.
@@ -53,20 +54,29 @@ class QueueManager:
             priority: Job priority (higher = processed first)
             bazarr_callback_url: Callback URL for Bazarr provider mode
             is_manual_request: Whether this is a manual request (higher priority)
+            job_type: Type of job (transcription or language_detection)

         Returns:
             Job object if created, None if duplicate exists
         """
+        from backend.core.models import JobType
+
+        if job_type is None:
+            job_type = JobType.TRANSCRIPTION
         with self.db.get_session() as session:
             # Check for existing job
             existing = self._find_existing_job(session, file_path, target_lang)

             if existing:
-                logger.info(f"Job already exists for {file_name}: {existing.id} [{existing.status.value}]")
+                logger.warning(
+                    f"Duplicate job detected for {file_name}: "
+                    f"Existing job {existing.id} [{existing.status.value}] "
+                    f"target={target_lang}, path={file_path}"
+                )

                 # If existing job failed and can retry, reset it
                 if existing.can_retry:
-                    logger.info(f"Resetting failed job {existing.id} for retry")
+                    logger.info(f"Auto-retrying failed job {existing.id}")
                     existing.status = JobStatus.QUEUED
                     existing.error = None
                     existing.current_stage = JobStage.PENDING
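
Reviewer note: after this hunk, a duplicate submission either revives a retryable failed job (the existing Job object is returned) or is rejected with None. A short caller-side sketch of those semantics; the caller code and the process_later helper are illustrative, not from this commit:

# Illustrative caller: distinguish "new/revived" from "rejected duplicate".
job = queue_manager.add_job(file_path=path, file_name=name, target_lang="en")
if job is None:
    # Duplicate exists and is not auto-retryable (queued, processing,
    # or failed with retries exhausted); surface that to the caller.
    raise ValueError(f"Job for {name} already exists")
# Either a brand-new job or a failed one that was reset to QUEUED.
process_later(job.id)  # hypothetical downstream hook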
@@ -74,12 +84,14 @@ class QueueManager:
                     session.commit()
                     return existing

+                logger.info(f"Job {existing.id} cannot be auto-retried (status: {existing.status.value})")
                 return None

             # Create new job
             job = Job(
                 file_path=file_path,
                 file_name=file_name,
+                job_type=job_type,
                 source_lang=source_lang,
                 target_lang=target_lang,
                 quality_preset=quality_preset,
@@ -195,7 +207,11 @@ class QueueManager:
         job_id: str,
         output_path: str,
         segments_count: int,
-        srt_content: Optional[str] = None
+        srt_content: Optional[str] = None,
+        model_used: Optional[str] = None,
+        device_used: Optional[str] = None,
+        processing_time_seconds: Optional[float] = None,
+        detected_language: Optional[str] = None
     ) -> bool:
         """Mark a job as completed."""
         with self.db.get_session() as session:
@@ -206,6 +222,17 @@ class QueueManager:
                 return False

             job.mark_completed(output_path, segments_count, srt_content)
+
+            # Set optional metadata if provided
+            if model_used:
+                job.model_used = model_used
+            if device_used:
+                job.device_used = device_used
+            if processing_time_seconds is not None:
+                job.processing_time_seconds = processing_time_seconds
+            if detected_language:
+                job.source_lang = detected_language
+
             session.commit()

             logger.info(
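
A sketch of the worker-side completion call exercising the new optional metadata. The method name is not visible in this hunk, so mark_job_completed is an assumption, and all values are illustrative:

# Assumed completion API; only the parameters are confirmed by the diff.
queue_manager.mark_job_completed(
    job_id=job.id,
    output_path="/media/movie.en.srt",
    segments_count=842,
    srt_content=srt_text,
    model_used="large-v3",          # which model ran the job
    device_used="cuda",             # or "cpu"
    processing_time_seconds=123.4,
    detected_language="de",         # overwrites job.source_lang
)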
@@ -227,7 +254,7 @@ class QueueManager:
             session.commit()

             logger.error(
-                f"Job {job_id} failed (attempt {job.retry_count}/{job.max_retries}): {error}"
+                f"Job {job_id} failed (attempt #{job.retry_count}): {error}"
             )
             return True

@@ -260,7 +287,7 @@ class QueueManager:
            failed = session.query(Job).filter(Job.status == JobStatus.FAILED).count()

            # Get today's stats
-            today = datetime.utcnow().date()
+            today = datetime.now(timezone.utc).date()
            completed_today = (
                session.query(Job)
                .filter(
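
Reviewer note on the utcnow() replacements throughout this commit: datetime.utcnow() returns a naive datetime (tzinfo is None) and is deprecated as of Python 3.12, while datetime.now(timezone.utc) is timezone-aware. The two kinds do not mix in comparisons, so any naive timestamps already persisted need to be normalized consistently:

from datetime import datetime, timezone

naive = datetime.utcnow()           # tzinfo is None; deprecated in 3.12+
aware = datetime.now(timezone.utc)  # tzinfo is timezone.utc

# Mixing the two raises at comparison time:
# aware > naive  ->  TypeError: can't compare offset-naive and offset-aware datetimes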
@@ -321,6 +348,115 @@ class QueueManager:

         return jobs

+    def get_all_jobs(
+        self,
+        status_filter: Optional[JobStatus] = None,
+        limit: int = 50,
+        offset: int = 0
+    ) -> List[Job]:
+        """
+        Get all jobs with optional filtering.
+
+        Args:
+            status_filter: Filter by status
+            limit: Maximum number of jobs to return
+            offset: Offset for pagination
+
+        Returns:
+            List of Job objects (detached from session)
+        """
+        with self.db.get_session() as session:
+            query = session.query(Job)
+
+            if status_filter:
+                query = query.filter(Job.status == status_filter)
+
+            jobs = (
+                query
+                .order_by(Job.created_at.desc())
+                .limit(limit)
+                .offset(offset)
+                .all()
+            )
+
+            # Expunge all jobs from session so they don't expire
+            for job in jobs:
+                session.expunge(job)
+
+            return jobs
+
+    def count_jobs(self, status_filter: Optional[JobStatus] = None) -> int:
+        """
+        Count jobs with optional filtering.
+
+        Args:
+            status_filter: Filter by status
+
+        Returns:
+            Number of jobs
+        """
+        with self.db.get_session() as session:
+            query = session.query(Job)
+
+            if status_filter:
+                query = query.filter(Job.status == status_filter)
+
+            return query.count()
+
+    def retry_job(self, job_id: str) -> bool:
+        """
+        Retry a failed job.
+
+        Args:
+            job_id: Job ID to retry
+
+        Returns:
+            True if job was reset to queued, False otherwise
+        """
+        with self.db.get_session() as session:
+            job = session.query(Job).filter(Job.id == job_id).first()
+
+            if not job:
+                logger.warning(f"Job {job_id} not found for retry")
+                return False
+
+            if not job.can_retry:
+                logger.warning(f"Job {job_id} cannot be retried")
+                return False
+
+            # Reset job to queued
+            job.status = JobStatus.QUEUED
+            job.error = None
+            job.current_stage = JobStage.PENDING
+            job.progress = 0.0
+            job.worker_id = None
+            job.retry_count += 1  # Increment retry count for tracking
+            session.commit()
+
+            logger.info(f"Job {job_id} reset for retry (attempt #{job.retry_count})")
+            return True
+
+    def clear_completed_jobs(self) -> int:
+        """
+        Clear all completed jobs.
+
+        Returns:
+            Number of jobs cleared
+        """
+        with self.db.get_session() as session:
+            deleted = (
+                session.query(Job)
+                .filter(Job.status == JobStatus.COMPLETED)
+                .delete()
+            )
+
+            session.commit()
+
+            if deleted > 0:
+                logger.info(f"Cleared {deleted} completed jobs")
+
+            return deleted
+
+    def get_processing_jobs(self) -> List[Job]:
+        """Get all currently processing jobs."""
+        return self.get_jobs(status=JobStatus.PROCESSING)
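
Reviewer note: the expunge() calls detach the instances from the session, so their already-loaded attributes stay readable after the with-block instead of expiring on commit. A sketch of the intended consumer of the new pagination helpers; the API wiring is an assumption:

# Hypothetical API-layer consumer of get_all_jobs/count_jobs.
def list_jobs_page(page: int, per_page: int = 50) -> dict:
    jobs = queue_manager.get_all_jobs(
        status_filter=None,      # or e.g. JobStatus.FAILED
        limit=per_page,
        offset=page * per_page,
    )
    return {
        "total": queue_manager.count_jobs(),
        "page": page,
        # Safe to read attributes here: the instances were expunged.
        "items": [
            {"id": j.id, "file": j.file_name, "status": j.status.value}
            for j in jobs
        ],
    }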
@@ -350,7 +486,7 @@ class QueueManager:
            Number of jobs deleted
        """
        with self.db.get_session() as session:
-            cutoff_date = datetime.utcnow() - timedelta(days=days)
+            cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

            deleted = (
                session.query(Job)
@@ -389,6 +525,48 @@ class QueueManager:

         return query.first()

+    def cleanup_orphaned_jobs(self) -> int:
+        """
+        Clean up orphaned jobs after server restart.
+
+        Jobs stuck in 'processing' state with no active worker are marked as failed.
+        This prevents jobs from being stuck forever after a restart.
+
+        Returns:
+            Number of jobs cleaned up
+        """
+        from datetime import datetime
+
+        with self.db.get_session() as session:
+            # Find all jobs in processing state
+            orphaned_jobs = session.query(Job).filter(
+                Job.status == JobStatus.PROCESSING
+            ).all()
+
+            cleaned_count = 0
+            for job in orphaned_jobs:
+                # Mark as failed with appropriate error message
+                job.status = JobStatus.FAILED
+                job.error = "Job interrupted by server restart"
+                job.completed_at = datetime.now(timezone.utc)
+                job.progress = 0.0
+                job.current_stage = JobStage.PENDING
+                job.worker_id = None
+
+                logger.warning(
+                    f"Cleaned up orphaned job {job.id} ({job.file_name}) - "
+                    f"was stuck in processing state"
+                )
+                cleaned_count += 1
+
+            session.commit()
+
+            if cleaned_count > 0:
+                logger.info(f"Cleaned up {cleaned_count} orphaned job(s) after restart")
+
+            return cleaned_count
+

 # Global queue manager instance
 queue_manager = QueueManager()
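
Finally, a sketch of where cleanup_orphaned_jobs() is presumably invoked. The commit does not include the caller, so the startup hook and module path below are assumptions:

# Hypothetical application startup hook; wiring is not part of this diff.
import logging

from backend.core.queue_manager import queue_manager  # assumed module path

def on_startup() -> None:
    # Fail any jobs a previous process left in PROCESSING, so the
    # queue starts from a consistent state before workers spin up.
    cleaned = queue_manager.cleanup_orphaned_jobs()
    if cleaned:
        logging.getLogger(__name__).info(
            "Recovered %d orphaned job(s) from previous run", cleaned
        )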