Hello,
I have been working with a dataset of RNA sequences stored in text files, and I am using a DataPipe from TorchData (torchdata) to load it.
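Each .txt file in the input directory holds one raw sequence per line (parse_csv(delimiter="\n") in the code below turns every line into a one-element row), so the input looks roughly like this (illustrative lines, not my real data):

ACGUUGCAUGGCAUCGGAUCCUAG
UGGCAUCGAAUGGCUAACGGAUCC

The loading code: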
from torchdata.datapipes.iter import IterDataPipe
from torch.utils.data import DataLoader
import torchdata.datapipes as dp
import tokenization
import random
import dataset_utilis as du
import torch
#profiling imports
import cProfile
import pstats
class SeqDatasetIter(IterDataPipe):
    def __init__(self, config) -> None:
        super().__init__()
        self.input_dir = config['input_dir']
        self.max_length = config['max_length']
        self.min_length = config['min_length']
        self.masked_lm_prob = config['masked_lm_prob']
        self.max_predictions_per_seq = config['max_predictions_per_seq']
        self.rng = random.Random(1)
        self.k = config['k']
        self.tokenizer = tokenization.FullTokenizer(k=self.k)

    def mask_und_pad(self, tokenized_seq, vocab):
        segment_ids = [0] * len(tokenized_seq)
        (
            tokenized_seq,
            masked_lm_positions,
            masked_lm_labels,
        ) = du.create_masked_lm_predictions(
            tokenized_seq,
            self.masked_lm_prob,
            self.max_predictions_per_seq,
            vocab,
            self.rng,
        )
        tokenized_seq = self.tokenizer.convert_tokens_to_ids(tokenized_seq)
        masked_lm_labels = self.tokenizer.convert_tokens_to_ids(masked_lm_labels)
        # return tokenized_seq, segment_ids, masked_lm_positions, masked_lm_labels
        input_ids = torch.tensor(tokenized_seq, dtype=torch.long)
        segment_ids = torch.tensor(segment_ids, dtype=torch.long)
        input_mask = torch.ones_like(input_ids, dtype=torch.long)
        # pad the per-token tensors up to max_length
        pad_length = self.max_length - input_ids.size(0)
        padding = torch.zeros(pad_length, dtype=torch.long)
        input_ids = torch.cat((input_ids, padding))
        segment_ids = torch.cat((segment_ids, padding))
        input_mask = torch.cat((input_mask, padding))
        masked_lm_positions = torch.tensor(masked_lm_positions, dtype=torch.long)
        masked_lm_ids = torch.tensor(masked_lm_labels, dtype=torch.long)
        masked_lm_weights = torch.tensor([1.0] * len(masked_lm_ids), dtype=torch.float32)
        # pad the masked-LM tensors up to max_predictions_per_seq
        pad_length = self.max_predictions_per_seq - masked_lm_positions.size(0)
        padding = torch.zeros(pad_length, dtype=torch.long)
        padding_weights = torch.zeros(pad_length, dtype=torch.float32)
        masked_lm_positions = torch.cat((masked_lm_positions, padding))
        masked_lm_ids = torch.cat((masked_lm_ids, padding))
        masked_lm_weights = torch.cat((masked_lm_weights, padding_weights))
        return (
            input_ids,
            segment_ids,
            masked_lm_positions,
            masked_lm_ids,  # the padded label ids, not the unpadded masked_lm_labels list
        )

    def preprocess(self, sequence):
        line = tokenization.convert_to_unicode(sequence[0])
        tokenized_seq = self.tokenizer.tokenize(line)
        vocab = list(self.tokenizer.vocab.keys())
        tokenized_seq = [tokenization.CLS_TOKEN] + tokenized_seq
        projected_length = len(tokenized_seq)
        if self.min_length <= projected_length <= self.max_length:
            (
                input_ids,
                segment_ids,
                masked_lm_positions,
                masked_lm_labels,
            ) = self.mask_und_pad(tokenized_seq, vocab)
        else:
            # If a sequence is longer than the maximum length, we drop the extra part.
            # The slice leaves room for the class token, which is added later;
            # otherwise the length would exceed max_length.
            sequence = sequence[0]
            sequence = sequence[:self.max_length - 2]  # why 2?
            line = tokenization.convert_to_unicode(sequence)
            tokenized_seq = self.tokenizer.tokenize(line)
            vocab = list(self.tokenizer.vocab.keys())
            tokenized_seq = [tokenization.CLS_TOKEN] + tokenized_seq
            (
                input_ids,
                segment_ids,
                masked_lm_positions,
                masked_lm_labels,
            ) = self.mask_und_pad(tokenized_seq, vocab)
        return (
            input_ids,
            segment_ids,
            masked_lm_positions,
            masked_lm_labels,
        )

    def filter(self, fname):
        return fname.endswith('.txt')

    def build_datapipes(self):
        datapipe = dp.iter.FileLister(self.input_dir)
        datapipe = datapipe.filter(filter_fn=self.filter)
        datapipe = datapipe.open_files()  # dp.iter.FileOpener(datapipe) would also work
        datapipe = datapipe.parse_csv(delimiter="\n")  # a line reader could also be used
        # Shuffling happens as long as `shuffle=False` is NOT set later in the DataLoader
        datapipe = datapipe.shuffle()
        datapipe = datapipe.sharding_filter()
        datapipe = datapipe.map(self.preprocess)
        return datapipe

    def vocab_size(self):
        return len(self.tokenizer.vocab.keys())

if __name__ == "__main__":
    with cProfile.Profile() as profile:
        tokenizer = tokenization.FullTokenizer()
        max_length = 1000
        min_length = 0
        masked_lm_prob = 0.15
        max_predictions_per_seq = 3
        input_dir = "data/bigdata/test"
        config = {
            'max_length': max_length,
            'min_length': min_length,
            'masked_lm_prob': masked_lm_prob,
            'max_predictions_per_seq': max_predictions_per_seq,
            'input_dir': input_dir,
            'k': 3
        }
        dataset1 = SeqDatasetIter(config)
        datapipe = dataset1.build_datapipes()
        dataloader = DataLoader(dataset=datapipe, batch_size=128, num_workers=24)
        for epoch in range(10):
            for batch in dataloader:
                print(batch)

    results = pstats.Stats(profile)
    results.sort_stats(pstats.SortKey.TIME)
    results.print_stats()
    results.dump_stats('iter_dataloader.prof')
When I train my model using this code, everything works fine, but when I increase the number of GPUs to get faster training, I don't see any improvement in training time. I have also tried loading the data with a map-style dataset, and in that case I do see a clear improvement in training time when using more GPUs.
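For reference, the map-style version I compared against looks roughly like this (a simplified sketch rather than my exact code; names are illustrative, it reuses preprocess() from the class above, and it assumes one sequence per line in the .txt files):

import glob
import os
from torch.utils.data import Dataset
from torch.utils.data.distributed import DistributedSampler

class SeqDatasetMap(Dataset):
    # illustrative sketch of my map-style dataset, not the exact implementation
    def __init__(self, config):
        # reuse the tokenization / masking logic from the iterable version above
        self.pipe = SeqDatasetIter(config)
        self.lines = []
        for path in glob.glob(os.path.join(config['input_dir'], '*.txt')):
            with open(path) as f:
                self.lines.extend(line.strip() for line in f if line.strip())

    def __len__(self):
        return len(self.lines)

    def __getitem__(self, idx):
        # preprocess() expects a one-element list, like a row from parse_csv
        return self.pipe.preprocess([self.lines[idx]])

# each rank then gets its own shard of the data, e.g.:
# dataset = SeqDatasetMap(config)
# sampler = DistributedSampler(dataset)
# dataloader = DataLoader(dataset, batch_size=128, sampler=sampler, num_workers=24)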
Can someone help me find possible reasons for this behaviour?
Thanks