Define a Batch Size For ParquetDataFrameLoader

Hey,

I’m trying to create a DataPipe of .parquet files using ParquetDataFrameLoader.
My train data consists from 8 parquet files with 16 columns and more than 100k rows each.

The code I wrote based on the documentation:

source_dp = FileLister("/path/to/parquet/files/", masks="*.parquet")
parquet_df_dp = source_dp.load_parquet_as_df()

My problem is when I try to iterate through the parquet_df_dp.
When I tried to create a batch of 4 rows I got a list of 4 elements where each element is a chunk of 10,000 rows.
batch = next(iter(parquet_df_dp.batch(4)))

Unbatching also didn’t work - next(iter(parquet_df_dp.flatten().unbatch())) leads to one value of each columns instead of the entire row.

How can I define my desired batch size using the datapipe?
Thanks in advance.

1 Like

What is the output of parquet_df_dp.flatten()? Is it 40,000 rows? If so, you can do parquet_df_dp.flatten().batch(4).

@lidorb , I encounter similar issue. Do you find a way to do batching for load_parquet_as_df() ?

@Wei-Cheng_Chang Have you tried this?

for my case, load_parquet_as_df() will output a torcharrow object of shape N rows and 3 columns.

After load_parquet_as_df.flatten() will output a list of Nx3 elements (which is not desirable).

For me, I realize the solution is

load_parquet_as_df.flatmap().batch(), which output a Bx3 object

1 Like

Yes, calling .batch() is necessary if you want batches.

Using load_parquet_as_df.flatmap().batch(batch_size) worked great for me.
Thanks @Wei-Cheng_Chang!

@lidorb ,

I realized that using load_parquet_as_df.flatmap().batch(batch_size) is not a very efficient implementation for loading large amount of parquet files. I wrote a customized IterableDataPipe to achieve faster data loading time. An minimal example will be like

import pyarrow.parquet as pq

@functional_datapipe("load_parquet_batch")
class MyParquetLoaderIterDataPipe(IterDataPipe):
    def __init__(self, source_dp, columns, batch_size):
        self.source_dp = source_dp
        self.columns = columns
        self.batch_size = batch_size
    def __iter__(self):
        for path in self.source_dp:
            parquet_file = pq.ParquetFile(path)
            for batch in parquet_file.iter_batches(batch_size=self.batch_size, columns=self.columns):
                yield batch.to_pydict()

The I created the following datapipe/dataloader2 for pytorch DDP :

dp = IterableWrapper(FILE_LIST).shuffle().sharding_filter()
dp = dp.load_parquet_batch(columns=COLUMN_LIST, batch_size=BSZ_PER_GPU)
dp = dp.map(transform_fn)
dist_rs = DistributedReadingService()
dl = DataLoader2(dp, reading_service=dist_rs)

Hope that helps in your case.

1 Like

@Wei-Cheng_Chang I get
AttributeError: ‘ShardingFilterIterDataPipe’ object has no attribute 'load_parquet_batch