Processing data in batches

Hi everyone,

Here is my problem:

Suppose I have a 1 GB .csv file, and processing it expands it to 30 GB. Loading the whole file into memory and then processing it is not an option, so I’m considering using Dataset and DataLoader for this.

Can anyone tell me how to do that in detail?

Thanks!

You could try to use pandas to read the csv file in chunks.
In your Dataset read the chunks in the __getitem__ method with pd.read_csv(..., skiprows=index*chunksize, chunksize=chunksize).
Note that you have to take care of the __len__ of the dataset, since the index should now be in [0, nb_samples/chunksize].

Thanks!

I still have some questions after following this method.

  1. What does batch mean?
    Suppose the csv data has (360000 - 200) lines, and I set
    chunk_size = 3600
    batch_size = 10
    I then found that len(loader) is 10 (360000 = 3600 x 10 x 10), which indicates that batching applies to chunks, not to the lines within one chunk. Every batch grabs 10 chunks, right?

  2. Where should I process the data?
    The data processing step can go in two places: in __getitem__(self, index), or in the enumerate(loader) loop. Which place is best?

  3. RuntimeError: inconsistent tensor sizes at THTensorMath.c
    I think each iteration of enumerate(loader) yields a batch of chunks (3600 x 10 lines), and it may be fine to send 3600 x 10 lines to Net(lines). Before that step, however, a RuntimeError is raised in enumerate(loader). The reason is that the total number of lines is actually less than 360000, so one batch must contain 9 chunks of 3600 lines plus 1 chunk with fewer lines (3600 - 200). I don’t know how to fix this. Should I expand every batch into lines?
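To make the numbers in question 1 and question 3 concrete, here is a small arithmetic sketch. It only uses the figures quoted above; the variable names are illustrative and not taken from the original code.

```python
import math

# Figures quoted in the question above
n_lines = 360000 - 200
chunk_size = 3600
batch_size = 10

# One dataset item is one chunk, so the DataLoader batches chunks, not lines
n_chunks = math.ceil(n_lines / chunk_size)
n_batches = math.ceil(n_chunks / batch_size)

# The final chunk is shorter than the others, which is what breaks stacking
last_chunk_lines = n_lines - (n_chunks - 1) * chunk_size

print(n_chunks, n_batches, last_chunk_lines)
```

The short final chunk is the one that cannot be stacked with the 3600-line chunks in the same batch.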

The pseudo code is like this:

class myDataset(Dataset):
    def __init__(self, path):
        ...  # init

    def __getitem__(self, index):
        data = read_chunk(dp)
        label = read_chunk(lp)
        return data, label

    def __len__(self):
        return math.ceil(filesize / chunksize)


.
.
data = myDataset()
loader = torch.utils.data.DataLoader(data, batch_size, shuffle=True)

for epoch in range(nepochs):
    for i, (samples, labels) in enumerate(loader):  # RUNTIME ERROR
        samples = Variable(samples)
        labels = Variable(labels)

        outputs = net(samples.float())

#!!!
return torch.cat(inputs, dim)
RuntimeError: inconsistent tensor sizes at /opt/conda/conda-bld/pytorch_1512386481460/work/torch/lib/TH/generic/THTensorMath.c:2864

  1. You are right. Every batch will grab 10 chunks of size 3600.

  2. I would suggest to preprocess your data in the __getitem__ method, since you will most likely wrap your Dataset into a DataLoader, which can load the batches using multi-processing.
    Using this, your DataLoader can grab some batches in the background, while your training loop is still busy.

  3. I have created a small example snippet to play around:

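import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader

# Create dummy csv data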
nb_samples = 110
a = np.arange(nb_samples)
df = pd.DataFrame(a, columns=['data'])
df.to_csv('data.csv', index=False)


# Create Dataset
class CSVDataset(Dataset):
    def __init__(self, path, chunksize, nb_samples):
        self.path = path
        self.chunksize = chunksize
        self.len = nb_samples // self.chunksize  # integer division: __len__ must return an int

    def __getitem__(self, index):
        x = next(
            pd.read_csv(
                self.path,
                skiprows=index * self.chunksize + 1,  #+1, since we skip the header
                chunksize=self.chunksize,
                names=['data']))
        x = torch.from_numpy(x.data.values)
        return x

    def __len__(self):
        return self.len


dataset = CSVDataset('data.csv', chunksize=10, nb_samples=nb_samples)
loader = DataLoader(dataset, batch_size=10, num_workers=1, shuffle=False)

for batch_idx, data in enumerate(loader):
    print('batch: {}\tdata: {}'.format(batch_idx, data))

The last batch will only have 10 samples.
Is this code of any help?
I think your data should be divisible without a remainder by chunksize, because otherwise the last batch will have different sizes. At least I cannot come up with a fast solution at the moment. :wink:
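One possible way around the unequal last chunk, which was not proposed in this thread, is to pass a custom collate_fn that concatenates the chunks of a batch instead of stacking them. The sketch below is only an illustration under assumptions: ChunkDataset and concat_chunks are hypothetical names, and an in-memory tensor stands in for the CSV file.

```python
import math

import torch
from torch.utils.data import Dataset, DataLoader


class ChunkDataset(Dataset):
    """Hypothetical stand-in for a CSV-backed dataset: serves a 1-D tensor in chunks."""

    def __init__(self, values, chunksize):
        self.values = values
        self.chunksize = chunksize

    def __len__(self):
        # Round up so the shorter final chunk is kept
        return math.ceil(len(self.values) / self.chunksize)

    def __getitem__(self, index):
        start = index * self.chunksize
        return self.values[start:start + self.chunksize]


def concat_chunks(chunks):
    # Concatenate along dim 0 instead of stacking, so chunk lengths may differ
    return torch.cat(chunks, dim=0)


dataset = ChunkDataset(torch.arange(105), chunksize=10)
loader = DataLoader(dataset, batch_size=4, shuffle=False, collate_fn=concat_chunks)

batch_sizes = [len(batch) for batch in loader]
print(batch_sizes)
```

With this approach a batch is a flat run of samples, so the last batch is simply shorter rather than raising an error.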

So each batch will have 100 examples if I see that correctly? In general, I would maybe recommend to convert the CSV file to HDF5, so you could have true shuffling.

Once converted, in __getitem__, you could then use something like

with h5py.File('your_dataset.h5', 'r') as h5f:
    features = h5f['features'][index]
    target = h5f['targets'][index]
return features, target

This could be a bit slower, though, since you’d need more __getitem__ calls (100 instead of 10) to get, e.g., a batch size of 100.

That’s exactly what I mean! It’s very helpful, thank you!

Right now I realize that the 3rd one is the key problem I ran into.

A real dataset usually has an irregular number of lines, such as 361234, which is not divisible by the chunk size without a remainder. So it is the chunk size that matters rather than the batch size, and the lazy way to handle this runtime error is to make the number of lines divisible by the chunk size, right?

You are right, the lazy way is to drop the remainder of your data, if that’s possible. :wink:
However, have a look at @rasbt 's suggestion as well, which seems to have some advantages (true shuffling etc.).
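As a rough sketch of the "lazy way", the number of lines to keep can be computed with integer division. The helper name usable_lines is made up for illustration; the figures are the ones mentioned above.

```python
def usable_lines(n_lines, chunk_size):
    """Largest multiple of chunk_size that does not exceed n_lines."""
    return (n_lines // chunk_size) * chunk_size


n_lines = 361234      # irregular line count from the example above
chunk_size = 3600

kept = usable_lines(n_lines, chunk_size)
dropped = n_lines - kept
n_chunks = kept // chunk_size  # value to return from __len__ when dropping the remainder

print(kept, dropped, n_chunks)
```

Returning n_chunks from __len__ means the dataset never asks for the short final chunk.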

Yes, thanks for your mention, I also have problems with data persistence.

  1. Split data into train_set and test_set
    Previously I loaded all the data into memory and indexed it randomly to generate train_set and test_set, but I don’t know how to do this well now. The only way I know is to split the .csv file in advance and then process the parts in chunks. Is there a better way?

  2. Store the processed data
    The data processing step expands the data 20-30 times, so it’s better to dump the result. Is there a recommended way/format to do this? I guess I can do this in the __getitem__ function.

I would recommend looking into HDF5. The handling is similar to numpy arrays (with the indexing), but the dataset is not loaded into memory until you access it. I just wrote a quick example for converting a CSV into HDF5 using Iris for illustration purposes; here, imagine the iris dataset is a super large dataset that doesn’t fit into memory

import pandas as pd
import numpy as np
import h5py

# suppose this is a large CSV that does not 
# fit into memory:
csv_path = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'

# Get number of lines in the CSV file if it's on your hard drive
# (requires `import subprocess`):
#num_lines = subprocess.check_output(['wc', '-l', csv_path])
#num_lines = int(num_lines.split()[0])
num_lines = 150
num_features = 4

class_dict = {'Iris-setosa': 0,
              'Iris-versicolor': 1,
              'Iris-virginica': 2}

# use 10,000 or 100,000 or so for large files
chunksize = 10

# this is your HDF5 database:
with h5py.File('iris.h5', 'w') as h5f:
    
    # use num_lines-1 if the csv file has a column header
    dset1 = h5f.create_dataset('features',
                               shape=(num_lines, num_features),
                               compression=None,
                               dtype='float32')
    dset2 = h5f.create_dataset('labels',
                               shape=(num_lines,),
                               compression=None,
                               dtype='int32')

    # change range argument from 0 -> 1 if your csv file contains a column header
    for i in range(0, num_lines, chunksize):  

        df = pd.read_csv(csv_path,  
                header=None,  # no header, define column header manually later
                nrows=chunksize, # number of rows to read at each iteration
                skiprows=i)   # skip rows that were already read
        
        df[4] = df[4].map(class_dict)

        features = df.values[:, :4]
        labels = df.values[:, -1]
        
        # use i-1 and i-1+chunksize if csv file has a column header
        dset1[i:i+chunksize, :] = features
        dset2[i:i+chunksize] = labels

Once you have converted the dataset, you can check and work with it as follows:

with h5py.File('iris.h5', 'r') as h5f:
    print(h5f['features'].shape)
    print(h5f['labels'].shape)

prints

(150, 4)
(150,)

with h5py.File('iris.h5', 'r') as h5f:
    print('First feature entry', h5f['features'][0])
    print('First label entry', h5f['labels'][0])

prints

First feature entry [ 5.0999999   3.5         1.39999998  0.2       ]
First label entry 0

After that, you can use the HDF5 database to set up a DataLoader via PyTorch.

I just put up a full example here (https://github.com/rasbt/deep-learning-book/blob/master/code/model_zoo/pytorch_ipynb/custom-data-loader-csv.ipynb) that might be handy as a template for your scenario

If you have put your data into an hdf5 dataset, with 0 being the batch dimension, you could use the code below to sample batches directly from disk, while splitting into test and training sets.

train_loader, valid_loader = train_valid_loaders(DatasetFromHDF5(data_path,'data'),
                                                 valid_fraction=0.1,
                                                 batch_size=BATCH_SIZE,
                                                 pin_memory=use_gpu)


import torch
import h5py
import math
import numpy as np
from torch.utils.data.dataset import Dataset
from torch.utils.data.sampler import SubsetRandomSampler

class DatasetFromHDF5(Dataset):
    def __init__(self, filename, dataset):
        h5f = h5py.File(filename, 'r')
        self.data = h5f[dataset]

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, item):
        return self.data[item].astype(float)

def train_valid_loaders(dataset, valid_fraction =0.1, **kwargs):
    num_train = len(dataset)
    indices = list(range(num_train))
    split = int(math.floor(valid_fraction* num_train))

    if not('shuffle' in kwargs and not kwargs['shuffle']):
        #np.random.seed(random_seed)
        np.random.shuffle(indices)
    if 'num_workers' not in kwargs:
        kwargs['num_workers'] = 1

    train_idx, valid_idx = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    train_loader = torch.utils.data.DataLoader(dataset,
                                               sampler=train_sampler,
                                               **kwargs)
    valid_loader = torch.utils.data.DataLoader(dataset,
                                               sampler=valid_sampler,
                                               **kwargs)
    return train_loader, valid_loader

The source code lives here

I’m following your code to convert csv to hdf5. There is an error when I set num_workers greater than 2:
KeyError: ‘Unable to open object (bad object header version number)’

When using hdf5, is it safer to set num_workers=1 in the DataLoader?

@ptrblck

Is it a bottleneck to process data in __getitem__?

I mean that every time the Dataset fetches data through the __getitem__ method, it has to wait for the data processing. This is done by the CPU, right? Although the DataLoader loads the batches using multi-processing (on the CPU?), it’s still slower than batch-loading the raw data and batch-processing it after data.cuda() (on the GPU), right?

I found that it slows down when I call process(data) in __getitem__, but I haven’t compared it with data.cuda, since my GPU environment is on a remote server and difficult to debug.

It shouldn’t be a bottleneck and I would even recommend using it!
The __getitem__ method is usually called by the CPU and I’m not sure if it’s possible to call it from the GPU or just to transfer the calculations to it.
Using num_workers > 1 will use multi-processing on the CPU.
AFAIK even num_workers=1 creates another process, while num_workers=0 processes the data in the main process.

It’s not necessarily slower, since the data can be loaded and processed by your CPUs while the GPU is busy performing the forward and backward pass. After the GPU finishes, it can just get the new batch, which is probably already waiting for it. You should also use pin_memory=True in your DataLoader to speed up the data transfer between CPU and GPU.

GPU operations are called asynchronously, so they do not block your CPU.

num_workers=1 spawns a separate process to load the data (I double-checked in top when running :slight_smile: )

If you try to use more than one worker, my guess is that they will get in each other’s way when trying to read from or write to the file. I think that option is more for loading data from sources where it’s easy to have multiple simultaneous handles.

In any case I think disk I/O is likely to be the bottleneck in your case, so I wouldn’t expect increasing the number of workers to increase performance much.

By the way, PyCharm’s support for remote development is awesome. I’ve configured my pytorch project to have a remote interpreter on an AWS box, and together with configuring it as a remote server I just write code on my laptop in PyCharm, press ‘debug’, and it auto-uploads it to AWS, and runs it there while debugging on my local laptop! So I can set breakpoints, inspect variables, etc, in my local IDE against the process running on AWS. Really nice, I can’t recommend it enough.

Yes, it’s really a good way to do remote work.
But, duo authentication with a remote task queue now … :sob:

Finally, I finished the training procedure successfully!

I learned the chunking part from @ptrblck, the hdf5 part from @rasbt, and the sampler part from @Egor_Kraev. Thanks to all of you!

There are two problems I have to mention, in case other people lose time on them.

  1. KeyError of unable to open hdf5 file

[… in h5py.h5o.open\nKeyError: ‘Unable to open object (bad object header version number)’\n’]

It’s easy to get this error when num_workers is greater than 2, but even num_workers=1 can trigger it, only much less frequently. It might be an h5py or hdf5 problem; rerunning the code can bypass it.

  2. RuntimeError: Assertion `cur_target >= 0 && cur_target < n_classes’ failed

[return torch._C._nn.nll_loss(input, target, weight, size_average, ignore_index, reduce)
RuntimeError: Assertion `cur_target >= 0 && cur_target < n_classes’ failed. at /opt/conda/conda-bld/pytorch_1512386481460/work/torch/lib/THNN/generic/ClassNLLCriterion.c:87]

If you run on the GPU, it may show up as a “device-side assert triggered” error instead:

/opt/conda/conda-bld/pytorch_1503970438496/work/torch/lib/THCUNN/ClassNLLCriterion.cu:57: void cunn_ClassNLLCriterion_updateOutput_kernel(Dtype *, Dtype *, Dtype *, long *, Dtype *, int, int, int, int, long) [with Dtype = float, Acctype = float]: block: [0,0,0], thread: [28,0,0] Assertion t >= 0 && t < n_classes failed.
THCudaCheck FAIL file=/opt/conda/conda-bld/pytorch_1503970438496/work/torch/lib/THC/generic/THCStorage.c line=32 error=59 : device-side assert triggered

This is a very weird error, since I’m sure my targets are in [0, n_classes-1] (actually just 0 and 1), and it does not always occur. Sometimes everything is fine and sometimes it shows up, just like error 1. The only thing I suspected was that the target is int32 in hdf5 but int64 in a LongTensor. So I printed the targets out, before casting them, and was shocked by the contents: some batches print 0 and 1, but the failing batch prints values like 0.0…0e0 and 1.12…6e9. I was stuck on this error for a long time until I removed all num_workers settings. Besides, I found that the smaller the batch size, the less frequently this error shows up.

I think hdf5/h5py might have some multi-processing reading bugs, which are far beyond my ability to fix. So I have to sacrifice training time to get the output model. :disappointed_relieved:

Hm, are you trying to open the HDF5 file multiple times via h5py.File maybe?

I tried two ways, but both of them sometimes produce those errors.

First way:

if __name__ == '__main__':
   ...
   with h5py.File(h5_path, 'r') as h5f:
         dataset = HDF5Dataset(h5f, transform)
         train_loader, valid_loader = loaders(dataset, valid_fraction=0.1, **kwargs)
         model = train_model(net, train_loader, valid_loader, criterion, optimizer, nepochs)
   ...

Second way

class HDF5Dataset(Dataset):
    def __init__(self, file, transform=None):
        self.h5f = h5py.File(file, 'r')
...

if __name__ == '__main__':
    dataset = HDF5Dataset(file, transform)
    train_loader, valid_loader = loaders(dataset, valid_fraction=0.1, **kwargs)
    model = train_model(net, train_loader, valid_loader, criterion, optimizer, nepochs)

    dataset.h5f.close()
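
Both variants above open the HDF5 file in the main process, before the DataLoader starts its workers. A pattern that is often suggested for this situation, though not confirmed anywhere in this thread, is to open the file lazily inside __getitem__ so that each worker process ends up with its own handle. The sketch below assumes the 'features' and 'labels' dataset names from the conversion example; LazyHDF5Dataset and the small demo file are hypothetical.

```python
import os
import tempfile

import h5py
import numpy as np
from torch.utils.data import Dataset


class LazyHDF5Dataset(Dataset):
    """Hypothetical dataset that opens the HDF5 file on first access."""

    def __init__(self, path, features_key='features', labels_key='labels'):
        self.path = path
        self.features_key = features_key
        self.labels_key = labels_key
        self.h5f = None  # opened later, inside whichever process reads from it
        # Read the length with a short-lived handle that is closed right away
        with h5py.File(path, 'r') as h5f:
            self.length = h5f[features_key].shape[0]

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        if self.h5f is None:
            self.h5f = h5py.File(self.path, 'r')
        features = self.h5f[self.features_key][index].astype(np.float32)
        # Cast int32 labels to int64, the dtype a LongTensor target uses
        label = np.int64(self.h5f[self.labels_key][index])
        return features, label


# Tiny demo file so the sketch runs on its own
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.h5')
with h5py.File(demo_path, 'w') as h5f:
    h5f.create_dataset('features', data=np.arange(12, dtype='float32').reshape(6, 2))
    h5f.create_dataset('labels', data=np.array([0, 1, 0, 1, 0, 1], dtype='int32'))

dataset = LazyHDF5Dataset(demo_path)
handle_before = dataset.h5f      # still None: nothing has been read yet
features, label = dataset[3]
print(len(dataset), features, label)
```

Whether this removes the KeyError and the corrupted targets described above would need to be checked with num_workers > 0 on the real data.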