Tensorflow-esque bucket by sequence length

Sorry to bump in guys, but can someone explain what bucket_boundaries stand for here?( I am new to this trick,)
Thanks a lot

No worries at all. Bucket_boundaries determines how many ‘buckets’ you can put the sequences into. For example if all my sequences are between length 50 and 100, I may want to create 6 bucket boundaries: [50,60,70,80,90,100] this will mean that sequences length 50-60 will all be in one bucket, etc. (meaning that the max padding of a batch taken from this bucket will be 60).

You decide how granular you want the buckets to be and could also use something like list(np.arange(50,100,10) to create the above boundaries if you didn’t want to type them all out by hand.

NB. You need to have more sequences in each bucket than the batch size else you will get an error when it tries to split each bucket into batches.

Hope this helps.

1 Like

Wow thanks! This makes it clear (is identical to what np.histogram() does somewhat :slight_smile: )

To get your code to work I had to pass both the inputs, targets and lengths as np.arrays not as lists. Otherwise when __next__ tries to return the indexes I got the error: list indices must be integers or slices, not list because you can’t get multiple indices from a list.

I have put @vdw’s bucketer by length which removes any need for padding(!) into a BatchSampler object and introduced shuffling of the data and buckets to improve convergence while training. The perk of it being a BatchSampler object is that you can pass it into a DataLoader and parallelize inputting data into the GPU.

The BatchSampler:

(Here I am using an autoencoder so I don’t input targets to the function or yield it)

from collections import OrderedDict
import numpy as np
from random import shuffle

class BucketDataset(Sampler):
    # want inputs to be an array
    def __init__(self, inputs, batch_size):
        self.inputs = inputs      # shape = (N, max_seq_len)
        self.targets = targets    # shape = (N, ) or None (e.g., for autoencoder I can simply use inputs)
        self.batch_size = batch_size
        ind_n_len = []
        for i, p in enumerate(inputs):
            ind_n_len.append( (i, p.shape[0]) )
        self.ind_n_len = ind_n_len
        
    def _generate_batch_map(self):
        
        shuffle(self.ind_n_len) # shuffle all of the indices first so they are put into buckets differently
        
        batch_map = OrderedDict()
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        for idx, length in self.ind_n_len:
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                batch_list.append(group)
        return batch_list

    def batch_count(self):
        return len(self.batch_list)

    def __len__(self):
        return len(self.lengths)

    def __iter__(self):

        batch_list = self._generate_batch_map()
        
        shuffle(batch_list) # shuffle all the batches so they arent ordered by bucket size
        for i in batch_list: 
            yield i 

Calling the BatchSampler and DataLoader:

sampler = BucketDataset(<your data in an np.array>, BATCH_SIZE)
dataloader = DataLoader(<your data as a DataSet object>, batch_size=1, 
                            batch_sampler=sampler, shuffle=False,
                            num_workers=8,  drop_last=False)
3 Likes

Very neat…much appreciated indeed!

I’ve noticed that I’ve kept equal_length=False in my code which is not used anywhere. The reason is that code I posted was simplified. My original code uses it – see snippet below – to mimic the BucketIterator of torchtext: batches contain sequences of similar or equal length. The latter obviously guarantees that more batches are full, but as I wrote previously, for large datasets it doesn’t matter. So the equal_length=False can me removed from the method header to make it cleaner.

For the sake of completeness, here’s the snippet that uses equal_length:

    if equal_length:
        self.batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                self.batch_list.append(group)
    else: # mimic torchtexts' BucketIterator
        indices = []
        [ indices.extend(v) for v in batch_map.values() ]
        self.batch_list = [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]
3 Likes

Glad I could help, your code does all the heavy lifting (and is very pythonic!). And yes I forgot that was there, will edit mine to simplify it.

I’ve took the liberty to make some slight modifications since it was running out of the box; see full code below. The changes in a nutshell:

  • Since it’s a sampler, I renamed it to BucketBatchSampler :). Previously BucketDataset was an crude mix of Dataset and Sampler. Your approach is much cleaner.
  • Since it’s now only a sampler, there’s no need to keep self.inputs and self.targets any longer. Saves a lot of memory.
  • In your code batch_count() and __len__() where no longer working since there’s no self.batch_list and self.lengths anymore. I’ve updated this.
  • I’ve added a “proper” BucketDataset class to implements the torch.utils.data.Dataset

The usage now looks like this:

X = <data as np.array>
bucket_batch_sampler = BucketBatchSampler(X, BATCH_SIZE) # <-- does not store X
bucket_dataset = BucketDataset(X, None)
dataloader = DataLoader(bucket_dataset, batch_size=1, batch_sampler=bucket_batch_sampler, shuffle=False, num_workers=8, drop_last=False)

The full code:

from torch.utils.data import Sampler, Dataset
from collections import OrderedDict
from random import shuffle


class BucketDataset(Dataset):

    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, index):
        if self.targets is None:
            return self.inputs[index]
        else:
            return self.inputs[index], self.targets[index]


class BucketBatchSampler(Sampler):
    # want inputs to be an array
    def __init__(self, inputs, batch_size):
        self.batch_size = batch_size
        ind_n_len = []
        for i, p in enumerate(inputs):
            ind_n_len.append((i, p.shape[0]))
        self.ind_n_len = ind_n_len
        self.batch_list = self._generate_batch_map()
        self.num_batches = len(self.batch_list)

    def _generate_batch_map(self):
        # shuffle all of the indices first so they are put into buckets differently
        shuffle(self.ind_n_len)
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        batch_map = OrderedDict()
        for idx, length in self.ind_n_len:
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i + self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                batch_list.append(group)
        return batch_list

    def batch_count(self):
        return self.num_batches

    def __len__(self):
        return len(self.ind_n_len)

    def __iter__(self):
        self.batch_list = self._generate_batch_map()
        # shuffle all the batches so they arent ordered by bucket size
        shuffle(self.batch_list)
        for i in self.batch_list:
            yield i
8 Likes

sorry for bumping in like this but I am trying to understand the basics. What problem is this solving and what is the gist of how it’s solving it?

Say you’re building an RNN-based sentiment classifier for tweets/sentences. RNNs go over a sentences word by word, but sentences generally have different lengths. If your batch_size=1 it doesn’t matter since each sentences to processed/classified independently. However, with batch_size=1 you sacrifice a lot of performance. You want to process multiple sentence at once, e.g., batch_size=64. To deal with the different lengths of sentences you have different options:

  • Padding: All shorter sentences get extended by a default word, e.g., <pad>, to the length of the longest sentences. Now all sentences in the batch have the same length. The problem is that it’s not obvious if and how padding effects the training and accuracy of the classifier (particularly when the differences in lengths are very great).
  • PackedSequence: That’s concept provided by PyTorch that hides that the RNN stops at the last word of each sentence in batch, even if they have different lengths. This works fine if you only need the output of the last step. If you need the output at each steps (e.g., for pooling over all steps), I don’t think you can use that approach. It adds additional logic which reduces performance (I’ve noticed 10% in one of my cases but that’s anecdotal).
  • Bucketing: (the proposed idea in this thread): Well, you simply ignore all the issues by ensuring that all sentences in a batch have the same length. Needs no padding, needs no PackedSequence, the output of each time steps can be subsequently. In my opinion, that’s the most hassle-free approach for classification and sequence labeling task. Note that it’s not applicable as described above for sequence-to-sequence tasks (e.g., machine translation).
3 Likes

Amazing summary! Thanks!

Just to confirm I understand bucketing. Bucketing is just padding everything and running the RNN as is without using the optimizations of packing. Right? So the padding token <pad> is processed regardless. So the problem bucketing solves is the hassle of stopping at the right time and the hassle of implemented optimized GPU code.

Btw, your comment about outputs at each step. Are you sure that is correct? I am looking at my code from the Chatbot tutorial that uses packing and I thought the forward pass with padded packing did return everything from every time step:

    def forward(self, input_seq, input_lengths, hidden=None):
        '''
        Given input

        Details:
        Computation Graph:
            Convert word indexes to embeddings.
            Pack padded batch of sequences for RNN module.
            Forward pass through GRU.
            Unpack padding.
            Sum bidirectional GRU outputs.
            Return output and final hidden state.

        :parm torch.Tensor input_seq: padded tensor of indices of the words (max_seq_len, batch_size) e.g. (10, 64)
        :param torch.Tensor input_lengths: tensor with length of each sequence in the batch (batch_size) e.g. (64)
        :param torch.Tensor hidden: initial tensor for the first cell of the RNN/GRU (n_layers*nb_directions, batch_size, hidden_size) or None
        :return torch.Tensor outputs: output of encoder (max_seq_len, batch_size, hidden_dim)
        :return torch.Tensor hidden: final hidden state of encoder (n_layers*nb_directions, batch_size, hidden_size)
        '''
        # Convert word indexes  to embeddings (10,64) -> (10, 64, 500)
        embedded = self.embedding(input_seq) # (max_len, batch_size, embedding_dim=hidden_di) e.g (10,64,500)
        # Pack padded batch of sequences for RNN module (so that the padding isn't processed by the RNN module)
        packed = nn.utils.rnn.pack_padded_sequence(embedded, input_lengths)
        # Forward pass through GRU (but not the padded part cuz its packed)
        outputs, hidden = self.gru(packed, hidden)
        # Unpack padding atfter it was processed correctly
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs)
        # Sum bidirectional GRU outputs (just a way to combine the direction, concat is more common)
        outputs = outputs[:, :, :self.hidden_size] + outputs[:, : ,self.hidden_size:]
        # Return output and final hidden state
        st()
        return outputs, hidden

Or are you referring to something else? Or misunderstood you?

What?! That’s possible? Is it still efficient in GPU?

Well that seems bad. Is there a reason for that? Can it be extended to seq2seq taks?

Thnx so much for ur help Chris! :slight_smile:


quora cross-post: what is the difference of bucketing vs packing + padding in Pytorch?

Sure, in principle it can be extended to Seq2Seq tasks…quite easily, I assume. It’s just not obvious how it effects the batch sizes. If you want to ensure that all inputs sequences have the same length (e.g., 12) and all all output sequences have the same length (e.g., 15), the likelihood to have enough item pairs for each existing combinations of lengths might be very low.

On general, for a large dataset, it should be not too bad, I think. One can argue that when an input sequence is of length X, then the length Y of output sequence should mostly be within the range of X. But I haven’t tested it yet. The very few times i dabbled with Seq2Seq models, I’ve used batches of size 1.

I don’t see how one would (nor should) control the output lengths in seq2seq, that is supposed to be learned and controlled by the model itself, no?
And you can’t know in advance what output length a model will actually produce, although you might have a desired target length.

Good question!

Essentially, it’s just for convenience; the model is agnostic to the sequence lengths. At least for training, as the lengths of the output sequences are known. But you’re right, predicting only works 1 sequences at a time and no longer in batches since the output sequence are likely to differ in lengths.

I actually implemented bucketing for Seq2Seq and use it all the time…again, simply for convenience and performance.

I don’t see how the output lengths would be known even during training? You might* know the target lengths, but the actual output is purely up to the model (though you’d probably enforce some max length), especially when you’re just starting training and the model is clueless, just any length would be possible.

*might : as sometimes there are more than one possible target, as in translation, where there might be more than one possible “correct” translation, which possibly vary in length as well, so you wouldn’t know even what the target length is.

Well, the model doesn’t “know” the length of the output sequence. But training a single sample is done when the decoder reaches the end-of-sequence (EOS) token or when there is not next token. So the model only knows the end when it has reached it…again, I’m referring only to the training.

The usage of a max_length or similar is only relevant for predicting sequences since it can always happen that the next predicted word is not EOS which would tell the decoder to stop. But training ends when the length of target sequences is reached. This is the basic setup for Seq2Seq with batch sizes of 1.

The only difference of bucketing is, that all target sequences are of the same length and the model reaches the EOS token for all samples in the batch at the same time.

I am sorry for bumping in too and I apologize for this not being related to Pytorch. I have been scratching my head trying to implement this exactly in Tensorflow.Keras. Would anyone here know how to approach this in Tensorflow.Keras? I have my data in:

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1) 

train_data = tf.data.Dataset.from_generator(lambda: (X_train, y_train),  output_types=(tf.float64, tf.float64),  output_shapes=(tf.TensorShape([None, 50]), tf.TensorShape([None, 2])))

val_data = tf.data.Dataset.from_generator(lambda: (X_test, y_test), output_types=(tf.float64, tf.float64),  output_shapes=(tf.TensorShape([None, 50]), ([None, 2])) )

This solution is OK when use Dataset. Your code requires the size of the dataset, but I have a IterableDataset, whose size is unknown. So how can I implement bucket_by_sequence_length?