Efficient tabular data loading from parquet files in GCS

Hi there,

I have my preprocessed dataset splits in Parquet files on GCS. The entire dataset won’t fit in memory. I am looking for advice on the most efficient way to load the dataset.

Currently, this is what I have:

import random
from typing import Iterator, List

import gcsfs
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, IterableDataset

# Assumption: `fs` below is an fsspec-compatible GCS filesystem handle.
fs = gcsfs.GCSFileSystem()


class TabularDataset(IterableDataset):
    """Class for loading tabular data."""

    def __init__(
        self,
        data_dir_path: str,
        target_col: str,
        numeric_cols: List[str],
        categorical_cols: List[str],
        shuffle_file_order: bool = False,
    ) -> None:
        """
        Args:
            data_dir_path (str): Directory path to the data. Can be local or GCS.
            target_col (str): Name of target column. We need this since there could be multiple
                    label columns.
        """
        self.data_dir_path = data_dir_path
        self.target_col = target_col
        self.numeric_cols = numeric_cols
        self.categorical_cols = categorical_cols    

        self.files = self._get_files()
        self.num_files = len(self.files)
        self.shuffle_file_order = shuffle_file_order

    @property
    def num_features(self) -> int:
        """
        Get the number of features in the dataset.
        """
        return len(self.numeric_cols) + len(self.categorical_cols)

    def _get_files(self, extension=".parquet") -> List[str]:
        assert fs.isdir(self.data_dir_path), "Data directory path does not exist!"
        all_files = fs.ls(self.data_dir_path)
        matched_files = sorted([f for f in all_files if f.endswith(extension)])
        return matched_files

    def __iter__(self) -> Iterator:
        files = self.files.copy()
        if self.shuffle_file_order:
            random.shuffle(files)
        
        for file in files:
            df = pd.read_parquet(file)

            X_numeric = df[self.numeric_cols].astype(np.float32).values
            X_categorical = df[self.categorical_cols].astype(np.int64).values
            y_vector = df[self.target_col].astype(np.int64).values

            for x_numeric, x_categorical, y_scalar in zip(
                X_numeric, X_categorical, y_vector
            ):
                yield {
                    "target": y_scalar,
                    "categorical": x_categorical,
                    "numeric": x_numeric,
                }

# load up dataset
ds = TabularDataset(
    data_dir_path="gs://path/to/train",
    target_col=..., 
    numeric_cols=..., 
    categorical_cols=..., 
)

dl = DataLoader(ds, batch_size=512, num_workers=0)

This works but has the following challenges:

  1. I cannot use num_workers >= 1 because reading the Parquet file is not fork-safe (that’s the error I am getting). This means the main process does all of the data processing while the GPU sits idle (see the sketch after this list).
  2. There is no easy way to shuffle an IterableDataset with a shuffle buffer.
  3. I want to be able to cycle through the training set (and have it as an argument so that I can turn it off for the validation & test sets).
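
For context on (1), the closest I can think of is sharding the files across workers inside __iter__ with get_worker_info(), something like the sketch below; this is just a sketch, and it only helps if each worker creates its own GCS file handles instead of inheriting forked ones (which I assume is what triggers the error):

from torch.utils.data import get_worker_info

# drop-in replacement sketch for TabularDataset.__iter__ above
def __iter__(self) -> Iterator:
    files = self.files.copy()
    if self.shuffle_file_order:
        random.shuffle(files)

    worker_info = get_worker_info()
    if worker_info is not None:
        # give each worker a disjoint, strided subset of files so rows are not duplicated
        files = files[worker_info.id :: worker_info.num_workers]

    for file in files:
        df = pd.read_parquet(file)
        ...  # yield row dicts exactly as above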

I found a great article at this link for TensorFlow (tf.data) that discusses how to load the data efficiently; the pipeline ends up looking something like this:

import tensorflow as tf
import tensorflow_io as tfio

files = tf.data.Dataset.list_files(path)
dataset = files.interleave(
    lambda x: tfio.IODataset.from_parquet(x).map(parse, num_parallel_calls=8),
    cycle_length=4,
    num_parallel_calls=4
)

if is_train:
    dataset = dataset.shuffle(buffer_size=5000).repeat()

dataset = dataset.batch(512)
dataset = dataset.prefetch(2)

I know the torchdata package was released recently and provides common modular data loading primitives. Is there an example of how I can load this tabular dataset efficiently for my use case? Any help would be much appreciated.


The TorchData tutorial provides an example of reading from CSV files. The library provides a buffer shuffle feature and a cycler to read the data multiple times.

For the most part, implementing a custom IterDataPipe is similar to implementing an IterableDataset. My best guess is that the implementation will look something like:

from torchdata.datapipes.iter import FileLister
dp = FileLister(".", masks="*.parquet")
dp = CustomParquetReader(dp, ...)  # Custom Parquet Reader DataPipe with additional argument and transformation
dp = dp.shuffle(buffer_size=BUFFER_SIZE)
dp = dp.cycle(count=NUMBER_OF_CYCLES)
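
For illustration, the CustomParquetReader placeholder could be a plain IterDataPipe that reads each Parquet file (here with pandas, just as a sketch) and yields one example dict per row, reusing the column arguments from your dataset:

import numpy as np
import pandas as pd
from torchdata.datapipes.iter import IterDataPipe

class CustomParquetReader(IterDataPipe):
    """Reads each Parquet file from `source_dp` and yields one example dict per row."""

    def __init__(self, source_dp, numeric_cols, categorical_cols, target_col):
        self.source_dp = source_dp
        self.numeric_cols = numeric_cols
        self.categorical_cols = categorical_cols
        self.target_col = target_col

    def __iter__(self):
        for path in self.source_dp:
            df = pd.read_parquet(path)
            X_num = df[self.numeric_cols].astype(np.float32).values
            X_cat = df[self.categorical_cols].astype(np.int64).values
            y = df[self.target_col].astype(np.int64).values
            for x_num, x_cat, target in zip(X_num, X_cat, y):
                yield {"numeric": x_num, "categorical": x_cat, "target": target}

From there, shuffle and cycle operate on individual rows rather than whole files.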

I have not tried reading Parquet files with multiprocessing. Can you elaborate on the error you are seeing, and what output (if any) you get?

Note that pyarrow supports multithreaded reading of Parquet files, so I would think it should be possible.
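
For example (just a sketch; the path is a placeholder, and a gs:// path would additionally need a gcsfs/fsspec filesystem handle):

import pyarrow.parquet as pq

# use_threads=True lets pyarrow decode columns in parallel within a single process
table = pq.read_table("part-00000.parquet", use_threads=True)
df = table.to_pandas()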

import torcharrow as ta
from torchdata.datapipes.iter import FSSpecFileLister

def parse_ta_df(df: ta.DataFrame) -> ta.DataFrame:
    # column_types maps "numeric"/"categorical" to the lists of column names (defined elsewhere)
    df["numeric"] = df[column_types['numeric']]
    df["categorical"] = df[column_types['categorical']]

    df = df[["numeric", "categorical", "label"]]
    
    return df

dp = FSSpecFileLister(root="gs://path/to/train", 
                     masks="*.parquet")
dp = dp.open_files_by_fsspec(mode='rb') # returns tuple (file_name, file_io_stream)
dp = dp.map(lambda x: x[1]) # map to get the io_stream
dp = dp.load_parquet_as_df() # returns Torch Arrow dataframe
dp = dp.map(fn=parse_ta_df)

dp = dp.batch(batch_size=2, drop_last=True)

^ This is what I have so far, and it works up to dp.load_parquet_as_df(), where I get back a Torch Arrow df; map(fn=parse_ta_df) then takes the numeric & categorical cols and combines each group into a List column in the Torch Arrow df. However, dp.batch(...) seems to return Torch Arrow dfs for the different Parquet partitions (each df is ~100K rows), and batch_size is disregarded.

I want to be able to get batches of something like this:

{
   "numeric": torch.FloatTensor of shape (batch_size, num_numeric_cols)
   "categorical": torch.LongTensor of shape (batch_size, num_categorical_cols)
   "label": torch.LongTensor of shape (batch_size)
}

How would I go from the Torch Arrow df of a parquet row group to generating batches? @nivek

You can replicate this with a test case here:

import torcharrow as ta
import torcharrow.dtypes as dt
from torchdata.datapipes.iter import IterableWrapper

metadata_cols = ["user", "date"]
numeric_cols = ["qty", "amt"]
categorical_cols = ["cat_color", "cat_product"]
label_col = "label"

# assumed glue so the parse_ta_df defined above runs on this test frame
column_types = {"numeric": numeric_cols, "categorical": categorical_cols}

data = {
    "user": ["m1", "m2", "m3", "m4", "m1", "m2"],
    "date": ["2019-01-02", "2019-01-05", "2019-01-08", "2019-01-09", "2016-01-01", "2016-05-02"],
    "qty": [1, 5, 3, 8, 3, 2],
    "amt": [1.3, 2.3, 4.6, 3.2, 5.4, 2.3],
    "cat_color": [1, 1, 2, 2, 1, 3],
    "cat_product": [0, 1, 0, 2, 0, 1],
    "label": [0, 0, 2, 1, 2, 1]
}

ta_df = ta.dataframe(data)

dp = IterableWrapper([ta_df]) # output of the step: dp.load_parquet_as_df()
dp = dp.map(fn=parse_ta_df)
dp = dp.batch(batch_size=2, drop_last=True)

@salman1993, I also encountered the same issue when using datapipe.load_parquet_as_df().batch(), which seems to give back a large number of rows instead of batch_size rows.

Any luck finding solutions for this?

I’m facing the same problem as @Wei-Cheng_Chang.

load_parquet_as_df() iterates over the row groups of the Parquet file, so if your file has only one row group it will return all of it at once. When saving a Parquet file, you can specify how you want to partition it into row groups.

# torchdata.load_parquet_as_df
parquet_file = parquet.ParquetFile(path)
num_row_groups = parquet_file.num_row_groups
for i in range(num_row_groups):
    row_group = parquet_file.read_row_group(i, columns=self.columns, use_threads=self.use_threads)
    yield torcharrow.from_arrow(row_group, dtype=self.dtype)
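
For the saving side, a sketch with pyarrow (the file name and row_group_size are placeholders):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"qty": [1, 5, 3, 8], "target": [0, 0, 2, 1]})
# smaller row groups -> smaller dataframes yielded per iteration by load_parquet_as_df
pq.write_table(table, "train-00000.parquet", row_group_size=2)

pandas’ DataFrame.to_parquet forwards row_group_size to pyarrow as well.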

You can use a custom DataPipe class like this (pd.read_parquet can be replaced with any Parquet reader):

import pandas as pd
import torchdata.datapipes as dp


@dp.functional_datapipe("load_parquet_custom")
class ParquetCustomLoaderIterDataPipe(dp.iter.IterDataPipe):
    def __init__(
            self,
            source_dp: dp.iter.IterDataPipe[str]
    ):
        self.source_dp = source_dp

    def __iter__(self):
        for path in self.source_dp:
            # yields (index, pandas Series) tuples, one per row
            yield from pd.read_parquet(path).iterrows()
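
Hooked into the earlier pipeline, it could look like this (buffer and batch sizes are placeholders; reading gs:// paths with pandas requires gcsfs installed):

from torchdata.datapipes.iter import FSSpecFileLister

pipe = FSSpecFileLister(root="gs://path/to/train", masks="*.parquet")
pipe = pipe.load_parquet_custom()       # one (index, row Series) tuple per row
pipe = pipe.shuffle(buffer_size=10000)  # row-level buffered shuffle across files
pipe = pipe.batch(batch_size=512, drop_last=True)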