Tensorflow-esque bucket by sequence length

I switched to PyTorch from TensorFlow recently and more or less re-created TF’s bucket_by_sequence_length function (https://www.tensorflow.org/api_docs/python/tf/data/experimental/bucket_by_sequence_length) as a PyTorch batch sampler that can be used with a DataLoader. I know that PyTorch has pack_padded_sequence, but it doesn’t work with dense layers, and my sequence data has high variance in its length, so I wanted to minimize padding and masking by feeding in data that is already grouped by sequence length (while still shuffling it somewhat). Here is my current solution in numpy. I will need to convert every function over to torch to allow it to run on the GPU, and I am sure there are many other ways to optimize it further. I hope this helps others and that maybe it can become a new PyTorch batch sampler someday.

General approach to how it works:

  • Decide what your bucket boundaries for the data are.
  • Iterate through your data (provided in an array) and record the index and length of each element.
  • Given these indices and lengths, assign each index to a bucket ID (I took this whole function from the TensorFlow bucket_by_sequence_length linked to above).
  • Shuffle the data in these buckets.
  • Split the data in each bucket into batches of approximately the batch size (they may be slightly larger).
  • Shuffle all of the batches made.
  • Yield a batch (which contains index references to your data).
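The steps above can be sketched in plain Python as follows. This is only a rough illustration, not the sampler itself: make_batches, the seed argument and the toy lengths are made up for the example, and it cuts each bucket into fixed-size chunks (so the last batch of a bucket may be smaller) rather than into slightly larger, near-equal parts.

```python
import random
from bisect import bisect_right
from collections import defaultdict

def make_batches(lengths, bucket_boundaries, batch_size, seed=0):
    """Group indices by length bucket, then cut each bucket into batches."""
    rng = random.Random(seed)
    buckets = defaultdict(list)
    for idx, seq_len in enumerate(lengths):
        # bucket i holds lengths in [boundaries[i-1], boundaries[i])
        buckets[bisect_right(bucket_boundaries, seq_len)].append(idx)
    batches = []
    for indices in buckets.values():
        rng.shuffle(indices)  # shuffle inside each bucket
        for start in range(0, len(indices), batch_size):
            batches.append(indices[start:start + batch_size])
    rng.shuffle(batches)  # so batches are not ordered by bucket
    return batches

lengths = [3, 12, 5, 14, 7, 11, 2, 13]  # made-up sequence lengths
batches = make_batches(lengths, bucket_boundaries=[10], batch_size=2)
```

Each batch is a list of indices into the data, and every batch only mixes sequences from one bucket.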

Here is the Batch Sampler:

import numpy as np
from random import shuffle
from torch.utils.data import Sampler

class BySequenceLengthSampler(Sampler):

    def __init__(self, data_source,  
                bucket_boundaries, batch_size=64,):
        ind_n_len = []
        for i, p in enumerate(data_source):
            ind_n_len.append( (i, p.shape[0]) )
        self.ind_n_len = ind_n_len
        self.bucket_boundaries = bucket_boundaries
        self.batch_size = batch_size
        
        
    def __iter__(self):
        data_buckets = dict()
        # where p is the id number and seq_len is the length of this id number. 
        for p, seq_len in self.ind_n_len:
            pid = self.element_to_bucket_id(p,seq_len)
            if pid in data_buckets.keys():
                data_buckets[pid].append(p)
            else:
                data_buckets[pid] = [p]

        for k in data_buckets.keys():

            data_buckets[k] = np.asarray(data_buckets[k])

        iter_list = []
        for k in data_buckets.keys():
            np.random.shuffle(data_buckets[k])
            iter_list += (np.array_split(data_buckets[k]
                           , int(data_buckets[k].shape[0]/self.batch_size)))
        shuffle(iter_list) # shuffle all the batches so they arent ordered by bucket
        # size
        for i in iter_list: 
            yield i.tolist() # as it was stored in an array
    
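    def __len__(self):
        # Number of batches per epoch: a bucket holding n indices is split
        # into n // batch_size batches in __iter__.
        counts = {}
        for p, seq_len in self.ind_n_len:
            pid = self.element_to_bucket_id(p, seq_len)
            counts[pid] = counts.get(pid, 0) + 1
        return sum(n // self.batch_size for n in counts.values())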
    
    def element_to_bucket_id(self, x, seq_length):
        boundaries = list(self.bucket_boundaries)
        buckets_min = [np.iinfo(np.int32).min] + boundaries
        buckets_max = boundaries + [np.iinfo(np.int32).max]
        conditions_c = np.logical_and(
          np.less_equal(buckets_min, seq_length),
          np.less(seq_length, buckets_max))
        bucket_id = np.min(np.where(conditions_c))
        return bucket_id

To call it and use it in a DataLoader (Replace the batch_size and bucket_boundaries values):

bucket_boundaries = [50,100,125,150,175,200,250,300]
batch_sizes=32
sampler = BySequenceLengthSampler(<your data>,bucket_boundaries, batch_sizes)

dataloader = DataLoader(<your DataSet Object>, batch_size=1, 
                        batch_sampler=sampler, 
                        num_workers=0, 
                        drop_last=False, pin_memory=False)

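Because it uses numpy functions, you’ll need to keep it on the CPU for now. And because the batch sampler already creates the batches, the DataLoader’s batch_size should be left at 1 (its default); batch_sampler is mutually exclusive with batch_size, shuffle, sampler and drop_last, so those must all keep their default values.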

Also, extra buckets are created for lengths below the first boundary and at or above the last one, so you won’t lose any data.
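To make the extra buckets concrete, here is a small sketch of the bucket assignment using the standard library. The helper name bucket_id and the sample lengths are made up; for integer lengths it should give the same ids as element_to_bucket_id above, since both treat a bucket as the half-open range [lower boundary, upper boundary).

```python
from bisect import bisect_right

bucket_boundaries = [50, 100, 125]

def bucket_id(seq_length, boundaries):
    # Count how many boundaries are <= seq_length.
    # Id 0 is the underflow bucket, len(boundaries) the overflow bucket.
    return bisect_right(boundaries, seq_length)

ids = {n: bucket_id(n, bucket_boundaries)
       for n in (10, 49, 50, 99, 100, 124, 125, 4000)}
```

So n boundaries give n + 1 buckets, and a length equal to a boundary falls into the bucket that starts at that boundary.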

NB. Currently the batch size must be no larger than the smallest number of sequences in any bucket, so you may have to adjust your bucket boundaries depending on your batch size.
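The restriction comes from asking for int(n / batch_size) parts, which is 0 for a bucket with fewer than batch_size entries. One possible way around it, sketched here in plain Python, is to always ask for at least one part. split_bucket is a hypothetical helper, not part of the sampler above; it mimics the near-equal splitting in which the first parts get one extra element.

```python
def split_bucket(indices, batch_size):
    """Split indices into near-equal parts of roughly batch_size elements."""
    n_sections = max(1, len(indices) // batch_size)  # never ask for 0 parts
    base, extra = divmod(len(indices), n_sections)
    batches, start = [], 0
    for i in range(n_sections):
        size = base + (1 if i < extra else 0)  # first `extra` parts are one longer
        batches.append(indices[start:start + size])
        start += size
    return batches

small = split_bucket(list(range(5)), 32)    # fewer items than batch_size
uneven = split_bucket(list(range(7)), 3)    # 7 items, batch_size 3
```

A small bucket then simply becomes one short batch instead of raising an error.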

Gist of the same code that may be more up to date: https://gist.github.com/TrentBrick/bac21af244e7c772dc8651ab9c58328c


Just to add to the discussion: torchtext has its own BucketIterator which groups sequences of similar lengths into batches. How does your approach differ from that?

I’ve written my own (completely unoptimized!) data loader to return batches where all sequences have exactly the same length. This avoids any padding (and the issues involved) and makes RNNs so much more convenient. I used it for a Seq2Seq autoencoder network where the decoder generates words step by step. Having sequences of the exact same length made training with batches trivial.

This differs from TorchText’s solution in that it is easier to pass data into it. Given an array of arrays (in my case of characters), I had a hard time getting TorchText to work for me. But thanks for noting that this function does exist for people who might use it.

And your solution seems useful. You should share your code for it if you want!

Well, if it’s of any use, I’ve posted my code below. Since this doesn’t use any fancy torch or even numpy stuff, it’s almost trivial. In my case, inputs is a list of N encoded sequences with a maximum length max_seq_len that are padded if needed. lengths is a list with lengths for each of the N sequences (max_seq_len - padding) – I have my own vectorizer which returns lengths with the encoded inputs.

With this I can simply call

train_iter = BucketDataset(X_train, y_train, lengths_train, batch_size)

and my training loop looks like this (shown for the case where targets is None, so each batch is an (inputs, lengths) pair):

for idx, (inputs, lengths) in enumerate(train_iter):
    batch_size = inputs.shape[0]
    # Remove unnecessary padding (all sequences have the same length)
    inputs = inputs[:, 0:lengths[0]]
    # Convert to tensors and move to device
    inputs = torch.tensor(inputs).to(self.device)
    ...

Uh, that’s it :). Of course, some batches might be smaller than batch_size, but if the dataset is reasonably large that number is almost negligible. For example, when I use a dataset of mine with about 840k sentences, 99.7% of batches are full and the rest are almost full. So there’s no measurable performance loss here.
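As a rough way to check this on your own data, the share of full batches can be computed from the lengths alone. The function name full_batch_fraction and the toy lengths below are made up for illustration. Each distinct length contributes at most one partial batch, which is why the share of full batches grows with dataset size.

```python
from collections import Counter

def full_batch_fraction(lengths, batch_size):
    """Fraction of batches that are full when batching by exact length."""
    counts = Counter(lengths)
    full = sum(n // batch_size for n in counts.values())
    partial = sum(1 for n in counts.values() if n % batch_size)
    return full / (full + partial)

# made-up data: 130 sequences of length 5, 64 of length 6, 3 of length 7
lengths = [5] * 130 + [6] * 64 + [7] * 3
frac = full_batch_fraction(lengths, batch_size=32)
```

Here 6 of the 8 batches are full; with many more sequences per length, the partial batches become a small minority.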

from collections import OrderedDict

class BucketDataset:

    def __init__(self, inputs, targets, lengths, batch_size):
        self.inputs = inputs      # shape = (N, max_seq_len)
        self.targets = targets    # shape = (N, ) or None (e.g., for autoencoder I can simply use inputs)
        self.lengths = lengths    # shape = (N, )
        self.batch_size = batch_size
        self.current = -1
        self._generate_batch_map()

    def _generate_batch_map(self, equal_length=False):
        batch_map = OrderedDict()
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        for idx, length in enumerate(self.lengths):
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        self.batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                self.batch_list.append(group)

    def batch_count(self):
        return len(self.batch_list)

    def __len__(self):
        return len(self.lengths)

    def __iter__(self):
        return self

    def __next__(self):
        self.current += 1
        if self.current > len(self.batch_list)-1:
            self.current = -1
            raise StopIteration
        else:
            if self.targets is None:
                return self.inputs[self.batch_list[self.current]], \
                       self.lengths[self.batch_list[self.current]]
            else:
                return self.inputs[self.batch_list[self.current]], \
                       self.targets[self.batch_list[self.current]], \
                       self.lengths[self.batch_list[self.current]]

Thanks for sharing, this is great!

Sorry to bump in guys, but can someone explain what bucket_boundaries stands for here? (I am new to this trick.)
Thanks a lot

No worries at all. bucket_boundaries determines how many ‘buckets’ the sequences are put into. For example, if all my sequences are between length 50 and 100, I may want to create 6 bucket boundaries: [50,60,70,80,90,100]. This means that sequences of length 50-59 will all be in one bucket, and so on (so a batch taken from this bucket is padded to a length of at most 59, i.e. at most 9 padding tokens per sequence).

You decide how granular you want the buckets to be, and you could also use something like list(np.arange(50,101,10)) to create the above boundaries if you didn’t want to type them all out by hand.

NB. You need to have at least as many sequences in each bucket as the batch size, else you will get an error when it tries to split each bucket into batches.
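Here is a small sketch of how the boundaries limit padding, using only the standard library. The sample lengths are made up, and bisect_right stands in for the sampler's bucket assignment; the padding figure assumes each bucket is padded to its own longest sequence.

```python
from bisect import bisect_right

boundaries = list(range(50, 101, 10))        # [50, 60, 70, 80, 90, 100]
lengths = [50, 53, 59, 60, 68, 71, 99, 100]  # made-up sequence lengths

buckets = {}
for n in lengths:
    buckets.setdefault(bisect_right(boundaries, n), []).append(n)

# Padding tokens needed if each bucket is padded to its longest sequence.
padding = {b: sum(max(v) - n for n in v) for b, v in buckets.items()}
```

Because every bucket spans fewer than 10 lengths, no sequence ever needs more than 9 padding tokens, whatever the data looks like.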

Hope this helps.


Wow thanks! This makes it clear (it is somewhat similar to what np.histogram() does :) )

To get your code to work I had to pass inputs, targets and lengths as np.arrays, not as lists. Otherwise, when __next__ tries to return the indexes, I got the error: list indices must be integers or slices, not list, because you can’t get multiple indices from a list.
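The error is easy to reproduce with a plain list. If converting to np.array is not an option, a list comprehension is one alternative; the variable names below are made up for the example.

```python
inputs = ["a", "b", "c", "d"]   # a plain Python list
batch = [0, 2, 3]               # a batch of indices

try:
    inputs[batch]               # lists cannot be indexed with a list
    error = None
except TypeError as exc:
    error = str(exc)

# Works for plain lists, at the cost of a Python-level loop.
selected = [inputs[i] for i in batch]
```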

I have put @vdw’s bucketer by length which removes any need for padding(!) into a BatchSampler object and introduced shuffling of the data and buckets to improve convergence while training. The perk of it being a BatchSampler object is that you can pass it into a DataLoader and parallelize inputting data into the GPU.

The BatchSampler:

(Here I am using an autoencoder so I don’t input targets to the function or yield it)

from collections import OrderedDict
import numpy as np
from random import shuffle
from torch.utils.data import Sampler

class BucketDataset(Sampler):
    # want inputs to be an array
    def __init__(self, inputs, batch_size):
        self.inputs = inputs      # shape = (N, max_seq_len)
        self.targets = None       # autoencoder: no targets are passed in, so none are stored
        self.batch_size = batch_size
        ind_n_len = []
        for i, p in enumerate(inputs):
            ind_n_len.append( (i, p.shape[0]) )
        self.ind_n_len = ind_n_len
        
    def _generate_batch_map(self):
        
        shuffle(self.ind_n_len) # shuffle all of the indices first so they are put into buckets differently
        
        batch_map = OrderedDict()
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        for idx, length in self.ind_n_len:
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                batch_list.append(group)
        return batch_list

    def batch_count(self):
        return len(self.batch_list)

    def __len__(self):
        return len(self.lengths)

    def __iter__(self):

        batch_list = self._generate_batch_map()
        
        shuffle(batch_list) # shuffle all the batches so they arent ordered by bucket size
        for i in batch_list: 
            yield i 

Calling the BatchSampler and DataLoader:

sampler = BucketDataset(<your data in an np.array>, BATCH_SIZE)
dataloader = DataLoader(<your data as a DataSet object>, batch_size=1, 
                            batch_sampler=sampler, shuffle=False,
                            num_workers=8,  drop_last=False)

Very neat…much appreciated indeed!

I’ve noticed that I’ve kept equal_length=False in my code, which is not used anywhere. The reason is that the code I posted was simplified. My original code uses it (see snippet below) to mimic the BucketIterator of torchtext: batches contain sequences of similar or equal length. The latter obviously guarantees that more batches are full, but as I wrote previously, for large datasets it doesn’t matter. So the equal_length=False can be removed from the method header to make it cleaner.

For the sake of completeness, here’s the snippet that uses equal_length:

    if equal_length:
        self.batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                self.batch_list.append(group)
    else: # mimic torchtexts' BucketIterator
        indices = []
        for v in batch_map.values():
            indices.extend(v)
        self.batch_list = [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]
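To see the two modes side by side, here is a self-contained sketch outside the class. build_batches and the toy lengths are made up for illustration. Note that batch_map keeps lengths in first-seen order, so in the non-equal mode neighbouring groups are only close in length if the lengths first appear in sorted order; that is an observation about this sketch, not something stated above.

```python
from collections import OrderedDict

def build_batches(lengths, batch_size, equal_length):
    batch_map = OrderedDict()
    for idx, length in enumerate(lengths):
        batch_map.setdefault(length, []).append(idx)
    if equal_length:
        # Every batch holds sequences of one length; the last batch per length may be short.
        return [indices[i:i + batch_size]
                for indices in batch_map.values()
                for i in range(0, len(indices), batch_size)]
    # Concatenate all groups first, so batches may straddle two lengths.
    flat = [idx for indices in batch_map.values() for idx in indices]
    return [flat[i:i + batch_size] for i in range(0, len(flat), batch_size)]

lengths = [4, 4, 4, 5, 5, 6]  # made-up sequence lengths
equal = build_batches(lengths, 2, equal_length=True)
similar = build_batches(lengths, 2, equal_length=False)
```

With equal_length=True there are four batches, two of them partial; with equal_length=False there are three full batches, but two of them mix lengths.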

Glad I could help, your code does all the heavy lifting (and is very pythonic!). And yes I forgot that was there, will edit mine to simplify it.

I’ve taken the liberty to make some slight modifications since it wasn’t running out of the box; see full code below. The changes in a nutshell:

  • Since it’s a sampler, I renamed it to BucketBatchSampler :). Previously BucketDataset was a crude mix of Dataset and Sampler. Your approach is much cleaner.
  • Since it’s now only a sampler, there’s no need to keep self.inputs and self.targets any longer. Saves a lot of memory.
  • In your code batch_count() and __len__() were no longer working since there’s no self.batch_list and self.lengths anymore. I’ve updated this.
  • I’ve added a “proper” BucketDataset class that implements torch.utils.data.Dataset.

The usage now looks like this:

X = <data as np.array>
bucket_batch_sampler = BucketBatchSampler(X, BATCH_SIZE) # <-- does not store X
bucket_dataset = BucketDataset(X, None)
dataloader = DataLoader(bucket_dataset, batch_size=1, batch_sampler=bucket_batch_sampler, shuffle=False, num_workers=8, drop_last=False)

The full code:

from torch.utils.data import Sampler, Dataset
from collections import OrderedDict
from random import shuffle


class BucketDataset(Dataset):

    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, index):
        if self.targets is None:
            return self.inputs[index]
        else:
            return self.inputs[index], self.targets[index]


class BucketBatchSampler(Sampler):
    # want inputs to be an array
    def __init__(self, inputs, batch_size):
        self.batch_size = batch_size
        ind_n_len = []
        for i, p in enumerate(inputs):
            ind_n_len.append((i, p.shape[0]))
        self.ind_n_len = ind_n_len
        self.batch_list = self._generate_batch_map()
        self.num_batches = len(self.batch_list)

    def _generate_batch_map(self):
        # shuffle all of the indices first so they are put into buckets differently
        shuffle(self.ind_n_len)
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        batch_map = OrderedDict()
        for idx, length in self.ind_n_len:
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i + self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                batch_list.append(group)
        return batch_list

    def batch_count(self):
        return self.num_batches

    def __len__(self):
        # DataLoader uses len(batch_sampler) as its own length, so report the number of batches
        return self.num_batches

    def __iter__(self):
        self.batch_list = self._generate_batch_map()
        # shuffle all the batches so they arent ordered by bucket size
        shuffle(self.batch_list)
        for i in self.batch_list:
            yield i

sorry for bumping in like this but I am trying to understand the basics. What problem is this solving and what is the gist of how it’s solving it?

Say you’re building an RNN-based sentiment classifier for tweets/sentences. RNNs go over a sentence word by word, but sentences generally have different lengths. If your batch_size=1 it doesn’t matter, since each sentence is processed/classified independently. However, with batch_size=1 you sacrifice a lot of performance. You want to process multiple sentences at once, e.g., batch_size=64. To deal with the different lengths of sentences you have different options:

  • Padding: All shorter sentences get extended by a default word, e.g., <pad>, to the length of the longest sentence. Now all sentences in the batch have the same length. The problem is that it’s not obvious if and how padding affects the training and accuracy of the classifier (particularly when the differences in lengths are very large).
  • PackedSequence: That’s a concept provided by PyTorch that hides the fact that the RNN stops at the last word of each sentence in the batch, even if they have different lengths. This works fine if you only need the output of the last step. If you need the output at each step (e.g., for pooling over all steps), I don’t think you can use that approach. It adds additional logic, which reduces performance (I’ve noticed 10% in one of my cases, but that’s anecdotal).
  • Bucketing (the proposed idea in this thread): Well, you simply avoid all these issues by ensuring that all sentences in a batch have the same length. It needs no padding and no PackedSequence, and the output of each time step can be used subsequently. In my opinion, that’s the most hassle-free approach for classification and sequence labeling tasks. Note that it’s not applicable as described above for sequence-to-sequence tasks (e.g., machine translation).
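To put rough numbers on the padding trade-off, here is a small pure-Python comparison. The lengths are randomly generated toy data, and "sorted" batching (sort by length, then cut into batches) is used as a simple stand-in for coarse bucketing; none of this is taken from the code in this thread.

```python
import random

def padding_tokens(batches, lengths):
    """Total <pad> tokens if each batch is padded to its longest sequence."""
    return sum(max(lengths[i] for i in b) - lengths[i] for b in batches for i in b)

def chunks(indices, size):
    return [indices[i:i + size] for i in range(0, len(indices), size)]

rng = random.Random(0)
lengths = [rng.randint(5, 50) for _ in range(256)]  # made-up sentence lengths
batch_size = 32

order = list(range(len(lengths)))
rng.shuffle(order)
random_pad = padding_tokens(chunks(order, batch_size), lengths)

by_len = sorted(range(len(lengths)), key=lengths.__getitem__)
sorted_pad = padding_tokens(chunks(by_len, batch_size), lengths)

groups = {}
for i, n in enumerate(lengths):
    groups.setdefault(n, []).append(i)
exact_batches = [b for idx in groups.values() for b in chunks(idx, batch_size)]
exact_pad = padding_tokens(exact_batches, lengths)
```

Batching by exact length needs no padding at all, and length-sorted batches never need more padding than randomly composed ones of the same size.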

Amazing summary! Thanks!

Just to confirm I understand bucketing. Bucketing is just padding everything and running the RNN as is, without using the optimizations of packing. Right? So the padding token <pad> is processed regardless. So the problem bucketing solves is the hassle of stopping at the right time and the hassle of implementing optimized GPU code.

Btw, your comment about outputs at each step. Are you sure that is correct? I am looking at my code from the Chatbot tutorial that uses packing and I thought the forward pass with padded packing did return everything from every time step:

    def forward(self, input_seq, input_lengths, hidden=None):
        '''
        Given input

        Details:
        Computation Graph:
            Convert word indexes to embeddings.
            Pack padded batch of sequences for RNN module.
            Forward pass through GRU.
            Unpack padding.
            Sum bidirectional GRU outputs.
            Return output and final hidden state.

        :param torch.Tensor input_seq: padded tensor of indices of the words (max_seq_len, batch_size) e.g. (10, 64)
        :param torch.Tensor input_lengths: tensor with length of each sequence in the batch (batch_size) e.g. (64)
        :param torch.Tensor hidden: initial tensor for the first cell of the RNN/GRU (n_layers*nb_directions, batch_size, hidden_size) or None
        :return torch.Tensor outputs: output of encoder (max_seq_len, batch_size, hidden_dim)
        :return torch.Tensor hidden: final hidden state of encoder (n_layers*nb_directions, batch_size, hidden_size)
        '''
        # Convert word indexes  to embeddings (10,64) -> (10, 64, 500)
        embedded = self.embedding(input_seq) # (max_len, batch_size, embedding_dim=hidden_dim) e.g (10,64,500)
        # Pack padded batch of sequences for RNN module (so that the padding isn't processed by the RNN module)
        packed = nn.utils.rnn.pack_padded_sequence(embedded, input_lengths)
        # Forward pass through GRU (but not the padded part cuz its packed)
        outputs, hidden = self.gru(packed, hidden)
        # Unpack padding after it was processed correctly
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs)
        # Sum bidirectional GRU outputs (just a way to combine the direction, concat is more common)
        outputs = outputs[:, :, :self.hidden_size] + outputs[:, : ,self.hidden_size:]
        # Return output and final hidden state
        return outputs, hidden

Or are you referring to something else? Or did I misunderstand you?
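One way to check this is a tiny experiment with made-up sizes and random inputs, using a single-layer, unidirectional GRU rather than the bidirectional encoder above. After unpacking, the output tensor has an entry for every time step, with zeros at the padded positions, and the final hidden state of a shorter sequence equals its output at its last real step.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

torch.manual_seed(0)
gru = nn.GRU(input_size=4, hidden_size=3)   # time-major, like the snippet above
embedded = torch.randn(5, 2, 4)             # (max_seq_len=5, batch_size=2, features=4)
input_lengths = torch.tensor([5, 3])        # sorted longest first, as the default packing expects

packed = pack_padded_sequence(embedded, input_lengths)
packed_out, hidden = gru(packed)            # the GRU skips the padded steps
outputs, out_lengths = pad_packed_sequence(packed_out)

# outputs: (max_seq_len, batch_size, hidden_size), one vector per time step.
# Steps 3 and 4 of the second sequence are padding and come back as zeros.
```

So per-step outputs are available with packing; anything that pools over time just has to ignore (or mask) the zero rows.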

What?! That’s possible? Is it still efficient on the GPU?

Well, that seems bad. Is there a reason for that? Can it be extended to seq2seq tasks?

Thanks so much for your help, Chris! :)


quora cross-post: what is the difference of bucketing vs packing + padding in Pytorch?