I’m trying to create a DataPipe of .parquet files using ParquetDataFrameLoader.
My train data consists of 8 parquet files, each with 16 columns and more than 100k rows.
My problem arises when I try to iterate through parquet_df_dp.
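For context, here is a minimal sketch of how the pipeline is built, assuming the files are picked up with a FileLister (the path and mask are placeholders, not my actual setup):

```python
from torchdata.datapipes.iter import FileLister

# Sketch only: list the parquet files, then load them with ParquetDataFrameLoader.
source_dp = FileLister("train_data/", masks="*.parquet")
parquet_df_dp = source_dp.load_parquet_as_df()  # yields one TorchArrow DataFrame chunk per Parquet row group
```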
When I tried to create a batch of 4 rows with batch = next(iter(parquet_df_dp.batch(4))), I got a list of 4 elements where each element is a chunk of 10,000 rows.
Unbatching also didn’t work: next(iter(parquet_df_dp.flatten().unbatch())) yields one value from each column instead of an entire row.
How can I define my desired batch size using the datapipe?
Thanks in advance.
I realized that using load_parquet_as_df.flatmap().batch(batch_size) is not a very efficient way to load a large number of parquet files. I wrote a custom IterDataPipe to achieve faster data loading. A minimal example looks like this:
```python
import pyarrow.parquet as pq
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe


@functional_datapipe("load_parquet_batch")
class MyParquetLoaderIterDataPipe(IterDataPipe):
    def __init__(self, source_dp, columns, batch_size):
        self.source_dp = source_dp
        self.columns = columns
        self.batch_size = batch_size

    def __iter__(self):
        for path in self.source_dp:
            parquet_file = pq.ParquetFile(path)
            # Stream record batches of batch_size rows directly from each file
            for batch in parquet_file.iter_batches(batch_size=self.batch_size, columns=self.columns):
                yield batch.to_pydict()
```
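As a rough usage sketch (the file path and column names below are placeholders, not from my actual setup), the functional form registered above can be chained onto a file-listing pipe:

```python
from torchdata.datapipes.iter import FileLister

# Hypothetical usage: list the parquet files, then load fixed-size batches from them.
source_dp = FileLister("train_data/", masks="*.parquet")
batch_dp = source_dp.load_parquet_batch(columns=["col_a", "col_b"], batch_size=4)

first_batch = next(iter(batch_dp))  # dict mapping each column name to a list of 4 values
```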
Then I created the following datapipe/DataLoader2 for PyTorch DDP: