How to create a DataPipe that best optimizes a map-transform function that supports batching?

Given a transformation function that I can't change, e.g. autobot_vectorize, which

  • takes in a list of N inputs and
  • outputs a tensor of shape N x 3:
def autobot_vectorize(imgfiles):
    """This vectorizer takes N imgfiles and returns a tensor of size N x 3.
    Here we simulate it with torch.rand(N, 3).
    >>> autobot_vectorize(['train/0/47215.png', 'train/0/56789.png'])
    tensor([[1.23, 4.56, 7.89], [9.99, 8.88, 7.77]])
    """
    return torch.rand(len(imgfiles), 3)

And given a dataset like:

content = """imagefile,imagefile2,label
train/0/16585.png,0
train/0/56789.png,0
train/0/47215.png,1
train/0/93155.png,1"""

with open('myfile.csv', 'w') as fout:
    fout.write(content)

The objective is to create a PyTorch DataPipe that will output something like this:

>>> dp = MyDataPipe(['myfile.csv'])
>>> for row in dp.train_dataloader():
...     print(row)
...
(tensor([1.23, 4.56, 7.89]), 0)
(tensor([9.87, 6.54, 3.21]), 1)
(tensor([9.99, 8.88, 7.77]), 0)
(tensor([1.11, 2.22, 9.87]), 1)

What is a simple way to create the DataLoader from the DataPipe that makes the best use of autobot_vectorize's batching capability?


I’ve tried these…

Attempt 1: Not fully utilizing batching in autobot_vectorize

I've tried nesting autobot_vectorize inside the mapping function and sending it one input at a time, but this does not exploit its batching capability, i.e.


def optimus_prime(row):
    """Returns a (vector, label) pair for a single row.
    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> optimus_prime(row)
    (tensor([1.23, 4.56, 7.89]), 0)
    """
    return autobot_vectorize([row['imagefile']])[0], row['label']

[Working snippet]:

import torch 

from torch.utils.data import DataLoader2
import pytorch_lightning as pl
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

content = """imagefile,label
train/0/16585.png,0
train/0/56789.png,0
train/0/47215.png,1
train/0/93155.png,1"""

with open('myfile.csv', 'w') as fout:
    fout.write(content)
    
def autobot_vectorize(imgfiles):
    """This vectorizer takes N imgfiles and returns a tensor of size N x 3.
    Here we simulate it with torch.rand(N, 3).
    >>> autobot_vectorize(['train/0/47215.png', 'train/0/56789.png'])
    tensor([[1.23, 4.56, 7.89], [9.99, 8.88, 7.77]])
    """
    return torch.rand(len(imgfiles), 3)


def optimus_prime(row):
    """Returns a (vector, label) pair for a single row.
    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> optimus_prime(row)
    (tensor([1.23, 4.56, 7.89]), 0)
    """
    return autobot_vectorize([row['imagefile']])[0], row['label']
    

class MyDataPipe(pl.LightningDataModule):
    def __init__(
        self,
        csv_files,
        skip_lines=0
    ):
        super().__init__()
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe.
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )
            
        self.dp_chained_datapipe = self.dp_chained_datapipe.map(optimus_prime)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)

dp = MyDataPipe(['myfile.csv'])

for row in dp.dp_chained_datapipe:
    print(row)

[out]:

(tensor([0.0740, 0.6826, 0.1215]), '0')
(tensor([0.0084, 0.2512, 0.8702]), '0')
(tensor([0.7141, 0.4003, 0.7872]), '1')
(tensor([0.8011, 0.4163, 0.9410]), '1')

Attempt 2: Batching before flatmap, but can’t get labels to be included…

I've tried to process the X's more efficiently with .flatmap(), but I'm not even sure whether this really uses autobot_vectorize's batching capability or still processes items one at a time.

[Working snippet]:

import torch 

from torch.utils.data import DataLoader2
import pytorch_lightning as pl
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

content = """imagefile,label
train/0/16585.png,0
train/0/56789.png,0
train/0/47215.png,1
train/0/93155.png,1"""

with open('myfile.csv', 'w') as fout:
    fout.write(content)
    
def autobot_vectorize(imgfiles):
    """This vectorizer takes N imgfiles and returns a tensor of size N x 3.
    Here we simulate it with torch.rand(N, 3).
    >>> autobot_vectorize(['train/0/47215.png', 'train/0/56789.png'])
    tensor([[1.23, 4.56, 7.89], [9.99, 8.88, 7.77]])
    """
    return torch.rand(len(imgfiles), 3)


class MyDataPipe(pl.LightningDataModule):
    def __init__(
        self,
        csv_files,
        skip_lines=0
    ):
        super().__init__()
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe.
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )
            
        self.dp_chained_datapipe = self.dp_chained_datapipe.map(lambda x: [x['imagefile']])
        self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(autobot_vectorize)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)

dp = MyDataPipe(['myfile.csv'])

for row in dp.dp_chained_datapipe:
    print(row)

Also, I’m not sure how to add the Ys (labels) when using flatmap as above.

What is a simple way to create the DataLoader from the DataPipe that makes the best use of autobot_vectorize's batching capability?

Also asked on Stack Overflow: How to create a DataPipe that best optimizes a map-transform function that supports batching?

You can write your own IterDataPipe that collects b samples, transforms them in one call, and yields the result. It will look something like:

def __iter__(self):
    img_batch = []
    label_batch = []
    for img, label in self.source_datapipe:
        img_batch.append(img)
        label_batch.append(label)
        if len(img_batch) == b:  # reached batch size
            yield autobot_vectorize(img_batch), process_labels(label_batch)
            img_batch, label_batch = [], []  # start a new batch
    # Extra logic to handle `drop_last`, the final partial batch, etc.

You will need a function to process the label since I am not sure what your desired output is for labels.
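
For a more concrete picture, here is a minimal sketch of that idea, wired into the CSV pipeline from the question. It is only an illustration under a few assumptions: autobot_vectorize is defined as above, the source datapipe yields dict rows from parse_csv_as_dict, and the class name VectorizingBatcher, the int() cast of the label, and the leftover-batch handling are my own choices, not anything prescribed by torchdata.

from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

class VectorizingBatcher(IterDataPipe):
    """Collects `batch_size` rows, calls autobot_vectorize once per batch,
    then yields the resulting (vector, label) pairs one at a time."""
    def __init__(self, source_datapipe, batch_size, drop_last=False):
        self.source_datapipe = source_datapipe
        self.batch_size = batch_size
        self.drop_last = drop_last

    def __iter__(self):
        img_batch, label_batch = [], []
        for row in self.source_datapipe:
            img_batch.append(row['imagefile'])
            label_batch.append(int(row['label']))
            if len(img_batch) == self.batch_size:
                # One batched call instead of one call per image.
                yield from zip(autobot_vectorize(img_batch), label_batch)
                img_batch, label_batch = [], []
        if img_batch and not self.drop_last:
            # Flush the final partial batch.
            yield from zip(autobot_vectorize(img_batch), label_batch)

source = (
    IterableWrapper(['myfile.csv'])
    .open_files()
    .parse_csv_as_dict()
)

for vector, label in VectorizingBatcher(source, batch_size=2):
    print(vector, label)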

You can also do dp.batch(batch_size).map(process_batch), defining a process_batch function that takes [(img1, label1), (img2, label2), (img3, label3), ...] and creates the desired output.
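
As a rough sketch of that second option, under the same assumptions (autobot_vectorize defined as above, rows coming from parse_csv_as_dict), process_batch can make a single batched call per chunk and .unbatch() can restore per-sample tuples; the lambda that turns dict rows into (imagefile, label) tuples and the batch size of 4 are arbitrary choices here:

from torchdata.datapipes.iter import IterableWrapper

def process_batch(batch):
    # `batch` is a list like [(img1, label1), (img2, label2), ...].
    imgfiles = [img for img, _ in batch]
    labels = [label for _, label in batch]
    vectors = autobot_vectorize(imgfiles)  # one batched call for the whole chunk
    return list(zip(vectors, labels))

dp = (
    IterableWrapper(['myfile.csv'])
    .open_files()
    .parse_csv_as_dict()
    .map(lambda row: (row['imagefile'], int(row['label'])))
    .batch(4)              # group rows so autobot_vectorize sees N > 1 files
    .map(process_batch)
    .unbatch()             # back to per-sample (vector, label) tuples
)

for vector, label in dp:
    print(vector, label)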