DataPipe not helping with faster data loading

Hello,

I have been working on a dataset of RNA sequences stored in a text file. I am using DataPipes from torchdata to load the dataset.

from torchdata.datapipes.iter import IterDataPipe
from torch.utils.data import DataLoader
import torchdata.datapipes as dp
import tokenization
import random
import dataset_utilis as du
import torch

#profiling imports

import cProfile
import pstats


class SeqDatasetIter(IterDataPipe):
    def __init__(self, config) -> None:
        super().__init__()
        self.input_dir = config['input_dir']
        self.max_length = config['max_length']
        self.min_length = config['min_length']
        self.masked_lm_prob = config['masked_lm_prob']
        self.max_predictions_per_seq = config['max_predictions_per_seq']
        self.rng = random.Random(1)
        self.k = config['k']
        self.tokenizer = tokenization.FullTokenizer(k=self.k)


    def mask_und_pad(self, tokenized_seq, vocab):
        segment_ids = [0] * len(tokenized_seq)
        (
            tokenized_seq,
            masked_lm_positions,
            masked_lm_labels,
        ) = du.create_masked_lm_predictions(
            tokenized_seq,
            self.masked_lm_prob,
            self.max_predictions_per_seq,
            vocab,
            self.rng,
        )
        tokenized_seq = self.tokenizer.convert_tokens_to_ids(tokenized_seq)
        masked_lm_labels = self.tokenizer.convert_tokens_to_ids(masked_lm_labels)
        # return tokenized_seq, segment_ids, masked_lm_positions, masked_lm_labels


        input_ids = torch.tensor(tokenized_seq, dtype=torch.long)
        segment_ids = torch.tensor(segment_ids, dtype=torch.long)
        input_mask = torch.ones_like(input_ids, dtype=torch.long)

        pad_length = self.max_length - input_ids.size(0)
        padding = torch.zeros(pad_length, dtype=torch.long)

        input_ids = torch.cat((input_ids, padding))
        segment_ids = torch.cat((segment_ids, padding))
        input_mask = torch.cat((input_mask, padding))

        masked_lm_positions = torch.tensor(masked_lm_positions, dtype=torch.long)
        masked_lm_ids = torch.tensor(masked_lm_labels, dtype=torch.long)
        masked_lm_weights = torch.tensor([1.0] * len(masked_lm_ids), dtype=torch.float32)

        pad_length = self.max_predictions_per_seq - masked_lm_positions.size(0)
        padding = torch.zeros(pad_length, dtype=torch.long)
        padding_weights = torch.zeros(pad_length, dtype=torch.float32)

        masked_lm_positions = torch.cat((masked_lm_positions, padding))
        masked_lm_ids = torch.cat((masked_lm_ids, padding))
        masked_lm_weights = torch.cat((masked_lm_weights, padding_weights))



        return (
            input_ids,
            segment_ids,
            masked_lm_positions,
            masked_lm_ids,  # return the padded label-id tensor rather than the raw label list
        )

    def preprocess(self, sequence):
        line = tokenization.convert_to_unicode(sequence[0])
        tokenized_seq = self.tokenizer.tokenize(line)
        vocab = list(self.tokenizer.vocab.keys())
        tokenized_seq = [tokenization.CLS_TOKEN] + tokenized_seq
        projected_length = len(tokenized_seq)
        if not (self.min_length <= projected_length <= self.max_length):
            # If a sequence is longer than the maximum length, remove the extra part.
            # The -2 leaves headroom for the [CLS] token added below so the final
            # length does not exceed max_length (why 2 and not 1?).
            truncated = sequence[0][:self.max_length - 2]
            line = tokenization.convert_to_unicode(truncated)
            tokenized_seq = self.tokenizer.tokenize(line)
            tokenized_seq = [tokenization.CLS_TOKEN] + tokenized_seq

        (
            input_ids,
            segment_ids,
            masked_lm_positions,
            masked_lm_labels,
        ) = self.mask_und_pad(tokenized_seq, vocab)

        return (
            input_ids,
            segment_ids,
            masked_lm_positions,
            masked_lm_labels,
        )
    
    def filter(self, fname):
        # keep only .txt files listed by FileLister
        return fname.endswith('.txt')

    def build_datapipes(self):
        datapipe = dp.iter.FileLister(self.input_dir)
        datapipe = datapipe.filter(filter_fn=self.filter)
        datapipe = datapipe.open_files()  # equivalent to dp.iter.FileOpener(datapipe)
        datapipe = datapipe.parse_csv(delimiter="\n")  # a line reader could also be used
        # Shuffling happens as long as `shuffle=False` is NOT set later in the DataLoader
        datapipe = datapipe.shuffle()
        # sharding_filter is placed after shuffle so each DataLoader worker gets a distinct shard
        datapipe = datapipe.sharding_filter()
        datapipe = datapipe.map(self.preprocess)
        return datapipe
    
    def vocab_size(self):
        return len(self.tokenizer.vocab.keys())


if __name__ == "__main__":
    with cProfile.Profile() as profile:
        tokenizer = tokenization.FullTokenizer()
        max_length = 1000
        min_length = 0
        masked_lm_prob = 0.15
        max_predictions_per_seq = 3
        input_dir = "data/bigdata/test"
        config = {
            'max_length': max_length,
            'min_length': min_length,
            'masked_lm_prob': masked_lm_prob,
            'max_predictions_per_seq': max_predictions_per_seq,
            'input_dir': input_dir,
            'k': 3,
        }
        dataset1 = SeqDatasetIter(config)
        datapipe = dataset1.build_datapipes()
        dataloader = DataLoader(dataset=datapipe, batch_size=128, num_workers=24)
        for epoch in range(10):
            for batch in dataloader:
                print(batch)

    results = pstats.Stats(profile)
    results.sort_stats(pstats.SortKey.TIME)
    results.print_stats()
    results.dump_stats('iter_dataloader.prof')

When I train my model using this code everything works fine, but as I increase the number of GPUs to shorten the training time, I don't see any improvement. I have also tried loading the data with a map-style dataset, and in that case I do see a clear improvement in training time when using more GPUs (a simplified sketch of that comparison is below).
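For reference, this is roughly the kind of map-style setup I am comparing against. It is a simplified sketch, not my exact code: the file reading is condensed, and I assume the usual DistributedSampler to split the data across GPUs.

import os
import glob
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler


class SeqDatasetMap(Dataset):
    """Map-style variant: all sequences are read into memory up front."""

    def __init__(self, input_dir, preprocess_fn):
        self.preprocess_fn = preprocess_fn  # reuse SeqDatasetIter.preprocess
        self.sequences = []
        for path in glob.glob(os.path.join(input_dir, "*.txt")):
            with open(path) as f:
                self.sequences.extend(line.strip() for line in f if line.strip())

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        # preprocess expects a one-element row, like the csv-parsed lines above
        return self.preprocess_fn([self.sequences[idx]])


# usage (inside an initialized distributed process group, one process per GPU):
# dataset = SeqDatasetMap(input_dir, dataset1.preprocess)
# sampler = DistributedSampler(dataset)
# dataloader = DataLoader(dataset, batch_size=128, sampler=sampler, num_workers=24)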

Can someone help me find possible reasons for this behaviour?

Thanks