General refactoring
subgen.py
@@ -239,7 +239,7 @@ async def form_post(request: Request):
     form_data = await request.form()
     # Read the existing content of the file
     try:
-        with open(f"{env_path}", "r") as file:
+        with open(env_path, "r") as file:
             lines = file.readlines()
     except FileNotFoundError:
         lines = []
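The only functional change in this hunk is dropping the redundant f-string around `env_path`. For reference, a minimal standalone sketch of the read-or-fall-back-to-empty pattern used here, with a hypothetical file name rather than the project's real path:

```python
# Sketch of the read-or-default pattern from this hunk (hypothetical path, not the project's env_path).
env_path = "subgen.env"

try:
    with open(env_path, "r") as file:   # plain variable; f"{env_path}" was a redundant f-string
        lines = file.readlines()
except FileNotFoundError:
    lines = []                          # no config yet: start from an empty list

print(f"Loaded {len(lines)} existing line(s) from {env_path}")
```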
@@ -276,60 +276,13 @@ async def form_post(request: Request):
     update_env_variables()
     return f"Configuration saved to {env_path}, reloading your subgen with your new values!"

-@app.get("/status")
-def status():
-    in_docker = os.path.exists('/.dockerenv')
-    docker_status = "Docker" if in_docker else "Standalone"
-    return {"version" : f"Subgen {subgen_version}, stable-ts {stable_whisper.__version__}, whisper {whisper.__version__} ({docker_status})"}
-
-@app.post("/subsync")
-def subsync(
-        audio_file: UploadFile = File(...),
-        subtitle_file: UploadFile = File(...),
-        language: Union[str, None] = Query(default=None),
-):
-    try:
-        logging.info(f"Syncing subtitle file from Subsync webhook")
-        result = None
-
-        srt_content = subtitle_file.file.read().decode('utf-8')
-        srt_content = re.sub(r'\{.*?\}', '', srt_content)
-        # Remove numeric counters for each entry
-        srt_content = re.sub(r'^\d+$', '', srt_content, flags=re.MULTILINE)
-        # Remove timestamps and formatting
-        srt_content = re.sub(r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', '', srt_content)
-        # Remove any remaining newlines and spaces
-        srt_content = re.sub(r'\n\n+', '\n', srt_content).strip()
-
-        start_time = time.time()
-        start_model()
-
-        result = model.align(audio_file.file.read(), srt_content, language=language)
-        appendLine(result)
-        elapsed_time = time.time() - start_time
-        minutes, seconds = divmod(int(elapsed_time), 60)
-        logging.info(f"Subsync is completed, it took {minutes} minutes and {seconds} seconds to complete.")
-    except Exception as e:
-        logging.info(f"Error processing or aligning {audio_file.filename} or {subtitle_file.filename}: {e}")
-    finally:
-        delete_model()
-    if result:
-        return StreamingResponse(
-            iter(result.to_srt_vtt(filepath = None, word_level=word_level_highlight)),
-            media_type="text/plain",
-            headers={
-                'Source': 'Aligned using stable-ts from Subgen!',
-            })
-    else:
-        return
-
 @app.post("/tautulli")
 def receive_tautulli_webhook(
         source: Union[str, None] = Header(None),
         event: str = Body(None),
         file: str = Body(None),
 ):

     if source == "Tautulli":
         logging.debug(f"Tautulli event detected is: {event}")
         if((event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)):
@@ -338,48 +291,53 @@ def receive_tautulli_webhook(

             gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
     else:
-        return {"This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"}
+        return {
+            "message": "This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"}

     return ""


 @app.post("/plex")
 def receive_plex_webhook(
-        user_agent: Union[str, None] = Header(None),
-        payload: Union[str, None] = Form(),
+        user_agent: Optional[str] = Header(None),
+        payload: Optional[str] = Form(),
 ):
-    plex_json = json.loads(payload)
-    logging.debug(f"Raw response: {payload}")
-
-    if "PlexMediaServer" in user_agent:
+    try:
+        plex_json = json.loads(payload)
+        logging.debug(f"Raw response: {payload}")
+
+        if "PlexMediaServer" not in user_agent:
+            return {"message": "This doesn't appear to be a properly configured Plex webhook, please review the instructions again"}
+
         event = plex_json["event"]
         logging.debug(f"Plex event detected is: {event}")
-        if((event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay)):
+
+        if (event in ["library.new", "media.play"] and (procaddedmedia or procmediaonplay)):
             fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
             logging.debug("Path of file: " + fullpath)

             gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
-            try:
-                refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
-                logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.")
-            except Exception as e:
-                logging.error(f"Failed to refresh metadata for item {plex_json['Metadata']['ratingKey']}: {e}")
-    else:
-        return {"This doesn't appear to be a properly configured Plex webhook, please review the instructions again!"}
+            refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
+            logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.")
+    except Exception as e:
+        logging.error(f"Failed to process Plex webhook: {e}")

     return ""


 @app.post("/jellyfin")
 def receive_jellyfin_webhook(
-        user_agent: Union[str, None] = Header(None),
+        user_agent: str = Header(None),
         NotificationType: str = Body(None),
         file: str = Body(None),
         ItemId: str = Body(None),
 ):

     if "Jellyfin-Server" in user_agent:
-        logging.debug("Jellyfin event detected is: " + NotificationType)
-        logging.debug("itemid is: " + ItemId)
-        if((NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay)):
+        logging.debug(f"Jellyfin event detected is: {NotificationType}")
+        logging.debug(f"itemid is: {ItemId}")
+        if (NotificationType == "ItemAdded" and procaddedmedia) or (
+                NotificationType == "PlaybackStart" and procmediaonplay):
             fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken)
             logging.debug(f"Path of file: {fullpath}")

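The Plex handler above is reshaped into a guard clause plus a single try/except instead of a nested if/else with an inner try. A minimal standalone sketch of that shape, using a hypothetical endpoint and server name rather than the project's actual routes:

```python
# Hedged sketch of the guard-clause + single try/except handler shape; names are illustrative only.
import json
import logging
from typing import Optional

from fastapi import FastAPI, Form, Header

app = FastAPI()

@app.post("/example-webhook")
def receive_example_webhook(
        user_agent: Optional[str] = Header(None),
        payload: Optional[str] = Form(None),
):
    try:
        body = json.loads(payload)
        logging.debug(f"Raw response: {payload}")

        if "ExpectedServer" not in user_agent:
            # early return replaces the trailing else: branch
            return {"message": "This doesn't appear to be a properly configured webhook"}

        event = body["event"]
        logging.debug(f"Event detected is: {event}")
        # ... trigger the real work here ...
    except Exception as e:
        # one handler now covers JSON parsing, missing keys, and downstream failures
        logging.error(f"Failed to process webhook: {e}")

    return ""
```

One consequence of this shape, visible in the hunk itself: a failure inside refresh_plex_metadata is no longer logged with its own "Failed to refresh metadata" message but surfaces through the single "Failed to process Plex webhook" log line.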
@@ -390,30 +348,34 @@ def receive_jellyfin_webhook(
             except Exception as e:
                 logging.error(f"Failed to refresh metadata for item {ItemId}: {e}")
     else:
-        return {"This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"}
+        return {
+            "message": "This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"}

     return ""


 @app.post("/emby")
 def receive_emby_webhook(
         user_agent: Union[str, None] = Header(None),
         data: Union[str, None] = Form(None),
 ):
     logging.debug("Raw response: %s", data)

-    if "Emby Server" in user_agent:
-        if data:
-            data_dict = json.loads(data)
-            fullpath = data_dict['Item']['Path']
-            event = data_dict['Event']
-            logging.debug("Emby event detected is: " + event)
-            if((event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay)):
-                logging.debug("Path of file: " + fullpath)
-
-                gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
-    else:
+    if "Emby Server" not in user_agent:
         return {"This doesn't appear to be a properly configured Emby webhook, please review the instructions again!"}

+    if not data:
+        return ""
+
+    data_dict = json.loads(data)
+    fullpath = data_dict['Item']['Path']
+    event = data_dict['Event']
+    logging.debug("Emby event detected is: " + event)
+
+    if event == "library.new" and procaddedmedia or event == "playback.start" and procmediaonplay:
+        logging.debug("Path of file: " + fullpath)
+        gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
+
     return ""

 @app.post("/batch")
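The Tautulli, Plex, and Jellyfin error responses above now carry an explicit "message" key; the Emby handler still returns the bare-braces form. The difference matters because braces around a lone string build a Python set, not a dict, as this quick standalone check (independent of the project) shows:

```python
# Bare braces around a single string make a set literal; adding a key makes a dict.
old_style = {"This doesn't appear to be a properly configured webhook!"}
new_style = {"message": "This doesn't appear to be a properly configured webhook!"}

print(type(old_style).__name__)  # set  -> typically rendered as a JSON array of strings
print(type(new_style).__name__)  # dict -> rendered as a JSON object with a "message" key
```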
@@ -518,14 +480,15 @@ def write_lrc(result, file_path):
             fraction = int((segment.start - int(segment.start)) * 100)
             file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n")

-def gen_subtitles(file_path: str, transcribe_or_translate: str, front=True, forceLanguage=None) -> None:
+
+def gen_subtitles(file_path: str, transcription_type: str, add_to_front=True, force_language=None) -> None:
     """Generates subtitles for a video file.

     Args:
         file_path: str - The path to the video file.
-        transcribe_or_translate: str - The type of transcription or translation to perform.
-        front: bool - Whether to add the file to the front of the transcription queue. Default is True.
-        forceLanguage: str - The language to force for transcription or translation. Default is None.
+        transcription_type: str - The type of transcription or translation to perform.
+        add_to_front: bool - Whether to add the file to the front of the transcription queue. Default is True.
+        force_language: str - The language to force for transcription or translation. Default is None.
     """

     try:
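For callers of gen_subtitles, this rename only changes the keyword spellings (transcribe_or_translate becomes transcription_type, front becomes add_to_front, forceLanguage becomes force_language). A small sketch with a stand-in stub and hypothetical media paths:

```python
# Stub standing in for the real gen_subtitles defined in subgen.py; the paths below are hypothetical.
def gen_subtitles(file_path: str, transcription_type: str, add_to_front=True, force_language=None) -> None:
    print(file_path, transcription_type, add_to_front, force_language)

# Old keyword spellings were transcribe_or_translate / front / forceLanguage.
gen_subtitles("/media/example.mkv", transcription_type="transcribe", add_to_front=False)
gen_subtitles("/media/example.mkv", transcription_type="translate", force_language="en")
```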
@@ -533,56 +496,66 @@ def gen_subtitles(file_path: str, transcribe_or_translate: str, front=True, forc
             logging.debug(f"{file_path} doesn't have any audio to transcribe!")
             return None

-        if file_path not in files_to_transcribe:
-            message = None
-            if has_subtitle_language(file_path, skipifinternalsublang):
-                message = f"{file_path} already has an internal subtitle we want, skipping generation"
-            elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
-                message = f"{file_path} already has a subtitle created for this, skipping it"
-            elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH):
-                message = f"{file_path} already has a SDH subtitle created for this, skipping it"
-            if message != None:
-                logging.info(message)
-                return message
-
-            if front:
-                files_to_transcribe.insert(0, file_path)
-            else:
-                files_to_transcribe.append(file_path)
-            logging.info(f"Added {os.path.basename(file_path)} for transcription.")
-            # Start transcription for the file in a separate thread
-
-            logging.info(f"{len(files_to_transcribe)} files in the queue for transcription")
-            logging.info(f"Transcribing file: {os.path.basename(file_path)}")
-            start_time = time.time()
-            start_model()
-            global force_detected_language_to
-            if force_detected_language_to:
-                forceLanguage = force_detected_language_to
-                logging.info(f"Forcing language to {forceLanguage}")
-            if custom_regroup:
-                result = model.transcribe_stable(file_path, language=forceLanguage, task=transcribe_or_translate, progress_callback=progress, initial_prompt=custom_model_prompt, regroup=custom_regroup)
-            else:
-                result = model.transcribe_stable(file_path, language=forceLanguage, task=transcribe_or_translate, progress_callback=progress, initial_prompt=custom_model_prompt)
-            appendLine(result)
-            file_name, file_extension = os.path.splitext(file_path)
-            if isAudioFileExtension(file_extension) and lrc_for_audio_files:
-                write_lrc(result, file_name + '.lrc')
-            else:
-                result.to_srt_vtt(file_name + subextension, word_level=word_level_highlight)
-            elapsed_time = time.time() - start_time
-            minutes, seconds = divmod(int(elapsed_time), 60)
-            logging.info(f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.")
-        else:
+        if file_path in files_to_transcribe:
             logging.info(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.")
+            return
+
+        message = None
+        if has_subtitle_language(file_path, skipifinternalsublang):
+            message = f"{file_path} already has an internal subtitle we want, skipping generation"
+        elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
+            message = f"{file_path} already has a subtitle created for this, skipping it"
+        elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH):
+            message = f"{file_path} already has a SDH subtitle created for this, skipping it"
+        if message:
+            logging.info(message)
+            return message
+
+        if add_to_front:
+            files_to_transcribe.insert(0, file_path)
+        else:
+            files_to_transcribe.append(file_path)
+        logging.info(f"Added {os.path.basename(file_path)} for transcription.")
+        logging.info(f"{len(files_to_transcribe)} files in the queue for transcription")
+        logging.info(f"Transcribing file: {os.path.basename(file_path)}")
+
+        start_time = time.time()
+        start_model()
+
+        if force_detected_language_to:
+            force_language = force_detected_language_to
+            logging.info(f"Forcing language to {force_language}")
+
+        if custom_regroup:
+            result = model.transcribe_stable(file_path, language=force_language, task=transcription_type,
+                                             progress_callback=progress, initial_prompt=custom_model_prompt,
+                                             regroup=custom_regroup)
+        else:
+            result = model.transcribe_stable(file_path, language=force_language, task=transcription_type,
+                                             progress_callback=progress, initial_prompt=custom_model_prompt)
+
+        appendLine(result)
+        file_name, file_extension = os.path.splitext(file_path)
+
+        if isAudioFileExtension(file_extension) and lrc_for_audio_files:
+            write_lrc(result, file_name + '.lrc')
+        else:
+            result.to_srt_vtt(file_name + subextension, word_level=word_level_highlight)
+
+        elapsed_time = time.time() - start_time
+        minutes, seconds = divmod(int(elapsed_time), 60)
+        logging.info(
+            f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.")
+
     except Exception as e:
         logging.info(f"Error processing or transcribing {file_path}: {e}")
+
     finally:
         if file_path in files_to_transcribe:
             files_to_transcribe.remove(file_path)
         delete_model()


 def get_file_name_without_extension(file_path):
     file_name, file_extension = os.path.splitext(file_path)
     return file_name
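The elapsed-time report at the end of gen_subtitles splits whole seconds with divmod; a quick standalone check of that arithmetic:

```python
# divmod(total_seconds, 60) returns (minutes, remaining_seconds).
elapsed_time = 125.7  # hypothetical elapsed seconds
minutes, seconds = divmod(int(elapsed_time), 60)
print(f"it took {minutes} minutes and {seconds} seconds to complete.")  # 2 minutes and 5 seconds
```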