diff --git a/language_code.py b/language_code.py index 7c25e8d..f5876fa 100644 --- a/language_code.py +++ b/language_code.py @@ -102,7 +102,8 @@ class LanguageCode(Enum): YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá") CHINESE = ("zh", "zho", "chi", "Chinese", "中文") CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語") - NONE = (None, None, None, None, None) # For unknown languages or no language + NONE = (None, None, None, None, None) # For no language + # und for Undetermined aka unknown language https://www.loc.gov/standards/iso639-2/faq.html#25 def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native): self.iso_639_1 = iso_639_1 @@ -155,6 +156,11 @@ class LanguageCode(Enum): return lang return LanguageCode.NONE + # is valid language + @staticmethod + def is_valid_language(language: str): + return LanguageCode.from_string(language) is not LanguageCode.NONE + def to_iso_639_1(self): return self.iso_639_1 @@ -180,10 +186,10 @@ class LanguageCode(Enum): Explicitly handle comparison to None. """ if other is None: - # If compared to None, return False - # print(other) - # print(self) + # If compared to None, return False unless self is None return self.iso_639_1 is None + if isinstance(other, str): # Allow comparison with a string + return self.value == LanguageCode.from_string(other) if isinstance(other, LanguageCode): # Normal comparison for LanguageCode instances return self.iso_639_1 == other.iso_639_1 diff --git a/subgen.py b/subgen.py index 2da1a27..bb206f2 100644 --- a/subgen.py +++ b/subgen.py @@ -65,16 +65,32 @@ reload_script_on_change = convert_to_bool(os.getenv('RELOAD_SCRIPT_ON_CHANGE', F lrc_for_audio_files = convert_to_bool(os.getenv('LRC_FOR_AUDIO_FILES', True)) custom_regroup = os.getenv('CUSTOM_REGROUP', 'cm_sl=84_sl=42++++++1') detect_language_length = os.getenv('DETECT_LANGUAGE_LENGTH', 30) +detect_language_start_offset = os.getenv('DETECT_LANGUAGE_START_OFFSET', int(0)) skipifexternalsub = convert_to_bool(os.getenv('SKIPIFEXTERNALSUB', False)) skip_if_to_transcribe_sub_already_exist = convert_to_bool(os.getenv('SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST', True)) skipifinternalsublang = LanguageCode.from_iso_639_2(os.getenv('SKIPIFINTERNALSUBLANG', '')) -skip_lang_codes_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")] +skip_lang_codes_list = ( + [LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")] + if os.getenv('SKIP_LANG_CODES') + else [] +) force_detected_language_to = LanguageCode.from_iso_639_2(os.getenv('FORCE_DETECTED_LANGUAGE_TO', '')) -preferred_audio_language = LanguageCode.from_iso_639_2(os.getenv('PREFERRED_AUDIO_LANGUAGE', 'eng')) -skip_if_audio_track_is_in_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")] -# Maybe just have skip_if_audio_track_is_in_list and skip_lang_codes_list and remove skipifinternalsublang -# TODO option which iso code to write in the subtitle file1 +preferred_audio_languages = ( + [LanguageCode.from_iso_639_2(code) for code in os.getenv('PREFERRED_AUDIO_LANGUAGES', 'eng').split("|")] + if os.getenv('PREFERRED_AUDIO_LANGUAGES') + else [] +) # in order of preferrence +limit_to_preferred_audio_languages = convert_to_bool(os.getenv('LIMIT_TO_PREFERRED_AUDIO_LANGUAGE', False)) #TODO: add support for this +skip_if_audio_track_is_in_list = ( + [LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")] + if os.getenv('SKIP_IF_AUDIO_TRACK_IS') + else [] +) subtitle_language_naming_type = os.getenv('SUBTITLE_LANGUAGE_NAMING_TYPE', 'ISO_639_2_B') +only_skip_if_subgen_subtitle = convert_to_bool(os.getenv('ONLY_SKIP_IF_SUBGEN_SUBTITLE', False)) +skip_unknown_language = convert_to_bool(os.getenv('SKIP_UNKNOWN_LANGUAGE', False)) +skip_if_language_is_not_set_but_subtitles_exist = convert_to_bool(os.getenv('SKIP_IF_LANGUAGE_IS_NOT_SET_BUT_SUBTITLES_EXIST', False)) +should_whiser_detect_audio_language = convert_to_bool(os.getenv('SHOULD_WHISPER_DETECT_AUDIO_LANGUAGE', False)) try: kwargs = ast.literal_eval(os.getenv('SUBGEN_KWARGS', '{}') or '{}') @@ -86,6 +102,19 @@ if transcribe_device == "gpu": transcribe_device = "cuda" +VIDEO_EXTENSIONS = ( + ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".mpg", ".mpeg", + ".3gp", ".ogv", ".vob", ".rm", ".rmvb", ".ts", ".m4v", ".f4v", ".svq3", + ".asf", ".m2ts", ".divx", ".xvid" +) + +AUDIO_EXTENSIONS = ( + ".mp3", ".wav", ".aac", ".flac", ".ogg", ".wma", ".alac", ".m4a", ".opus", + ".aiff", ".aif", ".pcm", ".ra", ".ram", ".mid", ".midi", ".ape", ".wv", + ".amr", ".vox", ".tak", ".spx", '.m4b' +) + + app = FastAPI() model = None @@ -99,8 +128,13 @@ task_queue = queue.Queue() def transcription_worker(): while True: task = task_queue.get() + + logger.info(f"Task {task['path']} is being handled by Subgen.") + if 'Bazarr-' in task['path']: logging.info(f"Task {task['path']} is being handled by ASR.") + if "type" in task and task["type"] == "detect_language": + detect_language_task(task['path']) else: gen_subtitles(task['path'], task['transcribe_or_translate'], task['force_language']) task_queue.task_done() @@ -189,15 +223,6 @@ def appendLine(result): # Append the new segment to the result's segments result.segments.append(newSegment) -def has_image_extension(file_path): - valid_extensions = ['.rgb', '.gif', '.pbm', '.pgm', '.ppm', '.tiff', '.rast', '.xbm', '.jpg', '.jpeg', '.bmp', '.png', '.webp', '.exr', '.bif'] # taken from the extensions detected by the imghdr module & added Emby's '.bif' files - - if os.path.exists(file_path): - file_extension = os.path.splitext(file_path)[1].lower() - return file_extension in valid_extensions - else: - return True # return a value that causes the file to be skipped. - @app.get("/plex") @app.get("/webhook") @app.get("/jellyfin") @@ -345,7 +370,7 @@ async def asr( random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6)) if force_detected_language_to: - language = force_detected_language_to + language = force_detected_language_to.from_iso_639_1() logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}") start_time = time.time() @@ -445,6 +470,62 @@ async def detect_language( return {"detected_language": detected_language.to_name(), "language_code": language_code} +def detect_language_task(path): + detected_language = LanguageCode.NONE + language_code = 'und' + global detect_language_length + + logger.info(f"Detecting language of file: {path} on the first {detect_language_length} seconds of the file") + + try: + start_model() + + audio_segment = extract_audio_segment_to_memory(path, detect_language_start_offset, int(detect_language_length)).read() + + + detected_language = LanguageCode.from_name(model.transcribe_stable(audio_segment).language) + logging.debug(f"Detected language: {detected_language.to_name()}") + # reverse lookup of language -> code, ex: "english" -> "en", "nynorsk" -> "nn", ... + language_code = detected_language.to_iso_639_1() + logging.debug(f"Language Code: {language_code}") + + except Exception as e: + logging.info(f"Error detectign language of file with whisper: {e}") + + finally: + task_queue.task_done() + delete_model() + # put task to transcribe this with the detected language + task_id = { 'path': path, "transcribe_or_translate": transcribe_or_translate, 'force_language': detected_language } + task_queue.put(task_id) + + #maybe modify the file to contain detected language so we won't trigger this again + + return + +def extract_audio_segment_to_memory(input_file, start_time, duration): + """ + Extract a segment of audio from input_file, starting at start_time for duration seconds. + + :param input_file: Path to the input audio file + :param start_time: Start time in seconds (e.g., 60 for 1 minute) + :param duration: Duration in seconds (e.g., 30 for 30 seconds) + :return: BytesIO object containing the audio segment + """ + try: + # Run FFmpeg to extract the desired segment + out, _ = ( + ffmpeg + .input(input_file, ss=start_time, t=duration) # Start time and duration + .output('pipe:1', format='wav', acodec='pcm_s16le', ar=16000) # Output to pipe as WAV + .run(capture_stdout=True, capture_stderr=True) + ) + return io.BytesIO(out) # Convert output to BytesIO for in-memory processing + except ffmpeg.Error as e: + print("Error occurred:", e.stderr.decode()) + return None + + def start_model(): global model if model is None: @@ -460,7 +541,7 @@ def delete_model(): def isAudioFileExtension(file_extension): return file_extension.casefold() in \ - [ '.mp3', '.flac', '.wav', '.alac', '.ape', '.ogg', '.wma', '.m4a', '.m4b', '.aac', '.aiff' ] + AUDIO_EXTENSIONS def write_lrc(result, file_path): with open(file_path, "w") as file: @@ -469,7 +550,7 @@ def write_lrc(result, file_path): fraction = int((segment.start - int(segment.start)) * 100) file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n") -def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode | None = None) -> None: +def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode = LanguageCode.NONE) -> None: """Generates subtitles for a video file. Args: @@ -512,6 +593,8 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language : Lang if is_audio_file and lrc_for_audio_files: write_lrc(result, file_name + '.lrc') else: + if not force_language: + force_language = LanguageCode.from_string(result.language) result.to_srt_vtt(name_subtitle(file_path, force_language), word_level=word_level_highlight) elapsed_time = time.time() - start_time @@ -520,7 +603,7 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language : Lang f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.") except Exception as e: - logging.info(f"Error processing or transcribing {file_path}: {e}") + logging.info(f"Error processing or transcribing {file_path} in {force_language}: {e}") finally: delete_model() @@ -663,23 +746,31 @@ def choose_transcribe_language(file_path, forced_language): determined. """ - # todo handle iso 2/3 + logger.debug(f"choose_transcribe_language({file_path}, {forced_language})") + if forced_language: + logger.debug(f"ENV FORCE_LANGUAGE is set: Forcing language to {forced_language}") return forced_language if force_detected_language_to: + logger.debug(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}") return force_detected_language_to audio_tracks = get_audio_tracks(file_path) - if has_language_audio_track(audio_tracks, preferred_audio_language): - language = preferred_audio_language + + found_track_in_language = find_language_audio_track(audio_tracks, preferred_audio_languages) + if found_track_in_language: + language = found_track_in_language if language: + logger.debug(f"Preferred language found: {language}") return language + default_language = find_default_audio_track_language(audio_tracks) if default_language: + logger.debug(f"Default language found: {default_language}") return default_language - return None + return LanguageCode.NONE def get_audio_tracks(video_file): @@ -758,22 +849,23 @@ def get_audio_tracks(video_file): logging.error(f"An error occurred while reading audio track information: {str(e)}") return [] -def has_language_audio_track(audio_tracks, find_language): +def find_language_audio_track(audio_tracks, find_languages): """ - Checks if an audio track with the given language is present in the list of audio tracks. + Checks if an audio track with any of the given languages is present in the list of audio tracks. + Returns the first language from `find_languages` that matches. Args: audio_tracks (list): A list of dictionaries containing information about each audio track. - find_language (str): The ISO 639-2 code of the language to search for. + find_languages (list): A list language codes to search for. Returns: - bool: True if an audio track with the given language was found, False otherwise. + str or None: The first language found from `find_languages`, or None if no match is found. """ - for track in audio_tracks: - if track['language'] == find_language: #ISO 639-2 - return True - return False - + for language in find_languages: + for track in audio_tracks: + if track['language'] == language: + return language + return None def find_default_audio_track_language(audio_tracks): """ Finds the language of the default audio track in the given list of audio tracks. @@ -791,7 +883,7 @@ def find_default_audio_track_language(audio_tracks): return None -def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode | None = None) -> None: +def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode = LanguageCode.NONE) -> None: global task_queue if not has_audio(file_path): @@ -800,7 +892,17 @@ def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: force_language = choose_transcribe_language(file_path, force_language) + # check if we would like to detect audio language in case of no audio language specified. Will return here again with specified language from whisper + if not force_language and should_whiser_detect_audio_language: + # make a detect language task + task_id = { 'path': file_path, 'type': "detect_language" } + task_queue.put(task_id) + logging.info(f"task_queue.put(task_id)({file_path}, detect_language)") + return + + if have_to_skip(file_path, force_language): + logging.debug(f"{file_path} already has subtitles in {force_language}, skipping.") return task = { @@ -823,6 +925,10 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool: Returns: True if subtitle generation should be skipped; otherwise, False. """ + if skip_unknown_language and transcribe_language == LanguageCode.NONE: + logging.debug(f"{file_path} has unknown language, skipping.") + return True + # Check if subtitles in the desired transcription language already exist if skip_if_to_transcribe_sub_already_exist and has_subtitle_language(file_path, transcribe_language): logging.debug(f"{file_path} already has subtitles in {transcribe_language}, skipping.") @@ -834,9 +940,11 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool: return True # Check if external subtitles exist for the specified language - if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)): - logging.debug(f"{file_path} has external subtitles in {namesublang}, skipping.") - return True + # Probably not use LanguageCode for this, but just check with strings, to be able to skip with custom named languages. + if LanguageCode.is_valid_language(namesublang): + if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)): + logging.debug(f"{file_path} has external subtitles in {namesublang}, skipping.") + return True # Skip if any language in the skip list is detected in existing subtitles existing_sub_langs = get_subtitle_languages(file_path) @@ -844,11 +952,17 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool: logging.debug(f"Languages in skip list {skip_lang_codes_list} detected in {file_path}, skipping.") return True - # Skip if any language in the audio track skip list is detected audio_langs = get_audio_languages(file_path) - if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs): - logging.debug(f"Audio language in skip list {skip_if_audio_track_is_in_list} detected in {file_path}, skipping.") - return True + if preferred_audio_languages in audio_langs: + logging.debug(f"Preferred audio language {preferred_audio_languages} detected in {file_path}.") + # maybe not skip if subtitle exist in preferred audio language, but not in another preferred audio language if the file has multiple audio tracks matching the preferred audio languages + else: + if limit_to_preferred_audio_languages: + logging.debug(f"Only non-preferred audio language detected in {file_path}, skipping.") + return True + if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs): + logging.debug(f"Audio language in skip list {skip_if_audio_track_is_in_list} detected in {file_path}, skipping.") + return True # If none of the conditions matched, do not skip return False @@ -903,9 +1017,6 @@ def has_subtitle_language(video_file, target_language: LanguageCode): Returns: bool: True if a subtitle file with the target language is found, False otherwise. """ - logging.debug(f"has_subtitle_language({video_file}, {target_language})") - if target_language == LanguageCode.NONE: - return False return has_subtitle_language_in_file(video_file, target_language) or has_subtitle_of_language_in_folder(video_file, target_language) def has_subtitle_language_in_file(video_file, target_language: LanguageCode): @@ -919,21 +1030,25 @@ def has_subtitle_language_in_file(video_file, target_language: LanguageCode): Returns: bool: True if a subtitle file with the target language is found, False otherwise. """ - logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})") - if target_language == LanguageCode.NONE: + # logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})") + if (target_language == LanguageCode.NONE and not skip_if_language_is_not_set_but_subtitles_exist) or only_skip_if_subgen_subtitle: # skip if language is not set or we are only interested in subgen subtitles which are not internal, only external return False try: with av.open(video_file) as container: - subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and LanguageCode.from_string(stream.metadata['language']) == target_language), None) + subtitle_streams = (stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata) - if subtitle_stream: + if skip_if_language_is_not_set_but_subtitles_exist and target_language == LanguageCode.NONE and any(subtitle_streams): + logging.debug("Language is not set but internal subtitles exist.") + return True + + if next(stream for stream in subtitle_streams if LanguageCode.from_string(stream.metadata['language']) == target_language): logging.debug(f"Subtitles in '{target_language}' language found in the video.") return True else: logging.debug(f"No subtitles in '{target_language}' language found in the video.") return False except Exception as e: - logging.info(f"An error occurred: {e}") + logging.error(f"An error occurred while checking the file with pyav: {e}") # TODO: figure out why this throws (empty) errors return False def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode, recursion = True): @@ -961,9 +1076,32 @@ def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode root, ext = os.path.splitext(file_name) if root.startswith(video_file_stripped) and ext.lower() in subtitle_extensions: parts = root[len(video_file_stripped):].lstrip(".").split(".") - # Check if the target language is one of the parts + + has_subgen = "subgen" in parts # Checks if "subgen" is in parts + + #checking this first because e.g LanguageCode.from_string("subgen") == LanguageCode.NONE is equal to True. Maybe handle this better with a check with a function like is language code. To check if part is a valid language before comparing it to target_language + + if target_language == LanguageCode.NONE: + if only_skip_if_subgen_subtitle: + if has_subgen: + logger.debug("Subtitles from subgen found in the folder. ") + return skip_if_language_is_not_set_but_subtitles_exist + else: + #might be other subtitles that have subgen in the name + continue + logger.debug("Subtitles exist in the folder. and only_skip_if_subgen_subtitle is False.") + return skip_if_language_is_not_set_but_subtitles_exist + if any(LanguageCode.from_string(part) == target_language for part in parts): - # If the language is found, return True + # If the subtitle is found, return True + if only_skip_if_subgen_subtitle: + if has_subgen: + logger.debug(f"Subtitles from subgen in '{target_language}' language found in the folder.") + return True + else: + #might be other subtitles that have subgen in the name + continue + logger.debug(f"Subtitles in '{target_language}' language found in the folder.") return True elif os.path.isdir(file_path) and recursion: # Looking in the subfolders of the video for subtitles @@ -1105,8 +1243,11 @@ def get_jellyfin_admin(users): def has_audio(file_path): try: - if has_image_extension(file_path): - logging.debug(f"{file_path} is an image or is an invalid file or path (are your volumes correct?), skipping processing") + if not is_valid_path(file_path): + return False + + if not (has_video_extension(file_path) or has_audio_extension(file_path)): + # logging.debug(f"{file_path} is an not a video or audio file, skipping processing. skipping processing") return False with av.open(file_path) as container: @@ -1124,6 +1265,28 @@ def has_audio(file_path): logging.debug(f"Error processing file {file_path}") return False +def is_valid_path(file_path): + # Check if the path is a file + if not os.path.isfile(file_path): + # If it's not a file, check if it's a directory + if not os.path.isdir(file_path): + logging.warning(f"{file_path} is neither a file nor a directory. Are your volumes correct?") + return False + else: + logging.debug(f"{file_path} is a directory, skipping processing as a file.") + return False + else: + return True + +def has_video_extension(file_name): + file_extension = os.path.splitext(file_name)[1].lower() # Get the file extension + return file_extension in VIDEO_EXTENSIONS + +def has_audio_extension(file_name): + file_extension = os.path.splitext(file_name)[1].lower() # Get the file extension + return file_extension in AUDIO_EXTENSIONS + + def path_mapping(fullpath): if use_path_mapping: logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))