Loading huge data functionality

Out of curiosity, have you tried using ConcatDataset? I noticed that seemed to be the answer in a similar question someone had.

Hi, all my data is stored in a single multi-data.pt file, which contains all my training and validation inputs and labels. Do you have any idea how I can write a dataset class without loading the whole multi-data.pt file into memory?

What does .pt stand for? Do you have a reader that can open it “out of core”, i.e. load into memory only the part of the file that actually needs to be read? If that reader is a library, make sure it is thread/process safe (hint: I, and others, learned the hard way that HDF5 is not).

While HDF5 has the above features (out-of-core reading/writing), it does not work well with multiprocessing.

I am using memory-mapped files (via numpy), together with fast.ai and a custom “loader” that works with fast.ai: https://forums.fast.ai/t/out-of-core-data-block-itemlist-backed-up-by-memmap-files/39566 Hope it helps.
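If you are on plain PyTorch rather than fast.ai, here is a minimal sketch of the same idea, assuming the samples are stored row-wise as float32 in a raw binary file (the file name and shape below are hypothetical placeholders):

import numpy as np
import torch
from torch.utils.data import Dataset

class MemmapDataset(Dataset):
    def __init__(self, path="features.dat", num_samples=100000, sample_shape=(3, 64, 64)):
        # np.memmap only maps the file; the OS pages in just the parts we touch
        self.data = np.memmap(path, dtype=np.float32, mode="r",
                              shape=(num_samples, *sample_shape))

    def __getitem__(self, idx):
        # copy the slice so the returned tensor owns its memory
        return torch.from_numpy(np.array(self.data[idx]))

    def __len__(self):
        return len(self.data)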

It is a PyTorch file. It can simply be loaded with torch.load, but I don't want to load all of it into memory.

For those trying to load text data efficiently, you could leverage linecache and subprocess.
This works for the case where we have one huge file, let's say 100+ GB, in which every row is one training example.

import csv
import linecache
import subprocess

from torch.utils.data import Dataset


class LazyTextDataset(Dataset):
    def __init__(self, filename):
        self._filename = filename
        # count the lines without reading the file into Python memory
        self._total_data = int(subprocess.check_output(["wc", "-l", filename]).split()[0])

    def __getitem__(self, idx):
        # linecache fetches the requested line on demand (1-based indexing)
        line = linecache.getline(self._filename, idx + 1)
        return next(csv.reader([line]))

    def __len__(self):
        return self._total_data
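For reference, a hedged usage sketch (the file name is just a placeholder); each item is still a list of raw string fields, so any tokenization would go into __getitem__. One thing worth measuring: linecache caches file contents in memory after the first access, so check its footprint on very large files.

from torch.utils.data import DataLoader

dataset = LazyTextDataset("train.csv")   # hypothetical huge CSV
loader = DataLoader(dataset, batch_size=32, num_workers=4)
for batch in loader:
    pass  # batches of raw CSV fields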

Hi @lan2720, have you figured out a way to load a large text file? I have a huge CSV file of 5.9 GB in which each line is a text with its label; after converting the whole file to embeddings, it consumes over 50 GB of memory, which is not practical for me.

I could save each line to its own file, the way images are stored, but I suspect that approach is better suited to image data than to text data.

Related question
Hi, I think I have a problem related to what you wrote. Could you please take a look?

I am trying to load 2 terabytes of data consisting of 1200 images, where each tensor image is about 2 GB. I am using a typical DataLoader setup with a batch size of 12 across six GPUs. It tries to load all the images into RAM and only then batches them onto the GPUs. This is a waste of resources, since the GPUs just sit waiting for all the images to load. Is there a better way to handle such situations, so that the GPUs can start computing as soon as a batch is loaded rather than waiting?

You could try to lazily load each data sample in order to avoid preloading the whole dataset.
Using multiple workers might hide the loading time, so that your GPU won’t be starving.
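A minimal sketch of that pattern, assuming each image has already been saved to its own file (the paths and loader settings are placeholders):

import torch
from torch.utils.data import Dataset, DataLoader

class LazyImageDataset(Dataset):
    def __init__(self, file_paths):
        # only the list of paths lives in memory, not the images themselves
        self.file_paths = file_paths

    def __getitem__(self, idx):
        # each sample is read from disk only when the DataLoader asks for it
        return torch.load(self.file_paths[idx])

    def __len__(self):
        return len(self.file_paths)

# background workers prefetch the next batches while the GPUs compute
# loader = DataLoader(LazyImageDataset(paths), batch_size=12, num_workers=4, pin_memory=True)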

Hi, I tried that yesterday, and it seems worse. It is taking almost 2 hours to complete one epoch, with a mean computation time of 3 seconds per batch. What I found is that after increasing the number of workers to 24, loading takes more time than with 0 workers.


A very high number of workers might decrease performance.
Do you see any speedup using fewer workers, e.g. 4?

I tried with 4 workers. Loading the data takes the same time. But loading a large batch with 4 workers gives an os.fork() memory allocation error. Also, how do I work around calling input.cuda() on the full batch? I get an OOM error, since 24 * 2 GB is more than my GPU memory. How can I split the data before passing input.cuda() to the model?
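(For what it's worth, one way to avoid moving the whole batch to the GPU at once is to split it on the CPU and transfer one chunk at a time; a minimal sketch, where model, batch, and the chunk size are placeholders:)

import torch

def forward_in_chunks(model, batch, chunk_size=2):
    outputs = []
    # move and process a few samples at a time instead of the full batch
    for chunk in batch.split(chunk_size, dim=0):
        outputs.append(model(chunk.cuda()))
    return torch.cat(outputs, dim=0)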

Hi @apaszke, I am a newcomer to this topic, and thank you for all the helpful replies.

However, my situation is not covered in the discussion. I have serialized video label data, such as frame ID + bbox + keypoints. No actual image data is included, so each data frame is really light. The problem is that I have millions of frames, and they add up to about 60 GB.

My server has a total CPU memory of around 250 GB, which seems like plenty. But when I use DDP in PyTorch, the data is independently loaded by 8 processes, which gives an OOM error (since 60 * 8 >> 250).

The data is stored as protobuf, and it is not practical to split it, since the algorithm needs to truncate parts of the data frames. A single integrated dataset should be accessible to any of the 8 DDP processes (started by torch.distributed.launch).

In short, I only need 60 GB of memory, but DDP starts eight independent processes and makes 8 copies of the data, which causes an out-of-memory error. Is there any way to manually share the same CPU memory across all processes started by the torch launcher? Thanks a lot!
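(One common workaround, in the spirit of the memmap suggestion earlier in the thread, is to decode the labels once into a flat array on disk and memory-map it in every DDP process, so all processes share the same file-backed pages instead of each holding its own copy. A minimal sketch, assuming the labels can be flattened into a fixed-width array; the file name is hypothetical:)

import numpy as np
from torch.utils.data import Dataset

class SharedLabelDataset(Dataset):
    def __init__(self, path="labels.npy"):
        # mmap_mode='r' maps the .npy file read-only; the pages are backed by
        # the file on disk and shared between all processes that map it
        self.labels = np.load(path, mmap_mode="r")

    def __getitem__(self, idx):
        return np.array(self.labels[idx])  # copy just this row

    def __len__(self):
        return len(self.labels)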

When I use a Dataset and DataLoader to load the data, I find that the data read with the code below does not release its CPU memory at the beginning of the next iteration, and the memory usage reaches its highest point at the end of the first epoch. As a result, when my dataset exceeds the machine's memory, I get an OOM error. How can I avoid it?

# -*- coding: utf-8 -*-
# @Time    : 2019/8/23 21:54
# @Author  : zhoujun
import pathlib

import cv2
import numpy as np
import scipy.io as sio
from torch.utils.data import DataLoader, Dataset


class DemoDataset(Dataset):
    def __init__(self, transform=None, **kwargs):
        # dummy data: 100 images of 640x640x3 held fully in memory
        self.data_list = np.zeros((100, 640, 640, 3))
        self.transform = transform

    def __getitem__(self, idx):
        img = self.data_list[idx]
        if self.transform is not None:
            img = self.transform(img)
        return img

    def __len__(self):
        return len(self.data_list)


if __name__ == '__main__':
    import time
    from torchvision import transforms

    train_data = DemoDataset(transform=transforms.ToTensor())
    train_loader = DataLoader(dataset=train_data, batch_size=1, shuffle=True, num_workers=0)
    for e in range(2):
        start = time.time()
        for i, data in enumerate(train_loader):
            if e == 0 and i == len(train_loader) - 1:
                print(time.time() - start)

(attached: mprof memory usage plot)

Run the script, and the figure will be shown:

 mprof run demo.py
 mprof plot

Is the plot showing the GPU memory or the CPU RAM?
I’m not familiar with mprof, but does it show the complete memory usage of the script, i.e. with loaded libraries?
You are initializing the data_list in your __init__ method, which will take approx. 468MB.
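For reference, a quick back-of-the-envelope check of that figure (it assumes 4-byte float32 elements; np.zeros actually defaults to float64, which would double it):

# 100 samples of 640 x 640 x 3 values, 4 bytes each
print(100 * 640 * 640 * 3 * 4 / 1024**2)  # ~468.75 MiB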

The plot is showing the CPU RAM.

What does the graph show, if you reduce the data_list to a tensor containing a single float value?

The graph with the following code:

# -*- coding: utf-8 -*-
# @Time    : 2019/8/23 21:54
# @Author  : zhoujun
import numpy as np
from torch.utils.data import DataLoader, Dataset


class DemoDataset(Dataset):
    # variant 1: __getitem__ allocates and returns a full (1, 3, 640, 640) array
    def __init__(self):
        pass

    def __getitem__(self, idx):
        img = np.zeros((1, 3, 640, 640))
        return img

    def __len__(self):
        return 10000


if __name__ == '__main__':
    import time
    train_data = DemoDataset()
    train_loader = DataLoader(dataset=train_data, batch_size=1, num_workers=0)
    start_run = time.time()
    for e in range(10):
        for i, img in enumerate(train_loader):
            if i == 0:
                print('epoch: {} start time: {}'.format(e, time.time() - start_run))

# -*- coding: utf-8 -*-
# @Time    : 2019/8/23 21:54
# @Author  : zhoujun
import numpy as np
from torch.utils.data import DataLoader, Dataset


class DemoDataset(Dataset):
    # variant 2: __getitem__ returns a single scalar instead of an array
    def __init__(self):
        pass

    def __getitem__(self, idx):
        img = 1  # instead of np.zeros((1, 3, 640, 640))
        return img

    def __len__(self):
        return 10000


if __name__ == '__main__':
    import time
    train_data = DemoDataset()
    train_loader = DataLoader(dataset=train_data, batch_size=1, num_workers=0)
    start_run = time.time()
    for e in range(10):
        for i, img in enumerate(train_loader):
            if i == 0:
                print('epoch: {} start time: {}'.format(e, time.time() - start_run))

Hi @apaszke, I use the same approach to load 1K+ files via a DataLoader, so each __getitem__ call returns one file. However, I always get an error like:

File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 761, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/queues.py", line 104, in get
    if not self._poll(timeout):
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 257, in poll
    return self._poll(timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 414, in _poll
    r = wait([self], timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 911, in wait
    ready = selector.select(timeout)
  File "/home/miniconda/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/_utils/signal_handling.py", line 66, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 90300) is killed by signal: Killed.

It seems each worker in the DataLoader has either a memory or a time limit? If we put some parsing logic there (in our case we read a Parquet file), will it break?

I have checked the source code for the DataLoader; basically, we could set a timeout:

    def _try_get_data(self, timeout=_utils.MP_STATUS_CHECK_INTERVAL):
        # Tries to fetch data from `self._data_queue` once for a given timeout.
        # This can also be used as inner loop of fetching without timeout, with
        # the sender status as the loop condition.
        #
        # This raises a `RuntimeError` if any worker died unexpectedly. This error
        # can come from either the SIGCHLD handler in `_utils/signal_handling.py`
        # (only for non-Windows platforms), or the manual check below on errors
        # and timeouts.
        #
        # Returns a 2-tuple:
        #   (bool: whether successfully get data, any: data if successful else None)
        try:
            data = self._data_queue.get(timeout=timeout)

The default is 5 seconds:

MP_STATUS_CHECK_INTERVAL = 5.0
r"""Interval (in seconds) to check status of processes to avoid hanging in
    multiprocessing data loading. This is mainly used in getting data from
    another process, in which case we need to periodically check whether the
    sender is alive to prevent hanging."""

What does this interval mean? I tried increasing the timeout in the multi-worker setup but got the same error; the only difference is that the final error message looks like:

RuntimeError: DataLoader worker (pid 320) is killed by signal: Segmentation fault.
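(A hedged debugging note, not a definitive fix: the docstring above describes MP_STATUS_CHECK_INTERVAL as just the polling interval the main process uses to check that the sender is still alive, not a per-batch time limit, so raising it is unlikely to help. One sketch for narrowing the problem down is to run the same dataset with num_workers=0, so the parsing logic executes in the main process and any failure shows up directly instead of as a killed worker; dataset below is a placeholder for your Parquet-backed dataset:)

from torch.utils.data import DataLoader

# single-process loading: errors in __getitem__ are raised in the main process
loader = DataLoader(dataset, batch_size=1, num_workers=0)
for i, sample in enumerate(loader):
    if i == 0:
        print('first sample loaded OK')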
