Merge pull request #85 from xhzhu0628/add-queuing-and-threading

Add queuing and threading.  Bazarr doesn't respect `CONCURRENT_TRANSCRIPTIONS` intentionally because it wants a time-sensitive response.
This commit is contained in:
McCloudS
2024-04-21 09:20:13 -06:00
committed by GitHub

109
subgen.py
View File

@@ -109,12 +109,29 @@ update_env_variables()
app = FastAPI()
model = None
files_to_transcribe = []
in_docker = os.path.exists('/.dockerenv')
docker_status = "Docker" if in_docker else "Standalone"
last_print_time = None
#start queue
global task_queue
task_queue = queue.Queue()
def transcription_worker():
while True:
task = task_queue.get()
if 'Bazarr-' in task['path']:
logging.info(f"Skipping processing for {task['path']} as it is handled by ASR.")
else:
gen_subtitles(task['path'], task['transcribe_or_translate'], task['force_language'])
task_queue.task_done()
# show queue
logging.debug(f"There are {task_queue.qsize()} tasks left in the queue.")
for _ in range(concurrent_transcriptions):
threading.Thread(target=transcription_worker, daemon=True).start()
# Define a filter class
class MultiplePatternsFilter(logging.Filter):
def filter(self, record):
@@ -294,7 +311,7 @@ def receive_tautulli_webhook(
fullpath = file
logging.debug("Path of file: " + fullpath)
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate, True)
else:
return {
"message": "This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"}
@@ -321,7 +338,7 @@ def receive_plex_webhook(
fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
logging.debug("Path of file: " + fullpath)
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate, True)
refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.")
except Exception as e:
@@ -346,7 +363,7 @@ def receive_jellyfin_webhook(
fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken)
logging.debug(f"Path of file: {fullpath}")
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate, True)
try:
refresh_jellyfin_metadata(ItemId, jellyfinserver, jellyfintoken)
logging.info(f"Metadata for item {ItemId} refreshed successfully.")
@@ -379,7 +396,7 @@ def receive_emby_webhook(
if event == "library.new" and procaddedmedia or event == "playback.start" and procmediaonplay:
logging.debug("Path of file: " + fullpath)
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate, True)
return ""
@@ -405,14 +422,17 @@ def asr(
try:
logging.info(f"Transcribing file from Bazarr/ASR webhook")
result = None
random_name = random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6)
random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
if force_detected_language_to:
language = force_detected_language_to
start_time = time.time()
start_model()
files_to_transcribe.insert(0, f"Bazarr-asr-{random_name}")
task_id = { 'path': f"Bazarr-asr-{random_name}" }
task_queue.put(task_id)
audio_data = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0
if model_prompt:
custom_prompt = greetings_translations.get(language, '') or custom_model_prompt
@@ -427,8 +447,7 @@ def asr(
except Exception as e:
logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")
finally:
if f"Bazarr-asr-{random_name}" in files_to_transcribe:
files_to_transcribe.remove(f"Bazarr-asr-{random_name}")
task_queue.task_done()
delete_model()
if result:
return StreamingResponse(
@@ -451,8 +470,11 @@ def detect_language(
logging.info(f"Detect language is set to detect on the first {detect_language_length} seconds of the audio.")
try:
start_model()
random_name = random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6)
files_to_transcribe.insert(0, f"Bazarr-detect-language-{random_name}")
random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
task_id = { 'path': f"Bazarr-detect-language-{random_name}" }
task_queue.put(task_id)
audio_data = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0
detected_lang_code = model.transcribe_stable(whisper.pad_or_trim(audio_data, detect_language_length * 16000), input_sr=16000).language
@@ -460,8 +482,7 @@ def detect_language(
logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")
finally:
if f"Bazarr-detect-language-{random_name}" in files_to_transcribe:
files_to_transcribe.remove(f"Bazarr-detect-language-{random_name}")
task_queue.task_done()
delete_model()
return {"detected_language": whisper_languages.get(detected_lang_code, detected_lang_code) , "language_code": detected_lang_code}
@@ -473,7 +494,7 @@ def start_model():
model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions, compute_type=compute_type)
def delete_model():
if clear_vram_on_complete and len(files_to_transcribe) == 0:
if clear_vram_on_complete and task_queue.qsize() == 0:
global model
logging.debug("Queue is empty, clearing/releasing VRAM")
model = None
@@ -490,43 +511,17 @@ def write_lrc(result, file_path):
fraction = int((segment.start - int(segment.start)) * 100)
file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n")
def gen_subtitles(file_path: str, transcription_type: str, add_to_front=True, force_language=None) -> None:
def gen_subtitles(file_path: str, transcription_type: str, force_language=None) -> None:
"""Generates subtitles for a video file.
Args:
file_path: str - The path to the video file.
transcription_type: str - The type of transcription or translation to perform.
add_to_front: bool - Whether to add the file to the front of the transcription queue. Default is True.
force_language: str - The language to force for transcription or translation. Default is None.
"""
try:
if not has_audio(file_path):
logging.debug(f"{file_path} doesn't have any audio to transcribe!")
return None
if file_path in files_to_transcribe:
logging.info(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.")
return
message = None
if has_subtitle_language(file_path, skipifinternalsublang):
message = f"{file_path} already has an internal subtitle we want, skipping generation"
elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
message = f"{file_path} already has a subtitle created for this, skipping it"
elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH):
message = f"{file_path} already has a SDH subtitle created for this, skipping it"
if message:
logging.info(message)
return message
if add_to_front:
files_to_transcribe.insert(0, file_path)
else:
files_to_transcribe.append(file_path)
logging.info(f"Added {os.path.basename(file_path)} for transcription.")
logging.info(f"{len(files_to_transcribe)} files in the queue for transcription")
logging.info(f"Transcribing file: {os.path.basename(file_path)}")
start_time = time.time()
@@ -561,10 +556,32 @@ def gen_subtitles(file_path: str, transcription_type: str, add_to_front=True, fo
logging.info(f"Error processing or transcribing {file_path}: {e}")
finally:
if file_path in files_to_transcribe:
files_to_transcribe.remove(file_path)
delete_model()
def gen_subtitles_queue(file_path: str, transcription_type: str, force_language=None) -> None:
global task_queue
if not has_audio(file_path):
logging.debug(f"{file_path} doesn't have any audio to transcribe!")
return
message = None
if has_subtitle_language(file_path, skipifinternalsublang):
message = f"{file_path} already has an internal subtitle we want, skipping generation"
elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
message = f"{file_path} already has a subtitle created for this, skipping it"
elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH):
message = f"{file_path} already has a SDH subtitle created for this, skipping it"
if message:
logging.info(message)
return
task = {
'path': file_path,
'transcribe_or_translate': transcription_type,
'force_language':force_language
}
task_queue.put(task)
def get_file_name_without_extension(file_path):
file_name, file_extension = os.path.splitext(file_path)
@@ -736,7 +753,7 @@ if monitor:
if has_audio(file_path):
# Call the gen_subtitles function
logging.info(f"File: {path_mapping(file_path)} was added")
gen_subtitles(path_mapping(file_path), transcribe_or_translate, False)
gen_subtitles_queue(path_mapping(file_path), transcribe_or_translate)
def on_created(self, event):
self.create_subtitle(event)
def on_modified(self, event):
@@ -751,11 +768,11 @@ def transcribe_existing(transcribe_folders, forceLanguage=None):
for root, dirs, files in os.walk(path):
for file in files:
file_path = os.path.join(root, file)
gen_subtitles(path_mapping(file_path), transcribe_or_translate, False, forceLanguage)
gen_subtitles_queue(path_mapping(file_path), transcribe_or_translate, forceLanguage)
# if the path specified was actually a single file and not a folder, process it
if os.path.isfile(path):
if has_audio(path):
gen_subtitles(path_mapping(path), transcribe_or_translate, False, forceLanguage)
gen_subtitles_queue(path_mapping(path), transcribe_or_translate, forceLanguage)
# Set up the observer to watch for new files
if monitor:
observer = Observer()