Update subgen.py
Significant housekeeping. * Cleaned up logging to not spam as much * Move from Flask to FastAPI/uvicorn * Added some garbage collection vram clearing for GPUs when not transcoding
This commit is contained in:
241
subgen/subgen.py
241
subgen/subgen.py
@@ -9,14 +9,16 @@ import queue
|
|||||||
import logging
|
import logging
|
||||||
import gc
|
import gc
|
||||||
from array import array
|
from array import array
|
||||||
|
from typing import Union, Any
|
||||||
|
|
||||||
# List of packages to install
|
# List of packages to install
|
||||||
packages_to_install = [
|
packages_to_install = [
|
||||||
'numpy',
|
'numpy',
|
||||||
'stable-ts',
|
'stable-ts',
|
||||||
'flask',
|
'fastapi',
|
||||||
'requests',
|
'requests',
|
||||||
'faster-whisper',
|
'faster-whisper',
|
||||||
|
'uvicorn',
|
||||||
# Add more packages as needed
|
# Add more packages as needed
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -28,7 +30,8 @@ for package in packages_to_install:
|
|||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
print(f"Failed to install {package}: {e}")
|
print(f"Failed to install {package}: {e}")
|
||||||
|
|
||||||
from flask import Flask, request
|
from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request
|
||||||
|
from fastapi.responses import StreamingResponse, RedirectResponse
|
||||||
import stable_whisper
|
import stable_whisper
|
||||||
import requests
|
import requests
|
||||||
import av
|
import av
|
||||||
@@ -64,161 +67,161 @@ transcribe_folders = os.getenv('TRANSCRIBE_FOLDERS', '')
|
|||||||
transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'translate')
|
transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'translate')
|
||||||
if transcribe_device == "gpu":
|
if transcribe_device == "gpu":
|
||||||
transcribe_device = "cuda"
|
transcribe_device = "cuda"
|
||||||
jellyfin_userid = ""
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = FastAPI()
|
||||||
model = None
|
model = None
|
||||||
files_to_transcribe = []
|
files_to_transcribe = []
|
||||||
subextension = '.subgen.' + whisper_model + '.' + namesublang + '.srt'
|
subextension = f".subgen.{whisper_model}.{namesublang}.srt"
|
||||||
print("Transcriptions are limited to running " + str(concurrent_transcriptions) + " at a time")
|
print("Transcriptions are limited to running " + str(concurrent_transcriptions) + " at a time")
|
||||||
print("Running " + str(whisper_threads) + " threads per transcription")
|
print("Running " + str(whisper_threads) + " threads per transcription")
|
||||||
if debug:
|
|
||||||
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
|
|
||||||
|
|
||||||
@app.route("/webhook", methods=["POST"])
|
if debug:
|
||||||
|
logging.basicConfig(stream=sys.stderr, level=logging.NOTSET)
|
||||||
|
else:
|
||||||
|
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
|
||||||
|
|
||||||
|
@app.post("/webhook")
|
||||||
def print_warning():
|
def print_warning():
|
||||||
print("*** This is the legacy webhook. You need to update to webhook urls to end in plex, tautulli, or jellyfin instead of webhook. ***")
|
print("*** This is the legacy webhook. You need to update to webhook urls to end in plex, tautulli, or jellyfin instead of webhook. ***")
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
@app.route("/tautulli", methods=["POST"])
|
@app.post("/tautulli")
|
||||||
def receive_tautulli_webhook():
|
def receive_tautulli_webhook(
|
||||||
logging.debug("This hook is from Tautulli webhook!")
|
source: Union[str, None] = Header(None),
|
||||||
logging.debug("Headers: %s", request.headers)
|
event: str = Body(None),
|
||||||
logging.debug("Raw response: %s", request.data)
|
file: str = Body(None),
|
||||||
|
):
|
||||||
|
|
||||||
if request.headers.get("Source") == "Tautulli":
|
if source == "Tautulli":
|
||||||
event = request.json["event"]
|
logging.debug(f"Tautulli event detected is: {event}")
|
||||||
logging.debug("Event detected is: " + event)
|
|
||||||
if((event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)):
|
if((event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)):
|
||||||
fullpath = request.json["file"]
|
fullpath = file
|
||||||
logging.debug("Path of file: " + fullpath)
|
logging.debug("Path of file: " + fullpath)
|
||||||
|
|
||||||
if use_path_mapping:
|
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
|
||||||
fullpath = fullpath.replace(path_mapping_from, path_mapping_to)
|
|
||||||
logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
|
|
||||||
|
|
||||||
add_file_for_transcription(fullpath)
|
|
||||||
else:
|
else:
|
||||||
print("This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!")
|
print("This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!")
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
@app.route("/plex", methods=["POST"])
|
@app.post("/plex")
|
||||||
def receive_plex_webhook():
|
def receive_plex_webhook(
|
||||||
logging.debug("This hook is from Plex webhook!")
|
user_agent: Union[str, None] = Header(None),
|
||||||
logging.debug("Headers: %s", request.headers)
|
payload: Union[str, None] = Form(),
|
||||||
logging.debug("Raw response: %s", json.loads(request.form['payload']))
|
):
|
||||||
|
plex_json = json.loads(payload)
|
||||||
|
logging.debug(f"Raw response: {payload}")
|
||||||
|
|
||||||
if "PlexMediaServer" in request.headers.get("User-Agent"):
|
if "PlexMediaServer" in user_agent:
|
||||||
plex_json = json.loads(request.form['payload'])
|
|
||||||
event = plex_json["event"]
|
event = plex_json["event"]
|
||||||
logging.debug("Event detected is: " + event)
|
logging.debug(f"Plex event detected is: {event}")
|
||||||
if((event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay)):
|
if((event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay)):
|
||||||
fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
|
fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
|
||||||
logging.debug("Path of file: " + fullpath)
|
logging.debug("Path of file: " + fullpath)
|
||||||
|
|
||||||
if use_path_mapping:
|
|
||||||
fullpath = fullpath.replace(path_mapping_from, path_mapping_to)
|
|
||||||
logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
|
|
||||||
|
|
||||||
add_file_for_transcription(fullpath)
|
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
|
||||||
else:
|
else:
|
||||||
print("This doesn't appear to be a properly configured Plex webhook, please review the instructions again!")
|
print("This doesn't appear to be a properly configured Plex webhook, please review the instructions again!")
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
@app.route("/jellyfin", methods=["POST"])
|
@app.post("/jellyfin")
|
||||||
def receive_jellyfin_webhook():
|
def receive_jellyfin_webhook(
|
||||||
logging.debug("This hook is from Jellyfin webhook!")
|
user_agent: Union[str, None] = Header(None),
|
||||||
logging.debug("Headers: %s", request.headers)
|
NotificationType: str = Body(None),
|
||||||
logging.debug("Raw response: %s", request.data)
|
file: str = Body(None),
|
||||||
|
ItemId: str = Body(None),
|
||||||
|
):
|
||||||
|
|
||||||
if "Jellyfin-Server" in request.headers.get("User-Agent"):
|
if "Jellyfin-Server" in user_agent:
|
||||||
event = request.json["NotificationType"]
|
logging.debug("Jellyfin event detected is: " + NotificationType)
|
||||||
logging.debug("Event detected is: " + event)
|
logging.debug("itemid is: " + ItemId)
|
||||||
if((event == "ItemAdded" and procaddedmedia) or (event == "PlaybackStart" and procmediaonplay)):
|
if((NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay)):
|
||||||
fullpath = get_jellyfin_file_name(request.json["ItemId"], jellyfinserver, jellyfintoken)
|
fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken)
|
||||||
logging.debug("Path of file: " + fullpath)
|
logging.debug(f"Path of file: {fullpath}")
|
||||||
|
|
||||||
if use_path_mapping:
|
|
||||||
fullpath = fullpath.replace(path_mapping_from, path_mapping_to)
|
|
||||||
logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
|
|
||||||
|
|
||||||
add_file_for_transcription(fullpath)
|
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
|
||||||
else:
|
else:
|
||||||
print("This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!")
|
print("This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!")
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
@app.route("/emby", methods=["POST"])
|
@app.post("/emby")
|
||||||
def receive_emby_webhook():
|
def receive_emby_webhook(
|
||||||
logging.debug("This hook is from Emby webhook!")
|
user_agent: Union[str, None] = Header(None),
|
||||||
logging.debug("Headers: %s", request.headers)
|
data: Union[str, None] = Form(None),
|
||||||
logging.debug("Raw response: %s", request.form)
|
):
|
||||||
|
logging.debug("Raw response: %s", data)
|
||||||
|
|
||||||
if "Emby Server" in request.headers.get("User-Agent"):
|
if "Emby Server" in user_agent:
|
||||||
data = request.form.get('data')
|
|
||||||
if data:
|
if data:
|
||||||
data_dict = json.loads(data)
|
data_dict = json.loads(data)
|
||||||
fullpath = data_dict.get('Item', {}).get('Path', '')
|
fullpath = data_dict['Item']['Path']
|
||||||
event = data_dict.get('Event', '')
|
event = data_dict['Event']
|
||||||
logging.debug("Event detected is: " + event)
|
logging.debug("Emby event detected is: " + event)
|
||||||
if((event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay)):
|
if((event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay)):
|
||||||
logging.debug("Path of file: " + fullpath)
|
logging.debug("Path of file: " + fullpath)
|
||||||
|
|
||||||
if use_path_mapping:
|
|
||||||
fullpath = fullpath.replace(path_mapping_from, path_mapping_to)
|
|
||||||
logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
|
|
||||||
|
|
||||||
add_file_for_transcription(fullpath)
|
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
|
||||||
else:
|
else:
|
||||||
print("This doesn't appear to be a properly configured Emby webhook, please review the instructions again!")
|
print("This doesn't appear to be a properly configured Emby webhook, please review the instructions again!")
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
def gen_subtitles(video_file_path: str) -> None:
|
def gen_subtitles(file_path: str, transcribe_or_translate_str: str, front=True) -> None:
|
||||||
|
"""Generates subtitles for a video file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: The path to the video file.
|
||||||
|
transcription_or_translation: The type of transcription or translation to perform.
|
||||||
|
front: Whether to add the file to the front of the transcription queue.
|
||||||
|
"""
|
||||||
|
global model
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print(f"Transcribing file: {video_file_path}")
|
if not is_video_file(file_path):
|
||||||
start_time = time.time()
|
print(f"{file_path} isn't a video file!")
|
||||||
model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions)
|
return None
|
||||||
result = model.transcribe_stable(video_file_path, task=transcribe_or_translate)
|
|
||||||
result.to_srt_vtt(video_file_path.rsplit('.', 1)[0] + subextension, word_level=word_level_highlight)
|
if file_path not in files_to_transcribe:
|
||||||
elapsed_time = time.time() - start_time
|
if has_subtitle_language(file_path, skipifinternalsublang):
|
||||||
minutes, seconds = divmod(int(elapsed_time), 60)
|
logging.debug(f"{file_path} already has an internal sub we want, skipping generation")
|
||||||
print(f"Transcription of {video_file_path} is completed, it took {minutes} minutes and {seconds} seconds to complete.")
|
return f"{file_path} already has an internal sub we want, skipping generation"
|
||||||
|
elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
|
||||||
|
print(f"{file_path} already has a subgen created for this, skipping it")
|
||||||
|
return f"{file_path} already has a subgen created for this, skipping it"
|
||||||
|
|
||||||
|
if front:
|
||||||
|
files_to_transcribe.insert(0, file_path)
|
||||||
|
else:
|
||||||
|
files_to_transcribe.append(file_path)
|
||||||
|
print(f"Added {os.path.basename(file_path)} for transcription.")
|
||||||
|
# Start transcription for the file in a separate thread
|
||||||
|
|
||||||
|
print(f"{len(files_to_transcribe)} files in the queue for transcription")
|
||||||
|
print(f"Transcribing file: {os.path.basename(file_path)}")
|
||||||
|
start_time = time.time()
|
||||||
|
if model is None:
|
||||||
|
logging.debug("Model was purged, need to re-create")
|
||||||
|
model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions)
|
||||||
|
|
||||||
|
result = model.transcribe_stable(file_path, task=transcribe_or_translate_str)
|
||||||
|
result.to_srt_vtt(file_path.rsplit('.', 1)[0] + subextension, word_level=word_level_highlight)
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
minutes, seconds = divmod(int(elapsed_time), 60)
|
||||||
|
print(f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.")
|
||||||
|
files_to_transcribe.remove(file_path)
|
||||||
|
else:
|
||||||
|
print(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error processing or transcribing {video_file_path}: {e}")
|
print(f"Error processing or transcribing {file_path}: {e}")
|
||||||
finally:
|
finally:
|
||||||
files_to_transcribe.remove(video_file_path)
|
|
||||||
if len(files_to_transcribe) == 0:
|
if len(files_to_transcribe) == 0:
|
||||||
logging.debug("Queue is empty, clearing/releasing VRAM")
|
logging.debug("Queue is empty, clearing/releasing VRAM")
|
||||||
del model
|
del model
|
||||||
gc.collect()
|
gc.collect()
|
||||||
|
|
||||||
# Function to add a file for transcription
|
|
||||||
def add_file_for_transcription(file_path, front=True):
|
|
||||||
if file_path not in files_to_transcribe:
|
|
||||||
|
|
||||||
if has_subtitle_language(file_path, skipifinternalsublang):
|
|
||||||
logging.debug("File already has an internal sub we want, skipping generation")
|
|
||||||
return "File already has an internal sub we want, skipping generation"
|
|
||||||
elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
|
|
||||||
print("We already have a subgen created for this file, skipping it")
|
|
||||||
return "We already have a subgen created for this file, skipping it"
|
|
||||||
|
|
||||||
if front:
|
|
||||||
files_to_transcribe.insert(0, file_path)
|
|
||||||
else:
|
|
||||||
files_to_transcribe.append(file_path)
|
|
||||||
print(f"Added {file_path} for transcription.")
|
|
||||||
# Start transcription for the file in a separate thread
|
|
||||||
|
|
||||||
print(f"{len(files_to_transcribe)} files in the queue for transcription")
|
|
||||||
gen_subtitles(file_path)
|
|
||||||
|
|
||||||
else:
|
|
||||||
print(f"File {file_path} is already in the transcription list. Skipping.")
|
|
||||||
|
|
||||||
def has_subtitle_language(video_file, target_language):
|
def has_subtitle_language(video_file, target_language):
|
||||||
try:
|
try:
|
||||||
container = av.open(video_file)
|
container = av.open(video_file)
|
||||||
@@ -287,25 +290,24 @@ def get_jellyfin_file_name(item_id: str, jellyfin_url: str, jellyfin_token: str)
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Cheap way to get the admin user id, and save it for later use.
|
# Cheap way to get the admin user id, and save it for later use.
|
||||||
global jellyfin_userid
|
users = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content)
|
||||||
if not jellyfin_userid:
|
jellyfin_admin = get_jellyfin_admin(users)
|
||||||
users_request = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content)
|
|
||||||
for user in users_request:
|
|
||||||
if user['Policy']['IsAdministrator']:
|
|
||||||
jellyfin_userid = user['Id']
|
|
||||||
break
|
|
||||||
if not jellyfin_userid:
|
|
||||||
raise Exception("Unable to find administrator user in Jellyfin")
|
|
||||||
|
|
||||||
response = requests.get(f"{jellyfin_url}/Users/{jellyfin_userid}/Items/{item_id}", headers=headers)
|
response = requests.get(f"{jellyfin_url}/Users/{jellyfin_admin}/Items/{item_id}", headers=headers)
|
||||||
|
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
json_data = json.loads(response.content)
|
file_name = json.loads(response.content)['Path']
|
||||||
file_name = json_data['Path']
|
|
||||||
return file_name
|
return file_name
|
||||||
else:
|
else:
|
||||||
raise Exception(f"Error: {response.status_code}")
|
raise Exception(f"Error: {response.status_code}")
|
||||||
|
|
||||||
|
def get_jellyfin_admin(users):
|
||||||
|
for user in users:
|
||||||
|
if user["Policy"]["IsAdministrator"]:
|
||||||
|
return user["Id"]
|
||||||
|
|
||||||
|
raise Exception("Unable to find administrator user in Jellyfin")
|
||||||
|
|
||||||
def is_video_file(file_path):
|
def is_video_file(file_path):
|
||||||
av.logging.set_level(av.logging.PANIC)
|
av.logging.set_level(av.logging.PANIC)
|
||||||
try:
|
try:
|
||||||
@@ -317,14 +319,22 @@ def is_video_file(file_path):
|
|||||||
except av.AVError:
|
except av.AVError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def path_mapping(fullpath):
|
||||||
|
if use_path_mapping:
|
||||||
|
fullpath = fullpath.replace(path_mapping_from, path_mapping_to)
|
||||||
|
logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
|
||||||
|
return fullpath
|
||||||
|
|
||||||
def transcribe_existing():
|
def transcribe_existing():
|
||||||
print("Starting to search folders to see if we need to create subtitles.")
|
print("Starting to search folders to see if we need to create subtitles.")
|
||||||
|
logging.debug("The folders are:")
|
||||||
for path in transcribe_folders:
|
for path in transcribe_folders:
|
||||||
|
logging.debug(path)
|
||||||
for root, dirs, files in os.walk(path):
|
for root, dirs, files in os.walk(path):
|
||||||
for file in files:
|
for file in files:
|
||||||
file_path = os.path.join(root, file)
|
file_path = os.path.join(root, file)
|
||||||
if is_video_file(file_path):
|
if is_video_file(file_path):
|
||||||
threading.Thread(target=add_file_for_transcription, args=(file_path, False)).start()
|
gen_subtitles(path_mapping(fullpath), transcribe_or_translate, False)
|
||||||
|
|
||||||
print("Finished searching and queueing files for transcription")
|
print("Finished searching and queueing files for transcription")
|
||||||
|
|
||||||
@@ -334,4 +344,5 @@ if transcribe_folders:
|
|||||||
|
|
||||||
print("Starting webhook!")
|
print("Starting webhook!")
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app.run(debug=debug, host='0.0.0.0', port=int(webhookport))
|
import uvicorn
|
||||||
|
uvicorn.run("subgen:app", host="0.0.0.0", port=int(webhookport), reload=debug)
|
||||||
|
|||||||
Reference in New Issue
Block a user