diff --git a/subgen.py b/subgen.py index cee0913..4be0e93 100644 --- a/subgen.py +++ b/subgen.py @@ -239,7 +239,7 @@ async def form_post(request: Request): form_data = await request.form() # Read the existing content of the file try: - with open(f"{env_path}", "r") as file: + with open(env_path, "r") as file: lines = file.readlines() except FileNotFoundError: lines = [] @@ -276,113 +276,71 @@ async def form_post(request: Request): update_env_variables() return f"Configuration saved to {env_path}, reloading your subgen with your new values!" -@app.get("/status") -def status(): - in_docker = os.path.exists('/.dockerenv') - docker_status = "Docker" if in_docker else "Standalone" - return {"version" : f"Subgen {subgen_version}, stable-ts {stable_whisper.__version__}, whisper {whisper.__version__} ({docker_status})"} - -@app.post("/subsync") -def subsync( - audio_file: UploadFile = File(...), - subtitle_file: UploadFile = File(...), - language: Union[str, None] = Query(default=None), -): - try: - logging.info(f"Syncing subtitle file from Subsync webhook") - result = None - - srt_content = subtitle_file.file.read().decode('utf-8') - srt_content = re.sub(r'\{.*?\}', '', srt_content) - # Remove numeric counters for each entry - srt_content = re.sub(r'^\d+$', '', srt_content, flags=re.MULTILINE) - # Remove timestamps and formatting - srt_content = re.sub(r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', '', srt_content) - # Remove any remaining newlines and spaces - srt_content = re.sub(r'\n\n+', '\n', srt_content).strip() - - start_time = time.time() - start_model() - - result = model.align(audio_file.file.read(), srt_content, language=language) - appendLine(result) - elapsed_time = time.time() - start_time - minutes, seconds = divmod(int(elapsed_time), 60) - logging.info(f"Subsync is completed, it took {minutes} minutes and {seconds} seconds to complete.") - except Exception as e: - logging.info(f"Error processing or aligning {audio_file.filename} or {subtitle_file.filename}: {e}") - finally: - delete_model() - if result: - return StreamingResponse( - iter(result.to_srt_vtt(filepath = None, word_level=word_level_highlight)), - media_type="text/plain", - headers={ - 'Source': 'Aligned using stable-ts from Subgen!', - }) - else: - return @app.post("/tautulli") def receive_tautulli_webhook( - source: Union[str, None] = Header(None), - event: str = Body(None), - file: str = Body(None), - ): - + source: Union[str, None] = Header(None), + event: str = Body(None), + file: str = Body(None), +): if source == "Tautulli": logging.debug(f"Tautulli event detected is: {event}") if((event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)): fullpath = file logging.debug("Path of file: " + fullpath) - + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) else: - return {"This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"} - + return { + "message": "This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"} + return "" - + + @app.post("/plex") def receive_plex_webhook( - user_agent: Union[str, None] = Header(None), - payload: Union[str, None] = Form(), - ): - plex_json = json.loads(payload) - logging.debug(f"Raw response: {payload}") - - if "PlexMediaServer" in user_agent: + user_agent: Optional[str] = Header(None), + payload: Optional[str] = Form(), +): + try: + plex_json = json.loads(payload) + logging.debug(f"Raw response: {payload}") + + if "PlexMediaServer" not in user_agent: + return {"message": "This doesn't appear to be a properly configured Plex webhook, please review the instructions again"} + event = plex_json["event"] logging.debug(f"Plex event detected is: {event}") - if((event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay)): + + if (event in ["library.new", "media.play"] and (procaddedmedia or procmediaonplay)): fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken) logging.debug("Path of file: " + fullpath) - + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) - try: - refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken) - logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.") - except Exception as e: - logging.error(f"Failed to refresh metadata for item {plex_json['Metadata']['ratingKey']}: {e}") - else: - return {"This doesn't appear to be a properly configured Plex webhook, please review the instructions again!"} - + refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken) + logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.") + except Exception as e: + logging.error(f"Failed to process Plex webhook: {e}") + return "" + @app.post("/jellyfin") def receive_jellyfin_webhook( - user_agent: Union[str, None] = Header(None), - NotificationType: str = Body(None), - file: str = Body(None), - ItemId: str = Body(None), - ): - + user_agent: str = Header(None), + NotificationType: str = Body(None), + file: str = Body(None), + ItemId: str = Body(None), +): if "Jellyfin-Server" in user_agent: - logging.debug("Jellyfin event detected is: " + NotificationType) - logging.debug("itemid is: " + ItemId) - if((NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay)): + logging.debug(f"Jellyfin event detected is: {NotificationType}") + logging.debug(f"itemid is: {ItemId}") + + if (NotificationType == "ItemAdded" and procaddedmedia) or ( + NotificationType == "PlaybackStart" and procmediaonplay): fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken) logging.debug(f"Path of file: {fullpath}") - + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) try: refresh_jellyfin_metadata(ItemId, jellyfinserver, jellyfintoken) @@ -390,30 +348,34 @@ def receive_jellyfin_webhook( except Exception as e: logging.error(f"Failed to refresh metadata for item {ItemId}: {e}") else: - return {"This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"} - + return { + "message": "This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"} + return "" + @app.post("/emby") def receive_emby_webhook( - user_agent: Union[str, None] = Header(None), - data: Union[str, None] = Form(None), - ): + user_agent: Union[str, None] = Header(None), + data: Union[str, None] = Form(None), +): logging.debug("Raw response: %s", data) - - if "Emby Server" in user_agent: - if data: - data_dict = json.loads(data) - fullpath = data_dict['Item']['Path'] - event = data_dict['Event'] - logging.debug("Emby event detected is: " + event) - if((event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay)): - logging.debug("Path of file: " + fullpath) - - gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) - else: + + if "Emby Server" not in user_agent: return {"This doesn't appear to be a properly configured Emby webhook, please review the instructions again!"} - + + if not data: + return "" + + data_dict = json.loads(data) + fullpath = data_dict['Item']['Path'] + event = data_dict['Event'] + logging.debug("Emby event detected is: " + event) + + if event == "library.new" and procaddedmedia or event == "playback.start" and procmediaonplay: + logging.debug("Path of file: " + fullpath) + gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True) + return "" @app.post("/batch") @@ -518,71 +480,82 @@ def write_lrc(result, file_path): fraction = int((segment.start - int(segment.start)) * 100) file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}] {segment.text}\n") -def gen_subtitles(file_path: str, transcribe_or_translate: str, front=True, forceLanguage=None) -> None: + +def gen_subtitles(file_path: str, transcription_type: str, add_to_front=True, force_language=None) -> None: """Generates subtitles for a video file. Args: file_path: str - The path to the video file. - transcribe_or_translate: str - The type of transcription or translation to perform. - front: bool - Whether to add the file to the front of the transcription queue. Default is True. - forceLanguage: str - The language to force for transcription or translation. Default is None. + transcription_type: str - The type of transcription or translation to perform. + add_to_front: bool - Whether to add the file to the front of the transcription queue. Default is True. + force_language: str - The language to force for transcription or translation. Default is None. """ - + try: if not has_audio(file_path): logging.debug(f"{file_path} doesn't have any audio to transcribe!") return None - - if file_path not in files_to_transcribe: - message = None - if has_subtitle_language(file_path, skipifinternalsublang): - message = f"{file_path} already has an internal subtitle we want, skipping generation" - elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension): - message = f"{file_path} already has a subtitle created for this, skipping it" - elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH): - message = f"{file_path} already has a SDH subtitle created for this, skipping it" - if message != None: - logging.info(message) - return message - - if front: - files_to_transcribe.insert(0, file_path) - else: - files_to_transcribe.append(file_path) - logging.info(f"Added {os.path.basename(file_path)} for transcription.") - # Start transcription for the file in a separate thread - logging.info(f"{len(files_to_transcribe)} files in the queue for transcription") - logging.info(f"Transcribing file: {os.path.basename(file_path)}") - start_time = time.time() - start_model() - global force_detected_language_to - if force_detected_language_to: - forceLanguage = force_detected_language_to - logging.info(f"Forcing language to {forceLanguage}") - if custom_regroup: - result = model.transcribe_stable(file_path, language=forceLanguage, task=transcribe_or_translate, progress_callback=progress, initial_prompt=custom_model_prompt, regroup=custom_regroup) - else: - result = model.transcribe_stable(file_path, language=forceLanguage, task=transcribe_or_translate, progress_callback=progress, initial_prompt=custom_model_prompt) - appendLine(result) - file_name, file_extension = os.path.splitext(file_path) - if isAudioFileExtension(file_extension) and lrc_for_audio_files: - write_lrc(result, file_name + '.lrc') - else: - result.to_srt_vtt(file_name + subextension, word_level=word_level_highlight) - elapsed_time = time.time() - start_time - minutes, seconds = divmod(int(elapsed_time), 60) - logging.info(f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.") - else: + if file_path in files_to_transcribe: logging.info(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.") + return + + message = None + if has_subtitle_language(file_path, skipifinternalsublang): + message = f"{file_path} already has an internal subtitle we want, skipping generation" + elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension): + message = f"{file_path} already has a subtitle created for this, skipping it" + elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH): + message = f"{file_path} already has a SDH subtitle created for this, skipping it" + if message: + logging.info(message) + return message + + if add_to_front: + files_to_transcribe.insert(0, file_path) + else: + files_to_transcribe.append(file_path) + logging.info(f"Added {os.path.basename(file_path)} for transcription.") + logging.info(f"{len(files_to_transcribe)} files in the queue for transcription") + logging.info(f"Transcribing file: {os.path.basename(file_path)}") + + start_time = time.time() + start_model() + + if force_detected_language_to: + force_language = force_detected_language_to + logging.info(f"Forcing language to {force_language}") + + if custom_regroup: + result = model.transcribe_stable(file_path, language=force_language, task=transcription_type, + progress_callback=progress, initial_prompt=custom_model_prompt, + regroup=custom_regroup) + else: + result = model.transcribe_stable(file_path, language=force_language, task=transcription_type, + progress_callback=progress, initial_prompt=custom_model_prompt) + + appendLine(result) + file_name, file_extension = os.path.splitext(file_path) + + if isAudioFileExtension(file_extension) and lrc_for_audio_files: + write_lrc(result, file_name + '.lrc') + else: + result.to_srt_vtt(file_name + subextension, word_level=word_level_highlight) + + elapsed_time = time.time() - start_time + minutes, seconds = divmod(int(elapsed_time), 60) + logging.info( + f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.") except Exception as e: logging.info(f"Error processing or transcribing {file_path}: {e}") + finally: if file_path in files_to_transcribe: files_to_transcribe.remove(file_path) delete_model() + def get_file_name_without_extension(file_path): file_name, file_extension = os.path.splitext(file_path) return file_name