# Different kernels within the same batch for a causal convolution

Hi everybody,

I was wondering whether there is a way in PyTorch to compute a dynamic causal convolution, i.e. one where the kernel size differs per sample.

Let’s take the simple case where the kernel_size is the same for every sample in the batch:

``````
In [1]: import torch
...: import torch.nn as nn
...: import torch.nn.functional as F

In [2]: inputs1 = [0 for _ in range(20)]
...: inputs1[8] = 1
...: inputs2 = [0 for _ in range(20)]
...: inputs2[13] = 1
...: inputs = torch.FloatTensor([inputs1, inputs2])

In [3]: kernel_size = 6 # Known a priori
...: kernels = torch.cat([torch.ones(kernel_size), torch.zeros(kernel_size - 1)])

In [4]: inputs = inputs.unsqueeze(1)
...: inputs
Out[4]:
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0.]],

[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
0., 0., 0.]]])

In [5]: kernels = kernels.unsqueeze(0).unsqueeze(0)
...: kernels # torch.Size([1, 1, 11])
Out[5]: tensor([[[1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0.]]])

In [6]: conv_res = F.conv1d(inputs, kernels, padding=kernel_size//2 + 1, groups=1)
...: conv_res
Out[6]:
tensor([[[0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0.,
0.]],

[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1.,
1.]]])

In [7]: pad_res = F.pad(conv_res, (1, 1)) # restore the original length of 20
...: pad_res
Out[7]:
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0.,
0., 0., 0.]],

[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1.,
1., 1., 0.]]])

In [8]: assert pad_res.shape == inputs.shape
``````

So far, so good: the input that originally contained a single “1” now contains kernel_size ones. This is expected.

What if we would like a different kernel per sample? How could we pass two different kernels to the same F.conv1d call?

I tried the following, but sadly it does not work.

Does anyone know what I am doing wrong?

``````
In [1]: import torch
...: import torch.nn as nn
...: import torch.nn.functional as F

In [2]: inputs1 = [0 for _ in range(20)]
...: inputs1[8] = 1
...: inputs2 = [0 for _ in range(20)]
...: inputs2[13] = 1
...: inputs = torch.FloatTensor([inputs1, inputs2])
...:

In [3]: kernel_size1 = 6 # Let's assume
...: kernel_size2 = 4
...: kernel1 = torch.cat([torch.ones(kernel_size1), torch.zeros(kernel_size1 - 1)])
...: kernel2 = torch.cat([torch.ones(kernel_size2), torch.zeros(kernel_size2 - 1)])
...: kernels = torch.cat([kernel1, kernel2], axis=0)
...:

In [4]: inputs = inputs.view(-1).unsqueeze(0).unsqueeze(0)
...: inputs
Out[4]:
tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
0., 0., 0., 0., 0., 0.]]])

In [5]: kernels = kernels.unsqueeze(0).unsqueeze(0)
...: kernels
Out[5]:
tensor([[[1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0.,
0.]]])

In [6]: conv_res = F.conv1d(inputs, kernels, padding=max(kernel_size1, kernel_size2)//2+1, groups=1)
...: conv_res # Ouch
Out[6]:
tensor([[[1., 1., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0.,
0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0.]]])

In [7]: conv_res.shape
Out[7]: torch.Size([1, 1, 31])
``````

If I understand the code correctly, it seems you are trying to use two different kernels in the second approach?
If that’s the case and each kernel should only be applied to the corresponding input channel, you could try to use a depthwise convolution via `groups=2`.
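For illustration, a minimal sketch of that depthwise idea (the sizes here are just examples): the two samples become two channels of a single batch element, and `groups=2` makes each kernel see only its own channel.

```python
import torch
import torch.nn.functional as F

# Two length-20 signals, each with a single spike.
inputs = torch.zeros(2, 20)
inputs[0, 8] = 1.0
inputs[1, 13] = 1.0

# One kernel per channel: weight shape is (out_channels, in_channels / groups, width).
kernels = torch.ones(2, 1, 3)

# Treat the two samples as two channels of one batch element;
# with groups=2, kernel i is applied only to channel i.
x = inputs.unsqueeze(0)                          # (1, 2, 20)
out = F.conv1d(x, kernels, padding=1, groups=2)  # (1, 2, 20)
```

This works as long as every kernel has the same width; kernels of unequal widths have to be zero-padded to a common shape first.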

Thank you for the answer! That’s roughly what I was looking for. However, the kernels must all have the same shape, which makes the padding complicated.

For example, in a similar case with five inputs and five different kernels, I obtain a wrong output because of the largest kernel_size:

``````
import torch
import torch.nn as nn
import torch.nn.functional as F

inputs1 = [0 for _ in range(20)]
inputs1[8] = 1
inputs2 = [0 for _ in range(20)]
inputs2[13] = 1
inputs3 = [0 for _ in range(20)]
inputs3[1] = 1
inputs4 = [0 for _ in range(20)]
inputs4[0] = 1
inputs5 = [0 for _ in range(20)]
inputs5[18] = 1
inputs = torch.FloatTensor([inputs1, inputs2, inputs3, inputs4, inputs5])

kernel_size1 = 6
kernel_size2 = 4
kernel_size3 = 5
kernel_size4 = 8
kernel_size5 = 4
largest_kernel = max(kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)
kernel1 = torch.cat([torch.ones(kernel_size1), torch.zeros(kernel_size1 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size1 + (kernel_size1 - 1)), 0))])
kernel2 = torch.cat([torch.ones(kernel_size2), torch.zeros(kernel_size2 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size2 + (kernel_size2 - 1)), 0))])
kernel3 = torch.cat([torch.ones(kernel_size3), torch.zeros(kernel_size3 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size3 + (kernel_size3 - 1)), 0))])
kernel4 = torch.cat([torch.ones(kernel_size4), torch.zeros(kernel_size4 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size4 + (kernel_size4 - 1)), 0))])
kernel5 = torch.cat([torch.ones(kernel_size5), torch.zeros(kernel_size5 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size5 + (kernel_size5 - 1)), 0))])
kernels = torch.cat([kernel1.unsqueeze(0), kernel2.unsqueeze(0), kernel3.unsqueeze(0), kernel4.unsqueeze(0), kernel5.unsqueeze(0)], axis=0)

inputs = inputs.unsqueeze(0) # Treat the samples as channels of a single batch element
kernels = kernels.unsqueeze(1)

print('Predicted')
conv_res = F.conv1d(inputs, kernels, padding=largest_kernel - 1, groups=inputs.size(1)).long()
for x in conv_res.squeeze().tolist():
    print('({})'.format(len(x)), x)
print('Expected')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]')
print('(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]')
``````

This produces:

``````
Predicted
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1]
(20) [0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Expected
(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]
(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
``````

Any idea how to do the padding properly?

I think the difference in the output comes from your manual padding, which is only applied to one side.
You could split the padding and add half of the shape to both sides. `F.pad` might be easier to use than the manual `torch.cat` approach.
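As a small illustration of that suggestion (sizes are just examples), `F.pad` makes the left/right split explicit, after which the convolution itself needs no padding argument:

```python
import torch
import torch.nn.functional as F

x = torch.zeros(1, 1, 20)
x[0, 0, 8] = 1.0

kernel_size = 6
kernel = torch.ones(1, 1, kernel_size)

# Split the total "same" padding of kernel_size - 1 between both sides.
left = (kernel_size - 1) // 2
right = kernel_size - 1 - left
x_padded = F.pad(x, (left, right))  # pads only the last dimension

out = F.conv1d(x_padded, kernel)    # output keeps the input length: (1, 1, 20)
```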

I thought of this approach, but I think the padding in the conv1d function would have to depend on the group; some inputs give the right result but others don’t. Is there a way of doing that?

``````
import torch
import torch.nn as nn
import torch.nn.functional as F

def create_kernal(kernel_size, largest_kernel):
    # Zero-pad every kernel to the shared length 2 * largest_kernel - 1,
    # splitting the zeros across both sides (one extra zero on the left when odd).
    pad = kernel_size - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size + (kernel_size - 1)), 0)
    offset = int(pad % 2 == 1)
    return torch.cat([torch.zeros(pad // 2 + offset), torch.ones(kernel_size), torch.zeros(pad // 2)])

inputs1 = [0 for _ in range(20)]
inputs1[8] = 1
inputs2 = [0 for _ in range(20)]
inputs2[13] = 1
inputs3 = [0 for _ in range(20)]
inputs3[1] = 1
inputs4 = [0 for _ in range(20)]
inputs4[0] = 1
inputs5 = [0 for _ in range(20)]
inputs5[18] = 1
inputs = torch.FloatTensor([inputs1, inputs2, inputs3, inputs4, inputs5])

kernel_size1 = 6
kernel_size2 = 4
kernel_size3 = 5
kernel_size4 = 8
kernel_size5 = 4
largest_kernel = max(kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)

ks = kernel_size1 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size1 + (kernel_size1 - 1)), 0)
kernels = torch.cat([create_kernal(kernel_size, largest_kernel).unsqueeze(0) for kernel_size in (kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)], axis=0)

inputs = inputs.unsqueeze(0) # Treat the samples as channels of a single batch element
kernels = kernels.unsqueeze(1)
print('Predicted')
conv_res = F.conv1d(inputs, kernels, padding=largest_kernel + 1, groups=inputs.size(1)).long()[:, :, :inputs.size(-1)]
for x in conv_res.squeeze().tolist():
    print('({})'.format(len(x)), x)
print('Expected')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]')
print('(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]')
print('(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]')

print()
``````

This produces:

``````
Predicted
(20) [0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]
(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
Expected
(20) [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0]
(20) [0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
(20) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
``````

Hey again,

I realize that the method does not work when the difference between the kernel sizes is greater than 1. Is there something equivalent to `SAME` padding in TensorFlow?

``````
import torch
import torch.nn as nn
import torch.nn.functional as F

def create_kernal(kernel_size, largest_kernel):
    # Zero-pad every kernel to the shared length 2 * largest_kernel - 1,
    # splitting the zeros across both sides (one extra zero on the left when odd).
    pad = kernel_size - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size + (kernel_size - 1)), 0)
    offset = int(pad % 2 == 1)
    return torch.cat([torch.zeros(pad // 2 + offset), torch.ones(kernel_size), torch.zeros(pad // 2)])

inputs1 = [0 for _ in range(20)]
inputs1[8] = 1
inputs2 = [0 for _ in range(20)]
inputs2[13] = 1
inputs3 = [0 for _ in range(20)]
inputs3[1] = 1
inputs4 = [0 for _ in range(20)]
inputs4[0] = 1
inputs5 = [0 for _ in range(20)]
inputs5[18] = 1
inputs = torch.FloatTensor([inputs1, inputs2, inputs3, inputs4, inputs5])

kernel_size1 = 5
kernel_size2 = 4
kernel_size3 = 4 # Change it to 6 to see the problem.
kernel_size4 = 5
kernel_size5 = 4
all_kernels = [kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5]
largest_kernel = max(all_kernels)
min_kernel = min(all_kernels)

ks = kernel_size1 - 1 + max(largest_kernel + (largest_kernel - 1) - (kernel_size1 + (kernel_size1 - 1)), 0)
kernels = torch.cat([create_kernal(kernel_size, largest_kernel).unsqueeze(0) for kernel_size in (kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)], axis=0)

inputs = inputs.unsqueeze(0) # Treat the samples as channels of a single batch element
kernels = kernels.unsqueeze(1)
print('Predicted')
conv_res = F.conv1d(inputs, kernels, padding=largest_kernel + 1, groups=inputs.size(1)).long()[:, :, :inputs.size(-1)]
for (true, ks), pred in zip(zip(inputs.squeeze().long().tolist(), (kernel_size1, kernel_size2, kernel_size3, kernel_size4, kernel_size5)), conv_res.squeeze().tolist()):
    pred_str = ''.join([str(x) for x in pred])
    print('P ({})'.format(len(pred)), pred_str, '({})'.format(pred_str.count('1')))

    one_idx = None
    if 1 in true:
        one_idx = true.index(1)
        for i in range(one_idx, min(one_idx + ks, len(true))):
            true[i] = 1
    true_str = ''.join([str(x) for x in true])
    print('E ({})'.format(len(true)), true_str, '({})'.format(true_str.count('1')))

    assert true_str == pred_str
    print()
``````

There is no `"same"` option for e.g. conv layers, but you might find some custom implementations in the forum. What issue are you currently seeing with this code?
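For what it’s worth, a custom workaround could sidestep the shared-shape constraint entirely with a per-sample loop (a sketch only; `causal_conv_per_sample` is a hypothetical helper, not an existing API): pad each sample on the left by its own `kernel_size - 1`, then run an unpadded convolution, which preserves the length and keeps the kernel strictly causal.

```python
import torch
import torch.nn.functional as F

def causal_conv_per_sample(inputs, kernel_sizes):
    # inputs: (batch, length); kernel_sizes: one int per sample.
    # Simple and correct, but a Python loop: not vectorized over the batch.
    outs = []
    for x, k in zip(inputs, kernel_sizes):
        kernel = torch.ones(1, 1, k)
        # k - 1 zeros on the left only: output position t then depends
        # on input positions t - k + 1 .. t, and the length is preserved.
        xp = F.pad(x.view(1, 1, -1), (k - 1, 0))
        outs.append(F.conv1d(xp, kernel).view(-1))
    return torch.stack(outs)

inputs = torch.zeros(2, 20)
inputs[0, 8] = 1.0   # spike at 8, kernel size 6 -> ones at 8..13
inputs[1, 18] = 1.0  # spike at 18, kernel size 4 -> ones at 18..19 (clipped)
out = causal_conv_per_sample(inputs, [6, 4])
```

The loop trades speed for clarity; since every sample gets its own kernel anyway, there is no single grouped convolution being avoided here.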

Hmm, I see.

For example, if my kernel sizes are `kernel_size1 = 5, kernel_size2 = 4, kernel_size3 = 4, kernel_size4 = 5, kernel_size5 = 4`, the output is correct because the differences between the kernel sizes are small.

Top: predicted, Bottom: expected

``````
P (20) 00000000111110000000 (5)
E (20) 00000000111110000000 (5)

P (20) 00000000000001111000 (4)
E (20) 00000000000001111000 (4)

P (20) 01111000000000000000 (4)
E (20) 01111000000000000000 (4)

P (20) 11111000000000000000 (5)
E (20) 11111000000000000000 (5)

P (20) 00000000000000000011 (2)
E (20) 00000000000000000011 (2)
``````

If I replace some of the kernels with larger ones, some results are correct but not all:

``````
kernel_size1 = 5, kernel_size2 = 4, kernel_size3 = 7, kernel_size4 = 6, kernel_size5 = 4
// Here
P (20) 00000111111111100000 (10)
E (20) 00000000111111111100 (10)

P (20) 00000000000001111000 (4)
E (20) 00000000000001111000 (4)

// Here
P (20) 11111110000000000000 (7)
E (20) 01111111000000000000 (7)

// Here
P (20) 11111000000000000000 (5)
E (20) 11111100000000000000 (6)

P (20) 00000000000000000011 (2)
E (20) 00000000000000000011 (2)
``````

So there are four possible reasons for the wrong results:

1. The slice applied after the convolution, `[:, :, :inputs.size(-1)]`