DataLoader produces BrokenPipeError: [Errno 32] Broken pipe

I tried to create a simple dataset (SVHN) using the code below, but it always generates this error:

BrokenPipeError: [Errno 32] Broken pipe

I have no idea why it's failing! I tried both 1 and 2 workers (num_workers) and it doesn't make any difference.
Here is the actual code snippet that results in the error:

import numpy as np
import torch
from torchvision import datasets, transforms
import matplotlib.pyplot as plt


class SVHN_Dataset(torch.utils.data.Dataset):
    def __init__(self, img_size, split='train', label_mask_size = 1000):
        super().__init__()
        self.split = split.strip().lower()
        self.img_size = img_size
        self.mask_size = label_mask_size
        trans = transforms.Compose([transforms.Resize(img_size),
                                    transforms.ToTensor(),
                                    transforms.Normalize(mean=(0.5,0.5,0.5), std=(0.5,0.5,0.5))])
        self.dataset = datasets.SVHN(root='./SVHN',split=self.split, transform = trans, download=True)
        self._create_label_mask()
    
    def _is_train_split(self):
        return True if self.split == 'train' else False
    
    def _create_label_mask(self):
        if not self._is_train_split():
            self.mask = None
            return
        mask = np.zeros(shape=(len(self.dataset)))    
        mask[0 : self.mask_size] = 1
        np.random.shuffle(mask)
        self.mask = torch.LongTensor(mask)
        

    def __len__(self):
        return len(self.dataset) 
    
    def __getitem__(self, idx):
        imgs, labels = self.dataset.__getitem__(idx)
        if self.is_train_split():
            return imgs, labels, self.mask[idx]
        return imgs, labels


# get data loaders and view a batch of images 
def get_loaders(batch_size=32, image_size=32, mask_size=1000):
    data_loader_train = torch.utils.data.DataLoader(SVHN_Dataset(image_size,split='train',label_mask_size=mask_size),
                                                    batch_size=batch_size,
                                                    shuffle=True,
                                                    num_workers = 1)
    data_loader_test = torch.utils.data.DataLoader(SVHN_Dataset(image_size, split='test'),
                                                   batch_size=batch_size, num_workers=2)
    return data_loader_train, data_loader_test

dataloader_train, dataloader_test = get_loaders(batch_size=32, image_size=32, mask_size=1000) 

def visualize(imgs, rows=2, cols=10):
    fig = plt.figure(figsize=(cols,rows))
    plt.subplots_adjust(wspace=0,hspace=0)
    imgs_numpy = imgs.detach().cpu().numpy().transpose(0,2,3,1)
    for i in range(min(imgs.shape[0], rows * cols)):  # stay within the subplot grid
        ax = fig.add_subplot(rows, cols, i + 1, xticks=[], yticks=[])
        # rescale each image back to [0, 1] so imshow displays it correctly
        img = (imgs_numpy[i] - imgs_numpy[i].min()) / (imgs_numpy[i].max() - imgs_numpy[i].min())
        ax.imshow(img)

imgs, labels, masks = next(iter(dataloader_train))
visualize(imgs)

imgs, labels = next(iter(dataloader_test))
visualize(imgs)

This is the full stack trace:

Using downloaded and verified file: SVHN\train_32x32.mat
Using downloaded and verified file: SVHN\test_32x32.mat
---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
 in ()
     23         ax.imshow(img)
     24 
---> 25 imgs, labels, masks = next(iter(dataloader_train))
     26 visualize(imgs)
     27 

C:\Users\userx\Anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __iter__(self)
    191 
    192     def __iter__(self):
--> 193         return _DataLoaderIter(self)
    194 
    195     def __len__(self):

C:\Users\userx\Anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __init__(self, loader)
    467                 #     before it starts, and __del__ tries to join but will get:
    468                 #     AssertionError: can only join a started process.
--> 469                 w.start()
    470                 self.index_queues.append(index_queue)
    471                 self.workers.append(w)

C:\Users\userx\Anaconda3\lib\multiprocessing\process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         # Avoid a refcycle if the target function holds an indirect

C:\Users\userx\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):

C:\Users\userx\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):

C:\Users\userx\Anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     63             try:
     64                 reduction.dump(prep_data, to_child)
---> 65                 reduction.dump(process_obj, to_child)
     66             finally:
     67                 set_spawning_popen(None)

C:\Users\userx\Anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

BrokenPipeError: [Errno 32] Broken pipe

Setting num_workers=0 actually showed the real problem, which simply was:
AttributeError: 'SVHN_Dataset' object has no attribute 'is_train_split'
I simply had a misspelling, and that was causing the issue: __getitem__ calls self.is_train_split() while the method is actually named _is_train_split().
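
For reference, here is a minimal sketch of the corrected method; it is identical to the class posted above except for the renamed call:

    def __getitem__(self, idx):
        imgs, labels = self.dataset.__getitem__(idx)
        if self._is_train_split():  # was self.is_train_split(), which raised the AttributeError
            return imgs, labels, self.mask[idx]
        return imgs, labels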

Basically, as noted here and here:

this error only occurs when you try to do multiprocessing on some code with errors in it.
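
In case it helps anyone debugging a similar BrokenPipeError, this is a minimal sketch of that debugging step, reusing the SVHN_Dataset class from the question: with num_workers=0 no worker processes are spawned, so the dataset code runs in the main process and the real exception (the AttributeError above, in my case) shows up with a readable traceback.

# Debugging sketch: run the loader single-process first to surface the real error.
debug_loader = torch.utils.data.DataLoader(
    SVHN_Dataset(32, split='train', label_mask_size=1000),
    batch_size=32,
    shuffle=True,
    num_workers=0)  # no workers: any bug in the Dataset raises directly here

imgs, labels, masks = next(iter(debug_loader))  # the underlying exception appears at this call

Once the dataset itself runs cleanly, you can switch num_workers back to 1 or 2.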

Hope this is useful for somebody out there!
