Recommended way to create a data pipeline with torchtext>=v0.10.0

I’m pretty new to torchtext. I’ve recently dug through a lot of NLP tutorials for torchtext, but most of them use APIs that have now been moved to torchtext.legacy, and the current PyTorch docs rely on functional methods without a standard, conventional way of implementing a data pipeline the way torchvision has. What I currently do is extend torch.utils.data.Dataset like below:

import io

import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
from torchtext.data.utils import get_tokenizer
from torchtext.utils import download_from_url, extract_archive
from torchtext.vocab import build_vocab_from_iterator


# A custom language translation dataset using Multi30k
class Multi30kDe2En(Dataset):
    UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
    SPECIAL_SYMBOLS = ['<unk>', '<pad>', '<bos>', '<eos>']

    def __init__(self, base_url, urls):
        super(Multi30kDe2En, self).__init__()
        self.base_url = base_url
        self.urls = urls
        # paths to text files containing queries (e.g. train.de, train.en)
        self.paths = [extract_archive(download_from_url(self.base_url + url))[0] for url in self.urls]
        # load tokenizers
        self.de_tokenizer = get_tokenizer('spacy', language='de_core_news_sm')
        self.en_tokenizer = get_tokenizer('spacy', language='en_core_web_sm')
        self.de_vocab, self.en_vocab = self._load_vocabs()
        # list of all queries; a sentence per item
        with io.open(self.paths[0], encoding='utf8') as f:
            self.de_texts = f.readlines()
        with io.open(self.paths[1], encoding='utf8') as f:
            self.en_texts = f.readlines()

    def __len__(self):
        return len(self.en_texts)

    def __getitem__(self, index):
        de_text = self.de_texts[index]
        en_text = self.en_texts[index]
        de_tensor = torch.tensor([self.de_vocab[token] for token in self.de_tokenizer(de_text)], dtype=torch.long)
        en_tensor = torch.tensor([self.en_vocab[token] for token in self.en_tokenizer(en_text)], dtype=torch.long)
        return de_tensor, en_tensor

    def _load_vocabs(self):
        # REALLY LOOKING FOR AN ALTERNATIVE WAY INSTEAD OF AN UGLY ITERATOR
        def yield_tokens(filepath, tokenizer):
            with io.open(filepath, encoding='utf8') as f:
                for text in f:
                    yield tokenizer(text)

        de_vocab = build_vocab_from_iterator(yield_tokens(self.paths[0], self.de_tokenizer),
                                             specials=self.SPECIAL_SYMBOLS)
        en_vocab = build_vocab_from_iterator(yield_tokens(self.paths[1], self.en_tokenizer),
                                             specials=self.SPECIAL_SYMBOLS)
        de_vocab.set_default_index(self.UNK_IDX)
        en_vocab.set_default_index(self.UNK_IDX)

        return de_vocab, en_vocab

    @classmethod
    def collate_fn(cls, batch):
        # Prepend <bos> and append <eos> to every sample, then pad the batch to a common length
        de_batch, en_batch = [], []
        for de, en in batch:
            de_batch.append(torch.cat([torch.tensor([cls.BOS_IDX]), de, torch.tensor([cls.EOS_IDX])], dim=0))
            en_batch.append(torch.cat([torch.tensor([cls.BOS_IDX]), en, torch.tensor([cls.EOS_IDX])], dim=0))
        de_batch = pad_sequence(de_batch, padding_value=cls.PAD_IDX).permute(1, 0)
        en_batch = pad_sequence(en_batch, padding_value=cls.PAD_IDX).permute(1, 0)
        return de_batch, en_batch

if __name__ == '__main__':
    from torch.utils.data import DataLoader

    base_url = 'https://raw.githubusercontent.com/multi30k/dataset/master/data/task1/raw/'
    urls = ('train.de.gz', 'train.en.gz')
    dataset = Multi30kDe2En(base_url, urls)
    dataloader = DataLoader(dataset, batch_size=1, collate_fn=Multi30kDe2En.collate_fn)
    de_batch, en_batch = next(iter(dataloader))
    print(f'German Tensor Batch : {de_batch}')
    print(f'English Tensor Batch: {en_batch}')

I’d appreciate any suggestions on other alternative ways. Thanks!

First of all, this code has a major problem: the vocabs are rebuilt every time a dataset is instantiated from this class (e.g. once for train and once for val), so the validation set ends up with a different set of vocab mappings and you get really bad results when evaluating on it. A better way to implement this, so that every split shares a consistent vocab, is the following:

import torch
from torch.utils.data import Dataset
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import Multi30k
from torch.nn.utils.rnn import pad_sequence
from torchtext.data.utils import get_tokenizer
from typing import Literal

class Multi30kDe2En(Dataset):
    UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
    SPECIAL_SYMBOLS = ['<unk>', '<pad>', '<bos>', '<eos>']

    def __init__(self, split: Literal['train', 'valid']):
        super(Multi30kDe2En, self).__init__()
        self.split = split
        self.iter = Multi30k(split=split, language_pair=('de', 'en'))
        self.de_texts, self.en_texts = list(zip(*self.iter))
        self.de_tokenizer = get_tokenizer('spacy', language='de_core_news_sm')
        self.en_tokenizer = get_tokenizer('spacy', language='en_core_web_sm')
        self.de_vocab, self.en_vocab = self._load_vocabs()

    def __len__(self):
        return len(self.en_texts)

    def __getitem__(self, index):
        # strip the trailing newline so the tokens match the vocab built in _load_vocabs
        de_text = self.de_texts[index].rstrip('\n')
        en_text = self.en_texts[index].rstrip('\n')
        de_tensor = torch.tensor([self.de_vocab[token] for token in self.de_tokenizer(de_text)], dtype=torch.long)
        en_tensor = torch.tensor([self.en_vocab[token] for token in self.en_tokenizer(en_text)], dtype=torch.long)
        return de_tensor, en_tensor

    def _load_vocabs(self):
        data_iter = Multi30k(split='train', language_pair=('de', 'en'))  # vocabs must be extracted from train split
        de_texts, en_texts = list(zip(*data_iter))
        de_tokens = [self.de_tokenizer(text.rstrip('\n')) for text in de_texts]
        en_tokens = [self.en_tokenizer(text.rstrip('\n')) for text in en_texts]
        de_vocab = build_vocab_from_iterator(iter(de_tokens), specials=self.SPECIAL_SYMBOLS)
        en_vocab = build_vocab_from_iterator(iter(en_tokens), specials=self.SPECIAL_SYMBOLS)
        de_vocab.set_default_index(self.UNK_IDX)
        en_vocab.set_default_index(self.UNK_IDX)

        return de_vocab, en_vocab

    @classmethod
    def collate_fn(cls, batch):
        de_batch, en_batch = [], []
        for de, en in batch:
            de_batch.append(torch.cat([torch.tensor([cls.BOS_IDX]), de, torch.tensor([cls.EOS_IDX])], dim=0))
            en_batch.append(torch.cat([torch.tensor([cls.BOS_IDX]), en, torch.tensor([cls.EOS_IDX])], dim=0))
        de_batch = pad_sequence(de_batch, padding_value=cls.PAD_IDX).permute(1, 0)
        en_batch = pad_sequence(en_batch, padding_value=cls.PAD_IDX).permute(1, 0)
        return de_batch, en_batch


if __name__ == '__main__':
    from torch.utils.data import DataLoader

    dataset = Multi30kDe2En('train')
    dataloader = DataLoader(dataset, batch_size=16, collate_fn=Multi30kDe2En.collate_fn)
    de, en = next(iter(dataloader))
    print('done')

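If you’d rather not rebuild the vocabs inside the dataset class at all, another option (just a rough sketch; the build_vocabs helper and the vocabs argument are my own illustration, not part of the class above) is to build both vocabs once from the train split and pass the same pair into every split:

from torchtext.data.utils import get_tokenizer
from torchtext.datasets import Multi30k
from torchtext.vocab import build_vocab_from_iterator

def build_vocabs(de_tokenizer, en_tokenizer):
    # Vocabs are always built from the *train* split, once, up front.
    train_iter = Multi30k(split='train', language_pair=('de', 'en'))
    de_texts, en_texts = zip(*train_iter)
    de_vocab = build_vocab_from_iterator(
        (de_tokenizer(t.rstrip('\n')) for t in de_texts),
        specials=Multi30kDe2En.SPECIAL_SYMBOLS)
    en_vocab = build_vocab_from_iterator(
        (en_tokenizer(t.rstrip('\n')) for t in en_texts),
        specials=Multi30kDe2En.SPECIAL_SYMBOLS)
    de_vocab.set_default_index(Multi30kDe2En.UNK_IDX)
    en_vocab.set_default_index(Multi30kDe2En.UNK_IDX)
    return de_vocab, en_vocab

# __init__ would then take an optional vocabs=(de_vocab, en_vocab) argument and only
# fall back to _load_vocabs() when none is given, e.g.:
#   de_tok = get_tokenizer('spacy', language='de_core_news_sm')
#   en_tok = get_tokenizer('spacy', language='en_core_web_sm')
#   vocabs = build_vocabs(de_tok, en_tok)
#   train_set = Multi30kDe2En('train', vocabs=vocabs)
#   valid_set = Multi30kDe2En('valid', vocabs=vocabs)
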
Hey, if we had to use custom pretrained vectors like GloVe or fastText, how should those be added to the pipeline? And how could we sort the samples by length so that samples of similar length end up in the same batch and minimal padding is needed, just the way the BucketIterator did it in legacy torchtext?
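
There’s no drop-in replacement for the legacy BucketIterator, but here’s a rough sketch of how both could be wired onto the dataset above (the BucketBatchSampler class and the lengths list are my own illustration, not torchtext API; `dataset` is the Multi30kDe2En('train') instance from the code above):

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Sampler
from torchtext.vocab import GloVe

# 1) Pretrained vectors: look up a vector for every token in the English vocab
#    and hand the resulting matrix to nn.Embedding.
glove = GloVe(name='6B', dim=100)                 # downloads the vectors on first use
itos = dataset.en_vocab.get_itos()                # index -> token list
pretrained = glove.get_vecs_by_tokens(itos, lower_case_backup=True)  # (vocab_size, 100)
embedding = nn.Embedding.from_pretrained(pretrained, freeze=False,
                                         padding_idx=Multi30kDe2En.PAD_IDX)

# 2) Length-based bucketing: sort indices by sentence length, cut them into
#    batches, and shuffle the batches so each batch holds similar-length samples.
class BucketBatchSampler(Sampler):
    def __init__(self, lengths, batch_size, shuffle=True):
        order = sorted(range(len(lengths)), key=lambda i: lengths[i])
        self.batches = [order[start:start + batch_size]
                        for start in range(0, len(order), batch_size)]
        self.shuffle = shuffle

    def __iter__(self):
        if self.shuffle:
            for i in torch.randperm(len(self.batches)).tolist():
                yield self.batches[i]
        else:
            yield from self.batches

    def __len__(self):
        return len(self.batches)

lengths = [len(dataset.en_tokenizer(text)) for text in dataset.en_texts]
dataloader = DataLoader(dataset,
                        batch_sampler=BucketBatchSampler(lengths, batch_size=16),
                        collate_fn=Multi30kDe2En.collate_fn)

Tokens that GloVe doesn’t know get zero vectors from get_vecs_by_tokens, so you may want to re-initialize those rows randomly before training.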