Data processing as a batch way

Hm, sorry, I do not really have a good explanation for why this might not work. On my machine, I haven’t encountered this issue yet… However, I saw that there’s a dedicated page in the h5py docs that discusses parallel processing with the same HDF5 file. I suppose it is geared more towards reading and writing though (as opposed to just reading): http://docs.h5py.org/en/latest/mpi.html

On the other hand, I found a statement on stackoverflow from someone saying that parallel reads should be fine: https://stackoverflow.com/questions/29251839/is-it-possible-to-do-parallel-reads-on-one-h5py-file-using-multiprocessing

Since you are also getting the error with num_workers=1, maybe there’s a general issue with the database. Maybe you could try to iterate over the HDF5 file manually for one training loop to isolate this issue from the DataLoader.

Hey,

I had the very same problem reading h5 in parallel.

There is a good alternative to h5file: numpy memmaps

https://docs.scipy.org/doc/numpy-1.15.1/reference/generated/numpy.memmap.html

They work like numpy arrays: you can slice them, etc. But they stay on your hard drive, so they can be as big as you want. AND you can read them in parallel.
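To make the memmap suggestion concrete, here is a minimal sketch. The file name, shape, and dtype are made up for the illustration, and it only shows the write-once, read-by-slice pattern, not a full Dataset.

```python
import os
import tempfile

import numpy as np

# Hypothetical file name and shape, chosen only for this illustration.
path = os.path.join(tempfile.mkdtemp(), "features.dat")
shape = (1000, 16)

# Write once: mode="w+" creates the file on disk and maps it into memory.
writer = np.memmap(path, dtype=np.float32, mode="w+", shape=shape)
writer[:] = np.arange(shape[0] * shape[1], dtype=np.float32).reshape(shape)
writer.flush()  # push pending changes to disk
del writer

# Read later: mode="r" maps the file read-only. dtype and shape must be
# given again because the raw file stores no metadata.
reader = np.memmap(path, dtype=np.float32, mode="r", shape=shape)
batch = np.array(reader[100:110])  # only these rows are copied into RAM
```

In a Dataset, the read-only memmap could be opened lazily inside __getitem__ so that each worker process gets its own handle; that part is an assumption about how you would wire it up, not something tested here.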

Perhaps you can have a look at the chunk dataset added to libtorch.

The key idea targets exactly this scenario: data loading can happen in chunks, and we can decouple prefetchers and transforms (collate) into different threads. We have used this and added a Python binding for it.

Python binding is on the way: https://github.com/pytorch/pytorch/pull/21232

I ran across this code, which I think will help me process data sequentially, but it does not work out of the box and yields this error:

'float' object cannot be interpreted as an integer

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader

# Create dummy csv data
nb_samples = 110
a = np.arange(nb_samples)
df = pd.DataFrame(a, columns=['data'])
df.to_csv('data.csv', index=False)


# Create Dataset
class CSVDataset(torch.utils.data.Dataset):
    def __init__(self, path, chunksize, nb_samples):
        self.path = path
        self.chunksize = chunksize
        self.len = nb_samples / self.chunksize

    def __getitem__(self, index):
        x = next(
            pd.read_csv(
                self.path,
                skiprows=index * self.chunksize + 1,  #+1, since we skip the header
                chunksize=self.chunksize,
                names=['data']))
        x = torch.from_numpy(x.data.values)
        return x

    def __len__(self):
        return self.len


dataset = CSVDataset('data.csv', chunksize=10, nb_samples=nb_samples)
loader = DataLoader(dataset, batch_size=10, num_workers=1, shuffle=False)

for batch_idx, data in enumerate(loader):
    print('batch: {}\tdata: {}'.format(batch_idx, data))

Change this to self.len = nb_samples // self.chunksize. In Python 3, / always returns a float, and __len__ has to return an integer.
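A tiny standalone reproduction of the error, without pandas or PyTorch. The two classes are made up for the illustration. The last line shows one possible way to count a partial final chunk, in case the number of samples is not a multiple of the chunk size (with // the leftover rows would never be read).

```python
import math


class FloatLen:
    def __len__(self):
        return 110 / 10  # true division gives 11.0, a float


class IntLen:
    def __len__(self):
        return 110 // 10  # floor division gives 11, an int


try:
    len(FloatLen())
    error = None
except TypeError as exc:
    error = str(exc)  # same message as reported in the post above

n_chunks = len(IntLen())

# Optional: round up so that a shorter last chunk is also counted.
n_chunks_with_remainder = math.ceil(115 / 10)
```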

Thanks so much! This works!

I thought I might post this here since it’s on topic, but I am wondering if I can get some help debugging the loader.

What I am trying to do:

  1. I want to load my string data (a corpus in a csv) in batches to manage memory issues and push it through some functions to: (1) tokenize it, (2) transform it into a tensor dataset and (3) put it in a DataLoader to be iterated over. While iterating through the DataLoader, I want to run each batch through a function, since processing everything at once overloads the memory of my GPU (2080 Ti).

# Create Dataset
class CSVDataset(torch.utils.data.Dataset):
    def __init__(self, path, chunksize, nb_samples):
        self.path = path
        self.chunksize = chunksize
        self.len = nb_samples // self.chunksize

    def __getitem__(self, index):
        x = next(
            pd.read_csv(
                self.path,
                skiprows=index * self.chunksize + 1,  #+1, since we skip the header
                chunksize=self.chunksize,
                names=['body']))
        #x = text_to_embedding(x.body.values, tokenizer, model)
        inputs, masks = tokenize_corpus(x.body.values, tokenizer, 512)
        tensor_set = TensorDataset(inputs, masks)
        #x = torch.from_numpy(x.body.values)
        return tensor_set

    def __len__(self):
        return self.len

dataset = CSVDataset('C:\\Users\\Andrew\\Desktop\\test1.csv', chunksize=1, nb_samples=1)
loader2 = DataLoader(dataset, batch_size=1, shuffle=False)
for data in loader2:
    other_funx_here(data)

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'torch.utils.data.dataset.TensorDataset'>

The default collate function does not know how to batch a TensorDataset object. Delete the tensor_set = ... line and return inputs, masks instead of tensor_set, and that should work.
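Roughly, the suggested change looks like this. It is only a sketch: the ChunkPairs class and its dummy tensors are stand-ins I made up, since tokenize_corpus and the csv file from the post are not available here.

```python
import torch
from torch.utils.data import DataLoader, Dataset


class ChunkPairs(Dataset):
    """Stand-in for the CSV dataset: returns a tuple of tensors per chunk."""

    def __init__(self, n_chunks, chunksize, seq_len):
        self.n_chunks = n_chunks
        self.chunksize = chunksize
        self.seq_len = seq_len

    def __getitem__(self, index):
        # Dummy placeholders for what tokenize_corpus would return.
        inputs = torch.full((self.chunksize, self.seq_len), index, dtype=torch.long)
        masks = torch.ones(self.chunksize, self.seq_len, dtype=torch.long)
        return inputs, masks  # a plain tuple, which default_collate can batch

    def __len__(self):
        return self.n_chunks


loader = DataLoader(ChunkPairs(n_chunks=3, chunksize=4, seq_len=8), batch_size=1)
shapes = [(inputs.shape, masks.shape) for inputs, masks in loader]
```

Each element comes back with an extra leading dimension of 1 from batch_size=1, so you would probably call squeeze(0) on it before passing it on.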

Indeed it does! Thank you again!!!

Thank you for your solution!
I would like to update the GitHub link, since the one provided is no longer available. New link: deeplearning-models/custom-data-loader-csv.ipynb at master · rasbt/deeplearning-models · GitHub

Surfacing this thread again.

If I understand @ptrblck’s comment above, using the pandas chunksize approach doesn’t allow for true shuffling, but using @rasbt’s HDF5 approach should, so I want to try that.

But I’ll be taking data that was preprocessed with PySpark, and it will be coming from a parquet format. I’m looking into options to convert parquet to hdf5, and it looks like the vaex library might be a good option since it doesn’t require all of the data to be read into memory simultaneously.

I’m not super familiar with hdf5 files, but looking at the vaex documentation, it looks like there are differences in how packages handle the format, with pandas exporting it in a row-based format and vaex handling it in a column-based format (and the two don’t seem to be compatible).

Will the row-indexing approach that @rasbt suggests work with the column-based format that vaex outputs? Or will I need to find another approach for the conversion? Unfortunately PySpark isn’t super friendly with csv output from super massive datasets. Parquet is much, much easier to deal with in PySpark.

Other suggestions for file conversion also greatly appreciated.
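To make the shuffling distinction mentioned above concrete, here is a small numpy-only sketch (sizes and seed are arbitrary). With a chunked dataset, shuffling can only reorder whole chunks, so rows that sit next to each other in the file always end up in the same batch. With row-level random access, any row can land in any batch.

```python
import numpy as np

rng = np.random.default_rng(0)
nb_samples, chunksize = 20, 5
n_chunks = nb_samples // chunksize

# Chunk-level shuffling: the order of chunks changes, their contents do not.
chunk_order = rng.permutation(n_chunks)
chunk_shuffled = [np.arange(c * chunksize, (c + 1) * chunksize) for c in chunk_order]

# Row-level shuffling: permute individual row indices, then cut into batches.
row_order = rng.permutation(nb_samples)
row_shuffled = [row_order[i:i + chunksize] for i in range(0, nb_samples, chunksize)]
```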

I have this same issue and scoured the internet for a full solution all the way down to the dataloader step. My issue is that with @ptrblck’s code snippet, __getitem__ returns a chunk of samples, say of shape 10 samples x 10 features. When I feed this dataset into the DataLoader, I specify a batch size of 10, so the DataLoader outputs 10 chunks x 10 samples x 10 features. The problem arises when I try to pass these batches to my network. I’m guessing the DataLoader automatically flattens the 10 chunks of 10 samples into 1 single sample of size (1,1000).

Here is the code snippet from @ptrblck :

# Create dummy csv data
nb_samples = 110
a = np.arange(nb_samples)
df = pd.DataFrame(a, columns=['data'])
df.to_csv('data.csv', index=False)


# Create Dataset
class CSVDataset(Dataset):
    def __init__(self, path, chunksize, nb_samples):
        self.path = path
        self.chunksize = chunksize
        self.len = nb_samples // self.chunksize  # integer division, as suggested earlier in the thread

    def __getitem__(self, index):
        x = next(
            pd.read_csv(
                self.path,
                skiprows=index * self.chunksize + 1,  #+1, since we skip the header
                chunksize=self.chunksize,
                names=['data']))
        x = torch.from_numpy(x.data.values)
        return x

Never mind, found a solution here! Dataset Multiple Samples per getitem Call - #2 by ptrblck

Using a custom collate_fn would be more elegant though!
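A minimal sketch of what such a collate_fn could look like, assuming every __getitem__ call returns a chunk of shape (chunksize, n_features). ChunkDataset and concat_chunks are hypothetical names for this example. As far as I can tell, the default collate stacks the chunks on a new leading dimension rather than flattening them, and the custom function concatenates them along the sample dimension instead.

```python
import torch
from torch.utils.data import DataLoader, Dataset


class ChunkDataset(Dataset):
    """Toy dataset whose __getitem__ returns a whole chunk of samples."""

    def __init__(self, n_chunks, chunksize, n_features):
        data = torch.arange(n_chunks * chunksize * n_features, dtype=torch.float32)
        self.data = data.view(n_chunks, chunksize, n_features)

    def __getitem__(self, index):
        return self.data[index]  # shape: (chunksize, n_features)

    def __len__(self):
        return self.data.shape[0]


def concat_chunks(batch):
    # batch is a list of (chunksize, n_features) tensors. Join them along
    # the sample dimension instead of stacking them on a new one.
    return torch.cat(batch, dim=0)


dataset = ChunkDataset(n_chunks=20, chunksize=10, n_features=10)
stacked = next(iter(DataLoader(dataset, batch_size=10)))
merged = next(iter(DataLoader(dataset, batch_size=10, collate_fn=concat_chunks)))
```

Here stacked has shape (10, 10, 10), while merged has shape (100, 10) and can go straight into a layer that expects (batch, features).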