from enum import Enum


class LanguageCode(Enum):
    """Language identifiers used throughout subgen.

    Each member's value is the tuple
    ``(ISO 639-1, ISO 639-2/T, ISO 639-2/B, English name, native name)``.
    ``NONE`` represents "unknown / no language"; it is falsy and compares
    equal to ``None``.
    """

    # ISO 639-1, ISO 639-2/T, ISO 639-2/B, English Name, Native Name
    AFRIKAANS = ("af", "afr", "afr", "Afrikaans", "Afrikaans")
    AMHARIC = ("am", "amh", "amh", "Amharic", "አማርኛ")
    ARABIC = ("ar", "ara", "ara", "Arabic", "العربية")
    ASSAMESE = ("as", "asm", "asm", "Assamese", "অসমীয়া")
    AZERBAIJANI = ("az", "aze", "aze", "Azerbaijani", "Azərbaycanca")
    BASHKIR = ("ba", "bak", "bak", "Bashkir", "Башҡортса")
    BELARUSIAN = ("be", "bel", "bel", "Belarusian", "Беларуская")
    BULGARIAN = ("bg", "bul", "bul", "Bulgarian", "Български")
    BENGALI = ("bn", "ben", "ben", "Bengali", "বাংলা")
    TIBETAN = ("bo", "bod", "tib", "Tibetan", "བོད་ཡིག")
    BRETON = ("br", "bre", "bre", "Breton", "Brezhoneg")
    BOSNIAN = ("bs", "bos", "bos", "Bosnian", "Bosanski")
    CATALAN = ("ca", "cat", "cat", "Catalan", "Català")
    CZECH = ("cs", "ces", "cze", "Czech", "Čeština")
    WELSH = ("cy", "cym", "wel", "Welsh", "Cymraeg")
    DANISH = ("da", "dan", "dan", "Danish", "Dansk")
    GERMAN = ("de", "deu", "ger", "German", "Deutsch")
    GREEK = ("el", "ell", "gre", "Greek", "Ελληνικά")
    ENGLISH = ("en", "eng", "eng", "English", "English")
    SPANISH = ("es", "spa", "spa", "Spanish", "Español")
    ESTONIAN = ("et", "est", "est", "Estonian", "Eesti")
    BASQUE = ("eu", "eus", "baq", "Basque", "Euskara")
    PERSIAN = ("fa", "fas", "per", "Persian", "فارسی")
    FINNISH = ("fi", "fin", "fin", "Finnish", "Suomi")
    FAROESE = ("fo", "fao", "fao", "Faroese", "Føroyskt")
    FRENCH = ("fr", "fra", "fre", "French", "Français")
    GALICIAN = ("gl", "glg", "glg", "Galician", "Galego")
    GUJARATI = ("gu", "guj", "guj", "Gujarati", "ગુજરાતી")
    HAUSA = ("ha", "hau", "hau", "Hausa", "Hausa")
    HAWAIIAN = ("haw", "haw", "haw", "Hawaiian", "ʻŌlelo Hawaiʻi")
    HEBREW = ("he", "heb", "heb", "Hebrew", "עברית")
    HINDI = ("hi", "hin", "hin", "Hindi", "हिन्दी")
    CROATIAN = ("hr", "hrv", "hrv", "Croatian", "Hrvatski")
    HAITIAN_CREOLE = ("ht", "hat", "hat", "Haitian Creole", "Kreyòl Ayisyen")
    HUNGARIAN = ("hu", "hun", "hun", "Hungarian", "Magyar")
    ARMENIAN = ("hy", "hye", "arm", "Armenian", "Հայերեն")
    INDONESIAN = ("id", "ind", "ind", "Indonesian", "Bahasa Indonesia")
    ICELANDIC = ("is", "isl", "ice", "Icelandic", "Íslenska")
    ITALIAN = ("it", "ita", "ita", "Italian", "Italiano")
    JAPANESE = ("ja", "jpn", "jpn", "Japanese", "日本語")
    JAVANESE = ("jw", "jav", "jav", "Javanese", "ꦧꦱꦗꦮ")
    GEORGIAN = ("ka", "kat", "geo", "Georgian", "ქართული")
    KAZAKH = ("kk", "kaz", "kaz", "Kazakh", "Қазақша")
    KHMER = ("km", "khm", "khm", "Khmer", "ភាសាខ្មែរ")
    KANNADA = ("kn", "kan", "kan", "Kannada", "ಕನ್ನಡ")
    KOREAN = ("ko", "kor", "kor", "Korean", "한국어")
    LATIN = ("la", "lat", "lat", "Latin", "Latina")
    LUXEMBOURGISH = ("lb", "ltz", "ltz", "Luxembourgish", "Lëtzebuergesch")
    LINGALA = ("ln", "lin", "lin", "Lingala", "Lingála")
    LAO = ("lo", "lao", "lao", "Lao", "ພາສາລາວ")
    LITHUANIAN = ("lt", "lit", "lit", "Lithuanian", "Lietuvių")
    LATVIAN = ("lv", "lav", "lav", "Latvian", "Latviešu")
    MALAGASY = ("mg", "mlg", "mlg", "Malagasy", "Malagasy")
    MAORI = ("mi", "mri", "mao", "Maori", "Te Reo Māori")
    MACEDONIAN = ("mk", "mkd", "mac", "Macedonian", "Македонски")
    MALAYALAM = ("ml", "mal", "mal", "Malayalam", "മലയാളം")
    MONGOLIAN = ("mn", "mon", "mon", "Mongolian", "Монгол")
    MARATHI = ("mr", "mar", "mar", "Marathi", "मराठी")
    MALAY = ("ms", "msa", "may", "Malay", "Bahasa Melayu")
    MALTESE = ("mt", "mlt", "mlt", "Maltese", "Malti")
    BURMESE = ("my", "mya", "bur", "Burmese", "မြန်မာစာ")
    NEPALI = ("ne", "nep", "nep", "Nepali", "नेपाली")
    DUTCH = ("nl", "nld", "dut", "Dutch", "Nederlands")
    NORWEGIAN_NYNORSK = ("nn", "nno", "nno", "Norwegian Nynorsk", "Nynorsk")
    NORWEGIAN = ("no", "nor", "nor", "Norwegian", "Norsk")
    OCCITAN = ("oc", "oci", "oci", "Occitan", "Occitan")
    PUNJABI = ("pa", "pan", "pan", "Punjabi", "ਪੰਜਾਬੀ")
    POLISH = ("pl", "pol", "pol", "Polish", "Polski")
    PASHTO = ("ps", "pus", "pus", "Pashto", "پښتو")
    PORTUGUESE = ("pt", "por", "por", "Portuguese", "Português")
    ROMANIAN = ("ro", "ron", "rum", "Romanian", "Română")
    RUSSIAN = ("ru", "rus", "rus", "Russian", "Русский")
    SANSKRIT = ("sa", "san", "san", "Sanskrit", "संस्कृतम्")
    SINDHI = ("sd", "snd", "snd", "Sindhi", "سنڌي")
    SINHALA = ("si", "sin", "sin", "Sinhala", "සිංහල")
    SLOVAK = ("sk", "slk", "slo", "Slovak", "Slovenčina")
    SLOVENE = ("sl", "slv", "slv", "Slovene", "Slovenščina")
    SHONA = ("sn", "sna", "sna", "Shona", "ChiShona")
    SOMALI = ("so", "som", "som", "Somali", "Soomaaliga")
    ALBANIAN = ("sq", "sqi", "alb", "Albanian", "Shqip")
    SERBIAN = ("sr", "srp", "srp", "Serbian", "Српски")
    SUNDANESE = ("su", "sun", "sun", "Sundanese", "Basa Sunda")
    SWEDISH = ("sv", "swe", "swe", "Swedish", "Svenska")
    SWAHILI = ("sw", "swa", "swa", "Swahili", "Kiswahili")
    TAMIL = ("ta", "tam", "tam", "Tamil", "தமிழ்")
    TELUGU = ("te", "tel", "tel", "Telugu", "తెలుగు")
    TAJIK = ("tg", "tgk", "tgk", "Tajik", "Тоҷикӣ")
    THAI = ("th", "tha", "tha", "Thai", "ไทย")
    TURKMEN = ("tk", "tuk", "tuk", "Turkmen", "Türkmençe")
    TAGALOG = ("tl", "tgl", "tgl", "Tagalog", "Tagalog")
    TURKISH = ("tr", "tur", "tur", "Turkish", "Türkçe")
    TATAR = ("tt", "tat", "tat", "Tatar", "Татарча")
    UKRAINIAN = ("uk", "ukr", "ukr", "Ukrainian", "Українська")
    URDU = ("ur", "urd", "urd", "Urdu", "اردو")
    UZBEK = ("uz", "uzb", "uzb", "Uzbek", "Oʻzbek")
    VIETNAMESE = ("vi", "vie", "vie", "Vietnamese", "Tiếng Việt")
    YIDDISH = ("yi", "yid", "yid", "Yiddish", "ייִדיש")
    YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá")
    CHINESE = ("zh", "zho", "chi", "Chinese", "中文")
    CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語")
    NONE = (None, None, None, None, None)  # For unknown languages or no language

    def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native):
        # Unpack the value tuple onto named attributes for readable access.
        self.iso_639_1 = iso_639_1
        self.iso_639_2_t = iso_639_2_t
        self.iso_639_2_b = iso_639_2_b
        self.name_en = name_en
        self.name_native = name_native

    @staticmethod
    def from_iso_639_1(code):
        """Look up a member by its ISO 639-1 (two-letter) code; NONE if unknown."""
        for lang in LanguageCode:
            if lang.iso_639_1 == code:
                return lang
        return LanguageCode.NONE

    @staticmethod
    def from_iso_639_2(code):
        """Look up a member by its ISO 639-2 code (T or B form); NONE if unknown."""
        for lang in LanguageCode:
            if lang.iso_639_2_t == code or lang.iso_639_2_b == code:
                return lang
        return LanguageCode.NONE

    @staticmethod
    def from_name(name: str):
        """Convert a language name (either English or native) to a LanguageCode member.

        Returns LanguageCode.NONE when the name is empty or unknown.
        """
        if name:
            for lang in LanguageCode:
                if lang is LanguageCode.NONE:
                    # NONE has no names; calling .lower() on them would raise.
                    continue
                if lang.name_en.lower() == name.lower() or lang.name_native.lower() == name.lower():
                    return lang
        # BUG FIX: the original fell through without `return`, so unknown names
        # yielded None (or raised AttributeError on the NONE member) instead of
        # the documented LanguageCode.NONE.
        return LanguageCode.NONE

    @staticmethod
    def from_string(value: str):
        """
        Convert a string to a LanguageCode instance. Matches on ISO codes, English name, or native name.
        """
        if value is None:
            return LanguageCode.NONE
        value = value.strip().lower()
        for lang in LanguageCode:
            if lang is LanguageCode.NONE:
                continue
            elif (
                value == lang.iso_639_1
                or value == lang.iso_639_2_t
                or value == lang.iso_639_2_b
                or value == lang.name_en.lower()
                or value == lang.name_native.lower()
            ):
                return lang
        return LanguageCode.NONE

    def to_iso_639_1(self):
        """Return the ISO 639-1 code (None for NONE)."""
        return self.iso_639_1

    def to_iso_639_2_t(self):
        """Return the ISO 639-2/T code (None for NONE)."""
        return self.iso_639_2_t

    def to_iso_639_2_b(self):
        """Return the ISO 639-2/B code (None for NONE)."""
        return self.iso_639_2_b

    def to_name(self, in_english=True):
        """Return the English name, or the native name when in_english=False."""
        return self.name_en if in_english else self.name_native

    def __str__(self):
        if self.name_en is None:
            return "Unknown"  # typo fix: was "Unkown"
        return self.name_en

    def __bool__(self):
        # NONE is falsy; every real language is truthy.
        return self.iso_639_1 is not None

    def __eq__(self, other):
        """
        Compare the LanguageCode instance to another object.
        Explicitly handle comparison to None: NONE == None is True.
        """
        if other is None:
            return self.iso_639_1 is None
        if isinstance(other, LanguageCode):
            return self.iso_639_1 == other.iso_639_1
        # Otherwise, defer to the default equality
        return NotImplemented

    def __hash__(self):
        # BUG FIX: defining __eq__ without __hash__ made members unhashable
        # (Python sets __hash__ to None), breaking set/dict usage. Hash by the
        # same key __eq__ compares so equal members hash equally.
        return hash(self.iso_639_1)
# --- Language / skip configuration, parsed once at startup ---
skip_if_to_transcribe_sub_already_exist = convert_to_bool(os.getenv('SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST', True))
skipifinternalsublang = LanguageCode.from_iso_639_2(os.getenv('SKIPIFINTERNALSUBLANG', ''))
# BUG FIX: "".split("|") yields [''], which maps to LanguageCode.NONE and then
# matches every untagged subtitle/audio stream, skipping everything.  Filter
# out empty entries (mirrors the guard the old skip_lang_codes code had).
skip_lang_codes_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|") if code]
force_detected_language_to = LanguageCode.from_iso_639_2(os.getenv('FORCE_DETECTED_LANGUAGE_TO', ''))
preferred_audio_language = LanguageCode.from_iso_639_2(os.getenv('PREFERRED_AUDIO_LANGUAGE', 'eng'))
skip_if_audio_track_is_in_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|") if code]
# Maybe just have skip_if_audio_track_is_in_list and skip_lang_codes_list and remove skipifinternalsublang
# TODO option which iso code to write in the subtitle file
subtitle_language_naming_type = os.getenv('SUBTITLE_LANGUAGE_NAMING_TYPE', 'ISO_639_2_B')
model.transcribe_stable(file_path, language=force_language, task=transcription_type, **args) + + result = model.transcribe_stable(data, language=force_language.to_iso_639_1(), task=transcription_type, **args) appendLine(result) - file_name, file_extension = os.path.splitext(file_path) - if isAudioFileExtension(file_extension) and lrc_for_audio_files: + # If it is an audio file, write the LRC file + if is_audio_file and lrc_for_audio_files: write_lrc(result, file_name + '.lrc') else: - result.to_srt_vtt(file_name + subextension, word_level=word_level_highlight) + result.to_srt_vtt(name_subtitle(file_path, force_language), word_level=word_level_highlight) elapsed_time = time.time() - start_time minutes, seconds = divmod(int(elapsed_time), 60) @@ -514,72 +521,348 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language=None) finally: delete_model() + +def define_subtitle_language_naming(language: LanguageCode, type): + """ + Determines the naming format for a subtitle language based on the given type. -def gen_subtitles_queue(file_path: str, transcription_type: str, force_language=None) -> None: + Args: + language (LanguageCode): The language code object containing methods to get different formats of the language name. + type (str): The type of naming format desired, such as 'ISO_639_1', 'ISO_639_2_T', 'ISO_639_2_B', 'NAME', or 'NATIVE'. + + Returns: + str: The language name in the specified format. If an invalid type is provided, it defaults to the language's name. 
+ """ + if namesublang: + return namesublang + switch_dict = { + "ISO_639_1": language.to_iso_639_1, + "ISO_639_2_T": language.to_iso_639_2_t, + "ISO_639_2_B": language.to_iso_639_2_b, + "NAME": language.to_name, + "NATIVE": lambda : language.to_name(in_english=False) + } + return switch_dict.get(type, language.to_name)() + +def name_subtitle(file_path: str, language: LanguageCode) -> str: + """ + Name the the subtitle file to be written, based on the source file and the language of the subtitle. + + Args: + file_path: The path to the source file. + language: The language of the subtitle. + + Returns: + The name of the subtitle file to be written. + """ + return f"{os.path.splitext(file_path)[0]}.subgen.{whisper_model.split('.')[0]}.{define_subtitle_language_naming(language, subtitle_language_naming_type)}.srt" + +def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = None) -> io.BytesIO | None: + """ + Handles the possibility of a media file having multiple audio tracks. + + If the media file has multiple audio tracks, it will extract the audio track of the selected language. Otherwise, it will extract the first audio track. + + Parameters: + file_path (str): The path to the media file. + language (LanguageCode | None): The language of the audio track to search for. If None, it will extract the first audio track. + + Returns: + io.BytesIO | None: The audio or None if no audio track was extracted. 
+ """ + audio_bytes = None + audio_tracks = get_audio_tracks(file_path) + + if len(audio_tracks) > 1: + logging.debug(f"Handling multiple audio tracks from {file_path} and planning to extract audio track of language {language}") + logging.debug( + "Audio tracks:\n" + + "\n".join([f" - {track['index']}: {track['codec']} {track['language']} {('default' if track['default'] else '')}" for track in audio_tracks]) + ) + + if language is not None: + audio_track = get_audio_track_by_language(audio_tracks, language) + if audio_track is None: + audio_track = audio_tracks[0] + + audio_bytes = extract_audio_track_to_memory(file_path, audio_track["index"]) + if audio_bytes is None: + logging.error(f"Failed to extract audio track {audio_track['index']} from {file_path}") + return None + return audio_bytes + +def extract_audio_track_to_memory(input_video_path, track_index) -> io.BytesIO | None: + """ + Extract a specific audio track from a video file to memory using FFmpeg. + + Args: + input_video_path (str): The path to the video file. + track_index (int): The index of the audio track to extract. If None, skip extraction. + + Returns: + io.BytesIO | None: The audio data as a BytesIO object, or None if extraction failed. 
+ """ + if track_index is None: + logging.warning(f"Skipping audio track extraction for {input_video_path} because track index is None") + return None + + try: + # Use FFmpeg to extract the specific audio track and output to memory + out, _ = ( + ffmpeg.input(input_video_path) + .output( + "pipe:", # Direct output to a pipe + map=f"0:{track_index}", # Select the specific audio track + format="wav", # Output format + ac=1, # Mono audio (optional) + ar=16000, # Sample rate 16 kHz (recommended for speech models) + loglevel="quiet" + ) + .run(capture_stdout=True, capture_stderr=True) # Capture output in memory + ) + # Return the audio data as a BytesIO object + return io.BytesIO(out) + + except ffmpeg.Error as e: + print("An error occurred:", e.stderr.decode()) + return None + +def get_audio_track_by_language(audio_tracks, language): + """ + Returns the first audio track with the given language. + + Args: + audio_tracks (list): A list of dictionaries containing information about each audio track. + language (str): The language of the audio track to search for. + + Returns: + dict: The first audio track with the given language, or None if no match is found. + """ + for track in audio_tracks: + if track['language'] == language: + return track + return None + +def choose_transcribe_language(file_path, forced_language): + """ + Determines the language to be used for transcription based on the provided + file path and language preferences. + + Args: + file_path: The path to the file for which the audio tracks are analyzed. + forced_language: The language to force for transcription if specified. + + Returns: + The language code to be used for transcription. It prioritizes the + `forced_language`, then the environment variable `force_detected_language_to`, + then the preferred audio language if available, and finally the default + language of the audio tracks. Returns None if no language preference is + determined. 
+ """ + + # todo handle iso 2/3 + if forced_language: + return forced_language + + if force_detected_language_to: + return force_detected_language_to + + audio_tracks = get_audio_tracks(file_path) + if has_language_audio_track(audio_tracks, preferred_audio_language): + language = preferred_audio_language + if language: + return language + default_language = find_default_audio_track_language(audio_tracks) + if default_language: + return default_language + + return None + + +def get_audio_tracks(video_file): + """ + Extracts information about the audio tracks in a file. + + Returns: + List of dictionaries with information about each audio track. + Each dictionary has the following keys: + index (int): The stream index of the audio track. + codec (str): The name of the audio codec. + channels (int): The number of audio channels. + language (LanguageCode): The language of the audio track. + title (str): The title of the audio track. + default (bool): Whether the audio track is the default for the file. + forced (bool): Whether the audio track is forced. + original (bool): Whether the audio track is the original. + commentary (bool): Whether the audio track is a commentary. + + Example: + >>> get_audio_tracks("french_movie_with_english_dub.mp4") + [ + { + "index": 0, + "codec": "dts", + "channels": 6, + "language": LanguageCode.FRENCH, + "title": "French", + "default": True, + "forced": False, + "original": True, + "commentary": False + }, + { + "index": 1, + "codec": "aac", + "channels": 2, + "language": LanguageCode.ENGLISH, + "title": "English", + "default": False, + "forced": False, + "original": False, + "commentary": False + } + ] + + Raises: + ffmpeg.Error: If FFmpeg fails to probe the file. 
+ """ + try: + # Probe the file to get audio stream metadata + probe = ffmpeg.probe(video_file, select_streams='a') + audio_streams = probe.get('streams', []) + + # Extract information for each audio track + audio_tracks = [] + for stream in audio_streams: + audio_track = { + "index": int(stream.get("index", None)), + "codec": stream.get("codec_name", "Unknown"), + "channels": int(stream.get("channels", None)), + "language": LanguageCode.from_iso_639_2(stream.get("tags", {}).get("language", "Unknown")), + "title": stream.get("tags", {}).get("title", "None"), + "default": stream.get("disposition", {}).get("default", 0) == 1, + "forced": stream.get("disposition", {}).get("forced", 0) == 1, + "original": stream.get("disposition", {}).get("original", 0) == 1, + "commentary": "commentary" in stream.get("tags", {}).get("title", "").lower() + } + audio_tracks.append(audio_track) + return audio_tracks + + except ffmpeg.Error as e: + logging.error(f"FFmpeg error: {e.stderr}") + return [] + except Exception as e: + logging.error(f"An error occurred while reading audio track information: {str(e)}") + return [] + +def has_language_audio_track(audio_tracks, find_language): + """ + Checks if an audio track with the given language is present in the list of audio tracks. + + Args: + audio_tracks (list): A list of dictionaries containing information about each audio track. + find_language (str): The ISO 639-2 code of the language to search for. + + Returns: + bool: True if an audio track with the given language was found, False otherwise. + """ + for track in audio_tracks: + if track['language'] == find_language: #ISO 639-2 + return True + return False + +def find_default_audio_track_language(audio_tracks): + """ + Finds the language of the default audio track in the given list of audio tracks. + + Args: + audio_tracks (list): A list of dictionaries containing information about each audio track. 
+ Must contain the key "default" which is a boolean indicating if the track is the default track. + + Returns: + str: The ISO 639-2 code of the language of the default audio track, or None if no default track was found. + """ + for track in audio_tracks: + if track['default'] is True: + return track['language'] + return None + + +def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode | None = None) -> None: global task_queue if not has_audio(file_path): logging.debug(f"{file_path} doesn't have any audio to transcribe!") return - message = None - - if has_subtitle_language(file_path, skipifinternalsublang): - message = f"{file_path} already has an internal subtitle we want, skipping generation" - elif os.path.exists(get_file_name_without_extension(file_path) + subextension): - message = f"{file_path} already has a Subgen subtitle created for this, skipping it" - elif skipifexternalsub and (os.path.exists(get_file_name_without_extension(file_path) + f".{namesublang}.srt") or os.path.exists(get_file_name_without_extension(file_path) + f".{namesublang}.ass")): - message = f"{file_path} already has an external {namesublang} (non-Subgen) subtitle created for this, skipping it" - elif os.path.exists(get_file_name_without_extension(file_path) + subextensionSDH): - message = f"{file_path} already has a Subgen SDH subtitle created for this, skipping it" - elif os.path.exists(get_file_name_without_extension(file_path) + '.lrc'): - message = f"{file_path} already has a LRC created for this, skipping it" - elif skip_lang_codes_list: - # Check if any language in the audio streams matches a skip language - should_skip, skip_language = should_skip_languages(get_audio_languages(file_path)) - if should_skip: - message = f"Language '{skip_language}' detected in {file_path} and is in the skip list {skip_lang_codes_list}, skipping subtitle generation" - - if message: - logging.debug(message) + force_language = choose_transcribe_language(file_path, 
force_language) + + if have_to_skip(file_path, force_language): return task = { 'path': file_path, 'transcribe_or_translate': transcription_type, - 'force_language':force_language + 'force_language': force_language } + task['force_language'] = force_language task_queue.put(task) + logging.info(f"task_queue.put(task)({task['path']}, {task['transcribe_or_translate']}, {task['force_language']})") -def should_skip_languages(language_codes): +def have_to_skip(file_path, transcribe_language : LanguageCode): """ - Check if any language in language_codes matches a code in skip_lang_codes_list. - :return: (True, language_code) if a match is found, otherwise (False, None) - """ - for code in language_codes: - if code in skip_lang_codes_list: - return True, code - return False, None + Determines whether subtitle generation should be skipped for a given file. -def get_audio_languages(video_path): + Args: + file_path: The path to the file to check for existing subtitles. + transcribe_language: The language intended for transcription. + + Returns: + True if subtitle generation should be skipped based on existing subtitles + or specified conditions; otherwise, returns False. + + This function helps optimize subtitle processing by preventing redundant + subtitle generation for files that already contain subtitles in the desired + language or in any language specified in the skip list. 
+ """ + if skip_if_to_transcribe_sub_already_exist: + if has_subtitle_language(file_path, transcribe_language): + logging.debug(f"{file_path} already has the language {transcribe_language} as subtitle we would transcribe, skipping subtitle generation") + return True + if skipifinternalsublang: + if has_subtitle_language(file_path, skipifinternalsublang): + logging.debug(f"{file_path} already has an subtitle we want, skipping subtitle generation") + return True + if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)): + return True + if any(item in skip_lang_codes_list for item in get_subtitle_languages(file_path)): + logging.debug(f"Language a code from {skip_lang_codes_list} detected in subtitle of {file_path}, skipping subtitle generation") + return True + if any(item in skip_if_audio_track_is_in_list for item in get_audio_languages(file_path)): + # Maybe add a check if the audio track is the default/ orginal or forced language to not skip it if it is a dubbed track in case of movies with multiple audio tracks. + logging.debug(f"Language a code from {skip_if_audio_track_is_in_list} detected in audio track of {file_path}, skipping subtitle generation") + return True + return False + +def get_subtitle_languages(video_path): """ Extract language codes from each audio stream in the video file using pyav. 
:param video_path: Path to the video file - :return: List of language codes for each audio stream + :return: List of language codes for each subtitle stream """ languages = [] # Open the video file with av.open(video_path) as container: # Iterate through each audio stream - for stream in container.streams.audio: + for stream in container.streams.subtitles: # Access the metadata for each audio stream lang_code = stream.metadata.get('language') if lang_code: - languages.append(lang_code) + languages.append(LanguageCode.from_iso_639_2(lang_code)) else: # Append 'und' (undefined) if no language metadata is present - languages.append('und') + languages.append(LanguageCode.NONE) return languages @@ -587,20 +870,101 @@ def get_file_name_without_extension(file_path): file_name, file_extension = os.path.splitext(file_path) return file_name -def has_subtitle_language(video_file, target_language): +def get_audio_languages(video_path): + """ + Extract language codes from each audio stream in the video file. + + :param video_path: Path to the video file + :return: List of language codes for each audio stream + """ + audio_tracks = get_audio_tracks(video_path) + return [track['language'] for track in audio_tracks] + +def has_subtitle_language(video_file, target_language: LanguageCode): + """ + Determines if a subtitle file with the target language is available for a specified video file. + + This function checks both within the video file and in its associated folder for subtitles + matching the specified language. + + Args: + video_file: The path to the video file. + target_language: The language of the subtitle file to search for. + + Returns: + bool: True if a subtitle file with the target language is found, False otherwise. 
def has_subtitle_language_in_file(video_file, target_language: LanguageCode):
    """
    Checks if a video container holds an embedded subtitle stream with a specific language.

    Args:
        video_file: The path to the video file.
        target_language: The language of the subtitle stream to search for.

    Returns:
        bool: True if a matching embedded subtitle stream is found, False otherwise
              (including when the container cannot be opened).
    """
    logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})")
    if target_language == LanguageCode.NONE:
        return False
    try:
        with av.open(video_file) as container:
            # First subtitle stream whose 'language' metadata normalizes to the target.
            subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and LanguageCode.from_string(stream.metadata['language']) == target_language), None)
            if subtitle_stream:
                logging.debug(f"Subtitles in '{target_language}' language found in the video.")
                return True
            else:
                logging.debug(f"No subtitles in '{target_language}' language found in the video.")
                return False
    except Exception as e:
        # Best-effort probe: unreadable/corrupt containers count as "no subtitle".
        logging.info(f"An error occurred: {e}")
        return False

def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode, recursion = True):
    """Checks if the given folder has an external subtitle file with the given language.

    Args:
        video_file: The path of the video file.
        target_language: The language of the subtitle file that we are looking for.
        recursion: If True, search one level into subfolders of the given folder.
            If False, only search in the given folder.

    Returns:
        True if a subtitle file with the given language is found in the folder,
        False otherwise.
    """
    subtitle_extensions = ['.srt', '.vtt', '.sub', '.ass', '.ssa', '.idx', '.sbv', '.pgs', '.ttml', '.lrc']
    # Just the stem of the movie file, e.g. "movie.2025.remastered"
    video_file_stripped = os.path.splitext(os.path.split(video_file)[1])[0]
    folder_path = os.path.dirname(video_file)
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if os.path.isfile(file_path):
            root, ext = os.path.splitext(file_name)
            if root.startswith(video_file_stripped) and ext.lower() in subtitle_extensions:
                suffix = root[len(video_file_stripped):]
                # Bug fix: a bare prefix match wrongly treated "movie2.en.srt" as a
                # subtitle for "movie.mkv". Only accept the exact stem, optionally
                # followed by dot-separated tags ("movie.en", "movie.forced.en").
                if suffix and not suffix.startswith("."):
                    continue
                parts = suffix.lstrip(".").split(".")
                # Check if the target language is one of the dot-separated tags
                if any(LanguageCode.from_string(part) == target_language for part in parts):
                    return True
        elif os.path.isdir(file_path) and recursion:
            # Look in subfolders (e.g. a "Subs" directory) for subtitles belonging
            # to this video; recursion=False stops the search at one extra level.
            if has_subtitle_of_language_in_folder(os.path.join(file_path, os.path.split(video_file)[1]), target_language, False):
                return True
    # No matching external subtitle found
    return False
Now watching for new files.") -whisper_languages = { - "en": "english", - "zh": "chinese", - "de": "german", - "es": "spanish", - "ru": "russian", - "ko": "korean", - "fr": "french", - "ja": "japanese", - "pt": "portuguese", - "tr": "turkish", - "pl": "polish", - "ca": "catalan", - "nl": "dutch", - "ar": "arabic", - "sv": "swedish", - "it": "italian", - "id": "indonesian", - "hi": "hindi", - "fi": "finnish", - "vi": "vietnamese", - "he": "hebrew", - "uk": "ukrainian", - "el": "greek", - "ms": "malay", - "cs": "czech", - "ro": "romanian", - "da": "danish", - "hu": "hungarian", - "ta": "tamil", - "no": "norwegian", - "th": "thai", - "ur": "urdu", - "hr": "croatian", - "bg": "bulgarian", - "lt": "lithuanian", - "la": "latin", - "mi": "maori", - "ml": "malayalam", - "cy": "welsh", - "sk": "slovak", - "te": "telugu", - "fa": "persian", - "lv": "latvian", - "bn": "bengali", - "sr": "serbian", - "az": "azerbaijani", - "sl": "slovenian", - "kn": "kannada", - "et": "estonian", - "mk": "macedonian", - "br": "breton", - "eu": "basque", - "is": "icelandic", - "hy": "armenian", - "ne": "nepali", - "mn": "mongolian", - "bs": "bosnian", - "kk": "kazakh", - "sq": "albanian", - "sw": "swahili", - "gl": "galician", - "mr": "marathi", - "pa": "punjabi", - "si": "sinhala", - "km": "khmer", - "sn": "shona", - "yo": "yoruba", - "so": "somali", - "af": "afrikaans", - "oc": "occitan", - "ka": "georgian", - "be": "belarusian", - "tg": "tajik", - "sd": "sindhi", - "gu": "gujarati", - "am": "amharic", - "yi": "yiddish", - "lo": "lao", - "uz": "uzbek", - "fo": "faroese", - "ht": "haitian creole", - "ps": "pashto", - "tk": "turkmen", - "nn": "nynorsk", - "mt": "maltese", - "sa": "sanskrit", - "lb": "luxembourgish", - "my": "myanmar", - "bo": "tibetan", - "tl": "tagalog", - "mg": "malagasy", - "as": "assamese", - "tt": "tatar", - "haw": "hawaiian", - "ln": "lingala", - "ha": "hausa", - "ba": "bashkir", - "jw": "javanese", - "su": "sundanese", -} if __name__ == "__main__": import uvicorn