Tensorflow-esque bucket by sequence length


(Trent Brick) #1

I switched to PyTorch from Tensorflow recently and kind of re-created TF’s bucket_by_sequence_length https://www.tensorflow.org/api_docs/python/tf/data/experimental/bucket_by_sequence_length function in the form of a PyTorch Batch Sampler that can be used with a DataLoader. I know that PyTorch has pack_padded_sequence, but it doesn’t work with dense layers, and my sequence data has high variance in its length, so I wanted to minimize padding and masking by feeding in data that is already grouped by sequence length (while still shuffling it somewhat). Here is my current solution in numpy. I will need to convert every function over to torch to allow it to run on the GPU, and I’m sure there are many other ways to optimize it further. Hope this helps others and that maybe it can become a new PyTorch Batch Sampler someday.

General approach to how it works:

  • Decide what your bucket boundaries for the data are.
  • Iterate through your data (provided in an array); for each element, record its index and length.
  • Given these indices and lengths, assign each index to a bucket ID (I took this whole function from the tensorflow bucket_by_sequence_length linked to above).
  • Shuffle the data in these buckets.
  • Split the data in each bucket into approximately the batch size (batches may be slightly larger).
  • Shuffle all of the batches that were made.
  • Yield a batch (which contains index references to your data). A toy walkthrough of these steps is sketched below.
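
Here is a toy walkthrough of those steps with made-up lengths, just to make the bucketing concrete (this is my own illustration, not part of the sampler below; np.digitize is used as a stand-in for the element_to_bucket_id logic):

import numpy as np

bucket_boundaries = [5, 10]             # buckets: length < 5, 5 <= length < 10, length >= 10
lengths = [3, 7, 8, 12, 6, 9, 11, 4]    # pretend sequence lengths; index = position in the data

buckets = {}
for idx, seq_len in enumerate(lengths):
    bucket_id = int(np.digitize(seq_len, bucket_boundaries))  # first bucket whose range contains seq_len
    buckets.setdefault(bucket_id, []).append(idx)

print(buckets)  # {0: [0, 7], 1: [1, 2, 4, 5], 2: [3, 6]}
# each bucket is then shuffled, split into batches of roughly batch_size,
# and the resulting list of batches is shuffled again before being yielded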

Here is the Batch Sampler:

import numpy as np
from random import shuffle

from torch.utils.data import Sampler


class BySequenceLengthSampler(Sampler):

    def __init__(self, data_source,
                 bucket_boundaries, batch_size=64):
        ind_n_len = []
        for i, p in enumerate(data_source):
            ind_n_len.append((i, p.shape[0]))
        self.data_source = data_source
        self.ind_n_len = ind_n_len
        self.bucket_boundaries = bucket_boundaries
        self.batch_size = batch_size

    def __iter__(self):
        data_buckets = dict()
        # p is the index of a sequence and seq_len is its length
        for p, seq_len in self.ind_n_len:
            pid = self.element_to_bucket_id(p, seq_len)
            if pid in data_buckets.keys():
                data_buckets[pid].append(p)
            else:
                data_buckets[pid] = [p]

        for k in data_buckets.keys():
            data_buckets[k] = np.asarray(data_buckets[k])

        iter_list = []
        for k in data_buckets.keys():
            np.random.shuffle(data_buckets[k])
            iter_list += np.array_split(
                data_buckets[k],
                int(data_buckets[k].shape[0] / self.batch_size))
        # shuffle all the batches so they aren't ordered by bucket size
        shuffle(iter_list)
        for i in iter_list:
            yield i.tolist()  # as it was stored in an array

    def __len__(self):
        return len(self.data_source)

    def element_to_bucket_id(self, x, seq_length):
        boundaries = list(self.bucket_boundaries)
        buckets_min = [np.iinfo(np.int32).min] + boundaries
        buckets_max = boundaries + [np.iinfo(np.int32).max]
        conditions_c = np.logical_and(
            np.less_equal(buckets_min, seq_length),
            np.less(seq_length, buckets_max))
        bucket_id = np.min(np.where(conditions_c))
        return bucket_id

To call it and use it in a DataLoader (replace the bucket_boundaries and batch_sizes values with your own):

from torch.utils.data import DataLoader

bucket_boundaries = [50, 100, 125, 150, 175, 200, 250, 300]
batch_sizes = 32
sampler = BySequenceLengthSampler(<your data>, bucket_boundaries, batch_sizes)

dataloader = DataLoader(<your DataSet Object>, batch_size=1,
                        batch_sampler=sampler,
                        num_workers=0,
                        drop_last=False, pin_memory=False)

As it uses numpy functions, you’ll need to keep it on the CPU for now. And as the BatchSampler already creates the batches, your DataLoader should use a batch size of 1.

Buckets for sequences shorter than your smallest boundary and longer than your largest are also created, so you won’t lose any data.

NB. Currently the batch size must be smaller than the smallest number of sequences in any bucket, so you may have to adjust your bucket boundaries depending on your batch size.
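
If you do want to tolerate buckets that hold fewer sequences than the batch size, one option (my own suggestion, not part of the sampler above) is to clamp the number of sections passed to np.array_split so a small bucket simply becomes one undersized batch:

import numpy as np

def split_bucket(bucket_indices, batch_size):
    # np.array_split raises a ValueError when asked for zero sections,
    # so clamp to at least one (possibly undersized) batch per bucket
    num_sections = max(1, len(bucket_indices) // batch_size)
    return np.array_split(np.asarray(bucket_indices), num_sections)

print(split_bucket([3, 8, 1], batch_size=32))  # [array([3, 8, 1])] -- one small batch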

Gist of the same code that may be more up to date: https://gist.github.com/TrentBrick/bac21af244e7c772dc8651ab9c58328c


(Chris) #2

Just to add to the discussion: torchtext has its own BucketIterator which groups sequences of similar lengths into batches. How does your approach differ from that?

I’ve written my own (completely unoptimized!) data loader to return batches where all sequences have exactly the same length. This avoids any padding (and the issues involved) and makes RNNs so much more convenient. I used it for a Seq2Seq autoencoder network where the decoder generates words step by step. Having sequences of the exact same length made training with batches trivial.


(Trent Brick) #3

This differs from TorchText’s solution in that it is easier to pass data into it. Given an array of arrays (in my case of characters) I had a hard time getting TorchText to work for me. But thanks for noting that this function does exist for people who might use it.

And your solution seems useful. You should share your code for it if you want!


(Chris) #4

Well, if it’s of any use, I’ve posted my code below. Since this doesn’t use any fancy torch or even numpy stuff, it’s almost trivial. In my case, inputs is a list of N encoded sequences with a maximum length max_seq_len that are padded if needed. lengths is a list with lengths for each of the N sequences (max_seq_len - padding) – I have my own vectorizer which returns lengths with the encoded inputs.

With this I can simply call

train_iter = BucketDataset(X_train, y_train, lengths_train, batch_size)

and my training loop looks like this:

for idx, (inputs, lengths) in enumerate(train_iter):
    batch_size = inputs.shape[0]
    # Remove unnecessary padding (all sequences have the same length)
    inputs = inputs[:, 0:lengths[0]]
    # Convert to tensors and move to device
    inputs = torch.tensor(inputs).to(self.device)
    ...

Uh, that’s it :). Of course, some batches might be smaller than batch_size, but if the dataset is reasonably large their number is almost negligible. For example, when I use a dataset of mine with about 840k sentences, 99.7% of batches are full and the rest are almost full. So there’s no measurable performance loss here (a quick way to check this for your own data is sketched after the code below).

from collections import OrderedDict

class BucketDataset:

    def __init__(self, inputs, targets, lengths, batch_size):
        self.inputs = inputs      # shape = (N, max_seq_len)
        self.targets = targets    # shape = (N, ) or None (e.g., for autoencoder I can simply use inputs)
        self.lengths = lengths    # shape = (N, )
        self.batch_size = batch_size
        self.current = -1
        self._generate_batch_map()

    def _generate_batch_map(self, equal_length=False):
        batch_map = OrderedDict()
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        for idx, length in enumerate(self.lengths):
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        self.batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                self.batch_list.append(group)

    def batch_count(self):
        return len(self.batch_list)

    def __len__(self):
        return len(self.lengths)

    def __iter__(self):
        return self

    def __next__(self):
        self.current += 1
        if self.current > len(self.batch_list)-1:
            self.current = -1
            raise StopIteration
        else:
            if self.targets is None:
                return self.inputs[self.batch_list[self.current]], \
                       self.lengths[self.batch_list[self.current]]
            else:
                return self.inputs[self.batch_list[self.current]], \
                       self.targets[self.batch_list[self.current]], \
                       self.lengths[self.batch_list[self.current]]
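
To sanity-check that claim on your own data, a rough helper like the one below (my own sketch, mirroring the grouping done in _generate_batch_map) counts what fraction of batches come out full for a given lengths list and batch_size:

from collections import Counter

def full_batch_fraction(lengths, batch_size):
    # group by exact length, as _generate_batch_map does; within each group
    # only the final slice can be smaller than batch_size
    counts = Counter(lengths)
    full = sum(n // batch_size for n in counts.values())
    total = sum((n + batch_size - 1) // batch_size for n in counts.values())
    return full / total

# e.g. full_batch_fraction(lengths, 32) should come out around 0.997
# on a large corpus like the one described above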

(Trent Brick) #5

Thanks for sharing, this is great!


(Aditya) #6

Sorry to butt in, guys, but can someone explain what bucket_boundaries stands for here? (I am new to this trick.)
Thanks a lot


(Trent Brick) #7

No worries at all. bucket_boundaries determines how many ‘buckets’ the sequences can be put into and where their edges lie. For example, if all my sequences are between length 50 and 100, I may want to create 6 bucket boundaries: [50,60,70,80,90,100]. This means that sequences of length 50 up to 59 all land in one bucket, and so on (so a batch drawn from that bucket is padded to at most length 59).

You decide how granular you want the buckets to be, and could also use something like list(np.arange(50, 101, 10)) to create the above boundaries if you didn’t want to type them all out by hand.
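
As a quick illustration (my own snippet; np.digitize mirrors the element_to_bucket_id logic from earlier in the thread):

import numpy as np

bucket_boundaries = np.arange(50, 101, 10).tolist()
print(bucket_boundaries)   # [50, 60, 70, 80, 90, 100]

for seq_len in (50, 55, 59, 60, 99):
    print(seq_len, "-> bucket", np.digitize(seq_len, bucket_boundaries))
# 50, 55 and 59 share one bucket, 60 starts the next one, and 99 falls into the
# last bounded bucket; anything below 50 or at/above 100 goes into the catch-all
# buckets the sampler adds beyond the boundaries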

NB. You need to have more sequences in each bucket than the batch size, or else you will get an error when the sampler tries to split each bucket into batches.

Hope this helps.


(Aditya) #8

Wow, thanks! This makes it clear (it’s somewhat similar to what np.histogram() does 🙂).


(Trent Brick) #9

To get your code to work I had to pass the inputs, targets and lengths as np.arrays, not as lists. Otherwise, when __next__ tries to return the indices I got the error list indices must be integers or slices, not list, because you can’t index a plain list with a list of indices.
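
For anyone hitting the same error, a minimal illustration (my own, not from the posts above):

import numpy as np

lengths_list = [7, 3, 9, 5]
batch_idx = [0, 2]

# lengths_list[batch_idx]                   # TypeError: list indices must be integers or slices, not list
print(np.asarray(lengths_list)[batch_idx])  # [7 9] -- fancy indexing works on np.arrays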


(Trent Brick) #10

I have put @vdw’s bucketer-by-length, which removes any need for padding(!), into a BatchSampler object and introduced shuffling of the data and buckets to improve convergence while training. The perk of it being a BatchSampler object is that you can pass it into a DataLoader and parallelize feeding data to the GPU.

The BatchSampler:

(Here I am using an autoencoder, so I don’t pass targets to the function or yield them.)

from collections import OrderedDict
import numpy as np
from random import shuffle
from torch.utils.data import Sampler

class BucketDataset(Sampler):
    # want inputs to be an array
    def __init__(self, inputs, batch_size):
        self.inputs = inputs      # shape = (N, max_seq_len); no targets needed for the autoencoder case
        self.batch_size = batch_size
        ind_n_len = []
        for i, p in enumerate(inputs):
            ind_n_len.append( (i, p.shape[0]) )
        self.ind_n_len = ind_n_len
        
    def _generate_batch_map(self):
        
        shuffle(self.ind_n_len) # shuffle all of the indices first so they are put into buckets differently
        
        batch_map = OrderedDict()
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        for idx, length in self.ind_n_len:
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                batch_list.append(group)
        return batch_list

    def batch_count(self):
        return len(self.batch_list)

    def __len__(self):
        return len(self.lengths)

    def __iter__(self):

        batch_list = self._generate_batch_map()
        
        shuffle(batch_list)  # shuffle all the batches so they aren't ordered by bucket size
        for i in batch_list:
            yield i

Calling the BatchSampler and DataLoader:

sampler = BucketDataset(<your data in an np.array>, BATCH_SIZE)
dataloader = DataLoader(<your data as a DataSet object>, batch_size=1, 
                            batch_sampler=sampler, shuffle=False,
                            num_workers=8,  drop_last=False)

(Chris) #11

Very neat…much appreciated indeed!

I’ve noticed that I’ve kept equal_length=False in my code, which is not used anywhere. The reason is that the code I posted was simplified. My original code uses it – see the snippet below – to mimic the BucketIterator of torchtext: batches contain sequences of similar or equal length. The latter obviously guarantees that more batches are full but, as I wrote previously, for large datasets it doesn’t matter. So equal_length=False can be removed from the method header to make it cleaner.

For the sake of completeness, here’s the snippet that uses equal_length:

    if equal_length:
        self.batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                self.batch_list.append(group)
    else: # mimic torchtext's BucketIterator
        indices = []
        [ indices.extend(v) for v in batch_map.values() ]
        self.batch_list = [indices[i:(i+self.batch_size)] for i in range(0, len(indices), self.batch_size)]

(Trent Brick) #12

Glad I could help; your code does all the heavy lifting (and is very pythonic!). And yes, I forgot that was there; I will edit mine to simplify it.


(Chris) #13

I took the liberty of making some slight modifications, since it wasn’t running out of the box; see the full code below. The changes in a nutshell:

  • Since it’s a sampler, I renamed it to BucketBatchSampler :). Previously BucketDataset was a crude mix of Dataset and Sampler. Your approach is much cleaner.
  • Since it’s now only a sampler, there’s no need to keep self.inputs and self.targets any longer. This saves a lot of memory.
  • In your code, batch_count() and __len__() were no longer working since there’s no self.batch_list and self.lengths anymore. I’ve updated this.
  • I’ve added a “proper” BucketDataset class that implements torch.utils.data.Dataset.

The usage now looks like this:

X = <data as np.array>
bucket_batch_sampler = BucketBatchSampler(X, BATCH_SIZE) # <-- does not store X
bucket_dataset = BucketDataset(X, None)
dataloader = DataLoader(bucket_dataset, batch_size=1, batch_sampler=bucket_batch_sampler, shuffle=False, num_workers=8, drop_last=False)

The full code:

from torch.utils.data import Sampler, Dataset, DataLoader
from collections import OrderedDict
from random import shuffle


class BucketDataset(Dataset):

    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, index):
        if self.targets is None:
            return self.inputs[index]
        else:
            return self.inputs[index], self.targets[index]


class BucketBatchSampler(Sampler):
    # want inputs to be an array
    def __init__(self, inputs, batch_size):
        self.batch_size = batch_size
        ind_n_len = []
        for i, p in enumerate(inputs):
            ind_n_len.append((i, p.shape[0]))
        self.ind_n_len = ind_n_len
        self.batch_list = self._generate_batch_map()
        self.num_batches = len(self.batch_list)

    def _generate_batch_map(self):
        # shuffle all of the indices first so they are put into buckets differently
        shuffle(self.ind_n_len)
        # Organize lengths, e.g., batch_map[10] = [30, 124, 203, ...] <= indices of sequences of length 10
        batch_map = OrderedDict()
        for idx, length in self.ind_n_len:
            if length not in batch_map:
                batch_map[length] = [idx]
            else:
                batch_map[length].append(idx)
        # Use batch_map to split indices into batches of equal size
        # e.g., for batch_size=3, batch_list = [[23,45,47], [49,50,62], [63,65,66], ...]
        batch_list = []
        for length, indices in batch_map.items():
            for group in [indices[i:(i + self.batch_size)] for i in range(0, len(indices), self.batch_size)]:
                batch_list.append(group)
        return batch_list

    def batch_count(self):
        return self.num_batches

    def __len__(self):
        return len(self.ind_n_len)

    def __iter__(self):
        self.batch_list = self._generate_batch_map()
        # shuffle all the batches so they arent ordered by bucket size
        shuffle(self.batch_list)
        for i in self.batch_list:
            yield i
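
For completeness, iterating over that dataloader might look roughly like the sketch below. This is my own guess at the intended usage: because every sequence in a batch has the same length, the default collate_fn can stack each batch into a single (batch_size, seq_len) tensor (if X is instead pre-padded to max_seq_len, you would trim it as in the earlier training loop).

import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for inputs in dataloader:   # BucketDataset was built with targets=None, so each item is just the inputs
    inputs = inputs.to(device)
    print(inputs.shape)     # e.g. torch.Size([32, 87]) for one batch, torch.Size([32, 54]) for another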