Creating a Custom Convolutional Layer?

I’m trying to write a custom convolutional layer using nn.Conv2d as a guide.

The Python source for nn.Conv2d calls torch.nn.functional.conv2d, which is implemented in C++ (torch/csrc/api/include/torch/nn/functional/conv.h#L72).

That calls torch::conv2d. I can’t find that.

I can find Conv2dFuncOptions (torch/csrc/api/include/torch/nn/options/conv.h#L243).

I found torch::nn::Conv2dImpl (torch/csrc/api/src/nn/modules/conv.cpp#L81), but I don’t see where it is used.

I could do what I want all in python, but it will be a lot slower. I was hoping the implementation for Conv2d would be well enough factored that it would be easy to implement my own custom version, but maybe that isn’t the case.

My goal is a performant custom PyTorch layer where an offset patch is applied to the input before weights are applied instead of applying a single bias scalar after the weights.

Is there a better place to ask this?

Hi Alan!

I don’t know what you mean by applying “an offset patch.”

However, would it suffice to apply your “offset patch” using pytorch tensor
operations – which should be quite efficient and through which you can
backpropagate – followed by pytorch’s built-in Conv2d with bias = False?

Best.

K. Frank

Conv2d takes an image, convolves A over it, then adds a scalar bias: y=Ax+b

I want an offset image patch B that is the same size as A such that I calculate y=A(x-B).

That’s an implementation detail that isn’t super important for my actual question. I kind of didn’t want to mention it because I thought it would be a distraction.

I need to do something between extracting the convolution patch and the weight multiplication. I haven’t found any hooks in the code to do that.

Conv2d’s implementation is in C++ for a reason. I’m assuming it is for performance.

Perhaps I am a bit unclear on your written-out definition above and on this:

x is the image, correct? Which would make A the kernels (or weights) and b the bias. So is B an input image or a second set of weights?

By the way, you might find it helpful to know you can pass in a custom set of kernels and your image by using F.conv2d. See here:

https://pytorch.org/docs/stable/generated/torch.nn.functional.conv2d.html

But to make those kernels learnable, you’ll need to wrap them in nn.Parameter in your init.

https://pytorch.org/docs/stable/generated/torch.nn.parameter.Parameter.html
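
For example, something along these lines (just a rough sketch; the class name, weight init scale, and padding are made up):

import torch
import torch.nn as nn
import torch.nn.functional as F

class MyConv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size):
        super().__init__()
        # Wrap the kernels in nn.Parameter so they are registered with the
        # module and updated by the optimizer.
        self.weight = nn.Parameter(
            0.01 * torch.randn(out_channels, in_channels, kernel_size, kernel_size)
        )

    def forward(self, x):
        # Pass the custom kernels directly to the functional convolution.
        return F.conv2d(x, self.weight, padding=1)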

I suppose in those equations x is a convolution patch of the image.

A, B, and x are the same shape. B is a learnable parameter containing a weight for each element of the convolution patch x. B is subtracted from x before taking the dot product with A. The details of this aren’t super important to my actual question.

When you say “pass in a custom set of kernels” do you mean the weights of A? I know that. It doesn’t help me change the function being computed.

I don’t see anything in the F.conv2d code that takes a lambda.

F.conv2d delegates its implementation to C++. I can find the declaration for that function, but I can’t find the definition. Does anyone know where it is defined?

The convolution operation is pretty straightforward. First, your image is converted into a Toeplitz matrix, based on the kernel, stride, dilation and padding settings. Then that and the kernels undergo a matmul. As far as I know, the process behind the scenes does this a little more efficiently than what I just described.
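
Roughly, the unfold-and-matmul version looks like this (just a sketch with made-up shapes, stride 1 and no padding):

import torch
import torch.nn.functional as F

x = torch.randn(1, 3, 8, 8)   # [batch, in_channels, H, W]
w = torch.randn(4, 3, 3, 3)   # [out_channels, in_channels, kH, kW]

# im2col: each column is one flattened 3x3x3 patch -> [1, 27, 36]
cols = F.unfold(x, kernel_size=3)

# matmul with the flattened kernels -> [1, 4, 36]
out = w.view(4, -1) @ cols

# reshape the patch dimension back into the 6x6 output map -> [1, 4, 6, 6]
out = out.view(1, 4, 6, 6)

print(torch.allclose(out, F.conv2d(x, w), atol=1e-5))  # True, up to float error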

So let’s suppose A and B are learnable parameters. Do you want B applied as a kernel, or just a simple elementwise subtraction?

If the latter, you just define A and B in your init as learnable parameters via the nn.Parameter class. B should be the same shape as the incoming image. A, however, would not be the same shape, unless under very specific circumstances, such as when the stride and kernel are the same size as the image. The kernel is defined based on the settings passed into the convolution layer. In the forward pass, you can just subtract B from the image before running the convolution. For example:

def forward(self, x):
    x = x - self.B
    return F.conv2d(x, self.A, ...) #fill in your desired conv settings here

I have an implementation using tensors. It is slow. What takes around 5s with Conv2d takes around 116s with my custom Conv2dPS.

Are there any obvious performance improvements I can make to this?

class Conv2dPS(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, padding=0):
        super(Conv2dPS, self).__init__()
        # Initialize parameters for the convolution layer
        # in_channels: number of input channels
        # out_channels: number of output channels
        # kernel_size: size of the convolution kernel

        # Define A (weights) for the convolution layer
        # Use nn.Parameter to create a learnable parameter for the weights
        self.A = nn.Parameter(torch.Tensor(out_channels, in_channels, kernel_size, kernel_size))

        # Define B (offset) as a learnable parameter
        # B should have the same dimensions as the kernel
        self.B = nn.Parameter(torch.Tensor(out_channels, in_channels, kernel_size, kernel_size))

        # Initialize A and B with appropriate initialization methods, e.g., Xavier initialization
        nn.init.xavier_uniform_(self.A)
        nn.init.xavier_uniform_(self.B)

        # Set padding for the convolution operation
        self.padding = padding
    
    def convolve(self, x):
        batch_size = x.shape[0]
        image_channels = x.shape[1]
        image_height = x.shape[2]
        image_width = x.shape[3]

        out_channels = self.A.shape[0]
        in_channels = self.A.shape[1]
        kernel_height = self.A.shape[2]
        kernel_width = self.A.shape[3]

        assert(image_channels == in_channels)
        assert(kernel_height == kernel_width)
        
        # F.unfold takes an input tensor and extracts sliding local blocks (or patches) from it. 
        # These blocks are the regions of the input tensor over which the convolution operation 
        # (filter application) will take place.
        # x_unfolded will have a shape of [batch_size, in_channels * kernel_height * kernel_width, num_patches]
        # The output will look something like this:
        # tensor([[[ 1.,  2.,  3.,  5.,  6.,  7.,  9., 10., 11.],
        #          [ 2.,  3.,  4.,  6.,  7.,  8., 10., 11., 12.],
        #          [ 5.,  6.,  7.,  9., 10., 11., 13., 14., 15.],
        #          [ 6.,  7.,  8., 10., 11., 12., 14., 15., 16.]]])
        # The first patch is the first element of each row: [1, 2, 5, 6]
        x_unfolded = F.unfold(x, kernel_size=kernel_height, padding=self.padding)
                
        unfolded_batch_size = x_unfolded.shape[0]
        unfolded_patch_size = x_unfolded.shape[1]
        num_patches = x_unfolded.shape[2]
        assert(unfolded_batch_size == batch_size)
        assert(unfolded_patch_size == in_channels * kernel_height * kernel_width)
        
        # Reshape x_unfolded into a format that aligns with the convolution weights A
        # transpose dimensions 1 and 2 above into [batch, num_patches, in_channels * kernel_height * kernel_width]
        # then view as [batch, num_patches, in_channels, kernel_height, kernel_width]
        x_unfolded = x_unfolded.permute(0, 2, 1).view(batch_size, num_patches, in_channels, kernel_height, kernel_width)

        # Expand x_unfolded across output_channels to match the dimensions of B_expanded
        x_expanded = x_unfolded.unsqueeze(2).expand(batch_size, num_patches, out_channels, in_channels, kernel_height, kernel_width)
    
        return x_expanded
    
    def subtract_offset(self, x_convolve):
        batch_size = x_convolve.shape[0]
        num_patches = x_convolve.shape[1]
        x_out_channels = x_convolve.shape[2]
        x_in_channels = x_convolve.shape[3]
        x_kernel_height = x_convolve.shape[4]
        x_kernel_width = x_convolve.shape[5]

        out_channels = self.B.shape[0]
        in_channels = self.B.shape[1]
        kernel_height = self.B.shape[2]
        kernel_width = self.B.shape[3]

        assert(x_out_channels == out_channels)
        assert(x_in_channels == in_channels)
        assert(x_kernel_height == kernel_height)
        assert(x_kernel_width == kernel_width)
        
        # Reshape B to match the dimensions of x_convolve, keeping its unique values per filter

        # Current shape of B: [out_channels, in_channels, kernel_height, kernel_width]
        # We need B to have the shape: [batch_size, num_patches, out_channels, in_channels, kernel_height, kernel_width]
        # Expand B across the batch_size and num_patches dimensions
        B_reshaped = self.B.view(1, 1, out_channels, in_channels, kernel_height, kernel_width)
        B_expanded = B_reshaped.expand(batch_size, num_patches, out_channels, in_channels, kernel_height, kernel_width)

        # Subtract B_expanded from each patch
        x_offset = x_convolve - B_expanded
        return x_offset

    def multiply_weights(self, x_offset):
        batch_size = x_offset.shape[0]
        num_patches = x_offset.shape[1]
        x_out_channels = x_offset.shape[2]
        x_in_channels = x_offset.shape[3]
        x_kernel_height = x_offset.shape[4]
        x_kernel_width = x_offset.shape[5]
        
        out_channels = self.A.shape[0]
        in_channels = self.A.shape[1]
        kernel_height = self.A.shape[2]
        kernel_width = self.A.shape[3]
        
        assert(x_out_channels == out_channels)
        assert(x_in_channels == in_channels)
        assert(x_kernel_height == kernel_height)
        assert(x_kernel_width == kernel_width)
        
        # Multiply A with x_offset and sum over the kernel dimensions
        # A shape: [out_channels, in_channels, kernel_height, kernel_width]
        # x_offset shape: [batch_size, num_patches, out_channels, in_channels, kernel_height, kernel_width]
        return self.A.unsqueeze(0).unsqueeze(1) * x_offset   

    def forward(self, x):
        batch_size = x.shape[0]
        image_channels = x.shape[1]
        image_height = x.shape[2]
        image_width = x.shape[3]

        out_channels = self.A.shape[0]
        in_channels = self.A.shape[1]
        kernel_height = self.A.shape[2]
        kernel_width = self.A.shape[3]
        
        assert(image_channels == in_channels)
        assert(kernel_height == kernel_width)
        
        # 1. Extract image patches
        x_convolve = self.convolve(x)

        # 2. Calculate P_offset = P - B
        x_offset = self.subtract_offset(x_convolve)

        # 3. Perform the convolution operation y = A * P_offset
        output = torch.sum(self.multiply_weights(x_offset), dim=[3, 4, 5])

        # 4. Reshape the output
        # Calculate the dimensions of the output feature map
        output_height = (image_height + 2 * self.padding - (kernel_height - 1) - 1) + 1
        output_width = (image_width + 2 * self.padding - (kernel_width - 1) - 1) + 1

        # Reshape output to the shape (batch_size, out_channels, output_height, output_width)
        output = output.view(batch_size, out_channels, output_height, output_width)

        return output

I’m used to looking at source code to figure things out. I can’t find the C++ code that F.conv2d calls on GitHub. That is my question: where is that C++ code? I’m doing non-standard things with CNNs. I’ve implemented it using tensors in Python, but it’s about 20x slower – which is sort of what I expected and why I asked my question.

I’m going to define words to make sure we are talking about the same thing.

A standard CNN convolves the function y=Ax+b over an image. That function is the “convolutional operation”.
“A” is the kernel. It is a matrix, defines the convolution window size, and is multiplied against image values in a convolutional window.

“b” is the bias. It is a scalar value.

A filter is a single node with its own <A, b>. It generates an output for each convolution position.

A layer is a collection of filters.

I’ve implemented a non-standard convolutional operation y=A(x-B).

“A” is the kernel. It is a matrix, defines the convolution window size, and is multiplied against image values in a convolutional window.

“B” is an offset. It is a matrix that is the same size as A, and is subtracted from image values in a convolutional window before A is applied.

A filter is a single node with its own <A, B>. It generates an output for each convolution position.

A layer is a collection of filters.

The specific details of what I’m doing aren’t really important to my question, which is really: how do I implement a custom Conv2d layer that runs quickly?

At some point I want to make a Conv2d layer that implements a quadratic convolution operation y = Ax^2 + Bx + C instead of a linear function y=Ax+b. Or maybe something else. I want to be able to make Conv2d use different convolution operations. The details of what those operations are shouldn’t matter. Ideally I’d have a Conv2d that can take a lambda.

Yes, I know I’m doing weird stuff.

I’ll read about the Toeplitz matrix and see if that can speed anything up for me.

You can find the slow matmul implementation here and could compare your custom approach against it. I don’t know what you are comparing against, but cuDNN is used by default so if you want to compare against the native PyTorch implementation, disable cuDNN via torch.backends.cudnn.enabled = False.
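
For instance, a minimal timing harness could look like this (just a sketch; the shapes, device, and iteration count are arbitrary, a GPU is assumed, and Conv2dPS refers to the class posted above):

import time
import torch
import torch.nn as nn

torch.backends.cudnn.enabled = False  # compare against the native PyTorch kernels

x = torch.randn(64, 3, 64, 64, device="cuda")
layers = {
    "nn.Conv2d": nn.Conv2d(3, 16, 3, padding=1, bias=False).cuda(),
    "Conv2dPS": Conv2dPS(3, 16, 3, padding=1).cuda(),
}

for name, layer in layers.items():
    torch.cuda.synchronize()
    start = time.time()
    with torch.no_grad():
        for _ in range(100):
            layer(x)
    torch.cuda.synchronize()
    print(name, time.time() - start)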

You may find what you’re looking for here.


In that case, you could just rewrite the forward function as this:

def forward(self, x):
    return F.conv2d(x, self.A, ...) - F.conv2d(torch.ones_like(x), self.A*self.B, ...)

Sure, it takes two convolutions, but it is probably much faster than coding it in Python. The above is identical to A(x-B). It may not be obvious at first, but you can test it or try it out in a spreadsheet.
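
If you want to convince yourself numerically, a quick check might look like this (a sketch with made-up shapes, stride 1, no padding):

import torch
import torch.nn.functional as F

x = torch.randn(1, 3, 8, 8)
A = torch.randn(4, 3, 3, 3)
B = torch.randn(4, 3, 3, 3)

# Direct version: subtract B inside every patch, then apply A per output channel
cols = F.unfold(x, kernel_size=3)                         # [1, 27, 36]
patches = cols.transpose(1, 2).reshape(1, 36, 3, 3, 3)    # [1, num_patches, C, kH, kW]
direct = ((patches.unsqueeze(2) - B) * A).sum(dim=(3, 4, 5))  # [1, 36, 4]
direct = direct.transpose(1, 2).reshape(1, 4, 6, 6)

# Two-convolution version
two_convs = F.conv2d(x, A) - F.conv2d(torch.ones_like(x), A * B)

print(torch.allclose(direct, two_convs, atol=1e-4))  # True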

That works. It doesn’t even require two convolutions: AB is constant for each filter across the convolution. I feel like I should have seen that solution. Thank you for breaking me out of my myopia.

For posterity:

class Conv2dPS(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, padding=0):
        super(Conv2dPS, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding, bias=False)
        self.A = self.conv.weight  # Alias for clarity
        self.B = nn.Parameter(torch.zeros(out_channels, in_channels, kernel_size, kernel_size))

    def forward(self, x):
        # Standard convolution
        Ax = self.conv(x)

        # Element-wise multiplication (the first half of the per-filter dot product of A and B)
        # Gives [out_channels, in_channels, kernel_height, kernel_width]
        AB = self.A * self.B

        # sum each output (filter) over channel, kernel_width, kernel_height
        # Gives [outputs]
        AB = torch.sum(AB, dim=(1, 2, 3))

        # Expand for subtraction from Ax
        # Gives [1, out_channels, 1, 1], which broadcasts over Ax's [batch, out_channels, H, W]
        AB = AB.unsqueeze(0).unsqueeze(2).unsqueeze(3)

        # Subtract AB from the convolution result
        # Ensure correct reshaping/broadcasting to match Ax dimensions
        return Ax - AB
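
Used, for example, like this (hypothetical shapes):

layer = Conv2dPS(in_channels=3, out_channels=16, kernel_size=3, padding=1)
x = torch.randn(8, 3, 32, 32)
y = layer(x)   # [8, 16, 32, 32], the same output shape as nn.Conv2d with these settings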