DataLoader fails with custom collate_fn

I am trying to train a convolutional network on images of variable size. For this purpose I use a DataLoader with a custom collate_fn.

import torch
from torch.utils import data

class ImagesFromList(data.Dataset):
    def __init__(self, images):
        # images_fn[index] = (idx1, idx2, val): two indices into the global
        # `images` list plus a label
        self.images_fn = images

    def __getitem__(self, index):
        global images
        file1 = images[self.images_fn[index][0]]
        file2 = images[self.images_fn[index][1]]
        val = self.images_fn[index][2]
        files = [file1, file2]
        return files, val

    def __len__(self):
        return len(self.images_fn)

loader_train = torch.utils.data.DataLoader(
    ImagesFromList(images=trainset),
    batch_size=1, shuffle=True, num_workers=1, pin_memory=True, collate_fn=my_collate,
)

It works when I use the following my_collate:

import collections

from torch._six import int_classes
from torch.utils.data.dataloader import default_collate

def my_collate(batch):
    elem_type = type(batch[0])
    if isinstance(batch[0], torch.Tensor):
        return torch.stack(batch, 0, out=None)
    elif isinstance(batch[0], int_classes):
        return torch.LongTensor(batch)
    elif isinstance(batch[0], collections.Sequence):
        transposed = zip(*batch)
        return [default_collate(samples) for samples in transposed]

which is just a slightly modified version of default_collate. But it fails when batch_size > 1, because torch.stack cannot stack tensors of different sizes (a minimal illustration follows the code below). Because of this I use the following my_collate instead:

def my_collate(batch):
    elem_type = type(batch[0])
    if isinstance(batch[0], torch.Tensor):
        # return the list of variable-size tensors as is, without stacking
        return batch
    elif isinstance(batch[0], int_classes):
        return torch.LongTensor(batch)
    elif isinstance(batch[0], collections.Sequence):
        transposed = zip(*batch)
        return [default_collate(samples) for samples in transposed]
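
For reference, here is a minimal illustration of that stacking failure (the shapes are made up just to trigger the error):

import torch

a = torch.rand(3, 12, 15)
b = torch.rand(3, 17, 11)
try:
    torch.stack([a, b], 0)  # stack requires all tensors to have the same shape
except RuntimeError as e:
    print(e)  # complains that the tensors are not of equal size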

And in this case the simple code

for i, inp in enumerate(loader_train):
    pass

fails with the error:

Exception in thread Thread-14:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/stanismorozov/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py", line 71, in _worker_manager_loop
    r = in_queue.get()
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 345, in get
    return ForkingPickler.loads(res)
  File "/home/stanismorozov/.local/lib/python3.5/site-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
    fd = df.detach()
  File "/usr/lib/python3.5/multiprocessing/resource_sharer.py", line 58, in detach
    return reduction.recv_handle(conn)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 181, in recv_handle
    return recvfds(s, 1)[0]
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 154, in recvfds
    raise EOFError
EOFError

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-55-3d1f5f03c1e9> in <module>()
----> 1 for i, inp in enumerate(loader_train):
      2     pass

~/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py in __next__(self)
    278         while True:
    279             assert (not self.shutdown and self.batches_outstanding > 0)
--> 280             idx, batch = self._get_batch()
    281             self.batches_outstanding -= 1
    282             if idx != self.rcvd_idx:

~/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py in _get_batch(self)
    257                 raise RuntimeError('DataLoader timed out after {} seconds'.format(self.timeout))
    258         else:
--> 259             return self.data_queue.get()
    260 
    261     def __next__(self):

/usr/lib/python3.5/queue.py in get(self, block, timeout)
    162             elif timeout is None:
    163                 while not self._qsize():
--> 164                     self.not_empty.wait()
    165             elif timeout < 0:
    166                 raise ValueError("'timeout' must be a non-negative number")

/usr/lib/python3.5/threading.py in wait(self, timeout)
    291         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    292             if timeout is None:
--> 293                 waiter.acquire()
    294                 gotit = True
    295             else:

~/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py in handler(signum, frame)
    176         # This following call uses `waitid` with WNOHANG from C side. Therefore,
    177         # Python can still get and update the process status successfully.
--> 178         _error_if_any_worker_fails()
    179         if previous_handler is not None:
    180             previous_handler(signum, frame)

RuntimeError: DataLoader worker (pid 25564) is killed by signal: Aborted.

I don’t understand what’s going on. I’m stuck and don’t know what to do. Thank you for your help and explanation.

@stanis-morozov could you please explain how you want to stack images of variable size together into a single batch (torch.Tensor)?
In your new collate function:

def my_collate(batch):
    elem_type = type(batch[0])
    if isinstance(batch[0], torch.Tensor):
        return batch

what is type(batch), a list or a tuple?

I want to collect the images in a list or something like that.
batch is a tuple of tensors (<class 'tuple'>).

you can try something like this:

import torch
from torch.utils.data import Dataset, DataLoader


class VarSizeImages(Dataset):
    
    def __len__(self):
        return 100
    
    def __getitem__(self, idx):
        size = torch.randint(10, 20, size=(2, ), dtype=torch.long)
        img = torch.rand(*size).numpy()
        label = torch.randint(0, 10, size=(1, ), dtype=torch.long).item()
        return img, label
        

ds = VarSizeImages()

dp = ds[0]
dp[0].shape, dp[1]

loader = DataLoader(ds, batch_size=10, collate_fn=lambda batch: [(torch.from_numpy(dp[0]), torch.tensor(dp[1])) for dp in batch])

for batch in loader:
    pass

print(len(batch), type(batch))
print(len(batch[0]), type(batch[0][0]), type(batch[0][1]))
> 10 <class 'list'>
> 2 <class 'torch.Tensor'> <class 'torch.Tensor'>

HTH

@vfdev-5, I modified your code so that it reproduces the same error.

import torch
from torch.utils.data import Dataset, DataLoader

sz = 1000

images = []
for i in range(sz):
    size = torch.randint(10, 20, size=(2, ), dtype=torch.long)
    img = torch.rand(tuple(size.numpy()))
    images.append(img)
    

class VarSizeImages(Dataset):
    
    def __len__(self):
        return sz
    
    def __getitem__(self, idx):
        global images
        img = images[idx]
        label = torch.randint(0, 2, size=(1, ), dtype=torch.long).item()
        return img, label
        
def my_collate(batch):
    return [(dp[0], torch.tensor(dp[1])) for dp in batch]

ds = VarSizeImages()

loader = DataLoader(ds, batch_size=10, shuffle=True, num_workers=1, pin_memory=True, collate_fn=my_collate)

for batch in loader:
    pass

The main difference is that I don't generate random images but read them from a list.
I can't understand why this code doesn't work.

I tried the above code and got no error (PyTorch 0.4.1). What is your PyTorch version and what is the error?

My version is 0.4.0. The error is:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/stanismorozov/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py", line 71, in _worker_manager_loop
    r = in_queue.get()
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 345, in get
    return ForkingPickler.loads(res)
  File "/home/stanismorozov/.local/lib/python3.5/site-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
    fd = df.detach()
  File "/usr/lib/python3.5/multiprocessing/resource_sharer.py", line 58, in detach
    return reduction.recv_handle(conn)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 181, in recv_handle
    return recvfds(s, 1)[0]
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 154, in recvfds
    raise EOFError
EOFError

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-1-4216f61f2faa> in <module>()
     29 loader = DataLoader(ds, batch_size=10, shuffle=True, num_workers=1, pin_memory=True, collate_fn=my_collate)
     30 
---> 31 for batch in loader:
     32     pass

~/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py in __next__(self)
    278         while True:
    279             assert (not self.shutdown and self.batches_outstanding > 0)
--> 280             idx, batch = self._get_batch()
    281             self.batches_outstanding -= 1
    282             if idx != self.rcvd_idx:

~/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py in _get_batch(self)
    257                 raise RuntimeError('DataLoader timed out after {} seconds'.format(self.timeout))
    258         else:
--> 259             return self.data_queue.get()
    260 
    261     def __next__(self):

/usr/lib/python3.5/queue.py in get(self, block, timeout)
    162             elif timeout is None:
    163                 while not self._qsize():
--> 164                     self.not_empty.wait()
    165             elif timeout < 0:
    166                 raise ValueError("'timeout' must be a non-negative number")

/usr/lib/python3.5/threading.py in wait(self, timeout)
    291         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    292             if timeout is None:
--> 293                 waiter.acquire()
    294                 gotit = True
    295             else:

~/.local/lib/python3.5/site-packages/torch/utils/data/dataloader.py in handler(signum, frame)
    176         # This following call uses `waitid` with WNOHANG from C side. Therefore,
    177         # Python can still get and update the process status successfully.
--> 178         _error_if_any_worker_fails()
    179         if previous_handler is not None:
    180             previous_handler(signum, frame)

RuntimeError: DataLoader worker (pid 2092) is killed by signal: Aborted.


Take a look at this issue and maybe update pytorch …

@vfdev-5 I updated pytorch to 0.4.1 and now I get the following error:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2963, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-16-bbe1da4f61df>", line 31, in <module>
    for batch in loader:
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 330, in __next__
    idx, batch = self._get_batch()
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 309, in _get_batch
    return self.data_queue.get()
  File "/home/stanismorozov/anaconda3/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/torch/multiprocessing/reductions.py", line 151, in rebuild_storage_fd
    fd = df.detach()
  File "/home/stanismorozov/anaconda3/lib/python3.6/multiprocessing/resource_sharer.py", line 58, in detach
    return reduction.recv_handle(conn)
  File "/home/stanismorozov/anaconda3/lib/python3.6/multiprocessing/reduction.py", line 182, in recv_handle
    return recvfds(s, 1)[0]
  File "/home/stanismorozov/anaconda3/lib/python3.6/multiprocessing/reduction.py", line 155, in recvfds
    raise EOFError
EOFError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 1863, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'EOFError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/IPython/core/ultratb.py", line 1095, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/IPython/core/ultratb.py", line 311, in wrapped
    return f(*args, **kwargs)
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/IPython/core/ultratb.py", line 345, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/home/stanismorozov/anaconda3/lib/python3.6/inspect.py", line 1483, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/home/stanismorozov/anaconda3/lib/python3.6/inspect.py", line 1445, in getframeinfo
    lines, lnum = findsource(frame)
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/IPython/core/ultratb.py", line 165, in findsource
    file = getsourcefile(object) or getfile(object)
  File "/home/stanismorozov/anaconda3/lib/python3.6/inspect.py", line 696, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/home/stanismorozov/anaconda3/lib/python3.6/inspect.py", line 735, in getmodule
    if f == _filesbymodname.get(modname, None):
  File "/home/stanismorozov/anaconda3/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 227, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 5944) exited unexpectedly with exit code 1. Details are lost due to multiprocessing. Rerunning with num_workers=0 may give better error trace.

---------------------------------------------------------------------------

Interestingly, if I set num_workers=0 then it works fine. Is there a bug somewhere in my code?


It is multiprocessing-related. Do you import cv2 or something like that?
Could you provide a complete code snippet, please?

Here is the full code:

import torch
from torch.utils.data import Dataset, DataLoader

sz = 100000

images = []
for i in range(sz):
    size = torch.randint(10, 20, size=(2, ), dtype=torch.long)
    img = torch.rand(tuple(size.numpy()))
    images.append(img)
    

class VarSizeImages(Dataset):
    
    def __len__(self):
        return sz
    
    def __getitem__(self, idx):
        global images
        img = images[idx]
        label = torch.randint(0, 2, size=(1, ), dtype=torch.long).item()
        return img, label
        
def my_collate(batch):
    return [(dp[0], torch.tensor(dp[1])) for dp in batch]

ds = VarSizeImages()

loader = DataLoader(ds, batch_size=10, shuffle=True, num_workers=1, pin_memory=True, collate_fn=my_collate)

for batch in loader:
    pass

I just ran it and got the error above.

Finally, I could reproduce the error. The failure depends on the size sz, and I don't think it is a good idea to create the images globally, wrap them in a Dataset, and then fork them into multiple worker processes.
I think it is better to produce the tensors inside __getitem__. Also try changing num_workers to 2, 4, or 8.

The following works for me:

import torch
from torch.utils.data import Dataset, DataLoader
    

class VarSizeImages(Dataset):
    
    def __init__(self):
        self.sz = 100000
        self.images = []
        for i in range(self.sz):
            size = torch.randint(10, 20, size=(2, ), dtype=torch.long)
            img = torch.rand(tuple(size.numpy()))
            self.images.append(img)
            
    def __len__(self):
        return self.sz
    
    def __getitem__(self, idx):
        img = self.images[idx]
        label = torch.randint(0, 2, size=(1, ), dtype=torch.long).item()
        return img, label
        
def my_collate(batch):
    return [(dp[0], torch.tensor(dp[1])) for dp in batch]

ds = VarSizeImages()

loader = DataLoader(ds, batch_size=10, shuffle=True, num_workers=2, pin_memory=True, collate_fn=my_collate)

for batch in loader:
    pass

@vfdev-5, unfortunately, your code still gives me the same error.

For anyone experiencing this error: it probably has to do with collate_fn returning a list/tuple of tensors. This is explained further in an open issue.

Consider packing your data by padding it and then stacking it. You do, however, need to keep track of the amount of padding applied to each data entry so that you can undo it when unpacking.
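
For example, a padding collate could look roughly like this (just a sketch; the zero-padding on the right/bottom and the extra sizes tensor are my own choices, not something the DataLoader requires):

import torch
import torch.nn.functional as F

def pad_collate(batch):
    # batch is a list of (img, label) pairs with img of shape (H, W) and varying H, W
    images, labels = zip(*batch)
    max_h = max(img.shape[-2] for img in images)
    max_w = max(img.shape[-1] for img in images)
    padded, sizes = [], []
    for img in images:
        h, w = img.shape[-2], img.shape[-1]
        # zero-pad on the right and bottom up to the largest image in the batch
        padded.append(F.pad(img, (0, max_w - w, 0, max_h - h)))
        sizes.append((h, w))  # original sizes, needed to undo the padding later
    return torch.stack(padded, 0), torch.tensor(labels), torch.tensor(sizes)

With this you can use a normal batch_size and still recover each image's valid region from the sizes tensor.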

My problem was passing a variable number of targets for each image. The only difference between the targets was how many rows they had. So I stacked them by first prepending the batch index to each row. My code looks something like this:

def my_collate(batch):
    targets = ...
    for i in range(len(targets)):
        if len(targets[i]) > 0:
            # Some images have no targets, so we need this check
            # before attempting to prepend the batch index.
            targets[i] = torch.cat(
                [
                    i * torch.ones(targets[i].size(0), 1),
                    targets[i],
                ],
                dim=-1,
            )
    targets = torch.cat(targets, dim=0)
    return ...

You could customize the collate function like this.
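
If it helps, here is a rough, self-contained sketch (my own illustration, with made-up values, not the exact code above) of how the prepended batch-index column lets you recover each image's targets after stacking:

import torch

# stacked targets of shape (N, 1 + D); column 0 holds the batch index
stacked = torch.tensor([
    [0., 0.1, 0.2],   # two target rows for image 0
    [0., 0.3, 0.4],
    [2., 0.5, 0.6],   # image 1 had no targets; one row for image 2
])

batch_size = 3
per_image = [stacked[stacked[:, 0] == i, 1:] for i in range(batch_size)]
for i, t in enumerate(per_image):
    print(i, tuple(t.shape))  # image 1 comes back as an empty (0, 2) tensor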

I’ve encountered the same problem recently.

If you're using Docker to run the PyTorch program, then with high probability it's because the container's shared memory is not big enough to run your program at the specified batch size.

The solutions in this case are:

  1. use a smaller batch size to train your model.
  2. exit the current container and re-run it with --shm-size=16g (or bigger, depending on your machine).

Hope this helps those who have the same problem. :+1:

Recently, I got stuck with a BatchSampler and collate_fn for variable input sizes.
I found a way around it by padding the inputs before the DataLoader and then using batch_size directly.
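
A rough sketch of that workaround (assumptions on my side: the images can all be padded to one fixed size inside __getitem__, and the 32x32 target size and class name are only for illustration):

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

class PaddedImages(Dataset):
    """Pads every image to a fixed size in __getitem__ so the default collate can stack them."""

    def __init__(self, images, labels, target_hw=(32, 32)):
        self.images, self.labels, self.target_hw = images, labels, target_hw

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = self.images[idx]
        th, tw = self.target_hw
        # zero-pad on the right and bottom up to the fixed target size
        img = F.pad(img, (0, tw - img.shape[-1], 0, th - img.shape[-2]))
        return img, self.labels[idx]

# dummy variable-size data, only for the sketch
imgs = [torch.rand(torch.randint(10, 20, (1,)).item(),
                   torch.randint(10, 20, (1,)).item()) for _ in range(100)]
lbls = [torch.randint(0, 2, (1,)).item() for _ in range(100)]

loader = DataLoader(PaddedImages(imgs, lbls), batch_size=10, shuffle=True)
for batch_imgs, batch_lbls in loader:
    pass  # batch_imgs has shape (10, 32, 32)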

Right. I really feel like this is a common issue in object detection. Thanks for sharing the code.