Hi All,
I am trying to define a custom dataloader using IterableDataset for tabular data. The dataset I am dealing with is extremely big and cannot fit in main memory. I am able to define an iterable dataloader for data contained within a single file. However, for data that is partitioned across multiple files (in my case, parquet files), I am unable to come up with a proper solution.
I was able to come up with a working example using the references listed at the end of the question.
In the following example, I define an IterableDataset that yields batches from a single (large) CSV file.
```python
import itertools
import numpy as np
import torch
from torch.utils.data import DataLoader, IterableDataset

class MyIterableCsvDataset(IterableDataset):
    def __init__(self, data):
        self.file_name = data

    def process_data(self, line):
        # Parse one CSV line into a float array
        line = line.strip()
        return np.array(line.split(','), dtype=np.float64)

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        worker_total_num = worker_info.num_workers
        worker_id = worker_info.id
        # Create an iterator over the lines of the file
        file_itr = open(self.file_name)
        # Map each line through process_data
        mapped_itr = map(self.process_data, file_itr)
        # Add multi-worker functionality: each worker takes every
        # worker_total_num-th line, starting from its own id
        mapped_itr = itertools.islice(mapped_itr, worker_id, None, worker_total_num)
        return mapped_itr
```
Now, say I initialize the dataloader as follows (please check the references I used to compile this solution):
```python
iterable_dataset = MyIterableCsvDataset(data)
dataloader = DataLoader(iterable_dataset, batch_size=256, num_workers=1)
```
I get appropriate batches of shape [256, feature_size], where feature_size is the number of columns in the CSV dataset.
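As a quick sanity check on the multi-worker striding, here is a minimal, self-contained sketch (toy integers standing in for CSV rows) of what the `islice` call in `__iter__` does: each worker ends up with a disjoint, interleaved slice of the stream.

```python
import itertools

# Toy stand-in for the line stream: 10 "rows" shared by 2 workers.
rows = list(range(10))
num_workers = 2

# Each worker takes every num_workers-th element, offset by its id --
# the same striding the dataset applies with itertools.islice.
shards = [
    list(itertools.islice(iter(rows), worker_id, None, num_workers))
    for worker_id in range(num_workers)
]
print(shards)  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```

Together the shards cover every row exactly once, which is why the batches come out correctly with multiple workers.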
Here is my code for reading data partitioned across multiple parquet files.
```python
import itertools
import pandas
import torch
from torch.utils.data import IterableDataset

class MyIterableParquetDataset(IterableDataset):
    def __init__(self, files, path):
        self.file_iter = files
        self.data_path = path

    def process_data(self, file):
        # Read one whole parquet file into a 2-D array of rows
        data = pandas.read_parquet('{}/{}'.format(self.data_path, file))
        return data.values

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        worker_total_num = worker_info.num_workers
        worker_id = worker_info.id
        # Shard whole files across workers
        mapped_itr = map(self.process_data, self.file_iter)
        mapped_itr = itertools.islice(mapped_itr, worker_id, None, worker_total_num)
        return mapped_itr
```
I am initializing the dataloader as follows:
```python
parquet_files = ['file1.parquet', ....., 'filen.parquet']
iterable_dataset = MyIterableParquetDataset(parquet_files, path)
dataloader = DataLoader(iterable_dataset, batch_size=256, num_workers=1)
```
The issue I am facing is in getting a batch of the defined size (i.e., 256). Say each parquet file contains 1K rows; then each batch comes out with shape [256, 1000, feature_size] instead of [256, feature_size]. This is expected, since process_data reads one parquet file and returns all the rows within it, rather than one row at a time (unlike the CSV example above, where each element of the iterator is a single line). Can someone help me with solutions to overcome this issue?
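For reference, here is a direction I have been experimenting with, sketched with plain lists standing in for the parquet files (`file_contents` and `read_file` are placeholders; in the real code `read_file` would be `pandas.read_parquet(...).values`): keep the per-file sharding, but flatten each file's rows into a single row stream with `itertools.chain.from_iterable`, so the DataLoader sees one row at a time. I am not sure this is the idiomatic approach:

```python
import itertools

# Hypothetical stand-ins: each "file" is just a list of rows here.
file_contents = {
    'file1.parquet': [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
    'file2.parquet': [[7.0, 8.0], [9.0, 10.0], [11.0, 12.0]],
}

def read_file(name):
    # Placeholder for pandas.read_parquet(...).values
    return file_contents[name]

files = ['file1.parquet', 'file2.parquet']

# Still shard whole files across workers (worker_id=0, num_workers=1 here),
# then flatten each file's rows into one stream so the DataLoader would
# batch individual rows instead of whole files.
per_file = itertools.islice(map(read_file, files), 0, None, 1)
row_iter = itertools.chain.from_iterable(per_file)

rows = list(row_iter)
print(len(rows))  # 6 individual rows, ready to be batched 256 at a time
```

Returning `row_iter` from `__iter__` (instead of the per-file iterator) would keep memory bounded to one file at a time while still yielding row-sized elements.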
References: