From 6eaa5e746fbed91114bba63956453e25c29e19c2 Mon Sep 17 00:00:00 2001
From: McCloudS
Date: Thu, 6 Feb 2025 11:39:00 -0700
Subject: [PATCH] Try to fix file updates in progress

---
 subgen.py | 73 +++++++++++++++++++++++++++++++------------------------
 1 file changed, 41 insertions(+), 32 deletions(-)

diff --git a/subgen.py b/subgen.py
index 40375ba..1bf2b69 100644
--- a/subgen.py
+++ b/subgen.py
@@ -1,4 +1,4 @@
-subgen_version = '2025.02.59'
+subgen_version = '2025.02.60'
 
 from language_code import LanguageCode
 from datetime import datetime
@@ -121,20 +121,20 @@ model = None
 
 in_docker = os.path.exists('/.dockerenv')
 docker_status = "Docker" if in_docker else "Standalone"
-last_print_time = None
 
 class DeduplicatedQueue(queue.Queue):
-    """Queue that prevents duplicates in both queued and in-progress tasks."""
-    def __init__(self):
-        super().__init__()
-        self._queued = set()  # Tracks paths in the queue
-        self._processing = set()  # Tracks paths being processed
-        self._lock = Lock()  # Ensures thread safety
+    """Queue that prevents duplicates and tracks which worker owns each task."""
+
+    def __init__(self):
+        super().__init__()
+        self._queued = set()
+        self._processing = {}  # In-progress tasks, keyed by path: {path: worker_id}
+        self._lock = threading.Lock()
 
     def put(self, item, block=True, timeout=None):
         with self._lock:
             path = item["path"]
-            if path not in self._queued and path not in self._processing:
+            if path not in self._queued:  # no longer checks self._processing; dedupe against the queue only
                 super().put(item, block, timeout)
                 self._queued.add(path)
@@ -142,37 +142,35 @@ class DeduplicatedQueue(queue.Queue):
         item = super().get(block, timeout)
         with self._lock:
             path = item["path"]
-            self._queued.discard(path)  # Remove from queued set
-            self._processing.add(path)  # Mark as in-progress
+            self._queued.discard(path)
+            # Record which worker picked this task up, keyed by path
+            worker_id = threading.get_ident()  # Unique ID of the current thread
+            self._processing[path] = worker_id
         return item
 
     def task_done(self):
         super().task_done()
         with self._lock:
-            # Assumes task_done() is called after processing the item from get()
-            # If your workers process multiple items per get(), adjust logic here
+            # All outstanding work has finished
             if self.unfinished_tasks == 0:
-                self._processing.clear()  # Reset when all tasks are done
+                self._processing.clear()  # Reset once every task is done
 
     def is_processing(self):
-        """Return True if any tasks are being processed."""
         with self._lock:
             return len(self._processing) > 0
 
     def is_idle(self):
-        """Return True if queue is empty AND no tasks are processing."""
         return self.empty() and not self.is_processing()
 
     def get_queued_tasks(self):
-        """Return a list of queued task paths."""
         with self._lock:
             return list(self._queued)
 
     def get_processing_tasks(self):
-        """Return a list of paths being processed."""
         with self._lock:
-            return list(self._processing)
-
+            return list(self._processing.keys())  # Return just the paths
+
 #start queue
 task_queue = DeduplicatedQueue()
@@ -241,23 +239,34 @@ logging.getLogger("watchfiles").setLevel(logging.WARNING)
 logging.getLogger("asyncio").setLevel(logging.WARNING)
 
 #This forces a flush to print progress correctly
+global last_print_times  # Dictionary storing the last print time per file
+last_print_times = {}
+
 def progress(seek, total):
     sys.stdout.flush()
     sys.stderr.flush()
-    if(docker_status) == 'Docker':
-        global last_print_time
-        # Get the current time
+    if docker_status == 'Docker':
+        # Work out which file this worker is handling from its thread ID
+        worker_id = threading.get_ident()
+        processing_file = None
+        with task_queue._lock:  # Access the queue's state safely
+            for path, worker in task_queue._processing.items():
+                if worker == worker_id:
+                    processing_file = path
+                    break
+
+        if processing_file is None:
+            # Can happen briefly as a task starts or finishes
+            return
+
+        if processing_file not in last_print_times:
+            last_print_times[processing_file] = None
         current_time = time.time()
-
-        # Check if 5 seconds have passed since the last print
-        if last_print_time is None or (current_time - last_print_time) >= 5:
-            # Update the last print time
-            last_print_time = current_time
-            # Log the message
+        if (last_print_times[processing_file] is None or (current_time - last_print_times[processing_file]) >= 5):
+            last_print_times[processing_file] = current_time
             logging.info("")
-            if concurrent_transcriptions == 1:
-                processing = task_queue.get_processing_tasks()[0]
-                logging.debug(f"Processing file: {processing}")
+            logging.debug(f"Processing file: {processing_file}")  # Log the file derived from the worker ID
 
 TIME_OFFSET = 5
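A minimal standalone sketch of how the reworked DeduplicatedQueue is expected to behave. The class body is trimmed from the patch; the {"path": ...} item shape follows the patch, while the demo worker and the media path are assumptions for illustration:

import queue
import threading
import time

class DeduplicatedQueue(queue.Queue):
    """Trimmed copy of the patched class: dedupe on put, track the owner on get."""

    def __init__(self):
        super().__init__()
        self._queued = set()       # paths waiting in the queue
        self._processing = {}      # path -> worker (thread) id
        self._lock = threading.Lock()

    def put(self, item, block=True, timeout=None):
        with self._lock:
            path = item["path"]
            if path not in self._queued:   # duplicates of queued paths are silently dropped
                super().put(item, block, timeout)
                self._queued.add(path)

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        with self._lock:
            path = item["path"]
            self._queued.discard(path)
            self._processing[path] = threading.get_ident()  # record this thread as the owner
        return item

q = DeduplicatedQueue()
q.put({"path": "/media/show.mkv"})   # hypothetical path, for illustration only
q.put({"path": "/media/show.mkv"})   # duplicate path: dropped
print(q.qsize())                     # -> 1

def worker():
    q.get()                          # marks this thread as the task's owner
    time.sleep(0.1)                  # stand-in for the real transcription work
    q.task_done()

t = threading.Thread(target=worker)
t.start()
t.join()

Note that with the put() change, a path can be re-queued while it is still in _processing; that is what allows a file updated mid-transcription to be picked up again, per the commit subject.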
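The per-file throttling in the new progress() can also be exercised on its own. should_log below is a hypothetical helper that mirrors the last_print_times bookkeeping from the patch; the 5-second window comes from the diff, and the file names are made up:

import time

last_print_times = {}  # file path -> timestamp of the last progress log
PRINT_INTERVAL = 5     # seconds between progress lines per file, as in the patch

def should_log(path, now=None):
    """Return True (and record the time) if this file's progress
    hasn't been logged within the last PRINT_INTERVAL seconds."""
    now = time.time() if now is None else now
    last = last_print_times.get(path)
    if last is None or (now - last) >= PRINT_INTERVAL:
        last_print_times[path] = now
        return True
    return False

# Two files progressing concurrently each get their own throttle window:
print(should_log("a.mkv", now=0))   # True  (first log for a.mkv)
print(should_log("b.mkv", now=1))   # True  (first log for b.mkv)
print(should_log("a.mkv", now=3))   # False (only 3s since a.mkv last logged)
print(should_log("a.mkv", now=6))   # True  (6s >= 5s window)

Keying the timestamps by path is what replaces the single global last_print_time, so concurrent transcriptions no longer suppress each other's progress output.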