Given a transformation function that I can’t change, e.g. autobot_vectorize
. It
- takes in a list of N inputs and
- output a tensor of N x 3 dimensions.
def autobot_vectorize(imgfiles):
# This vectorizer takes N imgfiles and return the
# a tensor of size N x 3.
# For example, we are simulating this with torch.rand(N, 3)
"""
>>> autobot_vectorize(['train/0/47215.png', 'train/0/56789.png'])
tensor([[1.23, 4.56, 7.89], [9.99, 8.88, 7.77]])
"""
return torch.rand(len(imgfiles), 3)
And given a dataset like:
content = """imagefile,imagefile2,label
train/0/16585.png,0
train/0/56789.png,0
train/0/47215.png,1
train/0/93155.png,1"""
with open('myfile.csv', 'w') as fout:
fout.write(content)
The objective is to create a Pytorch datapipe that will output something like this:
>>> dp = MyDataPipe(csvfile)
>>> for row in dp.train_dataloader:
... print(row)
...
(tensor([1.23, 4.56, 7.89]), 0)
(tensor([9.87, 6.54, 3.21]), 1)
(tensor([9.99, 8.88, 7.77]), 0)
(tensor([1.11, 2.22, 9.87]), 1)
What is the a simple way to create the DataLoader from the DataPipe that best optimize the batching function of the autobot_vectorize
function?
I’ve tried these…
Attempt 1: Not fully utilizing batching in autobot_vectorize
I’ve tried doing this nesting the mapping function and by sending autobot_vectorize
one input at a time but this is not fully utilizing its batching function capabilities, i.e.
def optimus_prime(row):
"""This functions returns two data points with some arbitrary vectors.
>>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
>>> optimus_prime(row)
(tensor([1.23, 4.56, 7.89]), 0)
"""
return autobot_vectorize(row['imagefile'])[0], row['label']
[Working snippet]:
import torch
from torch.utils.data import DataLoader2
import pytorch_lightning as pl
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
content = """imagefile,label
train/0/16585.png,0
train/0/56789.png,0
train/0/47215.png,1
train/0/93155.png,1"""
with open('myfile.csv', 'w') as fout:
fout.write(content)
def autobot_vectorize(imgfiles):
# This vectorizer takes N imgfiles and return the
# a tensor of size N x 3.
# For example, we are simulating this with torch.rand(N, 3)
"""
>>> autobot_vectorize(['train/0/47215.png', 'train/0/56789.png'])
tensor([[1.23, 4.56, 7.89], [9.99, 8.88, 7.77]])
"""
return torch.rand(len(imgfiles), 3)
def optimus_prime(row):
"""This functions returns two data points with some arbitrary vectors.
>>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
>>> optimus_prime(row)
(tensor([1.23, 4.56, 7.89]), 0)
"""
return autobot_vectorize(row['imagefile'])[0], row['label']
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files,
skip_lines=0
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = (
IterableWrapper(iterable=self.csv_files)
.open_files()
.parse_csv_as_dict(skip_lines=self.skip_lines)
)
self.dp_chained_datapipe = self.dp_chained_datapipe.map(optimus_prime)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
dp = MyDataPipe(['myfile.csv'])
for row in dp.dp_chained_datapipe:
print(row)
[out]:
(tensor([0.0740, 0.6826, 0.1215]), '0')
(tensor([0.0084, 0.2512, 0.8702]), '0')
(tensor([0.7141, 0.4003, 0.7872]), '1')
(tensor([0.8011, 0.4163, 0.9410]), '1')
Attempt 2: Batching before flatmap, but can’t get labels to be included…
I’ve tried to process the X efficiently by kind of using .flatmap()
but I don’t even know whether I’m really using the autobot_vectorize
batching capabilities or is it still processing the batch, one at a time.
[Working snippet]:
import torch
from torch.utils.data import DataLoader2
import pytorch_lightning as pl
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
content = """imagefile,label
train/0/16585.png,0
train/0/56789.png,0
train/0/47215.png,1
train/0/93155.png,1"""
with open('myfile.csv', 'w') as fout:
fout.write(content)
def autobot_vectorize(imgfiles):
# This vectorizer takes N imgfiles and return the
# a tensor of size N x 3.
# For example, we are simulating this with torch.rand(N, 3)
"""
>>> autobot_vectorize(['train/0/47215.png', 'train/0/56789.png'])
tensor([[1.23, 4.56, 7.89], [9.99, 8.88, 7.77]])
"""
return torch.rand(len(imgfiles), 3)
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files,
skip_lines=0
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = (
IterableWrapper(iterable=self.csv_files)
.open_files()
.parse_csv_as_dict(skip_lines=self.skip_lines)
)
self.dp_chained_datapipe = self.dp_chained_datapipe.map(lambda x: [x['imagefile']])
self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(autobot_vectorize)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
dp = MyDataPipe(['myfile.csv'])
for row in dp.dp_chained_datapipe:
print(row)
Also, I’m not sure how to add the Ys (labels) when using flatmap as above.