Different kernels within the same batch for a causal convolution

Hi everybody,

I was wondering whether there is a way in PyTorch to compute a dynamic causal convolution, i.e. one where the kernel size differs per sample?

Let’s take the simple case where the kernel_size is the same for every sample in the batch:

In [1]: import torch       
   ...: import torch.nn as nn 
   ...: import torch.nn.functional as F                                                                                                                                                                                                                                                                                                                                                                                                                                                                            

In [2]: inputs1 = [0 for _ in range(20)]     
   ...: inputs1[8] = 1 
   ...: inputs2 = [0 for _ in range(20)] 
   ...: inputs2[13] = 1 
   ...: inputs = torch.FloatTensor([inputs1, inputs2])                                                                                                                                                                                                                           

In [3]: kernel_size = 6 # Known a priori 
   ...: kernels = torch.cat([torch.ones(kernel_size), torch.zeros(kernel_size - 1)])                                                                                                                                                                                             

In [4]: inputs = inputs.unsqueeze(1) 
   ...: inputs                                                                                                                                                                                                                                                                   
Out[4]: 
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0.]],

        [[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
          0., 0., 0.]]])

In [5]: kernels = kernels.unsqueeze(0).unsqueeze(0) 
   ...: kernels # torch.Size([1, 1, 11])                                                                                                                                                                                                                                                                  
Out[5]: tensor([[[1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0.]]])

In [6]: conv_res = F.conv1d(inputs, kernels, padding=kernel_size//2 + 1, groups=1)  
   ...: conv_res                                                                                                                                                                                                                                                                 
Out[6]: 
tensor([[[0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0.,
          0.]],

        [[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1.,
          1.]]])

In [7]: pad_res = F.pad(conv_res, (1,1), mode='constant', value=0)  
   ...: pad_res                                                                                                                                                                                                                                                                  
Out[7]: 
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0.,
          0., 0., 0.]],

        [[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1.,
          1., 1., 0.]]])

In [8]: assert pad_res.shape == inputs.shape  

So far so good: the input that originally contained a single “1” now contains kernel_size ones. This is expected.
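As an aside, the same causal result can usually be obtained in one step by padding only the left side of the input by kernel_size - 1 and using no padding inside the conv (a sketch, equivalent to the conv + F.pad combination above):

```python
import torch
import torch.nn.functional as F

# Same toy inputs as above: a single 1 per sample.
x = torch.zeros(2, 1, 20)
x[0, 0, 8] = 1.
x[1, 0, 13] = 1.

kernel_size = 6
w = torch.ones(1, 1, kernel_size)

# Left-pad by kernel_size - 1 so each output position only sees
# current and past inputs; output length equals input length.
out = F.conv1d(F.pad(x, (kernel_size - 1, 0)), w)
print(out.shape)  # torch.Size([2, 1, 20])
```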

What if we would like a different kernel per sample? How could we define two different kernels in the same F.conv1d call?

I tried the following but sadly, it does not work.

Does anyone know what I am doing wrong?

In [1]: import torch       
   ...: import torch.nn as nn 
   ...: import torch.nn.functional as F                                                                                                                                                                                                                                          

In [2]: inputs1 = [0 for _ in range(20)]     
   ...: inputs1[8] = 1 
   ...: inputs2 = [0 for _ in range(20)] 
   ...: inputs2[13] = 1 
   ...: inputs = torch.FloatTensor([inputs1, inputs2]) 
   ...:                                                                                                                                                                                                                                                                          

In [3]: kernel_size1 = 6 # Let's assume  
   ...: kernel_size2 = 4 
   ...: kernel1 = torch.cat([torch.ones(kernel_size1), torch.zeros(kernel_size1 - 1)]) 
   ...: kernel2 = torch.cat([torch.ones(kernel_size2), torch.zeros(kernel_size2 - 1)]) 
   ...: kernels = torch.cat([kernel1, kernel2], axis=0) 
   ...:                                                                                                                                                                                                                                                                          

In [4]: inputs = inputs.view(-1).unsqueeze(0).unsqueeze(0) 
   ...: inputs                                                                                                                                                                                                                                                                   
Out[4]: 
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
          0., 0., 0., 0., 0., 0.]]])

In [5]: kernels = kernels.unsqueeze(0).unsqueeze(0) 
   ...: kernels                                                                                                                                                                                                                                                                  
Out[5]: 
tensor([[[1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0.,
          0.]]])

In [6]: conv_res = F.conv1d(inputs, kernels, padding=max(kernel_size1, kernel_size2)//2+1, groups=1)  
   ...: conv_res # Ouch
Out[6]: 
tensor([[[1., 1., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0.]]])

In [7]: conv_res.shape                                                                                                                                                                                                                                                           
Out[7]: torch.Size([1, 1, 31])

Thank you for your help!

If I understand the code correctly, it seems you are trying to use two different kernels in the second approach?
If that’s the case and each kernel should only be applied to the corresponding input channel, you could try to use a depthwise convolution via groups=2.
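Something along these lines (a minimal sketch with toy values, not your exact setup): treat the two samples as two channels of a single batch element, pad the shorter kernel with zeros so both weights share one length, and let groups=2 apply kernel i to channel i only.

```python
import torch
import torch.nn.functional as F

# Two samples stacked as two channels of one batch element.
inputs = torch.zeros(2, 20)
inputs[0, 8] = 1.
inputs[1, 13] = 1.
x = inputs.unsqueeze(0)  # shape [1, 2, 20]

# One kernel per channel; conv1d requires a single kernel length,
# so the size-4 kernel is zero-padded to length 6.
k1 = torch.tensor([1., 1., 1., 1., 1., 1.])  # kernel_size 6
k2 = torch.tensor([1., 1., 1., 1., 0., 0.])  # kernel_size 4
weight = torch.stack([k1, k2]).unsqueeze(1)  # shape [2, 1, 6]

# groups=2 -> depthwise: kernel i convolves channel i only.
out = F.conv1d(x, weight, padding=5, groups=2)
print(out.shape)  # torch.Size([1, 2, 25])
```

Each channel then carries the number of ones of its own kernel (6 and 4 here); the remaining question is lining up the padding, as discussed below.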

Thank you for the answer! That’s roughly what I was looking for. However, the kernels must have the same shape, which makes the padding complicated.

For example, in a similar case with 5 inputs and 5 different kernels, I obtain a wrong output because of the largest kernel_size:

import torch
import torch.nn as nn
import torch.nn.functional as F

inputs1 = [0 for _ in range(20)]
inputs1[8] = 1
inputs2 = [0 for _ in range(20)]
inputs2[13] = 1
inputs3 = [0 for _ in range(20)]
inputs3[1] = 1
inputs4 = [0 for _ in range(20)]
inputs4[0] = 1
inputs5 = [0 for _ in range(20)]
inputs5[18] = 1
inputs = torch.FloatTensor([inputs1, inputs2, inputs3, inputs4, inputs5])

kernel_size1 = 6
kernel_size2 = 4
kernel_size3 = 5
kernel_size4 = 8
kernel_size5 = 4
largest_kernel = max(kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)
kernel1 = torch.cat([torch.ones(kernel_size1), torch.zeros(kernel_size1 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size1 + (kernel_size1 - 1)), 0))])
kernel2 = torch.cat([torch.ones(kernel_size2), torch.zeros(kernel_size2 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size2 + (kernel_size2 - 1)), 0))])
kernel3 = torch.cat([torch.ones(kernel_size3), torch.zeros(kernel_size3 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size3 + (kernel_size3 - 1)), 0))])
kernel4 = torch.cat([torch.ones(kernel_size4), torch.zeros(kernel_size4 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size4 + (kernel_size4 - 1)), 0))])
kernel5 = torch.cat([torch.ones(kernel_size5), torch.zeros(kernel_size5 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size5 + (kernel_size5 - 1)), 0))])
kernels = torch.cat([kernel1.unsqueeze(0), kernel2.unsqueeze(0), kernel3.unsqueeze(0), kernel4.unsqueeze(0), kernel5.unsqueeze(0)], axis=0)

inputs = inputs.unsqueeze(0) # Treat samples as channels of one batch element
kernels = kernels.unsqueeze(1)

print('Predicted')
conv_res = F.conv1d(inputs, kernels, padding=largest_kernel - 1, groups=inputs.size(1)).long()
for x in conv_res.squeeze().tolist():
    print('({})'.format(len(x)), x)
print('Expected')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]')
print('(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]')

This produces:

Predicted
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1]
(20) [0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Expected
(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]
(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]

Any idea how to do the padding properly?

I think the difference in the output comes from your manual padding, which is only applied to one side.
You could split the padding and add half of it to each side. F.pad might be easier to use than the manual torch.cat approach. :wink:
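For a single kernel that could look like this small sketch (the sizes are just example values): the zeros needed to reach the common length are split between both sides, with the extra zero going to one side when the total is odd.

```python
import torch
import torch.nn.functional as F

kernel_size = 4
largest_kernel = 8

pad_total = largest_kernel - kernel_size  # zeros needed to reach the common length
left = pad_total // 2 + pad_total % 2     # extra zero (if any) goes to the left
right = pad_total // 2

kernel = F.pad(torch.ones(kernel_size), (left, right))
print(kernel)  # tensor([0., 0., 1., 1., 1., 1., 0., 0.])
```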

I thought of this approach, but I think the padding in the conv1d function should depend on the group; some inputs give the right result but others don’t. Is there a way of doing that?

import torch
import torch.nn as nn
import torch.nn.functional as F

def create_kernal(kernel_size, largest_kernel):
    #return torch.cat([torch.ones(kernel_size), torch.zeros(kernel_size - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size + (kernel_size - 1)), 0))])
    pad = kernel_size - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size + (kernel_size - 1)), 0)
    offset = int(pad % 2 == 1)
    pad = pad // 2
    return F.pad(torch.ones(kernel_size), (pad + offset, pad), mode='constant', value=0)

inputs1 = [0 for _ in range(20)]
inputs1[8] = 1
inputs2 = [0 for _ in range(20)]
inputs2[13] = 1
inputs3 = [0 for _ in range(20)]
inputs3[1] = 1
inputs4 = [0 for _ in range(20)]
inputs4[0] = 1
inputs5 = [0 for _ in range(20)]
inputs5[18] = 1
inputs = torch.FloatTensor([inputs1, inputs2, inputs3, inputs4, inputs5])

kernel_size1 = 6
kernel_size2 = 4
kernel_size3 = 5
kernel_size4 = 8
kernel_size5 = 4
largest_kernel = max(kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)

ks = kernel_size1 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size1 + (kernel_size1 - 1)), 0)
kernels = torch.cat([create_kernal(kernel_size, largest_kernel).unsqueeze(0) for kernel_size in (kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)], axis=0)

inputs = inputs.unsqueeze(0) # Treat samples as channels of one batch element
kernels = kernels.unsqueeze(1)
print('Predicted')
conv_res = F.conv1d(inputs, kernels, padding=largest_kernel + 1, groups=inputs.size(1)).long()[:, :, :inputs.size(-1)]
for x in conv_res.squeeze().tolist():
    print('({})'.format(len(x)), x)
print('Expected')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]')
print('(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]')

print()

Predicted
(20) [0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]
(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
Expected
(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]
(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]

Hey again,

I realize that the method does not work if the difference between the kernel sizes is greater than 1. Is there something equivalent to SAME padding in TensorFlow?

import torch
import torch.nn as nn
import torch.nn.functional as F

def create_kernal(kernel_size, largest_kernel):
    #return torch.cat([torch.ones(kernel_size), torch.zeros(kernel_size - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size + (kernel_size - 1)), 0))])
    pad = kernel_size - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size + (kernel_size - 1)), 0)
    offset = int(pad % 2 == 1)
    pad = pad // 2
    return F.pad(torch.ones(kernel_size), (pad + offset, pad), mode='constant', value=0)

inputs1 = [0 for _ in range(20)]
inputs1[8] = 1
inputs2 = [0 for _ in range(20)]
inputs2[13] = 1
inputs3 = [0 for _ in range(20)]
inputs3[1] = 1
inputs4 = [0 for _ in range(20)]
inputs4[0] = 1
inputs5 = [0 for _ in range(20)]
inputs5[18] = 1
inputs = torch.FloatTensor([inputs1, inputs2, inputs3, inputs4, inputs5])

kernel_size1 = 5
kernel_size2 = 4
kernel_size3 = 4 # Change it to 6 to see the problem.
kernel_size4 = 5
kernel_size5 = 4
all_kernels = [kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5]
largest_kernel = max(all_kernels)
min_kernel = min(all_kernels)

ks = kernel_size1 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size1 + (kernel_size1 - 1)), 0)
kernels = torch.cat([create_kernal(kernel_size, largest_kernel).unsqueeze(0) for kernel_size in (kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)], axis=0)

inputs = inputs.unsqueeze(0) # Treat samples as channels of one batch element
kernels = kernels.unsqueeze(1)
print('Predicted')
conv_res = F.conv1d(inputs, kernels, padding=largest_kernel + 1, groups=inputs.size(1)).long()[:, :, :inputs.size(-1)]
for (true, ks), pred in zip(zip(inputs.squeeze().long().tolist(), (kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)), conv_res.squeeze().tolist()):
    pred_str = ''.join([str(x) for x in pred])
    print('P ({})'.format(len(pred)), pred_str, '({})'.format(pred_str.count('1')))

    one_idx = None
    if 1 in true:
        one_idx = true.index(1)
        for i in range(one_idx, min(one_idx + ks, len(true))):
            true[i] = 1
    true_str = ''.join([str(x) for x in true])
    print('E ({})'.format(len(true)), true_str, '({})'.format(true_str.count('1')))

    assert true_str == pred_str
    print()

There is no "same" padding option for e.g. conv layers, but you might find some custom implementations in the forum. What issue are you currently seeing with this code?
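Emulating TensorFlow's SAME padding manually is a short F.pad before the conv: total padding kernel_size - 1, split as evenly as possible between the two sides. A sketch (note that newer PyTorch releases, 1.9 and later if I remember correctly, also accept the string padding="same" for stride-1 convolutions):

```python
import torch
import torch.nn.functional as F

x = torch.zeros(1, 1, 20)
x[0, 0, 8] = 1.
w = torch.ones(1, 1, 5)

# Manual SAME padding: kernel_size - 1 zeros total, split across both sides.
k = w.size(-1)
left = (k - 1) // 2
right = k - 1 - left
out = F.conv1d(F.pad(x, (left, right)), w)
print(out.shape)  # torch.Size([1, 1, 20]) -- same length as the input

# Equivalent string form in newer PyTorch (>= 1.9, stride 1 only):
out2 = F.conv1d(x, w, padding="same")
```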

Hmm, I see.

For example, if my kernel sizes are kernel_size1 = 5, kernel_size2 = 4, kernel_size3 = 4, kernel_size4 = 5, kernel_size5 = 4, the output is correct because the differences between the kernels are small.

Top: predicted, Bottom: expected

P (20) 00000000111110000000 (5)
E (20) 00000000111110000000 (5)

P (20) 00000000000001111000 (4)
E (20) 00000000000001111000 (4)

P (20) 01111000000000000000 (4)
E (20) 01111000000000000000 (4)

P (20) 11111000000000000000 (5)
E (20) 11111000000000000000 (5)

P (20) 00000000000000000011 (2)
E (20) 00000000000000000011 (2)

If I replace some kernels with larger values, some results are correct but not all:

kernel_size1 = 5, kernel_size2 = 4, kernel_size3 = 7, kernel_size4 = 6, kernel_size5 = 4
// Here
P (20) 00000111111111100000 (10)
E (20) 00000000111111111100 (10)

P (20) 00000000000001111000 (4)
E (20) 00000000000001111000 (4)

// Here
P (20) 11111110000000000000 (7)
E (20) 01111111000000000000 (7)

// Here
P (20) 11111000000000000000 (5)
E (20) 11111100000000000000 (6)

P (20) 00000000000000000011 (2)
E (20) 00000000000000000011 (2)

So there are four possible causes for the wrong results:

  1. The slice after the convolution, [:, :, :inputs.size(-1)]
  2. The padding of the kernels
  3. The padding of the conv
  4. The conv padding would need to be dynamic (per group)

I’ve been working on 2 and 3 a lot but couldn’t find a solution that works.
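Edit: one arrangement that seems to line up for arbitrary kernel-size differences (a sketch derived from how conv1d indexes its weights, not a battle-tested solution): left-align the zeros in each padded kernel (zeros first, ones last), use padding = largest_kernel - 1 in the conv, and slice the output back to the input length. A 1 at position t then spreads to positions t .. t + kernel_size - 1 in every channel, regardless of the size gap between kernels.

```python
import torch
import torch.nn.functional as F

def make_kernel(kernel_size, largest_kernel):
    # Zeros first, ones last: combined with padding=largest_kernel-1 and a
    # slice to the input length, a 1 at t spreads to t .. t+kernel_size-1.
    return torch.cat([torch.zeros(largest_kernel - kernel_size),
                      torch.ones(kernel_size)])

kernel_sizes = [5, 4, 7, 6, 4]  # differences > 1 between kernels
largest = max(kernel_sizes)

inputs = torch.zeros(5, 20)
for row, idx in enumerate([8, 13, 1, 0, 18]):
    inputs[row, idx] = 1.

kernels = torch.stack([make_kernel(k, largest) for k in kernel_sizes]).unsqueeze(1)
x = inputs.unsqueeze(0)  # treat samples as channels of one batch element

out = F.conv1d(x, kernels, padding=largest - 1, groups=x.size(1))
out = out[:, :, :x.size(-1)].long()  # keep the first 20 positions
```

The slice keeps the left end of the output, which is why the zeros must sit on the left of each kernel rather than being centered.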