Iterable Dataset for multiple parquet files: Tabular data

Hi All,

I am trying to define a custom dataloader using IterableDataset for tabular data. The dataset I am dealing with is extremely big and cannot fit in main memory. I am able to define an iterable dataloader for data that is contained within a single file. However, for data that is partitioned into multiple files (in my case, parquet files), I am unable to come up with a proper solution.

I was able to come up with a working example using the references listed at the end of the question.
In the following example, I define an IterableDataset that yields batches from a single (large) csv file.

import itertools

import numpy as np
import torch
from torch.utils.data import DataLoader, IterableDataset


class MyIterableCsvDataset(IterableDataset):

    def __init__(self, data):
        self.file_name = data

    def process_data(self, line):
        # Parse one csv line into a 1-D float array
        line = line.strip()
        line = np.array(line.split(','), dtype=np.float64)
        return line

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        worker_total_num = worker_info.num_workers
        worker_id = worker_info.id
        # Create an iterator over the lines of the file
        file_itr = open(self.file_name)
        # Map each line through process_data
        mapped_itr = map(self.process_data, file_itr)
        # Add multi-worker functionality: each worker takes every n-th line
        mapped_itr = itertools.islice(mapped_itr, worker_id, None, worker_total_num)
        return mapped_itr

Now, I initialize the dataloader as follows (please see the references at the end, which I used to put this solution together):

    iterable_dataset = MyIterableCsvDataset(data)
    dataloader = DataLoader(iterable_dataset, batch_size = 256, num_workers = 1)

I am getting batches of the expected shape [256, feature_size], where feature_size is the number of columns in the csv dataset.

Here is my code for reading multiple partitioned files in parquet format.

import pandas


class MyIterableParquetDataset(IterableDataset):

    def __init__(self, files, path):
        self.file_iter = files
        self.data_path = path

    def process_data(self, file):
        # Read one whole parquet file into a 2-D array (num_rows, feature_size)
        data = pandas.read_parquet('{}/{}'.format(self.data_path, file))
        data = data.values
        return data

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        worker_total_num = worker_info.num_workers
        worker_id = worker_info.id
        # Map each file through process_data and split the files across workers
        mapped_itr = map(self.process_data, self.file_iter)
        mapped_itr = itertools.islice(mapped_itr, worker_id, None, worker_total_num)
        return mapped_itr

I am initializing the dataloader as follows:

 parquet_files = ['file1.parquet', ..., 'filen.parquet']
 iterable_dataset = MyIterableParquetDataset(parquet_files, path)

 dataloader = DataLoader(iterable_dataset, batch_size = 256, num_workers = 1)

The issue I am facing is in getting batches of the defined size (i.e., 256). Say each parquet file contains 1,000 rows; then each batch comes out with shape [256, 1000, feature_size] instead of [256, feature_size]. This makes sense, since process_data reads one whole parquet file and returns all of its rows at once, rather than one row at a time (unlike the csv example above, where each item is a single line). Can someone help me with solutions to overcome this issue?
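To make the behaviour I am after more concrete, here is a rough, untested sketch of the direction I have in mind: flatten each file into individual rows so that one item is one row, as in the csv case (the subclass name is arbitrary). I am not sure this is a good approach, since every worker would still end up reading every file.

class MyIterableParquetRowsDataset(MyIterableParquetDataset):

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        worker_total_num = worker_info.num_workers
        worker_id = worker_info.id
        # Lazily read each file (map is lazy), then flatten the per-file
        # 2-D arrays so that one item is a single row
        per_file_arrays = map(self.process_data, self.file_iter)
        row_itr = itertools.chain.from_iterable(per_file_arrays)
        # Split individual rows across workers; every worker still reads
        # every file, which seems wasteful when there are many files
        return itertools.islice(row_itr, worker_id, None, worker_total_num)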

References:

  1. Medium article on building streaming data
  2. Iterable PyTorch dataset with multiple workers

cc @ejguan @nivek for dataloader questions

I think you need to find a way to read the parquet files row by row. I recommend looking for a solution based on pyarrow.
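For example, a rough sketch along those lines (untested; it assumes a reasonably recent pyarrow that provides ParquetFile.iter_batches, reuses the files/path arguments from your post, and the class name and batch_size of 1024 are arbitrary placeholders):

import itertools

import pyarrow.parquet as pq
import torch
from torch.utils.data import IterableDataset


class ParquetRowStreamDataset(IterableDataset):

    def __init__(self, files, path):
        self.files = files
        self.data_path = path

    def _iter_rows(self, file):
        # Stream the parquet file in small record batches instead of
        # loading it into memory at once, then yield one row at a time
        parquet_file = pq.ParquetFile('{}/{}'.format(self.data_path, file))
        for record_batch in parquet_file.iter_batches(batch_size=1024):
            for row in record_batch.to_pandas().to_numpy():
                yield row

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        num_workers = worker_info.num_workers if worker_info else 1
        worker_id = worker_info.id if worker_info else 0
        # Shard by file: each worker handles a disjoint subset of files,
        # so no file is read more than once across workers
        for file in itertools.islice(self.files, worker_id, None, num_workers):
            yield from self._iter_rows(file)

With something like this, the DataLoader should collate 256 individual rows into a [256, feature_size] batch, the same as in your csv example.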