diff --git a/backend/API.md b/backend/API.md new file mode 100644 index 0000000..774beed --- /dev/null +++ b/backend/API.md @@ -0,0 +1,611 @@ +# TranscriptorIO REST API + +Documentación completa de las APIs REST del backend de TranscriptorIO. + +## 🚀 Inicio Rápido + +### Ejecutar el servidor + +```bash +# Usando el CLI +python backend/cli.py server --host 0.0.0.0 --port 8000 + +# Con auto-reload (desarrollo) +python backend/cli.py server --reload + +# Con múltiples workers (producción) +python backend/cli.py server --workers 4 +``` + +### Documentación interactiva + +Una vez iniciado el servidor, accede a: +- **Swagger UI**: http://localhost:8000/docs +- **ReDoc**: http://localhost:8000/redoc + +## 📋 Endpoints + +### System Status + +#### `GET /` +Información básica de la API. + +**Response:** +```json +{ + "name": "TranscriptorIO API", + "version": "1.0.0", + "status": "running" +} +``` + +#### `GET /health` +Health check para monitoring. + +**Response:** +```json +{ + "status": "healthy", + "database": "connected", + "workers": 2, + "queue_size": 5 +} +``` + +#### `GET /api/status` +Estado completo del sistema. + +**Response:** +```json +{ + "system": { + "status": "running", + "uptime_seconds": 3600.5 + }, + "workers": { + "total_workers": 2, + "cpu_workers": 1, + "gpu_workers": 1, + "idle_workers": 1, + "busy_workers": 1, + "total_jobs_completed": 42, + "total_jobs_failed": 2 + }, + "queue": { + "total": 100, + "queued": 5, + "processing": 2, + "completed": 90, + "failed": 3 + }, + "scanner": { + "scheduler_running": true, + "next_scan_time": "2026-01-13T02:00:00", + "watcher_running": true + } +} +``` + +--- + +## 👷 Workers API (`/api/workers`) + +### `GET /api/workers` +Lista todos los workers activos. + +**Response:** +```json +[ + { + "worker_id": "worker-cpu-0", + "worker_type": "cpu", + "device_id": null, + "status": "busy", + "current_job_id": "abc123", + "jobs_completed": 10, + "jobs_failed": 0, + "uptime_seconds": 3600.5, + "current_job_progress": 45.2, + "current_job_eta": 120 + } +] +``` + +### `GET /api/workers/stats` +Estadísticas del pool de workers. + +**Response:** +```json +{ + "total_workers": 2, + "cpu_workers": 1, + "gpu_workers": 1, + "idle_workers": 1, + "busy_workers": 1, + "stopped_workers": 0, + "error_workers": 0, + "total_jobs_completed": 42, + "total_jobs_failed": 2, + "uptime_seconds": 3600.5, + "is_running": true +} +``` + +### `GET /api/workers/{worker_id}` +Obtener estado de un worker específico. + +**Response:** Same as individual worker in list + +### `POST /api/workers` +Añadir un nuevo worker al pool. + +**Request:** +```json +{ + "worker_type": "gpu", + "device_id": 0 +} +``` + +**Response:** Worker status object + +### `DELETE /api/workers/{worker_id}` +Remover un worker del pool. + +**Query Params:** +- `timeout` (float, default=30.0): Timeout en segundos + +**Response:** +```json +{ + "message": "Worker worker-cpu-0 removed successfully" +} +``` + +### `POST /api/workers/pool/start` +Iniciar el pool de workers. + +**Query Params:** +- `cpu_workers` (int, default=0) +- `gpu_workers` (int, default=0) + +**Response:** +```json +{ + "message": "Worker pool started: 1 CPU workers, 1 GPU workers" +} +``` + +### `POST /api/workers/pool/stop` +Detener el pool de workers. + +**Query Params:** +- `timeout` (float, default=30.0) + +**Response:** +```json +{ + "message": "Worker pool stopped successfully" +} +``` + +--- + +## 📋 Jobs API (`/api/jobs`) + +### `GET /api/jobs` +Lista de trabajos con paginación. + +**Query Params:** +- `status_filter` (optional): queued, processing, completed, failed, cancelled +- `page` (int, default=1): Número de página +- `page_size` (int, default=50): Items por página + +**Response:** +```json +{ + "jobs": [ + { + "id": "abc123", + "file_path": "/media/anime/episode.mkv", + "file_name": "episode.mkv", + "status": "completed", + "priority": 10, + "source_lang": "ja", + "target_lang": "es", + "quality_preset": "fast", + "transcribe_or_translate": "transcribe", + "progress": 100.0, + "current_stage": "finalizing", + "eta_seconds": null, + "created_at": "2026-01-12T10:00:00", + "started_at": "2026-01-12T10:00:05", + "completed_at": "2026-01-12T10:05:30", + "output_path": "/media/anime/episode.es.srt", + "segments_count": 245, + "error": null, + "retry_count": 0, + "worker_id": "worker-gpu-0", + "vram_used_mb": 4096, + "processing_time_seconds": 325.5, + "model_used": "large-v3", + "device_used": "cuda:0" + } + ], + "total": 100, + "page": 1, + "page_size": 50 +} +``` + +### `GET /api/jobs/stats` +Estadísticas de la cola. + +**Response:** +```json +{ + "total_jobs": 100, + "queued": 5, + "processing": 2, + "completed": 90, + "failed": 3, + "cancelled": 0 +} +``` + +### `GET /api/jobs/{job_id}` +Obtener un trabajo específico. + +**Response:** Job object (same as in list) + +### `POST /api/jobs` +Crear un nuevo trabajo de transcripción. + +**Request:** +```json +{ + "file_path": "/media/anime/Attack on Titan S04E01.mkv", + "file_name": "Attack on Titan S04E01.mkv", + "source_lang": "ja", + "target_lang": "es", + "quality_preset": "fast", + "transcribe_or_translate": "transcribe", + "priority": 10, + "is_manual_request": true +} +``` + +**Response:** Created job object + +### `POST /api/jobs/{job_id}/retry` +Reintentar un trabajo fallido. + +**Response:** Updated job object + +### `DELETE /api/jobs/{job_id}` +Cancelar un trabajo. + +**Response:** +```json +{ + "message": "Job abc123 cancelled successfully" +} +``` + +### `POST /api/jobs/queue/clear` +Limpiar trabajos completados. + +**Response:** +```json +{ + "message": "Cleared 42 completed jobs" +} +``` + +--- + +## 📏 Scan Rules API (`/api/scan-rules`) + +### `GET /api/scan-rules` +Lista todas las reglas de escaneo. + +**Query Params:** +- `enabled_only` (bool, default=false): Solo reglas habilitadas + +**Response:** +```json +[ + { + "id": 1, + "name": "Japanese anime without Spanish subs", + "enabled": true, + "priority": 10, + "conditions": { + "audio_language_is": "ja", + "audio_language_not": null, + "audio_track_count_min": null, + "has_embedded_subtitle_lang": null, + "missing_embedded_subtitle_lang": "es", + "missing_external_subtitle_lang": "es", + "file_extension": ".mkv,.mp4" + }, + "action": { + "action_type": "transcribe", + "target_language": "es", + "quality_preset": "fast", + "job_priority": 5 + }, + "created_at": "2026-01-12T10:00:00", + "updated_at": null + } +] +``` + +### `GET /api/scan-rules/{rule_id}` +Obtener una regla específica. + +**Response:** Rule object (same as in list) + +### `POST /api/scan-rules` +Crear una nueva regla de escaneo. + +**Request:** +```json +{ + "name": "Japanese anime without Spanish subs", + "enabled": true, + "priority": 10, + "conditions": { + "audio_language_is": "ja", + "missing_embedded_subtitle_lang": "es", + "missing_external_subtitle_lang": "es", + "file_extension": ".mkv,.mp4" + }, + "action": { + "action_type": "transcribe", + "target_language": "es", + "quality_preset": "fast", + "job_priority": 5 + } +} +``` + +**Response:** Created rule object + +### `PUT /api/scan-rules/{rule_id}` +Actualizar una regla. + +**Request:** Same as POST (all fields optional) + +**Response:** Updated rule object + +### `DELETE /api/scan-rules/{rule_id}` +Eliminar una regla. + +**Response:** +```json +{ + "message": "Scan rule 1 deleted successfully" +} +``` + +### `POST /api/scan-rules/{rule_id}/toggle` +Activar/desactivar una regla. + +**Response:** Updated rule object + +--- + +## 🔍 Scanner API (`/api/scanner`) + +### `GET /api/scanner/status` +Estado del scanner. + +**Response:** +```json +{ + "scheduler_enabled": true, + "scheduler_running": true, + "next_scan_time": "2026-01-13T02:00:00", + "watcher_enabled": true, + "watcher_running": true, + "watched_paths": ["/media/anime", "/media/movies"], + "last_scan_time": "2026-01-12T02:00:00", + "total_scans": 1523 +} +``` + +### `POST /api/scanner/scan` +Ejecutar escaneo manual. + +**Request:** +```json +{ + "paths": ["/media/anime", "/media/movies"], + "recursive": true +} +``` + +**Response:** +```json +{ + "scanned_files": 150, + "matched_files": 25, + "jobs_created": 25, + "skipped_files": 125, + "paths_scanned": ["/media/anime", "/media/movies"] +} +``` + +### `POST /api/scanner/scheduler/start` +Iniciar escaneo programado. + +**Request:** +```json +{ + "enabled": true, + "cron_expression": "0 2 * * *", + "paths": ["/media/anime"], + "recursive": true +} +``` + +**Response:** +```json +{ + "message": "Scheduler started successfully" +} +``` + +### `POST /api/scanner/scheduler/stop` +Detener escaneo programado. + +**Response:** +```json +{ + "message": "Scheduler stopped successfully" +} +``` + +### `POST /api/scanner/watcher/start` +Iniciar observador de archivos. + +**Request:** +```json +{ + "enabled": true, + "paths": ["/media/anime"], + "recursive": true +} +``` + +**Response:** +```json +{ + "message": "File watcher started successfully" +} +``` + +### `POST /api/scanner/watcher/stop` +Detener observador de archivos. + +**Response:** +```json +{ + "message": "File watcher stopped successfully" +} +``` + +### `POST /api/scanner/analyze` +Analizar un archivo específico. + +**Query Params:** +- `file_path` (required): Ruta al archivo + +**Response:** +```json +{ + "file_path": "/media/anime/episode.mkv", + "audio_tracks": [ + { + "index": 0, + "codec": "aac", + "language": "ja", + "channels": 2 + } + ], + "embedded_subtitles": [], + "external_subtitles": [ + { + "path": "/media/anime/episode.en.srt", + "language": "en" + } + ], + "duration_seconds": 1440.5, + "is_video": true +} +``` + +--- + +## 🔐 Autenticación + +> **TODO**: Implementar autenticación con JWT tokens + +--- + +## 📊 Códigos de Error + +- `200 OK`: Éxito +- `201 Created`: Recurso creado +- `400 Bad Request`: Parámetros inválidos +- `404 Not Found`: Recurso no encontrado +- `409 Conflict`: Conflicto (ej: duplicado) +- `500 Internal Server Error`: Error del servidor + +--- + +## 🧪 Testing + +### cURL Examples + +```bash +# Get system status +curl http://localhost:8000/api/status + +# Create a job +curl -X POST http://localhost:8000/api/jobs \ + -H "Content-Type: application/json" \ + -d '{ + "file_path": "/media/anime/episode.mkv", + "file_name": "episode.mkv", + "target_lang": "es", + "quality_preset": "fast" + }' + +# Add a GPU worker +curl -X POST http://localhost:8000/api/workers \ + -H "Content-Type: application/json" \ + -d '{ + "worker_type": "gpu", + "device_id": 0 + }' +``` + +### Python Example + +```python +import requests + +# Base URL +BASE_URL = "http://localhost:8000" + +# Create a job +response = requests.post( + f"{BASE_URL}/api/jobs", + json={ + "file_path": "/media/anime/episode.mkv", + "file_name": "episode.mkv", + "target_lang": "es", + "quality_preset": "fast" + } +) + +job = response.json() +print(f"Job created: {job['id']}") + +# Check job status +response = requests.get(f"{BASE_URL}/api/jobs/{job['id']}") +status = response.json() +print(f"Job status: {status['status']} - {status['progress']}%") +``` + +--- + +## 📝 Notas + +- Todas las fechas están en formato ISO 8601 UTC +- Los idiomas usan códigos ISO 639-1 (2 letras: ja, en, es, fr, etc.) +- La paginación usa índices base-1 (primera página = 1) +- Los workers se identifican por ID único generado automáticamente + diff --git a/backend/api/__init__.py b/backend/api/__init__.py index 89b73bd..23e6269 100644 --- a/backend/api/__init__.py +++ b/backend/api/__init__.py @@ -1 +1,17 @@ -"""TranscriptorIO API Module.""" \ No newline at end of file +"""API module for TranscriptorIO backend.""" +from backend.api.workers import router as workers_router +from backend.api.jobs import router as jobs_router +from backend.api.scan_rules import router as scan_rules_router +from backend.api.scanner import router as scanner_router +from backend.api.settings import router as settings_router +from backend.api.setup_wizard import router as setup_router + +__all__ = [ + "workers_router", + "jobs_router", + "scan_rules_router", + "scanner_router", + "settings_router", + "setup_router", +] + diff --git a/backend/api/filesystem.py b/backend/api/filesystem.py new file mode 100644 index 0000000..75bf9d3 --- /dev/null +++ b/backend/api/filesystem.py @@ -0,0 +1,113 @@ +"""Filesystem browsing API for path selection.""" +import logging +import os +from typing import List, Optional +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/filesystem", tags=["filesystem"]) + + +class DirectoryItem(BaseModel): + """Directory item information.""" + name: str + path: str + is_directory: bool + is_readable: bool + + +class DirectoryListingResponse(BaseModel): + """Directory listing response.""" + current_path: str + parent_path: Optional[str] = None + items: List[DirectoryItem] + + +@router.get("/browse", response_model=DirectoryListingResponse) +async def browse_directory(path: str = "/"): + """Browse filesystem directory.""" + try: + path = os.path.abspath(path) + + if not os.path.exists(path): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Path does not exist: {path}" + ) + + if not os.path.isdir(path): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Path is not a directory: {path}" + ) + + parent_path = os.path.dirname(path) if path != "/" else None + + items = [] + try: + entries = os.listdir(path) + entries.sort() + + for entry in entries: + entry_path = os.path.join(path, entry) + + try: + is_dir = os.path.isdir(entry_path) + is_readable = os.access(entry_path, os.R_OK) + + if is_dir: + items.append(DirectoryItem( + name=entry, + path=entry_path, + is_directory=True, + is_readable=is_readable + )) + except (PermissionError, OSError) as e: + logger.debug(f"Cannot access {entry_path}: {e}") + continue + + except PermissionError: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Permission denied: {path}" + ) + + return DirectoryListingResponse( + current_path=path, + parent_path=parent_path, + items=items + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error browsing directory {path}: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error browsing directory: {str(e)}" + ) + + +@router.get("/common-paths", response_model=List[DirectoryItem]) +async def get_common_paths(): + """Get list of common starting paths.""" + common_paths = ["/", "/home", "/media", "/mnt", "/opt", "/srv", "/var", "/tmp"] + + items = [] + for path in common_paths: + if os.path.exists(path) and os.path.isdir(path): + try: + is_readable = os.access(path, os.R_OK) + items.append(DirectoryItem( + name=path, + path=path, + is_directory=True, + is_readable=is_readable + )) + except (PermissionError, OSError): + continue + + return items + diff --git a/backend/api/jobs.py b/backend/api/jobs.py new file mode 100644 index 0000000..0dd5651 --- /dev/null +++ b/backend/api/jobs.py @@ -0,0 +1,379 @@ +"""Job management API routes.""" +import logging +from typing import List, Optional +from fastapi import APIRouter, HTTPException, Query, status +from pydantic import BaseModel, Field + +from backend.core.models import JobStatus, QualityPreset + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/jobs", tags=["jobs"]) + + +# === REQUEST/RESPONSE MODELS === + +class JobCreateRequest(BaseModel): + """Request to create a new job.""" + file_path: str = Field(..., description="Full path to the media file") + file_name: str = Field(..., description="Name of the file") + source_lang: Optional[str] = Field(None, description="Source language (ISO 639-2)") + target_lang: str = Field(..., description="Target subtitle language (ISO 639-2)") + quality_preset: str = Field("fast", description="Quality preset: fast, balanced, best") + transcribe_or_translate: str = Field("transcribe", description="Operation: transcribe or translate") + priority: int = Field(0, description="Job priority (higher = processed first)") + is_manual_request: bool = Field(True, description="Whether this is a manual request") + + class Config: + json_schema_extra = { + "example": { + "file_path": "/media/anime/Attack on Titan S04E01.mkv", + "file_name": "Attack on Titan S04E01.mkv", + "source_lang": "jpn", + "target_lang": "spa", + "quality_preset": "fast", + "transcribe_or_translate": "transcribe", + "priority": 10 + } + } + + +class JobResponse(BaseModel): + """Job response model.""" + id: str + file_path: str + file_name: str + job_type: str = "transcription" # Default to transcription for backward compatibility + status: str + priority: int + source_lang: Optional[str] + target_lang: Optional[str] + quality_preset: Optional[str] + transcribe_or_translate: str + progress: float + current_stage: Optional[str] + eta_seconds: Optional[int] + created_at: Optional[str] + started_at: Optional[str] + completed_at: Optional[str] + output_path: Optional[str] + segments_count: Optional[int] + error: Optional[str] + retry_count: int + worker_id: Optional[str] + vram_used_mb: Optional[int] + processing_time_seconds: Optional[float] + model_used: Optional[str] + device_used: Optional[str] + + +class JobListResponse(BaseModel): + """Job list response with pagination.""" + jobs: List[JobResponse] + total: int + page: int + page_size: int + + +class QueueStatsResponse(BaseModel): + """Queue statistics response.""" + total_jobs: int + queued: int + processing: int + completed: int + failed: int + cancelled: int + + +class MessageResponse(BaseModel): + """Generic message response.""" + message: str + + +# === ROUTES === + +@router.get("/", response_model=JobListResponse) +async def get_jobs( + status_filter: Optional[str] = Query(None, description="Filter by status"), + page: int = Query(1, ge=1, description="Page number"), + page_size: int = Query(50, ge=1, le=500, description="Items per page"), +): + """ + Get list of jobs with optional filtering and pagination. + + Args: + status_filter: Filter by job status (queued/processing/completed/failed/cancelled) + page: Page number (1-based) + page_size: Number of items per page + + Returns: + Paginated list of jobs + """ + from backend.core.queue_manager import queue_manager + + # Validate status filter + status_enum = None + if status_filter: + try: + status_enum = JobStatus(status_filter.lower()) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid status: {status_filter}" + ) + + # Get jobs + jobs = queue_manager.get_all_jobs( + status_filter=status_enum, + limit=page_size, + offset=(page - 1) * page_size + ) + + # Get total count + total = queue_manager.count_jobs(status_filter=status_enum) + + return JobListResponse( + jobs=[JobResponse(**job.to_dict()) for job in jobs], + total=total, + page=page, + page_size=page_size + ) + + +@router.get("/stats", response_model=QueueStatsResponse) +async def get_queue_stats(): + """ + Get queue statistics. + + Returns: + Queue statistics + """ + from backend.core.queue_manager import queue_manager + + stats = queue_manager.get_queue_stats() + + return QueueStatsResponse(**stats) + + +@router.get("/{job_id}", response_model=JobResponse) +async def get_job(job_id: str): + """ + Get a specific job by ID. + + Args: + job_id: Job ID + + Returns: + Job object + + Raises: + 404: Job not found + """ + from backend.core.database import database + from backend.core.models import Job + + with database.get_session() as session: + job = session.query(Job).filter(Job.id == job_id).first() + + if not job: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Job {job_id} not found" + ) + + job_dict = job.to_dict() + + return JobResponse(**job_dict) + + +@router.post("/", response_model=JobResponse, status_code=status.HTTP_201_CREATED) +async def create_job(request: JobCreateRequest): + """ + Create a new transcription job. + + Args: + request: Job creation request + + Returns: + Created job object + + Raises: + 400: Invalid quality preset + 409: Job already exists for this file + """ + from backend.core.queue_manager import queue_manager + + # Validate quality preset + try: + quality = QualityPreset(request.quality_preset.lower()) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid quality preset: {request.quality_preset}" + ) + + # Create job + job = queue_manager.add_job( + file_path=request.file_path, + file_name=request.file_name, + source_lang=request.source_lang, + target_lang=request.target_lang, + quality_preset=quality, + transcribe_or_translate=request.transcribe_or_translate, + priority=request.priority, + is_manual_request=request.is_manual_request, + ) + + if not job: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Job already exists for {request.file_name}" + ) + + logger.info(f"Job {job.id} created via API for {request.file_name}") + + return JobResponse(**job.to_dict()) + + +@router.post("/{job_id}/retry", response_model=JobResponse) +async def retry_job(job_id: str): + """ + Retry a failed job. + + Args: + job_id: Job ID to retry + + Returns: + Updated job object + + Raises: + 404: Job not found + 400: Job cannot be retried + """ + from backend.core.queue_manager import queue_manager + from backend.core.database import database + from backend.core.models import Job, JobStatus + + # Check if job exists and can be retried (within session) + with database.get_session() as session: + job = session.query(Job).filter(Job.id == job_id).first() + + if not job: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Job {job_id} not found" + ) + + # Access attributes while session is active + can_retry = job.status == JobStatus.FAILED + current_status = job.status.value + + if not can_retry: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Job {job_id} cannot be retried (status={current_status})" + ) + + # Reset job to queued + success = queue_manager.retry_job(job_id) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to retry job {job_id}" + ) + + # Get updated job and return + with database.get_session() as session: + job = session.query(Job).filter(Job.id == job_id).first() + job_dict = job.to_dict() if job else {} + + logger.info(f"Job {job_id} retried via API") + + return JobResponse(**job_dict) + + +@router.delete("/{job_id}", response_model=MessageResponse) +async def cancel_job(job_id: str): + """ + Cancel a job. + + Args: + job_id: Job ID to cancel + + Returns: + Success message + + Raises: + 404: Job not found + 400: Job already completed + """ + from backend.core.queue_manager import queue_manager + from backend.core.database import database + from backend.core.models import Job, JobStatus + + # Check if job exists and can be cancelled (within session) + with database.get_session() as session: + job = session.query(Job).filter(Job.id == job_id).first() + + if not job: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Job {job_id} not found" + ) + + # Access attributes while session is active + is_terminal = job.status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED) + current_status = job.status.value + + if is_terminal: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Job {job_id} is already in terminal state: {current_status}" + ) + + # Cancel job + success = queue_manager.cancel_job(job_id) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to cancel job {job_id}" + ) + + logger.info(f"Job {job_id} cancelled via API") + + return MessageResponse(message=f"Job {job_id} cancelled successfully") + + +@router.post("/{job_id}/cancel", response_model=MessageResponse) +async def cancel_job_post(job_id: str): + """ + Cancel a job (POST alias). + + Args: + job_id: Job ID to cancel + + Returns: + Success message + """ + # Reuse the delete endpoint logic + return await cancel_job(job_id) + + +@router.post("/queue/clear", response_model=MessageResponse) +async def clear_completed_jobs(): + """ + Clear all completed jobs from the queue. + + Returns: + Success message with count of cleared jobs + """ + from backend.core.queue_manager import queue_manager + + count = queue_manager.clear_completed_jobs() + + logger.info(f"Cleared {count} completed jobs via API") + + return MessageResponse(message=f"Cleared {count} completed jobs") + diff --git a/backend/api/scan_rules.py b/backend/api/scan_rules.py new file mode 100644 index 0000000..0e578d6 --- /dev/null +++ b/backend/api/scan_rules.py @@ -0,0 +1,351 @@ +"""Scan rules management API routes.""" +import logging +from typing import List, Optional +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/scan-rules", tags=["scan-rules"]) + + +# === REQUEST/RESPONSE MODELS === + +class ScanRuleConditions(BaseModel): + """Scan rule conditions.""" + audio_language_is: Optional[str] = Field(None, description="Audio language must be (ISO 639-2)") + audio_language_not: Optional[str] = Field(None, description="Audio language must NOT be (comma-separated)") + audio_track_count_min: Optional[int] = Field(None, description="Minimum number of audio tracks") + has_embedded_subtitle_lang: Optional[str] = Field(None, description="Must have embedded subtitle in language") + missing_embedded_subtitle_lang: Optional[str] = Field(None, description="Must NOT have embedded subtitle") + missing_external_subtitle_lang: Optional[str] = Field(None, description="Must NOT have external .srt file") + file_extension: Optional[str] = Field(None, description="File extensions filter (comma-separated)") + + +class ScanRuleAction(BaseModel): + """Scan rule action.""" + action_type: str = Field("transcribe", description="Action type: transcribe or translate") + target_language: str = Field(..., description="Target subtitle language (ISO 639-2)") + quality_preset: str = Field("fast", description="Quality preset: fast, balanced, best") + job_priority: int = Field(0, description="Priority for created jobs") + + +class ScanRuleCreateRequest(BaseModel): + """Request to create a scan rule.""" + name: str = Field(..., description="Rule name") + enabled: bool = Field(True, description="Whether rule is enabled") + priority: int = Field(0, description="Rule priority (higher = evaluated first)") + conditions: ScanRuleConditions + action: ScanRuleAction + + class Config: + json_schema_extra = { + "example": { + "name": "Japanese anime without Spanish subs", + "enabled": True, + "priority": 10, + "conditions": { + "audio_language_is": "jpn", + "missing_embedded_subtitle_lang": "spa", + "missing_external_subtitle_lang": "spa", + "file_extension": ".mkv,.mp4" + }, + "action": { + "action_type": "transcribe", + "target_language": "spa", + "quality_preset": "fast", + "job_priority": 5 + } + } + } + + +class ScanRuleUpdateRequest(BaseModel): + """Request to update a scan rule.""" + name: Optional[str] = Field(None, description="Rule name") + enabled: Optional[bool] = Field(None, description="Whether rule is enabled") + priority: Optional[int] = Field(None, description="Rule priority") + conditions: Optional[ScanRuleConditions] = None + action: Optional[ScanRuleAction] = None + + +class ScanRuleResponse(BaseModel): + """Scan rule response.""" + id: int + name: str + enabled: bool + priority: int + conditions: dict + action: dict + created_at: Optional[str] + updated_at: Optional[str] + + +class MessageResponse(BaseModel): + """Generic message response.""" + message: str + + +# === ROUTES === + +@router.get("/", response_model=List[ScanRuleResponse]) +async def get_all_rules(enabled_only: bool = False): + """ + Get all scan rules. + + Args: + enabled_only: Only return enabled rules + + Returns: + List of scan rules (ordered by priority DESC) + """ + from backend.core.database import database + from backend.scanning.models import ScanRule + + with database.get_session() as session: + query = session.query(ScanRule) + + if enabled_only: + query = query.filter(ScanRule.enabled == True) + + rules = query.order_by(ScanRule.priority.desc()).all() + + return [ScanRuleResponse(**rule.to_dict()) for rule in rules] + + +@router.get("/{rule_id}", response_model=ScanRuleResponse) +async def get_rule(rule_id: int): + """ + Get a specific scan rule. + + Args: + rule_id: Rule ID + + Returns: + Scan rule object + + Raises: + 404: Rule not found + """ + from backend.core.database import database + from backend.scanning.models import ScanRule + + with database.get_session() as session: + rule = session.query(ScanRule).filter(ScanRule.id == rule_id).first() + + if not rule: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Scan rule {rule_id} not found" + ) + + return ScanRuleResponse(**rule.to_dict()) + + +@router.post("/", response_model=ScanRuleResponse, status_code=status.HTTP_201_CREATED) +async def create_rule(request: ScanRuleCreateRequest): + """ + Create a new scan rule. + + Args: + request: Rule creation request + + Returns: + Created rule object + + Raises: + 400: Invalid data + 409: Rule with same name already exists + """ + from backend.core.database import database + from backend.scanning.models import ScanRule + + with database.get_session() as session: + # Check for duplicate name + existing = session.query(ScanRule).filter(ScanRule.name == request.name).first() + + if existing: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Scan rule with name '{request.name}' already exists" + ) + + # Create rule + rule = ScanRule( + name=request.name, + enabled=request.enabled, + priority=request.priority, + # Conditions + audio_language_is=request.conditions.audio_language_is, + audio_language_not=request.conditions.audio_language_not, + audio_track_count_min=request.conditions.audio_track_count_min, + has_embedded_subtitle_lang=request.conditions.has_embedded_subtitle_lang, + missing_embedded_subtitle_lang=request.conditions.missing_embedded_subtitle_lang, + missing_external_subtitle_lang=request.conditions.missing_external_subtitle_lang, + file_extension=request.conditions.file_extension, + # Action + action_type=request.action.action_type, + target_language=request.action.target_language, + quality_preset=request.action.quality_preset, + job_priority=request.action.job_priority, + ) + + session.add(rule) + session.commit() + session.refresh(rule) + + logger.info(f"Scan rule created via API: {rule.name} (ID: {rule.id})") + + return ScanRuleResponse(**rule.to_dict()) + + +@router.put("/{rule_id}", response_model=ScanRuleResponse) +async def update_rule(rule_id: int, request: ScanRuleUpdateRequest): + """ + Update a scan rule. + + Args: + rule_id: Rule ID to update + request: Rule update request + + Returns: + Updated rule object + + Raises: + 404: Rule not found + 409: Name already exists + """ + from backend.core.database import database + from backend.scanning.models import ScanRule + + with database.get_session() as session: + rule = session.query(ScanRule).filter(ScanRule.id == rule_id).first() + + if not rule: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Scan rule {rule_id} not found" + ) + + # Check for duplicate name + if request.name and request.name != rule.name: + existing = session.query(ScanRule).filter(ScanRule.name == request.name).first() + if existing: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Scan rule with name '{request.name}' already exists" + ) + + # Update fields + if request.name is not None: + rule.name = request.name + if request.enabled is not None: + rule.enabled = request.enabled + if request.priority is not None: + rule.priority = request.priority + + # Update conditions + if request.conditions: + if request.conditions.audio_language_is is not None: + rule.audio_language_is = request.conditions.audio_language_is + if request.conditions.audio_language_not is not None: + rule.audio_language_not = request.conditions.audio_language_not + if request.conditions.audio_track_count_min is not None: + rule.audio_track_count_min = request.conditions.audio_track_count_min + if request.conditions.has_embedded_subtitle_lang is not None: + rule.has_embedded_subtitle_lang = request.conditions.has_embedded_subtitle_lang + if request.conditions.missing_embedded_subtitle_lang is not None: + rule.missing_embedded_subtitle_lang = request.conditions.missing_embedded_subtitle_lang + if request.conditions.missing_external_subtitle_lang is not None: + rule.missing_external_subtitle_lang = request.conditions.missing_external_subtitle_lang + if request.conditions.file_extension is not None: + rule.file_extension = request.conditions.file_extension + + # Update action + if request.action: + if request.action.action_type is not None: + rule.action_type = request.action.action_type + if request.action.target_language is not None: + rule.target_language = request.action.target_language + if request.action.quality_preset is not None: + rule.quality_preset = request.action.quality_preset + if request.action.job_priority is not None: + rule.job_priority = request.action.job_priority + + session.commit() + session.refresh(rule) + + logger.info(f"Scan rule updated via API: {rule.name} (ID: {rule.id})") + + return ScanRuleResponse(**rule.to_dict()) + + +@router.delete("/{rule_id}", response_model=MessageResponse) +async def delete_rule(rule_id: int): + """ + Delete a scan rule. + + Args: + rule_id: Rule ID to delete + + Returns: + Success message + + Raises: + 404: Rule not found + """ + from backend.core.database import database + from backend.scanning.models import ScanRule + + with database.get_session() as session: + rule = session.query(ScanRule).filter(ScanRule.id == rule_id).first() + + if not rule: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Scan rule {rule_id} not found" + ) + + rule_name = rule.name + session.delete(rule) + session.commit() + + logger.info(f"Scan rule deleted via API: {rule_name} (ID: {rule_id})") + + return MessageResponse(message=f"Scan rule {rule_id} deleted successfully") + + +@router.post("/{rule_id}/toggle", response_model=ScanRuleResponse) +async def toggle_rule(rule_id: int): + """ + Toggle a scan rule enabled/disabled. + + Args: + rule_id: Rule ID to toggle + + Returns: + Updated rule object + + Raises: + 404: Rule not found + """ + from backend.core.database import database + from backend.scanning.models import ScanRule + + with database.get_session() as session: + rule = session.query(ScanRule).filter(ScanRule.id == rule_id).first() + + if not rule: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Scan rule {rule_id} not found" + ) + + rule.enabled = not rule.enabled + session.commit() + session.refresh(rule) + + logger.info(f"Scan rule toggled via API: {rule.name} -> {'enabled' if rule.enabled else 'disabled'}") + + return ScanRuleResponse(**rule.to_dict()) + diff --git a/backend/api/scanner.py b/backend/api/scanner.py new file mode 100644 index 0000000..ea6252f --- /dev/null +++ b/backend/api/scanner.py @@ -0,0 +1,312 @@ +"""Library scanner API routes.""" +import logging +from typing import List, Optional +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/scanner", tags=["scanner"]) + + +# === REQUEST/RESPONSE MODELS === + +class ScanRequest(BaseModel): + """Request to scan paths.""" + paths: List[str] = Field(..., description="Paths to scan") + recursive: bool = Field(True, description="Scan subdirectories") + + class Config: + json_schema_extra = { + "example": { + "paths": ["/media/anime", "/media/movies"], + "recursive": True + } + } + + +class ScanResult(BaseModel): + """Scan result summary.""" + scanned_files: int + matched_files: int + jobs_created: int + skipped_files: int + paths_scanned: List[str] + + +class ScheduleConfig(BaseModel): + """Scanner schedule configuration.""" + enabled: bool = Field(..., description="Enable scheduled scanning") + cron_expression: str = Field(..., description="Cron expression for schedule") + paths: List[str] = Field(..., description="Paths to scan") + recursive: bool = Field(True, description="Scan subdirectories") + + class Config: + json_schema_extra = { + "example": { + "enabled": True, + "cron_expression": "0 2 * * *", # Daily at 2 AM + "paths": ["/media/anime", "/media/movies"], + "recursive": True + } + } + + +class WatcherConfig(BaseModel): + """File watcher configuration.""" + enabled: bool = Field(..., description="Enable file watcher") + paths: List[str] = Field(..., description="Paths to watch") + recursive: bool = Field(True, description="Watch subdirectories") + + class Config: + json_schema_extra = { + "example": { + "enabled": True, + "paths": ["/media/anime", "/media/movies"], + "recursive": True + } + } + + +class ScannerStatus(BaseModel): + """Scanner status response.""" + scheduler_enabled: bool + scheduler_running: bool + next_scan_time: Optional[str] + watcher_enabled: bool + watcher_running: bool + watched_paths: List[str] + last_scan_time: Optional[str] + total_scans: int + + +class MessageResponse(BaseModel): + """Generic message response.""" + message: str + + +# === ROUTES === + +@router.get("/status", response_model=ScannerStatus) +async def get_scanner_status(): + """ + Get library scanner status. + + Returns: + Scanner status information + """ + from backend.scanning.library_scanner import library_scanner + + status_dict = library_scanner.get_status() + + return ScannerStatus(**status_dict) + + +@router.post("/scan", response_model=ScanResult) +async def scan_paths(request: Optional[ScanRequest] = None): + """ + Manually trigger a library scan. + + Args: + request: Optional scan request with paths. If not provided, uses library_paths from settings. + + Returns: + Scan result summary + """ + from backend.scanning.library_scanner import library_scanner + from backend.core.settings_service import settings_service + + # Use request paths or load from settings + if request is None: + library_paths = settings_service.get('library_paths', '') + # Handle both string (comma-separated) and list types + if isinstance(library_paths, list): + paths = [p.strip() for p in library_paths if p and p.strip()] + elif isinstance(library_paths, str) and library_paths: + paths = [p.strip() for p in library_paths.split(',') if p.strip()] + else: + paths = [] + recursive = True + else: + paths = request.paths + recursive = request.recursive + + if not paths: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No library paths configured. Please configure library_paths in settings." + ) + + logger.info(f"Manual scan triggered via API: {paths}") + + result = library_scanner.scan_paths( + paths=paths, + recursive=recursive + ) + + return ScanResult(**result) + + +@router.post("/scheduler/start", response_model=MessageResponse) +async def start_scheduler(config: Optional[ScheduleConfig] = None): + """ + Start scheduled scanning. + + Args: + config: Optional scheduler configuration. If not provided, uses settings from database. + + Returns: + Success message + """ + from backend.scanning.library_scanner import library_scanner + from backend.core.settings_service import settings_service + + try: + # Use config from request or load from settings + if config is None: + # Load interval from database settings (in minutes) + interval_minutes = settings_service.get('scanner_schedule_interval_minutes', 360) # Default: 6 hours + interval_minutes = int(interval_minutes) if interval_minutes else 360 + else: + # Convert cron to interval (simplified - just use 360 minutes for now) + interval_minutes = 360 + + if interval_minutes <= 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid scanner interval. Must be greater than 0 minutes." + ) + + library_scanner.start_scheduler(interval_minutes=interval_minutes) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) + + logger.info(f"Scheduler started via API with interval: {interval_minutes} minutes") + + return MessageResponse(message=f"Scheduler started successfully (every {interval_minutes} minutes)") + + +@router.post("/scheduler/stop", response_model=MessageResponse) +async def stop_scheduler(): + """ + Stop scheduled scanning. + + Returns: + Success message + """ + from backend.scanning.library_scanner import library_scanner + + library_scanner.stop_scheduler() + + logger.info("Scheduler stopped via API") + + return MessageResponse(message="Scheduler stopped successfully") + + +@router.post("/watcher/start", response_model=MessageResponse) +async def start_watcher(config: Optional[WatcherConfig] = None): + """ + Start file watcher. + + Args: + config: Optional watcher configuration. If not provided, uses settings from database. + + Returns: + Success message + """ + from backend.scanning.library_scanner import library_scanner + from backend.core.settings_service import settings_service + + # Use config from request or load from settings + if config is None: + library_paths = settings_service.get('library_paths', '') + # Handle both string (comma-separated) and list types + if isinstance(library_paths, list): + paths = [p.strip() for p in library_paths if p and p.strip()] + elif isinstance(library_paths, str) and library_paths: + paths = [p.strip() for p in library_paths.split(',') if p.strip()] + else: + paths = [] + recursive = True + else: + paths = config.paths + recursive = config.recursive + + if not paths: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No library paths configured. Please configure library_paths in settings." + ) + + library_scanner.start_file_watcher( + paths=paths, + recursive=recursive + ) + + logger.info(f"File watcher started via API: {paths}") + + return MessageResponse(message="File watcher started successfully") + + +@router.post("/watcher/stop", response_model=MessageResponse) +async def stop_watcher(): + """ + Stop file watcher. + + Returns: + Success message + """ + from backend.scanning.library_scanner import library_scanner + + library_scanner.stop_file_watcher() + + logger.info("File watcher stopped via API") + + return MessageResponse(message="File watcher stopped successfully") + + +@router.post("/analyze", response_model=dict) +async def analyze_file(file_path: str): + """ + Analyze a single file. + + Args: + file_path: Path to file to analyze + + Returns: + File analysis result + + Raises: + 400: Invalid file path + 404: File not found + """ + from backend.scanning.file_analyzer import FileAnalyzer + import os + + if not os.path.exists(file_path): + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"File not found: {file_path}" + ) + + if not os.path.isfile(file_path): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Path is not a file: {file_path}" + ) + + analyzer = FileAnalyzer() + + try: + analysis = analyzer.analyze(file_path) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to analyze file: {str(e)}" + ) + + return analysis.to_dict() + diff --git a/backend/api/settings.py b/backend/api/settings.py new file mode 100644 index 0000000..8f86046 --- /dev/null +++ b/backend/api/settings.py @@ -0,0 +1,323 @@ +"""Settings management API routes.""" +import logging +from typing import List, Optional +from fastapi import APIRouter, HTTPException, Query, status +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/settings", tags=["settings"]) + + +# === REQUEST/RESPONSE MODELS === + +class SettingResponse(BaseModel): + """Setting response model.""" + id: int + key: str + value: Optional[str] + description: Optional[str] + category: Optional[str] + value_type: Optional[str] + created_at: Optional[str] + updated_at: Optional[str] + + +class SettingUpdateRequest(BaseModel): + """Setting update request.""" + value: str = Field(..., description="New value (as string)") + + class Config: + json_schema_extra = { + "example": { + "value": "true" + } + } + + +class SettingCreateRequest(BaseModel): + """Setting create request.""" + key: str = Field(..., description="Setting key") + value: Optional[str] = Field(None, description="Setting value") + description: Optional[str] = Field(None, description="Description") + category: Optional[str] = Field(None, description="Category") + value_type: Optional[str] = Field("string", description="Value type") + + class Config: + json_schema_extra = { + "example": { + "key": "custom_setting", + "value": "value", + "description": "Custom setting description", + "category": "general", + "value_type": "string" + } + } + + +class BulkUpdateRequest(BaseModel): + """Bulk update request.""" + settings: dict = Field(..., description="Dictionary of key-value pairs") + + class Config: + json_schema_extra = { + "example": { + "settings": { + "worker_cpu_count": "2", + "worker_gpu_count": "1", + "scanner_enabled": "true" + } + } + } + + +class MessageResponse(BaseModel): + """Generic message response.""" + message: str + + +# === ROUTES === + +@router.get("/", response_model=List[SettingResponse]) +async def get_all_settings(category: Optional[str] = Query(None, description="Filter by category")): + """ + Get all settings or filter by category. + + Args: + category: Optional category filter (general, workers, transcription, scanner, bazarr) + + Returns: + List of settings + """ + from backend.core.settings_service import settings_service + + if category: + settings = settings_service.get_by_category(category) + else: + settings = settings_service.get_all() + + return [SettingResponse(**s.to_dict()) for s in settings] + + +@router.get("/{key}", response_model=SettingResponse) +async def get_setting(key: str): + """ + Get a specific setting by key. + + Args: + key: Setting key + + Returns: + Setting object + + Raises: + 404: Setting not found + """ + from backend.core.database import database + from backend.core.settings_model import SystemSettings + + with database.get_session() as session: + setting = session.query(SystemSettings).filter(SystemSettings.key == key).first() + + if not setting: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Setting '{key}' not found" + ) + + return SettingResponse(**setting.to_dict()) + + +@router.put("/{key}", response_model=SettingResponse) +async def update_setting(key: str, request: SettingUpdateRequest): + """ + Update a setting value. + + Args: + key: Setting key + request: Update request with new value + + Returns: + Updated setting object + + Raises: + 404: Setting not found + 400: Invalid value (e.g., GPU workers without GPU) + """ + from backend.core.settings_service import settings_service + from backend.core.database import database + from backend.core.settings_model import SystemSettings + from backend.core.system_monitor import system_monitor + + value = request.value + + # Validate GPU worker count - force to 0 if no GPU available + if key == 'worker_gpu_count': + gpu_count = int(value) if value else 0 + if gpu_count > 0 and system_monitor.gpu_count == 0: + logger.warning( + f"Attempted to set worker_gpu_count={gpu_count} but no GPU detected. " + "Forcing to 0." + ) + value = '0' + + success = settings_service.set(key, value) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to update setting '{key}'" + ) + + # Return updated setting + with database.get_session() as session: + setting = session.query(SystemSettings).filter(SystemSettings.key == key).first() + + if not setting: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Setting '{key}' not found" + ) + + return SettingResponse(**setting.to_dict()) + + +@router.post("/bulk-update", response_model=MessageResponse) +async def bulk_update_settings(request: BulkUpdateRequest): + """ + Update multiple settings at once. + + Args: + request: Bulk update request with settings dictionary + + Returns: + Success message + """ + from backend.core.settings_service import settings_service + from backend.core.system_monitor import system_monitor + + # Validate GPU worker count - force to 0 if no GPU available + settings_to_update = request.settings.copy() + if 'worker_gpu_count' in settings_to_update: + gpu_count = int(settings_to_update.get('worker_gpu_count', 0)) + if gpu_count > 0 and system_monitor.gpu_count == 0: + logger.warning( + f"Attempted to set worker_gpu_count={gpu_count} but no GPU detected. " + "Forcing to 0." + ) + settings_to_update['worker_gpu_count'] = '0' + + success = settings_service.bulk_update(settings_to_update) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to update settings" + ) + + logger.info(f"Bulk updated {len(request.settings)} settings") + + return MessageResponse(message=f"Updated {len(request.settings)} settings successfully") + + +@router.post("/", response_model=SettingResponse, status_code=status.HTTP_201_CREATED) +async def create_setting(request: SettingCreateRequest): + """ + Create a new setting. + + Args: + request: Create request with setting details + + Returns: + Created setting object + + Raises: + 409: Setting already exists + """ + from backend.core.settings_service import settings_service + from backend.core.database import database + from backend.core.settings_model import SystemSettings + + # Check if exists + with database.get_session() as session: + existing = session.query(SystemSettings).filter(SystemSettings.key == request.key).first() + + if existing: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Setting '{request.key}' already exists" + ) + + # Create + success = settings_service.set( + key=request.key, + value=request.value, + description=request.description, + category=request.category, + value_type=request.value_type + ) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to create setting" + ) + + # Return created setting + with database.get_session() as session: + setting = session.query(SystemSettings).filter(SystemSettings.key == request.key).first() + return SettingResponse(**setting.to_dict()) + + +@router.delete("/{key}", response_model=MessageResponse) +async def delete_setting(key: str): + """ + Delete a setting. + + Args: + key: Setting key + + Returns: + Success message + + Raises: + 404: Setting not found + """ + from backend.core.settings_service import settings_service + + success = settings_service.delete(key) + + if not success: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Setting '{key}' not found" + ) + + logger.info(f"Setting deleted: {key}") + + return MessageResponse(message=f"Setting '{key}' deleted successfully") + + +@router.post("/init-defaults", response_model=MessageResponse) +async def init_default_settings(): + """ + Initialize default settings. + + Creates all default settings if they don't exist. + Safe to call multiple times (won't overwrite existing). + + Returns: + Success message + """ + from backend.core.settings_service import settings_service + + try: + settings_service.init_default_settings() + return MessageResponse(message="Default settings initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize default settings: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to initialize default settings: {str(e)}" + ) + diff --git a/backend/api/setup_wizard.py b/backend/api/setup_wizard.py new file mode 100644 index 0000000..df97577 --- /dev/null +++ b/backend/api/setup_wizard.py @@ -0,0 +1,313 @@ +"""Setup wizard API endpoints.""" +import logging +import secrets +from typing import List, Optional +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/setup", tags=["setup"]) + + +# === REQUEST/RESPONSE MODELS === + +class SetupStatusResponse(BaseModel): + """Setup status response.""" + is_first_run: bool + setup_completed: bool + + +class WorkerConfig(BaseModel): + """Worker configuration.""" + count: int = Field(default=1, ge=0, le=10, description="Number of workers to start") + type: str = Field(default="cpu", description="Worker type: cpu or gpu") + + +class ScannerConfig(BaseModel): + """Scanner configuration.""" + interval_minutes: int = Field(default=360, ge=1, le=10080, description="Scan interval in minutes") + + +class StandaloneSetupRequest(BaseModel): + """Standalone mode setup request.""" + library_paths: List[str] = Field(..., description="Library paths to scan") + scan_rules: List[dict] = Field(..., description="Initial scan rules") + worker_config: Optional[WorkerConfig] = Field(default=None, description="Worker configuration") + scanner_config: Optional[ScannerConfig] = Field(default=None, description="Scanner configuration") + + class Config: + json_schema_extra = { + "example": { + "library_paths": ["/media/anime", "/media/movies"], + "scan_rules": [ + { + "name": "Japanese to Spanish", + "audio_language_is": "jpn", + "missing_external_subtitle_lang": "spa", + "target_language": "spa", + "action_type": "transcribe" + } + ], + "worker_config": { + "count": 1, + "type": "cpu" + }, + "scanner_config": { + "interval_minutes": 360 + } + } + } + + +class BazarrSlaveSetupRequest(BaseModel): + """Bazarr slave mode setup request.""" + pass # No additional config needed + + +class BazarrConnectionInfo(BaseModel): + """Bazarr connection information.""" + mode: str = "bazarr_slave" + host: str + port: int + api_key: str + provider_url: str + + +class SetupCompleteResponse(BaseModel): + """Setup complete response.""" + success: bool + message: str + bazarr_info: Optional[BazarrConnectionInfo] = None + + +# === ROUTES === + +@router.get("/status", response_model=SetupStatusResponse) +async def get_setup_status(): + """ + Check if this is the first run or setup is completed. + + Returns: + Setup status + """ + from backend.core.settings_service import settings_service + + # Check if setup_completed setting exists + setup_completed = settings_service.get("setup_completed", None) + + return SetupStatusResponse( + is_first_run=setup_completed is None, + setup_completed=setup_completed == "true" if setup_completed else False + ) + + +@router.post("/standalone", response_model=SetupCompleteResponse) +async def setup_standalone_mode(request: StandaloneSetupRequest): + """ + Configure standalone mode with library paths and scan rules. + + Args: + request: Standalone setup configuration + + Returns: + Setup completion status + """ + from backend.core.settings_service import settings_service + from backend.core.database import database + from backend.scanning.models import ScanRule + + try: + # Set operation mode + settings_service.set("operation_mode", "standalone", + description="Operation mode", + category="general", + value_type="string") + + # Set library paths + library_paths_str = ",".join(request.library_paths) + settings_service.set("library_paths", library_paths_str, + description="Library paths to scan", + category="general", + value_type="list") + + # Enable scanner by default + settings_service.set("scanner_enabled", "true", + description="Enable library scanner", + category="scanner", + value_type="boolean") + + # Configure scanner interval if provided + if request.scanner_config: + settings_service.set("scanner_schedule_interval_minutes", + str(request.scanner_config.interval_minutes), + description="Scanner interval in minutes", + category="scanner", + value_type="integer") + else: + # Default: 6 hours + settings_service.set("scanner_schedule_interval_minutes", "360", + description="Scanner interval in minutes", + category="scanner", + value_type="integer") + + # Configure worker auto-start if provided + if request.worker_config: + settings_service.set("worker_auto_start_count", + str(request.worker_config.count), + description="Number of workers to start automatically", + category="workers", + value_type="integer") + settings_service.set("worker_auto_start_type", + request.worker_config.type, + description="Type of workers to start (cpu/gpu)", + category="workers", + value_type="string") + else: + # Default: 1 CPU worker + settings_service.set("worker_auto_start_count", "1", + description="Number of workers to start automatically", + category="workers", + value_type="integer") + settings_service.set("worker_auto_start_type", "cpu", + description="Type of workers to start (cpu/gpu)", + category="workers", + value_type="string") + + # Create scan rules + with database.get_session() as session: + for idx, rule_data in enumerate(request.scan_rules): + rule = ScanRule( + name=rule_data.get("name", f"Rule {idx + 1}"), + enabled=True, + priority=rule_data.get("priority", 10), + audio_language_is=rule_data.get("audio_language_is"), + audio_language_not=rule_data.get("audio_language_not"), + audio_track_count_min=rule_data.get("audio_track_count_min"), + has_embedded_subtitle_lang=rule_data.get("has_embedded_subtitle_lang"), + missing_embedded_subtitle_lang=rule_data.get("missing_embedded_subtitle_lang"), + missing_external_subtitle_lang=rule_data.get("missing_external_subtitle_lang"), + file_extension=rule_data.get("file_extension", ".mkv,.mp4,.avi"), + action_type=rule_data.get("action_type", "transcribe"), + target_language=rule_data.get("target_language", "spa"), + quality_preset=rule_data.get("quality_preset", "fast"), + job_priority=rule_data.get("job_priority", 5) + ) + session.add(rule) + + session.commit() + + # Mark setup as completed + settings_service.set("setup_completed", "true", + description="Setup wizard completed", + category="general", + value_type="boolean") + + logger.info("Standalone mode setup completed successfully") + + return SetupCompleteResponse( + success=True, + message="Standalone mode configured successfully" + ) + + except Exception as e: + logger.error(f"Failed to setup standalone mode: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Setup failed: {str(e)}" + ) + + +@router.post("/bazarr-slave", response_model=SetupCompleteResponse) +async def setup_bazarr_slave_mode(request: BazarrSlaveSetupRequest): + """ + Configure Bazarr slave mode and generate API key. + + Args: + request: Bazarr slave setup configuration + + Returns: + Setup completion status with connection info + """ + from backend.core.settings_service import settings_service + + try: + # Set operation mode + settings_service.set("operation_mode", "bazarr_slave", + description="Operation mode", + category="general", + value_type="string") + + # Generate API key + api_key = secrets.token_urlsafe(32) + settings_service.set("bazarr_api_key", api_key, + description="Bazarr provider API key", + category="bazarr", + value_type="string") + + # Enable Bazarr provider + settings_service.set("bazarr_provider_enabled", "true", + description="Enable Bazarr provider mode", + category="bazarr", + value_type="boolean") + + # Disable scanner (not needed in slave mode) + settings_service.set("scanner_enabled", "false", + description="Enable library scanner", + category="scanner", + value_type="boolean") + + # Mark setup as completed + settings_service.set("setup_completed", "true", + description="Setup wizard completed", + category="general", + value_type="boolean") + + # Get host and port from settings + host = getattr(app_settings, "API_HOST", "0.0.0.0") + port = getattr(app_settings, "API_PORT", 8000) + + # Create connection info + bazarr_info = BazarrConnectionInfo( + mode="bazarr_slave", + host=host if host != "0.0.0.0" else "127.0.0.1", + port=port, + api_key=api_key, + provider_url=f"http://{host if host != '0.0.0.0' else '127.0.0.1'}:{port}" + ) + + logger.info("Bazarr slave mode setup completed successfully") + + return SetupCompleteResponse( + success=True, + message="Bazarr slave mode configured successfully", + bazarr_info=bazarr_info + ) + + except Exception as e: + logger.error(f"Failed to setup Bazarr slave mode: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Setup failed: {str(e)}" + ) + + +@router.post("/skip") +async def skip_setup(): + """ + Skip setup wizard (for advanced users). + + Returns: + Success message + """ + from backend.core.settings_service import settings_service + + settings_service.set("setup_completed", "true", + description="Setup wizard completed", + category="general", + value_type="boolean") + + logger.info("Setup wizard skipped") + + return {"message": "Setup wizard skipped"} + diff --git a/backend/api/system.py b/backend/api/system.py new file mode 100644 index 0000000..36fe16f --- /dev/null +++ b/backend/api/system.py @@ -0,0 +1,210 @@ +"""System resources monitoring API.""" +import logging +import psutil +from typing import List, Optional +from fastapi import APIRouter +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/system", tags=["system"]) + + +# === RESPONSE MODELS === + +class CPUInfo(BaseModel): + """CPU information.""" + usage_percent: float + count_logical: int + count_physical: int + frequency_mhz: Optional[float] = None + + +class MemoryInfo(BaseModel): + """Memory information.""" + total_gb: float + used_gb: float + free_gb: float + usage_percent: float + + +class GPUInfo(BaseModel): + """GPU information.""" + id: int + name: str + memory_total_mb: Optional[int] = None + memory_used_mb: Optional[int] = None + memory_free_mb: Optional[int] = None + utilization_percent: Optional[int] = None + + +class SystemResourcesResponse(BaseModel): + """System resources response.""" + cpu: CPUInfo + memory: MemoryInfo + gpus: List[GPUInfo] + + +# === ROUTES === + +@router.get("/resources", response_model=SystemResourcesResponse) +async def get_system_resources(): + """ + Get current system resources (CPU, RAM, GPU). + + Returns: + System resources information + """ + # CPU info + cpu_percent = psutil.cpu_percent(interval=0.1) + cpu_count_logical = psutil.cpu_count(logical=True) + cpu_count_physical = psutil.cpu_count(logical=False) + cpu_freq = psutil.cpu_freq() + + cpu_info = CPUInfo( + usage_percent=cpu_percent, + count_logical=cpu_count_logical or 0, + count_physical=cpu_count_physical or 0, + frequency_mhz=cpu_freq.current if cpu_freq else 0 + ) + + # Memory info + mem = psutil.virtual_memory() + memory_info = MemoryInfo( + total_gb=round(mem.total / (1024**3), 2), + used_gb=round(mem.used / (1024**3), 2), + free_gb=round(mem.available / (1024**3), 2), + usage_percent=round(mem.percent, 1) + ) + + # GPU info - try to detect NVIDIA GPUs + gpus = [] + try: + import pynvml + pynvml.nvmlInit() + device_count = pynvml.nvmlDeviceGetCount() + + for i in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + name = pynvml.nvmlDeviceGetName(handle) + memory_info_gpu = pynvml.nvmlDeviceGetMemoryInfo(handle) + utilization = pynvml.nvmlDeviceGetUtilizationRates(handle) + + gpus.append(GPUInfo( + id=i, + name=name if isinstance(name, str) else name.decode('utf-8'), + memory_total_mb=memory_info_gpu.total // (1024**2), + memory_used_mb=memory_info_gpu.used // (1024**2), + memory_free_mb=memory_info_gpu.free // (1024**2), + utilization_percent=utilization.gpu + )) + + pynvml.nvmlShutdown() + except Exception as e: + logger.debug(f"Could not get GPU info: {e}") + # No GPUs or pynvml not available + pass + + return SystemResourcesResponse( + cpu=cpu_info, + memory=memory_info, + gpus=gpus + ) + + +@router.get("/cpu", response_model=CPUInfo) +async def get_cpu_info(): + """Get CPU information.""" + cpu_percent = psutil.cpu_percent(interval=0.1) + cpu_count_logical = psutil.cpu_count(logical=True) + cpu_count_physical = psutil.cpu_count(logical=False) + cpu_freq = psutil.cpu_freq() + + return CPUInfo( + usage_percent=cpu_percent, + count_logical=cpu_count_logical or 0, + count_physical=cpu_count_physical or 0, + frequency_mhz=cpu_freq.current if cpu_freq else 0 + ) + + +@router.get("/memory", response_model=MemoryInfo) +async def get_memory_info(): + """Get memory information.""" + mem = psutil.virtual_memory() + + return MemoryInfo( + total_gb=round(mem.total / (1024**3), 2), + used_gb=round(mem.used / (1024**3), 2), + free_gb=round(mem.available / (1024**3), 2), + usage_percent=round(mem.percent, 1) + ) + + +@router.get("/gpus", response_model=List[GPUInfo]) +async def get_gpus_info(): + """Get all GPUs information.""" + gpus = [] + try: + import pynvml + pynvml.nvmlInit() + device_count = pynvml.nvmlDeviceGetCount() + + for i in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + name = pynvml.nvmlDeviceGetName(handle) + memory_info_gpu = pynvml.nvmlDeviceGetMemoryInfo(handle) + utilization = pynvml.nvmlDeviceGetUtilizationRates(handle) + + gpus.append(GPUInfo( + id=i, + name=name if isinstance(name, str) else name.decode('utf-8'), + memory_total_mb=memory_info_gpu.total // (1024**2), + memory_used_mb=memory_info_gpu.used // (1024**2), + memory_free_mb=memory_info_gpu.free // (1024**2), + utilization_percent=utilization.gpu + )) + + pynvml.nvmlShutdown() + except Exception as e: + logger.debug(f"Could not get GPU info: {e}") + + return gpus + + +@router.get("/gpu/{device_id}", response_model=GPUInfo) +async def get_gpu_info(device_id: int): + """Get specific GPU information.""" + try: + import pynvml + pynvml.nvmlInit() + + handle = pynvml.nvmlDeviceGetHandleByIndex(device_id) + name = pynvml.nvmlDeviceGetName(handle) + memory_info_gpu = pynvml.nvmlDeviceGetMemoryInfo(handle) + utilization = pynvml.nvmlDeviceGetUtilizationRates(handle) + + gpu = GPUInfo( + id=device_id, + name=name if isinstance(name, str) else name.decode('utf-8'), + memory_total_mb=memory_info_gpu.total // (1024**2), + memory_used_mb=memory_info_gpu.used // (1024**2), + memory_free_mb=memory_info_gpu.free // (1024**2), + utilization_percent=utilization.gpu + ) + + pynvml.nvmlShutdown() + return gpu + + except Exception as e: + logger.error(f"Could not get GPU {device_id} info: {e}") + # Return basic info if can't get details + return GPUInfo( + id=device_id, + name=f"GPU {device_id}", + memory_total_mb=None, + memory_used_mb=None, + memory_free_mb=None, + utilization_percent=None + ) + diff --git a/backend/api/worker_routes.py b/backend/api/worker_routes.py new file mode 100644 index 0000000..57b02d1 --- /dev/null +++ b/backend/api/worker_routes.py @@ -0,0 +1,268 @@ +"""Worker pool management API endpoints.""" +from typing import Optional +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +from backend.core.worker_pool import worker_pool +from backend.core.worker import WorkerType + +router = APIRouter(prefix="/api/workers", tags=["workers"]) + + +# === Request/Response Models === + + +class AddWorkerRequest(BaseModel): + """Request to add a new worker.""" + + type: str = Field(..., description="Worker type: 'cpu' or 'gpu'") + device_id: Optional[int] = Field(None, description="GPU device ID (required for GPU workers)") + + class Config: + json_schema_extra = { + "example": { + "type": "gpu", + "device_id": 0 + } + } + + +class AddWorkerResponse(BaseModel): + """Response after adding a worker.""" + + worker_id: str + message: str + + +class WorkerStatusResponse(BaseModel): + """Worker status information.""" + + worker_id: str + status: str + worker_type: str + device_id: Optional[int] + current_job_id: Optional[str] + jobs_completed: int + jobs_failed: int + started_at: Optional[str] + + +class PoolStatsResponse(BaseModel): + """Worker pool statistics.""" + + pool: dict + jobs: dict + queue: dict + + +class HealthCheckResponse(BaseModel): + """Health check results.""" + + timestamp: str + total_workers: int + dead_workers: list + restarted_workers: list + healthy: bool + + +# === Endpoints === + + +@router.get("/", response_model=list) +async def list_workers(): + """ + List all workers with their status. + + Returns: + List of worker status dictionaries + """ + return worker_pool.get_all_workers_status() + + +@router.get("/{worker_id}", response_model=WorkerStatusResponse) +async def get_worker_status(worker_id: str): + """ + Get status of a specific worker. + + Args: + worker_id: Worker ID + + Returns: + Worker status information + + Raises: + HTTPException: If worker not found + """ + status = worker_pool.get_worker_status(worker_id) + + if not status: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Worker {worker_id} not found" + ) + + return status + + +@router.post("/", response_model=AddWorkerResponse, status_code=status.HTTP_201_CREATED) +async def add_worker(request: AddWorkerRequest): + """ + Add a new worker to the pool. + + Args: + request: Worker configuration + + Returns: + Worker ID and success message + + Raises: + HTTPException: If invalid configuration + """ + # Validate worker type + worker_type_str = request.type.lower() + if worker_type_str not in ["cpu", "gpu"]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Worker type must be 'cpu' or 'gpu'" + ) + + # Map to WorkerType enum + worker_type = WorkerType.CPU if worker_type_str == "cpu" else WorkerType.GPU + + # Validate GPU device_id + if worker_type == WorkerType.GPU: + if request.device_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="device_id is required for GPU workers" + ) + if request.device_id < 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="device_id must be non-negative" + ) + + # Add worker + try: + worker_id = worker_pool.add_worker(worker_type, request.device_id) + + return AddWorkerResponse( + worker_id=worker_id, + message=f"Worker {worker_id} added successfully" + ) + + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to add worker: {str(e)}" + ) + + +@router.delete("/{worker_id}") +async def remove_worker(worker_id: str, timeout: int = 30): + """ + Remove a worker from the pool. + + Args: + worker_id: Worker ID to remove + timeout: Maximum time to wait for worker to stop (seconds) + + Returns: + Success message + + Raises: + HTTPException: If worker not found or removal fails + """ + success = worker_pool.remove_worker(worker_id, timeout=float(timeout)) + + if not success: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Worker {worker_id} not found" + ) + + return {"message": f"Worker {worker_id} removed successfully"} + + +@router.get("/pool/stats", response_model=PoolStatsResponse) +async def get_pool_stats(): + """ + Get overall worker pool statistics. + + Returns: + Pool statistics including worker counts, job stats, and queue info + """ + return worker_pool.get_pool_stats() + + +@router.post("/pool/start") +async def start_pool(cpu_workers: int = 0, gpu_workers: int = 0): + """ + Start the worker pool. + + Args: + cpu_workers: Number of CPU workers to start + gpu_workers: Number of GPU workers to start + + Returns: + Success message + """ + worker_pool.start(cpu_workers=cpu_workers, gpu_workers=gpu_workers) + + return { + "message": f"Worker pool started with {cpu_workers} CPU and {gpu_workers} GPU workers" + } + + +@router.post("/pool/stop") +async def stop_pool(timeout: int = 30): + """ + Stop the worker pool. + + Args: + timeout: Maximum time to wait for each worker to stop (seconds) + + Returns: + Success message + """ + worker_pool.stop(timeout=float(timeout)) + + return {"message": "Worker pool stopped successfully"} + + +@router.get("/pool/health", response_model=HealthCheckResponse) +async def health_check(): + """ + Perform health check on all workers. + + Automatically restarts dead workers if configured. + + Returns: + Health check results + """ + return worker_pool.health_check() + + +@router.post("/pool/autoscale") +async def autoscale_pool(target_workers: int): + """ + Auto-scale worker pool to target number. + + Args: + target_workers: Target number of workers + + Returns: + Success message + + Raises: + HTTPException: If invalid target + """ + if target_workers < 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="target_workers must be non-negative" + ) + + worker_pool.auto_scale(target_workers) + + return {"message": f"Pool scaled to {target_workers} workers"} diff --git a/backend/api/workers.py b/backend/api/workers.py new file mode 100644 index 0000000..fc21b81 --- /dev/null +++ b/backend/api/workers.py @@ -0,0 +1,329 @@ + +"""Worker management API routes.""" +import logging +from typing import List, Optional +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field + +from backend.core.worker import WorkerType + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/workers", tags=["workers"]) + + +# === REQUEST/RESPONSE MODELS === + +class WorkerAddRequest(BaseModel): + """Request to add a new worker.""" + worker_type: str = Field(..., description="Worker type: 'cpu' or 'gpu'") + device_id: Optional[int] = Field(None, description="GPU device ID (only for GPU workers)") + + class Config: + json_schema_extra = { + "example": { + "worker_type": "gpu", + "device_id": 0 + } + } + + +class WorkerStatusResponse(BaseModel): + """Worker status response.""" + worker_id: str + worker_type: str + device_id: Optional[int] + status: str + current_job_id: Optional[str] + jobs_completed: int + jobs_failed: int + uptime_seconds: float + current_job_progress: float + current_job_eta: Optional[int] + + +class WorkerPoolStatsResponse(BaseModel): + """Worker pool statistics response.""" + total_workers: int + cpu_workers: int + gpu_workers: int + idle_workers: int + busy_workers: int + stopped_workers: int + error_workers: int + total_jobs_completed: int + total_jobs_failed: int + uptime_seconds: Optional[float] + is_running: bool + + +class MessageResponse(BaseModel): + """Generic message response.""" + message: str + + +# === ROUTES === + +@router.get("/", response_model=List[WorkerStatusResponse]) +async def get_all_workers(): + """ + Get status of all workers. + + Returns: + List of worker status objects + """ + from backend.app import worker_pool + from datetime import datetime, timezone + from dateutil import parser + + workers_status = worker_pool.get_all_workers_status() + + result = [] + for w in workers_status: + # Calculate uptime + uptime_seconds = 0.0 + if w.get("started_at"): + try: + started = parser.parse(w["started_at"]) + # Remove timezone info for comparison if needed + if started.tzinfo is None: + from datetime import timezone + started = started.replace(tzinfo=timezone.utc) + uptime_seconds = (datetime.now(timezone.utc) - started).total_seconds() + except Exception as e: + logger.warning(f"Failed to parse started_at: {e}") + uptime_seconds = 0.0 + + result.append( + WorkerStatusResponse( + worker_id=w["worker_id"], + worker_type=w["type"], + device_id=w.get("device_id"), + status=w["status"], + current_job_id=w.get("current_job_id"), + jobs_completed=w["jobs_completed"], + jobs_failed=w["jobs_failed"], + uptime_seconds=uptime_seconds, + current_job_progress=w.get("current_job_progress", 0.0), + current_job_eta=w.get("current_job_eta"), + ) + ) + + return result + + +@router.get("/stats", response_model=WorkerPoolStatsResponse) +async def get_pool_stats(): + """ + Get worker pool statistics. + + Returns: + Pool statistics object + """ + from backend.app import worker_pool + from datetime import datetime, timezone + from dateutil import parser + + stats = worker_pool.get_pool_stats() + pool_stats = stats.get('pool', {}) + jobs_stats = stats.get('jobs', {}) + + # Calculate uptime + uptime_seconds = 0.0 + if pool_stats.get('started_at'): + try: + started = parser.parse(pool_stats['started_at']) + # Remove timezone info for comparison if needed + if started.tzinfo is None: + from datetime import timezone + started = started.replace(tzinfo=timezone.utc) + uptime_seconds = (datetime.now(timezone.utc) - started).total_seconds() + except Exception as e: + logger.warning(f"Failed to parse pool started_at: {e}") + uptime_seconds = 0.0 + + return WorkerPoolStatsResponse( + total_workers=pool_stats.get('total_workers', 0), + cpu_workers=pool_stats.get('cpu_workers', 0), + gpu_workers=pool_stats.get('gpu_workers', 0), + idle_workers=pool_stats.get('idle_workers', 0), + busy_workers=pool_stats.get('busy_workers', 0), + stopped_workers=pool_stats.get('stopped_workers', 0), + error_workers=pool_stats.get('error_workers', 0), + total_jobs_completed=jobs_stats.get('completed', 0), + total_jobs_failed=jobs_stats.get('failed', 0), + uptime_seconds=uptime_seconds, + is_running=pool_stats.get('is_running', False) + ) + + +@router.get("/{worker_id}", response_model=WorkerStatusResponse) +async def get_worker(worker_id: str): + """ + Get status of a specific worker. + + Args: + worker_id: Worker ID + + Returns: + Worker status object + + Raises: + 404: Worker not found + """ + from backend.app import worker_pool + + status_dict = worker_pool.get_worker_status(worker_id) + + if not status_dict: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Worker {worker_id} not found" + ) + + return WorkerStatusResponse( + worker_id=status_dict["worker_id"], + worker_type=status_dict["type"], # Fixed: use "type" instead of "worker_type" + device_id=status_dict.get("device_id"), + status=status_dict["status"], + current_job_id=status_dict.get("current_job_id"), + jobs_completed=status_dict["jobs_completed"], + jobs_failed=status_dict["jobs_failed"], + uptime_seconds=status_dict.get("uptime_seconds", 0), + current_job_progress=status_dict.get("current_job_progress", 0.0), + current_job_eta=status_dict.get("current_job_eta"), + ) + + +@router.post("/", response_model=WorkerStatusResponse, status_code=status.HTTP_201_CREATED) +async def add_worker(request: WorkerAddRequest): + """ + Add a new worker to the pool. + + Args: + request: Worker add request + + Returns: + Created worker status + + Raises: + 400: Invalid worker type or configuration + """ + from backend.app import worker_pool + + # Validate worker type + try: + wtype = WorkerType(request.worker_type.lower()) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid worker type: {request.worker_type}. Must be 'cpu' or 'gpu'" + ) + + # Validate GPU worker requirements + if wtype == WorkerType.GPU and request.device_id is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="device_id is required for GPU workers" + ) + + # Add worker + worker_id = worker_pool.add_worker(wtype, request.device_id) + + # Get status + status_dict = worker_pool.get_worker_status(worker_id) + + if not status_dict: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to create worker" + ) + + logger.info(f"Worker {worker_id} added via API") + + return WorkerStatusResponse( + worker_id=status_dict["worker_id"], + worker_type=status_dict["type"], # Fixed: use "type" instead of "worker_type" + device_id=status_dict.get("device_id"), + status=status_dict["status"], + current_job_id=status_dict.get("current_job_id"), + jobs_completed=status_dict["jobs_completed"], + jobs_failed=status_dict["jobs_failed"], + uptime_seconds=status_dict.get("uptime_seconds", 0), + current_job_progress=status_dict.get("current_job_progress", 0.0), + current_job_eta=status_dict.get("current_job_eta"), + ) + + +@router.delete("/{worker_id}", response_model=MessageResponse) +async def remove_worker(worker_id: str, timeout: float = 30.0): + """ + Remove a worker from the pool. + + Args: + worker_id: Worker ID to remove + timeout: Maximum time to wait for worker to stop (seconds) + + Returns: + Success message + + Raises: + 404: Worker not found + """ + from backend.app import worker_pool + + success = worker_pool.remove_worker(worker_id, timeout=timeout) + + if not success: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Worker {worker_id} not found" + ) + + logger.info(f"Worker {worker_id} removed via API") + + return MessageResponse(message=f"Worker {worker_id} removed successfully") + + +@router.post("/pool/start", response_model=MessageResponse) +async def start_pool(cpu_workers: int = 0, gpu_workers: int = 0): + """ + Start the worker pool. + + Args: + cpu_workers: Number of CPU workers to start + gpu_workers: Number of GPU workers to start + + Returns: + Success message + """ + from backend.app import worker_pool + + worker_pool.start(cpu_workers=cpu_workers, gpu_workers=gpu_workers) + + logger.info(f"Worker pool started via API: {cpu_workers} CPU, {gpu_workers} GPU") + + return MessageResponse( + message=f"Worker pool started: {cpu_workers} CPU workers, {gpu_workers} GPU workers" + ) + + +@router.post("/pool/stop", response_model=MessageResponse) +async def stop_pool(timeout: float = 30.0): + """ + Stop the worker pool. + + Args: + timeout: Maximum time to wait for each worker to stop (seconds) + + Returns: + Success message + """ + from backend.app import worker_pool + + worker_pool.stop(timeout=timeout) + + logger.info("Worker pool stopped via API") + + return MessageResponse(message="Worker pool stopped successfully") +