Compare commits

...

3 Commits

Author SHA1 Message Date
4b451aa7a6 chore: update project structure and workflows
Some checks failed
Build_Subgen_Dockerfile_CPU / docker (push) Failing after 20s
Build_Subgen_Dockerfile_GPU / docker (push) Has been cancelled
- Update .gitignore for new backend structure
- Update GitHub workflows for transcriptarr rename
- Update launcher.py to use new module name
2026-01-11 21:24:11 +01:00
529af217e9 docs: add comprehensive documentation and test suite
- Add CLAUDE.md with project architecture and operation modes
- Add backend/README.md with setup and usage instructions
- Add test_backend.py with automated tests for config, database, and queue
- Update requirements.txt with optional dependencies structure
- Update .env.example with all configuration options
2026-01-11 21:23:58 +01:00
7959210724 feat: add centralized configuration system with Pydantic
- Add backend/config.py with Pydantic settings validation
- Support for standalone, provider, and hybrid operation modes
- Multi-database backend configuration (SQLite/PostgreSQL/MariaDB)
- Environment variable validation with helpful error messages
- Worker and Whisper model configuration
2026-01-11 21:23:45 +01:00
20 changed files with 2549 additions and 29 deletions

90
.env.example Normal file

@@ -0,0 +1,90 @@
# ============================================
# TranscriptorIO Configuration
# ============================================
# === Application Mode ===
# Options: standalone, provider, or standalone,provider (hybrid mode)
TRANSCRIPTARR_MODE=standalone
# === Database Configuration ===
# SQLite (default - no additional driver needed)
DATABASE_URL=sqlite:///./transcriptarr.db
# PostgreSQL example (requires psycopg2-binary)
# DATABASE_URL=postgresql://user:password@localhost:5432/transcriptarr
# MariaDB/MySQL example (requires pymysql)
# DATABASE_URL=mariadb+pymysql://user:password@localhost:3306/transcriptarr
# === Worker Configuration ===
CONCURRENT_TRANSCRIPTIONS=2
WHISPER_THREADS=4
TRANSCRIBE_DEVICE=cpu
CLEAR_VRAM_ON_COMPLETE=True
# === Whisper Model Configuration ===
# Options: tiny, base, small, medium, large-v3, large-v3-turbo, etc.
WHISPER_MODEL=medium
MODEL_PATH=./models
COMPUTE_TYPE=auto
# === Standalone Mode Configuration ===
# Pipe-separated paths to scan
LIBRARY_PATHS=/media/anime|/media/movies
AUTO_SCAN_ENABLED=False
SCAN_INTERVAL_MINUTES=30
# Filter rules for standalone mode
REQUIRED_AUDIO_LANGUAGE=ja
REQUIRED_MISSING_SUBTITLE=spa
SKIP_IF_SUBTITLE_EXISTS=True
# === Provider Mode Configuration ===
BAZARR_URL=http://bazarr:6767
BAZARR_API_KEY=your_api_key_here
PROVIDER_TIMEOUT_SECONDS=600
PROVIDER_CALLBACK_ENABLED=True
PROVIDER_POLLING_INTERVAL=30
# === API Configuration ===
WEBHOOK_PORT=9000
API_HOST=0.0.0.0
DEBUG=True
# === Transcription Settings ===
# Options: transcribe, translate
TRANSCRIBE_OR_TRANSLATE=transcribe
SUBTITLE_LANGUAGE_NAME=
# Options: ISO_639_1, ISO_639_2_T, ISO_639_2_B, NAME, NATIVE
SUBTITLE_LANGUAGE_NAMING_TYPE=ISO_639_2_B
WORD_LEVEL_HIGHLIGHT=False
CUSTOM_REGROUP=cm_sl=84_sl=42++++++1
# === Skip Configuration ===
SKIP_IF_EXTERNAL_SUBTITLES_EXIST=False
SKIP_IF_TARGET_SUBTITLES_EXIST=True
SKIP_IF_INTERNAL_SUBTITLES_LANGUAGE=eng
# Pipe-separated language codes
SKIP_SUBTITLE_LANGUAGES=
SKIP_IF_AUDIO_LANGUAGES=
SKIP_UNKNOWN_LANGUAGE=False
SKIP_ONLY_SUBGEN_SUBTITLES=False
# === Advanced Settings ===
FORCE_DETECTED_LANGUAGE_TO=
DETECT_LANGUAGE_LENGTH=30
DETECT_LANGUAGE_OFFSET=0
SHOULD_WHISPER_DETECT_AUDIO_LANGUAGE=False
# Pipe-separated list in order of preference
PREFERRED_AUDIO_LANGUAGES=eng
# === Path Mapping ===
USE_PATH_MAPPING=False
PATH_MAPPING_FROM=/tv
PATH_MAPPING_TO=/Volumes/TV
# === Legacy SubGen Compatibility ===
SHOW_IN_SUBNAME_SUBGEN=True
SHOW_IN_SUBNAME_MODEL=True
APPEND=False
LRC_FOR_AUDIO_FILES=True


@@ -17,7 +17,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Get version from subgen.py
- name: Get version from transcriptarr.py
id: get_version
run: |
version=$(grep -oP "subgen_version\s*=\s*'\K[^']+" subgen.py)


@@ -17,7 +17,7 @@ jobs:
with:
fetch-depth: 0
- name: Get version from subgen.py
- name: Get version from transcriptarr.py
id: get_version
run: |
version=$(grep -oP "subgen_version\s*=\s*'\K[^']+" subgen.py)


@@ -5,7 +5,7 @@ on:
branches:
- 'main'
paths:
- 'subgen.py'
- '../../transcriptarr.py'
workflow_dispatch: # Allow manual triggering
jobs:
@@ -26,11 +26,11 @@ jobs:
echo "COMMIT_COUNT=$COMMIT_COUNT"
echo "VERSION=${YEAR}.${MONTH}.${COMMIT_COUNT}" >> $GITHUB_ENV
- name: Update subgen.py with version
- name: Update transcriptarr.py with version
run: |
sed -i "s/subgen_version =.*/subgen_version = '${{ env.VERSION }}'/" subgen.py
- name: Check if subgen.py was actually changed (compare with HEAD)
- name: Check if transcriptarr.py was actually changed (compare with HEAD)
id: check_change
run: |
if git diff --quiet HEAD subgen.py; then
@@ -39,7 +39,7 @@ jobs:
echo "::set-output name=changed::true"
fi
- name: Amend commit if subgen.py changed
- name: Amend commit if transcriptarr.py changed
if: steps.check_change.outputs.changed == 'true'
env:
GIT_AUTHOR_NAME: "McCloudS"

2
.gitignore vendored

@@ -7,6 +7,6 @@
*.vsix
#ignore our settings
subgen.env
.env
models/

747
CLAUDE.md Normal file

@@ -0,0 +1,747 @@
# CLAUDE.md - TranscriptorIO
## What is TranscriptorIO?
TranscriptorIO is a complete system for automatically generating subtitles for media content using AI (Whisper + translation models). It is a **hard fork** of [SubGen](https://github.com/McCloudS/subgen) with a fully redesigned, Tdarr-inspired architecture.
## Motivation
SubGen works, but it has fundamental design limitations:
### SubGen's problems
- **Synchronous processing**: blocks threads while transcribing
- **No persistent queue**: jobs are lost on restart
- **No WebUI**: removed in March 2024; only Swagger docs remain
- **No visibility**: no progress, ETA, or job status
- **No prioritization**: jobs cannot be reordered
- **Bazarr timeouts**: if an episode takes >5 min, Bazarr throttles the provider for 24 hours
- **Complex configuration**: 40+ ENV variables with no validation
### TranscriptorIO's vision
A system like **Tdarr, but for subtitles**, with:
- ✅ Persistent asynchronous queue (SQLite)
- ✅ Configurable workers (multiple GPUs/CPUs)
- ✅ Modern WebUI with real-time progress
- ✅ Multiple quality pipelines (Fast/Balanced/Best)
- ✅ Asynchronous Bazarr integration
- ✅ Batch processing (entire seasons)
- ✅ Full REST API
- ✅ WebSocket for live updates
## Use cases
### Main use case: Japanese anime → Spanish subtitles
**Problem**: anime with no Spanish fansubs, only a Japanese audio track.
**Pipeline**:
```
Japanese audio
Whisper (task="translate") → English text
Helsinki-NLP (en→es) → Spanish text
Generate .srt with timestamps
```
**Configurable alternatives**:
- **Fast** (4GB VRAM): ja→en→es with Helsinki-NLP
- **Balanced** (6GB VRAM): ja→ja→es with M2M100
- **Best** (10GB+ VRAM): ja→es direct with SeamlessM4T
### Integration with an existing stack
```
Sonarr downloads an episode
Bazarr detects: Spanish subtitles missing
Bazarr → TranscriptorIO (asynchronous provider)
TranscriptorIO queues a job
A worker processes it when free
Callback to Bazarr with the generated .srt
Jellyfin picks up the new subtitle
```
## Operation Modes
TranscriptorIO supports two distinct operation modes, configured via environment variables:
### Standalone Mode (Tdarr-like)
**Description**: TranscriptorIO automatically scans your media library and generates subtitles according to configurable rules.
**Use cases**:
- Batch processing of an existing library
- Automatic monitoring of new files
- Full control over what gets transcribed, without depending on Bazarr
**How it works** (a minimal scan-filter sketch follows the diagram):
```
1. Periodic scan with ffprobe
└─> Detects files that match the criteria
(e.g. Japanese audio + no Spanish subs)
2. Automatic queuing
└─> Adds to the queue with the configured priority
3. Batch processing
└─> Workers process jobs as capacity allows
4. Direct write
└─> Saves the .srt next to the source file
```
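As a rough illustration of step 1, a scan filter can ask ffprobe for the stream list and apply the audio/subtitle rules from the configuration below. This is only a sketch that assumes ffprobe is on PATH; the helper names (`probe_streams`, `needs_subtitles`) are illustrative and the actual scanner module is not part of this commit.
```python
# Sketch of a standalone-mode scan filter; assumes ffprobe is on PATH.
# probe_streams/needs_subtitles are illustrative names, not the real scanner API.
import json
import subprocess
from pathlib import Path
from typing import List


def probe_streams(path: str) -> List[dict]:
    """Return the stream list reported by ffprobe as parsed JSON."""
    out = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", path],
        capture_output=True, text=True, check=True,
    )
    return json.loads(out.stdout).get("streams", [])


def needs_subtitles(path: str, audio_lang: str = "jpn", missing_sub: str = "spa") -> bool:
    """True if the file has the required audio track and lacks the target subtitle."""
    streams = probe_streams(path)
    # Real code would normalize ISO 639-1/639-2 tags ("ja" vs "jpn"); this compares raw tags.
    has_audio = any(
        s.get("codec_type") == "audio" and s.get("tags", {}).get("language") == audio_lang
        for s in streams
    )
    has_internal_sub = any(
        s.get("codec_type") == "subtitle" and s.get("tags", {}).get("language") == missing_sub
        for s in streams
    )
    # External sidecar check, e.g. Episode.spa.srt next to the video file
    sidecar = Path(path).with_name(Path(path).stem + f".{missing_sub}.srt")
    return has_audio and not (has_internal_sub or sidecar.exists())
```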
**Configuration**:
```env
# Enable standalone mode
TRANSCRIPTARR_MODE=standalone
# Folders to scan (pipe-separated)
LIBRARY_PATHS=/media/anime|/media/movies
# Filter rules
REQUIRED_AUDIO_LANGUAGE=ja
REQUIRED_MISSING_SUBTITLE=spa
SKIP_IF_SUBTITLE_EXISTS=true
# Automatic scanning
AUTO_SCAN_ENABLED=true
SCAN_INTERVAL_MINUTES=30
```
**Advantages**:
- ✅ No dependency on external integrations
- ✅ Efficient batch processing
- ✅ Automatic monitoring of new media
- ✅ Granular control via filter rules
### Provider Mode (Bazarr slave)
**Description**: TranscriptorIO acts as a subtitle provider for Bazarr through an improved asynchronous API.
**Use cases**:
- Integration with an existing *arr stack
- Centralized subtitle management in Bazarr
- Fallback when no pre-made subtitles exist
**How it works**:
```
1. Bazarr requests a subtitle (API call)
└─> POST /api/provider/request
2. TranscriptorIO queues the job
└─> Returns a job_id immediately
└─> Does not block Bazarr's thread
3. Asynchronous processing
└─> A worker transcribes when capacity is available
4. Callback to Bazarr
└─> POST {bazarr_callback_url} with the .srt
└─> Or Bazarr polls every 30s
```
**Configuration**:
```env
# Enable provider mode
TRANSCRIPTARR_MODE=provider
# Bazarr API for callbacks
BAZARR_URL=http://bazarr:6767
BAZARR_API_KEY=your_api_key_here
# Provider configuration
PROVIDER_TIMEOUT_SECONDS=600
PROVIDER_CALLBACK_ENABLED=true
PROVIDER_POLLING_INTERVAL=30
```
**Advantages over original SubGen**:
- **Non-blocking**: returns immediately with a job_id
- **No timeouts**: Bazarr does not throttle because of slow jobs
- **Visibility**: Bazarr can query progress
- **Retries**: automatic error handling
- **Prioritization**: manual jobs get higher priority
### Hybrid Mode (Recommended)
Both modes can be enabled simultaneously:
```env
TRANSCRIPTARR_MODE=standalone,provider
```
**Benefits**:
- Bazarr handles new media automatically
- Standalone processes the existing library
- Unified queue with intelligent prioritization
- Better resource utilization
## Technical architecture
### Technology stack
**Backend**:
- FastAPI (REST API + WebSocket)
- SQLAlchemy (multi-backend ORM)
- SQLite / PostgreSQL / MariaDB (persistent queue)
- faster-whisper (optimized transcription)
- Helsinki-NLP/opus-mt-en-es (lightweight translation)
- stable-ts (timestamp refinement)
**Frontend**:
- Vue 3 + Vite
- Tailwind CSS
- Chart.js (statistics)
- Socket.io-client (real-time updates)
**Infrastructure**:
- Docker + Docker Compose
- NVIDIA GPU support (optional, CPU also works)
- Multi-container: backend + workers + frontend
### Main components
```
transcriptorio/
├── backend/
│ ├── core/
│ │ ├── pipelines/
│ │ │ ├── whisper_fast.py # ja→en→es (Helsinki)
│ │ │ ├── whisper_balanced.py # ja→ja→es (M2M100)
│ │ │ └── seamless.py # ja→es direct
│ │ ├── queue_manager.py # SQLite queue
│ │ ├── worker_pool.py # Worker management
│ │ └── transcriber.py # Whisper core
│ ├── api/
│ │ ├── legacy.py # /asr (SubGen/Bazarr compat)
│ │ ├── queue.py # /api/queue/*
│ │ ├── jobs.py # /api/jobs/*
│ │ └── websocket.py # /ws (real-time)
│ └── main.py
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ │ ├── Dashboard.vue # Stats + current job
│ │ │ ├── QueueManager.vue # Job list
│ │ │ ├── JobDetails.vue # Details + logs
│ │ │ └── Settings.vue # Configuration
│ │ ├── App.vue
│ │ └── main.js
│ └── package.json
├── bazarr-integration/
│ └── transcriptorio_provider.py # Custom async provider
└── docker-compose.yml
```
### Database (SQLite)
```sql
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
file_path TEXT NOT NULL,
file_name TEXT NOT NULL,
status TEXT DEFAULT 'queued', -- queued, processing, completed, failed
priority INTEGER DEFAULT 0,
-- Config
source_lang TEXT,
target_lang TEXT,
quality_preset TEXT DEFAULT 'fast',
-- Progress
progress REAL DEFAULT 0,
current_stage TEXT, -- transcribing, translating, generating
eta_seconds INTEGER,
-- Timestamps
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
-- Results
output_path TEXT,
srt_content TEXT,
segments_count INTEGER,
-- Error handling
error TEXT,
retry_count INTEGER DEFAULT 0,
-- Metadata
worker_id TEXT,
vram_used_mb INTEGER,
processing_time_seconds REAL
);
CREATE INDEX idx_status ON jobs(status);
CREATE INDEX idx_priority ON jobs(priority DESC, created_at ASC);
CREATE INDEX idx_created ON jobs(created_at DESC);
```
### API Endpoints
#### Legacy (SubGen/Bazarr compatibility)
```http
POST /asr?task=translate&language=ja&output=srt
Content-Type: multipart/form-data
→ Synchronous response with the .srt
```
#### Modern (TranscriptorIO)
```http
# Add a job to the queue
POST /api/queue/add
{
"files": ["/media/anime/episode.mkv"],
"source_lang": "ja",
"target_lang": "es",
"quality_preset": "fast",
"priority": 0
}
→ { "job_ids": ["uuid-1234"], "queued": 1 }
# Queue status
GET /api/queue/status
→ {
"pending": 3,
"processing": 1,
"completed_today": 12,
"failed_today": 0,
"vram_available": "1.5GB/4GB"
}
# Job details
GET /api/jobs/{job_id}
→ {
"id": "uuid-1234",
"status": "processing",
"progress": 45.2,
"current_stage": "translating",
"eta_seconds": 120,
"file_name": "anime_ep01.mkv"
}
# History
GET /api/jobs/history?limit=50
→ [ { job }, { job }, ... ]
# WebSocket updates
WS /ws
→ Continuous stream of updates
```
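As a rough client-side sketch of the modern endpoints above (host, port, and field names are taken from this document; the API module itself is still pending, so treat this as illustrative):
```python
# Sketch: queue a file and poll its status via the endpoints documented above.
import time

import requests

BASE = "http://localhost:9000"  # WEBHOOK_PORT default from the configuration

resp = requests.post(f"{BASE}/api/queue/add", json={
    "files": ["/media/anime/episode.mkv"],
    "source_lang": "ja",
    "target_lang": "es",
    "quality_preset": "fast",
    "priority": 0,
})
job_id = resp.json()["job_ids"][0]

# Poll until the job reaches a terminal state
while True:
    job = requests.get(f"{BASE}/api/jobs/{job_id}").json()
    print(f"{job['status']} {job.get('progress', 0):.1f}% stage={job.get('current_stage')}")
    if job["status"] in ("completed", "failed", "cancelled"):
        break
    time.sleep(10)
```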
### WebUI
#### Dashboard
```
┌─────────────────────────────────────────────────────────┐
│ TranscriptorIO 🟢 │
├─────────────────────────────────────────────────────────┤
│ │
│ 📊 Stats │
│ ┌─────────┬──────────┬──────────┬─────────────────┐ │
│ │ Queue: 3│Processing│Completed │ VRAM: 2.8/4.0GB │ │
│ │ │ 1 │ Today │ │ │
│ │ │ │ 12 │ │ │
│ └─────────┴──────────┴──────────┴─────────────────┘ │
│ │
│ 🎬 Current Job │
│ ┌─────────────────────────────────────────────────┐ │
│ │ File: Anime_S01E05.mkv │ │
│ │ Stage: Translating segments │ │
│ │ Progress: ████████████░░░░░░ 65% │ │
│ │ ETA: 2m 15s │ │
│ │ Model: whisper-medium + helsinki-nlp │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ 📋 Queue (3 pending) │
│ ┌──┬─────────────────────┬────────┬──────────────┐ │
│ │#1│Anime_S01E06.mkv │ Fast │ Priority: 0 │ │
│ │#2│Movie_2024.mkv │ Best │ Priority: 0 │ │
│ │#3│Show_S02E01.mkv │ Fast │ Priority: -1 │ │
│ └──┴─────────────────────┴────────┴──────────────┘ │
│ │
│ [+ Add Files] [⚙️ Settings] [📊 Stats] [📖 Logs] │
└─────────────────────────────────────────────────────────┘
```
#### Settings
```
┌─────────────────────────────────────────────────────────┐
│ Settings │
├─────────────────────────────────────────────────────────┤
│ │
│ 🎯 Default Quality Preset │
│ ○ Fast (4GB VRAM, ~3min/episode) │
│ Whisper medium + Helsinki-NLP │
│ Best for: GTX 1650, RTX 3050 │
│ │
│ ● Balanced (6GB VRAM, ~5min/episode) │
│ Whisper medium + M2M100 │
│ Best for: RTX 3060, RTX 4060 │
│ │
│ ○ Best (10GB+ VRAM, ~10min/episode) │
│ SeamlessM4T direct translation │
│ Best for: RTX 4070+, professional GPUs │
│ │
│ ⚡ Workers Configuration │
│ GPU Workers: [2] ▾ │
│ CPU Workers: [1] ▾ │
│ Concurrent jobs per worker: [1] ▾ │
│ │
│ 🌐 Default Languages │
│ Source: [Japanese ▾] Target: [Spanish ▾] │
│ │
│ 📁 Paths │
│ Watch folders: /media/anime │
│ /media/movies │
│ Output format: {filename}.{lang}.srt │
│ │
│ 🔔 Notifications │
│ ☑ Discord webhook on completion │
│ ☑ Email on failure │
│ │
│ [Save Changes] [Reset Defaults] │
└─────────────────────────────────────────────────────────┘
```
## Transcription pipeline in detail
### Fast preset flow (ja→en→es)
```python
# 1. Audio extraction (if the input is video)
ffprobe detects the audio tracks
Select the Japanese track
Extract with ffmpeg (optional, Whisper accepts video directly)
# 2. Whisper transcription
WhisperModel("medium", compute_type="int8")
transcribe(audio, language="ja", task="translate")
Output: segments with timestamps in ENGLISH
Example:
[0.00s -> 3.50s] "Hello, welcome to today's episode"
[3.50s -> 7.80s] "We're going to see something interesting"
# 3. en→es translation (batched)
Helsinki-NLP/opus-mt-en-es
Batches of 32 segments at a time
Keeps the original timestamps
Example:
[0.00s -> 3.50s] "Hola, bienvenido al episodio de hoy"
[3.50s -> 7.80s] "Vamos a ver algo interesante"
# 4. SRT generation
Timestamp + text format
Saves file.es.srt
# 5. Post-processing (optional)
- Aeneas re-sync (fine timestamp adjustment)
- Subtitle styling (ASS format)
- Quality check (error detection)
```
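A minimal runnable sketch of steps 2-4, assuming `faster-whisper` and `transformers` are installed. The function names are illustrative; the real `whisper_fast.py` pipeline is not included in this commit:
```python
# Sketch: Fast preset (ja→en→es) with faster-whisper + Helsinki-NLP; not the actual whisper_fast.py.
from faster_whisper import WhisperModel
from transformers import pipeline


def srt_timestamp(seconds: float) -> str:
    """Format seconds as an SRT timestamp (HH:MM:SS,mmm)."""
    ms = int(round(seconds * 1000))
    h, rem = divmod(ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1000)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"


def fast_pipeline(media_path: str, srt_path: str, batch_size: int = 32) -> None:
    # 2. Whisper: Japanese audio → English segments with timestamps
    model = WhisperModel("medium", device="cpu", compute_type="int8")
    segments, _info = model.transcribe(media_path, language="ja", task="translate")
    segments = list(segments)

    # 3. Batched en→es translation, keeping the original timestamps
    translator = pipeline("translation", model="Helsinki-NLP/opus-mt-en-es")
    texts = [seg.text.strip() for seg in segments]
    translated = []
    for i in range(0, len(texts), batch_size):
        translated += [r["translation_text"] for r in translator(texts[i:i + batch_size])]

    # 4. Write the .srt next to the source file
    with open(srt_path, "w", encoding="utf-8") as f:
        for idx, (seg, text) in enumerate(zip(segments, translated), start=1):
            f.write(f"{idx}\n{srt_timestamp(seg.start)} --> {srt_timestamp(seg.end)}\n{text}\n\n")


# fast_pipeline("/media/anime/episode.mkv", "/media/anime/episode.es.srt")
```
Batching the Helsinki-NLP calls in step 3 keeps the translation pass cheap relative to transcription; the 32-segment batch size matches the figure used elsewhere in this document.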
### Expected VRAM usage
**GTX 1650 (4GB VRAM)**:
```
Fast preset:
- Whisper medium INT8: ~2.5GB
- Helsinki-NLP: ~1GB
- System overhead: ~0.5GB
Total: ~4GB ✅ Fits exactly
Time: ~3-5 min per 24-min episode
```
**RTX 3060 (12GB VRAM)**:
```
Balanced preset:
- Whisper large-v3 INT8: ~5GB
- M2M100: ~2GB
- Overhead: ~1GB
Total: ~8GB ✅ Plenty of headroom
Time: ~4-7 min per 24-min episode
```
## Bazarr Integration
### Custom Provider (asynchronous)
```python
# bazarr/libs/subliminal_patch/providers/transcriptorio.py
class TranscriptorIOProvider(Provider):
"""
Asynchronous provider for TranscriptorIO
Unlike the original Whisper provider, this does NOT block
"""
provider_name = 'transcriptorio'
def download_subtitle(self, subtitle):
# Automatic search → async (non-blocking)
if not subtitle.manual_search:
job_id = self._queue_job(subtitle)
raise SubtitlePending(
job_id=job_id,
eta=self._estimate_time(subtitle)
)
# Manual search → sync with long polling
return self._process_sync(subtitle, timeout=600)
def _queue_job(self, subtitle):
"""Encola trabajo sin esperar"""
response = requests.post(
f"{self.endpoint}/api/queue/add",
json={
"file": subtitle.video.name,
"source_lang": "ja",
"target_lang": "es",
"quality_preset": self.quality_preset,
"callback_url": self._get_callback_url(subtitle.id)
},
headers={"X-API-Key": self.api_key}
)
return response.json()["job_ids"][0]
# Background task in Bazarr (every 30s)
@scheduler.scheduled_job('interval', seconds=30)
def poll_transcriptorio_jobs():
"""Revisar trabajos completados"""
pending = db.get_pending_transcriptorio_jobs()
for job in pending:
status = get_job_status(job.provider_job_id)
if status['status'] == 'completed':
save_subtitle(job.subtitle_id, status['srt_content'])
db.mark_completed(job.id)
```
### Advantages over the original Whisper provider
| Feature | Whisper (original) | TranscriptorIO |
|---------|-------------------|----------------|
| Blocks Bazarr thread | ✅ Yes (3-10 min) | ❌ No (async) |
| 24h throttle on slow jobs | ✅ Yes | ❌ No |
| Visible queue | ❌ No | ✅ Yes (WebUI) |
| Automatic retry | ❌ No | ✅ Yes |
| Prioritization | ❌ No | ✅ Yes |
| Multiple GPUs | ❌ No | ✅ Yes |
| WebUI | ❌ No | ✅ Yes |
## Development roadmap
### Phase 1: MVP Backend (2-3 weeks)
**Goals**:
- [ ] Queue manager with SQLite
- [ ] Basic worker pool
- [ ] Fast pipeline (Whisper + Helsinki-NLP)
- [ ] Full REST API
- [ ] Compatible legacy `/asr` endpoint
**Deliverables**:
- Functional headless backend
- Docker Compose for testing
- API documentation
### Phase 2: WebUI (2-3 weeks)
**Goals**:
- [ ] Dashboard with stats
- [ ] Queue viewer with drag & drop
- [ ] Job details with logs
- [ ] Settings page
- [ ] WebSocket integration
**Deliverables**:
- Complete, functional WebUI
- Mobile responsive
- Dark/light theme
### Phase 3: Bazarr Integration (1-2 weeks)
**Goals**:
- [ ] Custom asynchronous provider
- [ ] Background polling task
- [ ] Callback webhook support
- [ ] Testing against a real Bazarr instance
**Deliverables**:
- Provider plugin for Bazarr
- Integration documentation
- PR to the Bazarr repo (if accepted)
### Phase 4: Advanced Features (3-4 weeks)
**Goals**:
- [ ] Balanced pipeline (M2M100)
- [ ] Best pipeline (SeamlessM4T)
- [ ] Batch operations (full seasons)
- [ ] Automatic scanner (inotify)
- [ ] Post-processing (Aeneas sync)
- [ ] Notifications (Discord, email)
**Deliverables**:
- Complete production-ready system
- Complete docs
- Automated tests
### Phase 5: Release & Community (ongoing)
**Goals**:
- [ ] Docker Hub releases
- [ ] GitHub Actions CI/CD
- [ ] Complete documentation
- [ ] Video tutorials
- [ ] Announcements in communities
**Channels**:
- /r/selfhosted
- /r/homelab
- Bazarr Discord
- LinuxServer.io
## Success metrics
**Technical**:
- ✅ Processes a 24-min episode in <5 min (GTX 1650)
- Total VRAM usage <4GB
- Queue persists across restarts
- API response time <100ms
- WebUI load time <2s
**UX**:
- Setup in <15 min for the average user
- Zero-config with reasonable defaults
- Intuitive WebUI (no docs needed)
**Community**:
- 🎯 100 stars in the first month
- 🎯 500 stars in 6 months
- 🎯 10+ contributors
- 🎯 Featured on LinuxServer.io
## Key differentiators
### vs SubGen
- Modern WebUI vs no UI
- Asynchronous queue vs a simple queue
- Multiple presets vs manual config
- Worker pool vs a single process
### vs Tdarr
- Subtitle-specific vs 🔧 general transcoding
- Native Bazarr integration vs webhooks only
- Multilingual translation vs no translation
### vs Whisper-ASR-Webservice
- Persistent queue vs stateless
- WebUI vs API only
- Multiple pipelines vs Whisper only
## Technical considerations
### Known limitations
**Whisper**:
- Only translates into English (model limitation)
- Needs clean audio (background music degrades quality)
- Proper names translate poorly
- Japanese honorifics are lost
**Translation**:
- Helsinki-NLP is sometimes too literal
- Idiomatic expressions are lost
- No context shared between segments
**Hardware**:
- Minimum GPU: GTX 1050 Ti (4GB VRAM)
- Recommended: RTX 3060 (12GB VRAM)
- CPU works but is ~10x slower
### Mitigations
**Improving quality**:
- Use the Balanced/Best presets if VRAM allows
- Post-processing with Aeneas for better sync
- Manual review of proper names
- Context prompting in Whisper
**Optimizing speed**:
- Batch translation (32 segments)
- Keep models cached in VRAM
- Parallel pipeline (transcribe and translate simultaneously)
## Development stack
### Backend
```
Python 3.11+
FastAPI 0.100+
SQLite 3.40+
faster-whisper 1.0+
transformers 4.35+
torch 2.1+ (CUDA 12.x)
```
### Frontend
```
Node 20+
Vue 3.4+
Vite 5+
Tailwind CSS 3.4+
Socket.io-client 4.7+
Chart.js 4.4+
```
### DevOps
```
Docker 24+
Docker Compose 2.20+
GitHub Actions
Docker Hub
```
## License
**Apache 2.0** (same as SubGen)
Permits:
- Commercial use
- Modification
- Distribution
- Private use
Requires:
- Including the license and copyright notice
- Documenting changes
## Contact
- **GitHub**: `github.com/[tu-usuario]/transcriptorio`
- **Discord**: [create server]
- **Email**: [to be set up]
## References
- Original SubGen: https://github.com/McCloudS/subgen
- Bazarr: https://github.com/morpheus65535/bazarr
- Whisper: https://github.com/openai/whisper
- faster-whisper: https://github.com/guillaumekln/faster-whisper
- stable-ts: https://github.com/jianfch/stable-ts
- Tdarr: https://github.com/HaveAGitGat/Tdarr
---
**Last updated**: 2026-01-11
**Version**: 0.1.0-planning
**Status**: In design

185
backend/README.md Normal file

@@ -0,0 +1,185 @@
# TranscriptorIO Backend
This is the redesigned backend for TranscriptorIO, a complete fork of SubGen with a modern, asynchronous architecture.
## 🎯 Goal
Replace SubGen's synchronous, non-persistent system with a modern, Tdarr-inspired architecture:
- ✅ Persistent queue (SQLite/PostgreSQL/MariaDB)
- ✅ Asynchronous processing
- ✅ Job prioritization
- ✅ Complete state visibility
- ✅ No Bazarr timeouts
## 📁 Structure
```
backend/
├── core/
│ ├── database.py # Multi-backend database management
│ ├── models.py # SQLAlchemy models (Job, etc.)
│ ├── queue_manager.py # Asynchronous persistent queue
│ └── __init__.py
├── api/ # (coming soon) FastAPI endpoints
├── config.py # Centralized configuration with Pydantic
└── README.md # This file
```
## 🚀 Setup
### 1. Install dependencies
```bash
pip install -r requirements.txt
```
### 2. Configure .env
Copy `.env.example` to `.env` and adjust as needed:
```bash
cp .env.example .env
```
#### Database Options
**SQLite (default)**:
```env
DATABASE_URL=sqlite:///./transcriptarr.db
```
**PostgreSQL**:
```bash
pip install psycopg2-binary
```
```env
DATABASE_URL=postgresql://user:password@localhost:5432/transcriptarr
```
**MariaDB/MySQL**:
```bash
pip install pymysql
```
```env
DATABASE_URL=mariadb+pymysql://user:password@localhost:3306/transcriptarr
```
### 3. Choose operation mode
**Standalone Mode** (automatically scans your library):
```env
TRANSCRIPTARR_MODE=standalone
LIBRARY_PATHS=/media/anime|/media/movies
AUTO_SCAN_ENABLED=True
SCAN_INTERVAL_MINUTES=30
```
**Provider Mode** (receives jobs from Bazarr):
```env
TRANSCRIPTARR_MODE=provider
BAZARR_URL=http://bazarr:6767
BAZARR_API_KEY=your_api_key
```
**Hybrid Mode** (both simultaneously):
```env
TRANSCRIPTARR_MODE=standalone,provider
```
## 🧪 Testing
Run the test script to verify everything works:
```bash
python test_backend.py
```
This will verify:
- ✓ Configuration loading
- ✓ Database connection
- ✓ Table creation
- ✓ Queue operations (add, get, deduplicate)
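The checks listed above map onto the modules added in this PR. A minimal sketch of what such checks can look like (illustrative only; the actual `test_backend.py` is part of this changeset but not shown here):
```python
# Sketch of the kinds of checks test_backend.py performs; not the actual test file.
from backend.config import settings
from backend.core.database import database
from backend.core.models import QualityPreset
from backend.core.queue_manager import queue_manager

# 1. Configuration loading
assert settings.is_standalone_mode or settings.is_provider_mode

# 2. Database connection (tables are created automatically on first use)
assert database.health_check()

# 3. Queue operations: add, fetch, deduplicate
job = queue_manager.add_job(
    file_path="/media/anime/episode.mkv",
    file_name="episode.mkv",
    source_lang="ja",
    target_lang="es",
    quality_preset=QualityPreset.FAST,
)
assert job is not None
duplicate = queue_manager.add_job(
    file_path="/media/anime/episode.mkv",
    file_name="episode.mkv",
    target_lang="es",
)
assert duplicate is None  # queued duplicates are rejected

print("Backend checks passed:", queue_manager.get_queue_stats())
```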
## 📊 Implemented Components
### config.py
- Centralized configuration with Pydantic
- Automatic environment variable validation
- Multi-backend database support
- Operation mode configuration
### database.py
- Connection management with SQLAlchemy
- Support for SQLite, PostgreSQL, MariaDB
- Backend-specific optimizations
- SQLite: WAL mode, optimized cache
- PostgreSQL: connection pooling, pre-ping
- MariaDB: utf8mb4 charset, pooling
- Health checks and statistics
### models.py
- Complete `Job` model with:
- States: queued, processing, completed, failed, cancelled
- Stages: pending, detecting_language, transcribing, translating, etc.
- Quality presets: fast, balanced, best
- Progress tracking (0-100%)
- Complete timestamps
- Retry logic
- Worker assignment
- Optimized indexes for common queries
### queue_manager.py
- Thread-safe persistent queue
- Job prioritization
- Duplicate detection
- Automatic retry for failed jobs
- Real-time statistics
- Automatic cleanup of old jobs
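A quick usage sketch of this queue API, based on the signatures added in this commit (the worker side shown here is only indicative, since the worker pool is still listed under Next Steps):
```python
# Sketch: driving the persistent queue directly, using the API added in this commit.
from backend.core.models import JobStage, QualityPreset
from backend.core.queue_manager import queue_manager

# Producer side: enqueue a file (returns None if an identical job is already queued)
job = queue_manager.add_job(
    file_path="/media/anime/episode.mkv",
    file_name="episode.mkv",
    source_lang="ja",
    target_lang="es",
    quality_preset=QualityPreset.FAST,
    priority=0,
)

# Consumer side: claim the next job, report progress, then record the result
claimed = queue_manager.get_next_job(worker_id="worker-1")
if claimed:
    queue_manager.update_job_progress(claimed.id, 50.0, JobStage.TRANSCRIBING, eta_seconds=90)
    queue_manager.mark_job_completed(
        claimed.id,
        output_path="/media/anime/episode.es.srt",
        segments_count=250,
    )

print(queue_manager.get_queue_stats())
```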
## 🔄 Comparison with SubGen
| Feature | SubGen | TranscriptorIO |
|---------|--------|----------------|
| Queue | In-memory (lost on restart) | **Persistent in DB** |
| Processing | Synchronous (blocks threads) | **Asynchronous** |
| Prioritization | No | **Yes (configurable)** |
| Visibility | No progress/ETA | **Progress + real-time ETA** |
| Deduplication | Basic (memory only) | **Persistent + intelligent** |
| Retries | No | **Automatic with limit** |
| Database | No | **SQLite/PostgreSQL/MariaDB** |
| Bazarr Timeouts | Yes (>5min = 24h throttle) | **No (async)** |
## 📝 Next Steps
1. **Worker Pool** - Asynchronous worker system
2. **REST API** - FastAPI endpoints for management
3. **WebSocket** - Real-time updates
4. **Transcriber** - Whisper wrapper with progress callbacks
5. **Bazarr Provider** - Improved async provider
6. **Standalone Scanner** - Automatic library scanning
## 🐛 Troubleshooting
### Error: "No module named 'backend'"
Make sure to run scripts from the project root:
```bash
cd /home/dasemu/Hacking/Transcriptarr
python test_backend.py
```
### Error: Database locked (SQLite)
SQLite is configured with WAL mode for better concurrency. If you still have issues, consider using PostgreSQL for production.
### Error: pydantic.errors.ConfigError
Verify that all required variables are in your `.env`:
```bash
cp .env.example .env
# Edit .env with your values
```
## 📚 Documentation
See `CLAUDE.md` for complete architecture and project roadmap.

1
backend/__init__.py Normal file

@@ -0,0 +1 @@
"""TranscriptorIO Backend Package."""

1
backend/api/__init__.py Normal file

@@ -0,0 +1 @@
"""TranscriptorIO API Module."""

214
backend/config.py Normal file

@@ -0,0 +1,214 @@
"""Configuration management for TranscriptorIO."""
import os
from enum import Enum
from typing import Optional, List
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator
class OperationMode(str, Enum):
"""Operation modes for TranscriptorIO."""
STANDALONE = "standalone"
PROVIDER = "provider"
HYBRID = "standalone,provider"
class DatabaseType(str, Enum):
"""Supported database backends."""
SQLITE = "sqlite"
POSTGRESQL = "postgresql"
MARIADB = "mariadb"
MYSQL = "mysql"
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# === Application Mode ===
transcriptarr_mode: str = Field(
default="standalone",
description="Operation mode: standalone, provider, or standalone,provider"
)
# === Database Configuration ===
database_url: str = Field(
default="sqlite:///./transcriptarr.db",
description="Database connection URL. Examples:\n"
" SQLite: sqlite:///./transcriptarr.db\n"
" PostgreSQL: postgresql://user:pass@localhost/transcriptarr\n"
" MariaDB: mariadb+pymysql://user:pass@localhost/transcriptarr"
)
# === Worker Configuration ===
concurrent_transcriptions: int = Field(default=2, ge=1, le=10)
whisper_threads: int = Field(default=4, ge=1, le=32)
transcribe_device: str = Field(default="cpu", pattern="^(cpu|gpu|cuda)$")
clear_vram_on_complete: bool = Field(default=True)
# === Whisper Model Configuration ===
whisper_model: str = Field(
default="medium",
description="Whisper model: tiny, base, small, medium, large-v3, etc."
)
model_path: str = Field(default="./models")
compute_type: str = Field(default="auto")
# === Standalone Mode Configuration ===
library_paths: Optional[str] = Field(
default=None,
description="Pipe-separated paths to scan: /media/anime|/media/movies"
)
auto_scan_enabled: bool = Field(default=False)
scan_interval_minutes: int = Field(default=30, ge=1)
required_audio_language: Optional[str] = Field(
default=None,
description="Only process files with this audio language (ISO 639-2)"
)
required_missing_subtitle: Optional[str] = Field(
default=None,
description="Only process if this subtitle language is missing (ISO 639-2)"
)
skip_if_subtitle_exists: bool = Field(default=True)
# === Provider Mode Configuration ===
bazarr_url: Optional[str] = Field(default=None)
bazarr_api_key: Optional[str] = Field(default=None)
provider_timeout_seconds: int = Field(default=600, ge=60)
provider_callback_enabled: bool = Field(default=True)
provider_polling_interval: int = Field(default=30, ge=10)
# === API Configuration ===
webhook_port: int = Field(default=9000, ge=1024, le=65535)
api_host: str = Field(default="0.0.0.0")
debug: bool = Field(default=True)
# === Transcription Settings ===
transcribe_or_translate: str = Field(
default="transcribe",
pattern="^(transcribe|translate)$"
)
subtitle_language_name: str = Field(default="")
subtitle_language_naming_type: str = Field(
default="ISO_639_2_B",
description="Naming format: ISO_639_1, ISO_639_2_T, ISO_639_2_B, NAME, NATIVE"
)
word_level_highlight: bool = Field(default=False)
custom_regroup: str = Field(default="cm_sl=84_sl=42++++++1")
# === Skip Configuration ===
skip_if_external_subtitles_exist: bool = Field(default=False)
skip_if_target_subtitles_exist: bool = Field(default=True)
skip_if_internal_subtitles_language: Optional[str] = Field(default="eng")
skip_subtitle_languages: Optional[str] = Field(
default=None,
description="Pipe-separated language codes to skip: eng|spa"
)
skip_if_audio_languages: Optional[str] = Field(
default=None,
description="Skip if audio track is in these languages: eng|spa"
)
skip_unknown_language: bool = Field(default=False)
skip_only_subgen_subtitles: bool = Field(default=False)
# === Advanced Settings ===
force_detected_language_to: Optional[str] = Field(default=None)
detect_language_length: int = Field(default=30, ge=5)
detect_language_offset: int = Field(default=0, ge=0)
should_whisper_detect_audio_language: bool = Field(default=False)
preferred_audio_languages: str = Field(
default="eng",
description="Pipe-separated list in order of preference: eng|jpn"
)
# === Path Mapping ===
use_path_mapping: bool = Field(default=False)
path_mapping_from: str = Field(default="/tv")
path_mapping_to: str = Field(default="/Volumes/TV")
# === Legacy SubGen Compatibility ===
show_in_subname_subgen: bool = Field(default=True)
show_in_subname_model: bool = Field(default=True)
append: bool = Field(default=False)
lrc_for_audio_files: bool = Field(default=True)
@field_validator("transcriptarr_mode")
@classmethod
def validate_mode(cls, v: str) -> str:
"""Validate operation mode."""
valid_modes = {"standalone", "provider", "standalone,provider"}
if v not in valid_modes:
raise ValueError(f"Invalid mode: {v}. Must be one of: {valid_modes}")
return v
@field_validator("database_url")
@classmethod
def validate_database_url(cls, v: str) -> str:
"""Validate database URL format."""
valid_prefixes = ("sqlite://", "postgresql://", "mariadb+pymysql://", "mysql+pymysql://")
if not any(v.startswith(prefix) for prefix in valid_prefixes):
raise ValueError(
f"Invalid database URL. Must start with one of: {valid_prefixes}"
)
return v
@property
def database_type(self) -> DatabaseType:
"""Get the database type from the URL."""
if self.database_url.startswith("sqlite"):
return DatabaseType.SQLITE
elif self.database_url.startswith("postgresql"):
return DatabaseType.POSTGRESQL
elif "mariadb" in self.database_url:
return DatabaseType.MARIADB
elif "mysql" in self.database_url:
return DatabaseType.MYSQL
else:
raise ValueError(f"Unknown database type in URL: {self.database_url}")
@property
def is_standalone_mode(self) -> bool:
"""Check if standalone mode is enabled."""
return "standalone" in self.transcriptarr_mode
@property
def is_provider_mode(self) -> bool:
"""Check if provider mode is enabled."""
return "provider" in self.transcriptarr_mode
@property
def library_paths_list(self) -> List[str]:
"""Get library paths as a list."""
if not self.library_paths:
return []
return [p.strip() for p in self.library_paths.split("|") if p.strip()]
@property
def skip_subtitle_languages_list(self) -> List[str]:
"""Get skip subtitle languages as a list."""
if not self.skip_subtitle_languages:
return []
return [lang.strip() for lang in self.skip_subtitle_languages.split("|") if lang.strip()]
@property
def skip_audio_languages_list(self) -> List[str]:
"""Get skip audio languages as a list."""
if not self.skip_if_audio_languages:
return []
return [lang.strip() for lang in self.skip_if_audio_languages.split("|") if lang.strip()]
@property
def preferred_audio_languages_list(self) -> List[str]:
"""Get preferred audio languages as a list."""
return [lang.strip() for lang in self.preferred_audio_languages.split("|") if lang.strip()]
class Config:
"""Pydantic configuration."""
env_file = ".env"
env_file_encoding = "utf-8"
case_sensitive = False
# Global settings instance
settings = Settings()

1
backend/core/__init__.py Normal file

@@ -0,0 +1 @@
"""TranscriptorIO Core Module."""

219
backend/core/database.py Normal file

@@ -0,0 +1,219 @@
"""Database configuration and session management."""
import logging
from contextlib import contextmanager
from typing import Generator
from sqlalchemy import create_engine, event, Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import StaticPool, QueuePool
from backend.config import settings, DatabaseType
logger = logging.getLogger(__name__)
# Base class for all models
Base = declarative_base()
class Database:
"""Database manager supporting SQLite, PostgreSQL, and MariaDB."""
def __init__(self, auto_create_tables: bool = True):
"""
Initialize database engine and session maker.
Args:
auto_create_tables: If True, automatically create tables if they don't exist
"""
self.engine = self._create_engine()
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine
)
logger.info(f"Database initialized: {settings.database_type.value}")
# Automatically create tables if they don't exist
if auto_create_tables:
self._ensure_tables_exist()
def _create_engine(self) -> Engine:
"""Create SQLAlchemy engine based on database type."""
connect_args = {}
poolclass = QueuePool
if settings.database_type == DatabaseType.SQLITE:
# SQLite-specific configuration
connect_args = {
"check_same_thread": False, # Allow multi-threaded access
"timeout": 30.0, # Wait up to 30s for lock
}
# Use StaticPool for SQLite to avoid connection issues
poolclass = StaticPool
# Enable WAL mode for better concurrency
engine = create_engine(
settings.database_url,
connect_args=connect_args,
poolclass=poolclass,
echo=settings.debug,
)
@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
"""Enable SQLite optimizations."""
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.execute("PRAGMA foreign_keys=ON")
cursor.execute("PRAGMA cache_size=-64000") # 64MB cache
cursor.close()
elif settings.database_type == DatabaseType.POSTGRESQL:
# PostgreSQL-specific configuration
try:
import psycopg2 # noqa: F401
except ImportError:
raise ImportError(
"PostgreSQL support requires psycopg2-binary.\n"
"Install it with: pip install psycopg2-binary"
)
engine = create_engine(
settings.database_url,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Verify connections before using
echo=settings.debug,
)
elif settings.database_type in (DatabaseType.MARIADB, DatabaseType.MYSQL):
# MariaDB/MySQL-specific configuration
try:
import pymysql # noqa: F401
except ImportError:
raise ImportError(
"MariaDB/MySQL support requires pymysql.\n"
"Install it with: pip install pymysql"
)
connect_args = {
"charset": "utf8mb4",
}
engine = create_engine(
settings.database_url,
connect_args=connect_args,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=settings.debug,
)
else:
raise ValueError(f"Unsupported database type: {settings.database_type}")
return engine
def _ensure_tables_exist(self):
"""Check if tables exist and create them if they don't."""
# Import models to register them with Base.metadata
from backend.core import models # noqa: F401
from sqlalchemy import inspect
inspector = inspect(self.engine)
existing_tables = inspector.get_table_names()
# Check if the main 'jobs' table exists
if 'jobs' not in existing_tables:
logger.info("Tables don't exist, creating them automatically...")
self.create_tables()
else:
logger.debug("Database tables already exist")
def create_tables(self):
"""Create all database tables."""
# Import models to register them with Base.metadata
from backend.core import models # noqa: F401
logger.info("Creating database tables...")
Base.metadata.create_all(bind=self.engine, checkfirst=True)
# Verify tables were actually created
from sqlalchemy import inspect
inspector = inspect(self.engine)
created_tables = inspector.get_table_names()
if 'jobs' in created_tables:
logger.info(f"Database tables created successfully: {created_tables}")
else:
logger.error(f"Failed to create tables. Existing tables: {created_tables}")
raise RuntimeError("Failed to create database tables")
def drop_tables(self):
"""Drop all database tables (use with caution!)."""
logger.warning("Dropping all database tables...")
Base.metadata.drop_all(bind=self.engine)
logger.info("Database tables dropped")
@contextmanager
def get_session(self) -> Generator[Session, None, None]:
"""
Get a database session as a context manager.
Usage:
with db.get_session() as session:
session.query(Job).all()
"""
session = self.SessionLocal()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Database session error: {e}")
raise
finally:
session.close()
def get_db(self) -> Generator[Session, None, None]:
"""
Dependency for FastAPI endpoints.
Usage:
@app.get("/jobs")
def get_jobs(db: Session = Depends(database.get_db)):
return db.query(Job).all()
"""
session = self.SessionLocal()
try:
yield session
finally:
session.close()
def health_check(self) -> bool:
"""Check if database connection is healthy."""
try:
from sqlalchemy import text
with self.get_session() as session:
session.execute(text("SELECT 1"))
return True
except Exception as e:
logger.error(f"Database health check failed: {e}")
return False
def get_stats(self) -> dict:
"""Get database statistics."""
stats = {
"type": settings.database_type.value,
"url": settings.database_url.split("@")[-1] if "@" in settings.database_url else settings.database_url,
"pool_size": getattr(self.engine.pool, "size", lambda: "N/A")(),
"pool_checked_in": getattr(self.engine.pool, "checkedin", lambda: 0)(),
"pool_checked_out": getattr(self.engine.pool, "checkedout", lambda: 0)(),
"pool_overflow": getattr(self.engine.pool, "overflow", lambda: 0)(),
}
return stats
# Global database instance
database = Database()

203
backend/core/models.py Normal file

@@ -0,0 +1,203 @@
"""Database models for TranscriptorIO."""
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional
from sqlalchemy import (
Column, String, Integer, Float, DateTime, Text, Boolean, Enum as SQLEnum, Index
)
from sqlalchemy.sql import func
from backend.core.database import Base
class JobStatus(str, Enum):
"""Job status states."""
QUEUED = "queued"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class JobStage(str, Enum):
"""Job processing stages."""
PENDING = "pending"
DETECTING_LANGUAGE = "detecting_language"
EXTRACTING_AUDIO = "extracting_audio"
TRANSCRIBING = "transcribing"
TRANSLATING = "translating"
GENERATING_SUBTITLES = "generating_subtitles"
POST_PROCESSING = "post_processing"
FINALIZING = "finalizing"
class QualityPreset(str, Enum):
"""Quality presets for transcription."""
FAST = "fast" # ja→en→es with Helsinki-NLP (4GB VRAM)
BALANCED = "balanced" # ja→ja→es with M2M100 (6GB VRAM)
BEST = "best" # ja→es direct with SeamlessM4T (10GB+ VRAM)
class Job(Base):
"""Job model representing a transcription task."""
__tablename__ = "jobs"
# Primary identification
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
file_path = Column(String(1024), nullable=False, index=True)
file_name = Column(String(512), nullable=False)
# Job status
status = Column(
SQLEnum(JobStatus),
nullable=False,
default=JobStatus.QUEUED,
index=True
)
priority = Column(Integer, nullable=False, default=0, index=True)
# Configuration
source_lang = Column(String(10), nullable=True)
target_lang = Column(String(10), nullable=True)
quality_preset = Column(
SQLEnum(QualityPreset),
nullable=False,
default=QualityPreset.FAST
)
transcribe_or_translate = Column(String(20), nullable=False, default="transcribe")
# Progress tracking
progress = Column(Float, nullable=False, default=0.0) # 0-100
current_stage = Column(
SQLEnum(JobStage),
nullable=False,
default=JobStage.PENDING
)
eta_seconds = Column(Integer, nullable=True)
# Timestamps
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False, index=True)
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Results
output_path = Column(String(1024), nullable=True)
srt_content = Column(Text, nullable=True)
segments_count = Column(Integer, nullable=True)
# Error handling
error = Column(Text, nullable=True)
retry_count = Column(Integer, nullable=False, default=0)
max_retries = Column(Integer, nullable=False, default=3)
# Worker information
worker_id = Column(String(64), nullable=True)
vram_used_mb = Column(Integer, nullable=True)
processing_time_seconds = Column(Float, nullable=True)
# Provider mode specific
bazarr_callback_url = Column(String(512), nullable=True)
is_manual_request = Column(Boolean, nullable=False, default=False)
# Additional metadata
model_used = Column(String(64), nullable=True)
device_used = Column(String(32), nullable=True)
compute_type = Column(String(32), nullable=True)
def __repr__(self):
"""String representation of Job."""
return f"<Job {self.id[:8]}... {self.file_name} [{self.status.value}] {self.progress:.1f}%>"
@property
def duration_seconds(self) -> Optional[float]:
"""Calculate job duration in seconds."""
if self.started_at and self.completed_at:
delta = self.completed_at - self.started_at
return delta.total_seconds()
return None
@property
def is_terminal_state(self) -> bool:
"""Check if job is in a terminal state (completed/failed/cancelled)."""
return self.status in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)
@property
def can_retry(self) -> bool:
"""Check if job can be retried."""
return self.status == JobStatus.FAILED and self.retry_count < self.max_retries
def to_dict(self) -> dict:
"""Convert job to dictionary for API responses."""
return {
"id": self.id,
"file_path": self.file_path,
"file_name": self.file_name,
"status": self.status.value,
"priority": self.priority,
"source_lang": self.source_lang,
"target_lang": self.target_lang,
"quality_preset": self.quality_preset.value if self.quality_preset else None,
"transcribe_or_translate": self.transcribe_or_translate,
"progress": self.progress,
"current_stage": self.current_stage.value if self.current_stage else None,
"eta_seconds": self.eta_seconds,
"created_at": self.created_at.isoformat() if self.created_at else None,
"started_at": self.started_at.isoformat() if self.started_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"output_path": self.output_path,
"segments_count": self.segments_count,
"error": self.error,
"retry_count": self.retry_count,
"worker_id": self.worker_id,
"vram_used_mb": self.vram_used_mb,
"processing_time_seconds": self.processing_time_seconds,
"model_used": self.model_used,
"device_used": self.device_used,
}
def update_progress(self, progress: float, stage: JobStage, eta_seconds: Optional[int] = None):
"""Update job progress."""
self.progress = min(100.0, max(0.0, progress))
self.current_stage = stage
if eta_seconds is not None:
self.eta_seconds = eta_seconds
def mark_started(self, worker_id: str):
"""Mark job as started."""
self.status = JobStatus.PROCESSING
self.started_at = datetime.utcnow()
self.worker_id = worker_id
def mark_completed(self, output_path: str, segments_count: int, srt_content: Optional[str] = None):
"""Mark job as completed."""
self.status = JobStatus.COMPLETED
self.completed_at = datetime.utcnow()
self.output_path = output_path
self.segments_count = segments_count
self.srt_content = srt_content
self.progress = 100.0
self.current_stage = JobStage.FINALIZING
if self.started_at:
self.processing_time_seconds = (self.completed_at - self.started_at).total_seconds()
def mark_failed(self, error: str):
"""Mark job as failed."""
self.status = JobStatus.FAILED
self.completed_at = datetime.utcnow()
self.error = error
self.retry_count += 1
def mark_cancelled(self):
"""Mark job as cancelled."""
self.status = JobStatus.CANCELLED
self.completed_at = datetime.utcnow()
# Create indexes for common queries
Index('idx_jobs_status_priority', Job.status, Job.priority.desc(), Job.created_at)
Index('idx_jobs_created', Job.created_at.desc())
Index('idx_jobs_file_path', Job.file_path)


@@ -0,0 +1,394 @@
"""Queue manager for persistent job queuing."""
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Dict
from sqlalchemy import and_, or_
from sqlalchemy.orm import Session
from backend.core.database import database
from backend.core.models import Job, JobStatus, JobStage, QualityPreset
logger = logging.getLogger(__name__)
class QueueManager:
"""
Persistent queue manager for transcription jobs.
Replaces the old DeduplicatedQueue with a database-backed solution that:
- Persists jobs across restarts
- Supports priority queuing
- Prevents duplicate jobs
- Provides visibility into queue state
- Thread-safe operations
"""
def __init__(self):
"""Initialize queue manager."""
self.db = database
logger.info("QueueManager initialized")
def add_job(
self,
file_path: str,
file_name: str,
source_lang: Optional[str] = None,
target_lang: Optional[str] = None,
quality_preset: QualityPreset = QualityPreset.FAST,
transcribe_or_translate: str = "transcribe",
priority: int = 0,
bazarr_callback_url: Optional[str] = None,
is_manual_request: bool = False,
) -> Optional[Job]:
"""
Add a new job to the queue.
Args:
file_path: Full path to the media file
file_name: Name of the file
source_lang: Source language code (ISO 639-2)
target_lang: Target language code (ISO 639-2)
quality_preset: Quality preset (fast/balanced/best)
transcribe_or_translate: Operation type
priority: Job priority (higher = processed first)
bazarr_callback_url: Callback URL for Bazarr provider mode
is_manual_request: Whether this is a manual request (higher priority)
Returns:
Job object if created, None if duplicate exists
"""
with self.db.get_session() as session:
# Check for existing job
existing = self._find_existing_job(session, file_path, target_lang)
if existing:
logger.info(f"Job already exists for {file_name}: {existing.id} [{existing.status.value}]")
# If existing job failed and can retry, reset it
if existing.can_retry:
logger.info(f"Resetting failed job {existing.id} for retry")
existing.status = JobStatus.QUEUED
existing.error = None
existing.current_stage = JobStage.PENDING
existing.progress = 0.0
session.commit()
return existing
return None
# Create new job
job = Job(
file_path=file_path,
file_name=file_name,
source_lang=source_lang,
target_lang=target_lang,
quality_preset=quality_preset,
transcribe_or_translate=transcribe_or_translate,
priority=priority + (10 if is_manual_request else 0), # Boost manual requests
bazarr_callback_url=bazarr_callback_url,
is_manual_request=is_manual_request,
)
session.add(job)
session.commit()
# Access all attributes before session closes to ensure they're loaded
job_id = job.id
job_status = job.status
logger.info(
f"Job {job_id} added to queue: {file_name} "
f"[{quality_preset.value}] priority={job.priority}"
)
# Re-query the job in a new session to return a fresh copy
with self.db.get_session() as session:
job = session.query(Job).filter(Job.id == job_id).first()
if job:
session.expunge(job) # Remove from session so it doesn't expire
return job
def get_next_job(self, worker_id: str) -> Optional[Job]:
"""
Get the next job from the queue for processing.
Jobs are selected based on:
1. Status = QUEUED
2. Priority (DESC)
3. Created time (ASC) - FIFO within same priority
Args:
worker_id: ID of the worker requesting the job
Returns:
Job object or None if queue is empty
"""
with self.db.get_session() as session:
job = (
session.query(Job)
.filter(Job.status == JobStatus.QUEUED)
.order_by(
Job.priority.desc(),
Job.created_at.asc()
)
.with_for_update(skip_locked=True) # Skip locked rows (concurrent workers)
.first()
)
if job:
job_id = job.id
job.mark_started(worker_id)
session.commit()
logger.info(f"Job {job_id} assigned to worker {worker_id}")
# Re-query the job if found
if job:
with self.db.get_session() as session:
job = session.query(Job).filter(Job.id == job_id).first()
if job:
session.expunge(job) # Remove from session so it doesn't expire
return job
return None
def get_job_by_id(self, job_id: str) -> Optional[Job]:
"""Get a specific job by ID."""
with self.db.get_session() as session:
return session.query(Job).filter(Job.id == job_id).first()
def update_job_progress(
self,
job_id: str,
progress: float,
stage: JobStage,
eta_seconds: Optional[int] = None
) -> bool:
"""
Update job progress.
Args:
job_id: Job ID
progress: Progress percentage (0-100)
stage: Current processing stage
eta_seconds: Estimated time to completion
Returns:
True if updated successfully, False otherwise
"""
with self.db.get_session() as session:
job = session.query(Job).filter(Job.id == job_id).first()
if not job:
logger.warning(f"Job {job_id} not found for progress update")
return False
job.update_progress(progress, stage, eta_seconds)
session.commit()
logger.debug(
f"Job {job_id} progress: {progress:.1f}% [{stage.value}] ETA: {eta_seconds}s"
)
return True
def mark_job_completed(
self,
job_id: str,
output_path: str,
segments_count: int,
srt_content: Optional[str] = None
) -> bool:
"""Mark a job as completed."""
with self.db.get_session() as session:
job = session.query(Job).filter(Job.id == job_id).first()
if not job:
logger.warning(f"Job {job_id} not found for completion")
return False
job.mark_completed(output_path, segments_count, srt_content)
session.commit()
logger.info(
f"Job {job_id} completed: {output_path} "
f"({segments_count} segments, {job.processing_time_seconds:.1f}s)"
)
return True
def mark_job_failed(self, job_id: str, error: str) -> bool:
"""Mark a job as failed."""
with self.db.get_session() as session:
job = session.query(Job).filter(Job.id == job_id).first()
if not job:
logger.warning(f"Job {job_id} not found for failure marking")
return False
job.mark_failed(error)
session.commit()
logger.error(
f"Job {job_id} failed (attempt {job.retry_count}/{job.max_retries}): {error}"
)
return True
def cancel_job(self, job_id: str) -> bool:
"""Cancel a queued or processing job."""
with self.db.get_session() as session:
job = session.query(Job).filter(Job.id == job_id).first()
if not job:
logger.warning(f"Job {job_id} not found for cancellation")
return False
if job.is_terminal_state:
logger.warning(f"Job {job_id} is already in terminal state: {job.status.value}")
return False
job.mark_cancelled()
session.commit()
logger.info(f"Job {job_id} cancelled")
return True
def get_queue_stats(self) -> Dict:
"""Get queue statistics."""
with self.db.get_session() as session:
total = session.query(Job).count()
queued = session.query(Job).filter(Job.status == JobStatus.QUEUED).count()
processing = session.query(Job).filter(Job.status == JobStatus.PROCESSING).count()
completed = session.query(Job).filter(Job.status == JobStatus.COMPLETED).count()
failed = session.query(Job).filter(Job.status == JobStatus.FAILED).count()
# Get today's stats
today = datetime.utcnow().date()
completed_today = (
session.query(Job)
.filter(
Job.status == JobStatus.COMPLETED,
Job.completed_at >= today
)
.count()
)
failed_today = (
session.query(Job)
.filter(
Job.status == JobStatus.FAILED,
Job.completed_at >= today
)
.count()
)
return {
"total": total,
"queued": queued,
"processing": processing,
"completed": completed,
"failed": failed,
"completed_today": completed_today,
"failed_today": failed_today,
}
def get_jobs(
self,
status: Optional[JobStatus] = None,
limit: int = 50,
offset: int = 0
) -> List[Job]:
"""
Get jobs with optional filtering.
Args:
status: Filter by status
limit: Maximum number of jobs to return
offset: Offset for pagination
Returns:
List of Job objects
"""
with self.db.get_session() as session:
query = session.query(Job)
if status:
query = query.filter(Job.status == status)
jobs = (
query
.order_by(Job.created_at.desc())
.limit(limit)
.offset(offset)
.all()
)
return jobs
def get_processing_jobs(self) -> List[Job]:
"""Get all currently processing jobs."""
return self.get_jobs(status=JobStatus.PROCESSING)
def get_queued_jobs(self) -> List[Job]:
"""Get all queued jobs."""
return self.get_jobs(status=JobStatus.QUEUED)
def is_queue_empty(self) -> bool:
"""Check if queue has any pending jobs."""
with self.db.get_session() as session:
count = (
session.query(Job)
.filter(Job.status.in_([JobStatus.QUEUED, JobStatus.PROCESSING]))
.count()
)
return count == 0
def cleanup_old_jobs(self, days: int = 30) -> int:
"""
Delete completed/failed jobs older than specified days.
Args:
days: Number of days to keep jobs
Returns:
Number of jobs deleted
"""
with self.db.get_session() as session:
cutoff_date = datetime.utcnow() - timedelta(days=days)
deleted = (
session.query(Job)
.filter(
Job.status.in_([JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]),
Job.completed_at < cutoff_date
)
.delete()
)
session.commit()
if deleted > 0:
logger.info(f"Cleaned up {deleted} old jobs (older than {days} days)")
return deleted
def _find_existing_job(
self,
session: Session,
file_path: str,
target_lang: Optional[str]
) -> Optional[Job]:
"""
Find existing job for the same file and target language.
Ignores completed jobs - allows re-transcription.
"""
query = session.query(Job).filter(
Job.file_path == file_path,
Job.status.in_([JobStatus.QUEUED, JobStatus.PROCESSING])
)
if target_lang:
query = query.filter(Job.target_lang == target_lang)
return query.first()
# Global queue manager instance
queue_manager = QueueManager()

285
backend/core/worker.py Normal file

@@ -0,0 +1,285 @@
"""Individual worker for processing transcription jobs."""
import logging
import multiprocessing as mp
import time
import traceback
from datetime import datetime
from enum import Enum
from typing import Optional
from backend.core.database import Database
from backend.core.models import Job, JobStatus, JobStage
from backend.core.queue_manager import QueueManager
logger = logging.getLogger(__name__)
class WorkerType(str, Enum):
"""Worker device type."""
CPU = "cpu"
GPU = "gpu"
class WorkerStatus(str, Enum):
"""Worker status states."""
IDLE = "idle"
BUSY = "busy"
STOPPING = "stopping"
STOPPED = "stopped"
ERROR = "error"
class Worker:
"""
Individual worker process for transcription.
Each worker runs in its own process and can handle one job at a time.
Workers communicate with the main process via multiprocessing primitives.
"""
def __init__(
self,
worker_id: str,
worker_type: WorkerType,
device_id: Optional[int] = None
):
"""
Initialize worker.
Args:
worker_id: Unique identifier for this worker
worker_type: CPU or GPU
device_id: GPU device ID (only for GPU workers)
"""
self.worker_id = worker_id
self.worker_type = worker_type
self.device_id = device_id
# Multiprocessing primitives
self.process: Optional[mp.Process] = None
self.stop_event = mp.Event()
        # mp.Value('i') holds an int, so store the index of the status within WorkerStatus
        self.status = mp.Value('i', list(WorkerStatus).index(WorkerStatus.IDLE))  # type: ignore
self.current_job_id = mp.Array('c', 36) # type: ignore # UUID string
# Stats
self.jobs_completed = mp.Value('i', 0) # type: ignore
self.jobs_failed = mp.Value('i', 0) # type: ignore
self.started_at: Optional[datetime] = None
def start(self):
"""Start the worker process."""
if self.process and self.process.is_alive():
logger.warning(f"Worker {self.worker_id} is already running")
return
self.stop_event.clear()
self.process = mp.Process(
target=self._worker_loop,
name=f"Worker-{self.worker_id}",
daemon=True
)
self.process.start()
self.started_at = datetime.utcnow()
logger.info(
f"Worker {self.worker_id} started (PID: {self.process.pid}, "
f"Type: {self.worker_type.value})"
)
def stop(self, timeout: float = 30.0):
"""
Stop the worker process gracefully.
Args:
timeout: Maximum time to wait for worker to stop
"""
if not self.process or not self.process.is_alive():
logger.warning(f"Worker {self.worker_id} is not running")
return
logger.info(f"Stopping worker {self.worker_id}...")
self.stop_event.set()
self.process.join(timeout=timeout)
if self.process.is_alive():
logger.warning(f"Worker {self.worker_id} did not stop gracefully, terminating...")
self.process.terminate()
self.process.join(timeout=5.0)
if self.process.is_alive():
logger.error(f"Worker {self.worker_id} did not terminate, killing...")
self.process.kill()
logger.info(f"Worker {self.worker_id} stopped")
def is_alive(self) -> bool:
"""Check if worker process is alive."""
return self.process is not None and self.process.is_alive()
def get_status(self) -> dict:
"""Get worker status information."""
        # Map the stored index back to its WorkerStatus member
        status_enum = list(WorkerStatus)[self.status.value]
current_job = self.current_job_id.value.decode('utf-8').strip('\x00')
return {
"worker_id": self.worker_id,
"type": self.worker_type.value,
"device_id": self.device_id,
"status": status_enum.value,
"current_job_id": current_job if current_job else None,
"jobs_completed": self.jobs_completed.value,
"jobs_failed": self.jobs_failed.value,
"is_alive": self.is_alive(),
"pid": self.process.pid if self.process else None,
"started_at": self.started_at.isoformat() if self.started_at else None,
}
def _worker_loop(self):
"""
Main worker loop (runs in separate process).
This is the entry point for the worker process.
"""
# Set up logging in the worker process
logging.basicConfig(
level=logging.INFO,
format=f'[Worker-{self.worker_id}] %(levelname)s: %(message)s'
)
logger.info(f"Worker {self.worker_id} loop started")
# Initialize database and queue manager in worker process
# Each process needs its own DB connection
try:
db = Database(auto_create_tables=False)
queue_mgr = QueueManager()
except Exception as e:
logger.error(f"Failed to initialize worker: {e}")
self._set_status(WorkerStatus.ERROR)
return
# Main work loop
while not self.stop_event.is_set():
try:
# Try to get next job from queue
job = queue_mgr.get_next_job(self.worker_id)
if job is None:
# No jobs available, idle for a bit
self._set_status(WorkerStatus.IDLE)
time.sleep(2)
continue
# Process the job
self._set_status(WorkerStatus.BUSY)
self._set_current_job(job.id)
logger.info(f"Processing job {job.id}: {job.file_name}")
try:
self._process_job(job, queue_mgr)
self.jobs_completed.value += 1
logger.info(f"Job {job.id} completed successfully")
except Exception as e:
self.jobs_failed.value += 1
error_msg = f"Job processing failed: {str(e)}\n{traceback.format_exc()}"
logger.error(error_msg)
queue_mgr.mark_job_failed(job.id, error_msg)
finally:
self._clear_current_job()
except Exception as e:
logger.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
time.sleep(5) # Back off on errors
self._set_status(WorkerStatus.STOPPED)
logger.info(f"Worker {self.worker_id} loop ended")
def _process_job(self, job: Job, queue_mgr: QueueManager):
"""
Process a single transcription job.
Args:
job: Job to process
queue_mgr: Queue manager for updating progress
"""
# TODO: This will be implemented when we add the transcriber module
# For now, simulate work
# Stage 1: Detect language
queue_mgr.update_job_progress(
job.id,
progress=10.0,
stage=JobStage.DETECTING_LANGUAGE,
eta_seconds=60
)
time.sleep(2) # Simulate work
# Stage 2: Extract audio
queue_mgr.update_job_progress(
job.id,
progress=20.0,
stage=JobStage.EXTRACTING_AUDIO,
eta_seconds=50
)
time.sleep(2)
# Stage 3: Transcribe
queue_mgr.update_job_progress(
job.id,
progress=30.0,
stage=JobStage.TRANSCRIBING,
eta_seconds=40
)
# Simulate progressive transcription
for i in range(30, 90, 10):
time.sleep(1)
queue_mgr.update_job_progress(
job.id,
progress=float(i),
stage=JobStage.TRANSCRIBING,
eta_seconds=int((100 - i) / 2)
)
# Stage 4: Finalize
queue_mgr.update_job_progress(
job.id,
progress=95.0,
stage=JobStage.FINALIZING,
eta_seconds=5
)
time.sleep(1)
# Mark as completed
output_path = job.file_path.replace('.mkv', '.srt')
queue_mgr.mark_job_completed(
job.id,
output_path=output_path,
segments_count=100, # Simulated
srt_content="Simulated SRT content"
)
def _set_status(self, status: WorkerStatus):
"""Set worker status (thread-safe)."""
        # The shared Value holds an int, so store the index of the status within WorkerStatus
        self.status.value = list(WorkerStatus).index(status)
def _set_current_job(self, job_id: str):
"""Set current job ID (thread-safe)."""
job_id_bytes = job_id.encode('utf-8')
for i, byte in enumerate(job_id_bytes):
if i < len(self.current_job_id):
self.current_job_id[i] = byte
def _clear_current_job(self):
"""Clear current job ID (thread-safe)."""
for i in range(len(self.current_job_id)):
self.current_job_id[i] = b'\x00'
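
A minimal sketch of driving a single Worker directly, assuming a fork-capable platform (the bound-method process target above is inherited on fork; spawn-based platforms would likely need a module-level target) and that the database tables already exist:

import time

from backend.core.worker import Worker, WorkerType

if __name__ == "__main__":
    worker = Worker(worker_id="cpu-0", worker_type=WorkerType.CPU)
    worker.start()
    try:
        # Poll the shared status while the worker picks up queued jobs
        for _ in range(5):
            time.sleep(5)
            print(worker.get_status())
    finally:
        # Signals the stop event, then escalates to terminate/kill if the loop does not exit
        worker.stop(timeout=30.0)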

launcher.py
View File

@@ -42,7 +42,7 @@ def prompt_and_save_bazarr_env_variables():
print(instructions)
env_vars = {
'WHISPER_MODEL': ('Whisper Model', 'Enter the Whisper model you want to run: tiny, tiny.en, base, base.en, small, small.en, medium, medium.en, large, distil-large-v2, distil-medium.en, distil-small.en', 'medium'),
'WEBHOOKPORT': ('Webhook Port', 'Default listening port for subgen.py', '9000'),
'WEBHOOKPORT': ('Webhook Port', 'Default listening port for transcriptarr.py', '9000'),
'TRANSCRIBE_DEVICE': ('Transcribe Device', 'Set as cpu or gpu', 'gpu'),
# Defaulting to False here for the prompt, user can change
'DEBUG': ('Debug', 'Enable debug logging (true/false)', 'False'),
@@ -51,13 +51,13 @@ def prompt_and_save_bazarr_env_variables():
}
user_input = {}
with open('subgen.env', 'w') as file:
with open('.env', 'w') as file:
for var, (description, prompt, default) in env_vars.items():
value = input(f"{prompt} [{default}]: ") or default
file.write(f"{var}={value}\n")
print("Environment variables have been saved to subgen.env")
print("Environment variables have been saved to .env")
def load_env_variables(env_filename='subgen.env'):
def load_env_variables(env_filename='.env'):
try:
with open(env_filename, 'r') as file:
for line in file:
@@ -93,7 +93,7 @@ def main():
# Changed: action='store_true'
parser.add_argument('-a', '--append', action='store_true', help="Append 'Transcribed by whisper' (overrides .env and external ENV)")
parser.add_argument('-u', '--update', action='store_true', help="Update Subgen")
parser.add_argument('-x', '--exit-early', action='store_true', help="Exit without running subgen.py")
parser.add_argument('-x', '--exit-early', action='store_true', help="Exit without running transcriptarr.py")
parser.add_argument('-s', '--setup-bazarr', action='store_true', help="Prompt for common Bazarr setup parameters and save them for future runs")
parser.add_argument('-b', '--branch', type=str, default='main', help='Specify the branch to download from')
parser.add_argument('-l', '--launcher-update', action='store_true', help="Update launcher.py and re-launch")
@@ -126,7 +126,7 @@ def main():
# After saving, load them immediately for this run
load_env_variables()
else:
# Load if not setting up, assuming subgen.env might exist
# Load if not setting up, assuming .env might exist
load_env_variables()
@@ -157,7 +157,7 @@ def main():
if not os.path.exists(subgen_script_to_run) or args.update or convert_to_bool(os.getenv('UPDATE')):
print(f"Downloading {subgen_script_to_run} from GitHub branch {branch_name}...")
download_from_github(f"https://raw.githubusercontent.com/McCloudS/subgen/{branch_name}/subgen.py", subgen_script_to_run)
download_from_github(f"https://raw.githubusercontent.com/McCloudS/subgen/{branch_name}/transcriptarr.py", subgen_script_to_run)
print(f"Downloading {language_code_script_to_download} from GitHub branch {branch_name}...")
download_from_github(f"https://raw.githubusercontent.com/McCloudS/subgen/{branch_name}/language_code.py", language_code_script_to_download)
@@ -165,8 +165,8 @@ def main():
print(f"{subgen_script_to_run} exists and UPDATE is set to False, skipping download.")
if not args.exit_early:
#print(f"DEBUG environment variable for subgen.py: {os.getenv('DEBUG')}")
#print(f"APPEND environment variable for subgen.py: {os.getenv('APPEND')}")
#print(f"DEBUG environment variable for transcriptarr.py: {os.getenv('DEBUG')}")
#print(f"APPEND environment variable for transcriptarr.py: {os.getenv('APPEND')}")
print(f'Launching {subgen_script_to_run}')
try:
subprocess.run([python_cmd, '-u', subgen_script_to_run], check=True)
@@ -176,7 +176,7 @@ def main():
print(f"Error running {subgen_script_to_run}: {e}")
else:
print("Not running subgen.py: -x or --exit-early set")
print("Not running transcriptarr.py: -x or --exit-early set")
if __name__ == "__main__":
main()
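
The diff shows only the opening lines of load_env_variables, so the following is a minimal sketch of the KEY=VALUE parsing it presumably performs, written as an assumption rather than the actual body:

import os

def load_env_variables(env_filename='.env'):
    # Assumed behaviour: export KEY=VALUE pairs from the saved file into the process environment
    try:
        with open(env_filename, 'r') as file:
            for line in file:
                line = line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                key, _, value = line.partition('=')
                os.environ[key.strip()] = value.strip()
    except FileNotFoundError:
        # No saved settings yet; rely on whatever is already in the environment
        pass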

requirements.txt
View File

@@ -1,10 +1,33 @@
numpy
stable-ts
# Core dependencies
fastapi
requests
faster-whisper
uvicorn
uvicorn[standard]
python-multipart
requests
python-dotenv>=1.0.0
# Database & ORM (SQLite is built-in)
sqlalchemy>=2.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
# Media processing (CPU-only by default)
numpy
ffmpeg-python
whisper
watchdog
# Optional dependencies (install based on configuration):
#
# For PostgreSQL database:
# pip install psycopg2-binary
#
# For MariaDB/MySQL database:
# pip install pymysql
#
# For Whisper transcription:
# pip install openai-whisper faster-whisper stable-ts
#
# For GPU support (NVIDIA):
# pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
#
# For media file handling:
# pip install av>=10.0.0
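
Because the extras above are optional, a small, hedged helper like the following can report which ones are importable before enabling the matching configuration; the import names used here are assumptions where they differ from the pip package names (e.g. psycopg2-binary imports as psycopg2):

import importlib.util

# Optional extras named in the comments above, keyed by import name
optional_modules = {
    "psycopg2": "PostgreSQL backend",
    "pymysql": "MariaDB/MySQL backend",
    "faster_whisper": "Whisper transcription (faster-whisper)",
    "torch": "GPU support",
    "av": "media file handling",
}

for module, purpose in optional_modules.items():
    available = importlib.util.find_spec(module) is not None
    print(f"{'found  ' if available else 'missing'} {module:<15} {purpose}")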

subgen.env
View File

@@ -1,6 +0,0 @@
WHISPER_MODEL=medium
WEBHOOKPORT=9000
TRANSCRIBE_DEVICE=gpu
DEBUG=True
CLEAR_VRAM_ON_COMPLETE=False
APPEND=False

163
test_backend.py Executable file
View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Test script for TranscriptorIO backend components."""
import sys
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def test_config():
"""Test configuration loading."""
logger.info("Testing configuration...")
try:
from backend.config import settings
logger.info(f"✓ Config loaded successfully")
logger.info(f" - Mode: {settings.transcriptarr_mode}")
logger.info(f" - Database: {settings.database_type.value}")
logger.info(f" - Whisper Model: {settings.whisper_model}")
logger.info(f" - Device: {settings.transcribe_device}")
return True
except Exception as e:
logger.error(f"✗ Config test failed: {e}")
return False
def test_database():
"""Test database connection and table creation."""
logger.info("\nTesting database...")
try:
from backend.core.database import database
from backend.core.models import Base
# Clean database for fresh test
try:
database.drop_tables()
logger.info(f" - Dropped existing tables for clean test")
        except Exception:
pass
database.create_tables()
logger.info(f"✓ Database initialized with fresh tables")
# Test connection with health check
if database.health_check():
logger.info(f"✓ Database connection OK")
else:
logger.error("✗ Database health check failed (but tables were created)")
# Don't fail the test if health check fails but tables exist
return True
# Get stats
stats = database.get_stats()
logger.info(f" - Type: {stats['type']}")
logger.info(f" - URL: {stats['url']}")
return True
except Exception as e:
logger.error(f"✗ Database test failed: {e}")
import traceback
traceback.print_exc()
return False
def test_queue_manager():
"""Test queue manager operations."""
logger.info("\nTesting queue manager...")
try:
from backend.core.queue_manager import queue_manager
from backend.core.models import QualityPreset
# Add a test job
job = queue_manager.add_job(
file_path="/test/anime.mkv",
file_name="anime.mkv",
source_lang="ja",
target_lang="es",
quality_preset=QualityPreset.FAST,
priority=5
)
if job:
logger.info(f"✓ Job created: {job.id}")
logger.info(f" - File: {job.file_name}")
logger.info(f" - Status: {job.status.value}")
logger.info(f" - Priority: {job.priority}")
else:
logger.error("✗ Failed to create job")
return False
# Get queue stats
stats = queue_manager.get_queue_stats()
logger.info(f"✓ Queue stats:")
logger.info(f" - Total: {stats['total']}")
logger.info(f" - Queued: {stats['queued']}")
logger.info(f" - Processing: {stats['processing']}")
logger.info(f" - Completed: {stats['completed']}")
# Try to add duplicate
duplicate = queue_manager.add_job(
file_path="/test/anime.mkv",
file_name="anime.mkv",
source_lang="ja",
target_lang="es",
quality_preset=QualityPreset.FAST
)
if duplicate is None:
logger.info(f"✓ Duplicate detection working")
else:
logger.warning(f"⚠ Duplicate job was created (should have been rejected)")
# Get next job
next_job = queue_manager.get_next_job("test-worker-1")
if next_job:
logger.info(f"✓ Got next job: {next_job.id} (assigned to test-worker-1)")
logger.info(f" - Status: {next_job.status.value}")
else:
logger.error("✗ Failed to get next job")
return False
return True
except Exception as e:
logger.error(f"✗ Queue manager test failed: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run all tests."""
logger.info("=" * 60)
logger.info("TranscriptorIO Backend Test Suite")
logger.info("=" * 60)
results = {
"Config": test_config(),
"Database": test_database(),
"Queue Manager": test_queue_manager(),
}
logger.info("\n" + "=" * 60)
logger.info("Test Results:")
logger.info("=" * 60)
all_passed = True
for test_name, passed in results.items():
status = "✓ PASSED" if passed else "✗ FAILED"
logger.info(f"{test_name}: {status}")
if not passed:
all_passed = False
logger.info("=" * 60)
if all_passed:
logger.info("🎉 All tests passed!")
return 0
else:
logger.error("❌ Some tests failed")
return 1
if __name__ == "__main__":
sys.exit(main())
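
Since main() returns a conventional exit code (0 when every test passes, 1 otherwise), the suite can be wired into CI with a small wrapper; this is a hypothetical example rather than part of the changeset:

import subprocess
import sys

# Run the backend test suite in a fresh interpreter and propagate its exit code
result = subprocess.run([sys.executable, "test_backend.py"])
sys.exit(result.returncode)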