Muisje/issue143 (#144)

* * Handle multiple audio tracks
* Improved skipping

* Added language code class for more robustness and flexibility

* extract audio to bytes instead of file in case of multiple audio
This commit is contained in:
McCloudS
2024-11-30 19:23:42 +01:00
parent b1e2bb2fdf
commit 688719925b
3 changed files with 627 additions and 171 deletions

2
.gitignore vendored
View File

@@ -8,3 +8,5 @@
#ignore our settings
subgen.env
models/

191
language_code.py Normal file
View File

@@ -0,0 +1,191 @@
from enum import Enum
class LanguageCode(Enum):
# ISO 639-1, ISO 639-2/T, ISO 639-2/B, English Name, Native Name
AFRIKAANS = ("af", "afr", "afr", "Afrikaans", "Afrikaans")
AMHARIC = ("am", "amh", "amh", "Amharic", "አማርኛ")
ARABIC = ("ar", "ara", "ara", "Arabic", "العربية")
ASSAMESE = ("as", "asm", "asm", "Assamese", "অসমীয়া")
AZERBAIJANI = ("az", "aze", "aze", "Azerbaijani", "Azərbaycanca")
BASHKIR = ("ba", "bak", "bak", "Bashkir", "Башҡортса")
BELARUSIAN = ("be", "bel", "bel", "Belarusian", "Беларуская")
BULGARIAN = ("bg", "bul", "bul", "Bulgarian", "Български")
BENGALI = ("bn", "ben", "ben", "Bengali", "বাংলা")
TIBETAN = ("bo", "bod", "tib", "Tibetan", "བོད་ཡིག")
BRETON = ("br", "bre", "bre", "Breton", "Brezhoneg")
BOSNIAN = ("bs", "bos", "bos", "Bosnian", "Bosanski")
CATALAN = ("ca", "cat", "cat", "Catalan", "Català")
CZECH = ("cs", "ces", "cze", "Czech", "Čeština")
WELSH = ("cy", "cym", "wel", "Welsh", "Cymraeg")
DANISH = ("da", "dan", "dan", "Danish", "Dansk")
GERMAN = ("de", "deu", "ger", "German", "Deutsch")
GREEK = ("el", "ell", "gre", "Greek", "Ελληνικά")
ENGLISH = ("en", "eng", "eng", "English", "English")
SPANISH = ("es", "spa", "spa", "Spanish", "Español")
ESTONIAN = ("et", "est", "est", "Estonian", "Eesti")
BASQUE = ("eu", "eus", "baq", "Basque", "Euskara")
PERSIAN = ("fa", "fas", "per", "Persian", "فارسی")
FINNISH = ("fi", "fin", "fin", "Finnish", "Suomi")
FAROESE = ("fo", "fao", "fao", "Faroese", "Føroyskt")
FRENCH = ("fr", "fra", "fre", "French", "Français")
GALICIAN = ("gl", "glg", "glg", "Galician", "Galego")
GUJARATI = ("gu", "guj", "guj", "Gujarati", "ગુજરાતી")
HAUSA = ("ha", "hau", "hau", "Hausa", "Hausa")
HAWAIIAN = ("haw", "haw", "haw", "Hawaiian", "ʻŌlelo Hawaiʻi")
HEBREW = ("he", "heb", "heb", "Hebrew", "עברית")
HINDI = ("hi", "hin", "hin", "Hindi", "हिन्दी")
CROATIAN = ("hr", "hrv", "hrv", "Croatian", "Hrvatski")
HAITIAN_CREOLE = ("ht", "hat", "hat", "Haitian Creole", "Kreyòl Ayisyen")
HUNGARIAN = ("hu", "hun", "hun", "Hungarian", "Magyar")
ARMENIAN = ("hy", "hye", "arm", "Armenian", "Հայերեն")
INDONESIAN = ("id", "ind", "ind", "Indonesian", "Bahasa Indonesia")
ICELANDIC = ("is", "isl", "ice", "Icelandic", "Íslenska")
ITALIAN = ("it", "ita", "ita", "Italian", "Italiano")
JAPANESE = ("ja", "jpn", "jpn", "Japanese", "日本語")
JAVANESE = ("jw", "jav", "jav", "Javanese", "ꦧꦱꦗꦮ")
GEORGIAN = ("ka", "kat", "geo", "Georgian", "ქართული")
KAZAKH = ("kk", "kaz", "kaz", "Kazakh", "Қазақша")
KHMER = ("km", "khm", "khm", "Khmer", "ភាសាខ្មែរ")
KANNADA = ("kn", "kan", "kan", "Kannada", "ಕನ್ನಡ")
KOREAN = ("ko", "kor", "kor", "Korean", "한국어")
LATIN = ("la", "lat", "lat", "Latin", "Latina")
LUXEMBOURGISH = ("lb", "ltz", "ltz", "Luxembourgish", "Lëtzebuergesch")
LINGALA = ("ln", "lin", "lin", "Lingala", "Lingála")
LAO = ("lo", "lao", "lao", "Lao", "ພາສາລາວ")
LITHUANIAN = ("lt", "lit", "lit", "Lithuanian", "Lietuvių")
LATVIAN = ("lv", "lav", "lav", "Latvian", "Latviešu")
MALAGASY = ("mg", "mlg", "mlg", "Malagasy", "Malagasy")
MAORI = ("mi", "mri", "mao", "Maori", "Te Reo Māori")
MACEDONIAN = ("mk", "mkd", "mac", "Macedonian", "Македонски")
MALAYALAM = ("ml", "mal", "mal", "Malayalam", "മലയാളം")
MONGOLIAN = ("mn", "mon", "mon", "Mongolian", "Монгол")
MARATHI = ("mr", "mar", "mar", "Marathi", "मराठी")
MALAY = ("ms", "msa", "may", "Malay", "Bahasa Melayu")
MALTESE = ("mt", "mlt", "mlt", "Maltese", "Malti")
BURMESE = ("my", "mya", "bur", "Burmese", "မြန်မာစာ")
NEPALI = ("ne", "nep", "nep", "Nepali", "नेपाली")
DUTCH = ("nl", "nld", "dut", "Dutch", "Nederlands")
NORWEGIAN_NYNORSK = ("nn", "nno", "nno", "Norwegian Nynorsk", "Nynorsk")
NORWEGIAN = ("no", "nor", "nor", "Norwegian", "Norsk")
OCCITAN = ("oc", "oci", "oci", "Occitan", "Occitan")
PUNJABI = ("pa", "pan", "pan", "Punjabi", "ਪੰਜਾਬੀ")
POLISH = ("pl", "pol", "pol", "Polish", "Polski")
PASHTO = ("ps", "pus", "pus", "Pashto", "پښتو")
PORTUGUESE = ("pt", "por", "por", "Portuguese", "Português")
ROMANIAN = ("ro", "ron", "rum", "Romanian", "Română")
RUSSIAN = ("ru", "rus", "rus", "Russian", "Русский")
SANSKRIT = ("sa", "san", "san", "Sanskrit", "संस्कृतम्")
SINDHI = ("sd", "snd", "snd", "Sindhi", "سنڌي")
SINHALA = ("si", "sin", "sin", "Sinhala", "සිංහල")
SLOVAK = ("sk", "slk", "slo", "Slovak", "Slovenčina")
SLOVENE = ("sl", "slv", "slv", "Slovene", "Slovenščina")
SHONA = ("sn", "sna", "sna", "Shona", "ChiShona")
SOMALI = ("so", "som", "som", "Somali", "Soomaaliga")
ALBANIAN = ("sq", "sqi", "alb", "Albanian", "Shqip")
SERBIAN = ("sr", "srp", "srp", "Serbian", "Српски")
SUNDANESE = ("su", "sun", "sun", "Sundanese", "Basa Sunda")
SWEDISH = ("sv", "swe", "swe", "Swedish", "Svenska")
SWAHILI = ("sw", "swa", "swa", "Swahili", "Kiswahili")
TAMIL = ("ta", "tam", "tam", "Tamil", "தமிழ்")
TELUGU = ("te", "tel", "tel", "Telugu", "తెలుగు")
TAJIK = ("tg", "tgk", "tgk", "Tajik", "Тоҷикӣ")
THAI = ("th", "tha", "tha", "Thai", "ไทย")
TURKMEN = ("tk", "tuk", "tuk", "Turkmen", "Türkmençe")
TAGALOG = ("tl", "tgl", "tgl", "Tagalog", "Tagalog")
TURKISH = ("tr", "tur", "tur", "Turkish", "Türkçe")
TATAR = ("tt", "tat", "tat", "Tatar", "Татарча")
UKRAINIAN = ("uk", "ukr", "ukr", "Ukrainian", "Українська")
URDU = ("ur", "urd", "urd", "Urdu", "اردو")
UZBEK = ("uz", "uzb", "uzb", "Uzbek", "Oʻzbek")
VIETNAMESE = ("vi", "vie", "vie", "Vietnamese", "Tiếng Việt")
YIDDISH = ("yi", "yid", "yid", "Yiddish", "ייִדיש")
YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá")
CHINESE = ("zh", "zho", "chi", "Chinese", "中文")
CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語")
NONE = (None, None, None, None, None) # For unknown languages or no language
def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native):
self.iso_639_1 = iso_639_1
self.iso_639_2_t = iso_639_2_t
self.iso_639_2_b = iso_639_2_b
self.name_en = name_en
self.name_native = name_native
@staticmethod
def from_iso_639_1(code):
for lang in LanguageCode:
if lang.iso_639_1 == code:
return lang
return LanguageCode.NONE
@staticmethod
def from_iso_639_2(code):
for lang in LanguageCode:
if lang.iso_639_2_t == code or lang.iso_639_2_b == code:
return lang
return LanguageCode.NONE
@staticmethod
def from_name(name : str):
"""Convert a language name (either English or native) to LanguageCode enum."""
for lang in LanguageCode:
if lang.name_en.lower() == name.lower() or lang.name_native.lower() == name.lower():
return lang
LanguageCode.NONE
@staticmethod
def from_string(value: str):
"""
Convert a string to a LanguageCode instance. Matches on ISO codes, English name, or native name.
"""
if value is None:
return LanguageCode.NONE
value = value.strip().lower()
for lang in LanguageCode:
if lang is LanguageCode.NONE:
continue
elif (
value == lang.iso_639_1
or value == lang.iso_639_2_t
or value == lang.iso_639_2_b
or value == lang.name_en.lower()
or value == lang.name_native.lower()
):
return lang
return LanguageCode.NONE
def to_iso_639_1(self):
return self.iso_639_1
def to_iso_639_2_t(self):
return self.iso_639_2_t
def to_iso_639_2_b(self):
return self.iso_639_2_b
def to_name(self, in_english=True):
return self.name_en if in_english else self.name_native
def __str__(self):
if self.name_en is None:
return "Unkown"
return self.name_en
def __bool__(self):
return True if self.iso_639_1 is not None else False
def __eq__(self, other):
"""
Compare the LanguageCode instance to another object.
Explicitly handle comparison to None.
"""
if other is None:
# If compared to None, return False
# print(other)
# print(self)
return self.iso_639_1 is None
if isinstance(other, LanguageCode):
# Normal comparison for LanguageCode instances
return self.iso_639_1 == other.iso_639_1
# Otherwise, defer to the default equality
return NotImplemented

605
subgen.py
View File

@@ -1,7 +1,7 @@
subgen_version = '2024.11.29'
subgen_version = '2024.11.31'
from language_code import LanguageCode
from datetime import datetime
import subprocess
import os
import json
import xml.etree.ElementTree as ET
@@ -11,11 +11,10 @@ import time
import queue
import logging
import gc
import io
import random
from typing import BinaryIO, Union, Any
from typing import Union, Any
from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request
from fastapi.responses import StreamingResponse, RedirectResponse, HTMLResponse
from fastapi.responses import StreamingResponse
import numpy as np
import stable_whisper
from stable_whisper import Segment
@@ -23,11 +22,12 @@ import requests
import av
import ffmpeg
import whisper
import re
import ast
from watchdog.observers.polling import PollingObserver as Observer
from watchdog.events import FileSystemEventHandler
import faster_whisper
import io
def get_key_by_value(d, value):
reverse_dict = {v: k for k, v in d.items()}
@@ -47,8 +47,7 @@ concurrent_transcriptions = int(os.getenv('CONCURRENT_TRANSCRIPTIONS', 2))
transcribe_device = os.getenv('TRANSCRIBE_DEVICE', 'cpu')
procaddedmedia = convert_to_bool(os.getenv('PROCADDEDMEDIA', True))
procmediaonplay = convert_to_bool(os.getenv('PROCMEDIAONPLAY', True))
namesublang = os.getenv('NAMESUBLANG', 'aa')
skipifinternalsublang = os.getenv('SKIPIFINTERNALSUBLANG', 'eng')
namesublang = os.getenv('NAMESUBLANG', '')
webhookport = int(os.getenv('WEBHOOKPORT', 9000))
word_level_highlight = convert_to_bool(os.getenv('WORD_LEVEL_HIGHLIGHT', False))
debug = convert_to_bool(os.getenv('DEBUG', True))
@@ -59,7 +58,6 @@ model_location = os.getenv('MODEL_PATH', './models')
monitor = convert_to_bool(os.getenv('MONITOR', False))
transcribe_folders = os.getenv('TRANSCRIBE_FOLDERS', '')
transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'transcribe')
force_detected_language_to = os.getenv('FORCE_DETECTED_LANGUAGE_TO', '').lower()
clear_vram_on_complete = convert_to_bool(os.getenv('CLEAR_VRAM_ON_COMPLETE', True))
compute_type = os.getenv('COMPUTE_TYPE', 'auto')
append = convert_to_bool(os.getenv('APPEND', False))
@@ -68,8 +66,15 @@ lrc_for_audio_files = convert_to_bool(os.getenv('LRC_FOR_AUDIO_FILES', True))
custom_regroup = os.getenv('CUSTOM_REGROUP', 'cm_sl=84_sl=42++++++1')
detect_language_length = os.getenv('DETECT_LANGUAGE_LENGTH', 30)
skipifexternalsub = convert_to_bool(os.getenv('SKIPIFEXTERNALSUB', False))
skip_lang_codes = os.getenv("SKIP_LANG_CODES", "")
skip_lang_codes_list = skip_lang_codes.split("|") if skip_lang_codes else []
skip_if_to_transcribe_sub_already_exist = convert_to_bool(os.getenv('SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST', True))
skipifinternalsublang = LanguageCode.from_iso_639_2(os.getenv('SKIPIFINTERNALSUBLANG', ''))
skip_lang_codes_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")]
force_detected_language_to = LanguageCode.from_iso_639_2(os.getenv('FORCE_DETECTED_LANGUAGE_TO', ''))
preferred_audio_language = LanguageCode.from_iso_639_2(os.getenv('PREFERRED_AUDIO_LANGUAGE', 'eng'))
skip_if_audio_track_is_in_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")]
# Maybe just have skip_if_audio_track_is_in_list and skip_lang_codes_list and remove skipifinternalsublang
# TODO option which iso code to write in the subtitle file1
subtitle_language_naming_type = os.getenv('SUBTITLE_LANGUAGE_NAMING_TYPE', 'ISO_639_2_B')
try:
kwargs = ast.literal_eval(os.getenv('SUBGEN_KWARGS', '{}') or '{}')
@@ -80,8 +85,6 @@ except ValueError:
if transcribe_device == "gpu":
transcribe_device = "cuda"
subextension = f".subgen.{whisper_model.split('.')[0]}.{namesublang}.srt"
subextensionSDH = f".subgen.{whisper_model.split('.')[0]}.{namesublang}.sdh.srt"
app = FastAPI()
model = None
@@ -402,11 +405,10 @@ async def detect_language(
#encode: bool = Query(default=True, description="Encode audio first through ffmpeg") # This is always false from Bazarr
detect_lang_length: int = Query(default=30, description="Detect language on the first X seconds of the file")
):
detected_language = "" # Initialize with an empty string
language_code = "" # Initialize with an empty string
detected_language = LanguageCode.NONE
if force_detected_language_to:
language = force_detected_language_to
logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}")
logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}\n Returning without detection")
return {"detected_language": force_detected_language_to.to_name(), "language_code": force_detected_language_to.to_iso_639_1()}
if int(detect_lang_length) != 30:
global detect_language_length
detect_language_length = detect_lang_length
@@ -426,9 +428,9 @@ async def detect_language(
args['audio'] = whisper.pad_or_trim(np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0, args['input_sr'] * int(detect_language_length))
args.update(kwargs)
detected_language = model.transcribe_stable(**args).language
detected_language = LanguageCode.from_iso_639_1(model.transcribe_stable(**args).language)
# reverse lookup of language -> code, ex: "english" -> "en", "nynorsk" -> "nn", ...
language_code = get_key_by_value(whisper_languages, detected_language)
language_code = get_key_by_value(detected_language.to_name(), detected_language.to_iso_639_1())
except Exception as e:
logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")
@@ -464,7 +466,7 @@ def write_lrc(result, file_path):
fraction = int((segment.start - int(segment.start)) * 100)
file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n")
def gen_subtitles(file_path: str, transcription_type: str, force_language=None) -> None:
def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode | None = None) -> None:
"""Generates subtitles for a video file.
Args:
@@ -476,16 +478,21 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language=None)
try:
logging.info(f"Added {os.path.basename(file_path)} for transcription.")
logging.info(f"Transcribing file: {os.path.basename(file_path)}")
logging.info(f"Transcribing file language: {force_language}")
start_time = time.time()
start_model()
if force_language:
logging.info(f"Forcing detected language to {force_language} from /batch endpoint")
elif force_detected_language_to:
force_language = force_detected_language_to
logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_language}")
# Check if the file is an audio file before trying to extract audio
file_name, file_extension = os.path.splitext(file_path)
is_audio_file = isAudioFileExtension(file_extension)
data = file_path
# Extract audio from the file if it has multiple audio tracks
exctracted_audio_file = handle_multiple_audio_tracks(file_path, force_language)
if exctracted_audio_file:
data = exctracted_audio_file.read()
args = {}
args['progress_callback'] = progress
@@ -493,16 +500,16 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language=None)
args['regroup'] = custom_regroup
args.update(kwargs)
result = model.transcribe_stable(file_path, language=force_language, task=transcription_type, **args)
result = model.transcribe_stable(data, language=force_language.to_iso_639_1(), task=transcription_type, **args)
appendLine(result)
file_name, file_extension = os.path.splitext(file_path)
if isAudioFileExtension(file_extension) and lrc_for_audio_files:
# If it is an audio file, write the LRC file
if is_audio_file and lrc_for_audio_files:
write_lrc(result, file_name + '.lrc')
else:
result.to_srt_vtt(file_name + subextension, word_level=word_level_highlight)
result.to_srt_vtt(name_subtitle(file_path, force_language), word_level=word_level_highlight)
elapsed_time = time.time() - start_time
minutes, seconds = divmod(int(elapsed_time), 60)
@@ -514,72 +521,348 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language=None)
finally:
delete_model()
def define_subtitle_language_naming(language: LanguageCode, type):
"""
Determines the naming format for a subtitle language based on the given type.
def gen_subtitles_queue(file_path: str, transcription_type: str, force_language=None) -> None:
Args:
language (LanguageCode): The language code object containing methods to get different formats of the language name.
type (str): The type of naming format desired, such as 'ISO_639_1', 'ISO_639_2_T', 'ISO_639_2_B', 'NAME', or 'NATIVE'.
Returns:
str: The language name in the specified format. If an invalid type is provided, it defaults to the language's name.
"""
if namesublang:
return namesublang
switch_dict = {
"ISO_639_1": language.to_iso_639_1,
"ISO_639_2_T": language.to_iso_639_2_t,
"ISO_639_2_B": language.to_iso_639_2_b,
"NAME": language.to_name,
"NATIVE": lambda : language.to_name(in_english=False)
}
return switch_dict.get(type, language.to_name)()
def name_subtitle(file_path: str, language: LanguageCode) -> str:
"""
Name the the subtitle file to be written, based on the source file and the language of the subtitle.
Args:
file_path: The path to the source file.
language: The language of the subtitle.
Returns:
The name of the subtitle file to be written.
"""
return f"{os.path.splitext(file_path)[0]}.subgen.{whisper_model.split('.')[0]}.{define_subtitle_language_naming(language, subtitle_language_naming_type)}.srt"
def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = None) -> io.BytesIO | None:
"""
Handles the possibility of a media file having multiple audio tracks.
If the media file has multiple audio tracks, it will extract the audio track of the selected language. Otherwise, it will extract the first audio track.
Parameters:
file_path (str): The path to the media file.
language (LanguageCode | None): The language of the audio track to search for. If None, it will extract the first audio track.
Returns:
io.BytesIO | None: The audio or None if no audio track was extracted.
"""
audio_bytes = None
audio_tracks = get_audio_tracks(file_path)
if len(audio_tracks) > 1:
logging.debug(f"Handling multiple audio tracks from {file_path} and planning to extract audio track of language {language}")
logging.debug(
"Audio tracks:\n"
+ "\n".join([f" - {track['index']}: {track['codec']} {track['language']} {('default' if track['default'] else '')}" for track in audio_tracks])
)
if language is not None:
audio_track = get_audio_track_by_language(audio_tracks, language)
if audio_track is None:
audio_track = audio_tracks[0]
audio_bytes = extract_audio_track_to_memory(file_path, audio_track["index"])
if audio_bytes is None:
logging.error(f"Failed to extract audio track {audio_track['index']} from {file_path}")
return None
return audio_bytes
def extract_audio_track_to_memory(input_video_path, track_index) -> io.BytesIO | None:
"""
Extract a specific audio track from a video file to memory using FFmpeg.
Args:
input_video_path (str): The path to the video file.
track_index (int): The index of the audio track to extract. If None, skip extraction.
Returns:
io.BytesIO | None: The audio data as a BytesIO object, or None if extraction failed.
"""
if track_index is None:
logging.warning(f"Skipping audio track extraction for {input_video_path} because track index is None")
return None
try:
# Use FFmpeg to extract the specific audio track and output to memory
out, _ = (
ffmpeg.input(input_video_path)
.output(
"pipe:", # Direct output to a pipe
map=f"0:{track_index}", # Select the specific audio track
format="wav", # Output format
ac=1, # Mono audio (optional)
ar=16000, # Sample rate 16 kHz (recommended for speech models)
loglevel="quiet"
)
.run(capture_stdout=True, capture_stderr=True) # Capture output in memory
)
# Return the audio data as a BytesIO object
return io.BytesIO(out)
except ffmpeg.Error as e:
print("An error occurred:", e.stderr.decode())
return None
def get_audio_track_by_language(audio_tracks, language):
"""
Returns the first audio track with the given language.
Args:
audio_tracks (list): A list of dictionaries containing information about each audio track.
language (str): The language of the audio track to search for.
Returns:
dict: The first audio track with the given language, or None if no match is found.
"""
for track in audio_tracks:
if track['language'] == language:
return track
return None
def choose_transcribe_language(file_path, forced_language):
"""
Determines the language to be used for transcription based on the provided
file path and language preferences.
Args:
file_path: The path to the file for which the audio tracks are analyzed.
forced_language: The language to force for transcription if specified.
Returns:
The language code to be used for transcription. It prioritizes the
`forced_language`, then the environment variable `force_detected_language_to`,
then the preferred audio language if available, and finally the default
language of the audio tracks. Returns None if no language preference is
determined.
"""
# todo handle iso 2/3
if forced_language:
return forced_language
if force_detected_language_to:
return force_detected_language_to
audio_tracks = get_audio_tracks(file_path)
if has_language_audio_track(audio_tracks, preferred_audio_language):
language = preferred_audio_language
if language:
return language
default_language = find_default_audio_track_language(audio_tracks)
if default_language:
return default_language
return None
def get_audio_tracks(video_file):
"""
Extracts information about the audio tracks in a file.
Returns:
List of dictionaries with information about each audio track.
Each dictionary has the following keys:
index (int): The stream index of the audio track.
codec (str): The name of the audio codec.
channels (int): The number of audio channels.
language (LanguageCode): The language of the audio track.
title (str): The title of the audio track.
default (bool): Whether the audio track is the default for the file.
forced (bool): Whether the audio track is forced.
original (bool): Whether the audio track is the original.
commentary (bool): Whether the audio track is a commentary.
Example:
>>> get_audio_tracks("french_movie_with_english_dub.mp4")
[
{
"index": 0,
"codec": "dts",
"channels": 6,
"language": LanguageCode.FRENCH,
"title": "French",
"default": True,
"forced": False,
"original": True,
"commentary": False
},
{
"index": 1,
"codec": "aac",
"channels": 2,
"language": LanguageCode.ENGLISH,
"title": "English",
"default": False,
"forced": False,
"original": False,
"commentary": False
}
]
Raises:
ffmpeg.Error: If FFmpeg fails to probe the file.
"""
try:
# Probe the file to get audio stream metadata
probe = ffmpeg.probe(video_file, select_streams='a')
audio_streams = probe.get('streams', [])
# Extract information for each audio track
audio_tracks = []
for stream in audio_streams:
audio_track = {
"index": int(stream.get("index", None)),
"codec": stream.get("codec_name", "Unknown"),
"channels": int(stream.get("channels", None)),
"language": LanguageCode.from_iso_639_2(stream.get("tags", {}).get("language", "Unknown")),
"title": stream.get("tags", {}).get("title", "None"),
"default": stream.get("disposition", {}).get("default", 0) == 1,
"forced": stream.get("disposition", {}).get("forced", 0) == 1,
"original": stream.get("disposition", {}).get("original", 0) == 1,
"commentary": "commentary" in stream.get("tags", {}).get("title", "").lower()
}
audio_tracks.append(audio_track)
return audio_tracks
except ffmpeg.Error as e:
logging.error(f"FFmpeg error: {e.stderr}")
return []
except Exception as e:
logging.error(f"An error occurred while reading audio track information: {str(e)}")
return []
def has_language_audio_track(audio_tracks, find_language):
"""
Checks if an audio track with the given language is present in the list of audio tracks.
Args:
audio_tracks (list): A list of dictionaries containing information about each audio track.
find_language (str): The ISO 639-2 code of the language to search for.
Returns:
bool: True if an audio track with the given language was found, False otherwise.
"""
for track in audio_tracks:
if track['language'] == find_language: #ISO 639-2
return True
return False
def find_default_audio_track_language(audio_tracks):
"""
Finds the language of the default audio track in the given list of audio tracks.
Args:
audio_tracks (list): A list of dictionaries containing information about each audio track.
Must contain the key "default" which is a boolean indicating if the track is the default track.
Returns:
str: The ISO 639-2 code of the language of the default audio track, or None if no default track was found.
"""
for track in audio_tracks:
if track['default'] is True:
return track['language']
return None
def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode | None = None) -> None:
global task_queue
if not has_audio(file_path):
logging.debug(f"{file_path} doesn't have any audio to transcribe!")
return
message = None
if has_subtitle_language(file_path, skipifinternalsublang):
message = f"{file_path} already has an internal subtitle we want, skipping generation"
elif os.path.exists(get_file_name_without_extension(file_path) + subextension):
message = f"{file_path} already has a Subgen subtitle created for this, skipping it"
elif skipifexternalsub and (os.path.exists(get_file_name_without_extension(file_path) + f".{namesublang}.srt") or os.path.exists(get_file_name_without_extension(file_path) + f".{namesublang}.ass")):
message = f"{file_path} already has an external {namesublang} (non-Subgen) subtitle created for this, skipping it"
elif os.path.exists(get_file_name_without_extension(file_path) + subextensionSDH):
message = f"{file_path} already has a Subgen SDH subtitle created for this, skipping it"
elif os.path.exists(get_file_name_without_extension(file_path) + '.lrc'):
message = f"{file_path} already has a LRC created for this, skipping it"
elif skip_lang_codes_list:
# Check if any language in the audio streams matches a skip language
should_skip, skip_language = should_skip_languages(get_audio_languages(file_path))
if should_skip:
message = f"Language '{skip_language}' detected in {file_path} and is in the skip list {skip_lang_codes_list}, skipping subtitle generation"
if message:
logging.debug(message)
force_language = choose_transcribe_language(file_path, force_language)
if have_to_skip(file_path, force_language):
return
task = {
'path': file_path,
'transcribe_or_translate': transcription_type,
'force_language':force_language
'force_language': force_language
}
task['force_language'] = force_language
task_queue.put(task)
logging.info(f"task_queue.put(task)({task['path']}, {task['transcribe_or_translate']}, {task['force_language']})")
def should_skip_languages(language_codes):
def have_to_skip(file_path, transcribe_language : LanguageCode):
"""
Check if any language in language_codes matches a code in skip_lang_codes_list.
:return: (True, language_code) if a match is found, otherwise (False, None)
"""
for code in language_codes:
if code in skip_lang_codes_list:
return True, code
return False, None
Determines whether subtitle generation should be skipped for a given file.
def get_audio_languages(video_path):
Args:
file_path: The path to the file to check for existing subtitles.
transcribe_language: The language intended for transcription.
Returns:
True if subtitle generation should be skipped based on existing subtitles
or specified conditions; otherwise, returns False.
This function helps optimize subtitle processing by preventing redundant
subtitle generation for files that already contain subtitles in the desired
language or in any language specified in the skip list.
"""
if skip_if_to_transcribe_sub_already_exist:
if has_subtitle_language(file_path, transcribe_language):
logging.debug(f"{file_path} already has the language {transcribe_language} as subtitle we would transcribe, skipping subtitle generation")
return True
if skipifinternalsublang:
if has_subtitle_language(file_path, skipifinternalsublang):
logging.debug(f"{file_path} already has an subtitle we want, skipping subtitle generation")
return True
if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)):
return True
if any(item in skip_lang_codes_list for item in get_subtitle_languages(file_path)):
logging.debug(f"Language a code from {skip_lang_codes_list} detected in subtitle of {file_path}, skipping subtitle generation")
return True
if any(item in skip_if_audio_track_is_in_list for item in get_audio_languages(file_path)):
# Maybe add a check if the audio track is the default/ orginal or forced language to not skip it if it is a dubbed track in case of movies with multiple audio tracks.
logging.debug(f"Language a code from {skip_if_audio_track_is_in_list} detected in audio track of {file_path}, skipping subtitle generation")
return True
return False
def get_subtitle_languages(video_path):
"""
Extract language codes from each audio stream in the video file using pyav.
:param video_path: Path to the video file
:return: List of language codes for each audio stream
:return: List of language codes for each subtitle stream
"""
languages = []
# Open the video file
with av.open(video_path) as container:
# Iterate through each audio stream
for stream in container.streams.audio:
for stream in container.streams.subtitles:
# Access the metadata for each audio stream
lang_code = stream.metadata.get('language')
if lang_code:
languages.append(lang_code)
languages.append(LanguageCode.from_iso_639_2(lang_code))
else:
# Append 'und' (undefined) if no language metadata is present
languages.append('und')
languages.append(LanguageCode.NONE)
return languages
@@ -587,20 +870,101 @@ def get_file_name_without_extension(file_path):
file_name, file_extension = os.path.splitext(file_path)
return file_name
def has_subtitle_language(video_file, target_language):
def get_audio_languages(video_path):
"""
Extract language codes from each audio stream in the video file.
:param video_path: Path to the video file
:return: List of language codes for each audio stream
"""
audio_tracks = get_audio_tracks(video_path)
return [track['language'] for track in audio_tracks]
def has_subtitle_language(video_file, target_language: LanguageCode):
"""
Determines if a subtitle file with the target language is available for a specified video file.
This function checks both within the video file and in its associated folder for subtitles
matching the specified language.
Args:
video_file: The path to the video file.
target_language: The language of the subtitle file to search for.
Returns:
bool: True if a subtitle file with the target language is found, False otherwise.
"""
logging.debug(f"has_subtitle_language({video_file}, {target_language})")
if target_language == LanguageCode.NONE:
return False
return has_subtitle_language_in_file(video_file, target_language) or has_subtitle_of_language_in_folder(video_file, target_language)
def has_subtitle_language_in_file(video_file, target_language: LanguageCode):
"""
Checks if a video file contains subtitles with a specific language.
Args:
video_file: The path to the video file.
target_language: The language of the subtitle file to search for.
Returns:
bool: True if a subtitle file with the target language is found, False otherwise.
"""
logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})")
if target_language == LanguageCode.NONE:
return False
try:
with av.open(video_file) as container:
subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and stream.metadata['language'] == target_language), None)
subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and LanguageCode.from_string(stream.metadata['language']) == target_language), None)
if subtitle_stream:
logging.debug(f"Subtitles in '{target_language}' language found in the video.")
return True
else:
logging.debug(f"No subtitles in '{target_language}' language found in the video.")
return False
except Exception as e:
logging.info(f"An error occurred: {e}")
return False
def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode, recursion = True):
"""Checks if the given folder has a subtitle file with the given language.
Args:
video_file: The path of the video file.
target_language: The language of the subtitle file that we are looking for.
recursion: If True, search in subfolders of the given folder. If False,
only search in the given folder.
Returns:
True if a subtitle file with the given language is found in the folder,
False otherwise.
"""
subtitle_extensions = ['.srt', '.vtt', '.sub', '.ass', '.ssa', '.idx', '.sbv', '.pgs', '.ttml', '.lrc']
# just get the name of the movie e.g. movie.2025.remastered
video_file_stripped = os.path.splitext(os.path.split(video_file)[1])[0]
folder_path = os.path.dirname(video_file)
for file_name in os.listdir(folder_path):
file_path = os.path.join(folder_path, file_name)
if os.path.isfile(file_path):
root, ext = os.path.splitext(file_name)
if root.startswith(video_file_stripped) and ext.lower() in subtitle_extensions:
parts = root[len(video_file_stripped):].lstrip(".").split(".")
# Check if the target language is one of the parts
if any(LanguageCode.from_string(part) == target_language for part in parts):
# If the language is found, return True
return True
elif os.path.isdir(file_path) and recursion:
# Looking in the subfolders of the video for subtitles
if has_subtitle_of_language_in_folder(os.path.join(file_path, os.path.split(video_file)[1]) , target_language, False):
# If the language is found in the subfolders, return True
return True
# If the language is not found, return False
return False
def get_plex_file_name(itemid: str, server_ip: str, plex_token: str) -> str:
"""Gets the full path to a file from the Plex server.
@@ -773,7 +1137,7 @@ if monitor:
def on_modified(self, event):
self.create_subtitle(event)
def transcribe_existing(transcribe_folders, forceLanguage=None):
def transcribe_existing(transcribe_folders, forceLanguage : LanguageCode | None = None):
transcribe_folders = transcribe_folders.split("|")
logging.info("Starting to search folders to see if we need to create subtitles.")
logging.debug("The folders are:")
@@ -797,107 +1161,6 @@ def transcribe_existing(transcribe_folders, forceLanguage=None):
observer.start()
logging.info("Finished searching and queueing files for transcription. Now watching for new files.")
whisper_languages = {
"en": "english",
"zh": "chinese",
"de": "german",
"es": "spanish",
"ru": "russian",
"ko": "korean",
"fr": "french",
"ja": "japanese",
"pt": "portuguese",
"tr": "turkish",
"pl": "polish",
"ca": "catalan",
"nl": "dutch",
"ar": "arabic",
"sv": "swedish",
"it": "italian",
"id": "indonesian",
"hi": "hindi",
"fi": "finnish",
"vi": "vietnamese",
"he": "hebrew",
"uk": "ukrainian",
"el": "greek",
"ms": "malay",
"cs": "czech",
"ro": "romanian",
"da": "danish",
"hu": "hungarian",
"ta": "tamil",
"no": "norwegian",
"th": "thai",
"ur": "urdu",
"hr": "croatian",
"bg": "bulgarian",
"lt": "lithuanian",
"la": "latin",
"mi": "maori",
"ml": "malayalam",
"cy": "welsh",
"sk": "slovak",
"te": "telugu",
"fa": "persian",
"lv": "latvian",
"bn": "bengali",
"sr": "serbian",
"az": "azerbaijani",
"sl": "slovenian",
"kn": "kannada",
"et": "estonian",
"mk": "macedonian",
"br": "breton",
"eu": "basque",
"is": "icelandic",
"hy": "armenian",
"ne": "nepali",
"mn": "mongolian",
"bs": "bosnian",
"kk": "kazakh",
"sq": "albanian",
"sw": "swahili",
"gl": "galician",
"mr": "marathi",
"pa": "punjabi",
"si": "sinhala",
"km": "khmer",
"sn": "shona",
"yo": "yoruba",
"so": "somali",
"af": "afrikaans",
"oc": "occitan",
"ka": "georgian",
"be": "belarusian",
"tg": "tajik",
"sd": "sindhi",
"gu": "gujarati",
"am": "amharic",
"yi": "yiddish",
"lo": "lao",
"uz": "uzbek",
"fo": "faroese",
"ht": "haitian creole",
"ps": "pashto",
"tk": "turkmen",
"nn": "nynorsk",
"mt": "maltese",
"sa": "sanskrit",
"lb": "luxembourgish",
"my": "myanmar",
"bo": "tibetan",
"tl": "tagalog",
"mg": "malagasy",
"as": "assamese",
"tt": "tatar",
"haw": "hawaiian",
"ln": "lingala",
"ha": "hausa",
"ba": "bashkir",
"jw": "javanese",
"su": "sundanese",
}
if __name__ == "__main__":
import uvicorn