From 9655686a50bcaf88190b02a634dd181c9b3fdab4 Mon Sep 17 00:00:00 2001 From: Dasemu Date: Fri, 16 Jan 2026 15:10:14 +0100 Subject: [PATCH] chore: cleanup legacy files and update gitignore - Remove obsolete Docker files (will be recreated later) - Remove legacy launcher.py and transcriptarr.py - Remove subgen.xml configuration - Remove test_backend.py (tests will be restructured) - Remove language_code.py from root (moved to backend/core/) - Update .gitignore for Python project structure --- .gitignore | 10 +- Dockerfile | 45 -- Dockerfile.cpu | 34 - docker-compose.yml | 40 - language_code.py | 198 ----- launcher.py | 182 ----- subgen.xml | 56 -- test_backend.py | 163 ---- transcriptarr.py | 1779 -------------------------------------------- 9 files changed, 9 insertions(+), 2498 deletions(-) delete mode 100755 Dockerfile delete mode 100644 Dockerfile.cpu delete mode 100644 docker-compose.yml delete mode 100644 language_code.py delete mode 100644 launcher.py delete mode 100644 subgen.xml delete mode 100755 test_backend.py delete mode 100644 transcriptarr.py diff --git a/.gitignore b/.gitignore index 24c4960..9bf3075 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,12 @@ #ignore our settings .env -models/ \ No newline at end of file +models/ + +transcriptarr.db + +# Python cache +__pycache__/ +**/__pycache__/ +*.pyc +*.pyo \ No newline at end of file diff --git a/Dockerfile b/Dockerfile deleted file mode 100755 index 3bef3a2..0000000 --- a/Dockerfile +++ /dev/null @@ -1,45 +0,0 @@ -# Stage 1: Builder -FROM nvidia/cuda:12.3.2-cudnn9-runtime-ubuntu22.04 AS builder - -WORKDIR /subgen - -ARG DEBIAN_FRONTEND=noninteractive - -# Install system dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - python3 \ - python3-pip \ - ffmpeg \ - git \ - tzdata \ - && rm -rf /var/lib/apt/lists/* - -# Copy requirements and install Python dependencies -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt - -# Copy application code -COPY . . - -# Stage 2: Runtime -FROM nvidia/cuda:12.3.2-cudnn9-runtime-ubuntu22.04 - -WORKDIR /subgen - -# Copy necessary files from the builder stage -COPY --from=builder /subgen/launcher.py . -COPY --from=builder /subgen/subgen.py . -COPY --from=builder /subgen/language_code.py . -COPY --from=builder /usr/local/lib/python3.10/dist-packages /usr/local/lib/python3.10/dist-packages - -# Install runtime dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - ffmpeg \ - python3 \ - curl \ - && rm -rf /var/lib/apt/lists/* - -ENV PYTHONUNBUFFERED=1 - -# Set command to run the application -CMD ["python3", "launcher.py"] diff --git a/Dockerfile.cpu b/Dockerfile.cpu deleted file mode 100644 index caaa58e..0000000 --- a/Dockerfile.cpu +++ /dev/null @@ -1,34 +0,0 @@ -# === Stage 1: Build dependencies and install packages === -FROM python:3.11-slim-bullseye AS builder - -WORKDIR /subgen - -# Install required build dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - ffmpeg \ - git \ - tzdata \ - && rm -rf /var/lib/apt/lists/* - -# Copy and install dependencies -COPY requirements.txt . 
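# A minimal caching sketch (untested assumption, not part of the original file):
# pinning the heavy CPU wheels in a layer placed *before* the COPY above would let
# Docker reuse the cached torch install when only requirements.txt changes, e.g.:
#   RUN pip install --no-cache-dir --prefix=/install \
#       torch torchaudio --extra-index-url https://download.pytorch.org/whl/cpu
#   COPY requirements.txt .
#   RUN pip install --no-cache-dir --prefix=/install -r requirements.txt \
#       --extra-index-url https://download.pytorch.org/whl/cpu
# The original single-RUN form follows.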
-RUN pip install --no-cache-dir --prefix=/install torch torchaudio --extra-index-url https://download.pytorch.org/whl/cpu && pip install --no-cache-dir --prefix=/install -r requirements.txt --extra-index-url https://download.pytorch.org/whl/cpu - -# === Stage 2: Create a minimal runtime image === -FROM python:3.11-slim-bullseye AS runtime - -WORKDIR /subgen - -# Install only required runtime dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - ffmpeg \ - curl \ - && rm -rf /var/lib/apt/lists/* - -# Copy only necessary files from builder stage -COPY --from=builder /install /usr/local - -# Copy source code -COPY launcher.py subgen.py language_code.py /subgen/ - -CMD ["python3", "launcher.py"] diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index c9ca94e..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,40 +0,0 @@ -#docker-compose.yml -version: '2' -services: - subgen: - container_name: subgen - tty: true - image: mccloud/subgen - environment: - - "WHISPER_MODEL=medium" - - "WHISPER_THREADS=4" - - "PROCADDEDMEDIA=True" - - "PROCMEDIAONPLAY=False" - - "NAMESUBLANG=aa" - - "SKIPIFINTERNALSUBLANG=eng" - - "PLEXTOKEN=plextoken" - - "PLEXSERVER=http://plexserver:32400" - - "JELLYFINTOKEN=token here" - - "JELLYFINSERVER=http://jellyfin:8096" - - "WEBHOOKPORT=9000" - - "CONCURRENT_TRANSCRIPTIONS=2" - - "WORD_LEVEL_HIGHLIGHT=False" - - "DEBUG=True" - - "USE_PATH_MAPPING=False" - - "PATH_MAPPING_FROM=/tv" - - "PATH_MAPPING_TO=/Volumes/TV" - - "TRANSCRIBE_DEVICE=cpu" - - "CLEAR_VRAM_ON_COMPLETE=True" - - "MODEL_PATH=./models" - - "UPDATE=False" - - "APPEND=False" - - "USE_MODEL_PROMPT=False" - - "CUSTOM_MODEL_PROMPT=" - - "LRC_FOR_AUDIO_FILES=True" - - "CUSTOM_REGROUP=cm_sl=84_sl=42++++++1" - volumes: - - "${TV}:/tv" - - "${MOVIES}:/movies" - - "${APPDATA}/subgen/models:/subgen/models" - ports: - - "9000:9000" diff --git a/language_code.py b/language_code.py deleted file mode 100644 index 93b55fb..0000000 --- a/language_code.py +++ /dev/null @@ -1,198 +0,0 @@ -from enum import Enum - -class LanguageCode(Enum): - # ISO 639-1, ISO 639-2/T, ISO 639-2/B, English Name, Native Name - AFAR = ("aa", "aar", "aar", "Afar", "Afar") - AFRIKAANS = ("af", "afr", "afr", "Afrikaans", "Afrikaans") - AMHARIC = ("am", "amh", "amh", "Amharic", "አማርኛ") - ARABIC = ("ar", "ara", "ara", "Arabic", "العربية") - ASSAMESE = ("as", "asm", "asm", "Assamese", "অসমীয়া") - AZERBAIJANI = ("az", "aze", "aze", "Azerbaijani", "Azərbaycanca") - BASHKIR = ("ba", "bak", "bak", "Bashkir", "Башҡортса") - BELARUSIAN = ("be", "bel", "bel", "Belarusian", "Беларуская") - BULGARIAN = ("bg", "bul", "bul", "Bulgarian", "Български") - BENGALI = ("bn", "ben", "ben", "Bengali", "বাংলা") - TIBETAN = ("bo", "bod", "tib", "Tibetan", "བོད་ཡིག") - BRETON = ("br", "bre", "bre", "Breton", "Brezhoneg") - BOSNIAN = ("bs", "bos", "bos", "Bosnian", "Bosanski") - CATALAN = ("ca", "cat", "cat", "Catalan", "Català") - CZECH = ("cs", "ces", "cze", "Czech", "Čeština") - WELSH = ("cy", "cym", "wel", "Welsh", "Cymraeg") - DANISH = ("da", "dan", "dan", "Danish", "Dansk") - GERMAN = ("de", "deu", "ger", "German", "Deutsch") - GREEK = ("el", "ell", "gre", "Greek", "Ελληνικά") - ENGLISH = ("en", "eng", "eng", "English", "English") - SPANISH = ("es", "spa", "spa", "Spanish", "Español") - ESTONIAN = ("et", "est", "est", "Estonian", "Eesti") - BASQUE = ("eu", "eus", "baq", "Basque", "Euskara") - PERSIAN = ("fa", "fas", "per", "Persian", "فارسی") - FINNISH = ("fi", "fin", "fin", "Finnish", "Suomi") - FAROESE = 
("fo", "fao", "fao", "Faroese", "Føroyskt")
-    FRENCH = ("fr", "fra", "fre", "French", "Français")
-    GALICIAN = ("gl", "glg", "glg", "Galician", "Galego")
-    GUJARATI = ("gu", "guj", "guj", "Gujarati", "ગુજરાતી")
-    HAUSA = ("ha", "hau", "hau", "Hausa", "Hausa")
-    HAWAIIAN = ("haw", "haw", "haw", "Hawaiian", "ʻŌlelo Hawaiʻi")
-    HEBREW = ("he", "heb", "heb", "Hebrew", "עברית")
-    HINDI = ("hi", "hin", "hin", "Hindi", "हिन्दी")
-    CROATIAN = ("hr", "hrv", "hrv", "Croatian", "Hrvatski")
-    HAITIAN_CREOLE = ("ht", "hat", "hat", "Haitian Creole", "Kreyòl Ayisyen")
-    HUNGARIAN = ("hu", "hun", "hun", "Hungarian", "Magyar")
-    ARMENIAN = ("hy", "hye", "arm", "Armenian", "Հայերեն")
-    INDONESIAN = ("id", "ind", "ind", "Indonesian", "Bahasa Indonesia")
-    ICELANDIC = ("is", "isl", "ice", "Icelandic", "Íslenska")
-    ITALIAN = ("it", "ita", "ita", "Italian", "Italiano")
-    JAPANESE = ("ja", "jpn", "jpn", "Japanese", "日本語")
-    JAVANESE = ("jw", "jav", "jav", "Javanese", "ꦧꦱꦗꦮ")
-    GEORGIAN = ("ka", "kat", "geo", "Georgian", "ქართული")
-    KAZAKH = ("kk", "kaz", "kaz", "Kazakh", "Қазақша")
-    KHMER = ("km", "khm", "khm", "Khmer", "ភាសាខ្មែរ")
-    KANNADA = ("kn", "kan", "kan", "Kannada", "ಕನ್ನಡ")
-    KOREAN = ("ko", "kor", "kor", "Korean", "한국어")
-    LATIN = ("la", "lat", "lat", "Latin", "Latina")
-    LUXEMBOURGISH = ("lb", "ltz", "ltz", "Luxembourgish", "Lëtzebuergesch")
-    LINGALA = ("ln", "lin", "lin", "Lingala", "Lingála")
-    LAO = ("lo", "lao", "lao", "Lao", "ພາສາລາວ")
-    LITHUANIAN = ("lt", "lit", "lit", "Lithuanian", "Lietuvių")
-    LATVIAN = ("lv", "lav", "lav", "Latvian", "Latviešu")
-    MALAGASY = ("mg", "mlg", "mlg", "Malagasy", "Malagasy")
-    MAORI = ("mi", "mri", "mao", "Maori", "Te Reo Māori")
-    MACEDONIAN = ("mk", "mkd", "mac", "Macedonian", "Македонски")
-    MALAYALAM = ("ml", "mal", "mal", "Malayalam", "മലയാളം")
-    MONGOLIAN = ("mn", "mon", "mon", "Mongolian", "Монгол")
-    MARATHI = ("mr", "mar", "mar", "Marathi", "मराठी")
-    MALAY = ("ms", "msa", "may", "Malay", "Bahasa Melayu")
-    MALTESE = ("mt", "mlt", "mlt", "Maltese", "Malti")
-    BURMESE = ("my", "mya", "bur", "Burmese", "မြန်မာစာ")
-    NEPALI = ("ne", "nep", "nep", "Nepali", "नेपाली")
-    DUTCH = ("nl", "nld", "dut", "Dutch", "Nederlands")
-    NORWEGIAN_NYNORSK = ("nn", "nno", "nno", "Norwegian Nynorsk", "Nynorsk")
-    NORWEGIAN = ("no", "nor", "nor", "Norwegian", "Norsk")
-    OCCITAN = ("oc", "oci", "oci", "Occitan", "Occitan")
-    PUNJABI = ("pa", "pan", "pan", "Punjabi", "ਪੰਜਾਬੀ")
-    POLISH = ("pl", "pol", "pol", "Polish", "Polski")
-    PASHTO = ("ps", "pus", "pus", "Pashto", "پښتو")
-    PORTUGUESE = ("pt", "por", "por", "Portuguese", "Português")
-    ROMANIAN = ("ro", "ron", "rum", "Romanian", "Română")
-    RUSSIAN = ("ru", "rus", "rus", "Russian", "Русский")
-    SANSKRIT = ("sa", "san", "san", "Sanskrit", "संस्कृतम्")
-    SINDHI = ("sd", "snd", "snd", "Sindhi", "سنڌي")
-    SINHALA = ("si", "sin", "sin", "Sinhala", "සිංහල")
-    SLOVAK = ("sk", "slk", "slo", "Slovak", "Slovenčina")
-    SLOVENE = ("sl", "slv", "slv", "Slovene", "Slovenščina")
-    SHONA = ("sn", "sna", "sna", "Shona", "ChiShona")
-    SOMALI = ("so", "som", "som", "Somali", "Soomaaliga")
-    ALBANIAN = ("sq", "sqi", "alb", "Albanian", "Shqip")
-    SERBIAN = ("sr", "srp", "srp", "Serbian", "Српски")
-    SUNDANESE = ("su", "sun", "sun", "Sundanese", "Basa Sunda")
-    SWEDISH = ("sv", "swe", "swe", "Swedish", "Svenska")
-    SWAHILI = ("sw", "swa", "swa", "Swahili", "Kiswahili")
-    TAMIL = ("ta", "tam", "tam", "Tamil", "தமிழ்")
-    TELUGU = ("te", "tel", "tel", "Telugu", "తెలుగు")
-    TAJIK = ("tg", "tgk", "tgk", "Tajik", "Тоҷикӣ")
-    THAI = ("th", "tha", "tha", "Thai", "ไทย")
-    TURKMEN = ("tk", "tuk", "tuk", "Turkmen", "Türkmençe")
-    TAGALOG = ("tl", "tgl", "tgl", "Tagalog", "Tagalog")
-    TURKISH = ("tr", "tur", "tur", "Turkish", "Türkçe")
-    TATAR = ("tt", "tat", "tat", "Tatar", "Татарча")
-    UKRAINIAN = ("uk", "ukr", "ukr", "Ukrainian", "Українська")
-    URDU = ("ur", "urd", "urd", "Urdu", "اردو")
-    UZBEK = ("uz", "uzb", "uzb", "Uzbek", "Oʻzbek")
-    VIETNAMESE = ("vi", "vie", "vie", "Vietnamese", "Tiếng Việt")
-    YIDDISH = ("yi", "yid", "yid", "Yiddish", "ייִדיש")
-    YORUBA = ("yo", "yor", "yor", "Yoruba", "Yorùbá")
-    CHINESE = ("zh", "zho", "chi", "Chinese", "中文")
-    CANTONESE = ("yue", "yue", "yue", "Cantonese", "粵語")
-    NONE = (None, None, None, None, None) # For no language
-    # und for Undetermined aka unknown language https://www.loc.gov/standards/iso639-2/faq.html#25
-
-    def __init__(self, iso_639_1, iso_639_2_t, iso_639_2_b, name_en, name_native):
-        self.iso_639_1 = iso_639_1
-        self.iso_639_2_t = iso_639_2_t
-        self.iso_639_2_b = iso_639_2_b
-        self.name_en = name_en
-        self.name_native = name_native
-
-    @staticmethod
-    def from_iso_639_1(code):
-        for lang in LanguageCode:
-            if lang.iso_639_1 == code:
-                return lang
-        return LanguageCode.NONE
-
-    @staticmethod
-    def from_iso_639_2(code):
-        for lang in LanguageCode:
-            if lang.iso_639_2_t == code or lang.iso_639_2_b == code:
-                return lang
-        return LanguageCode.NONE
-
-    @staticmethod
-    def from_name(name: str):
-        """Convert a language name (either English or native) to LanguageCode enum."""
-        for lang in LanguageCode:
-            if lang is LanguageCode.NONE:
-                continue  # NONE has no names; skipping it avoids calling .lower() on None
-            if lang.name_en.lower() == name.lower() or lang.name_native.lower() == name.lower():
-                return lang
-        return LanguageCode.NONE
-
-    @staticmethod
-    def from_string(value: str):
-        """
-        Convert a string to a LanguageCode instance. Matches on ISO codes, English name, or native name.
-        """
-        if value is None:
-            return LanguageCode.NONE
-        value = value.strip().lower()
-        for lang in LanguageCode:
-            if lang is LanguageCode.NONE:
-                continue
-            elif (
-                value == lang.iso_639_1
-                or value == lang.iso_639_2_t
-                or value == lang.iso_639_2_b
-                or value == lang.name_en.lower()
-                or value == lang.name_native.lower()
-            ):
-                return lang
-        return LanguageCode.NONE
-
-    # is valid language
-    @staticmethod
-    def is_valid_language(language: str):
-        return LanguageCode.from_string(language) is not LanguageCode.NONE
-
-    def to_iso_639_1(self):
-        return self.iso_639_1
-
-    def to_iso_639_2_t(self):
-        return self.iso_639_2_t
-
-    def to_iso_639_2_b(self):
-        return self.iso_639_2_b
-
-    def to_name(self, in_english=True):
-        return self.name_en if in_english else self.name_native
-
-    def __str__(self):
-        if self.name_en is None:
-            return "Unknown"
-        return self.name_en
-
-    def __bool__(self):
-        return self.iso_639_1 is not None
-
-    def __eq__(self, other):
-        """
-        Compare the LanguageCode instance to another object.
-        Explicitly handle comparison to None.
-        """
-        if other is None:
-            # If compared to None, return False unless self is None
-            return self.iso_639_1 is None
-        if isinstance(other, str):  # Allow comparison with a string
-            return self is LanguageCode.from_string(other)
-        if isinstance(other, LanguageCode):
-            # Normal comparison for LanguageCode instances
-            return self.iso_639_1 == other.iso_639_1
-        # Otherwise, defer to the default equality
-        return NotImplemented
diff --git a/launcher.py b/launcher.py
deleted file mode 100644
index d07c2b1..0000000
--- a/launcher.py
+++ /dev/null
@@ -1,182 +0,0 @@
-import os
-import sys
-import urllib.request
-import subprocess
-import argparse
-
-def convert_to_bool(in_bool):
-    # Convert the input to string and lower case, then check against true values
-    return str(in_bool).lower() in ('true', 'on', '1', 'y', 'yes')
-
-def install_packages_from_requirements(requirements_file):
-    try:
-        subprocess.run(['pip3', 'install', '-r', requirements_file, '--upgrade'], check=True)
-        print("Packages installed successfully using pip3.")
-    except subprocess.CalledProcessError:
-        try:
-            subprocess.run(['pip', 'install', '-r', requirements_file, '--upgrade'], check=True)
-            print("Packages installed successfully using pip.")
-        except subprocess.CalledProcessError:
-            print("Failed to install packages using both pip3 and pip.")
-
-def download_from_github(url, output_file):
-    try:
-        with urllib.request.urlopen(url) as response, open(output_file, 'wb') as out_file:
-            data = response.read()
-            out_file.write(data)
-        print(f"File downloaded successfully to {output_file}")
-    except urllib.error.HTTPError as e:
-        print(f"Failed to download file from {url}. HTTP Error Code: {e.code}")
-    except urllib.error.URLError as e:
-        print(f"URL Error: {e.reason}")
-    except Exception as e:
-        print(f"An error occurred: {e}")
-
-def prompt_and_save_bazarr_env_variables():
-    instructions = (
-        "You will be prompted for several configuration values.\n"
-        "If you wish to use the default value for any of them, simply press Enter without typing anything.\n"
-        "The default values are shown in brackets [] next to the prompts.\n"
-        "Items can be the value of true, on, 1, y, yes, false, off, 0, n, no, or an appropriate text response.\n"
-    )
-    print(instructions)
-    env_vars = {
-        'WHISPER_MODEL': ('Whisper Model', 'Enter the Whisper model you want to run: tiny, tiny.en, base, base.en, small, small.en, medium, medium.en, large, distil-large-v2, distil-medium.en, distil-small.en', 'medium'),
-        'WEBHOOKPORT': ('Webhook Port', 'Default listening port for transcriptarr.py', '9000'),
-        'TRANSCRIBE_DEVICE': ('Transcribe Device', 'Set as cpu or gpu', 'gpu'),
-        # Defaulting to False here for the prompt, user can change
-        'DEBUG': ('Debug', 'Enable debug logging (true/false)', 'False'),
-        'CLEAR_VRAM_ON_COMPLETE': ('Clear VRAM', 'Attempt to clear VRAM when complete (Windows users may need to set this to False)', 'False'),
-        'APPEND': ('Append', 'Append \'Transcribed by whisper\' to generated subtitle (true/false)', 'False'),
-    }
-
-    user_input = {}
-    with open('.env', 'w') as file:
-        for var, (description, prompt, default) in env_vars.items():
-            value = input(f"{prompt} [{default}]: ") or default
-            file.write(f"{var}={value}\n")
-    print("Environment variables have been saved to .env")
-
-def load_env_variables(env_filename='.env'):
-    try:
-        with open(env_filename, 'r') as file:
-            for line in file:
-                line = line.strip()
-                if line and not line.startswith('#') and '=' in line:
-                    var, value = line.split('=', 1)
-                    # Only set if not already set by a 
higher priority mechanism (like external env var) - # For this simple loader, we'll let it overwrite, - # and CLI args will overwrite these later if specified. - os.environ[var] = value - print(f"Environment variables have been loaded from {env_filename}") - except FileNotFoundError: - print(f"{env_filename} file not found. Consider running with --setup-bazarr or creating it manually.") - -def main(): - if 'python3' in sys.executable: - python_cmd = 'python3' - elif 'python' in sys.executable: - python_cmd = 'python' - else: - print("Script started with an unknown command") - sys.exit(1) - if sys.version_info[0] < 3: - print(f"This script requires Python 3 or higher, you are running {sys.version}") - sys.exit(1) - - os.chdir(os.path.dirname(os.path.abspath(__file__))) - - parser = argparse.ArgumentParser(prog="python launcher.py", formatter_class=argparse.ArgumentDefaultsHelpFormatter) - # Changed: action='store_true' means it's False by default, True if flag is present - parser.add_argument('-d', '--debug', action='store_true', help="Enable console debugging (overrides .env and external ENV)") - parser.add_argument('-i', '--install', action='store_true', help="Install/update all necessary packages") - # Changed: action='store_true' - parser.add_argument('-a', '--append', action='store_true', help="Append 'Transcribed by whisper' (overrides .env and external ENV)") - parser.add_argument('-u', '--update', action='store_true', help="Update Subgen") - parser.add_argument('-x', '--exit-early', action='store_true', help="Exit without running transcriptarr.py") - parser.add_argument('-s', '--setup-bazarr', action='store_true', help="Prompt for common Bazarr setup parameters and save them for future runs") - parser.add_argument('-b', '--branch', type=str, default='main', help='Specify the branch to download from') - parser.add_argument('-l', '--launcher-update', action='store_true', help="Update launcher.py and re-launch") - - args = parser.parse_args() - - branch_name = args.branch if args.branch != 'main' else os.getenv('BRANCH', 'main') - script_name_suffix = f"-{branch_name}.py" if branch_name != "main" else ".py" - subgen_script_to_run = f"subgen{script_name_suffix}" - language_code_script_to_download = f"language_code{script_name_suffix}" - - - if args.launcher_update or convert_to_bool(os.getenv('LAUNCHER_UPDATE')): - print(f"Updating launcher.py from GitHub branch {branch_name}...") - download_from_github(f"https://raw.githubusercontent.com/McCloudS/subgen/{branch_name}/launcher.py", f'launcher{script_name_suffix}') - excluded_args = ['--launcher-update', '-l'] - new_args = [arg for arg in sys.argv[1:] if arg not in excluded_args] - print(f"Relaunching updated launcher: launcher{script_name_suffix}") - os.execl(sys.executable, sys.executable, f"launcher{script_name_suffix}", *new_args) - # The script will not continue past os.execl - - # --- Environment Variable Handling --- - # 1. Load from .env file first. This sets a baseline. - # External environment variables (set before launcher.py) will already be in os.environ - # and won't be overwritten by load_env_variables IF load_env_variables checked for existence. - # For simplicity, this version of load_env_variables *will* overwrite. - # If you need to preserve external env vars over .env, load_env_variables needs adjustment. 
- if args.setup_bazarr: - prompt_and_save_bazarr_env_variables() - # After saving, load them immediately for this run - load_env_variables() - else: - # Load if not setting up, assuming .env might exist - load_env_variables() - - - # 2. Override with command-line arguments (highest priority for these specific flags) - if args.debug: # If -d or --debug was passed - os.environ['DEBUG'] = 'True' - print("Launcher CLI: DEBUG set to True") - elif 'DEBUG' not in os.environ: # If not set by CLI and not by .env or external - os.environ['DEBUG'] = 'False' # Default to False if nothing else specified it - print("Launcher: DEBUG defaulted to False (no prior setting)") - - - if args.append: # If -a or --append was passed - os.environ['APPEND'] = 'True' - print("Launcher CLI: APPEND set to True") - elif 'APPEND' not in os.environ: # If not set by CLI and not by .env or external - os.environ['APPEND'] = 'False' # Default to False if nothing else specified it - #print("Launcher: APPEND defaulted to False (no prior setting)") - # --- End Environment Variable Handling --- - - - requirements_url = "https://raw.githubusercontent.com/McCloudS/subgen/main/requirements.txt" - requirements_file = "requirements.txt" - - if args.install: - download_from_github(requirements_url, requirements_file) - install_packages_from_requirements(requirements_file) - - if not os.path.exists(subgen_script_to_run) or args.update or convert_to_bool(os.getenv('UPDATE')): - print(f"Downloading {subgen_script_to_run} from GitHub branch {branch_name}...") - download_from_github(f"https://raw.githubusercontent.com/McCloudS/subgen/{branch_name}/transcriptarr.py", subgen_script_to_run) - print(f"Downloading {language_code_script_to_download} from GitHub branch {branch_name}...") - download_from_github(f"https://raw.githubusercontent.com/McCloudS/subgen/{branch_name}/language_code.py", language_code_script_to_download) - - else: - print(f"{subgen_script_to_run} exists and UPDATE is set to False, skipping download.") - - if not args.exit_early: - #print(f"DEBUG environment variable for transcriptarr.py: {os.getenv('DEBUG')}") - #print(f"APPEND environment variable for transcriptarr.py: {os.getenv('APPEND')}") - print(f'Launching {subgen_script_to_run}') - try: - subprocess.run([python_cmd, '-u', subgen_script_to_run], check=True) - except FileNotFoundError: - print(f"Error: Could not find {subgen_script_to_run}. 
Make sure it was downloaded correctly.") - except subprocess.CalledProcessError as e: - print(f"Error running {subgen_script_to_run}: {e}") - - else: - print("Not running transcriptarr.py: -x or --exit-early set") - -if __name__ == "__main__": - main() diff --git a/subgen.xml b/subgen.xml deleted file mode 100644 index bc0e586..0000000 --- a/subgen.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - subgen - --gpus all - false - CATEGORY: - mccloud/subgen - https://github.com/McCloudS/subgen - If you appreciate my work, then please consider donating - https://www.paypal.com/donate/?hosted_button_id=SU4QQP6LH5PF6 - https://www.paypal.com/en_US/i/btn/btn_donate_SM.gif - bridge - false - https://github.com/McCloudS/subgen/issues - bash - https://github.com/McCloudS/subgen - https://github.com/McCloudS/subgen/blob/main/README.md - https://github.com/McCloudS/subgen - subgen will transcribe your personal media on a Plex, Emby, or Jellyfin server to create subtitles (.srt) from audio/video files, it can also be used as a Whisper Provider in Bazarr - http://[IP]:[PORT:9000]/docs - https://github.com/McCloudS/subgen/blob/main/subgen.xml - https://raw.githubusercontent.com/McCloudS/subgen/main/icon.png - 2024-03-23 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/test_backend.py b/test_backend.py deleted file mode 100755 index 2e87914..0000000 --- a/test_backend.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python3 -"""Test script for TranscriptorIO backend components.""" -import sys -import logging - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -def test_config(): - """Test configuration loading.""" - logger.info("Testing configuration...") - try: - from backend.config import settings - logger.info(f"✓ Config loaded successfully") - logger.info(f" - Mode: {settings.transcriptarr_mode}") - logger.info(f" - Database: {settings.database_type.value}") - logger.info(f" - Whisper Model: {settings.whisper_model}") - logger.info(f" - Device: {settings.transcribe_device}") - return True - except Exception as e: - logger.error(f"✗ Config test failed: {e}") - return False - - -def test_database(): - """Test database connection and table creation.""" - logger.info("\nTesting database...") - try: - from backend.core.database import database - from backend.core.models import Base - - # Clean database for fresh test - try: - database.drop_tables() - logger.info(f" - Dropped existing tables for clean test") - except: - pass - - database.create_tables() - logger.info(f"✓ Database initialized with fresh tables") - - # Test connection with health check - if database.health_check(): - logger.info(f"✓ Database connection OK") - else: - logger.error("✗ Database health check failed (but tables were created)") - # Don't fail the test if health check fails but tables exist - return True - - # Get stats - stats = database.get_stats() - logger.info(f" - Type: {stats['type']}") - logger.info(f" - URL: {stats['url']}") - - return True - except Exception as e: - logger.error(f"✗ Database test failed: {e}") - import traceback - traceback.print_exc() - return False - - -def test_queue_manager(): - """Test queue manager operations.""" - logger.info("\nTesting queue manager...") - try: - from backend.core.queue_manager import queue_manager - from backend.core.models import QualityPreset - - # Add a test job - job = queue_manager.add_job( - file_path="/test/anime.mkv", - file_name="anime.mkv", - source_lang="ja", - target_lang="es", - quality_preset=QualityPreset.FAST, - 
priority=5 - ) - - if job: - logger.info(f"✓ Job created: {job.id}") - logger.info(f" - File: {job.file_name}") - logger.info(f" - Status: {job.status.value}") - logger.info(f" - Priority: {job.priority}") - else: - logger.error("✗ Failed to create job") - return False - - # Get queue stats - stats = queue_manager.get_queue_stats() - logger.info(f"✓ Queue stats:") - logger.info(f" - Total: {stats['total']}") - logger.info(f" - Queued: {stats['queued']}") - logger.info(f" - Processing: {stats['processing']}") - logger.info(f" - Completed: {stats['completed']}") - - # Try to add duplicate - duplicate = queue_manager.add_job( - file_path="/test/anime.mkv", - file_name="anime.mkv", - source_lang="ja", - target_lang="es", - quality_preset=QualityPreset.FAST - ) - - if duplicate is None: - logger.info(f"✓ Duplicate detection working") - else: - logger.warning(f"⚠ Duplicate job was created (should have been rejected)") - - # Get next job - next_job = queue_manager.get_next_job("test-worker-1") - if next_job: - logger.info(f"✓ Got next job: {next_job.id} (assigned to test-worker-1)") - logger.info(f" - Status: {next_job.status.value}") - else: - logger.error("✗ Failed to get next job") - return False - - return True - except Exception as e: - logger.error(f"✗ Queue manager test failed: {e}") - import traceback - traceback.print_exc() - return False - - -def main(): - """Run all tests.""" - logger.info("=" * 60) - logger.info("TranscriptorIO Backend Test Suite") - logger.info("=" * 60) - - results = { - "Config": test_config(), - "Database": test_database(), - "Queue Manager": test_queue_manager(), - } - - logger.info("\n" + "=" * 60) - logger.info("Test Results:") - logger.info("=" * 60) - - all_passed = True - for test_name, passed in results.items(): - status = "✓ PASSED" if passed else "✗ FAILED" - logger.info(f"{test_name}: {status}") - if not passed: - all_passed = False - - logger.info("=" * 60) - - if all_passed: - logger.info("🎉 All tests passed!") - return 0 - else: - logger.error("❌ Some tests failed") - return 1 - - -if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file diff --git a/transcriptarr.py b/transcriptarr.py deleted file mode 100644 index d28ed56..0000000 --- a/transcriptarr.py +++ /dev/null @@ -1,1779 +0,0 @@ -subgen_version = '2026.01.5' - -""" -ENVIRONMENT VARIABLES DOCUMENTATION - -This application supports both new standardized environment variable names and legacy names -for backwards compatibility. The new names follow a consistent naming convention: - -STANDARDIZED NAMING CONVENTION: -- Use UPPERCASE with underscores for separation -- Group related variables with consistent prefixes: - * PLEX_* for Plex server integration - * JELLYFIN_* for Jellyfin server integration - * PROCESS_* for media processing triggers - * SKIP_* for all skip conditions - * SUBTITLE_* for subtitle-related settings - * WHISPER_* for Whisper model settings - * TRANSCRIBE_* for transcription settings - -BACKWARDS COMPATIBILITY: -Legacy environment variable names are still supported. If both new and old names are set, -the new standardized name takes precedence. 
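For example (illustrative values): if both PLEX_TOKEN=new-token and
PLEXTOKEN=legacy-token are set, the application reads "new-token"; if only the
legacy PLEXTOKEN is set, it still reads "legacy-token".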
- -NEW NAME → OLD NAME (for backwards compatibility): -- PLEX_TOKEN → PLEXTOKEN -- PLEX_SERVER → PLEXSERVER -- JELLYFIN_TOKEN → JELLYFINTOKEN -- JELLYFIN_SERVER → JELLYFINSERVER -- PROCESS_ADDED_MEDIA → PROCADDEDMEDIA -- PROCESS_MEDIA_ON_PLAY → PROCMEDIAONPLAY -- SUBTITLE_LANGUAGE_NAME → NAMESUBLANG -- WEBHOOK_PORT → WEBHOOKPORT -- SKIP_IF_EXTERNAL_SUBTITLES_EXIST → SKIPIFEXTERNALSUB -- SKIP_IF_TARGET_SUBTITLES_EXIST → SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST -- SKIP_IF_INTERNAL_SUBTITLES_LANGUAGE → SKIPIFINTERNALSUBLANG -- SKIP_SUBTITLE_LANGUAGES → SKIP_LANG_CODES -- SKIP_IF_AUDIO_LANGUAGES → SKIP_IF_AUDIO_TRACK_IS -- SKIP_ONLY_SUBGEN_SUBTITLES → ONLY_SKIP_IF_SUBGEN_SUBTITLE -- SKIP_IF_NO_LANGUAGE_BUT_SUBTITLES_EXIST → SKIP_IF_LANGUAGE_IS_NOT_SET_BUT_SUBTITLES_EXIST - -MIGRATION GUIDE: -Users can gradually migrate to the new names. Both will work simultaneously during the -transition period. The old names may be deprecated in future versions. -""" - -from language_code import LanguageCode -from datetime import datetime -from threading import Lock -import os -import json -import xml.etree.ElementTree as ET -import threading -import sys -import time -import queue -import logging -import gc -import random -from typing import Union, Any, Optional -from fastapi import FastAPI, File, UploadFile, Query, Header, Body, Form, Request -from fastapi.responses import StreamingResponse -import numpy as np -import stable_whisper -from stable_whisper import Segment -import requests -import av -import ffmpeg -import whisper -import ast -from watchdog.observers.polling import PollingObserver as Observer -from watchdog.events import FileSystemEventHandler -import faster_whisper -from io import BytesIO -import io -import asyncio -import torch -import ctypes, ctypes.util -from typing import List -from enum import Enum - -def convert_to_bool(in_bool): - # Convert the input to string and lower case, then check against true values - return str(in_bool).lower() in ('true', 'on', '1', 'y', 'yes') - -def get_env_with_fallback(new_name: str, old_name: str, default_value=None, convert_func=None): - """ - Get environment variable with backwards compatibility fallback. 
- - Args: - new_name: The new standardized environment variable name - old_name: The legacy environment variable name for backwards compatibility - default_value: Default value if neither variable is set - convert_func: Optional function to convert the value (e.g., convert_to_bool, int) - - Returns: - The environment variable value, converted if convert_func is provided - """ - # Try new name first, then fall back to old name - value = os.getenv(new_name) or os.getenv(old_name) - - if value is None: - value = default_value - - # Apply conversion function if provided - if convert_func and value is not None: - return convert_func(value) - - return value - -# Server Integration - with backwards compatibility -plextoken = get_env_with_fallback('PLEX_TOKEN', 'PLEXTOKEN', 'token here') -plexserver = get_env_with_fallback('PLEX_SERVER', 'PLEXSERVER', 'http://192.168.1.111:32400') -jellyfintoken = get_env_with_fallback('JELLYFIN_TOKEN', 'JELLYFINTOKEN', 'token here') -jellyfinserver = get_env_with_fallback('JELLYFIN_SERVER', 'JELLYFINSERVER', 'http://192.168.1.111:8096') - -# Whisper Configuration -whisper_model = os.getenv('WHISPER_MODEL', 'medium') -whisper_threads = int(os.getenv('WHISPER_THREADS', 4)) -concurrent_transcriptions = int(os.getenv('CONCURRENT_TRANSCRIPTIONS', 2)) -transcribe_device = os.getenv('TRANSCRIBE_DEVICE', 'cpu') - -# Processing Control - with backwards compatibility -procaddedmedia = get_env_with_fallback('PROCESS_ADDED_MEDIA', 'PROCADDEDMEDIA', True, convert_to_bool) -procmediaonplay = get_env_with_fallback('PROCESS_MEDIA_ON_PLAY', 'PROCMEDIAONPLAY', True, convert_to_bool) - -# Subtitle Configuration - with backwards compatibility -namesublang = get_env_with_fallback('SUBTITLE_LANGUAGE_NAME', 'NAMESUBLANG', '') - -# System Configuration - with backwards compatibility -webhookport = get_env_with_fallback('WEBHOOK_PORT', 'WEBHOOKPORT', 9000, int) -word_level_highlight = convert_to_bool(os.getenv('WORD_LEVEL_HIGHLIGHT', False)) -debug = convert_to_bool(os.getenv('DEBUG', True)) -use_path_mapping = convert_to_bool(os.getenv('USE_PATH_MAPPING', False)) -path_mapping_from = os.getenv('PATH_MAPPING_FROM', r'/tv') -path_mapping_to = os.getenv('PATH_MAPPING_TO', r'/Volumes/TV') -model_location = os.getenv('MODEL_PATH', './models') -monitor = convert_to_bool(os.getenv('MONITOR', False)) -transcribe_folders = os.getenv('TRANSCRIBE_FOLDERS', '') -transcribe_or_translate = os.getenv('TRANSCRIBE_OR_TRANSLATE', 'transcribe').lower() -clear_vram_on_complete = convert_to_bool(os.getenv('CLEAR_VRAM_ON_COMPLETE', True)) -compute_type = os.getenv('COMPUTE_TYPE', 'auto') -append = convert_to_bool(os.getenv('APPEND', False)) -reload_script_on_change = convert_to_bool(os.getenv('RELOAD_SCRIPT_ON_CHANGE', False)) -lrc_for_audio_files = convert_to_bool(os.getenv('LRC_FOR_AUDIO_FILES', True)) -custom_regroup = os.getenv('CUSTOM_REGROUP', 'cm_sl=84_sl=42++++++1') -detect_language_length = int(os.getenv('DETECT_LANGUAGE_LENGTH', 30)) -detect_language_offset = int(os.getenv('DETECT_LANGUAGE_OFFSET', 0)) - -# Skip Configuration - with backwards compatibility -skipifexternalsub = get_env_with_fallback('SKIP_IF_EXTERNAL_SUBTITLES_EXIST', 'SKIPIFEXTERNALSUB', False, convert_to_bool) -skip_if_to_transcribe_sub_already_exist = get_env_with_fallback('SKIP_IF_TARGET_SUBTITLES_EXIST', 'SKIP_IF_TO_TRANSCRIBE_SUB_ALREADY_EXIST', True, convert_to_bool) -skipifinternalsublang = LanguageCode.from_string(get_env_with_fallback('SKIP_IF_INTERNAL_SUBTITLES_LANGUAGE', 'SKIPIFINTERNALSUBLANG', '')) 
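# A short, self-contained sketch (illustrative, not part of the original module)
# of the precedence get_env_with_fallback() implements above; the token values
# and the demo statements are invented for clarity.
os.environ['PLEXTOKEN'] = 'legacy-token'   # only the legacy name is set
assert get_env_with_fallback('PLEX_TOKEN', 'PLEXTOKEN', 'token here') == 'legacy-token'
os.environ['PLEX_TOKEN'] = 'new-token'     # both names set: the standardized name wins
assert get_env_with_fallback('PLEX_TOKEN', 'PLEXTOKEN', 'token here') == 'new-token'
assert get_env_with_fallback('WEBHOOK_PORT', 'WEBHOOKPORT', 9000, int) == 9000  # neither set: default, then conversion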
-plex_queue_next_episode = convert_to_bool(os.getenv('PLEX_QUEUE_NEXT_EPISODE', False)) -plex_queue_season = convert_to_bool(os.getenv('PLEX_QUEUE_SEASON', False)) -plex_queue_series = convert_to_bool(os.getenv('PLEX_QUEUE_SERIES', False)) -# Language and Skip Configuration - with backwards compatibility -skip_lang_codes_list = ( - [LanguageCode.from_string(code) for code in get_env_with_fallback('SKIP_SUBTITLE_LANGUAGES', 'SKIP_LANG_CODES', '').split("|")] - if get_env_with_fallback('SKIP_SUBTITLE_LANGUAGES', 'SKIP_LANG_CODES') - else [] -) -force_detected_language_to = LanguageCode.from_string(os.getenv('FORCE_DETECTED_LANGUAGE_TO', '')) -preferred_audio_languages = ( - [LanguageCode.from_string(code) for code in os.getenv('PREFERRED_AUDIO_LANGUAGES', 'eng').split("|")] - if os.getenv('PREFERRED_AUDIO_LANGUAGES') - else [] -) # in order of preference -limit_to_preferred_audio_languages = convert_to_bool(os.getenv('LIMIT_TO_PREFERRED_AUDIO_LANGUAGE', False)) #TODO: add support for this -skip_if_audio_track_is_in_list = ( - [LanguageCode.from_string(code) for code in get_env_with_fallback('SKIP_IF_AUDIO_LANGUAGES', 'SKIP_IF_AUDIO_TRACK_IS', '').split("|")] - if get_env_with_fallback('SKIP_IF_AUDIO_LANGUAGES', 'SKIP_IF_AUDIO_TRACK_IS') - else [] -) - -# Additional Subtitle Configuration - with backwards compatibility -subtitle_language_naming_type = os.getenv('SUBTITLE_LANGUAGE_NAMING_TYPE', 'ISO_639_2_B') -only_skip_if_subgen_subtitle = get_env_with_fallback('SKIP_ONLY_SUBGEN_SUBTITLES', 'ONLY_SKIP_IF_SUBGEN_SUBTITLE', False, convert_to_bool) -skip_unknown_language = convert_to_bool(os.getenv('SKIP_UNKNOWN_LANGUAGE', False)) -skip_if_language_is_not_set_but_subtitles_exist = get_env_with_fallback('SKIP_IF_NO_LANGUAGE_BUT_SUBTITLES_EXIST', 'SKIP_IF_LANGUAGE_IS_NOT_SET_BUT_SUBTITLES_EXIST', False, convert_to_bool) -should_whiser_detect_audio_language = convert_to_bool(os.getenv('SHOULD_WHISPER_DETECT_AUDIO_LANGUAGE', False)) -show_in_subname_subgen = convert_to_bool(os.getenv('SHOW_IN_SUBNAME_SUBGEN', True)) -show_in_subname_model = convert_to_bool(os.getenv('SHOW_IN_SUBNAME_MODEL', True)) - -# Advanced Configuration -try: - kwargs = ast.literal_eval(os.getenv('SUBGEN_KWARGS', '{}') or '{}') -except ValueError: - kwargs = {} - logging.info("kwargs (SUBGEN_KWARGS) is an invalid dictionary, defaulting to empty '{}'") - -if transcribe_device == "gpu": - transcribe_device = "cuda" - - -VIDEO_EXTENSIONS = ( - ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".mpg", ".mpeg", - ".3gp", ".ogv", ".vob", ".rm", ".rmvb", ".ts", ".m4v", ".f4v", ".svq3", - ".asf", ".m2ts", ".divx", ".xvid" -) - -AUDIO_EXTENSIONS = ( - ".mp3", ".wav", ".aac", ".flac", ".ogg", ".wma", ".alac", ".m4a", ".opus", - ".aiff", ".aif", ".pcm", ".ra", ".ram", ".mid", ".midi", ".ape", ".wv", - ".amr", ".vox", ".tak", ".spx", ".m4b", ".mka" -) - - -app = FastAPI() -model = None - -in_docker = os.path.exists('/.dockerenv') -docker_status = "Docker" if in_docker else "Standalone" - -class DeduplicatedQueue(queue.Queue): - """Queue that prevents duplicates in both queued and in-progress tasks.""" - def __init__(self): - super().__init__() - self._queued = set() # Tracks paths in the queue - self._processing = set() # Tracks paths being processed - self._lock = Lock() # Ensures thread safety - - def put(self, item, block=True, timeout=None): - with self._lock: - path = item["path"] - if path not in self._queued and path not in self._processing: - super().put(item, block, timeout) - self._queued.add(path) - - def get(self, 
block=True, timeout=None): - item = super().get(block, timeout) - with self._lock: - path = item["path"] - self._queued.discard(path) # Remove from queued set - self._processing.add(path) # Mark as in-progress - return item - - def task_done(self): - super().task_done() - with self._lock: - # Assumes task_done() is called after processing the item from get() - # If your workers process multiple items per get(), adjust logic here - if self.unfinished_tasks == 0: - self._processing.clear() # Reset when all tasks are done - - def is_processing(self): - """Return True if any tasks are being processed.""" - with self._lock: - return len(self._processing) > 0 - - def is_idle(self): - """Return True if queue is empty AND no tasks are processing.""" - return self.empty() and not self.is_processing() - - def get_queued_tasks(self): - """Return a list of queued task paths.""" - with self._lock: - return list(self._queued) - - def get_processing_tasks(self): - """Return a list of paths being processed.""" - with self._lock: - return list(self._processing) - -#start queue -task_queue = DeduplicatedQueue() - -def transcription_worker(): - while True: - task = None - try: - task = task_queue.get(block=True, timeout=1) - if "type" in task and task["type"] == "detect_language": - detect_language_task(task['path']) - elif 'Bazarr-' in task['path']: - logging.info(f"Task {task['path']} is being handled by ASR.") - else: - logging.info(f"Task {task['path']} is being handled by Subgen.") - gen_subtitles(task['path'], task['transcribe_or_translate'], task['force_language']) - except queue.Empty: - continue - except Exception as e: - logging.error(f"Error processing task: {e}", exc_info=True) - finally: - if task: # Ensure a task was actually retrieved before calling task_done - task_queue.task_done() - delete_model() - -for _ in range(concurrent_transcriptions): - threading.Thread(target=transcription_worker, daemon=True).start() - -# Define a filter class to hide common logging we don't want to see -class MultiplePatternsFilter(logging.Filter): - def filter(self, record): - # Define the patterns to search for - patterns = [ - "Compression ratio threshold is not met", - "Processing segment at", - "Log probability threshold is", - "Reset prompt", - "Attempting to release", - "released on ", - "Attempting to acquire", - "acquired on", - "header parsing failed", - "timescale not set", - "misdetection possible", - "srt was added", - "doesn't have any audio to transcribe", - "Calling on_" - ] - # Return False if any of the patterns are found, True otherwise - return not any(pattern in record.getMessage() for pattern in patterns) - -# Configure logging -if debug: - level = logging.DEBUG -else: - level = logging.INFO - -logging.basicConfig(stream=sys.stderr, level=level, format="%(asctime)s %(levelname)s: %(message)s") - -# Get the root logger -logger = logging.getLogger() -logger.setLevel(level) # Set the logger level - -for handler in logger.handlers: - handler.addFilter(MultiplePatternsFilter()) - -logging.getLogger("multipart").setLevel(logging.WARNING) -logging.getLogger("urllib3").setLevel(logging.WARNING) -logging.getLogger("watchfiles").setLevel(logging.WARNING) -logging.getLogger("asyncio").setLevel(logging.WARNING) - -last_print_time = None - -#This forces a flush to print progress correctly -def progress(seek, total): - sys.stdout.flush() - sys.stderr.flush() - if(docker_status) == 'Docker': - global last_print_time - # Get the current time - current_time = time.time() - - # Check if 5 seconds have passed 
since the last print - if last_print_time is None or (current_time - last_print_time) >= 5: - # Update the last print time - last_print_time = current_time - # Log the message - logging.info("") - #if concurrent_transcriptions == 1: - #processing = task_queue.get_processing_tasks()[0] - #logging.debug(f"Processing file: {processing}") - -TIME_OFFSET = 5 - -def appendLine(result): - if append: - lastSegment = result.segments[-1] - date_time_str = datetime.now().strftime("%d %b %Y - %H:%M:%S") - appended_text = f"Transcribed by whisperAI with faster-whisper ({whisper_model}) on {date_time_str}" - - # Create a new segment with the updated information - newSegment = Segment( - start=lastSegment.start + TIME_OFFSET, - end=lastSegment.end + TIME_OFFSET, - text=appended_text, - words=[], # Empty list for words - id=lastSegment.id + 1 - ) - - # Append the new segment to the result's segments - result.segments.append(newSegment) - -@app.get("/plex") -@app.get("/webhook") -@app.get("/jellyfin") -@app.get("/asr") -@app.get("/emby") -@app.get("/detect-language") -@app.get("/tautulli") -def handle_get_request(request: Request): - return {"You accessed this request incorrectly via a GET request. See https://github.com/McCloudS/subgen for proper configuration"} - -@app.get("/") -def webui(): - return {"The webui for configuration was removed on 1 October 2024, please configure via environment variables or in your Docker settings."} - -@app.get("/status") -def status(): - return {"version" : f"Subgen {subgen_version}, stable-ts {stable_whisper.__version__}, faster-whisper {faster_whisper.__version__} ({docker_status})"} - -@app.post("/tautulli") -def receive_tautulli_webhook( - source: Union[str, None] = Header(None), - event: str = Body(None), - file: str = Body(None), -): - if source == "Tautulli": - logging.debug(f"Tautulli event detected is: {event}") - if((event == "added" and procaddedmedia) or (event == "played" and procmediaonplay)): - fullpath = file - logging.debug(f"Full file path: {fullpath}") - - gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate) - else: - return { - "message": "This doesn't appear to be a properly configured Tautulli webhook, please review the instructions again!"} - - return "" - - -@app.post("/plex") -def receive_plex_webhook( - user_agent: Union[str] = Header(None), - payload: Union[str] = Form(), -): - try: - plex_json = json.loads(payload) - logging.debug(f"Raw response: {payload}") - - if "PlexMediaServer" not in user_agent: - return {"message": "This doesn't appear to be a properly configured Plex webhook, please review the instructions again"} - - event = plex_json["event"] - logging.debug(f"Plex event detected is: {event}") - - if (event == "library.new" and procaddedmedia) or (event == "media.play" and procmediaonplay): - fullpath = get_plex_file_name(plex_json['Metadata']['ratingKey'], plexserver, plextoken) - logging.debug(f"Full file path: {fullpath}") - - gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate) - refresh_plex_metadata(plex_json['Metadata']['ratingKey'], plexserver, plextoken) - if plex_queue_next_episode: - gen_subtitles_queue(path_mapping(get_plex_file_name(get_next_plex_episode(plex_json['Metadata']['ratingKey'], stay_in_season=False), plexserver, plextoken)), transcribe_or_translate) - - if plex_queue_series or plex_queue_season: - current_rating_key = plex_json['Metadata']['ratingKey'] - stay_in_season = plex_queue_season # Determine if we're staying in the season or not - - while current_rating_key is not 
None: - try: - # Queue the current episode - file_path = path_mapping(get_plex_file_name(current_rating_key, plexserver, plextoken)) - gen_subtitles_queue(file_path, transcribe_or_translate) - logging.debug(f"Queued episode with ratingKey {current_rating_key}") - - # Get the next episode - next_episode_rating_key = get_next_plex_episode(current_rating_key, stay_in_season=stay_in_season) - if next_episode_rating_key is None: - break # Exit the loop if no next episode - current_rating_key = next_episode_rating_key - - except Exception as e: - logging.error(f"Error processing episode with ratingKey {current_rating_key} or reached end of series: {e}") - break # Stop processing on error - - logging.info("All episodes in the series (or season) have been queued.") - - - except Exception as e: - logging.error(f"Failed to process Plex webhook: {e}") - - return "" - -@app.post("/jellyfin") -def receive_jellyfin_webhook( - user_agent: str = Header(None), - NotificationType: str = Body(None), - file: str = Body(None), - ItemId: str = Body(None), -): - if "Jellyfin-Server" in user_agent: - logging.debug(f"Jellyfin event detected is: {NotificationType}") - logging.debug(f"itemid is: {ItemId}") - - if (NotificationType == "ItemAdded" and procaddedmedia) or (NotificationType == "PlaybackStart" and procmediaonplay): - fullpath = get_jellyfin_file_name(ItemId, jellyfinserver, jellyfintoken) - logging.debug(f"Full file path: {fullpath}") - - gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate) - try: - refresh_jellyfin_metadata(ItemId, jellyfinserver, jellyfintoken) - logging.info(f"Metadata for item {ItemId} refreshed successfully.") - except Exception as e: - logging.error(f"Failed to refresh metadata for item {ItemId}: {e}") - else: - return { - "message": "This doesn't appear to be a properly configured Jellyfin webhook, please review the instructions again!"} - - return "" - - -@app.post("/emby") -def receive_emby_webhook( - user_agent: Union[str, None] = Header(None), - data: Union[str, None] = Form(None), -): - logging.debug("Raw response: %s", data) - - if not data: - return "" - - data_dict = json.loads(data) - event = data_dict['Event'] - logging.debug("Emby event detected is: " + event) - - # Check if it's a notification test event - if event == "system.notificationtest": - logging.info("Emby test message received!") - return {"message": "Notification test received successfully!"} - - if (event == "library.new" and procaddedmedia) or (event == "playback.start" and procmediaonplay): - fullpath = data_dict['Item']['Path'] - logging.debug(f"Full file path: {fullpath}") - gen_subtitles_queue(path_mapping(fullpath), transcribe_or_translate) - - return "" - -@app.post("/batch") -def batch( - directory: str = Query(...), - forceLanguage: Union[str, None] = Query(default=None) -): - transcribe_existing(directory, LanguageCode.from_string(forceLanguage)) - -# idea and some code for asr and detect language from https://github.com/ahmetoner/whisper-asr-webservice -@app.post("//asr") -@app.post("/asr") -async def asr( - task: Union[str, None] = Query(default="transcribe", enum=["transcribe", "translate"]), - language: Union[str, None] = Query(default=None), - video_file: Union[str, None] = Query(default=None), - initial_prompt: Union[str, None] = Query(default=None), # Not used by Bazarr - audio_file: UploadFile = File(...), - encode: bool = Query(default=True, description="Encode audio first through ffmpeg"), # Not used by Bazarr/always False - output: Union[str, None] = Query(default="srt", 
enum=["txt", "vtt", "srt", "tsv", "json"]),
-    word_timestamps: bool = Query(default=False, description="Word-level timestamps"), # Not used by Bazarr
-):
-    try:
-        logging.info(f"{task.capitalize()} of file '{video_file}' from Bazarr/ASR webhook" if video_file else f"{task.capitalize()} of file from Bazarr/ASR webhook")
-
-        result = None
-        random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6))
-
-        if force_detected_language_to:
-            language = force_detected_language_to.to_iso_639_1()
-            logging.info(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}")
-
-        start_time = time.time()
-        start_model()
-
-        task_id = {'path': f"Bazarr-asr-{random_name}"}
-        task_queue.put(task_id)
-
-        args = {}
-        args['progress_callback'] = progress
-
-        file_content = audio_file.file.read()
-
-        if encode:
-            args['audio'] = file_content
-        else:
-            args['audio'] = np.frombuffer(file_content, np.int16).flatten().astype(np.float32) / 32768.0
-            args['input_sr'] = 16000
-
-        if custom_regroup:
-            args['regroup'] = custom_regroup
-
-        args.update(kwargs)
-
-        result = model.transcribe(task=task, language=language, **args)
-        appendLine(result)
-
-        elapsed_time = time.time() - start_time
-        minutes, seconds = divmod(int(elapsed_time), 60)
-        logging.info(
-            f"{task.capitalize()} '{video_file}' from Bazarr complete, it took {minutes} minutes and {seconds} seconds to complete." if video_file
-            else f"{task.capitalize()} complete, it took {minutes} minutes and {seconds} seconds to complete.")
-
-    except Exception as e:
-        logging.error(
-            f"Error processing or {task.capitalize()} of Bazarr file: {video_file} -- Exception: {e}" if video_file
-            else f"Error processing or {task.capitalize()} of Bazarr file Exception: {e}"
-        )
-
-    finally:
-        await audio_file.close()
-        delete_model()
-
-    if result:
-        return StreamingResponse(
-            iter(result.to_srt_vtt(filepath=None, word_level=word_level_highlight)),
-            media_type="text/plain",
-            headers={
-                'Source': f'{task.capitalize()}d using stable-ts from Subgen!',
-            }
-        )
-    else:
-        return
-@app.post("//detect-language")
-@app.post("/detect-language")
-async def detect_language(
-    audio_file: UploadFile = File(...),
-    encode: bool = Query(default=True, description="Encode audio first through ffmpeg"), # This is always false from Bazarr
-    video_file: Union[str, None] = Query(default=None),
-    detect_lang_length: int = Query(default=detect_language_length, description="Detect language on X seconds of the file"),
-    detect_lang_offset: int = Query(default=detect_language_offset, description="Start Detect language X seconds into the file")
-):
-
-    if force_detected_language_to:
-        #logging.info(f"language is: {force_detected_language_to.to_name()}")
-        logging.debug(f"Skipping detect language, we have forced it as {force_detected_language_to.to_name()}")
-        return {
-            "detected_language": force_detected_language_to.to_name(),
-            "language_code": force_detected_language_to.to_iso_639_1()
-        }
-
-    global detect_language_length, detect_language_offset
-    detected_language = LanguageCode.NONE
-    language_code = 'und'
-
-    # Update detection parameters if custom values are provided
-    if detect_lang_length != 
detect_language_length: - logging.info(f"Language detection window: First {detect_lang_length}s of audio") - detect_language_length = detect_lang_length - - if detect_lang_offset != detect_language_offset: - logging.info(f"Language detection offset: {detect_lang_offset}s from start") - detect_language_offset = detect_lang_offset - try: - logging.info(f"Detecting language for file '{video_file}' from Bazarr/detect-language webhook" if video_file else "Detecting language from Bazarr/detect-language webhook") - start_model() - random_name = ''.join(random.choices("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", k=6)) - - task_id = { 'path': f"Bazarr-detect-language-{random_name}" } - task_queue.put(task_id) - args = {} - #sample_rate = next(stream.rate for stream in av.open(audio_file.file).streams if stream.type == 'audio') - #logging.info(f"Sample rate is: {sample_rate}") - audio_file.file.seek(0) - args['progress_callback'] = progress - - if encode: - args['audio'] = extract_audio_segment_to_memory(audio_file, detect_language_offset, detect_language_length).read() - args['input_sr'] = 16000 - else: - #args['audio'] = whisper.pad_or_trim(np.frombuffer(audio_file.file.read(), np.int16).flatten().astype(np.float32) / 32768.0, args['input_sr'] * int(detect_language_length)) - args['audio'] = await get_audio_chunk(audio_file, detect_lang_offset, detect_lang_length) - args['input_sr'] = 16000 - - args.update(kwargs) - detected_language = LanguageCode.from_name(model.transcribe(**args).language) - language_code = detected_language.to_iso_639_1() - logging.debug(f"Language detection: {detected_language.to_name()} (Code: {language_code})") - - except Exception as e: - logging.error( - f"Error processing or transcribing Bazarr file: {video_file} -- Exception: {e}" if video_file - else f"Error processing or transcribing Bazarr file Exception: {e}" - ) - - finally: - #await audio_file.close() - delete_model() - - return {"detected_language": detected_language.to_name(), "language_code": language_code} - -async def get_audio_chunk(audio_file, offset=detect_language_offset, length=detect_language_length, sample_rate=16000, audio_format=np.int16): - """ - Extract a chunk of audio from a file, starting at the given offset and of the given length. - - :param audio_file: The audio file (UploadFile or file-like object). - :param offset: The offset in seconds to start the extraction. - :param length: The length in seconds for the chunk to be extracted. - :param sample_rate: The sample rate of the audio (default 16000). - :param audio_format: The audio format to interpret (default int16, 2 bytes per sample). - - :return: A numpy array containing the extracted audio chunk. 
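    Example (illustrative): with the defaults (sample_rate=16000, int16 input),
    offset=30 and length=30 seek to byte 960000 and read 960000 bytes, which
    decode to a float32 array of 480000 samples normalized to [-1.0, 1.0].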
- """ - - # Number of bytes per sample (for int16, 2 bytes per sample) - bytes_per_sample = np.dtype(audio_format).itemsize - - # Calculate the start byte based on offset and sample rate - start_byte = offset * sample_rate * bytes_per_sample - - # Calculate the length in bytes based on the length in seconds - length_in_bytes = length * sample_rate * bytes_per_sample - - # Seek to the start position (this assumes the audio_file is a file-like object) - await audio_file.seek(start_byte) - - # Read the required chunk of audio (length_in_bytes) - chunk = await audio_file.read(length_in_bytes) - - # Convert the chunk into a numpy array (normalized to float32) - audio_data = np.frombuffer(chunk, dtype=audio_format).flatten().astype(np.float32) / 32768.0 - - return audio_data - -def detect_language_task(path): - detected_language = LanguageCode.NONE - language_code = 'und' - global detect_language_length, detect_language_offset - - logger.info(f"Detecting language of file: {path} for {detect_language_length} seconds starting at {detect_language_offset} seconds in") - - try: - start_model() - - audio_segment = extract_audio_segment_to_memory(path, detect_language_offset, int(detect_language_length)).read() - - - detected_language = LanguageCode.from_name(model.transcribe(audio_segment).language) - logging.debug(f"Detected language: {detected_language.to_name()}") - # reverse lookup of language -> code, ex: "english" -> "en", "nynorsk" -> "nn", ... - language_code = detected_language.to_iso_639_1() - logging.debug(f"Language Code: {language_code}") - - except Exception as e: - logging.info(f"Error detecting language of file with whisper: {e}") - - finally: - task_queue.task_done() - delete_model() - # put task to transcribe this with the detected language - task_id = { 'path': path, "transcribe_or_translate": transcribe_or_translate, 'force_language': detected_language } - task_queue.put(task_id) - - #maybe modify the file to contain detected language so we won't trigger this again - - return - -def extract_audio_segment_to_memory(input_file, start_time, duration): - """ - Extract a segment of audio from input_file, starting at start_time for duration seconds. 
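    Example (illustrative, made-up path and times):
    extract_audio_segment_to_memory("/media/movie.mkv", 60, 30) would return a
    BytesIO holding 16 kHz PCM WAV audio covering 00:01:00-00:01:30 of the file.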
-def detect_language_task(path):
-    detected_language = LanguageCode.NONE
-    language_code = 'und'
-    global detect_language_length, detect_language_offset
-
-    logger.info(f"Detecting language of file: {path} for {detect_language_length} seconds starting at {detect_language_offset} seconds in")
-
-    try:
-        start_model()
-
-        audio_segment = extract_audio_segment_to_memory(path, detect_language_offset, int(detect_language_length)).read()
-
-        detected_language = LanguageCode.from_name(model.transcribe(audio_segment).language)
-        logging.debug(f"Detected language: {detected_language.to_name()}")
-        # reverse lookup of language -> code, ex: "english" -> "en", "nynorsk" -> "nn", ...
-        language_code = detected_language.to_iso_639_1()
-        logging.debug(f"Language Code: {language_code}")
-
-    except Exception as e:
-        logging.error(f"Error detecting language of file with whisper: {e}")
-
-    finally:
-        task_queue.task_done()
-        delete_model()
-        # put a task in the queue to transcribe this file with the detected language
-        task_id = { 'path': path, "transcribe_or_translate": transcribe_or_translate, 'force_language': detected_language }
-        task_queue.put(task_id)
-
-        # maybe modify the file to contain the detected language so we won't trigger this again
-
-    return
-
-def extract_audio_segment_to_memory(input_file, start_time, duration):
-    """
-    Extract a segment of audio from input_file, starting at start_time for duration seconds.
-
-    :param input_file: UploadFile object or path to the input audio file
-    :param start_time: Start time in seconds (e.g., 60 for 1 minute)
-    :param duration: Duration in seconds (e.g., 30 for 30 seconds)
-    :return: BytesIO object containing the audio segment
-    """
-    try:
-        if hasattr(input_file, 'file') and hasattr(input_file.file, 'read'):  # Handling UploadFile
-            input_file.file.seek(0)  # Ensure the file pointer is at the beginning
-            input_stream = 'pipe:0'
-            input_kwargs = {'input': input_file.file.read()}
-        elif isinstance(input_file, str):  # Handling local file path
-            input_stream = input_file
-            input_kwargs = {}
-        else:
-            raise ValueError("Invalid input: input_file must be a file path or an UploadFile object.")
-
-        logging.info(f"Extracting audio from: {input_stream}, start_time: {start_time}, duration: {duration}")
-
-        # Run FFmpeg to extract the desired segment
-        out, _ = (
-            ffmpeg
-            .input(input_stream, ss=start_time, t=duration)  # Set start time and duration
-            .output('pipe:1', format='wav', acodec='pcm_s16le', ar=16000)  # Output to pipe as WAV
-            .run(capture_stdout=True, capture_stderr=True, **input_kwargs)
-        )
-
-        # Check if the output is empty or null
-        if not out:
-            raise ValueError("FFmpeg output is empty, possibly due to invalid input.")
-
-        return io.BytesIO(out)  # Convert output to BytesIO for in-memory processing
-
-    except ffmpeg.Error as e:
-        logging.error(f"FFmpeg error: {e.stderr.decode()}")
-        return None
-    except Exception as e:
-        logging.error(f"Error: {str(e)}")
-        return None
-
-def start_model():
-    global model
-    if model is None:
-        logging.debug("Model was purged, need to re-create")
-        model = stable_whisper.load_faster_whisper(whisper_model, download_root=model_location, device=transcribe_device, cpu_threads=whisper_threads, num_workers=concurrent_transcriptions, compute_type=compute_type)
-
-def delete_model():
-    global model
-    if clear_vram_on_complete and task_queue.is_idle():
-        logging.debug("Queue idle; clearing model from memory.")
-        if model:
-            model.model.unload_model()
-            del model
-            model = None
-        if transcribe_device.lower() == 'cuda' and torch.cuda.is_available():
-            torch.cuda.empty_cache()
-            logging.debug("CUDA cache cleared.")
-        if os.name != 'nt':  # don't garbage collect on Windows, it will crash the script
-            gc.collect()
-            ctypes.CDLL(ctypes.util.find_library('c')).malloc_trim(0)
-
-def isAudioFileExtension(file_extension):
-    return file_extension.casefold() in AUDIO_EXTENSIONS
-
-def write_lrc(result, file_path):
-    with open(file_path, "w") as file:
-        for segment in result.segments:
-            minutes, seconds = divmod(int(segment.start), 60)
-            fraction = int((segment.start - int(segment.start)) * 100)
-            # remove embedded newlines in the text, since some players ignore text after newlines
-            text = segment.text.replace('\n', '')
-            file.write(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}]{text}\n")
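
The timestamp arithmetic in write_lrc is easy to sanity-check in isolation; a standalone sketch with made-up segment times:

    for start, text in [(83.5, "Hello there"), (90.25, "General Kenobi")]:
        minutes, seconds = divmod(int(start), 60)
        fraction = int((start - int(start)) * 100)
        print(f"[{minutes:02d}:{seconds:02d}.{fraction:02d}]{text}")
    # [01:23.50]Hello there
    # [01:30.25]General Kenobi
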
- """ - - try: - logging.info(f"Queuing file for processing: {os.path.basename(file_path)}") - #logging.info(f"Transcribing file: {os.path.basename(file_path)}") - #logging.info(f"Transcribing file language: {force_language}") - - start_time = time.time() - start_model() - - # Check if the file is an audio file before trying to extract audio - file_name, file_extension = os.path.splitext(file_path) - is_audio_file = isAudioFileExtension(file_extension) - - data = file_path - # Extract audio from the file if it has multiple audio tracks - extracted_audio_file = handle_multiple_audio_tracks(file_path, force_language) - if extracted_audio_file: - data = extracted_audio_file.read() - - args = {} - args['progress_callback'] = progress - - if custom_regroup: - args['regroup'] = custom_regroup - - args.update(kwargs) - - result = model.transcribe(data, language=force_language.to_iso_639_1(), task=transcription_type, **args) - - appendLine(result) - - # If it is an audio file, write the LRC file - if is_audio_file and lrc_for_audio_files: - write_lrc(result, file_name + '.lrc') - else: - if not force_language: - force_language = LanguageCode.from_string(result.language) - result.to_srt_vtt(name_subtitle(file_path, force_language), word_level=word_level_highlight) - - elapsed_time = time.time() - start_time - minutes, seconds = divmod(int(elapsed_time), 60) - logging.info(f"Completed transcription: {os.path.basename(file_path)} in {minutes}m {seconds}s") - - except Exception as e: - logging.info(f"Error processing or transcribing {file_path} in {force_language}: {e}") - - finally: - delete_model() - -def define_subtitle_language_naming(language: LanguageCode, type): - """ - Determines the naming format for a subtitle language based on the given type. - - Args: - language (LanguageCode): The language code object containing methods to get different formats of the language name. - type (str): The type of naming format desired, such as 'ISO_639_1', 'ISO_639_2_T', 'ISO_639_2_B', 'NAME', or 'NATIVE'. - - Returns: - str: The language name in the specified format. If an invalid type is provided, it defaults to the language's name. - """ - if namesublang: - return namesublang - # If we are translating, then we ALWAYS output an english file. - switch_dict = { - "ISO_639_1": language.to_iso_639_1, - "ISO_639_2_T": language.to_iso_639_2_t, - "ISO_639_2_B": language.to_iso_639_2_b, - "NAME": language.to_name, - "NATIVE": lambda : language.to_name(in_english=False) - } - if transcribe_or_translate == 'translate': - language = LanguageCode.ENGLISH - return switch_dict.get(type, language.to_name)() - -def name_subtitle(file_path: str, language: LanguageCode) -> str: - """ - Name the the subtitle file to be written, based on the source file and the language of the subtitle. - - Args: - file_path: The path to the source file. - language: The language of the subtitle. - - Returns: - The name of the subtitle file to be written. - """ - subgen_part = ".subgen" if show_in_subname_subgen else "" - model_part = f".{whisper_model}" if show_in_subname_model else "" - lang_part = define_subtitle_language_naming(language, subtitle_language_naming_type) - - return f"{os.path.splitext(file_path)[0]}{subgen_part}{model_part}.{lang_part}.srt" - -def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = None) -> BytesIO | None: - """ - Handles the possibility of a media file having multiple audio tracks. 
-def handle_multiple_audio_tracks(file_path: str, language: LanguageCode | None = None) -> BytesIO | None:
-    """
-    Handles the possibility of a media file having multiple audio tracks.
-
-    If the media file has multiple audio tracks, it will extract the audio track of the selected language, falling back to the first audio track.
-
-    Parameters:
-        file_path (str): The path to the media file.
-        language (LanguageCode | None): The language of the audio track to search for. If None, it will extract the first audio track.
-
-    Returns:
-        io.BytesIO | None: The extracted audio, or None if no audio track was extracted.
-    """
-    audio_bytes = None
-    audio_tracks = get_audio_tracks(file_path)
-
-    if len(audio_tracks) > 1:
-        logging.debug(f"Handling multiple audio tracks from {file_path} and planning to extract audio track of language {language}")
-        logging.debug(
-            "Audio tracks:\n"
-            + "\n".join([f"  - {track['index']}: {track['codec']} {track['language']} {('default' if track['default'] else '')}" for track in audio_tracks])
-        )
-
-        # Fall back to the first track when no language is given or no track matches
-        audio_track = audio_tracks[0]
-        if language is not None:
-            audio_track = get_audio_track_by_language(audio_tracks, language) or audio_tracks[0]
-
-        audio_bytes = extract_audio_track_to_memory(file_path, audio_track["index"])
-        if audio_bytes is None:
-            logging.error(f"Failed to extract audio track {audio_track['index']} from {file_path}")
-            return None
-    return audio_bytes
-
-def extract_audio_track_to_memory(input_video_path, track_index) -> BytesIO | None:
-    """
-    Extract a specific audio track from a video file to memory using FFmpeg.
-
-    Args:
-        input_video_path (str): The path to the video file.
-        track_index (int): The index of the audio track to extract. If None, skip extraction.
-
-    Returns:
-        io.BytesIO | None: The audio data as a BytesIO object, or None if extraction failed.
-    """
-    if track_index is None:
-        logging.warning(f"Skipping audio track extraction for {input_video_path} because track index is None")
-        return None
-
-    try:
-        # Use FFmpeg to extract the specific audio track and output to memory
-        out, _ = (
-            ffmpeg.input(input_video_path)
-            .output(
-                "pipe:",  # Direct output to a pipe
-                map=f"0:{track_index}",  # Select the specific audio track
-                format="wav",  # Output format
-                ac=1,  # Mono audio (optional)
-                ar=16000,  # Sample rate 16 kHz (recommended for speech models)
-                loglevel="quiet"
-            )
-            .run(capture_stdout=True, capture_stderr=True)  # Capture output in memory
-        )
-        # Return the audio data as a BytesIO object
-        return BytesIO(out)
-
-    except ffmpeg.Error as e:
-        logging.error(f"An error occurred: {e.stderr.decode()}")
-        return None
-
-def get_audio_track_by_language(audio_tracks, language):
-    """
-    Returns the first audio track with the given language.
-
-    Args:
-        audio_tracks (list): A list of dictionaries containing information about each audio track.
-        language (LanguageCode): The language of the audio track to search for.
-
-    Returns:
-        dict: The first audio track with the given language, or None if no match is found.
-    """
-    for track in audio_tracks:
-        if track['language'] == language:
-            return track
-    return None
-def choose_transcribe_language(file_path, forced_language):
-    """
-    Determines the language to be used for transcription based on the provided
-    file path and language preferences.
-
-    Args:
-        file_path: The path to the file for which the audio tracks are analyzed.
-        forced_language: The language to force for transcription if specified.
-
-    Returns:
-        The language code to be used for transcription. It prioritizes the
-        `forced_language`, then the environment variable `force_detected_language_to`,
-        then the preferred audio language if available, and finally the default
-        language of the audio tracks. Returns LanguageCode.NONE if no language
-        preference is determined.
-    """
-    logger.debug(f"choose_transcribe_language({file_path}, {forced_language})")
-
-    if forced_language:
-        logger.debug(f"ENV FORCE_LANGUAGE is set: Forcing language to {forced_language}")
-        return forced_language
-
-    if force_detected_language_to:
-        logger.debug(f"ENV FORCE_DETECTED_LANGUAGE_TO is set: Forcing detected language to {force_detected_language_to}")
-        return force_detected_language_to
-
-    audio_tracks = get_audio_tracks(file_path)
-
-    preferred_track_language = find_language_audio_track(audio_tracks, preferred_audio_languages)
-    if preferred_track_language:
-        logging.debug(f"Preferred language found: {preferred_track_language}")
-        return preferred_track_language
-
-    default_language = find_default_audio_track_language(audio_tracks)
-    if default_language:
-        logger.debug(f"Default language found: {default_language}")
-        return default_language
-
-    return LanguageCode.NONE
-
-def get_audio_tracks(video_file):
-    """
-    Extracts information about the audio tracks in a file.
-
-    Returns:
-        List of dictionaries with information about each audio track.
-        Each dictionary has the following keys:
-            index (int): The stream index of the audio track.
-            codec (str): The name of the audio codec.
-            channels (int): The number of audio channels.
-            language (LanguageCode): The language of the audio track.
-            title (str): The title of the audio track.
-            default (bool): Whether the audio track is the default for the file.
-            forced (bool): Whether the audio track is forced.
-            original (bool): Whether the audio track is the original.
-            commentary (bool): Whether the audio track is a commentary.
-
-    Example:
-        >>> get_audio_tracks("french_movie_with_english_dub.mp4")
-        [
-            {
-                "index": 0,
-                "codec": "dts",
-                "channels": 6,
-                "language": LanguageCode.FRENCH,
-                "title": "French",
-                "default": True,
-                "forced": False,
-                "original": True,
-                "commentary": False
-            },
-            {
-                "index": 1,
-                "codec": "aac",
-                "channels": 2,
-                "language": LanguageCode.ENGLISH,
-                "title": "English",
-                "default": False,
-                "forced": False,
-                "original": False,
-                "commentary": False
-            }
-        ]
-
-    Raises:
-        ffmpeg.Error: If FFmpeg fails to probe the file.
-    """
-    try:
-        # Probe the file to get audio stream metadata
-        probe = ffmpeg.probe(video_file, select_streams='a')
-        audio_streams = probe.get('streams', [])
-
-        # Extract information for each audio track
-        audio_tracks = []
-        for stream in audio_streams:
-            audio_track = {
-                "index": int(stream.get("index", 0)),
-                "codec": stream.get("codec_name", "Unknown"),
-                "channels": int(stream.get("channels", 0)),
-                "language": LanguageCode.from_iso_639_2(stream.get("tags", {}).get("language", "Unknown")),
-                "title": stream.get("tags", {}).get("title", "None"),
-                "default": stream.get("disposition", {}).get("default", 0) == 1,
-                "forced": stream.get("disposition", {}).get("forced", 0) == 1,
-                "original": stream.get("disposition", {}).get("original", 0) == 1,
-                "commentary": "commentary" in stream.get("tags", {}).get("title", "").lower()
-            }
-            audio_tracks.append(audio_track)
-        return audio_tracks
-
-    except ffmpeg.Error as e:
-        logging.error(f"FFmpeg error: {e.stderr}")
-        return []
-    except Exception as e:
-        logging.error(f"An error occurred while reading audio track information: {str(e)}")
-        return []
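
For orientation, ffmpeg.probe() wraps `ffprobe -show_streams`; the fields read above come back in dicts shaped roughly like this (field names are standard ffprobe output, values illustrative):

    probe = {
        "streams": [
            {
                "index": 1,
                "codec_name": "aac",
                "channels": 2,
                "tags": {"language": "eng", "title": "English"},
                "disposition": {"default": 1, "forced": 0, "original": 0},
            }
        ]
    }
    print(probe["streams"][0]["tags"]["language"])  # eng
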
-def find_language_audio_track(audio_tracks, find_languages):
-    """
-    Checks if an audio track with any of the given languages is present in the list of audio tracks.
-    Returns the first language from `find_languages` that matches.
-
-    Args:
-        audio_tracks (list): A list of dictionaries containing information about each audio track.
-        find_languages (list): A list of language codes to search for.
-
-    Returns:
-        str or None: The first language found from `find_languages`, or None if no match is found.
-    """
-    for language in find_languages:
-        for track in audio_tracks:
-            if track['language'] == language:
-                return language
-    return None
-
-def find_default_audio_track_language(audio_tracks):
-    """
-    Finds the language of the default audio track in the given list of audio tracks.
-
-    Args:
-        audio_tracks (list): A list of dictionaries containing information about each audio track.
-            Must contain the key "default", a boolean indicating whether the track is the default track.
-
-    Returns:
-        The language of the default audio track, or None if no default track was found.
-    """
-    for track in audio_tracks:
-        if track['default'] is True:
-            return track['language']
-    return None
-
-def gen_subtitles_queue(file_path: str, transcription_type: str, force_language: LanguageCode = LanguageCode.NONE) -> None:
-    global task_queue
-
-    if not has_audio(file_path):
-        logging.debug(f"{file_path} doesn't have any audio to transcribe!")
-        return
-
-    force_language = choose_transcribe_language(file_path, force_language)
-
-    if should_skip_file(file_path, force_language):  # skip a file before we waste time detecting its language
-        return
-
-    # Check whether we should let Whisper detect the audio language when none is specified.
-    # The detect-language task will re-queue the file with the detected language.
-    if not force_language and should_whiser_detect_audio_language:
-        # make a detect-language task
-        task_id = { 'path': file_path, 'type': "detect_language" }
-        task_queue.put(task_id)
-        logging.debug(f"Added to queue: {task_id['path']} [type: {task_id.get('type', 'transcribe')}]")
-        return
-
-    task = {
-        'path': file_path,
-        'transcribe_or_translate': transcription_type,
-        'force_language': force_language
-    }
-    task_queue.put(task)
-    logging.debug(f"Added to queue: {task['path']}, {task['transcribe_or_translate']}, {task['force_language']}")
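
The queue therefore carries two task shapes; a standalone sketch of how a consumer can tell them apart (force_language holds a LanguageCode member in the real code; a plain string stands in here so the snippet runs on its own):

    detect_task = {'path': '/tv/show.mkv', 'type': 'detect_language'}
    transcribe_task = {'path': '/tv/show.mkv', 'transcribe_or_translate': 'transcribe',
                       'force_language': 'ENGLISH'}  # LanguageCode.ENGLISH in the real code

    def kind(task: dict) -> str:
        # mirrors how a worker can tell the two task types apart
        return 'detect_language' if task.get('type') == 'detect_language' else 'transcribe'

    print(kind(detect_task), kind(transcribe_task))  # detect_language transcribe
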
-def should_skip_file(file_path: str, target_language: LanguageCode) -> bool:
-    """
-    Determines if subtitle generation should be skipped for a file.
-
-    Args:
-        file_path: Path to the media file.
-        target_language: The desired language for transcription.
-
-    Returns:
-        True if the file should be skipped, False otherwise.
-    """
-    base_name = os.path.basename(file_path)
-    file_name, file_ext = os.path.splitext(base_name)
-    if transcribe_or_translate == 'translate':
-        target_language = LanguageCode.ENGLISH  # Force the target language to English if we are translating
-
-    # 1. Skip if it's an audio file and an LRC file already exists.
-    if isAudioFileExtension(file_ext) and lrc_for_audio_files:
-        lrc_path = os.path.join(os.path.dirname(file_path), f"{file_name}.lrc")
-        if os.path.exists(lrc_path):
-            logging.info(f"Skipping {base_name}: LRC file already exists.")
-            return True
-
-    # 2. Skip if language detection failed and we are configured to skip unknowns.
-    if skip_unknown_language and target_language == LanguageCode.NONE:
-        logging.info(f"Skipping {base_name}: Unknown language and skip_unknown_language is enabled.")
-        return True
-
-    # 3. Skip if a subtitle already exists in the target language.
-    if skip_if_to_transcribe_sub_already_exist and has_subtitle_language(file_path, target_language):
-        lang_name = target_language.to_name()
-        logging.info(f"Skipping {base_name}: Subtitles already exist in {lang_name}.")
-        return True
-
-    # 4. Skip if an internal subtitle exists in the skipifinternalsublang language.
-    if skipifinternalsublang and has_subtitle_language_in_file(file_path, skipifinternalsublang):
-        lang_name = skipifinternalsublang.to_name()
-        logging.info(f"Skipping {base_name}: Internal subtitles in {lang_name} already exist.")
-        return True
-
-    # 5. Skip if an external subtitle exists in the namesublang language.
-    if skipifexternalsub and namesublang and LanguageCode.is_valid_language(namesublang):
-        external_lang = LanguageCode.from_string(namesublang)
-        if has_subtitle_of_language_in_folder(file_path, external_lang):
-            lang_name = external_lang.to_name()
-            logging.info(f"Skipping {base_name}: External subtitles in {lang_name} already exist.")
-            return True
-
-    # 6. Skip if any subtitle language is in the skip list.
-    if any(lang in skip_lang_codes_list for lang in get_subtitle_languages(file_path)):
-        logging.info(f"Skipping {base_name}: Contains a skipped subtitle language.")
-        return True
-
-    # 7. Audio track checks
-    audio_langs = get_audio_languages(file_path)
-
-    # 7a. Limit to preferred audio languages.
-    if limit_to_preferred_audio_languages:
-        if not any(lang in preferred_audio_languages for lang in audio_langs):
-            preferred_names = [lang.to_name() for lang in preferred_audio_languages]
-            logging.info(f"Skipping {base_name}: No preferred audio tracks found (looking for {', '.join(preferred_names)})")
-            return True
-
-    # 7b. Skip if the audio track language is in the skip list.
-    if any(lang in skip_if_audio_track_is_in_list for lang in audio_langs):
-        logging.info(f"Skipping {base_name}: Contains a skipped audio language.")
-        return True
-
-    logging.debug(f"Processing {base_name}: No skip conditions met.")
-    return False
-
-def get_subtitle_languages(video_path):
-    """
-    Extract language codes from each subtitle stream in the video file using PyAV.
-
-    :param video_path: Path to the video file
-    :return: List of language codes for each subtitle stream
-    """
-    languages = []
-
-    # Open the video file
-    with av.open(video_path) as container:
-        # Iterate through each subtitle stream
-        for stream in container.streams.subtitles:
-            # Access the metadata for each subtitle stream
-            lang_code = stream.metadata.get('language')
-            if lang_code:
-                languages.append(LanguageCode.from_iso_639_2(lang_code))
-            else:
-                # Append LanguageCode.NONE (undefined) if no language metadata is present
-                languages.append(LanguageCode.NONE)
-
-    return languages
-
-def get_file_name_without_extension(file_path):
-    file_name, file_extension = os.path.splitext(file_path)
-    return file_name
-
-def get_audio_languages(video_path):
-    """
-    Extract language codes from each audio stream in the video file.
-
-    :param video_path: Path to the video file
-    :return: List of language codes for each audio stream
-    """
-    audio_tracks = get_audio_tracks(video_path)
-    return [track['language'] for track in audio_tracks]
-def has_subtitle_language(video_file, target_language: LanguageCode):
-    """
-    Determines if a subtitle file with the target language is available for a specified video file.
-
-    This function checks both within the video file and in its associated folder for subtitles
-    matching the specified language.
-
-    Args:
-        video_file: The path to the video file.
-        target_language: The language of the subtitle file to search for.
-
-    Returns:
-        bool: True if a subtitle file with the target language is found, False otherwise.
-    """
-    return has_subtitle_language_in_file(video_file, target_language) or has_subtitle_of_language_in_folder(video_file, target_language)
-
-def has_subtitle_language_in_file(video_file: str, target_language: Union[LanguageCode, None]):
-    """
-    Checks if a video file contains subtitles with a specific language.
-
-    Args:
-        video_file (str): The path to the video file.
-        target_language (LanguageCode | None): The language of the subtitle stream to search for.
-
-    Returns:
-        bool: True if a subtitle stream with the target language is found, False otherwise.
-    """
-    try:
-        with av.open(video_file) as container:
-            # Create a list of subtitle streams with 'language' metadata
-            subtitle_streams = [
-                stream for stream in container.streams
-                if stream.type == 'subtitle' and 'language' in stream.metadata
-            ]
-
-            # Skip logic if target_language is not set
-            if target_language is LanguageCode.NONE:
-                if skip_if_language_is_not_set_but_subtitles_exist and subtitle_streams:
-                    logging.debug("Language is not set, but internal subtitles exist.")
-                    return True
-                if only_skip_if_subgen_subtitle:
-                    logging.debug("Skipping since only external subgen subtitles are considered.")
-                    return False  # Skip if only looking for external subgen subtitles
-
-            # Check if any subtitle stream matches the target language
-            for stream in subtitle_streams:
-                # Convert the subtitle stream's language to a LanguageCode instance and compare
-                stream_language = LanguageCode.from_string(stream.metadata.get('language', '').lower())
-                if stream_language == target_language:
-                    logging.debug(f"Subtitles in '{target_language}' language found in the video.")
-                    return True
-
-            logging.debug(f"No subtitles in '{target_language}' language found in the video.")
-            return False
-
-    except Exception as e:
-        logging.error(f"An error occurred while checking the file with PyAV: {type(e).__name__}: {e}")
-        return False
- """ - subtitle_extensions = {'.srt', '.vtt', '.sub', '.ass', '.ssa', '.idx', '.sbv', '.pgs', '.ttml', '.lrc'} - - video_folder = os.path.dirname(video_file) - video_name = os.path.splitext(os.path.basename(video_file))[0] - - logging.debug(f"Searching for subtitles in: {video_folder}") - - for file_name in os.listdir(video_folder): - file_path = os.path.join(video_folder, file_name) - - # If it's a file and has a subtitle extension - if os.path.isfile(file_path) and file_path.endswith(tuple(subtitle_extensions)): - subtitle_name, ext = os.path.splitext(file_name) - - # Ensure the subtitle name starts with the video name - if not subtitle_name.startswith(video_name): - continue - - # Extract parts after video filename - subtitle_parts = subtitle_name[len(video_name):].lstrip(".").split(".") - - # Check for "subgen" - has_subgen = "subgen" in subtitle_parts - - # Special handling if only skipping for subgen subtitles - if target_language == LanguageCode.NONE: - if only_skip_if_subgen_subtitle: - if has_subgen: - logging.debug("Skipping subtitles because they are auto-generated ('subgen').") - return False - logging.debug("Skipping subtitles because language is NONE.") - return True # Default behavior if subtitles exist - - # Check if the subtitle file matches the target language - if is_valid_subtitle_language(subtitle_parts, target_language): - if only_skip_if_subgen_subtitle and not has_subgen: - continue # Ignore non-subgen subtitles if flag is set - logging.debug(f"Found matching subtitle: {file_name} for language {target_language.name} (subgen={has_subgen})") - return True - - # Recursively search subfolders - elif os.path.isdir(file_path) and recursion: - if has_subtitle_of_language_in_folder(os.path.join(file_path, os.path.basename(video_file)), target_language, False, only_skip_if_subgen_subtitle): - return True - - return False - -def is_valid_subtitle_language(subtitle_parts: List[str], target_language: LanguageCode) -> bool: - """Checks if any part of the subtitle name matches the target language.""" - return any(LanguageCode.from_string(part) == target_language for part in subtitle_parts) - -def get_next_plex_episode(current_episode_rating_key, stay_in_season: bool = False): - """ - Get the next episode's ratingKey based on the current episode in Plex. - - Args: - current_episode_rating_key (str): The ratingKey of the current episode. - stay_in_season (bool): If True, only find the next episode within the current season. - If False, find the next episode in the series. - - Returns: - str: The ratingKey of the next episode, or None if it's the last episode. 
- """ - try: - # Get current episode's metadata to fetch parent (season) ratingKey - url = f"{plexserver}/library/metadata/{current_episode_rating_key}" - headers = {"X-Plex-Token": plextoken} - response = requests.get(url, headers=headers) - response.raise_for_status() - - # Parse XML response - root = ET.fromstring(response.content) - - # Find the show ID - grandparent_rating_key = root.find(".//Video").get("grandparentRatingKey") - if grandparent_rating_key is None: - logging.debug(f"Show not found for episode {current_episode_rating_key}") - return None - - # Find the parent season ratingKey - parent_rating_key = root.find(".//Video").get("parentRatingKey") - if parent_rating_key is None: - logging.debug(f"Parent season not found for episode {current_episode_rating_key}") - return None - - # Get the list of seasons - url = f"{plexserver}/library/metadata/{grandparent_rating_key}/children" - response = requests.get(url, headers=headers) - response.raise_for_status() - seasons = ET.fromstring(response.content).findall(".//Directory[@type='season']") - - # Get the list of episodes in the parent season - url = f"{plexserver}/library/metadata/{parent_rating_key}/children" - response = requests.get(url, headers=headers) - response.raise_for_status() - #print(response.content) - - # Parse XML response for the list of episodes - episodes = ET.fromstring(response.content).findall(".//Video") - episodes_in_season = len(episodes) #episodes.get('size') # changed from episodes.get("size") because size is not available - - # Find the current episode index and get the next one - current_episode_number = None - current_season_number = None - next_season_number = None - for episode in episodes: - if episode.get("ratingKey") == current_episode_rating_key: - current_episode_number = int(episode.get("index")) - current_season_number = episode.get("parentIndex") - break - #if rating_key_element is None: - # logging.warning(f"ratingKey not found for episode at index") - # continue - - # Logic to find the next episode - if stay_in_season: - if current_episode_number == episodes_in_season: - return None # End of season - for episode in episodes: - if int(episode.get("index")) == int(current_episode_number)+1: - return episode.get("ratingKey") - else: # Not staying in season, find the next overall episode - # Find next season if it exists - for season in seasons: - if int(season.get("index")) == int(current_season_number)+1: - #print(f"next season is: {episode.get('ratingKey')}") - #print(season.get("title")) - next_season_number = season.get("ratingKey") - break - - if current_episode_number == episodes_in_season: # changed to episodes_in_season from int(episodes_in_season) - if next_season_number is not None: - logging.debug("At end of season, try to find next season and first episode.") - url = f"{plexserver}/library/metadata/{next_season_number}/children" - response = requests.get(url, headers=headers) - response.raise_for_status() - episodes = ET.fromstring(response.content).findall(".//Video") - current_episode_number = 0 - else: - return None - for episode in episodes: - if int(episode.get("index")) == int(current_episode_number)+1: - return episode.get("ratingKey") - - logging.debug(f"No next episode found for {get_plex_file_name(current_episode_rating_key, plexserver, plextoken)}, possibly end of season or series") - return None - - except requests.exceptions.RequestException as e: - logging.error(f"Error fetching data from Plex: {e}") - return None - except Exception as e: - logging.error(f"An 
-def get_plex_file_name(itemid: str, server_ip: str, plex_token: str) -> str:
-    """Gets the full path to a file from the Plex server.
-
-    Args:
-        itemid: The ID of the item in the Plex library.
-        server_ip: The IP address of the Plex server.
-        plex_token: The Plex token.
-
-    Returns:
-        The full path to the file.
-    """
-    url = f"{server_ip}/library/metadata/{itemid}"
-
-    headers = {
-        "X-Plex-Token": plex_token,
-    }
-
-    response = requests.get(url, headers=headers)
-
-    if response.status_code == 200:
-        root = ET.fromstring(response.content)
-        fullpath = root.find(".//Part").attrib['file']
-        return fullpath
-    else:
-        raise Exception(f"Error: {response.status_code}")
-
-def refresh_plex_metadata(itemid: str, server_ip: str, plex_token: str) -> None:
-    """
-    Refreshes the metadata of a Plex library item.
-
-    Args:
-        itemid: The ID of the item in the Plex library whose metadata needs to be refreshed.
-        server_ip: The IP address of the Plex server.
-        plex_token: The Plex token used for authentication.
-
-    Raises:
-        Exception: If the server does not respond with a successful status code.
-    """
-    # Plex API endpoint to refresh metadata for a specific item
-    url = f"{server_ip}/library/metadata/{itemid}/refresh"
-
-    # Headers to include the Plex token for authentication
-    headers = {
-        "X-Plex-Token": plex_token,
-    }
-
-    # Sending the PUT request to refresh metadata
-    response = requests.put(url, headers=headers)
-
-    # Check if the request was successful
-    if response.status_code == 200:
-        logging.info("Metadata refresh initiated successfully.")
-    else:
-        raise Exception(f"Error refreshing metadata: {response.status_code}")
-
-def refresh_jellyfin_metadata(itemid: str, server_ip: str, jellyfin_token: str) -> None:
-    """
-    Refreshes the metadata of a Jellyfin library item.
-
-    Args:
-        itemid: The ID of the item in the Jellyfin library whose metadata needs to be refreshed.
-        server_ip: The IP address of the Jellyfin server.
-        jellyfin_token: The Jellyfin token used for authentication.
-
-    Raises:
-        Exception: If the server does not respond with a successful status code.
-    """
-    # Jellyfin API endpoint to refresh metadata for a specific item
-    url = f"{server_ip}/Items/{itemid}/Refresh"
-
-    # Headers to include the Jellyfin token for authentication
-    headers = {
-        "Authorization": f"MediaBrowser Token={jellyfin_token}",
-    }
-
-    # Sending the POST request to refresh the metadata
-    response = requests.post(url, headers=headers)
-
-    # Check if the request was successful
-    if response.status_code == 204:
-        logging.info("Metadata refresh queued successfully.")
-    else:
-        raise Exception(f"Error refreshing metadata: {response.status_code}")
-def get_jellyfin_file_name(item_id: str, jellyfin_url: str, jellyfin_token: str) -> str:
-    """Gets the full path to a file from the Jellyfin server.
-
-    Args:
-        item_id: The ID of the item in the Jellyfin library.
-        jellyfin_url: The URL of the Jellyfin server.
-        jellyfin_token: The Jellyfin token.
-
-    Returns:
-        The full path to the file.
-    """
-    headers = {
-        "Authorization": f"MediaBrowser Token={jellyfin_token}",
-    }
-
-    # Cheap way to get the admin user id, and save it for later use.
-    users = json.loads(requests.get(f"{jellyfin_url}/Users", headers=headers).content)
-    jellyfin_admin = get_jellyfin_admin(users)
-
-    response = requests.get(f"{jellyfin_url}/Users/{jellyfin_admin}/Items/{item_id}", headers=headers)
-
-    if response.status_code == 200:
-        file_name = json.loads(response.content)['Path']
-        return file_name
-    else:
-        raise Exception(f"Error: {response.status_code}")
-
-def get_jellyfin_admin(users):
-    for user in users:
-        if user["Policy"]["IsAdministrator"]:
-            return user["Id"]
-
-    raise Exception("Unable to find administrator user in Jellyfin")
-
-def has_audio(file_path):
-    try:
-        if not is_valid_path(file_path):
-            return False
-
-        if not (has_video_extension(file_path) or has_audio_extension(file_path)):
-            # logging.debug(f"{file_path} is not a video or audio file, skipping processing")
-            return False
-
-        with av.open(file_path) as container:
-            # Check for an audio stream and ensure it has a valid codec
-            for stream in container.streams:
-                if stream.type == 'audio':
-                    # Check if the stream has a codec and if it is valid
-                    if stream.codec_context and stream.codec_context.name != 'none':
-                        return True
-                    else:
-                        logging.debug(f"Unsupported or missing codec for audio stream in {file_path}")
-            return False
-
-    except (av.FFmpegError, UnicodeDecodeError):
-        logging.debug(f"Error processing file {file_path}")
-        return False
-
-def is_valid_path(file_path):
-    # Check if the path is a file
-    if not os.path.isfile(file_path):
-        # If it's not a file, check if it's a directory
-        if not os.path.isdir(file_path):
-            logging.warning(f"{file_path} is neither a file nor a directory. Are your volumes correct?")
-            return False
-        else:
-            logging.debug(f"{file_path} is a directory, skipping processing as a file.")
-            return False
-    else:
-        return True
-
-def has_video_extension(file_name):
-    file_extension = os.path.splitext(file_name)[1].lower()  # Get the file extension
-    return file_extension in VIDEO_EXTENSIONS
-
-def has_audio_extension(file_name):
-    file_extension = os.path.splitext(file_name)[1].lower()  # Get the file extension
-    return file_extension in AUDIO_EXTENSIONS
-
-def path_mapping(fullpath):
-    if use_path_mapping:
-        logging.debug("Updated path: " + fullpath.replace(path_mapping_from, path_mapping_to))
-        return fullpath.replace(path_mapping_from, path_mapping_to)
-    return fullpath
-
-def is_file_stable(file_path, wait_time=2, check_intervals=3):
-    """Returns True if the file size is stable for a given number of checks."""
-    if not os.path.exists(file_path):
-        return False
-
-    previous_size = -1
-    for _ in range(check_intervals):
-        try:
-            current_size = os.path.getsize(file_path)
-        except OSError:
-            return False  # File might still be inaccessible
-
-        if current_size == previous_size:
-            return True  # File is stable
-        previous_size = current_size
-        time.sleep(wait_time)
-
-    return False  # File is still changing
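
The monitor block below builds on the watchdog package; for orientation, the minimal Observer/handler wiring looks like this (a standalone sketch with a hypothetical path, not the application's own handler):

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    class LogHandler(FileSystemEventHandler):
        def on_created(self, event):
            # fires for every new filesystem entry under the watched tree
            if not event.is_directory:
                print(f"new file: {event.src_path}")

    observer = Observer()
    observer.schedule(LogHandler(), "/tv", recursive=True)  # hypothetical path
    observer.start()
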
-if monitor:
-    # Define a handler class that will process new files
-    class NewFileHandler(FileSystemEventHandler):
-        def create_subtitle(self, event):
-            # Only process if it's a file
-            if not event.is_directory:
-                file_path = event.src_path
-                if has_audio(file_path):
-                    logging.info(f"File: {path_mapping(file_path)} was added")
-                    gen_subtitles_queue(path_mapping(file_path), transcribe_or_translate)
-
-        def handle_event(self, event):
-            """Wait for stability before processing the file."""
-            file_path = event.src_path
-            if is_file_stable(file_path):
-                self.create_subtitle(event)
-
-        def on_created(self, event):
-            time.sleep(5)  # Extra buffer time for new files
-            self.handle_event(event)
-
-        def on_modified(self, event):
-            self.handle_event(event)
-
-def transcribe_existing(transcribe_folders, forceLanguage: LanguageCode | None = None):
-    transcribe_folders = transcribe_folders.split("|")
-    logging.info("Starting to search folders to see if we need to create subtitles.")
-    logging.debug("The folders are:")
-    for path in transcribe_folders:
-        logging.debug(path)
-        for root, dirs, files in os.walk(path):
-            for file in files:
-                file_path = os.path.join(root, file)
-                gen_subtitles_queue(path_mapping(file_path), transcribe_or_translate, forceLanguage)
-        # if the path specified was actually a single file and not a folder, process it
-        if os.path.isfile(path):
-            if has_audio(path):
-                gen_subtitles_queue(path_mapping(path), transcribe_or_translate, forceLanguage)
-    # Set up the observer to watch for new files
-    if monitor:
-        observer = Observer()
-        for path in transcribe_folders:
-            if os.path.isdir(path):
-                handler = NewFileHandler()
-                observer.schedule(handler, path, recursive=True)
-        observer.start()
-        logging.info("Finished searching and queueing files for transcription. Now watching for new files.")
-
-if __name__ == "__main__":
-    import uvicorn
-    logging.info(f"Subgen v{subgen_version}")
-    logging.info(f"Threads: {whisper_threads}, Concurrent transcriptions: {concurrent_transcriptions}")
-    logging.info(f"Transcribe device: {transcribe_device}, Model: {whisper_model}")
-    os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
-    if transcribe_folders:
-        transcribe_existing(transcribe_folders)
-    uvicorn.run("__main__:app", host="0.0.0.0", port=int(webhookport), reload=reload_script_on_change, use_colors=True)
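
For completeness: with the __main__ block above, the deleted script could be launched directly; a sketch, with env vars mirroring the docker-compose file removed earlier in this patch (whether all of these env names apply here is an assumption):

    WHISPER_MODEL=medium TRANSCRIBE_DEVICE=cpu WEBHOOKPORT=9000 python3 transcriptarr.py

uvicorn then serves the webhook app on 0.0.0.0:9000; if the transcribe_folders setting is non-empty, existing media is searched and queued before the server starts.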