Rename subgen/subgen.py to subgen.py
This commit is contained in:
775
subgen.py
Normal file
775
subgen.py
Normal file
@@ -0,0 +1,775 @@
|
||||
subgen_version = '2024.3.20.28'
|
||||
|
||||
from datetime import datetime
|
||||
import subprocess
|
||||
import os
|
||||
import json
|
||||
import xml.etree.ElementTree as ET
|
||||
import threading
|
||||
import sys
|
||||
import time
|
||||
import queue
|
||||
import logging
|
||||
import gc
|
||||
import io
|
||||
import random
|
||||
from typing import BinaryIO, Union, Any
|
||||
from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request
|
||||
from fastapi.responses import StreamingResponse, RedirectResponse
|
||||
import numpy as np
|
||||
import stable_whisper
|
||||
import requests
|
||||
import av
|
||||
import ffmpeg
|
||||
import whisper
|
||||
import re
|
||||
|
||||
def convert_to_bool(in_bool):
    """Interpret *in_bool* (any type) as a boolean.

    Anything whose lowercased string form is 'true', 'on' or '1' is True;
    everything else is False.
    """
    normalized = str(in_bool).lower()
    return normalized in {'true', 'on', '1'}
|
||||
|
||||
# Runtime configuration, sourced from environment variables with defaults.
plextoken = os.getenv('PLEXTOKEN', 'token here')
plexserver = os.getenv('PLEXSERVER', 'http://192.168.1.111:32400')
jellyfintoken = os.getenv('JELLYFINTOKEN', 'token here')
jellyfinserver = os.getenv('JELLYFINSERVER', 'http://192.168.1.111:8096')
whisper_model = os.getenv('WHISPER_MODEL', 'medium')
whisper_threads = int(os.getenv('WHISPER_THREADS', 4))
concurrent_transcriptions = int(os.getenv('CONCURRENT_TRANSCRIPTIONS', 2))
transcribe_device = os.getenv('TRANSCRIBE_DEVICE', 'cpu')
procaddedmedia = convert_to_bool(os.getenv('PROCADDEDMEDIA', True))
procmediaonplay = convert_to_bool(os.getenv('PROCMEDIAONPLAY', True))
namesublang = os.getenv('NAMESUBLANG', 'aa')
skipifinternalsublang = os.getenv('SKIPIFINTERNALSUBLANG', 'eng')
webhookport = int(os.getenv('WEBHOOKPORT', 9000))
word_level_highlight = convert_to_bool(os.getenv('WORD_LEVEL_HIGHLIGHT', False))
debug = convert_to_bool(os.getenv('DEBUG', True))
use_path_mapping = convert_to_bool(os.getenv('USE_PATH_MAPPING', False))
path_mapping_from = os.getenv('PATH_MAPPING_FROM', r'/tv')
path_mapping_to = os.getenv('PATH_MAPPING_TO', r'/Volumes/TV')
model_location = os.getenv('MODEL_PATH', './models')
monitor = convert_to_bool(os.getenv('MONITOR', False))
transcribe_folders = os.getenv('TRANSCRIBE_FOLDERS', '')
transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'transcribe')
force_detected_language_to = os.getenv('FORCE_DETECTED_LANGUAGE_TO', '')
hf_transformers = convert_to_bool(os.getenv('HF_TRANSFORMERS', False))
hf_batch_size = int(os.getenv('HF_BATCH_SIZE', 24))
clear_vram_on_complete = convert_to_bool(os.getenv('CLEAR_VRAM_ON_COMPLETE', True))
compute_type = os.getenv('COMPUTE_TYPE', 'auto')
append = convert_to_bool(os.getenv('APPEND', False))
reload_script_on_change = convert_to_bool(os.getenv('RELOAD_SCRIPT_ON_CHANGE', False))

# Users may say "gpu"; the backends expect the device string "cuda".
if transcribe_device == "gpu":
    transcribe_device = "cuda"

# watchdog is only required when folder monitoring is enabled.
if monitor:
    from watchdog.observers.polling import PollingObserver as Observer
    from watchdog.events import FileSystemEventHandler

app = FastAPI()
model = None  # lazily created whisper model; see start_model()/delete_model()
files_to_transcribe = []  # queue of items currently pending/being transcribed

# Subtitle filename suffixes, e.g. ".subgen.medium.aa.srt"
subextension = f".subgen.{whisper_model.split('.')[0]}.{namesublang}.srt"
subextensionSDH = f".subgen.{whisper_model.split('.')[0]}.{namesublang}.sdh.srt"

# /.dockerenv existing is how we detect a containerized run.
in_docker = os.path.exists('/.dockerenv')
docker_status = "Docker" if in_docker else "Standalone"
last_print_time = None  # throttle state for progress() keep-alive logging
|
||||
|
||||
# Logging filter that suppresses known-noisy library messages.
class MultiplePatternsFilter(logging.Filter):
    """Drop log records whose message contains any known noisy substring."""

    # Substrings of chatty progress/parser messages we never want logged.
    NOISY_SUBSTRINGS = (
        "Compression ratio threshold is not met",
        "Processing segment at",
        "Log probability threshold is",
        "Reset prompt",
        "Attempting to release",
        "released on ",
        "Attempting to acquire",
        "acquired on",
        "header parsing failed",
        "timescale not set",
        "misdetection possible",
        "srt was added",
        "doesn't have any audio to transcribe",
    )

    def filter(self, record):
        # Keep the record only when none of the noisy substrings appear.
        message = record.getMessage()
        return all(pattern not in message for pattern in self.NOISY_SUBSTRINGS)
|
||||
|
||||
# Configure root logging: verbose, timestamped output when DEBUG is enabled.
if debug:
    level = logging.DEBUG
    logging.basicConfig(stream=sys.stderr, level=level, format="%(asctime)s %(levelname)s: %(message)s")
else:
    level = logging.INFO
    logging.basicConfig(stream=sys.stderr, level=level)

# Get the root logger and pin its level.
logger = logging.getLogger()
logger.setLevel(level)

# Attach the noise filter to every handler basicConfig installed.
for handler in logger.handlers:
    handler.addFilter(MultiplePatternsFilter())

# Quiet down chatty third-party loggers.
for _noisy_logger in ("multipart", "urllib3", "asyncio", "watchfiles"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
|
||||
|
||||
# This forces a flush so progress output is printed correctly.
def progress(seek, total):
    """Progress callback handed to stable-ts; flushes stdout/stderr.

    When running inside Docker, also emits a keep-alive log line at most
    once every 5 seconds so container logs show the job is still alive.
    """
    global last_print_time
    sys.stdout.flush()
    sys.stderr.flush()
    if docker_status == 'Docker':
        now = time.time()
        # Throttle: log at most once per 5-second window.
        if last_print_time is None or now - last_print_time >= 5:
            last_print_time = now
            logging.info("Force Update...")
|
||||
|
||||
# Seconds past the final segment at which the credit line is placed.
TIME_OFFSET = 5


def appendLine(result):
    """Optionally append a "Transcribed by ..." credit segment to *result*.

    No-op unless the APPEND option is enabled. The credit segment reuses a
    copy of the last real segment, shifted TIME_OFFSET seconds later.
    """
    if not append:
        return
    credit = result.segments[-1].copy()
    credit.id += 1
    credit.start += TIME_OFFSET
    credit.end += TIME_OFFSET
    stamp = datetime.now().strftime("%d %b %Y - %H:%M:%S")
    credit.text = f"Transcribed by whisperAI with faster-whisper ({whisper_model}) on {stamp}"
    credit.words = []  # synthetic segment carries no word-level timing
    result.segments.append(credit)
|
||||
|
||||
@app.get("/plex")
@app.get("/webhook")
@app.get("/jellyfin")
@app.get("/asr")
@app.get("/emby")
@app.get("/detect-language")
@app.get("/tautulli")
@app.get("/")
def handle_get_request(request: Request):
    """Catch-all GET handler: every webhook endpoint expects a POST."""
    return {"You accessed this request incorrectly via a GET request. See https://github.com/McCloudS/subgen for proper configuration"}
|
||||
|
||||
@app.get("/status")
def status():
    """Report the Subgen, stable-ts and whisper versions plus run mode."""
    running_in_docker = os.path.exists('/.dockerenv')
    mode = "Docker" if running_in_docker else "Standalone"
    return {"version": f"Subgen {subgen_version}, stable-ts {stable_whisper.__version__}, whisper {whisper.__version__} ({mode})"}
|
||||
|
||||
@app.post("/subsync")
def subsync(
    audio_file: UploadFile = File(...),
    subtitle_file: UploadFile = File(...),
    language: Union[str, None] = Query(default=None),
):
    """Align an uploaded subtitle file to its audio track with stable-ts.

    Strips SRT counters, timestamps and inline style blocks from the upload,
    aligns the remaining text against the audio, and streams back a new SRT.
    Returns an empty response when alignment failed.
    """
    # Initialize before the try block so the final `if result:` check can
    # never raise NameError when an early statement fails.
    result = None
    try:
        logging.info("Syncing subtitle file from Subsync webhook")

        srt_content = subtitle_file.file.read().decode('utf-8')
        # Remove inline style/positioning blocks such as "{\an8}".
        srt_content = re.sub(r'\{.*?\}', '', srt_content)
        # Remove numeric counters for each entry
        srt_content = re.sub(r'^\d+$', '', srt_content, flags=re.MULTILINE)
        # Remove timestamps and formatting
        srt_content = re.sub(r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', '', srt_content)
        # Remove any remaining newlines and spaces
        srt_content = re.sub(r'\n\n+', '\n', srt_content).strip()

        start_time = time.time()
        start_model()

        result = model.align(audio_file.file.read(), srt_content, language=language)
        appendLine(result)
        elapsed_time = time.time() - start_time
        minutes, seconds = divmod(int(elapsed_time), 60)
        logging.info(f"Subsync is completed, it took {minutes} minutes and {seconds} seconds to complete.")
    except Exception as e:
        logging.info(f"Error processing or aligning {audio_file.filename} or {subtitle_file.filename}: {e}")
    finally:
        delete_model()
    if result:
        return StreamingResponse(
            iter(result.to_srt_vtt(filepath=None, word_level=word_level_highlight)),
            media_type="text/plain",
            headers={
                'Source': 'Aligned using stable-ts from Subgen!',
            })
    else:
        return
|
||||
|
||||
@app.post("/tautulli")
def receive_tautulli_webhook(
    source: Union[str, None] = Header(None),
    event: str = Body(None),
    file: str = Body(None),
):
    """Tautulli webhook: queue subtitle generation for added/played media."""
    if source != "Tautulli":
        # Any other (or missing) source header means a misconfigured webhook.
        return {"This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"}

    logging.debug(f"Tautulli event detected is: {event}")
    wanted = (event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)
    if wanted:
        fullpath = file
        logging.debug("Path of file: " + fullpath)
        gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)

    return ""
|
||||
|
||||
@app.post("/plex")
def receive_plex_webhook(
    user_agent: Union[str, None] = Header(None),
    payload: Union[str, None] = Form(),
):
    """Plex webhook: queue transcription on library.new / media.play.

    After generating subtitles, asks Plex to refresh the item's metadata so
    the new subtitle appears without a manual rescan.
    """
    plex_json = json.loads(payload)
    logging.debug(f"Raw response: {payload}")

    # Guard the None case: a missing User-Agent header would otherwise raise
    # TypeError on the `in` test and surface as a 500.
    if user_agent and "PlexMediaServer" in user_agent:
        event = plex_json["event"]
        logging.debug(f"Plex event detected is: {event}")
        if (event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay):
            fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
            logging.debug("Path of file: " + fullpath)

            gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
            try:
                refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
                logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.")
            except Exception as e:
                logging.error(f"Failed to refresh metadata for item {plex_json['Metadata']['ratingKey']}: {e}")
    else:
        return {"This doesn't appear to be a properly configured Plex webhook, please review the instructions again!"}

    return ""
|
||||
|
||||
@app.post("/jellyfin")
def receive_jellyfin_webhook(
    user_agent: Union[str, None] = Header(None),
    NotificationType: str = Body(None),
    file: str = Body(None),
    ItemId: str = Body(None),
):
    """Jellyfin webhook: queue transcription on ItemAdded / PlaybackStart,
    then ask Jellyfin to refresh the item's metadata."""
    # Guard the None case: a missing User-Agent header would otherwise raise
    # TypeError on the `in` test and surface as a 500.
    if user_agent and "Jellyfin-Server" in user_agent:
        logging.debug("Jellyfin event detected is: " + NotificationType)
        logging.debug("itemid is: " + ItemId)
        if (NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay):
            fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken)
            logging.debug(f"Path of file: {fullpath}")

            gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
            try:
                refresh_jellyfin_metadata(ItemId, jellyfinserver, jellyfintoken)
                logging.info(f"Metadata for item {ItemId} refreshed successfully.")
            except Exception as e:
                logging.error(f"Failed to refresh metadata for item {ItemId}: {e}")
    else:
        return {"This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"}

    return ""
|
||||
|
||||
@app.post("/emby")
def receive_emby_webhook(
    user_agent: Union[str, None] = Header(None),
    data: Union[str, None] = Form(None),
):
    """Emby webhook: queue transcription on library.new / playback.start."""
    logging.debug("Raw response: %s", data)

    # Guard the None case: a missing User-Agent header would otherwise raise
    # TypeError on the `in` test and surface as a 500.
    if user_agent and "Emby Server" in user_agent:
        if data:
            data_dict = json.loads(data)
            fullpath = data_dict['Item']['Path']
            event = data_dict['Event']
            logging.debug("Emby event detected is: " + event)
            if (event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay):
                logging.debug("Path of file: " + fullpath)

                gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
    else:
        return {"This doesn't appear to be a properly configured Emby webhook, please review the instructions again!"}

    return ""
|
||||
|
||||
@app.post("/batch")
def batch(
    directory: Union[str, None] = Query(default=None),
    forceLanguage: Union[str, None] = Query(default=None)
):
    """Queue subtitle generation for every media file under *directory*."""
    transcribe_existing(directory, forceLanguage)
|
||||
|
||||
# idea and some code for asr and detect language from https://github.com/ahmetoner/whisper-asr-webservice
@app.post("//asr")
@app.post("/asr")
def asr(
    task: Union[str, None] = Query(default="transcribe", enum=["transcribe", "translate"]),
    language: Union[str, None] = Query(default=None),
    initial_prompt: Union[str, None] = Query(default=None),  # not used by Bazarr
    audio_file: UploadFile = File(...),
    encode: bool = Query(default=True, description="Encode audio first through ffmpeg"),  # not used by Bazarr/always False
    output: Union[str, None] = Query(default="srt", enum=["txt", "vtt", "srt", "tsv", "json"]),
    word_timestamps: bool = Query(default=False, description="Word level timestamps")  # not used by Bazarr
):
    """Bazarr/ASR endpoint: transcribe or translate uploaded raw PCM audio.

    Expects s16le mono 16 kHz audio (what Bazarr posts) and streams the
    result back as SRT, or an empty response on failure.
    """
    result = None
    # Build the queue token BEFORE the try block so the finally clause can
    # always reference it, and join the random characters into a string
    # (random.choices returns a list, which would embed its repr).
    queue_token = "Bazarr-asr-" + ''.join(
        random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
    try:
        logging.info("Transcribing file from Bazarr/ASR webhook")
        start_time = time.time()
        start_model()
        files_to_transcribe.insert(0, queue_token)
        # Normalize s16le samples to float32 in [-1, 1).
        audio_data = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0
        if hf_transformers:
            result = model.transcribe(audio_data, task=task, input_sr=16000, language=language, batch_size=hf_batch_size, progress_callback=progress)
        else:
            result = model.transcribe_stable(audio_data, task=task, input_sr=16000, language=language, progress_callback=progress)
        appendLine(result)
        elapsed_time = time.time() - start_time
        minutes, seconds = divmod(int(elapsed_time), 60)
        logging.info(f"Bazarr transcription is completed, it took {minutes} minutes and {seconds} seconds to complete.")
    except Exception as e:
        logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")
    finally:
        if queue_token in files_to_transcribe:
            files_to_transcribe.remove(queue_token)
        delete_model()
    if result:
        return StreamingResponse(
            iter(result.to_srt_vtt(filepath=None, word_level=word_level_highlight)),
            media_type="text/plain",
            headers={
                'Source': 'Transcribed using stable-ts from Subgen!',
            })
    else:
        return
|
||||
|
||||
@app.post("//detect-language")
@app.post("/detect-language")
def detect_language(
    audio_file: UploadFile = File(...),
    # encode: bool = Query(default=True, description="Encode audio first through ffmpeg") # This is always false from Bazarr
):
    """Detect the spoken language of uploaded raw PCM audio (Bazarr).

    Returns both the human-readable language name and the whisper code.
    """
    detected_lang_code = ""  # Initialize with an empty string
    # Build the queue token BEFORE the try block: previously it was assigned
    # after start_model(), so a model-load failure raised NameError in the
    # finally clause, masking the real error. Also join the characters
    # (random.choices returns a list).
    queue_token = "Bazarr-detect-language-" + ''.join(
        random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
    try:
        start_model()
        files_to_transcribe.insert(0, queue_token)
        # Normalize s16le samples to float32 in [-1, 1).
        audio_data = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0
        # pad_or_trim: only the first 30 seconds are needed for detection.
        if hf_transformers:
            detected_lang_code = model.transcribe(whisper.pad_or_trim(audio_data), input_sr=16000, batch_size=hf_batch_size).language
        else:
            detected_lang_code = model.transcribe_stable(whisper.pad_or_trim(audio_data), input_sr=16000).language

    except Exception as e:
        logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")

    finally:
        if queue_token in files_to_transcribe:
            files_to_transcribe.remove(queue_token)
        delete_model()

    return {"detected_language": whisper_languages.get(detected_lang_code, detected_lang_code), "language_code": detected_lang_code}
|
||||
|
||||
def start_model():
    """Lazily (re)create the global whisper model if it has been purged."""
    global model
    if model is not None:
        return
    logging.debug("Model was purged, need to re-create")
    if hf_transformers:
        logging.debug("Using Hugging Face Transformers, whisper_threads, concurrent_transcriptions, and model_location variables are ignored!")
        model = stable_whisper.load_hf_whisper(whisper_model, device=transcribe_device)
    else:
        model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions, compute_type=compute_type)
|
||||
|
||||
def delete_model():
    """Release the model (and its VRAM) once the transcription queue is empty."""
    global model
    # Only purge when enabled and nothing is queued or in flight.
    if not clear_vram_on_complete or files_to_transcribe:
        return
    logging.debug("Queue is empty, clearing/releasing VRAM")
    model = None
    gc.collect()
|
||||
|
||||
def gen_subtitles(file_path: str, transcribe_or_translate: str, front=True, forceLanguage=None) -> None:
    """Generates subtitles for a video file.

    Args:
        file_path: str - The path to the video file.
        transcribe_or_translate: str - The type of transcription or translation to perform.
        front: bool - Whether to add the file to the front of the transcription queue. Default is True.
        forceLanguage: str - The language to force for transcription or translation. Default is None.
    """
    try:
        if not has_audio(file_path):
            logging.debug(f"{file_path} doesn't have any audio to transcribe!")
            return None

        if file_path in files_to_transcribe:
            logging.info(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.")
        else:
            # Skip when a usable subtitle already exists, internally or on disk.
            skip_reason = None
            if has_subtitle_language(file_path, skipifinternalsublang):
                skip_reason = f"{file_path} already has an internal subtitle we want, skipping generation"
            elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
                skip_reason = f"{file_path} already has a subtitle created for this, skipping it"
            elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH):
                skip_reason = f"{file_path} already has a SDH subtitle created for this, skipping it"
            if skip_reason is not None:
                logging.info(skip_reason)
                return skip_reason

            # Queue the file, front or back depending on the caller.
            if front:
                files_to_transcribe.insert(0, file_path)
            else:
                files_to_transcribe.append(file_path)
            logging.info(f"Added {os.path.basename(file_path)} for transcription.")

            logging.info(f"{len(files_to_transcribe)} files in the queue for transcription")
            logging.info(f"Transcribing file: {os.path.basename(file_path)}")
            start_time = time.time()
            start_model()

            global force_detected_language_to
            if force_detected_language_to:
                forceLanguage = force_detected_language_to
                logging.info(f"Forcing language to {forceLanguage}")

            if hf_transformers:
                result = model.transcribe(file_path, language=forceLanguage, batch_size=hf_batch_size, task=transcribe_or_translate, progress_callback=progress)
            else:
                result = model.transcribe_stable(file_path, language=forceLanguage, task=transcribe_or_translate, progress_callback=progress)
            appendLine(result)
            result.to_srt_vtt(get_file_name_without_extension(file_path) + subextension, word_level=word_level_highlight)
            elapsed_time = time.time() - start_time
            minutes, seconds = divmod(int(elapsed_time), 60)
            logging.info(f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.")

    except Exception as e:
        logging.info(f"Error processing or transcribing {file_path}: {e}")
    finally:
        if file_path in files_to_transcribe:
            files_to_transcribe.remove(file_path)
        delete_model()
|
||||
|
||||
def get_file_name_without_extension(file_path):
    """Return *file_path* with its final extension removed.

    e.g. "/media/show.s01e01.mkv" -> "/media/show.s01e01"
    """
    # splitext returns (root, ext); the ext was previously bound to an
    # unused variable — only the root is needed.
    return os.path.splitext(file_path)[0]
|
||||
|
||||
def has_subtitle_language(video_file, target_language):
    """Return True when *video_file* has an embedded subtitle stream tagged
    with *target_language* (e.g. 'eng'); False otherwise or on probe error."""
    try:
        with av.open(video_file) as container:
            found = any(
                stream.type == 'subtitle'
                and 'language' in stream.metadata
                and stream.metadata['language'] == target_language
                for stream in container.streams
            )
        if found:
            logging.debug(f"Subtitles in '{target_language}' language found in the video.")
            return True
        logging.debug(f"No subtitles in '{target_language}' language found in the video.")
    except Exception as e:
        logging.info(f"An error occurred: {e}")
    return False
|
||||
|
||||
def get_plex_file_name(itemid: str, server_ip: str, plex_token: str) -> str:
    """Gets the full path to a file from the Plex server.

    Args:
        itemid: The ID of the item in the Plex library.
        server_ip: The IP address of the Plex server.
        plex_token: The Plex token.

    Returns:
        The full path to the file.

    Raises:
        Exception: If Plex does not answer with HTTP 200.
    """
    metadata_url = f"{server_ip}/library/metadata/{itemid}"
    response = requests.get(metadata_url, headers={"X-Plex-Token": plex_token})
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code}")
    # The file path lives on the first <Part> element of the metadata XML.
    root = ET.fromstring(response.content)
    return root.find(".//Part").attrib['file']
|
||||
|
||||
def refresh_plex_metadata(itemid: str, server_ip: str, plex_token: str) -> None:
    """
    Refreshes the metadata of a Plex library item.

    Args:
        itemid: The ID of the item in the Plex library whose metadata needs to be refreshed.
        server_ip: The IP address of the Plex server.
        plex_token: The Plex token used for authentication.

    Raises:
        Exception: If the server does not respond with a successful status code.
    """
    # Plex API endpoint to refresh metadata for a specific item.
    refresh_url = f"{server_ip}/library/metadata/{itemid}/refresh"
    # PUT with the token header triggers the refresh.
    response = requests.put(refresh_url, headers={"X-Plex-Token": plex_token})
    if response.status_code != 200:
        raise Exception(f"Error refreshing metadata: {response.status_code}")
    logging.info("Metadata refresh initiated successfully.")
|
||||
|
||||
def refresh_jellyfin_metadata(itemid: str, server_ip: str, jellyfin_token: str) -> None:
    """
    Refreshes the metadata of a Jellyfin library item.

    Args:
        itemid: The ID of the item in the Jellyfin library whose metadata needs to be refreshed.
        server_ip: The IP address of the Jellyfin server.
        jellyfin_token: The Jellyfin token used for authentication.

    Raises:
        Exception: If the server does not respond with a successful status code.
    """
    headers = {
        "Authorization": f"MediaBrowser Token={jellyfin_token}",
    }

    # Cheap way to get the admin user id, and save it for later use.
    users = json.loads(requests.get(f"{server_ip}/Users", headers=headers).content)
    jellyfin_admin = get_jellyfin_admin(users)

    # NOTE(review): this user-scoped GET /Refresh looks redundant with the
    # POST below and its response is discarded — confirm whether it is
    # actually required by older Jellyfin versions before removing it.
    requests.get(f"{server_ip}/Users/{jellyfin_admin}/Items/{itemid}/Refresh", headers=headers)

    # POST queues the actual metadata refresh; Jellyfin answers 204 on success.
    response = requests.post(f"{server_ip}/Items/{itemid}/Refresh", headers=headers)
    if response.status_code == 204:
        logging.info("Metadata refresh queued successfully.")
    else:
        raise Exception(f"Error refreshing metadata: {response.status_code}")
|
||||
|
||||
|
||||
def get_jellyfin_file_name(item_id: str, jellyfin_url: str, jellyfin_token: str) -> str:
    """Gets the full path to a file from the Jellyfin server.

    Args:
        jellyfin_url: The URL of the Jellyfin server.
        jellyfin_token: The Jellyfin token.
        item_id: The ID of the item in the Jellyfin library.

    Returns:
        The full path to the file.

    Raises:
        Exception: If Jellyfin does not answer with HTTP 200.
    """
    headers = {
        "Authorization": f"MediaBrowser Token={jellyfin_token}",
    }

    # Cheap way to get the admin user id, and save it for later use.
    users = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content)
    jellyfin_admin = get_jellyfin_admin(users)

    response = requests.get(f"{jellyfin_url}/Users/{jellyfin_admin}/Items/{item_id}", headers=headers)
    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code}")
    return json.loads(response.content)['Path']
|
||||
|
||||
def get_jellyfin_admin(users):
    """Return the Id of the first administrator account in a Jellyfin
    /Users payload; raise if no administrator exists."""
    for account in users:
        if account["Policy"]["IsAdministrator"]:
            return account["Id"]
    raise Exception("Unable to find administrator user in Jellyfin")
|
||||
|
||||
def has_audio(file_path):
    """Return True if *file_path* contains at least one audio stream."""
    try:
        with av.open(file_path) as container:
            for stream in container.streams:
                if stream.type == 'audio':
                    return True
            return False
    except (av.AVError, UnicodeDecodeError):
        # Unreadable or non-media file: treat as "no audio".
        return False
|
||||
|
||||
def path_mapping(fullpath):
    """Translate a server-reported path into the locally mounted path."""
    if not use_path_mapping:
        return fullpath
    mapped = fullpath.replace(path_mapping_from, path_mapping_to)
    logging.debug("Updated path: " + mapped)
    return mapped
|
||||
|
||||
if monitor:
    # Watchdog handler: queue subtitle generation for media files that
    # appear in (or are modified inside) the watched folders.
    class NewFileHandler(FileSystemEventHandler):
        def create_subtitle(self, event):
            # Directories are ignored; only files with audio are processed.
            if event.is_directory:
                return
            file_path = event.src_path
            if has_audio(file_path):
                logging.info(f"File: {path_mapping(file_path)} was added")
                gen_subtitles(path_mapping(file_path), transcribe_or_translate, False)

        def on_created(self, event):
            self.create_subtitle(event)

        def on_modified(self, event):
            self.create_subtitle(event)
|
||||
|
||||
def transcribe_existing(transcribe_folders, forceLanguage=None):
    """Queue subtitle generation for everything under *transcribe_folders*.

    *transcribe_folders* is a '|'-separated list of folders (or single file
    paths). When monitoring is enabled, also starts watching each folder
    for newly added files.
    """
    folder_list = transcribe_folders.split("|")
    logging.info("Starting to search folders to see if we need to create subtitles.")
    logging.debug("The folders are:")
    for path in folder_list:
        logging.debug(path)
        for root, dirs, files in os.walk(path):
            for file in files:
                gen_subtitles(path_mapping(os.path.join(root, file)), transcribe_or_translate, False, forceLanguage)
        # if the path specified was actually a single file and not a folder, process it
        if os.path.isfile(path) and has_audio(path):
            gen_subtitles(path_mapping(path), transcribe_or_translate, False, forceLanguage)
    # Set up the observer to watch for new files
    if monitor:
        observer = Observer()
        for path in folder_list:
            if os.path.isdir(path):
                observer.schedule(NewFileHandler(), path, recursive=True)
        observer.start()
    logging.info("Finished searching and queueing files for transcription. Now watching for new files.")
|
||||
|
||||
# Whisper language code -> human-readable name, used to answer
# /detect-language with a friendly label.
whisper_languages = {
    "en": "english",
    "zh": "chinese",
    "de": "german",
    "es": "spanish",
    "ru": "russian",
    "ko": "korean",
    "fr": "french",
    "ja": "japanese",
    "pt": "portuguese",
    "tr": "turkish",
    "pl": "polish",
    "ca": "catalan",
    "nl": "dutch",
    "ar": "arabic",
    "sv": "swedish",
    "it": "italian",
    "id": "indonesian",
    "hi": "hindi",
    "fi": "finnish",
    "vi": "vietnamese",
    "he": "hebrew",
    "uk": "ukrainian",
    "el": "greek",
    "ms": "malay",
    "cs": "czech",
    "ro": "romanian",
    "da": "danish",
    "hu": "hungarian",
    "ta": "tamil",
    "no": "norwegian",
    "th": "thai",
    "ur": "urdu",
    "hr": "croatian",
    "bg": "bulgarian",
    "lt": "lithuanian",
    "la": "latin",
    "mi": "maori",
    "ml": "malayalam",
    "cy": "welsh",
    "sk": "slovak",
    "te": "telugu",
    "fa": "persian",
    "lv": "latvian",
    "bn": "bengali",
    "sr": "serbian",
    "az": "azerbaijani",
    "sl": "slovenian",
    "kn": "kannada",
    "et": "estonian",
    "mk": "macedonian",
    "br": "breton",
    "eu": "basque",
    "is": "icelandic",
    "hy": "armenian",
    "ne": "nepali",
    "mn": "mongolian",
    "bs": "bosnian",
    "kk": "kazakh",
    "sq": "albanian",
    "sw": "swahili",
    "gl": "galician",
    "mr": "marathi",
    "pa": "punjabi",
    "si": "sinhala",
    "km": "khmer",
    "sn": "shona",
    "yo": "yoruba",
    "so": "somali",
    "af": "afrikaans",
    "oc": "occitan",
    "ka": "georgian",
    "be": "belarusian",
    "tg": "tajik",
    "sd": "sindhi",
    "gu": "gujarati",
    "am": "amharic",
    "yi": "yiddish",
    "lo": "lao",
    "uz": "uzbek",
    "fo": "faroese",
    "ht": "haitian creole",
    "ps": "pashto",
    "tk": "turkmen",
    "nn": "nynorsk",
    "mt": "maltese",
    "sa": "sanskrit",
    "lb": "luxembourgish",
    "my": "myanmar",
    "bo": "tibetan",
    "tl": "tagalog",
    "mg": "malagasy",
    "as": "assamese",
    "tt": "tatar",
    "haw": "hawaiian",
    "ln": "lingala",
    "ha": "hausa",
    "ba": "bashkir",
    "jw": "javanese",
    "su": "sundanese",
}
|
||||
|
||||
if __name__ == "__main__":
    import uvicorn

    logging.info(f"Subgen v{subgen_version}")
    logging.info("Starting Subgen with listening webhooks!")
    logging.info(f"Transcriptions are limited to running {str(concurrent_transcriptions)} at a time")
    logging.info(f"Running {str(whisper_threads)} threads per transcription")
    logging.info(f"Using {transcribe_device} to encode")
    if hf_transformers:
        logging.info("Using Hugging Face Transformers")
    else:
        logging.info("Using faster-whisper")
    # Optionally pre-process configured folders before serving webhooks.
    if transcribe_folders:
        transcribe_existing(transcribe_folders)
    uvicorn.run("subgen:app", host="0.0.0.0", port=int(webhookport), reload=reload_script_on_change, use_colors=True)
|
||||
Reference in New Issue
Block a user