The setting
I have written a dataloader that uses torch.multiprocessing (self-written workers, not the ones inside torch.utils.data.DataLoader). The basic idea is:
- workers are initialised (each with its own input queue and a shared output queue passed to them)
- workers push results to the output queue, and that queue is read by a global loop
- once the input and output queues are exhausted, the epoch ends, and a new epoch begins with the input queues being repopulated

Check this pseudo-code:
```python
import traceback
import numpy as np
import torch.multiprocessing as torchMP
from torch.utils.data._utils.collate import collate_tensor_fn

QUEUE_TIMEOUT = 5.0  # seconds to wait on workerOutputQueue.get()

class myDataloader:

    def __init__(self, patientSlicesList, numWorkers, batchSize) -> None:
        self.patientSlicesList = patientSlicesList
        self.numWorkers = numWorkers
        self.batchSize = batchSize
        ...
        self._initWorkers()

    def _initWorkers(self):
        # Step 1 - Initialize vars
        self.workerProcesses   = []
        self.workerInputQueues = [torchMP.Queue() for _ in range(self.numWorkers)]
        self.workerOutputQueue = torchMP.Queue()

        for workerId in range(self.numWorkers):
            p = torchMP.Process(target=getSlice,
                                args=(workerId, self.workerInputQueues[workerId], self.workerOutputQueue))
            p.start()
            self.workerProcesses.append(p)

    def fillInputQueues(self):
        """
        This function splits patients and slices across workers. One can implement custom logic here.
        """
        patientNames = list(self.patientSlicesList.keys())
        for workerId in range(self.numWorkers):
            idxs = ...
            for patientName in patientNames[idxs]:
                for sliceId in self.patientSlicesList[patientName]:
                    self.workerInputQueues[workerId].put((patientName, sliceId))

    def emptyAllQueues(self):
        # empties self.workerInputQueues and self.workerOutputQueue
        ...

    def __iter__(self):
        try:
            # Step 0 - Init
            self.fillInputQueues()  # once for each epoch
            batchArray, batchMeta = [], []

            # Step 1 - Continuously yield results
            while True:
                if not self.workerOutputQueue.empty():

                    # Step 2.1 - Get data point
                    patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT)

                    # Step 2.2 - Append to batch
                    ...

                    # Step 2.3 - Yield batch
                    if len(batchArray) == self.batchSize:
                        batchArray = collate_tensor_fn(batchArray)
                        yield batchArray, batchMeta
                        batchArray, batchMeta = [], []

                # Step 3 - End condition
                if np.all([self.workerInputQueues[i].empty() for i in range(self.numWorkers)]) and self.workerOutputQueue.empty():
                    break

        except GeneratorExit:
            self.emptyAllQueues()
        except KeyboardInterrupt:
            self.closeProcesses()
        except Exception:
            traceback.print_exc()

    def closeProcesses(self):
        pass
```
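In case it matters, `emptyAllQueues()` and `closeProcesses()` are only stubs above; roughly, they are meant to do something like this (a sketch, using a `None` sentinel as one possible shutdown signal, which the `getSlice()` outline further below would have to honour):

```python
import queue  # stdlib, for the queue.Empty exception

def emptyAllQueues(self):
    # Drain any leftover items so an aborted epoch does not leak
    # work items into the next one
    for q in self.workerInputQueues + [self.workerOutputQueue]:
        try:
            while True:
                q.get_nowait()
        except queue.Empty:
            pass

def closeProcesses(self):
    # Signal each worker to stop (None sentinel), then join with a
    # timeout and terminate any straggler
    for workerId in range(self.numWorkers):
        self.workerInputQueues[workerId].put(None)
    for p in self.workerProcesses:
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
```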
And this is how I use it:

```python
import tqdm

if __name__ == "__main__":

    # Step 1 - Setup patient slices (fixed count of slices per patient)
    patientSlicesList = {
          'P1': [45, 62, 32, 21, 69]
        , 'P2': [13, 23, 87, 54, 5]
        , 'P3': [34, 56, 78, 90, 12]
        , 'P4': [34, 56, 78, 90, 12]
        , 'P5': [45, 62, 32, 21, 69]
        , 'P6': [13, 23, 87, 54, 5]
        , 'P7': [34, 56, 78, 90, 12]
        , 'P8': [34, 56, 78, 90, 12, 21]
    }
    workerCount, batchSize, epochs = 4, 1, 3
    totalSlices = sum(len(sliceIds) for sliceIds in patientSlicesList.values())

    # Step 2 - Create new dataloader
    dataloaderNew = None
    try:
        dataloaderNew = myDataloader(patientSlicesList, numWorkers=workerCount, batchSize=batchSize)
        print ('\n - [main] Iterating over (my) dataloader...')
        for epochId in range(epochs):
            with tqdm.tqdm(total=totalSlices, desc=' - Epoch {}/{}'.format(epochId+1, epochs)) as pbar:
                for i, (X, meta) in enumerate(dataloaderNew):
                    print (' - [main] {}'.format(meta))
                    pbar.update(X.shape[0])
        dataloaderNew.closeProcesses()
    except KeyboardInterrupt:
        if dataloaderNew is not None: dataloaderNew.closeProcesses()
    except Exception:
        traceback.print_exc()
        if dataloaderNew is not None: dataloaderNew.closeProcesses()
```
`getSlice()`
- This is the main logic function; it returns a slice from a patient volume. More details are in a blog post here and sample code here.
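
In outline, each worker runs a loop like this (simplified; the actual slice extraction is elided):

```python
def getSlice(workerId, inputQueue, outputQueue):
    # Each worker blocks on its own input queue, loads the requested slice,
    # and pushes it onto the shared output queue
    while True:
        item = inputQueue.get()   # blocks until work (or a sentinel) arrives
        if item is None:          # sentinel from closeProcesses()
            break
        patientName, sliceId = item
        patientSliceArray = ...   # load the slice from the patient volume
        outputQueue.put((patientSliceArray, patientName, sliceId))
```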
The problem
However, I randomly face `BrokenPipeError: [Errno 32] Broken pipe` errors. Below is the tqdm progress line and the traceback from one such run:
```
 - dice: 0.1506 - [] workerInputQueues: [1285, 1379, 1350, 1325, 1361] || workerOutputQueue: 343: 17%|█▋ | 1536/8910 [7:28:03<228:00:04, 111.31s/it]
Traceback (most recent call last):
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/resource_sharer.py", line 138, in _serve
    with self._listener.accept() as conn:
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 465, in accept
    deliver_challenge(c, self._authkey)
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 738, in deliver_challenge
    connection.send_bytes(CHALLENGE + message)
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/exports/lkeb-hpc/ppmody/anaconda3/envs/interactive-refiment/lib/python3.9/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
```
I was unable to find solutions on how to deal with this in my main loop (see `def __iter__(self)` above).
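The only mitigation I could come up with is to wrap the queue read in my loop as below, but I suspect it does not address the root cause; as far as I can tell from the traceback, the error is raised in multiprocessing's background resource_sharer thread, not in my `get()` call, so a try/except there probably cannot catch it:

```python
# Sketch: a guard inside the while-loop of __iter__ (untested against this failure)
import queue  # stdlib, for queue.Empty

try:
    patientSliceArray, patientName, sliceId = self.workerOutputQueue.get(timeout=QUEUE_TIMEOUT)
except queue.Empty:
    continue  # nothing arrived within the timeout; loop around again
except BrokenPipeError:
    traceback.print_exc()
    continue  # is it even safe to retry here?
```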
Does anyone have a solution to this? I realise that my question might be a bit vague, so please let me know if I need to provide additional details. You may also not be able to reproduce this problem with the sample data gist above; I only face this error when running on a larger and more complex dataset.