From 68508295747da0c5a04c3461e6a202ce5cdea518 Mon Sep 17 00:00:00 2001 From: McCloudS <64094529+McCloudS@users.noreply.github.com> Date: Sat, 28 Oct 2023 22:57:29 -0600 Subject: [PATCH] Update subgen.py Significant housekeeping. * Cleaned up logging to not spam as much * Move from Flask to FastAPI/uvicorn * Added some garbage collection vram clearing for GPUs when not transcoding --- subgen/subgen.py | 241 +++++++++++++++++++++++++---------------------- 1 file changed, 126 insertions(+), 115 deletions(-) diff --git a/subgen/subgen.py b/subgen/subgen.py index 5224aff..fa4a171 100644 --- a/subgen/subgen.py +++ b/subgen/subgen.py @@ -9,14 +9,16 @@ import queue import logging import gc from array import array +from typing import Union, Any # List of packages to install packages_to_install = [ 'numpy', 'stable-ts', - 'flask', + 'fastapi', 'requests', 'faster-whisper', + 'uvicorn', # Add more packages as needed ] @@ -28,7 +30,8 @@ for package in packages_to_install: except subprocess.CalledProcessError as e: print(f"Failed to install {package}: {e}") -from flask import Flask, request +from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request +from fastapi.responses import StreamingResponse, RedirectResponse import stable_whisper import requests import av @@ -64,161 +67,161 @@ transcribe_folders = os.getenv('TRANSCRIBE_FOLDERS', '') transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'translate') if transcribe_device == "gpu": transcribe_device = "cuda" -jellyfin_userid = "" -app = Flask(__name__) +app = FastAPI() model = None files_to_transcribe = [] -subextension = '.subgen.' + whisper_model + '.' + namesublang + '.srt' +subextension = f".subgen.{whisper_model}.{namesublang}.srt" print("Transcriptions are limited to running " + str(concurrent_transcriptions) + " at a time") print("Running " + str(whisper_threads) + " threads per transcription") -if debug: - logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) -@app.route("/webhook", methods=["POST"]) +if debug: + logging.basicConfig(stream=sys.stderr, level=logging.NOTSET) +else: + logging.basicConfig(stream=sys.stderr, level=logging.INFO) + +@app.post("/webhook") def print_warning(): print("*** This is the legacy webhook. You need to update to webhook urls to end in plex, tautulli, or jellyfin instead of webhook. ***") return "" -@app.route("/tautulli", methods=["POST"]) -def receive_tautulli_webhook(): - logging.debug("This hook is from Tautulli webhook!") - logging.debug("Headers: %s", request.headers) - logging.debug("Raw response: %s", request.data) +@app.post("/tautulli") +def receive_tautulli_webhook( + source: Union[str, None] = Header(None), + event: str = Body(None), + file: str = Body(None), + ): - if request.headers.get("Source") == "Tautulli": - event = request.json["event"] - logging.debug("Event detected is: " + event) + if source == "Tautulli": + logging.debug(f"Tautulli event detected is: {event}") if((event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)): - fullpath = request.json["file"] + fullpath = file logging.debug("Path of file: " + fullpath) - if use_path_mapping: - fullpath = fullpath.replace(path_mapping_from, path_mapping_to) - logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to)) - - add_file_for_transcription(fullpath) + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) else: print("This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!") return "" -@app.route("/plex", methods=["POST"]) -def receive_plex_webhook(): - logging.debug("This hook is from Plex webhook!") - logging.debug("Headers: %s", request.headers) - logging.debug("Raw response: %s", json.loads(request.form['payload'])) +@app.post("/plex") +def receive_plex_webhook( + user_agent: Union[str, None] = Header(None), + payload: Union[str, None] = Form(), + ): + plex_json = json.loads(payload) + logging.debug(f"Raw response: {payload}") - if "PlexMediaServer" in request.headers.get("User-Agent"): - plex_json = json.loads(request.form['payload']) + if "PlexMediaServer" in user_agent: event = plex_json["event"] - logging.debug("Event detected is: " + event) + logging.debug(f"Plex event detected is: {event}") if((event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay)): fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken) logging.debug("Path of file: " + fullpath) - - if use_path_mapping: - fullpath = fullpath.replace(path_mapping_from, path_mapping_to) - logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to)) - add_file_for_transcription(fullpath) + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) else: print("This doesn't appear to be a properly configured Plex webhook, please review the instructions again!") return "" -@app.route("/jellyfin", methods=["POST"]) -def receive_jellyfin_webhook(): - logging.debug("This hook is from Jellyfin webhook!") - logging.debug("Headers: %s", request.headers) - logging.debug("Raw response: %s", request.data) +@app.post("/jellyfin") +def receive_jellyfin_webhook( + user_agent: Union[str, None] = Header(None), + NotificationType: str = Body(None), + file: str = Body(None), + ItemId: str = Body(None), + ): - if "Jellyfin-Server" in request.headers.get("User-Agent"): - event = request.json["NotificationType"] - logging.debug("Event detected is: " + event) - if((event == "ItemAdded" and procaddedmedia) or (event == "PlaybackStart" and procmediaonplay)): - fullpath = get_jellyfin_file_name(request.json["ItemId"], jellyfinserver, jellyfintoken) - logging.debug("Path of file: " + fullpath) - - if use_path_mapping: - fullpath = fullpath.replace(path_mapping_from, path_mapping_to) - logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to)) + if "Jellyfin-Server" in user_agent: + logging.debug("Jellyfin event detected is: " + NotificationType) + logging.debug("itemid is: " + ItemId) + if((NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay)): + fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken) + logging.debug(f"Path of file: {fullpath}") - add_file_for_transcription(fullpath) + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) else: print("This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!") return "" -@app.route("/emby", methods=["POST"]) -def receive_emby_webhook(): - logging.debug("This hook is from Emby webhook!") - logging.debug("Headers: %s", request.headers) - logging.debug("Raw response: %s", request.form) +@app.post("/emby") +def receive_emby_webhook( + user_agent: Union[str, None] = Header(None), + data: Union[str, None] = Form(None), + ): + logging.debug("Raw response: %s", data) - if "Emby Server" in request.headers.get("User-Agent"): - data = request.form.get('data') + if "Emby Server" in user_agent: if data: data_dict = json.loads(data) - fullpath = data_dict.get('Item', {}).get('Path', '') - event = data_dict.get('Event', '') - logging.debug("Event detected is: " + event) + fullpath = data_dict['Item']['Path'] + event = data_dict['Event'] + logging.debug("Emby event detected is: " + event) if((event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay)): logging.debug("Path of file: " + fullpath) - - if use_path_mapping: - fullpath = fullpath.replace(path_mapping_from, path_mapping_to) - logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to)) - add_file_for_transcription(fullpath) + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) else: print("This doesn't appear to be a properly configured Emby webhook, please review the instructions again!") return "" -def gen_subtitles(video_file_path: str) -> None: +def gen_subtitles(file_path: str, transcribe_or_translate_str: str, front=True) -> None: + """Generates subtitles for a video file. + + Args: + file_path: The path to the video file. + transcription_or_translation: The type of transcription or translation to perform. + front: Whether to add the file to the front of the transcription queue. + """ + global model + try: - print(f"Transcribing file: {video_file_path}") - start_time = time.time() - model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions) - result = model.transcribe_stable(video_file_path, task=transcribe_or_translate) - result.to_srt_vtt(video_file_path.rsplit('.', 1)[0] + subextension, word_level=word_level_highlight) - elapsed_time = time.time() - start_time - minutes, seconds = divmod(int(elapsed_time), 60) - print(f"Transcription of {video_file_path} is completed, it took {minutes} minutes and {seconds} seconds to complete.") + if not is_video_file(file_path): + print(f"{file_path} isn't a video file!") + return None + + if file_path not in files_to_transcribe: + if has_subtitle_language(file_path, skipifinternalsublang): + logging.debug(f"{file_path} already has an internal sub we want, skipping generation") + return f"{file_path} already has an internal sub we want, skipping generation" + elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension): + print(f"{file_path} already has a subgen created for this, skipping it") + return f"{file_path} already has a subgen created for this, skipping it" + + if front: + files_to_transcribe.insert(0, file_path) + else: + files_to_transcribe.append(file_path) + print(f"Added {os.path.basename(file_path)} for transcription.") + # Start transcription for the file in a separate thread + + print(f"{len(files_to_transcribe)} files in the queue for transcription") + print(f"Transcribing file: {os.path.basename(file_path)}") + start_time = time.time() + if model is None: + logging.debug("Model was purged, need to re-create") + model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions) + + result = model.transcribe_stable(file_path, task=transcribe_or_translate_str) + result.to_srt_vtt(file_path.rsplit('.', 1)[0] + subextension, word_level=word_level_highlight) + elapsed_time = time.time() - start_time + minutes, seconds = divmod(int(elapsed_time), 60) + print(f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.") + files_to_transcribe.remove(file_path) + else: + print(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.") + except Exception as e: - print(f"Error processing or transcribing {video_file_path}: {e}") + print(f"Error processing or transcribing {file_path}: {e}") finally: - files_to_transcribe.remove(video_file_path) if len(files_to_transcribe) == 0: logging.debug("Queue is empty, clearing/releasing VRAM") del model gc.collect() -# Function to add a file for transcription -def add_file_for_transcription(file_path, front=True): - if file_path not in files_to_transcribe: - - if has_subtitle_language(file_path, skipifinternalsublang): - logging.debug("File already has an internal sub we want, skipping generation") - return "File already has an internal sub we want, skipping generation" - elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension): - print("We already have a subgen created for this file, skipping it") - return "We already have a subgen created for this file, skipping it" - - if front: - files_to_transcribe.insert(0, file_path) - else: - files_to_transcribe.append(file_path) - print(f"Added {file_path} for transcription.") - # Start transcription for the file in a separate thread - - print(f"{len(files_to_transcribe)} files in the queue for transcription") - gen_subtitles(file_path) - - else: - print(f"File {file_path} is already in the transcription list. Skipping.") - def has_subtitle_language(video_file, target_language): try: container = av.open(video_file) @@ -287,25 +290,24 @@ def get_jellyfin_file_name(item_id: str, jellyfin_url: str, jellyfin_token: str) } # Cheap way to get the admin user id, and save it for later use. - global jellyfin_userid - if not jellyfin_userid: - users_request = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content) - for user in users_request: - if user['Policy']['IsAdministrator']: - jellyfin_userid = user['Id'] - break - if not jellyfin_userid: - raise Exception("Unable to find administrator user in Jellyfin") + users = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content) + jellyfin_admin = get_jellyfin_admin(users) - response = requests.get(f"{jellyfin_url}/Users/{jellyfin_userid}/Items/{item_id}", headers=headers) + response = requests.get(f"{jellyfin_url}/Users/{jellyfin_admin}/Items/{item_id}", headers=headers) if response.status_code == 200: - json_data = json.loads(response.content) - file_name = json_data['Path'] + file_name = json.loads(response.content)['Path'] return file_name else: raise Exception(f"Error: {response.status_code}") +def get_jellyfin_admin(users): + for user in users: + if user["Policy"]["IsAdministrator"]: + return user["Id"] + + raise Exception("Unable to find administrator user in Jellyfin") + def is_video_file(file_path): av.logging.set_level(av.logging.PANIC) try: @@ -317,14 +319,22 @@ def is_video_file(file_path): except av.AVError: return False +def path_mapping(fullpath): + if use_path_mapping: + fullpath = fullpath.replace(path_mapping_from, path_mapping_to) + logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to)) + return fullpath + def transcribe_existing(): print("Starting to search folders to see if we need to create subtitles.") + logging.debug("The folders are:") for path in transcribe_folders: + logging.debug(path) for root, dirs, files in os.walk(path): for file in files: file_path = os.path.join(root, file) if is_video_file(file_path): - threading.Thread(target=add_file_for_transcription, args=(file_path, False)).start() + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, False) print("Finished searching and queueing files for transcription") @@ -334,4 +344,5 @@ if transcribe_folders: print("Starting webhook!") if __name__ == "__main__": - app.run(debug=debug, host='0.0.0.0', port=int(webhookport)) + import uvicorn + uvicorn.run("subgen:app", host="0.0.0.0", port=int(webhookport), reload=debug)