Dataloader for a folder with multiple files: PyTorch solutions equivalent to TFRecordDataset in TF 2.0

Hi,

Suppose I have a folder that contains multiple files. Is there a way to create a dataloader to read them? For example, after a Spark or MapReduce job, the output folder looks like

part-00000
part-00001
...
part-00999

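Usually the files in the folder are very large and cannot fit in memory.
For TensorFlow 2.0, we can convert the files to TFRecord format and feed the file paths in the folder to TFRecordDataset to read the data smoothly and efficiently, like below

import tensorflow as tf

# TFRecordDataset expects file paths, so list the files in the folder first.
files = tf.data.Dataset.list_files(folder_path + "/part-*")
raw_dataset = tf.data.TFRecordDataset(files)
def _parse_example(example_string):
    # `features` maps feature names to tf.io.FixedLenFeature / tf.io.VarLenFeature specs.
    feature_dict = tf.io.parse_single_example(example_string, features)
    return feature_dict
dataset = raw_dataset.map(_parse_example)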

However, I did not find a proper way to handle this situation in PyTorch. I have searched the internet for a long time without figuring it out.
So I came here to get some help.

I think this is a common requirement, and I would like to find an answer. I believe PyTorch should have an elegant way to solve this problem, as TensorFlow 2.0 does.

Thanks very much!

NVIDIA’s DALI supports reading TFRecord and MXNet RecordIO. The readers are written in C++ with a Python interface, so it’s probably the most performant option out there.

https://docs.nvidia.com/deeplearning/sdk/dali-developer-guide/docs/examples/dataloading.html

I’ve seen a few standalone pure Python attempts, but they’ll be pretty slow, as it’s not a task that Python is well suited for.
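
For a sense of what such a pure Python reader involves, here is a minimal sketch of the TFRecord framing only. Each record is stored as an 8-byte little-endian length, a 4-byte CRC of the length, the payload, and a 4-byte CRC of the payload. The function name read_tfrecord_payloads is hypothetical, and this sketch assumes an uncompressed file and skips CRC verification. The payloads it yields are still serialized tf.train.Example protos, so a protobuf decoder is needed on top.

```python
import struct
from typing import BinaryIO, Iterator


def read_tfrecord_payloads(stream: BinaryIO) -> Iterator[bytes]:
    """Yield the raw payload of each record in an uncompressed TFRecord stream.

    Assumption: CRC checks are skipped, so corrupted data is not detected.
    """
    while True:
        # 8-byte little-endian payload length + 4-byte CRC of that length.
        header = stream.read(12)
        if not header:
            return  # clean end of file
        if len(header) < 12:
            raise ValueError("truncated record header")
        (length,) = struct.unpack("<Q", header[:8])

        payload = stream.read(length)
        footer = stream.read(4)  # 4-byte CRC of the payload (ignored here)
        if len(payload) < length or len(footer) < 4:
            raise ValueError("truncated record body")
        yield payload
```

Because it is a generator, only one record is held in memory at a time; the per-record Python overhead is the reason this approach tends to be slow on large datasets.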

Hi Ross,

Thanks for the reply! NVIDIA’s DALI is a good way to read the TFRecord and MXNet formats.

However, I have the following questions:

  1. Does PyTorch have its own RecordIO, so that we do not need the TFRecord format?
  2. If the files in a folder are NOT in TFRecord or MXNet format, but in a user-defined format, such as label key1: value_list1 key2: value_list2 ..., then what is the recommended way to read the files with PyTorch? Note that there are multiple files in the folder, and their total size is too large to fit in memory.

Thanks very much!
Yin

Hey Yin, going from Spark to a torch DataLoader does require some custom work, but it is fairly easy to build. I use torch.utils.data.IterableDataset. First, I read all the Avro/Parquet files (as you are working with Spark) through a DataReader object, which acts as a generator (and where I do some custom processing on each record). Then I write my own collate_fn to batch the samples coming out of the generator and pass it to a DataLoader. Below is a brief sketch of how I do it:

from functools import partial
from typing import Callable, Dict

import torch
from fastavro import reader
from torch.utils.data import DataLoader


class AvroDataReader(torch.utils.data.IterableDataset):

    def __init__(self, data_location: str, custom_fn: Callable):
        self.data_location = data_location
        self.custom_fn = custom_fn

    def __iter__(self):
        # get_files, file_reader and smart_open are helpers that are not shown here;
        # they list the matching files and open each one as a binary stream.
        avro_files = get_files(self.data_location, '*.avro')
        for filename in avro_files:
            with file_reader(filename) as local_file:
                with smart_open(local_file, 'rb') as i:
                    # fastavro's reader yields one record at a time.
                    for record in reader(i):
                        sample = self.custom_fn(record)
                        # custom_fn can return None to skip a record.
                        if sample is not None:
                            yield sample


def _collate_examples(batch_of_data):
    # SomeType is a placeholder for your own batch container
    # (e.g. a NamedTuple or dataclass with these two fields).
    return SomeType(
        token_ids=torch.stack([b.token_ids for b in batch_of_data]),
        label_idx=torch.stack([b.label_idx for b in batch_of_data]),
    )


def get_dataloader(dataset, batch_size: int,
                   collate_fn: Callable):
    return DataLoader(dataset, batch_size=batch_size, collate_fn=collate_fn)


def get_torch_dataloader(data_location: str, batch_size: int, label_vocab: Dict[str, int]):
    # `custom` is your own function that turns one Avro record into a sample.
    dataset = AvroDataReader(data_location=data_location,
                             custom_fn=partial(custom, label_vocab=label_vocab))
    return get_dataloader(dataset=dataset, batch_size=batch_size,
                          collate_fn=_collate_examples)

You obviously need to adapt this example to your own code, but it should give you a good sketch of how it works, and it should not cause memory issues, since records are streamed one at a time.
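
For the user-defined text format asked about earlier (lines like label key1: value_list1 key2: value_list2 ... spread over many part files), the same streaming idea can be sketched with the standard library alone. This is only a sketch under assumptions that are not from the thread: tokens are whitespace-separated, a key is any token ending with a colon, files match part-*, and the names parse_line and iter_records are hypothetical. The worker_id and num_workers arguments split the file list so that parallel readers do not yield the same records twice.

```python
import glob
import os
from typing import Dict, Iterator, List, Tuple

Record = Tuple[str, Dict[str, List[str]]]


def parse_line(line: str) -> Record:
    """Parse 'label key1: v1 v2 key2: v3' into (label, {key: [values]}).

    Assumption: tokens are whitespace-separated and keys end with ':'.
    """
    tokens = line.split()
    label = tokens[0]
    features: Dict[str, List[str]] = {}
    key = None
    for tok in tokens[1:]:
        if tok.endswith(":"):
            key = tok[:-1]
            features[key] = []
        elif key is not None:
            features[key].append(tok)
    return label, features


def iter_records(folder: str, worker_id: int = 0, num_workers: int = 1) -> Iterator[Record]:
    """Lazily yield parsed records from every part-* file in `folder`.

    Each worker takes every num_workers-th file, starting at worker_id.
    """
    files = sorted(glob.glob(os.path.join(folder, "part-*")))
    for path in files[worker_id::num_workers]:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:  # reads one line at a time, never the whole file
                if line.strip():
                    yield parse_line(line)
```

A generator like this could serve as the body of __iter__ in an IterableDataset. When the DataLoader uses num_workers > 0, every worker gets its own copy of the dataset, so worker_id and num_workers would come from torch.utils.data.get_worker_info() (its id and num_workers attributes; it returns None in the main process).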

Hi Mac,

Thanks very much for your reply! It is really clear and helpful.

Yin

Hi Mac,

I just have another related question. I hope that I can get some suggestions from you. Thanks in advance!

Actually, my data is currently in TFRecord format. Do you have any suggestions on how to use TFRecord files in PyTorch? Should I read the TFRecords with TensorFlow and then convert the TF data to PyTorch tensors? That may be inefficient. The other plan is to convert the TFRecords to another format that PyTorch can deserialize directly, but I have no idea how I should serialize and deserialize data for PyTorch.

Do you have some suggestions? Thanks very much!

Yin

Hey Yin,

I think there is no ‘easy’ way to do it (i.e. a function which does it efficiently for you in PyTorch). I had a similar problem, and I simply had another job which converted the TFRecords to .avro and saved them; then I read the Avro files into the dataloader. I did it that way because it was the easiest for me, as my data is usually in Avro/Parquet. Probably the easiest way is to build a custom function (in place of custom_fn in the AvroDataReader object in the snippet above) which converts each TFRecord example to something like a tf.float32 tensor and then to a torch tensor (but I don’t know whether you will need a TF session for that).

This thread looks very relevant.

Hi Mac,

Having both TF and PyTorch in the same script is a little bit weird, and I am afraid there might be an efficiency problem if we load data with TF and then convert it to PyTorch tensors for the DataLoader. So I think converting TFRecord to Avro and reading Avro into the dataloader is a really good idea.
Could you please show me a sketch of how to convert TF data to Avro and load it into PyTorch tensors? Thanks very much! I really appreciate it.

I have also checked the link you sent me. However, it seems the problem is not resolved in that post; people still complain about the efficiency.

Regards,
Yin

Hey Yin, I agree, having both TF and PyTorch is not a good idea, not only in the same script but also in the same Python module.

I don’t have the exact code at hand, as it has been a while since I used TFRecords, but Databricks seems to have some good documentation. See it below:

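from pyspark.sql.types import (BinaryType, IntegerType, StringType,
                               StructField, StructType)

# Schema of the fields stored in each TFRecord example.
schema = StructType([StructField('image/class/label', IntegerType(), True),
                     StructField('image/width', IntegerType(), True),
                     StructField('image/height', IntegerType(), True),
                     StructField('image/format', StringType(), True),
                     StructField('image/encoded', BinaryType(), True)])

# The "tfrecords" format is provided by the spark-tensorflow-connector package.
# `spark` is the active SparkSession and `input_local_dir` is the folder holding the files.
df = spark.read.format("tfrecords").schema(schema).load(input_local_dir+'/flowers_train*.tfrecord')
df = df.limit(3200)
display(df)  # display() is a Databricks notebook function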

In general, the idea is to read the TFRecords into a PySpark dataframe (doing some preprocessing on it if needed) and then save it as Avro with df.write.format("avro").save("/tmp/loc").

Link to the databricks notebook on tfrecords is here.

Also, happy to see people using pytorch and spark, big fan of the combination!