How can one normalize sequential data with varying lengths?

I have time series data and I am using a recurrent model to predict an output sequence from an input sequence, where the inputs/outputs have a different length in each batch.

import numpy as np
import torch
import torch.nn as nn


class subDataset(torch.utils.data.Dataset):
    def __init__(self, Data, Target):
        super().__init__()
        # Data size of (batch_size, seq_len, input_size)
        data, target, seq_len = self._preprocess(Data, Target)
        self.data = data
        self.target = target
        self.seq_len = seq_len

    def __len__(self):
        return self.data.size(0)

    def __getitem__(self, index):
        data = self.data[index, :]
        target = self.target[index, :]
        return data, target, self.seq_len[index]

    @staticmethod
    def _preprocess(data, target):
        # Stack each episode's list of per-step tensors into a (seq_len, input_size) tensor
        x = [torch.stack([data[i][j] for j in range(len(data[i]))], dim=0) for i in range(len(data))]
        y = [torch.stack([target[i][j] for j in range(len(target[i]))], dim=0) for i in range(len(target))]

        # Zero-pad every episode to the longest sequence length
        x_padded = torch.nn.utils.rnn.pad_sequence(x, batch_first=True, padding_value=0)
        y_padded = torch.nn.utils.rnn.pad_sequence(y, batch_first=True, padding_value=0)
        seq_len = torch.LongTensor(list(map(len, x)))
        print(f"maximum size of sequence {torch.max(seq_len)} {seq_len} {x_padded.shape}")
        # Data size of (batch_size, seq_len, input_size)

        return x_padded, y_padded, seq_len
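
For context, a minimal sketch of how I construct the dataset and loader (the toy Data/Target below are just placeholders with the right nesting: a list of episodes, each a list of per-step tensors):

Data = [[torch.randn(3) for _ in range(T)] for T in (5, 8, 6)]    # input_size 3
Target = [[torch.randn(1) for _ in range(T)] for T in (5, 8, 6)]  # output_size 1

dataset = subDataset(Data, Target)
loader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True)
for u, y, seq_len in loader:
    # u: (batch_size, max_seq_len, 3), zero-padded beyond each seq_len
    print(u.shape, y.shape, seq_len)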

When I trained my model, the mean of the predicted outputs was slightly off compared to the mean of the true outputs. I also used this class to normalize the input and output:

class Normalizer1D(nn.Module):
    _epsilon = 1e-16

    # Data size of (batch_size, seq_len, input_size)
    def __init__(self, scale, offset):
        super().__init__()
        self.register_buffer('scale', torch.tensor(scale, dtype=torch.float32) + self._epsilon)
        self.register_buffer('offset', torch.tensor(offset, dtype=torch.float32))

    def normalize(self, x):
        # (B, T, D)
        return (x - self.offset) / self.scale

    def unnormalize(self, x):
        # (B, T, D)
        return x * self.scale + self.offset

    def unnormalize_mean(self, x_mu):
        # (B, T, D)
        return x_mu * self.scale + self.offset

    def unnormalize_sigma(self, x_sigma):
        # (B, T, D): sigma is scaled but not shifted
        return x_sigma * self.scale


def compute_normalizer(loader_train):
    # Data size of (batch_size, seq_len, input_size)
    variance_scaler = 1

    # initialization
    total_batches = 0
    u_mean = 0
    y_mean = 0
    u_var = 0
    y_var = 0
    for u, y, s in loader_train:
        total_batches += 1  # count batches, since one mean/variance is accumulated per batch
        u_mean += torch.mean(u, dim=(0, 1))
        y_mean += torch.mean(y, dim=(0, 1))
        u_var += torch.mean(torch.var(u, dim=1, unbiased=False), dim=0)
        y_var += torch.mean(torch.var(y, dim=1, unbiased=False), dim=0)

    u_mean = u_mean.numpy()
    y_mean = y_mean.numpy()
    u_var = u_var.numpy()
    y_var = y_var.numpy()

    u_normalizer = Normalizer1D(np.sqrt(u_var / total_batches) * variance_scaler, u_mean / total_batches)
    y_normalizer = Normalizer1D(np.sqrt(y_var / total_batches) * variance_scaler, y_mean / total_batches)

    return u_normalizer, y_normalizer

However, I think compute_normalizer has a problem, since it doesn't take into account that a big chunk of the data is masked and contains zeros. This will bias the mean and variance. Does anyone have a suggestion on how to normalize data with variable sequence lengths properly in PyTorch?
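
For example, the bias is easy to see on a toy sequence: a sequence [1, 2] zero-padded to length 4 has a naive mean of 0.75 instead of 1.5.

import torch

x = torch.tensor([1.0, 2.0, 0.0, 0.0])  # true length 2, padded to 4
print(x.mean())      # tensor(0.7500) -- dragged towards zero by the padding
print(x[:2].mean())  # tensor(1.5000) -- the mean of the actual data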

Not sure. Have you thought about batch normalization of the inputs and targets? I'm also not sure whether it helps or not. Below is one normalization I used in one of my RNNs, although in my case each batch has similar lengths.

class Normalization:
    def __init__(self, mean_val=None, std_val=None):
        self.mean_val = mean_val
        self.std_val = std_val

    def normalize(self, x):
        return (x - self.mean_val) / self.std_val

    def unnormalize(self, x):
        return x * self.std_val + self.mean_val

I modified my normalization class to this:

import torch
import einops


def get_mask_from_sequence_lengths(
    sequence_lengths: torch.Tensor, max_length: int
) -> torch.BoolTensor:
    """
    Given a variable of shape `(batch_size,)` that represents the sequence lengths of each batch
    element, this function returns a `(batch_size, max_length)` mask variable.  For example, if
    our input was `[2, 2, 3]`, with a `max_length` of 4, we'd return
    `[[1, 1, 0, 0], [1, 1, 0, 0], [1, 1, 1, 0]]`.

    We require `max_length` here instead of just computing it from the input `sequence_lengths`
    because it lets us avoid finding the max, then copying that value from the GPU to the CPU so
    that we can use it to construct a new tensor.
    """
    # (batch_size, max_length)
    ones = sequence_lengths.new_ones(sequence_lengths.size(0), max_length)
    range_tensor = ones.cumsum(dim=1)
    return sequence_lengths.unsqueeze(1) >= range_tensor 
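
As a quick sanity check, the mask matches the docstring example:

lengths = torch.tensor([2, 2, 3])
print(get_mask_from_sequence_lengths(lengths, max_length=4))
# tensor([[ True,  True, False, False],
#         [ True,  True, False, False],
#         [ True,  True,  True, False]])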

class Normalizer1D(nn.Module):
    _epsilon = 1e-16

    # Data size of (batch_size, seq_len, input_size)
    def __init__(self, scale, offset):
        super().__init__()
        self.register_buffer('scale', torch.tensor(scale, dtype=torch.float32) + self._epsilon)
        self.register_buffer('offset', torch.tensor(offset, dtype=torch.float32))

    @staticmethod
    def _get_mask(x):
        # Infer each sequence's length from its last non-zero entry in feature 0,
        # then expand the (B, T) mask over the feature dimension to (B, T, D)
        length = torch.LongTensor([torch.max((x[i, :, 0] != 0).nonzero()).item() + 1
                                   for i in range(x.shape[0])])
        mask = get_mask_from_sequence_lengths(length, x.size(1))
        return einops.repeat(mask, 'm n -> m n k', k=x.size(2))

    def normalize(self, x):
        mask = self._get_mask(x)
        x = (x - self.offset) / self.scale
        return torch.mul(x, mask)

    def unnormalize(self, x):
        mask = self._get_mask(x)
        x = x * self.scale + self.offset
        return torch.mul(x, mask)

    def unnormalize_mean(self, x_mu):
        mask = self._get_mask(x_mu)
        x_mu = x_mu * self.scale + self.offset
        return torch.mul(x_mu, mask)

    def unnormalize_sigma(self, x_sigma):
        mask = self._get_mask(x_sigma)
        x_sigma = x_sigma * self.scale
        return torch.mul(x_sigma, mask)
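
One caveat I noticed: inferring the length from the last non-zero entry breaks if a real sequence happens to end with an exact zero. Since my Dataset already returns seq_len, the mask could instead be built from those stored lengths; a sketch of what normalize would look like (seq_len being a hypothetical extra argument passed in from the loader):

    def normalize(self, x, seq_len):
        # seq_len: (batch_size,) true lengths from the Dataset, so a real
        # trailing zero is not mistaken for padding
        mask = get_mask_from_sequence_lengths(seq_len, x.size(1))
        mask = mask.unsqueeze(-1)  # (B, T, 1), broadcasts over the feature dim
        return (x - self.offset) / self.scale * mask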

I am not sure whether this is the best way to normalize a time series dataset with a different length in each episode. It also doesn't seem to have solved my problem.

I am still not sure. I tried to understand your code, though. With variable lengths per batch, I think one could do normalization per batch: within each batch, pick the non-zero elements of the tensor and use their mean and standard deviation to standardize that batch. Padding to equal lengths with zeros and then taking the mean gives a different result, since your targets do not originate from those additional zero elements. It may not achieve the exact mean either, but it should be close.
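
Roughly, I mean something like this (a rough sketch; it assumes exact zeros only ever occur as padding):

import torch

def standardize_batch(x):
    # x: (batch_size, seq_len, input_size), zero-padded
    mask = (x != 0)                           # treat exact zeros as padding
    n = mask.sum()
    mean = x.sum() / n                        # padding contributes 0 to the sum
    var = ((x - mean) ** 2 * mask).sum() / n
    std = var.sqrt() + 1e-16
    return (x - mean) / std * mask            # re-zero the padded positions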

To explain my code: I tried to exclude the zero-padded part of the data both when I compute the mean and variance and when I normalize the batches of data and targets. Right now, I have modified my normalization class to this:

class Normalizer1D(nn.Module):
    _epsilon = 1e-16

    # Data size of (batch_size, seq_len, input_size)
    def __init__(self, scale, offset):
        super().__init__()
        self.register_buffer('scale', torch.tensor(scale, dtype=torch.float32) + self._epsilon)
        self.register_buffer('offset', torch.tensor(offset, dtype=torch.float32))

    def normalize(self, x):
        # Infer lengths from the last non-zero entry of feature 0, build the
        # (B, T) mask once, and return it so the unnormalize calls can reuse it
        length = torch.LongTensor([torch.max((x[i, :, 0] != 0).nonzero()).item() + 1
                                   for i in range(x.shape[0])])
        mask = get_mask_from_sequence_lengths(length, x.size(1))
        m = einops.repeat(mask, 'm n -> m n k', k=x.size(2))

        x = (x - self.offset) / self.scale
        x = torch.mul(x, m)
        return x, mask

    def unnormalize(self, x, mask):
        # (B, T, D)
        m = einops.repeat(mask, 'm n -> m n k', k=x.size(2))
        x = x * self.scale + self.offset
        return torch.mul(x, m)

    def unnormalize_mean(self, x_mu, mask):
        # (B, T, D)
        m = einops.repeat(mask, 'm n -> m n k', k=x_mu.size(2))
        x_mu = x_mu * self.scale + self.offset
        return torch.mul(x_mu, m)

    def unnormalize_sigma(self, x_sigma, mask):
        # (B, T, D): sigma is scaled but not shifted
        m = einops.repeat(mask, 'm n -> m n k', k=x_sigma.size(2))
        x_sigma = x_sigma * self.scale
        return torch.mul(x_sigma, m)

def compute_mean_variance(data, seq_length, mean=True):
    # Slice each padded sequence back to its true length before reducing
    y = [data[count, :value, :] for count, value in enumerate(seq_length)]
    if mean:
        # per-sequence mean over time, averaged over the batch -> (D,)
        return torch.stack([y[i].mean(dim=0) for i in range(len(y))]).mean(dim=0)
    else:
        # per-sequence variance over time, averaged over the batch -> (D,)
        return torch.stack([y[i].var(dim=0, unbiased=False) for i in range(len(y))]).mean(dim=0)
    
def compute_normalizer(loader_train):

    variance_scaler = 1

    # initialization
    total_batches = 0
    u_mean = 0
    y_mean = 0
    u_var = 0
    y_var = 0
    for u, y, s in loader_train:
        total_batches += 1  # count batches, since one mean/variance is accumulated per batch

        u_mean += compute_mean_variance(u, s, mean=True)
        y_mean += compute_mean_variance(y, s, mean=True)
        u_var += compute_mean_variance(u, s, mean=False)
        y_var += compute_mean_variance(y, s, mean=False)

    u_mean = u_mean.numpy()
    y_mean = y_mean.numpy()
    u_var = u_var.numpy()
    y_var = y_var.numpy()

    u_normalizer = Normalizer1D(np.sqrt(u_var / total_batches) * variance_scaler, u_mean / total_batches)
    y_normalizer = Normalizer1D(np.sqrt(y_var / total_batches) * variance_scaler, y_mean / total_batches)

    return u_normalizer, y_normalizer
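
For completeness, this is roughly how it is wired together (a sketch; Data, Target, and the model are placeholders):

loader_train = torch.utils.data.DataLoader(
    subDataset(Data, Target), batch_size=16, shuffle=True)
u_normalizer, y_normalizer = compute_normalizer(loader_train)

for u, y, seq_len in loader_train:
    u_norm, u_mask = u_normalizer.normalize(u)
    y_norm, y_mask = y_normalizer.normalize(y)
    # ... run the model on u_norm and compute a loss against y_norm,
    # masking out padded steps with y_mask; predictions are mapped back via
    # y_pred = y_normalizer.unnormalize(y_pred_norm, y_mask)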

There are a lot of examples on the web for language datasets and how to deal with zero-padding. However, I couldn't find a proper example, especially in PyTorch, for continuous sequential input data with output sequences of similar length. I am not sure whether this is the right way to do it. Any suggestions or help would be appreciated.