From 64028b09778953c83e96ac6ed615554196e402af Mon Sep 17 00:00:00 2001
From: McCloudS
Date: Sat, 8 Feb 2025 08:17:07 -0700
Subject: [PATCH] Remove printing the current file we're working on and remove
 random thread spawns?

---
 subgen.py | 72 +++++++++++++++++++++++++------------------------
 1 file changed, 32 insertions(+), 40 deletions(-)

diff --git a/subgen.py b/subgen.py
index 394c5d1..1f0ea79 100644
--- a/subgen.py
+++ b/subgen.py
@@ -1,4 +1,4 @@
-subgen_version = '2025.02.63'
+subgen_version = '2025.02.65'
 
 from language_code import LanguageCode
 from datetime import datetime
@@ -124,18 +124,17 @@ in_docker = os.path.exists('/.dockerenv')
 docker_status = "Docker" if in_docker else "Standalone"
 
 class DeduplicatedQueue(queue.Queue):
-    """Queue that prevents duplicates and tracks worker ID."""
-
+    """Queue that prevents duplicates in both queued and in-progress tasks."""
     def __init__(self):
-        super().__init__() # Call the superclass's __init__
-        self._queued = set()
-        self._processing = {} # Store processing tasks by path: {path: worker_id}
-        self._lock = threading.Lock()
+        super().__init__()
+        self._queued = set()  # Tracks paths in the queue
+        self._processing = set()  # Tracks paths being processed
+        self._lock = Lock()  # Ensures thread safety
 
     def put(self, item, block=True, timeout=None):
         with self._lock:
             path = item["path"]
-            if path not in self._queued: #and path not in self._processing: # Check against the queue
+            if path not in self._queued and path not in self._processing:
                 super().put(item, block, timeout)
                 self._queued.add(path)
 
@@ -143,35 +142,37 @@ class DeduplicatedQueue(queue.Queue):
         item = super().get(block, timeout)
         with self._lock:
             path = item["path"]
-            self._queued.discard(path)
-            # Assuming each worker has a unique ID, you'd store the
-            # worker's ID here
-            worker_id = threading.get_ident() # Get the thread ID
-            self._processing[path] = worker_id # Store the worker's ID
+            self._queued.discard(path)  # Remove from queued set
+            self._processing.add(path)  # Mark as in-progress
         return item
 
     def task_done(self):
         super().task_done()
         with self._lock:
-            # The processing is over
+            # Assumes task_done() is called after processing the item from get()
+            # If your workers process multiple items per get(), adjust logic here
             if self.unfinished_tasks == 0:
-                self._processing.clear() #Reset when all the tasks are done.
+                self._processing.clear()  # Reset when all tasks are done
 
     def is_processing(self):
+        """Return True if any tasks are being processed."""
         with self._lock:
             return len(self._processing) > 0
 
     def is_idle(self):
+        """Return True if queue is empty AND no tasks are processing."""
         return self.empty() and not self.is_processing()
 
     def get_queued_tasks(self):
+        """Return a list of queued task paths."""
         with self._lock:
             return list(self._queued)
 
     def get_processing_tasks(self):
+        """Return a list of paths being processed."""
         with self._lock:
-            return list(self._processing.keys()) # Return the list of paths, which is what we want.
-
+            return list(self._processing)
+
 #start queue
 task_queue = DeduplicatedQueue()
 
@@ -239,35 +240,26 @@ logging.getLogger("urllib3").setLevel(logging.WARNING)
 logging.getLogger("watchfiles").setLevel(logging.WARNING)
 logging.getLogger("asyncio").setLevel(logging.WARNING)
 
-#This forces a flush to print progress correctly
-global last_print_times # Use a dictionary to store last_print_time per file
-last_print_times = {} # Initialize it
+last_print_time = None
 
+#This forces a flush to print progress correctly
def progress(seek, total):
     sys.stdout.flush()
     sys.stderr.flush()
-    if docker_status == 'Docker':
-        # Determine the file based on the worker's ID
-        worker_id = threading.get_ident()
-        processing_file = None
-        with task_queue._lock: # Access the queue safely
-            for path, worker in task_queue._processing.items():
-                if worker == worker_id:
-                    processing_file = path
-                    break
-
-        if processing_file is None:
-            # This can happen briefly as the task starts or finishes
-            # or if the process is somehow inconsistent.
-            return
-
-        if processing_file not in last_print_times:
-            last_print_times[processing_file] = None
+    if(docker_status) == 'Docker':
+        global last_print_time
 
+        # Get the current time
         current_time = time.time()
-        if (last_print_times[processing_file] is None or (current_time - last_print_times[processing_file]) >= 5):
-            last_print_times[processing_file] = current_time
+
+        # Check if 5 seconds have passed since the last print
+        if last_print_time is None or (current_time - last_print_time) >= 5:
+            # Update the last print time
+            last_print_time = current_time
+            # Log the message
             logging.info("")
+            if concurrent_transcriptions == 1:
+                processing = task_queue.get_processing_tasks()[0]
+                logging.debug(f"Processing file: {processing}")
 
 TIME_OFFSET = 5
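A minimal, runnable sketch of how the patched DeduplicatedQueue behaves, assuming Lock comes from the threading module (the patch calls Lock() directly, so subgen.py is expected to import it); the media path below is hypothetical:

    import queue
    from threading import Lock

    class DeduplicatedQueue(queue.Queue):
        """Queue that prevents duplicates in both queued and in-progress tasks."""
        def __init__(self):
            super().__init__()
            self._queued = set()       # paths waiting in the queue
            self._processing = set()   # paths currently being worked on
            self._lock = Lock()

        def put(self, item, block=True, timeout=None):
            with self._lock:
                path = item["path"]
                # Drop duplicates whether the path is queued or in progress
                if path not in self._queued and path not in self._processing:
                    super().put(item, block, timeout)
                    self._queued.add(path)

        def get(self, block=True, timeout=None):
            item = super().get(block, timeout)
            with self._lock:
                path = item["path"]
                self._queued.discard(path)
                self._processing.add(path)
            return item

        def task_done(self):
            super().task_done()
            with self._lock:
                if self.unfinished_tasks == 0:
                    self._processing.clear()

        def is_processing(self):
            with self._lock:
                return len(self._processing) > 0

        def is_idle(self):
            return self.empty() and not self.is_processing()

    q = DeduplicatedQueue()
    q.put({"path": "/media/example.mkv"})
    q.put({"path": "/media/example.mkv"})   # dropped: path already queued
    print(q.qsize())                        # 1

    item = q.get()                          # path now counted as in-progress
    q.put({"path": "/media/example.mkv"})   # dropped: path still processing
    print(q.qsize())                        # 0
    q.task_done()                           # last task done, clears in-progress set
    print(q.is_idle())                      # True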
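The reworked progress() boils down to a time-based throttle: log at most once every five seconds no matter how often the transcription backend invokes the callback. A standalone sketch of that pattern (the print format and driver loop are illustrative, not subgen's):

    import sys
    import time

    last_print_time = None  # module-level state, as in the patch

    def progress(seek, total):
        global last_print_time
        sys.stdout.flush()
        sys.stderr.flush()
        current_time = time.time()
        # Only emit if 5+ seconds have passed since the last emission
        if last_print_time is None or (current_time - last_print_time) >= 5:
            last_print_time = current_time
            print(f"progress: {seek}/{total}")

    # A backend would call this far more often than once per 5 seconds;
    # only a few of the calls below actually print.
    for seek in range(0, 101, 10):
        progress(seek, 100)
        time.sleep(1)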