I tried to create a simple dataset (SVHN) using the code below, but it always fails with the error:
BrokenPipeError: [Errno 32] Broken pipe
I have no idea why it's failing! I tried both 1 and 2 workers (num_workers) and it makes no difference.
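From what I've read, on Windows the DataLoader spawns its worker processes instead of forking them, so the common suggestion is to keep any loader iteration behind an if __name__ == '__main__': guard. A minimal sketch of that workaround (not my actual code; SVHN_Dataset is the class defined below) would look like:

import torch

def main():
    # build the loader and pull a batch only inside main(), so nothing
    # touches the loader at import time while workers are being spawned
    loader = torch.utils.data.DataLoader(
        SVHN_Dataset(32, split='train'),
        batch_size=32,
        shuffle=True,
        num_workers=1)
    imgs, labels, masks = next(iter(loader))

if __name__ == '__main__':
    main()

I'm running this in a Jupyter notebook (the traceback below comes from a cell), so I'm not sure whether that guard even applies in my case.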
Here is the actual code snippet that produces the error:
import matplotlib.pyplot as plt
import numpy as np
import torch
from torchvision import datasets, transforms

class SVHN_Dataset(torch.utils.data.Dataset):
    def __init__(self, img_size, split='train', label_mask_size=1000):
        super().__init__()
        self.split = split.strip().lower()
        self.img_size = img_size
        self.mask_size = label_mask_size
        trans = transforms.Compose([
            transforms.Resize(img_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))])
        self.dataset = datasets.SVHN(root='./SVHN', split=self.split,
                                     transform=trans, download=True)
        self._create_label_mask()

    def _is_train_split(self):
        return self.split == 'train'

    def _create_label_mask(self):
        # only the train split gets a mask marking which labels are kept
        if not self._is_train_split():
            self.mask = None
            return
        mask = np.zeros(shape=(len(self.dataset)))
        mask[:self.mask_size] = 1
        np.random.shuffle(mask)
        self.mask = torch.LongTensor(mask)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        imgs, labels = self.dataset[idx]
        if self._is_train_split():
            return imgs, labels, self.mask[idx]
        return imgs, labels
# get data loaders and view a batch of images
def get_loaders(batch_size=32, image_size=32, mask_size=1000):
    data_loader_train = torch.utils.data.DataLoader(
        SVHN_Dataset(image_size, split='train', label_mask_size=mask_size),
        batch_size=batch_size,
        shuffle=True,
        num_workers=1)
    data_loader_test = torch.utils.data.DataLoader(
        SVHN_Dataset(image_size, split='test'),
        batch_size=batch_size,
        num_workers=2)
    return data_loader_train, data_loader_test

dataloader_train, dataloader_test = get_loaders(batch_size=32, image_size=32, mask_size=1000)
def visualize(imgs, rows=2, cols=10):
    fig = plt.figure(figsize=(cols, rows))
    plt.subplots_adjust(wspace=0, hspace=0)
    imgs_numpy = imgs.detach().cpu().numpy().transpose(0, 2, 3, 1)
    for i in range(imgs.shape[0]):
        ax = fig.add_subplot(rows, cols, i + 1, xticks=[], yticks=[])
        # min-max rescale each image to [0, 1] for imshow
        img = imgs_numpy[i]
        img = (img - img.min()) / (img.max() - img.min())
        ax.imshow(img)

imgs, labels, masks = next(iter(dataloader_train))
visualize(imgs)
imgs, labels = next(iter(dataloader_test))
visualize(imgs)
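One side note on the display code, in case it matters: since the transform normalizes with mean 0.5 and std 0.5 per channel, I believe the exact inverse would be a small helper like the one below; the min-max rescaling in visualize is just an approximation that's good enough for a quick look:

import numpy as np

def denormalize(img: np.ndarray) -> np.ndarray:
    # transforms.Normalize(mean=0.5, std=0.5) computes x_norm = (x - 0.5) / 0.5,
    # so the inverse is x = x_norm * 0.5 + 0.5, mapping [-1, 1] back to [0, 1]
    return img * 0.5 + 0.5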
This is the full stack trace:
Using downloaded and verified file: SVHN\train_32x32.mat
Using downloaded and verified file: SVHN\test_32x32.mat
---------------------------------------------------------------------------
BrokenPipeError Traceback (most recent call last)
in ()
23 ax.imshow(img)
24
---> 25 imgs, labels, masks = next(iter(dataloader_train))
26 visualize(imgs)
27
C:\Users\userx\Anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __iter__(self)
191
192 def __iter__(self):
--> 193 return _DataLoaderIter(self)
194
195 def __len__(self):
C:\Users\userx\Anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __init__(self, loader)
467 # before it starts, and __del__ tries to join but will get:
468 # AssertionError: can only join a started process.
--> 469 w.start()
470 self.index_queues.append(index_queue)
471 self.workers.append(w)
C:\Users\userx\Anaconda3\lib\multiprocessing\process.py in start(self)
103 'daemonic processes are not allowed to have children'
104 _cleanup()
--> 105 self._popen = self._Popen(self)
106 self._sentinel = self._popen.sentinel
107 # Avoid a refcycle if the target function holds an indirect
C:\Users\userx\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
221 @staticmethod
222 def _Popen(process_obj):
--> 223 return _default_context.get_context().Process._Popen(process_obj)
224
225 class DefaultContext(BaseContext):
C:\Users\userx\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
320 def _Popen(process_obj):
321 from .popen_spawn_win32 import Popen
--> 322 return Popen(process_obj)
323
324 class SpawnContext(BaseContext):
C:\Users\userx\Anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
63 try:
64 reduction.dump(prep_data, to_child)
---> 65 reduction.dump(process_obj, to_child)
66 finally:
67 set_spawning_popen(None)
C:\Users\userx\Anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
58 def dump(obj, file, protocol=None):
59 '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60 ForkingPickler(file, protocol).dump(obj)
61
62 #
BrokenPipeError: [Errno 32] Broken pipe