Hi there,
I have my preprocessed dataset splits as Parquet files on GCS. The entire dataset won't fit in memory. I am looking for advice on the most efficient way to load the dataset for training.
Currently, this is what I have:
import random
from typing import Iterator, List

import gcsfs
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, IterableDataset

# Assumption: `fs` is a GCS filesystem handle (gcsfs implements the fsspec interface,
# so `isdir` and `ls` work as used below).
fs = gcsfs.GCSFileSystem()


class TabularDataset(IterableDataset):
    """Class for loading tabular data."""

    def __init__(
        self,
        data_dir_path: str,
        target_col: str,
        numeric_cols: List[str],
        categorical_cols: List[str],
        shuffle_file_order: bool = False,
    ) -> None:
        """
        Args:
            data_dir_path (str): Directory path to the data. Can be local or GCS.
            target_col (str): Name of the target column. We need this since there could be
                multiple label columns.
            numeric_cols (List[str]): Names of the numeric feature columns.
            categorical_cols (List[str]): Names of the (already encoded) categorical feature columns.
            shuffle_file_order (bool): Whether to shuffle the order of the Parquet files.
        """
        self.data_dir_path = data_dir_path
        self.target_col = target_col
        self.numeric_cols = numeric_cols
        self.categorical_cols = categorical_cols
        self.files = self._get_files()
        self.num_files = len(self.files)
        self.shuffle_file_order = shuffle_file_order

    @property
    def num_features(self) -> int:
        """Get the number of features in the dataset."""
        return len(self.numeric_cols) + len(self.categorical_cols)

    def _get_files(self, extension: str = ".parquet") -> List[str]:
        assert fs.isdir(self.data_dir_path), "Data directory path does not exist!"
        all_files = fs.ls(self.data_dir_path)
        matched_files = sorted(f for f in all_files if f.endswith(extension))
        return matched_files

    def __iter__(self) -> Iterator:
        files = self.files.copy()
        if self.shuffle_file_order:
            random.shuffle(files)
        for file in files:
            # Read one Parquet file at a time so only a single file's worth of data is in memory.
            df = pd.read_parquet(file)
            X_numeric = df[self.numeric_cols].astype(np.float32).values
            X_categorical = df[self.categorical_cols].astype(np.int64).values
            y_vector = df[self.target_col].astype(np.int64).values
            for x_numeric, x_categorical, y_scalar in zip(
                X_numeric, X_categorical, y_vector
            ):
                yield {
                    "target": y_scalar,
                    "categorical": x_categorical,
                    "numeric": x_numeric,
                }
# Load up the training dataset.
ds = TabularDataset(
    data_dir_path="gs://path/to/train",
    target_col=...,
    numeric_cols=...,
    categorical_cols=...,
)
dl = DataLoader(ds, batch_size=512, num_workers=0)
This works but has the following challenges:
- I cannot use num_workers >= 1 because reading the Parquet file is not fork-safe (that's the error I am getting). This means the main process does all of the data processing while the GPU sits idle (see the sketch after this list for the kind of workaround I have been considering).
- There is no easy way to shuffle an IterableDataset using a buffer size.
- I want to be able to cycle through the training set (and have it as an argument so that I can turn it off for the valid and test sets).
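For context, here is a rough sketch of the kind of workaround I have in mind. The ShardedTabularDataset class, the buffer_size argument, and the cycle flag are hypothetical additions of mine (not part of the class above); it uses torch.utils.data.get_worker_info() to split files across workers, opens each file inside the worker process to avoid sharing handles across the fork, and shuffles rows through a simple in-memory buffer:

import random
from typing import Dict, Iterator, List

import numpy as np
import pandas as pd
from torch.utils.data import IterableDataset, get_worker_info


class ShardedTabularDataset(IterableDataset):
    """Hypothetical variant: shards files across DataLoader workers,
    shuffles rows through a small buffer, and can optionally cycle forever."""

    def __init__(self, files: List[str], target_col: str, numeric_cols: List[str],
                 categorical_cols: List[str], buffer_size: int = 5000, cycle: bool = False) -> None:
        self.files = files
        self.target_col = target_col
        self.numeric_cols = numeric_cols
        self.categorical_cols = categorical_cols
        self.buffer_size = buffer_size
        self.cycle = cycle

    def _rows(self, files: List[str]) -> Iterator[Dict[str, np.ndarray]]:
        for file in files:
            # Opening the file here, i.e. inside the worker process, avoids
            # sharing any file/GCS handles across the fork.
            df = pd.read_parquet(file)
            X_num = df[self.numeric_cols].astype(np.float32).values
            X_cat = df[self.categorical_cols].astype(np.int64).values
            y = df[self.target_col].astype(np.int64).values
            for x_num, x_cat, y_scalar in zip(X_num, X_cat, y):
                yield {"target": y_scalar, "categorical": x_cat, "numeric": x_num}

    def __iter__(self) -> Iterator[Dict[str, np.ndarray]]:
        worker_info = get_worker_info()
        files = self.files.copy()
        random.shuffle(files)
        if worker_info is not None:
            # Each worker only reads every num_workers-th file.
            files = files[worker_info.id :: worker_info.num_workers]
        while True:
            # Accumulate rows into a buffer, shuffle the buffer, then emit it.
            buffer: List[Dict[str, np.ndarray]] = []
            for row in self._rows(files):
                buffer.append(row)
                if len(buffer) >= self.buffer_size:
                    random.shuffle(buffer)
                    yield from buffer
                    buffer = []
            random.shuffle(buffer)
            yield from buffer
            if not self.cycle:
                break

This still feels very hand-rolled compared to the tf.data pipeline below, though.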
I found a great article (in this link) for TensorFlow (tf.data) that discusses how to load the data efficiently, which ends up looking something like this:
files = tf.data.Dataset.list_files(path)
dataset = files.interleave(
    lambda x: tfio.IODataset.from_parquet(x).map(parse, num_parallel_calls=8),
    cycle_length=4,
    num_parallel_calls=4,
)
if is_train:
    dataset = dataset.shuffle(buffer_size=5000).repeat()
dataset = dataset.batch(512)
dataset = dataset.prefetch(2)
I know the torchdata package was released recently, which provides common, modular data-loading primitives. Is there an example of how I can load this tabular dataset efficiently for my use case? Any help would be much appreciated.
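For what it's worth, here is a rough sketch of what I imagine a torchdata (DataPipes) version might look like, mirroring the tf.data pipeline above. The load_rows helper, the placeholder column names, and the exact way I chain shuffle / sharding_filter / unbatch are my own guesses that I have not verified against the torchdata docs:

from typing import Dict, List

import numpy as np
import pandas as pd
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper

# Hypothetical schema and file list; replace with the real columns and the
# sorted list of gs:// Parquet paths from _get_files().
target_col = "label"
numeric_cols = ["num_a", "num_b"]
categorical_cols = ["cat_a", "cat_b"]
files = ["gs://path/to/train/part-0.parquet"]


def load_rows(file: str) -> List[Dict[str, np.ndarray]]:
    """Hypothetical helper: read one Parquet file and return its rows as dicts."""
    df = pd.read_parquet(file)
    return [
        {
            "target": np.int64(row[target_col]),
            "categorical": row[categorical_cols].to_numpy(np.int64),
            "numeric": row[numeric_cols].to_numpy(np.float32),
        }
        for _, row in df.iterrows()
    ]


dp = IterableWrapper(files)
dp = dp.shuffle()                   # shuffle the file order
dp = dp.sharding_filter()           # each DataLoader worker takes a distinct subset of files
dp = dp.map(load_rows)              # file path -> list of row dicts
dp = dp.unbatch()                   # list of row dicts -> individual row dicts
dp = dp.shuffle(buffer_size=5000)   # buffered row-level shuffle

dl = DataLoader(dp, batch_size=512, num_workers=4)

I am not sure whether this is how sharding_filter and the shuffle buffer are meant to be combined, or whether there is a better primitive for reading Parquet directly, so pointers to a worked example would be great.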