I’ve been working on using petastorm to train PyTorch models from spark dataframes (somewhat following this guide). I’m curious if there are any ways I can speed up data loading.
Here’s a basic overview of my current flow. df_train is a spark dataframe with three columns: x1 (float), x2 (binary 0/1), and y (float). I’m using PySpark.
import torch
from petastorm.spark import SparkDatasetConverter, make_spark_converter

x_feat = ['x1', 'x2']
y_name = 'y'

# cache the materialized dataframe under DBFS
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
               "file:///dbfs/tmp/petastorm/cache")
converter_train = make_spark_converter(df_train)

with converter_train.make_torch_dataloader(batch_size=bs) as train_dataloader:
    train_dataloader_iter = iter(train_dataloader)
    steps_per_epoch = len(converter_train) // bs
    for step in range(steps_per_epoch):
        pd_batch = next(train_dataloader_iter)
        # stack the feature columns into a (batch, n_features) tensor
        pd_batch['features'] = torch.transpose(
            torch.stack([pd_batch[x] for x in x_feat]), 0, 1)
        inputs = pd_batch['features'].to(device)
        labels = pd_batch[y_name].to(device)
        ...  # modeling and stuff
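As an aside, the stack-then-transpose can be collapsed into a single torch.stack along dim=1 (the transpose itself is a free view, so this is more about readability than raw speed). A quick sketch with made-up batch values standing in for pd_batch['x1'] / pd_batch['x2']:

```python
import torch

# toy stand-ins for one batch of pd_batch['x1'] and pd_batch['x2'] (hypothetical values)
x1 = torch.tensor([0.5, 1.5, 2.5])
x2 = torch.tensor([0.0, 1.0, 1.0])

# one-step version: stacking along dim=1 yields (batch, n_features) directly
features = torch.stack([x1, x2], dim=1)

# the original two-step version, for comparison
features_orig = torch.transpose(torch.stack([x1, x2]), 0, 1)
```

Both produce the same tensor, so this is a safe drop-in replacement inside the loop.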
pd_batch is a dictionary with an entry for each column in the original df_train. My concern is that the torch operations might not be optimal. Something else I tried was first creating an array column in my spark dataframe combining x1 and x2. I was surprised to find that each epoch was more than 2x slower than the strategy above.
from pyspark.sql.functions import array

df_train = df_train.withColumn("features", array("x1", "x2")).select("features", "y")
# remainder same as above, except the torch.stack/torch.transpose step was removed
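If the goal is to get the feature assembly out of the training loop entirely, petastorm also supports running a pandas-level preprocessing function inside the loader workers via TransformSpec (passed to make_torch_dataloader as transform_spec). A minimal sketch of just the pandas-level function; assemble_features is a hypothetical name, and the exact TransformSpec arguments (e.g. edit_fields for declaring the new column's schema) depend on your petastorm version, so treat this as a starting point rather than a drop-in:

```python
import numpy as np
import pandas as pd

def assemble_features(pdf: pd.DataFrame) -> pd.DataFrame:
    """Would run inside petastorm's loader workers when wrapped in a
    TransformSpec; combines x1/x2 into a single float32 'features'
    column so the training loop can skip the per-batch stacking."""
    pdf['features'] = pdf[['x1', 'x2']].values.astype(np.float32).tolist()
    return pdf[['features', 'y']]

# toy frame standing in for one petastorm row-batch (hypothetical values)
batch = pd.DataFrame({'x1': [0.5, 1.5], 'x2': [0.0, 1.0], 'y': [1.0, 0.0]})
out = assemble_features(batch)
```

Whether this is actually faster is worth benchmarking: it trades per-batch torch ops in the loop for per-batch pandas ops in the workers, which may or may not win depending on worker count.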
Are there any improvements I can make here?