I am trying to implement a bounded-buffer approach in which the data generator and the model run as asynchronous processes. The data generator preprocesses the data and stores it in a shared queue (with a predefined maximum size to limit memory usage). The model, on the other hand, consumes data from this queue at its own pace until the queue is empty. Below is a snippet of my implementation.
    import multiprocessing

    ''' self._buffer is an object of multiprocessing.Queue '''

    def produce(self):
        for obj in self._generator:
            self._buffer.put(obj=obj, block=True, timeout=None)
        # Sentinel telling the consumer that no more data is coming
        self._buffer.put(obj=None)

    def consume(self):
        while True:
            dat = self._buffer.get(block=True, timeout=None)
            if dat is None:
                break
            else:
                # Train model on `dat`
                pass

    def run(self):
        pt = multiprocessing.Process(target=self.produce)
        ct = multiprocessing.Process(target=self.consume)
        pt.start()
        ct.start()
        pt.join()
        ct.join()
I also set `multiprocessing.set_start_method('spawn')`, but I get the error: generator object cannot be pickled.
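As far as I can tell, the error is not specific to my class. With the `spawn` start method the `Process` object is pickled to be sent to the child, and since its target is the bound method `self.produce`, `self` (and with it `self._generator`) gets pickled too. The minimal sketch below reproduces the failure without multiprocessing; `numbers` and `try_pickle` are hypothetical stand-ins for illustration, not part of my real code.

```python
import pickle


def numbers():
    # Hypothetical stand-in for the real data generator.
    yield from range(3)


def try_pickle(obj):
    """Return None if `obj` pickles cleanly, else the error message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# A generator object cannot be pickled, so anything that holds one
# (such as an instance with a `_generator` attribute) fails as well.
error = try_pickle(numbers())
print(error)

# A plain list of the same items pickles without problems.
print(try_pickle(list(numbers())))
```

So presumably the generator has to be created inside the producer process instead of being stored on the object before the processes start, but I am not sure that is the intended approach.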
What is the correct way to achieve this?