Properly de-duplicate the queue and processing
subgen.py (36 changed lines)

@@ -1,7 +1,8 @@
-subgen_version = '2025.01.02'
+subgen_version = '2025.01.03'
 
 from language_code import LanguageCode
 from datetime import datetime
+from threading import Lock
 import os
 import json
 import xml.etree.ElementTree as ET
@@ -125,7 +126,7 @@ docker_status = "Docker" if in_docker else "Standalone"
 last_print_time = None
 
 #start queue
-task_queue = queue.Queue()
+task_queue = DeduplicatedQueue()
 
 def transcription_worker():
     while True:
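
The body of transcription_worker is not shown in this diff; a hypothetical sketch of the shape it presumably takes (transcribe_file below is a stand-in for subgen's real transcription call) illustrates the get()/task_done() pairing that the DeduplicatedQueue added in the next hunk relies on:

    def transcription_worker():
        while True:
            task = task_queue.get()            # marks task["path"] as in-progress
            try:
                transcribe_file(task["path"])  # placeholder for the real work on the file
            finally:
                task_queue.task_done()         # lets the queue eventually reset its in-progress set
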
@@ -145,6 +146,37 @@ def transcription_worker():
 for _ in range(concurrent_transcriptions):
     threading.Thread(target=transcription_worker, daemon=True).start()
 
+class DeduplicatedQueue(queue.Queue):
+    """Queue that prevents duplicates in both queued and in-progress tasks."""
+    def __init__(self):
+        super().__init__()
+        self._queued = set()        # Tracks paths in the queue
+        self._processing = set()    # Tracks paths being processed
+        self._lock = Lock()         # Ensures thread safety
+
+    def put(self, item, block=True, timeout=None):
+        with self._lock:
+            path = item["path"]
+            if path not in self._queued and path not in self._processing:
+                super().put(item, block, timeout)
+                self._queued.add(path)
+
+    def get(self, block=True, timeout=None):
+        item = super().get(block, timeout)
+        with self._lock:
+            path = item["path"]
+            self._queued.discard(path)    # Remove from queued set
+            self._processing.add(path)    # Mark as in-progress
+        return item
+
+    def task_done(self):
+        super().task_done()
+        with self._lock:
+            # Assumes task_done() is called after processing the item from get()
+            # If your workers process multiple items per get(), adjust logic here
+            if self.unfinished_tasks == 0:
+                self._processing.clear()  # Reset when all tasks are done
+
 # Define a filter class to hide common logging we don't want to see
 class MultiplePatternsFilter(logging.Filter):
     def filter(self, record):
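
A minimal usage sketch of the new class, assuming the DeduplicatedQueue definition from the diff above is in scope and that queued items are dicts carrying at least a "path" key (the media paths below are made up for illustration):

    q = DeduplicatedQueue()

    q.put({"path": "/media/show/episode1.mkv"})
    q.put({"path": "/media/show/episode1.mkv"})   # duplicate path: silently dropped
    q.put({"path": "/media/show/episode2.mkv"})
    print(q.qsize())                              # 2

    item = q.get()                                # episode1 moves to the in-progress set
    q.put({"path": item["path"]})                 # still dropped while it is being processed
    q.task_done()                                 # queue not fully drained, in-progress set kept

    q.get()
    q.task_done()                                 # unfinished_tasks hits 0, the in-progress set
                                                  # is cleared, and episode1 could be queued again

Because task_done() only clears the in-progress set once unfinished_tasks reaches zero, a path that has just finished cannot be re-queued until the whole queue drains; workers that process one item per get() and then call task_done(), as the in-code comments assume, fit this model.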