From e198299aed3efaeccc6dd89ecfdd88cae19eefd4 Mon Sep 17 00:00:00 2001
From: McCloudS <64094529+McCloudS@users.noreply.github.com>
Date: Sun, 2 Feb 2025 13:52:08 -0700
Subject: [PATCH] Properly de-duplicate the queue and processing

---
 subgen.py | 36 ++++++++++++++++++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/subgen.py b/subgen.py
index 87fc70c..61689e4 100644
--- a/subgen.py
+++ b/subgen.py
@@ -1,7 +1,8 @@
-subgen_version = '2025.01.02'
+subgen_version = '2025.01.03'
 
 from language_code import LanguageCode
 from datetime import datetime
+from threading import Lock
 import os
 import json
 import xml.etree.ElementTree as ET
@@ -125,7 +126,7 @@ docker_status = "Docker" if in_docker else "Standalone"
 last_print_time = None
 
 #start queue
-task_queue = queue.Queue()
+task_queue = DeduplicatedQueue()
 
 def transcription_worker():
     while True:
@@ -145,6 +146,37 @@ def transcription_worker():
 for _ in range(concurrent_transcriptions):
     threading.Thread(target=transcription_worker, daemon=True).start()
 
+class DeduplicatedQueue(queue.Queue):
+    """Queue that prevents duplicates in both queued and in-progress tasks."""
+    def __init__(self):
+        super().__init__()
+        self._queued = set()      # Tracks paths in the queue
+        self._processing = set()  # Tracks paths being processed
+        self._lock = Lock()       # Ensures thread safety
+
+    def put(self, item, block=True, timeout=None):
+        with self._lock:
+            path = item["path"]
+            if path not in self._queued and path not in self._processing:
+                super().put(item, block, timeout)
+                self._queued.add(path)
+
+    def get(self, block=True, timeout=None):
+        item = super().get(block, timeout)
+        with self._lock:
+            path = item["path"]
+            self._queued.discard(path)   # Remove from queued set
+            self._processing.add(path)   # Mark as in-progress
+        return item
+
+    def task_done(self):
+        super().task_done()
+        with self._lock:
+            # Assumes task_done() is called after processing the item from get()
+            # If your workers process multiple items per get(), adjust logic here
+            if self.unfinished_tasks == 0:
+                self._processing.clear()  # Reset when all tasks are done
+
 # Define a filter class to hide common logging we don't want to see
 class MultiplePatternsFilter(logging.Filter):
     def filter(self, record):
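
For reference, here is a minimal standalone sketch of the de-duplication behavior this patch is aiming for. It uses a trimmed copy of the DeduplicatedQueue added above so the snippet runs on its own; the demo script, the example path, and the single-key task dict are illustrative only (the class itself only ever looks at the "path" key of a task). This is not part of the patch.

    from threading import Lock
    import queue

    # Trimmed copy of the class added by the patch, so the sketch runs standalone.
    class DeduplicatedQueue(queue.Queue):
        def __init__(self):
            super().__init__()
            self._queued = set()
            self._processing = set()
            self._lock = Lock()

        def put(self, item, block=True, timeout=None):
            with self._lock:
                path = item["path"]
                if path not in self._queued and path not in self._processing:
                    super().put(item, block, timeout)
                    self._queued.add(path)

        def get(self, block=True, timeout=None):
            item = super().get(block, timeout)
            with self._lock:
                self._queued.discard(item["path"])
                self._processing.add(item["path"])
            return item

        def task_done(self):
            super().task_done()
            with self._lock:
                if self.unfinished_tasks == 0:
                    self._processing.clear()

    q = DeduplicatedQueue()
    task = {"path": "/media/show/episode.mkv"}  # illustrative path only

    q.put(task)
    q.put(task)        # Duplicate path: silently dropped while the first copy is queued.
    print(q.qsize())   # 1

    item = q.get()     # Path moves from the "queued" set to the "processing" set.
    q.put(task)        # Still dropped: the path is now marked as in-progress.
    print(q.qsize())   # 0

    q.task_done()      # All tasks finished: the "processing" set is cleared...
    q.put(task)        # ...so the same path can be queued again.
    print(q.qsize())   # 1

One consequence of the task_done() logic: a path stays in the in-progress set until unfinished_tasks reaches zero, so a file that has already been processed can only be re-queued once the whole queue has drained.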