Reading and batching a large parquet file

I have a large dataset stored remotely in a parquet file, which I would like to read and train on without loading the entire file into memory. The dataset is a multivariate time series for clients and has the shape:

N*Q*M
# N: number of users (50000)
# Q: sequence length (365)
# M: number of features (20)

In a dataframe this would be represented such that rows for a given user are consecutive, but there is also an additional user index column.
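To make the layout concrete, here is a minimal sketch with toy sizes standing in for N, Q and M. The feature column names (f0, f1) are made up for illustration, and it assumes every user has exactly Q rows:

```python
import numpy as np
import pandas as pd

# Toy sizes standing in for N=50000, Q=365, M=20 (illustration only).
N, Q, M = 3, 4, 2

# Long-format frame: Q consecutive rows per user plus a user index column.
feature_cols = [f"f{j}" for j in range(M)]  # hypothetical column names
df = pd.DataFrame(
    np.arange(N * Q * M, dtype=np.float32).reshape(N * Q, M),
    columns=feature_cols,
)
df.insert(0, "userid", np.repeat(np.arange(N), Q))

# Because each user's rows are contiguous and every user has exactly Q rows,
# a plain reshape recovers the (N, Q, M) layout.
X = df[feature_cols].to_numpy().reshape(N, Q, M)
print(X.shape)  # (3, 4, 2)
```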

How do I read and batch such a large file without reading it all into memory, assuming a batch size of 16 users (so each batch contains 16 * 365 = 5840 rows)?

Splitting into a smaller number of files and using torch.utils.data.Dataset is not really an option, since reading many files from a remote directory in every epoch is very expensive. Therefore, I have been trying torch.utils.data.IterableDataset, but I haven’t figured out a way to read specific records of the file. (This all assumes I am using no workers, as increasing the number of workers for distributed reading is a whole new issue.)


I found a workaround using torch.utils.data.Dataset, but the data must first be repartitioned with dask so that each partition holds a single user and is stored as its own parquet file; the large file then only has to be read once. In the following code, the labels and the data are stored separately:

import dask.dataframe as dd
import pandas as pd
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader, IterableDataset, Dataset

# Breakdown file
raw_ddf = dd.read_parquet("data.parquet") # Read huge file using dask
raw_ddf = raw_ddf.set_index("userid") # set the userid as index
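userids = sorted(raw_ddf.index.unique().compute().values.tolist()) # get a sorted list of indices
# divisions must be sorted; the last boundary is repeated so the final user gets its own partition
new_ddf = raw_ddf.repartition(divisions = userids + [userids[-1]]) # repartition by userids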
new_ddf.to_parquet("my_folder") # this will save each user as its own parquet file within "my_folder"

# Dask to read the partitions
train_ddf = dd.read_parquet("my_folder/*.parquet") # read all files

# Read labels file
labels_df = pd.read_csv("label.csv")
# assumption: one label per user, in the same order as the partitions (sorted userid)
y_labels = np.array(labels_df["class"])

# Define the Dataset class
class UsersDataset(Dataset):
    def __init__(self, dask_df, labels):
        self.dask_df = dask_df
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Load only the partition (one user) that was requested
        X_df = self.dask_df.get_partition(idx).compute()
        X = X_df.to_numpy() # shape: (sequence length, number of features)
        X_tensor = torch.tensor(X, dtype=torch.float32)
        y = self.labels[idx]
        y_tensor = torch.tensor(y, dtype=torch.long)
        sample = (X_tensor, y_tensor)
        return sample

# Create a Dataset object
user_dataset = UsersDataset(dask_df=train_ddf, labels=y_labels)

# Create a DataLoader object
dataloader = DataLoader(user_dataset, batch_size=4, shuffle=True, num_workers=0)

# Print output of the first batch to ensure it works
for i_batch, sample_batched in enumerate(dataloader): 
    print("Batch number ", i_batch)
    print(sample_batched[0]) # print X
    print(sample_batched[1]) # print y

    # stop after first batch.
    if i_batch == 0:
        break

I would like to know how I can adapt my approach to use >= 2 workers to read the data without producing duplicate entries. Any insights on this are greatly appreciated.
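For context, with a map-style Dataset like the one above, the DataLoader's sampler hands each index to exactly one worker, so raising num_workers should not by itself duplicate samples. Duplication is a concern with IterableDataset, where every worker gets its own copy of the dataset. Below is a minimal sketch of the usual sharding pattern with torch.utils.data.get_worker_info. The class ShardedUsers and the helpers shard and fake_load are hypothetical names; fake_load stands in for reading one user's partition.

```python
import torch
from torch.utils.data import DataLoader, IterableDataset, get_worker_info


def shard(items, worker_id, num_workers):
    """Return the disjoint, round-robin slice of items owned by one worker."""
    return items[worker_id::num_workers]


class ShardedUsers(IterableDataset):
    """Hypothetical iterable dataset where each worker reads its own partitions."""

    def __init__(self, partition_ids, load_fn):
        self.partition_ids = list(partition_ids)
        self.load_fn = load_fn  # e.g. lambda i: dask_df.get_partition(i).compute()

    def __iter__(self):
        info = get_worker_info()  # None when iterating in the main process
        if info is None:
            my_ids = self.partition_ids
        else:
            my_ids = shard(self.partition_ids, info.id, info.num_workers)
        for pid in my_ids:
            yield self.load_fn(pid)


def fake_load(pid):
    # Stand-in for loading one user's data; returns a (Q, M) tensor filled with pid.
    return torch.full((3, 2), float(pid))


ds = ShardedUsers(range(6), fake_load)
loader = DataLoader(ds, batch_size=2, num_workers=0)
seen = [int(v) for batch in loader for v in batch[:, 0, 0]]
print(seen)  # [0, 1, 2, 3, 4, 5]
print(shard(list(range(6)), 0, 2), shard(list(range(6)), 1, 2))  # [0, 2, 4] [1, 3, 5]
```

With two workers, worker 0 would read partitions 0, 2, 4 and worker 1 would read 1, 3, 5, so every user is yielded once per epoch. Each batch then comes from a single worker's shard.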
