subgen_version = '2024.3.20.35'
from datetime import datetime
import subprocess
import os
import json
import xml.etree.ElementTree as ET
import threading
import sys
import time
import queue
import logging
import gc
import io
import random
from typing import BinaryIO, Union, Any
from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request
from fastapi.responses import StreamingResponse, RedirectResponse
import numpy as np
import stable_whisper
import requests
import av
import ffmpeg
import whisper
import re
def convert_to_bool(in_bool):
    # Convert the input to string and lower case, then check against true values
    return str(in_bool).lower() in ('true', 'on', '1')
# Configuration is read from environment variables; the second argument to each getenv call is the default value
plextoken = os.getenv('PLEXTOKEN', 'token here')
plexserver = os.getenv('PLEXSERVER', 'http://192.168.1.111:32400')
jellyfintoken = os.getenv('JELLYFINTOKEN', 'token here')
jellyfinserver = os.getenv('JELLYFINSERVER', 'http://192.168.1.111:8096')
whisper_model = os.getenv('WHISPER_MODEL', 'medium')
whisper_threads = int(os.getenv('WHISPER_THREADS', 4))
concurrent_transcriptions = int(os.getenv('CONCURRENT_TRANSCRIPTIONS', 2))
transcribe_device = os.getenv('TRANSCRIBE_DEVICE', 'cpu')
procaddedmedia = convert_to_bool(os.getenv('PROCADDEDMEDIA', True))
procmediaonplay = convert_to_bool(os.getenv('PROCMEDIAONPLAY', True))
namesublang = os.getenv('NAMESUBLANG', 'aa')
skipifinternalsublang = os.getenv('SKIPIFINTERNALSUBLANG', 'eng')
webhookport = int(os.getenv('WEBHOOKPORT', 9000))
word_level_highlight = convert_to_bool(os.getenv('WORD_LEVEL_HIGHLIGHT', False))
debug = convert_to_bool(os.getenv('DEBUG', True))
use_path_mapping = convert_to_bool(os.getenv('USE_PATH_MAPPING', False))
path_mapping_from = os.getenv('PATH_MAPPING_FROM', r'/tv')
path_mapping_to = os.getenv('PATH_MAPPING_TO', r'/Volumes/TV')
model_location = os.getenv('MODEL_PATH', './models')
monitor = convert_to_bool(os.getenv('MONITOR', False))
transcribe_folders = os.getenv('TRANSCRIBE_FOLDERS', '')
transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'transcribe')
force_detected_language_to = os.getenv('FORCE_DETECTED_LANGUAGE_TO', '')
hf_transformers = convert_to_bool(os.getenv('HF_TRANSFORMERS', False))
hf_batch_size = int(os.getenv('HF_BATCH_SIZE', 24))
clear_vram_on_complete = convert_to_bool(os.getenv('CLEAR_VRAM_ON_COMPLETE', True))
compute_type = os.getenv('COMPUTE_TYPE', 'auto')
append = convert_to_bool(os.getenv('APPEND', False))
reload_script_on_change = convert_to_bool(os.getenv('RELOAD_SCRIPT_ON_CHANGE', False))
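# TRANSCRIBE_DEVICE accepts 'gpu' as a convenience alias; the underlying libraries expect the device name 'cuda'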
if transcribe_device == "gpu":
transcribe_device = "cuda"
if monitor:
    from watchdog.observers.polling import PollingObserver as Observer
    from watchdog.events import FileSystemEventHandler
app = FastAPI()
model = None
files_to_transcribe = []
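# Subtitle filename suffixes appended to the media file's base name, e.g. '<name>.subgen.medium.aa.srt' with the defaults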
subextension = f".subgen.{whisper_model.split('.')[0]}.{namesublang}.srt"
subextensionSDH = f".subgen.{whisper_model.split('.')[0]}.{namesublang}.sdh.srt"
in_docker = os.path.exists('/.dockerenv')
docker_status = "Docker" if in_docker else "Standalone"
last_print_time = None
# Logging filter that suppresses noisy, repetitive messages from the transcription libraries
class MultiplePatternsFilter(logging.Filter):
    def filter(self, record):
        # Define the patterns to search for
        patterns = [
            "Compression ratio threshold is not met",
            "Processing segment at",
            "Log probability threshold is",
            "Reset prompt",
            "Attempting to release",
            "released on ",
            "Attempting to acquire",
            "acquired on",
            "header parsing failed",
            "timescale not set",
            "misdetection possible",
            "srt was added",
            "doesn't have any audio to transcribe",
        ]
        # Return False (drop the record) if any of the patterns are found, True otherwise
        return not any(pattern in record.getMessage() for pattern in patterns)
# Configure logging
if debug:
    level = logging.DEBUG
    logging.basicConfig(stream=sys.stderr, level=level, format="%(asctime)s %(levelname)s: %(message)s")
else:
    level = logging.INFO
    logging.basicConfig(stream=sys.stderr, level=level)
# Get the root logger
logger = logging.getLogger()
logger.setLevel(level)  # Set the logger level
for handler in logger.handlers:
    handler.addFilter(MultiplePatternsFilter())
logging.getLogger("multipart").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
# This forces a flush so progress prints correctly
def progress(seek, total):
    sys.stdout.flush()
    sys.stderr.flush()
    if docker_status == 'Docker':
        global last_print_time
        # Get the current time
        current_time = time.time()
        # Check if 5 seconds have passed since the last print
        if last_print_time is None or (current_time - last_print_time) >= 5:
            # Update the last print time
            last_print_time = current_time
            # Log the message
            logging.info("Force Update...")
TIME_OFFSET = 5
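# appendLine() adds a short credit segment after the last subtitle line when APPEND is enabled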
def appendLine(result):
    if append:
        lastSegment = result.segments[-1].copy()
        lastSegment.id += 1
        lastSegment.start += TIME_OFFSET
        lastSegment.end += TIME_OFFSET
        date_time_str = datetime.now().strftime("%d %b %Y - %H:%M:%S")
        lastSegment.text = f"Transcribed by whisperAI with faster-whisper ({whisper_model}) on {date_time_str}"
        lastSegment.words = []
        # lastSegment.words[0].word = lastSegment.text
        # lastSegment.words = lastSegment.words[:len(lastSegment.words)-1]
        result.segments.append(lastSegment)
@app.get("/plex")
@app.get("/webhook")
@app.get("/jellyfin")
@app.get("/asr")
@app.get("/emby")
@app.get("/detect-language")
@app.get("/tautulli")
@app.get("/")
def handle_get_request(request: Request):
return {"You accessed this request incorrectly via a GET request. See https://github.com/McCloudS/subgen for proper configuration"}
@app.get("/status")
def status():
    in_docker = os.path.exists('/.dockerenv')
    docker_status = "Docker" if in_docker else "Standalone"
    return {"version": f"Subgen {subgen_version}, stable-ts {stable_whisper.__version__}, whisper {whisper.__version__} ({docker_status})"}
@app.post("/subsync")
def subsync(
    audio_file: UploadFile = File(...),
    subtitle_file: UploadFile = File(...),
    language: Union[str, None] = Query(default=None),
):
    result = None
    try:
        logging.info("Syncing subtitle file from Subsync webhook")
        srt_content = subtitle_file.file.read().decode('utf-8')
        srt_content = re.sub(r'\{.*?\}', '', srt_content)
        # Remove numeric counters for each entry
        srt_content = re.sub(r'^\d+$', '', srt_content, flags=re.MULTILINE)
        # Remove timestamps and formatting
        srt_content = re.sub(r'\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}', '', srt_content)
        # Remove any remaining newlines and spaces
        srt_content = re.sub(r'\n\n+', '\n', srt_content).strip()
        start_time = time.time()
        start_model()
        result = model.align(audio_file.file.read(), srt_content, language=language)
        appendLine(result)
        elapsed_time = time.time() - start_time
        minutes, seconds = divmod(int(elapsed_time), 60)
        logging.info(f"Subsync is completed, it took {minutes} minutes and {seconds} seconds to complete.")
    except Exception as e:
        logging.info(f"Error processing or aligning {audio_file.filename} or {subtitle_file.filename}: {e}")
    finally:
        delete_model()
    if result:
        return StreamingResponse(
            iter(result.to_srt_vtt(filepath=None, word_level=word_level_highlight)),
            media_type="text/plain",
            headers={
                'Source': 'Aligned using stable-ts from Subgen!',
            })
    else:
        return
@app.post("/tautulli")
def receive_tautulli_webhook(
    source: Union[str, None] = Header(None),
    event: str = Body(None),
    file: str = Body(None),
):
    if source == "Tautulli":
        logging.debug(f"Tautulli event detected is: {event}")
        if (event == "added" and procaddedmedia) or (event == "played" and procmediaonplay):
            fullpath = file
            logging.debug("Path of file: " + fullpath)
            gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
    else:
        return {"This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"}
    return ""
@app.post("/plex")
def receive_plex_webhook(
    user_agent: Union[str, None] = Header(None),
    payload: Union[str, None] = Form(),
):
    plex_json = json.loads(payload)
    logging.debug(f"Raw response: {payload}")
    if "PlexMediaServer" in user_agent:
        event = plex_json["event"]
        logging.debug(f"Plex event detected is: {event}")
        if (event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay):
            fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
            logging.debug("Path of file: " + fullpath)
            gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
            try:
                refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken)
                logging.info(f"Metadata for item {plex_json['Metadata']['ratingKey']} refreshed successfully.")
            except Exception as e:
                logging.error(f"Failed to refresh metadata for item {plex_json['Metadata']['ratingKey']}: {e}")
    else:
        return {"This doesn't appear to be a properly configured Plex webhook, please review the instructions again!"}
    return ""
@app.post("/jellyfin")
def receive_jellyfin_webhook(
    user_agent: Union[str, None] = Header(None),
    NotificationType: str = Body(None),
    file: str = Body(None),
    ItemId: str = Body(None),
):
    if "Jellyfin-Server" in user_agent:
        logging.debug("Jellyfin event detected is: " + NotificationType)
        logging.debug("itemid is: " + ItemId)
        if (NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay):
            fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken)
            logging.debug(f"Path of file: {fullpath}")
            gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
            try:
                refresh_jellyfin_metadata(ItemId, jellyfinserver, jellyfintoken)
                logging.info(f"Metadata for item {ItemId} refreshed successfully.")
            except Exception as e:
                logging.error(f"Failed to refresh metadata for item {ItemId}: {e}")
    else:
        return {"This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"}
    return ""
@app.post("/emby")
def receive_emby_webhook(
    user_agent: Union[str, None] = Header(None),
    data: Union[str, None] = Form(None),
):
    logging.debug("Raw response: %s", data)
    if "Emby Server" in user_agent:
        if data:
            data_dict = json.loads(data)
            fullpath = data_dict['Item']['Path']
            event = data_dict['Event']
            logging.debug("Emby event detected is: " + event)
            if (event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay):
                logging.debug("Path of file: " + fullpath)
                gen_subtitles(path_mapping(fullpath), transcribe_or_translate, True)
    else:
        return {"This doesn't appear to be a properly configured Emby webhook, please review the instructions again!"}
    return ""
@app.post("/batch")
def batch(
    directory: Union[str, None] = Query(default=None),
    forceLanguage: Union[str, None] = Query(default=None)
):
    transcribe_existing(directory, forceLanguage)
# idea and some code for asr and detect language from https://github.com/ahmetoner/whisper-asr-webservice
@app.post("//asr")
@app.post("/asr")
def asr(
    task: Union[str, None] = Query(default="transcribe", enum=["transcribe", "translate"]),
    language: Union[str, None] = Query(default=None),
    initial_prompt: Union[str, None] = Query(default=None),  # not used by Bazarr
    audio_file: UploadFile = File(...),
    encode: bool = Query(default=True, description="Encode audio first through ffmpeg"),  # not used by Bazarr/always False
    output: Union[str, None] = Query(default="srt", enum=["txt", "vtt", "srt", "tsv", "json"]),
    word_timestamps: bool = Query(default=False, description="Word level timestamps")  # not used by Bazarr
):
    result = None
    random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
    try:
        logging.info("Transcribing file from Bazarr/ASR webhook")
        start_time = time.time()
        start_model()
        files_to_transcribe.insert(0, f"Bazarr-asr-{random_name}")
        audio_data = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0
        if hf_transformers:
            result = model.transcribe(audio_data, task=task, input_sr=16000, language=language, batch_size=hf_batch_size, progress_callback=progress)
        else:
            result = model.transcribe_stable(audio_data, task=task, input_sr=16000, language=language, progress_callback=progress)
        appendLine(result)
        elapsed_time = time.time() - start_time
        minutes, seconds = divmod(int(elapsed_time), 60)
        logging.info(f"Bazarr transcription is completed, it took {minutes} minutes and {seconds} seconds to complete.")
    except Exception as e:
        logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")
    finally:
        if f"Bazarr-asr-{random_name}" in files_to_transcribe:
            files_to_transcribe.remove(f"Bazarr-asr-{random_name}")
        delete_model()
    if result:
        return StreamingResponse(
            iter(result.to_srt_vtt(filepath=None, word_level=word_level_highlight)),
            media_type="text/plain",
            headers={
                'Source': 'Transcribed using stable-ts from Subgen!',
            })
    else:
        return
@app.post("//detect-language")
@app.post("/detect-language")
def detect_language(
    audio_file: UploadFile = File(...),
    # encode: bool = Query(default=True, description="Encode audio first through ffmpeg")  # This is always false from Bazarr
):
    detected_lang_code = ""  # Initialize with an empty string
    random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
    try:
        start_model()
        files_to_transcribe.insert(0, f"Bazarr-detect-language-{random_name}")
        audio_data = np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0
        if hf_transformers:
            detected_lang_code = model.transcribe(whisper.pad_or_trim(audio_data), input_sr=16000, batch_size=hf_batch_size).language
        else:
            detected_lang_code = model.transcribe_stable(whisper.pad_or_trim(audio_data), input_sr=16000).language
    except Exception as e:
        logging.info(f"Error processing or transcribing Bazarr {audio_file.filename}: {e}")
    finally:
        if f"Bazarr-detect-language-{random_name}" in files_to_transcribe:
            files_to_transcribe.remove(f"Bazarr-detect-language-{random_name}")
        delete_model()
    return {"detected_language": whisper_languages.get(detected_lang_code, detected_lang_code), "language_code": detected_lang_code}
def start_model():
    global model
    if model is None:
        logging.debug("Model was purged, need to re-create")
        if hf_transformers:
            logging.debug("Using Hugging Face Transformers, whisper_threads, concurrent_transcriptions, and model_location variables are ignored!")
            model = stable_whisper.load_hf_whisper(whisper_model, device=transcribe_device)
        else:
            model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions, compute_type=compute_type)
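# Release the model once the queue is empty so memory/VRAM can be reclaimed (controlled by CLEAR_VRAM_ON_COMPLETE)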
def delete_model():
    global model
    if clear_vram_on_complete and len(files_to_transcribe) == 0:
        logging.debug("Queue is empty, clearing/releasing VRAM")
        model = None
        gc.collect()
def gen_subtitles(file_path: str, transcribe_or_translate: str, front=True, forceLanguage=None) -> None:
    """Generates subtitles for a video file.
    Args:
        file_path: str - The path to the video file.
        transcribe_or_translate: str - The type of transcription or translation to perform.
        front: bool - Whether to add the file to the front of the transcription queue. Default is True.
        forceLanguage: str - The language to force for transcription or translation. Default is None.
    """
    try:
        if not has_audio(file_path):
            logging.debug(f"{file_path} doesn't have any audio to transcribe!")
            return None
        if file_path not in files_to_transcribe:
            message = None
            if has_subtitle_language(file_path, skipifinternalsublang):
                message = f"{file_path} already has an internal subtitle we want, skipping generation"
            elif os.path.exists(file_path.rsplit('.', 1)[0] + subextension):
                message = f"{file_path} already has a subtitle created for this, skipping it"
            elif os.path.exists(file_path.rsplit('.', 1)[0] + subextensionSDH):
                message = f"{file_path} already has a SDH subtitle created for this, skipping it"
            if message is not None:
                logging.info(message)
                return message
            if front:
                files_to_transcribe.insert(0, file_path)
            else:
                files_to_transcribe.append(file_path)
            logging.info(f"Added {os.path.basename(file_path)} for transcription.")
            # Start transcription for the file
            logging.info(f"{len(files_to_transcribe)} files in the queue for transcription")
            logging.info(f"Transcribing file: {os.path.basename(file_path)}")
            start_time = time.time()
            start_model()
            global force_detected_language_to
            if force_detected_language_to:
                forceLanguage = force_detected_language_to
                logging.info(f"Forcing language to {forceLanguage}")
            if hf_transformers:
                result = model.transcribe(file_path, language=forceLanguage, batch_size=hf_batch_size, task=transcribe_or_translate, progress_callback=progress)
            else:
                result = model.transcribe_stable(file_path, language=forceLanguage, task=transcribe_or_translate, progress_callback=progress)
            appendLine(result)
            result.to_srt_vtt(get_file_name_without_extension(file_path) + subextension, word_level=word_level_highlight)
            elapsed_time = time.time() - start_time
            minutes, seconds = divmod(int(elapsed_time), 60)
            logging.info(f"Transcription of {os.path.basename(file_path)} is completed, it took {minutes} minutes and {seconds} seconds to complete.")
        else:
            logging.info(f"File {os.path.basename(file_path)} is already in the transcription list. Skipping.")
    except Exception as e:
        logging.info(f"Error processing or transcribing {file_path}: {e}")
    finally:
        if file_path in files_to_transcribe:
            files_to_transcribe.remove(file_path)
        delete_model()
def get_file_name_without_extension(file_path):
    file_name, file_extension = os.path.splitext(file_path)
    return file_name
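# Inspect the container's embedded subtitle streams for one tagged with the target language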
def has_subtitle_language(video_file, target_language):
    try:
        with av.open(video_file) as container:
            subtitle_stream = next((stream for stream in container.streams if stream.type == 'subtitle' and 'language' in stream.metadata and stream.metadata['language'] == target_language), None)
            if subtitle_stream:
                logging.debug(f"Subtitles in '{target_language}' language found in the video.")
                return True
            else:
                logging.debug(f"No subtitles in '{target_language}' language found in the video.")
    except Exception as e:
        logging.info(f"An error occurred: {e}")
    return False
def get_plex_file_name(itemid: str, server_ip: str, plex_token: str) -> str:
"""Gets the full path to a file from the Plex server.
Args:
itemid: The ID of the item in the Plex library.
server_ip: The IP address of the Plex server.
plex_token: The Plex token.
Returns:
The full path to the file.
"""
url = f"{server_ip}/library/metadata/{itemid}"
headers = {
"X-Plex-Token": plex_token,
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
root = ET.fromstring(response.content)
fullpath = root.find(".//Part").attrib['file']
return fullpath
else:
raise Exception(f"Error: {response.status_code}")
def refresh_plex_metadata(itemid: str, server_ip: str, plex_token: str) -> None:
"""
Refreshes the metadata of a Plex library item.
Args:
itemid: The ID of the item in the Plex library whose metadata needs to be refreshed.
server_ip: The IP address of the Plex server.
plex_token: The Plex token used for authentication.
Raises:
Exception: If the server does not respond with a successful status code.
"""
# Plex API endpoint to refresh metadata for a specific item
url = f"{server_ip}/library/metadata/{itemid}/refresh"
# Headers to include the Plex token for authentication
headers = {
"X-Plex-Token": plex_token,
}
# Sending the PUT request to refresh metadata
response = requests.put(url, headers=headers)
# Check if the request was successful
if response.status_code == 200:
logging.info("Metadata refresh initiated successfully.")
else:
raise Exception(f"Error refreshing metadata: {response.status_code}")
def refresh_jellyfin_metadata(itemid: str, server_ip: str, jellyfin_token: str) -> None:
"""
Refreshes the metadata of a Jellyfin library item.
Args:
itemid: The ID of the item in the Jellyfin library whose metadata needs to be refreshed.
server_ip: The IP address of the Jellyfin server.
jellyfin_token: The Jellyfin token used for authentication.
Raises:
Exception: If the server does not respond with a successful status code.
"""
# Jellyfin API endpoint to refresh metadata for a specific item
url = f"{server_ip}/Items/{itemid}/Refresh"
# Headers to include the Jellyfin token for authentication
headers = {
"Authorization": f"MediaBrowser Token={jellyfin_token}",
}
# Cheap way to get the admin user id, and save it for later use.
users = json.loads(requests.get(f"{server_ip}/Users", headers=headers).content)
jellyfin_admin = get_jellyfin_admin(users)
response = requests.get(f"{server_ip}/Users/{jellyfin_admin}/Items/{itemid}/Refresh", headers=headers)
# Sending the PUT request to refresh metadata
response = requests.post(url, headers=headers)
# Check if the request was successful
if response.status_code == 204:
logging.info("Metadata refresh queued successfully.")
else:
raise Exception(f"Error refreshing metadata: {response.status_code}")
def get_jellyfin_file_name(item_id: str, jellyfin_url: str, jellyfin_token: str) -> str:
"""Gets the full path to a file from the Jellyfin server.
Args:
jellyfin_url: The URL of the Jellyfin server.
jellyfin_token: The Jellyfin token.
item_id: The ID of the item in the Jellyfin library.
Returns:
The full path to the file.
"""
headers = {
"Authorization": f"MediaBrowser Token={jellyfin_token}",
}
# Cheap way to get the admin user id, and save it for later use.
users = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content)
jellyfin_admin = get_jellyfin_admin(users)
response = requests.get(f"{jellyfin_url}/Users/{jellyfin_admin}/Items/{item_id}", headers=headers)
if response.status_code == 200:
file_name = json.loads(response.content)['Path']
return file_name
else:
raise Exception(f"Error: {response.status_code}")
def get_jellyfin_admin(users):
    for user in users:
        if user["Policy"]["IsAdministrator"]:
            return user["Id"]
    raise Exception("Unable to find administrator user in Jellyfin")
def has_audio(file_path):
    try:
        with av.open(file_path) as container:
            return any(stream.type == 'audio' for stream in container.streams)
    except (av.AVError, UnicodeDecodeError):
        return False
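# Rewrite a media-server path to the local path Subgen can read (PATH_MAPPING_FROM -> PATH_MAPPING_TO)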
def path_mapping(fullpath):
    if use_path_mapping:
        logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
        return fullpath.replace(path_mapping_from, path_mapping_to)
    return fullpath
if monitor:
    # Define a handler class that will process new files
    class NewFileHandler(FileSystemEventHandler):
        def create_subtitle(self, event):
            # Only process if it's a file
            if not event.is_directory:
                file_path = event.src_path
                if has_audio(file_path):
                    # Call the gen_subtitles function
                    logging.info(f"File: {path_mapping(file_path)} was added")
                    gen_subtitles(path_mapping(file_path), transcribe_or_translate, False)

        def on_created(self, event):
            self.create_subtitle(event)

        def on_modified(self, event):
            self.create_subtitle(event)
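# Scan the configured folders (or single files) and queue anything with audio for subtitle generation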
def transcribe_existing(transcribe_folders, forceLanguage=None):
    transcribe_folders = transcribe_folders.split("|")
    logging.info("Starting to search folders to see if we need to create subtitles.")
    logging.debug("The folders are:")
    for path in transcribe_folders:
        logging.debug(path)
        for root, dirs, files in os.walk(path):
            for file in files:
                file_path = os.path.join(root, file)
                gen_subtitles(path_mapping(file_path), transcribe_or_translate, False, forceLanguage)
        # if the path specified was actually a single file and not a folder, process it
        if os.path.isfile(path):
            if has_audio(path):
                gen_subtitles(path_mapping(path), transcribe_or_translate, False, forceLanguage)
    # Set up the observer to watch for new files
    if monitor:
        observer = Observer()
        for path in transcribe_folders:
            if os.path.isdir(path):
                handler = NewFileHandler()
                observer.schedule(handler, path, recursive=True)
        observer.start()
    logging.info("Finished searching and queueing files for transcription. Now watching for new files.")
whisper_languages = {
    "en": "english",
    "zh": "chinese",
    "de": "german",
    "es": "spanish",
    "ru": "russian",
    "ko": "korean",
    "fr": "french",
    "ja": "japanese",
    "pt": "portuguese",
    "tr": "turkish",
    "pl": "polish",
    "ca": "catalan",
    "nl": "dutch",
    "ar": "arabic",
    "sv": "swedish",
    "it": "italian",
    "id": "indonesian",
    "hi": "hindi",
    "fi": "finnish",
    "vi": "vietnamese",
    "he": "hebrew",
    "uk": "ukrainian",
    "el": "greek",
    "ms": "malay",
    "cs": "czech",
    "ro": "romanian",
    "da": "danish",
    "hu": "hungarian",
    "ta": "tamil",
    "no": "norwegian",
    "th": "thai",
    "ur": "urdu",
    "hr": "croatian",
    "bg": "bulgarian",
    "lt": "lithuanian",
    "la": "latin",
    "mi": "maori",
    "ml": "malayalam",
    "cy": "welsh",
    "sk": "slovak",
    "te": "telugu",
    "fa": "persian",
    "lv": "latvian",
    "bn": "bengali",
    "sr": "serbian",
    "az": "azerbaijani",
    "sl": "slovenian",
    "kn": "kannada",
    "et": "estonian",
    "mk": "macedonian",
    "br": "breton",
    "eu": "basque",
    "is": "icelandic",
    "hy": "armenian",
    "ne": "nepali",
    "mn": "mongolian",
    "bs": "bosnian",
    "kk": "kazakh",
    "sq": "albanian",
    "sw": "swahili",
    "gl": "galician",
    "mr": "marathi",
    "pa": "punjabi",
    "si": "sinhala",
    "km": "khmer",
    "sn": "shona",
    "yo": "yoruba",
    "so": "somali",
    "af": "afrikaans",
    "oc": "occitan",
    "ka": "georgian",
    "be": "belarusian",
    "tg": "tajik",
    "sd": "sindhi",
    "gu": "gujarati",
    "am": "amharic",
    "yi": "yiddish",
    "lo": "lao",
    "uz": "uzbek",
    "fo": "faroese",
    "ht": "haitian creole",
    "ps": "pashto",
    "tk": "turkmen",
    "nn": "nynorsk",
    "mt": "maltese",
    "sa": "sanskrit",
    "lb": "luxembourgish",
    "my": "myanmar",
    "bo": "tibetan",
    "tl": "tagalog",
    "mg": "malagasy",
    "as": "assamese",
    "tt": "tatar",
    "haw": "hawaiian",
    "ln": "lingala",
    "ha": "hausa",
    "ba": "bashkir",
    "jw": "javanese",
    "su": "sundanese",
}
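# Entry point: optionally pre-process TRANSCRIBE_FOLDERS, then serve the FastAPI app with uvicorn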
if __name__ == "__main__":
    import uvicorn
    logging.info(f"Subgen v{subgen_version}")
    logging.info("Starting Subgen with listening webhooks!")
    logging.info(f"Transcriptions are limited to running {str(concurrent_transcriptions)} at a time")
    logging.info(f"Running {str(whisper_threads)} threads per transcription")
    logging.info(f"Using {transcribe_device} to encode")
    if hf_transformers:
        logging.info("Using Hugging Face Transformers")
    else:
        logging.info("Using faster-whisper")
    if transcribe_folders:
        transcribe_existing(transcribe_folders)
    uvicorn.run("subgen:app", host="0.0.0.0", port=int(webhookport), reload=reload_script_on_change, use_colors=True)