[torch.multiprocessing] BrokenPipeError: [Errno 32] Broken pipe

The setting

I have written a dataloader that uses torch.multiprocessing (self-written workers, not the ones inside torch.utils.data.DataLoader). The basic idea is:

  • each worker is initialised with its own input queue and a shared output queue
  • workers push results to the output queue, which is read by a global loop
  • once the input and output queues are exhausted, the epoch ends, and the next epoch begins by repopulating the input queues.

Here is the pseudo-code:

import tqdm
import traceback
import torch.multiprocessing as torchMP
from torch.utils.data._utils.collate import collate_tensor_fn  # available in recent PyTorch versions

QUEUE_TIMEOUT = 5.0  # seconds (value picked arbitrarily for this pseudo-code)

class myDataloader:

    def __init__(self, patientSlicesList, numWorkers, batchSize) -> None:
        self.patientSlicesList = patientSlicesList
        self.numWorkers = numWorkers
        self.batchSize = batchSize
        ...
        self._initWorkers()
    
    def _initWorkers(self):
        
        # Step 1 - Initialize vars
        self.workerProcesses   = []
        self.workerInputQueues = [torchMP.Queue() for _ in range(self.numWorkers)]
        self.workerOutputQueue = torchMP.Queue()
        
        for workerId in range(self.numWorkers):
            p = torchMP.Process(target=getSlice, args=(workerId, self.workerInputQueues[workerId], self.workerOutputQueue))
            p.start()
            self.workerProcesses.append(p)

    def fillInputQueues(self):
        """
        This function allows to split patients and slices across workers. One can implement custom logic here.
        """
        patientNames = list(self.patientSlicesList.keys())
        for workerId in range(self.numWorkers):
            idxs = ...  # custom logic: the subset of patientNames assigned to this worker
            for patientName in patientNames[idxs]:
                for sliceId in self.patientSlicesList[patientName]:
                    self.workerInputQueues[workerId].put((patientName, sliceId))
    
    def emptyAllQueues(self):
        # Empties self.workerInputQueues and self.workerOutputQueue
        ...

    def __iter__(self):
        
        try:
            # Step 0 - Init
            self.fillInputQueues() # once for each epoch
            batchArray, batchMeta = [], []
            
            # Step 1 - Continuously yield results
            while True:
                if not self.workerOutputQueue.empty():

                    # Step 2.1 - Get data point
                    patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT)
                    
                    # Step 2.2 - Append to batch
                    batchArray.append(patientSliceArray)
                    batchMeta.append((patientName, sliceId))
                    
                    # Step 2.3 - Yield batch
                    if len(batchArray) == self.batchSize:
                        batchArray = collate_tensor_fn(batchArray) 
                        yield batchArray, batchMeta
                        batchArray, batchMeta = [], []
                    
                    # Step 3 - End condition
                    if all(self.workerInputQueues[i].empty() for i in range(self.numWorkers)) and self.workerOutputQueue.empty():
                        break 
        
        except GeneratorExit:
            self.emptyAllQueues()
        
        except KeyboardInterrupt:
            self.closeProcesses()

        except:
            traceback.print_exc()    

    def closeProcesses(self):
        # Terminates and joins the worker processes
        for p in self.workerProcesses:
            p.terminate()
            p.join()

if __name__ == "__main__":
    
    # Step 1 - Setup patient slices (fixed count of slices per patient)
    patientSlicesList = {
        'P1': [45, 62, 32, 21, 69]
        , 'P2': [13, 23, 87, 54, 5]
        , 'P3': [34, 56, 78, 90, 12]
        , 'P4': [34, 56, 78, 90, 12]
        , 'P5': [45, 62, 32, 21, 69]
        , 'P6': [13, 23, 87, 54, 5]
        , 'P7': [34, 56, 78, 90, 12]
        , 'P8': [34, 56, 78, 90, 12, 21]
    }
    workerCount, batchSize, epochs = 4, 1, 3

    # Step 2 - Create new dataloader
    dataloaderNew = None
    try:
        dataloaderNew = myDataloader(patientSlicesList, numWorkers=workerCount, batchSize=batchSize)
        print ('\n - [main] Iterating over (my) dataloader...')
        for epochId in range(epochs):
            totalSlices = sum(len(slices) for slices in patientSlicesList.values())
            with tqdm.tqdm(total=totalSlices, desc=' - Epoch {}/{}'.format(epochId+1, epochs)) as pbar:
                for i, (X, meta) in enumerate(dataloaderNew):
                    print (' - [main] {}'.format(meta))
                    pbar.update(X.shape[0])
        
        dataloaderNew.closeProcesses()
    
    except KeyboardInterrupt:
        if dataloaderNew is not None: dataloaderNew.closeProcesses()

    except:
        traceback.print_exc()
        if dataloaderNew is not None: dataloaderNew.closeProcesses()      

  • getSlice() - this is the main worker function: it reads (patientName, sliceId) tuples from its input queue and pushes the corresponding slice from the patient volume onto the shared output queue (a simplified sketch is below).
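
For completeness, this is roughly what getSlice() looks like; loadSlice() is just a placeholder for the actual disk I/O in my code:

import queue

def getSlice(workerId, inputQueue, outputQueue):
    while True:
        try:
            patientName, sliceId = inputQueue.get(timeout=QUEUE_TIMEOUT)
        except queue.Empty:
            continue  # no work yet; wait until the next epoch repopulates the queue
        patientSliceArray = loadSlice(patientName, sliceId)  # placeholder for the actual slice-loading logic
        outputQueue.put((patientSliceArray, patientName, sliceId))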

More details are in a blog post here, and sample code is here.


The problem

However, I randomly run into BrokenPipeError: [Errno 32] Broken pipe.

 - dice: 0.1506  - [] workerInputQueues: [1285, 1379, 1350, 1325, 1361] || workerOutputQueue: 343:  17%|█▋        | 1536/8910 [7:28:03<228:00:04, 111.31s/it]
Traceback (most recent call last):
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/resource_sharer.py", line 138, in _serve
    with self._listener.accept() as conn:
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 465, in accept
    deliver_challenge(c, self._authkey)
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 738, in deliver_challenge
    connection.send_bytes(CHALLENGE + message)
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

I was unable to find solutions on how to deal with this in my main loop (see def __iter__(self) above).
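
To illustrate what I mean by "dealing with it in the main loop", this is the kind of guard I imagine around the get() call (untested; whether retrying like this is even safe is part of my question):

# inside the while-loop of __iter__
try:
    patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT)
except (BrokenPipeError, OSError):
    continue  # can I simply retry, or are the queue/workers now in a bad state?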

Does anyone have a solution to this? I realise my question might be a bit vague, so please let me know if I should provide additional details. You may also not be able to reproduce this problem with the sample data gist above; I only face this error when running on a larger and more complex dataset.

Small update
I also get this message in my terminal (after the traceback):

Message from syslogd@res-hpc-lkeb08 at Oct  4 14:30:23 ...
kernel:watchdog: BUG: soft lockup - CPU#0 stuck for 22s! [python:2092402]