Efficient tabular data loading from parquet files in GCS

Hi there,

I have my preprocessed dataset splits in Parquet files on GCS. The entire dataset won’t fit in memory. I am looking for advice on what’s the most efficient way to load in the dataset.

Currently, this is what I have:

class TabularDataset(IterableDataset):
    """Class for loading tabular data."""

    def __init__(
        self,
        data_dir_path: str,
        target_col: str,
        numeric_cols: List[str], 
        categorical_cols: List[str], 
        shuffle_file_order=False,
    ) -> None:
        """
        Args:
            data_dir_path (str): Directory path to the data. Can be local or GCS.
            target_col (str): Name of target column. We need this since there could be multiple
                    label columns.
        """
        self.data_dir_path = data_dir_path
        self.target_col = target_col
        self.numeric_cols = numeric_cols
        self.categorical_cols = categorical_cols    

        self.files = self._get_files()
        self.num_files = len(self.files)
        self.shuffle_file_order = shuffle_file_order

    @property
    def num_features(self) -> int:
        """
        Get the number of features in the dataset.
        """
        return len(self.numeric_cols) + len(self.categorical_cols)

    def _get_files(self, extension=".parquet") -> List[str]:
        assert fs.isdir(self.data_dir_path), "Data directory path does not exist!"
        all_files = fs.ls(self.data_dir_path)
        matched_files = sorted([f for f in all_files if f.endswith(extension)])
        return matched_files

    def __iter__(self) -> Iterator:
        files = self.files.copy()
        if self.shuffle_file_order:
            random.shuffle(files)
        
        for file in files:
            df = pd.read_parquet(file)

            X_numeric = df[self.numeric_cols].astype(np.float32).values
            X_categorical = df[self.categorical_cols].astype(np.int64).values
            y_vector = df[self.target_col].astype(np.int64).values

            for x_numeric, x_categorical, y_scalar in zip(
                X_numeric, X_categorical, y_vector
            ):
                yield {
                    "target": y_scalar,
                    "categorical": x_categorical,
                    "numeric": x_numeric,
                }

# load up dataset
ds = TabularDataset(
    data_dir_path="gs://path/to/train",
    target_col=..., 
    numeric_cols=..., 
    categorical_cols=..., 
)

dl = DataLoader(ds, batch_size=512, num_workers=0)

This works but has the following challenges:

  1. I cannot use num_workers >= 1 because reading the parquet file is not fork-safe (thats the error I am getting). This means the main process is processing the data and the GPU resource is idle at that time
  2. There is no easy way to shuffle IterableDataset using a buffer size
  3. I want to be able to cycle through the training set (have it as an argument so that I can turn it off for valid & test sets)

I found a great article in this link for Tensorflow (tf.data) that discusses how to efficient load up the data which ends up looking sth like this:

files = tf.data.Dataset.list_files(path)
dataset = files.interleave(
    lambda x: tfio.IODataset.from_parquet(x).map(parse, num_parallel_calls=8),
    cycle_length=4,
    num_parallel_calls=4
)

if is_train:
    dataset = dataset.shuffle(buffer_size=5000).repeat()

dataset = dataset.batch(512)
dataset = dataset.prefetch(2)

I know torchdata package was released recently that provides common modular data loading primitives. Is there any example of how I can load up the tabular dataset efficiently for my use case? Any help will be much appreciated.

1 Like