Language code improvements (#147)

* improved language code handling

* expanded skipping behaviour

* remove unused code

* Added an option to detect language with whisper before choosing to skip it

---------

Co-authored-by: muisje <27768559+muisje@users.noreply.github.com>
This commit is contained in:
McCloudS
2024-12-03 10:37:07 -07:00
committed by GitHub
parent 66dc8e5faa
commit 84243bb34e
2 changed files with 224 additions and 55 deletions

View File

@@ -102,7 +102,8 @@ class LanguageCode(Enum):
YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá")
CHINESE = ("zh", "zho", "chi", "Chinese", "中文")
CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語")
NONE = (None, None, None, None, None) # For unknown languages or no language
NONE = (None, None, None, None, None) # For no language
# und for Undetermined aka unknown language https://www.loc.gov/standards/iso639-2/faq.html#25
def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native):
self.iso_639_1 = iso_639_1
@@ -155,6 +156,11 @@ class LanguageCode(Enum):
return lang
return LanguageCode.NONE
# is valid language
@staticmethod
def is_valid_language(language: str):
    """Report whether ``language`` resolves to an actual language.

    The string is resolved with ``LanguageCode.from_string``; a result of
    ``LanguageCode.NONE`` means it is not treated as a valid language.
    """
    resolved = LanguageCode.from_string(language)
    return resolved is not LanguageCode.NONE
def to_iso_639_1(self):
return self.iso_639_1
@@ -180,10 +186,10 @@ class LanguageCode(Enum):
Explicitly handle comparison to None.
"""
if other is None:
# If compared to None, return False
# print(other)
# print(self)
# If compared to None, return False unless self is None
return self.iso_639_1 is None
if isinstance(other, str): # Allow comparison with a string
return self.value == LanguageCode.from_string(other)
if isinstance(other, LanguageCode):
# Normal comparison for LanguageCode instances
return self.iso_639_1 == other.iso_639_1

265
subgen.py
View File

@@ -65,16 +65,32 @@ reload_script_on_change = convert_to_bool(os.getenv('RELOAD_SCRIPT_ON_CHANGE', F
lrc_for_audio_files = convert_to_bool(os.getenv('LRC_FOR_AUDIO_FILES', True))
custom_regroup = os.getenv('CUSTOM_REGROUP', 'cm_sl=84_sl=42++++++1')
detect_language_length = os.getenv('DETECT_LANGUAGE_LENGTH', 30)
detect_language_start_offset = os.getenv('DETECT_LANGUAGE_START_OFFSET', int(0))
skipifexternalsub = convert_to_bool(os.getenv('SKIPIFEXTERNALSUB', False))
skip_if_to_transcribe_sub_already_exist = convert_to_bool(os.getenv('SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST', True))
skipifinternalsublang = LanguageCode.from_iso_639_2(os.getenv('SKIPIFINTERNALSUBLANG', ''))
skip_lang_codes_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")]
skip_lang_codes_list = (
[LanguageCode.from_iso_639_2(code) for code in os.getenv("SKIP_LANG_CODES", "").split("|")]
if os.getenv('SKIP_LANG_CODES')
else []
)
force_detected_language_to = LanguageCode.from_iso_639_2(os.getenv('FORCE_DETECTED_LANGUAGE_TO', ''))
preferred_audio_language = LanguageCode.from_iso_639_2(os.getenv('PREFERRED_AUDIO_LANGUAGE', 'eng'))
skip_if_audio_track_is_in_list = [LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")]
# Maybe just have skip_if_audio_track_is_in_list and skip_lang_codes_list and remove skipifinternalsublang
# TODO option which iso code to write in the subtitle file1
# Preferred audio languages as '|'-separated codes, in order of preference.
# The 'eng' default must be used in the guard as well as in the split:
# otherwise an unset PREFERRED_AUDIO_LANGUAGES produces an empty list and the
# default is never applied. An explicitly empty value still yields [].
preferred_audio_languages = (
    [LanguageCode.from_iso_639_2(code) for code in os.getenv('PREFERRED_AUDIO_LANGUAGES', 'eng').split("|")]
    if os.getenv('PREFERRED_AUDIO_LANGUAGES', 'eng')
    else []
)  # in order of preference
limit_to_preferred_audio_languages = convert_to_bool(os.getenv('LIMIT_TO_PREFERRED_AUDIO_LANGUAGE', False)) #TODO: add support for this
skip_if_audio_track_is_in_list = (
[LanguageCode.from_iso_639_2(code) for code in os.getenv('SKIP_IF_AUDIO_TRACK_IS', '').split("|")]
if os.getenv('SKIP_IF_AUDIO_TRACK_IS')
else []
)
subtitle_language_naming_type = os.getenv('SUBTITLE_LANGUAGE_NAMING_TYPE', 'ISO_639_2_B')
only_skip_if_subgen_subtitle = convert_to_bool(os.getenv('ONLY_SKIP_IF_SUBGEN_SUBTITLE', False))
skip_unknown_language = convert_to_bool(os.getenv('SKIP_UNKNOWN_LANGUAGE', False))
skip_if_language_is_not_set_but_subtitles_exist = convert_to_bool(os.getenv('SKIP_IF_LANGUAGE_IS_NOT_SET_BUT_SUBTITLES_EXIST', False))
should_whiser_detect_audio_language = convert_to_bool(os.getenv('SHOULD_WHISPER_DETECT_AUDIO_LANGUAGE', False))
try:
kwargs = ast.literal_eval(os.getenv('SUBGEN_KWARGS', '{}') or '{}')
@@ -86,6 +102,19 @@ if transcribe_device == "gpu":
transcribe_device = "cuda"
# File extensions (lower-case, including the leading dot) that
# has_video_extension() accepts as video files.
VIDEO_EXTENSIONS = (
".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".mpg", ".mpeg",
".3gp", ".ogv", ".vob", ".rm", ".rmvb", ".ts", ".m4v", ".f4v", ".svq3",
".asf", ".m2ts", ".divx", ".xvid"
)
# File extensions (lower-case, including the leading dot) that
# has_audio_extension() and isAudioFileExtension() accept as audio files.
AUDIO_EXTENSIONS = (
".mp3", ".wav", ".aac", ".flac", ".ogg", ".wma", ".alac", ".m4a", ".opus",
".aiff", ".aif", ".pcm", ".ra", ".ram", ".mid", ".midi", ".ape", ".wv",
".amr", ".vox", ".tak", ".spx", '.m4b'
)
app = FastAPI()
model = None
@@ -99,8 +128,13 @@ task_queue = queue.Queue()
def transcription_worker():
while True:
task = task_queue.get()
logger.info(f"Task {task['path']} is being handled by Subgen.")
if 'Bazarr-' in task['path']:
logging.info(f"Task {task['path']} is being handled by ASR.")
if "type" in task and task["type"] == "detect_language":
detect_language_task(task['path'])
else:
gen_subtitles(task['path'], task['transcribe_or_translate'], task['force_language'])
task_queue.task_done()
@@ -189,15 +223,6 @@ def appendLine(result):
# Append the new segment to the result's segments
result.segments.append(newSegment)
def has_image_extension(file_path):
valid_extensions = ['.rgb', '.gif', '.pbm', '.pgm', '.ppm', '.tiff', '.rast', '.xbm', '.jpg', '.jpeg', '.bmp', '.png', '.webp', '.exr', '.bif'] # taken from the extensions detected by the imghdr module & added Emby's '.bif' files
if os.path.exists(file_path):
file_extension = os.path.splitext(file_path)[1].lower()
return file_extension in valid_extensions
else:
return True # return a value that causes the file to be skipped.
@app.get("/plex")
@app.get("/webhook")
@app.get("/jellyfin")
@@ -345,7 +370,7 @@ async def asr(
random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
if force_detected_language_to:
language = force_detected_language_to
language = force_detected_language_to.from_iso_639_1()
logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}")
start_time = time.time()
@@ -445,6 +470,62 @@ async def detect_language(
return {"detected_language": detected_language.to_name(), "language_code": language_code}
def detect_language_task(path):
    """Detect the spoken language of ``path`` with Whisper, then queue a transcription.

    A segment of the file (``detect_language_length`` seconds, starting at
    ``detect_language_start_offset``) is extracted with ffmpeg and handed to
    the model. Whatever the outcome, a regular transcription task for the same
    path is put on ``task_queue`` with ``force_language`` set to the detected
    language, or ``LanguageCode.NONE`` when detection failed.

    :param path: Path of the media file whose language should be detected.
    """
    detected_language = LanguageCode.NONE

    logger.info(f"Detecting language of file: {path} on the first {detect_language_length} seconds of the file")

    try:
        start_model()

        audio_segment = extract_audio_segment_to_memory(
            path, detect_language_start_offset, int(detect_language_length)
        )
        # The extractor returns None when ffmpeg fails; report that explicitly
        # instead of failing on ``None.read()`` with an unrelated message.
        if audio_segment is None:
            raise RuntimeError("ffmpeg could not extract an audio segment")

        detected_language = LanguageCode.from_name(model.transcribe_stable(audio_segment.read()).language)
        logging.debug(f"Detected language: {detected_language.to_name()}")
        # reverse lookup of language -> code, ex: "english" -> "en", "nynorsk" -> "nn", ...
        logging.debug(f"Language Code: {detected_language.to_iso_639_1()}")

    except Exception as e:
        # Best effort: a failed detection still falls through to a normal
        # transcription task with no forced language.
        logging.error(f"Error detecting language of file with whisper: {e}")

    finally:
        # NOTE(review): confirm the worker loop does not also call task_done()
        # for detect_language tasks, which would raise ValueError.
        task_queue.task_done()
        delete_model()

    # Put a task to transcribe this file with the detected language.
    # NOTE(review): transcribe_or_translate is not defined in this function;
    # presumably a module-level setting - confirm.
    task_id = {'path': path, "transcribe_or_translate": transcribe_or_translate, 'force_language': detected_language}
    task_queue.put(task_id)

    # maybe modify the file to contain detected language so we won't trigger this again
def extract_audio_segment_to_memory(input_file, start_time, duration):
    """
    Extract a segment of audio from input_file, starting at start_time for duration seconds.

    FFmpeg writes the segment to its stdout as WAV (pcm_s16le, ar=16000) and
    the captured bytes are wrapped in a BytesIO, so nothing is written to disk.

    :param input_file: Path to the input audio file
    :param start_time: Start time in seconds (e.g., 60 for 1 minute)
    :param duration: Duration in seconds (e.g., 30 for 30 seconds)
    :return: BytesIO object containing the audio segment, or None if FFmpeg failed
    """
    try:
        # Run FFmpeg to extract the desired segment
        out, _ = (
            ffmpeg
            .input(input_file, ss=start_time, t=duration)  # Start time and duration
            .output('pipe:1', format='wav', acodec='pcm_s16le', ar=16000)  # Output to pipe as WAV
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        # Decode leniently: FFmpeg's stderr is not guaranteed to be valid
        # UTF-8, and a decode error here would hide the real failure.
        stderr_text = e.stderr.decode(errors="replace") if e.stderr else ""
        logging.error(f"Error occurred: {stderr_text}")
        return None
    return io.BytesIO(out)  # Convert output to BytesIO for in-memory processing
def start_model():
global model
if model is None:
@@ -460,7 +541,7 @@ def delete_model():
def isAudioFileExtension(file_extension):
return file_extension.casefold() in \
[ '.mp3', '.flac', '.wav', '.alac', '.ape', '.ogg', '.wma', '.m4a', '.m4b', '.aac', '.aiff' ]
AUDIO_EXTENSIONS
def write_lrc(result, file_path):
with open(file_path, "w") as file:
@@ -469,7 +550,7 @@ def write_lrc(result, file_path):
fraction = int((segment.start - int(segment.start)) * 100)
file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n")
def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode | None = None) -> None:
def gen_subtitles(file_path: str, transcription_type: str, force_language : LanguageCode = LanguageCode.NONE) -> None:
"""Generates subtitles for a video file.
Args:
@@ -512,6 +593,8 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language : Lang
if is_audio_file and lrc_for_audio_files:
write_lrc(result, file_name + '.lrc')
else:
if not force_language:
force_language = LanguageCode.from_string(result.language)
result.to_srt_vtt(name_subtitle(file_path, force_language), word_level=word_level_highlight)
elapsed_time = time.time() - start_time
@@ -520,7 +603,7 @@ def gen_subtitles(file_path: str, transcription_type: str, force_language : Lang
f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.")
except Exception as e:
logging.info(f"Error processing or transcribing {file_path}: {e}")
logging.info(f"Error processing or transcribing {file_path} in {force_language}: {e}")
finally:
delete_model()
@@ -663,23 +746,31 @@ def choose_transcribe_language(file_path, forced_language):
determined.
"""
# todo handle iso 2/3
logger.debug(f"choose_transcribe_language({file_path}, {forced_language})")
if forced_language:
logger.debug(f"ENV FORCE_LANGUAGE is set: Forcing language to {forced_language}")
return forced_language
if force_detected_language_to:
logger.debug(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}")
return force_detected_language_to
audio_tracks = get_audio_tracks(file_path)
if has_language_audio_track(audio_tracks, preferred_audio_language):
language = preferred_audio_language
found_track_in_language = find_language_audio_track(audio_tracks, preferred_audio_languages)
if found_track_in_language:
language = found_track_in_language
if language:
logger.debug(f"Preferred language found: {language}")
return language
default_language = find_default_audio_track_language(audio_tracks)
if default_language:
logger.debug(f"Default language found: {default_language}")
return default_language
return None
return LanguageCode.NONE
def get_audio_tracks(video_file):
@@ -758,22 +849,23 @@ def get_audio_tracks(video_file):
logging.error(f"An error occurred while reading audio track information: {str(e)}")
return []
def has_language_audio_track(audio_tracks, find_language):
def find_language_audio_track(audio_tracks, find_languages):
"""
Checks if an audio track with the given language is present in the list of audio tracks.
Checks if an audio track with any of the given languages is present in the list of audio tracks.
Returns the first language from `find_languages` that matches.
Args:
audio_tracks (list): A list of dictionaries containing information about each audio track.
find_language (str): The ISO 639-2 code of the language to search for.
find_languages (list): A list language codes to search for.
Returns:
bool: True if an audio track with the given language was found, False otherwise.
str or None: The first language found from `find_languages`, or None if no match is found.
"""
for track in audio_tracks:
if track['language'] == find_language: #ISO 639-2
return True
return False
for language in find_languages:
for track in audio_tracks:
if track['language'] == language:
return language
return None
def find_default_audio_track_language(audio_tracks):
"""
Finds the language of the default audio track in the given list of audio tracks.
@@ -791,7 +883,7 @@ def find_default_audio_track_language(audio_tracks):
return None
def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode | None = None) -> None:
def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode = LanguageCode.NONE) -> None:
global task_queue
if not has_audio(file_path):
@@ -800,7 +892,17 @@ def gen_subtitles_queue(file_path: str, transcription_type: str, force_language:
force_language = choose_transcribe_language(file_path, force_language)
# check if we would like to detect audio language in case of no audio language specified. Will return here again with specified language from whisper
if not force_language and should_whiser_detect_audio_language:
# make a detect language task
task_id = { 'path': file_path, 'type': "detect_language" }
task_queue.put(task_id)
logging.info(f"task_queue.put(task_id)({file_path}, detect_language)")
return
if have_to_skip(file_path, force_language):
logging.debug(f"{file_path} already has subtitles in {force_language}, skipping.")
return
task = {
@@ -823,6 +925,10 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool:
Returns:
True if subtitle generation should be skipped; otherwise, False.
"""
if skip_unknown_language and transcribe_language == LanguageCode.NONE:
logging.debug(f"{file_path} has unknown language, skipping.")
return True
# Check if subtitles in the desired transcription language already exist
if skip_if_to_transcribe_sub_already_exist and has_subtitle_language(file_path, transcribe_language):
logging.debug(f"{file_path} already has subtitles in {transcribe_language}, skipping.")
@@ -834,9 +940,11 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool:
return True
# Check if external subtitles exist for the specified language
if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)):
logging.debug(f"{file_path} has external subtitles in {namesublang}, skipping.")
return True
# Probably not use LanguageCode for this, but just check with strings, to be able to skip with custom named languages.
if LanguageCode.is_valid_language(namesublang):
if skipifexternalsub and has_subtitle_language(file_path, LanguageCode.from_string(namesublang)):
logging.debug(f"{file_path} has external subtitles in {namesublang}, skipping.")
return True
# Skip if any language in the skip list is detected in existing subtitles
existing_sub_langs = get_subtitle_languages(file_path)
@@ -844,11 +952,17 @@ def have_to_skip(file_path: str, transcribe_language: LanguageCode) -> bool:
logging.debug(f"Languages in skip list {skip_lang_codes_list} detected in {file_path}, skipping.")
return True
# Skip if any language in the audio track skip list is detected
audio_langs = get_audio_languages(file_path)
if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs):
logging.debug(f"Audio language in skip list {skip_if_audio_track_is_in_list} detected in {file_path}, skipping.")
return True
if preferred_audio_languages in audio_langs:
logging.debug(f"Preferred audio language {preferred_audio_languages} detected in {file_path}.")
# maybe not skip if subtitle exist in preferred audio language, but not in another preferred audio language if the file has multiple audio tracks matching the preferred audio languages
else:
if limit_to_preferred_audio_languages:
logging.debug(f"Only non-preferred audio language detected in {file_path}, skipping.")
return True
if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs):
logging.debug(f"Audio language in skip list {skip_if_audio_track_is_in_list} detected in {file_path}, skipping.")
return True
# If none of the conditions matched, do not skip
return False
@@ -903,9 +1017,6 @@ def has_subtitle_language(video_file, target_language: LanguageCode):
Returns:
bool: True if a subtitle file with the target language is found, False otherwise.
"""
logging.debug(f"has_subtitle_language({video_file}, {target_language})")
if target_language == LanguageCode.NONE:
return False
return has_subtitle_language_in_file(video_file, target_language) or has_subtitle_of_language_in_folder(video_file, target_language)
def has_subtitle_language_in_file(video_file, target_language: LanguageCode):
@@ -919,21 +1030,25 @@ def has_subtitle_language_in_file(video_file, target_language: LanguageCode):
Returns:
bool: True if a subtitle file with the target language is found, False otherwise.
"""
logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})")
if target_language == LanguageCode.NONE:
# logging.debug(f"has_subtitle_language_in_file({video_file}, {target_language})")
if (target_language == LanguageCode.NONE and not skip_if_language_is_not_set_but_subtitles_exist) or only_skip_if_subgen_subtitle: # skip if language is not set or we are only interested in subgen subtitles which are not internal, only external
return False
try:
with av.open(video_file) as container:
subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and LanguageCode.from_string(stream.metadata['language']) == target_language), None)
subtitle_streams = (stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata)
if subtitle_stream:
if skip_if_language_is_not_set_but_subtitles_exist and target_language == LanguageCode.NONE and any(subtitle_streams):
logging.debug("Language is not set but internal subtitles exist.")
return True
if next(stream for stream in subtitle_streams if LanguageCode.from_string(stream.metadata['language']) == target_language):
logging.debug(f"Subtitles in '{target_language}' language found in the video.")
return True
else:
logging.debug(f"No subtitles in '{target_language}' language found in the video.")
return False
except Exception as e:
logging.info(f"An error occurred: {e}")
logging.error(f"An error occurred while checking the file with pyav: {e}") # TODO: figure out why this throws (empty) errors
return False
def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode, recursion = True):
@@ -961,9 +1076,32 @@ def has_subtitle_of_language_in_folder(video_file, target_language: LanguageCode
root, ext = os.path.splitext(file_name)
if root.startswith(video_file_stripped) and ext.lower() in subtitle_extensions:
parts = root[len(video_file_stripped):].lstrip(".").split(".")
# Check if the target language is one of the parts
has_subgen = "subgen" in parts # Checks if "subgen" is in parts
#checking this first because e.g LanguageCode.from_string("subgen") == LanguageCode.NONE is equal to True. Maybe handle this better with a check with a function like is language code. To check if part is a valid language before comparing it to target_language
if target_language == LanguageCode.NONE:
if only_skip_if_subgen_subtitle:
if has_subgen:
logger.debug("Subtitles from subgen found in the folder. ")
return skip_if_language_is_not_set_but_subtitles_exist
else:
#might be other subtitles that have subgen in the name
continue
logger.debug("Subtitles exist in the folder. and only_skip_if_subgen_subtitle is False.")
return skip_if_language_is_not_set_but_subtitles_exist
if any(LanguageCode.from_string(part) == target_language for part in parts):
# If the language is found, return True
# If the subtitle is found, return True
if only_skip_if_subgen_subtitle:
if has_subgen:
logger.debug(f"Subtitles from subgen in '{target_language}' language found in the folder.")
return True
else:
#might be other subtitles that have subgen in the name
continue
logger.debug(f"Subtitles in '{target_language}' language found in the folder.")
return True
elif os.path.isdir(file_path) and recursion:
# Looking in the subfolders of the video for subtitles
@@ -1105,8 +1243,11 @@ def get_jellyfin_admin(users):
def has_audio(file_path):
try:
if has_image_extension(file_path):
logging.debug(f"{file_path} is an image or is an invalid file or path (are your volumes correct?), skipping processing")
if not is_valid_path(file_path):
return False
if not (has_video_extension(file_path) or has_audio_extension(file_path)):
# logging.debug(f"{file_path} is an not a video or audio file, skipping processing. skipping processing")
return False
with av.open(file_path) as container:
@@ -1124,6 +1265,28 @@ def has_audio(file_path):
logging.debug(f"Error processing file {file_path}")
return False
def is_valid_path(file_path):
    """Return True only when ``file_path`` is an existing regular file.

    A directory is logged at debug level; a path that is neither a file nor a
    directory is logged as a warning. Both cases return False.

    :param file_path: Path to check.
    :return: True if the path is a file, otherwise False.
    """
    if os.path.isfile(file_path):
        return True

    if os.path.isdir(file_path):
        logging.debug(f"{file_path} is a directory, skipping processing as a file.")
    else:
        logging.warning(f"{file_path} is neither a file nor a directory. Are your volumes correct?")
    return False
def has_video_extension(file_name):
    """Return True if the extension of ``file_name`` is in VIDEO_EXTENSIONS.

    The extension is lower-cased before the lookup.
    """
    _, suffix = os.path.splitext(file_name)
    return suffix.lower() in VIDEO_EXTENSIONS
def has_audio_extension(file_name):
    """Return True if the extension of ``file_name`` is in AUDIO_EXTENSIONS.

    The extension is lower-cased before the lookup.
    """
    _, suffix = os.path.splitext(file_name)
    return suffix.lower() in AUDIO_EXTENSIONS
def path_mapping(fullpath):
if use_path_mapping:
logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))