Transcriptarr/backend/core/worker.py
Dasemu 7959210724 feat: add centralized configuration system with Pydantic
- Add backend/config.py with Pydantic settings validation
- Support for standalone, provider, and hybrid operation modes
- Multi-database backend configuration (SQLite/PostgreSQL/MariaDB)
- Environment variable validation with helpful error messages
- Worker and Whisper model configuration
2026-01-11 21:23:45 +01:00
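
The configuration module itself is not shown on this page. For orientation only, here is a minimal sketch of what a Pydantic settings module covering the points above might look like; the layout, field names, and defaults are assumptions, not the actual backend/config.py, and it presumes pydantic-settings v2:

# Hypothetical sketch only -- not the actual backend/config.py.
from enum import Enum

from pydantic import Field, field_validator
from pydantic_settings import BaseSettings


class OperationMode(str, Enum):
    STANDALONE = "standalone"
    PROVIDER = "provider"
    HYBRID = "hybrid"


class Settings(BaseSettings):
    mode: OperationMode = OperationMode.STANDALONE
    database_url: str = "sqlite:///transcriptarr.db"
    cpu_workers: int = Field(default=1, ge=0)
    gpu_workers: int = Field(default=0, ge=0)
    whisper_model: str = "base"

    @field_validator("database_url")
    @classmethod
    def _check_backend(cls, v: str) -> str:
        # Fail fast with a readable message instead of a driver error later.
        if not v.startswith(("sqlite", "postgresql", "mariadb", "mysql")):
            raise ValueError(
                "database_url must point at a SQLite, PostgreSQL, or MariaDB backend"
            )
        return v


settings = Settings()  # reads values from the environment on import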

"""Individual worker for processing transcription jobs."""
import logging
import multiprocessing as mp
import time
import traceback
from datetime import datetime
from enum import Enum
from typing import Optional
from backend.core.database import Database
from backend.core.models import Job, JobStatus, JobStage
from backend.core.queue_manager import QueueManager
logger = logging.getLogger(__name__)


class WorkerType(str, Enum):
    """Worker device type."""
    CPU = "cpu"
    GPU = "gpu"


class WorkerStatus(str, Enum):
    """Worker status states."""
    IDLE = "idle"
    BUSY = "busy"
    STOPPING = "stopping"
    STOPPED = "stopped"
    ERROR = "error"


# The shared status cell below is a c_int, but WorkerStatus members are
# strings, so workers store the member's position in this list instead.
_STATUS_ORDER = list(WorkerStatus)


class Worker:
    """
    Individual worker process for transcription.

    Each worker runs in its own process and can handle one job at a time.
    Workers communicate with the main process via multiprocessing primitives.
    """

    def __init__(
        self,
        worker_id: str,
        worker_type: WorkerType,
        device_id: Optional[int] = None
    ):
        """
        Initialize worker.

        Args:
            worker_id: Unique identifier for this worker
            worker_type: CPU or GPU
            device_id: GPU device ID (only for GPU workers)
        """
        self.worker_id = worker_id
        self.worker_type = worker_type
        self.device_id = device_id

        # Multiprocessing primitives, shared with the child process
        self.process: Optional[mp.Process] = None
        self.stop_event = mp.Event()
        self.status = mp.Value('i', _STATUS_ORDER.index(WorkerStatus.IDLE))
        self.current_job_id = mp.Array('c', 36)  # UUID string, NUL-padded

        # Stats
        self.jobs_completed = mp.Value('i', 0)
        self.jobs_failed = mp.Value('i', 0)
        self.started_at: Optional[datetime] = None

    def start(self):
        """Start the worker process."""
        if self.process and self.process.is_alive():
            logger.warning(f"Worker {self.worker_id} is already running")
            return

        self.stop_event.clear()
        self.process = mp.Process(
            target=self._worker_loop,
            name=f"Worker-{self.worker_id}",
            daemon=True
        )
        self.process.start()
        self.started_at = datetime.now(timezone.utc)
        logger.info(
            f"Worker {self.worker_id} started (PID: {self.process.pid}, "
            f"Type: {self.worker_type.value})"
        )

    def stop(self, timeout: float = 30.0):
        """
        Stop the worker process gracefully.

        Args:
            timeout: Maximum time to wait for the worker to stop
        """
        if not self.process or not self.process.is_alive():
            logger.warning(f"Worker {self.worker_id} is not running")
            return

        logger.info(f"Stopping worker {self.worker_id}...")
        self.stop_event.set()
        self.process.join(timeout=timeout)

        if self.process.is_alive():
            logger.warning(f"Worker {self.worker_id} did not stop gracefully, terminating...")
            self.process.terminate()
            self.process.join(timeout=5.0)

            if self.process.is_alive():
                logger.error(f"Worker {self.worker_id} did not terminate, killing...")
                self.process.kill()
                self.process.join()  # reap the killed process

        logger.info(f"Worker {self.worker_id} stopped")

    def is_alive(self) -> bool:
        """Check if worker process is alive."""
        return self.process is not None and self.process.is_alive()

    def get_status(self) -> dict:
        """Get worker status information."""
        status_enum = _STATUS_ORDER[self.status.value]
        # .value on the shared char array stops at the first NUL byte.
        current_job = self.current_job_id.value.decode('utf-8')

        return {
            "worker_id": self.worker_id,
            "type": self.worker_type.value,
            "device_id": self.device_id,
            "status": status_enum.value,
            "current_job_id": current_job if current_job else None,
            "jobs_completed": self.jobs_completed.value,
            "jobs_failed": self.jobs_failed.value,
            "is_alive": self.is_alive(),
            "pid": self.process.pid if self.process else None,
            "started_at": self.started_at.isoformat() if self.started_at else None,
        }

    def _worker_loop(self):
        """
        Main worker loop (runs in separate process).

        This is the entry point for the worker process.
        """
        # Set up logging in the worker process
        logging.basicConfig(
            level=logging.INFO,
            format=f'[Worker-{self.worker_id}] %(levelname)s: %(message)s'
        )
        logger.info(f"Worker {self.worker_id} loop started")

        # Initialize database and queue manager in the worker process:
        # each process needs its own DB connection.
        try:
            db = Database(auto_create_tables=False)
            queue_mgr = QueueManager()
        except Exception as e:
            logger.error(f"Failed to initialize worker: {e}")
            self._set_status(WorkerStatus.ERROR)
            return

        # Main work loop
        while not self.stop_event.is_set():
            try:
                # Try to get the next job from the queue
                job = queue_mgr.get_next_job(self.worker_id)

                if job is None:
                    # No jobs available, idle for a bit
                    self._set_status(WorkerStatus.IDLE)
                    time.sleep(2)
                    continue

                # Process the job
                self._set_status(WorkerStatus.BUSY)
                self._set_current_job(job.id)
                logger.info(f"Processing job {job.id}: {job.file_name}")

                try:
                    self._process_job(job, queue_mgr)
                    # += on a shared Value is not atomic; hold the lock
                    # (an RLock, so the .value accessors can re-enter it).
                    with self.jobs_completed.get_lock():
                        self.jobs_completed.value += 1
                    logger.info(f"Job {job.id} completed successfully")
                except Exception as e:
                    with self.jobs_failed.get_lock():
                        self.jobs_failed.value += 1
                    error_msg = f"Job processing failed: {str(e)}\n{traceback.format_exc()}"
                    logger.error(error_msg)
                    queue_mgr.mark_job_failed(job.id, error_msg)
                finally:
                    self._clear_current_job()

            except Exception as e:
                logger.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
                time.sleep(5)  # Back off on errors

        self._set_status(WorkerStatus.STOPPED)
        logger.info(f"Worker {self.worker_id} loop ended")

    def _process_job(self, job: Job, queue_mgr: QueueManager):
        """
        Process a single transcription job.

        Args:
            job: Job to process
            queue_mgr: Queue manager for updating progress
        """
        # TODO: This will be implemented when we add the transcriber module.
        # For now, simulate work.

        # Stage 1: Detect language
        queue_mgr.update_job_progress(
            job.id,
            progress=10.0,
            stage=JobStage.DETECTING_LANGUAGE,
            eta_seconds=60
        )
        time.sleep(2)  # Simulate work

        # Stage 2: Extract audio
        queue_mgr.update_job_progress(
            job.id,
            progress=20.0,
            stage=JobStage.EXTRACTING_AUDIO,
            eta_seconds=50
        )
        time.sleep(2)

        # Stage 3: Transcribe
        queue_mgr.update_job_progress(
            job.id,
            progress=30.0,
            stage=JobStage.TRANSCRIBING,
            eta_seconds=40
        )

        # Simulate progressive transcription
        for i in range(30, 90, 10):
            time.sleep(1)
            queue_mgr.update_job_progress(
                job.id,
                progress=float(i),
                stage=JobStage.TRANSCRIBING,
                eta_seconds=int((100 - i) / 2)
            )

        # Stage 4: Finalize
        queue_mgr.update_job_progress(
            job.id,
            progress=95.0,
            stage=JobStage.FINALIZING,
            eta_seconds=5
        )
        time.sleep(1)

        # Mark as completed
        output_path = job.file_path.replace('.mkv', '.srt')
        queue_mgr.mark_job_completed(
            job.id,
            output_path=output_path,
            segments_count=100,  # Simulated
            srt_content="Simulated SRT content"
        )

    def _set_status(self, status: WorkerStatus):
        """Set worker status (safe to call from either process)."""
        self.status.value = _STATUS_ORDER.index(status)

    def _set_current_job(self, job_id: str):
        """Set current job ID (safe to call from either process)."""
        # Assigning .value copies the bytes into the shared buffer under the
        # array's lock and NUL-terminates them when there is room.
        self.current_job_id.value = job_id.encode('utf-8')[:36]

    def _clear_current_job(self):
        """Clear current job ID."""
        self.current_job_id.value = b''
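

# Illustrative smoke test (not part of the original module): spin up a
# single CPU worker, let it poll the queue briefly, then shut it down.
# It assumes a reachable database; without one the worker logs an init
# error and parks itself in the ERROR state instead of polling.
if __name__ == "__main__":
    worker = Worker(worker_id="worker-0", worker_type=WorkerType.CPU)
    worker.start()
    time.sleep(5)
    print(worker.get_status())
    worker.stop(timeout=10.0)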