import random, threading, time import Queue as queue # producer/consumer queue, with (potentially) multiple consumers class PCQueue: def __init__(self, minthreads, maxthreads=0, qsize=0, name=None, q=None): if not name: name = self.__class__.__name__ if maxthreads == 0: maxthreads = minthreads if minthreads > maxthreads: raise ValueError('minthreads may not be greater than max') if not q: q = queue.Queue() self.q = q self.running = True self.name = name self.minthreads = minthreads self.maxthreads = maxthreads self.threadcount = 0 for i in range(minthreads): self.add_consumer() if maxthreads > minthreads: self.put = self._put else: self.put = lambda obj: self.q.put(obj) def run_consumer(self): while self.running: try: obj = self.next() except queue.Empty: if self.threadcount > self.minthreads: # (30) slows down thread destruction w/ minimal overhead if not random.randrange(30): break continue self.consume(obj) self.q.mutex.acquire() self.threadcount -= 1 self.q.mutex.release() def next(self): """ Return an object to call consume on. Should raise queue.Empty if nothing is available to process. Should not block indefinitely. """ return self.q.get(timeout=1) def _put(self, obj): # this method is unused if maxthreads == minthreads self.q.put(obj) self.q.mutex.acquire() try: if self.q._qsize() > 1 and self.threadcount < self.maxthreads: self.add_consumer() finally: self.q.mutex.release() def consume(self, obj): raise NotImplementedError() def add_consumer(self): self.threadcount += 1 th = threading.Thread(target=self.run_consumer, name='%s consumer %d' % (self.name, self.threadcount)) th.start() def stop(self): self.running = False def join(self): while self.q.qsize(): time.sleep(0.1) self.stop() while self.threadcount: time.sleep(0.1) def __repr__(self): return '%s(%s, %s)' % (self.__class__.__name__, self.threadcount, self.q) def __getattr__(self, name): return getattr(self.q, name)