How to sequentially input a single element of a sequence to an RNN?

Dear community,

I have been working on RNNs and was able to write an RNN in base torch. However, I am running into some confusion about the input shape. In particular, I see that PyTorch's built-in nn.RNN (with batch_first=True) takes input of shape (batch_size, sequence_length, input_features), where (a minimal shape check is sketched right after the list below):

  • sequence_length: The number of time steps or elements in each sequence.
  • input_features: The number of features at each time step.
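
For reference, here is that shape convention as a short sketch with the built-in nn.RNN and batch_first=True (the sizes are only illustrative):

import torch
import torch.nn as nn

batch_size, seq_len, n_features, hidden = 4, 10, 3, 8
rnn = nn.RNN(input_size=n_features, hidden_size=hidden, batch_first=True)
x = torch.randn(batch_size, seq_len, n_features)  # (batch_size, sequence_length, input_features)
out, h_n = rnn(x)
print(out.shape)  # torch.Size([4, 10, 8])  -> one hidden state per time step
print(h_n.shape)  # torch.Size([1, 4, 8])   -> final hidden state per layer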

So, in the context of using an RNN on MNIST, where each image is flattened to a 1D array of size 784, the input dimensions would be (batch_size, 784, 1), processing a single pixel at each time step.
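
For example, the pixel-per-time-step case would look roughly like this with the built-in nn.RNN (a sketch with the same imports as above, not my actual model):

x = torch.randn(32, 784)      # a batch of flattened MNIST images
x = x.unsqueeze(-1)           # -> (32, 784, 1): 784 time steps, 1 feature each
rnn = nn.RNN(input_size=1, hidden_size=64, batch_first=True)
out, h_n = rnn(x)
print(out.shape)              # torch.Size([32, 784, 64])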

Now, if I have a flattened MNIST image but want to process more than a single pixel at each time step, i.e. the entire flattened image, then the input shape would be (batch_size, 784, 784).

While I was able to replicate some features of the original torch RNN implementation, my RNN takes input of shape (batch_size, 784, 784): in my training loop the input x is changed from shape (batch_size, 784) to (batch_size, 784, 784) via x = x[:, None, ...].expand(x.shape[0], x.shape[1], x.shape[1]).to(device).
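
As a quick shape check of that expand trick (again sketched with the built-in nn.RNN and the same imports as above), every one of the 784 time steps receives the full 784-pixel image as its feature vector:

x = torch.randn(32, 784)                                        # flattened batch
x = x[:, None, ...].expand(x.shape[0], x.shape[1], x.shape[1])  # -> (32, 784, 784)
rnn = nn.RNN(input_size=784, hidden_size=64, batch_first=True)
out, h_n = rnn(x)
print(out.shape)                                                # torch.Size([32, 784, 64])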

However, when I instead change the shape from (batch_size, 784) to (batch_size, 784, 1) via unsqueeze, I get RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x1 and 784x64). See the full traceback at the end of the notebook code blocks.
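
My reading of the traceback (so treat this as an assumption) is that the failing matmul is the first input2hidden layer, which was built as nn.Linear(784, 64) but now receives a (32, 1) time-step slice. A minimal reproduction:

lin = nn.Linear(784, 64)   # same shape as input2hidden when input_size=784
step = torch.randn(32, 1)  # what input[:, t, :] looks like for a (32, 784, 1) input
lin(step)                  # RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x1 and 784x64)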

  1. How can I adapt my implementation to also accommodate cases where the input_features passed at each time step are not identical to the entire sequence_length?

  2. To my understanding, we can either process the input one pixel at a time or use the entire image at each time step; please correct me if I am wrong about this.

  3. The code below can be copied and run in notebook cells if desired.

Notebook

Libraries

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from torchvision import datasets
from torchvision import transforms as T
import torch.optim as optim
from torch.utils.data import DataLoader
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

RNN Cell

class RnnCell(nn.Module):
    def __init__(self, input_size, hidden_size, activation="tanh"):
        super(RnnCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.activation = activation
        if self.activation not in ["tanh", "relu", "sigmoid"]:
            raise ValueError("Invalid nonlinearity selected for RNN. Please use tanh, relu or sigmoid.")
        self.input2hidden = nn.Linear(input_size, hidden_size)
        self.hidden2hidden = nn.Linear(hidden_size, hidden_size)

    def forward(self, input, carry, hidden_state = None):
        if hidden_state is None:
            hidden_state = torch.zeros(input.shape[0], self.hidden_size).to(device)
            carry = (hidden_state, hidden_state)

        # carry
        h_t, _ = carry
        h_t = (self.input2hidden(input) + self.hidden2hidden(h_t))

        # apply the chosen activation to the new hidden state
        if self.activation == "tanh":
            out = torch.tanh(h_t)
        elif self.activation == "relu":
            out = torch.relu(h_t)
        elif self.activation == "sigmoid":
            out = torch.sigmoid(h_t)

        return (out, out)

RNN

class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, activation='relu'):
        super(SimpleRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.rnn_cell_list = nn.ModuleList()

        if activation == 'tanh':
            self.rnn_cell_list.append(RnnCell(self.input_size, self.hidden_size, "tanh"))
            for l in range(1, self.num_layers):
                self.rnn_cell_list.append(RnnCell(self.hidden_size, self.hidden_size, "tanh"))

        elif activation == 'relu':
            self.rnn_cell_list.append(RnnCell(self.input_size, self.hidden_size, "relu"))
            for l in range(1, self.num_layers):
                self.rnn_cell_list.append(RnnCell(self.hidden_size, self.hidden_size, "relu"))

        elif activation == 'sigmoid':
            self.rnn_cell_list.append(RnnCell(self.input_size, self.hidden_size, "sigmoid"))
            for l in range(1, self.num_layers):
                self.rnn_cell_list.append(RnnCell(self.hidden_size, self.hidden_size, "sigmoid"))

        else:
            raise ValueError("Invalid activation. Please use tanh, relu or sigmoid activation.")

        self.fc = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden_state=None):
        '''
        Input: input (torch tensor) of shape [batch_size, sequence_length, input_size]
        Output: output (torch tensor) of shape [batch_size, output_size]
        '''

        if hidden_state is None:
            hidden_state = torch.zeros(self.num_layers, input.shape[0], self.hidden_size).to(device)
        outs = []

        hidden = list()
        for layer in range(self.num_layers):
            hidden.append(hidden_state[layer, :, :])

        for t in range(input.size(1)):
            print('RNN for loop input shape', input[:, t, :].shape)
            for layer in range(self.num_layers):
                if layer == 0:
                    hidden_l = self.rnn_cell_list[layer](input[:, t, :], (hidden[layer][0], hidden[layer][1]))
                else:
                    hidden_l = self.rnn_cell_list[layer](hidden[layer - 1][0], (hidden[layer][0], hidden[layer][1]))
                hidden[layer] = hidden_l
            outs.append(hidden_l[0])

        # select last time step indexed at [-1]
        out = outs[-1].squeeze()
        out = self.fc(out)
        return out

Accuracy

def top1accuracy(class_prob, target):
    """
    Calculates top 1 accuracy.
    Input: class probabilities from the neural network (tensor of shape [batch_size, num_classes])
    and target class indices (tensor of shape [batch_size]).
    Output: Top 1 accuracy (float).
    """
    with torch.no_grad():
        pred_class = torch.argmax(class_prob, dim = 1)
        top1_acc = sum(target==pred_class) / len(target)
    return top1_acc

def top5accuracy(class_prob, target):
    """
    Calculates top 5 accuracy.
    Input: class probabilities from the neural network (tensor of shape [batch_size, num_classes])
    and target class indices (tensor of shape [batch_size]).
    Output: Top 5 accuracy (float).
    """
    with torch.no_grad():
        _, top5_class_pred = class_prob.topk(5, 1, largest = True, sorted = True)
        top5_class_pred = top5_class_pred.t()
        target_reshaped = target.view(1, -1).expand_as(top5_class_pred)
        correct = (top5_class_pred == target_reshaped)
        ncorrect_top5 = 0
        for i in range(correct.shape[1]):
            if (sum(correct[:,i]) >= 1):
                ncorrect_top5 = ncorrect_top5 + 1
        top5_acc = ncorrect_top5 / len(target)
        return top5_acc

Train

def train(data_loader, model, optimizer, loss_f):
    """
    Input: train loader (torch loader), model (torch model), optimizer (torch optimizer),
          loss function (torch loss, here nn.CrossEntropyLoss).
    Output: (average loss, top 1 accuracy, top 5 accuracy) for the epoch (floats).
    """
    loss_lst = []
    top1_acc_lst = []
    top5_acc_lst = []
    model.train()
    for batch_idx, (x, y) in enumerate(data_loader):
        x, y = x.to(device), y.to(device)
        print('X shape train:', x.shape)
        print('y shape train:', y.shape)

        #x =  x[:, None, ...].expand(x.shape[0], x.shape[1], x.shape[1]).to(device)
        x = x.unsqueeze(-1)
        print('X shape train:', x.shape)
        #print('x.shape: ', x.shape)
        #x = x.T.unsqueeze(-1).unsqueeze(-1).to(device)
        out = model(x)
        #print('Model out Shape', out.shape)
        del x
        class_prob = F.softmax(out, dim = 1)
        pred = torch.argmax(class_prob, dim = 1)
        loss_val = loss_f(class_prob, y)
        loss_lst.append(float(loss_val.item()))
        top1_acc_val = top1accuracy(class_prob, y)
        top5_acc_val = top5accuracy(class_prob, y)
        top1_acc_lst.append(float(top1_acc_val))
        top5_acc_lst.append(float(top5_acc_val))
        del y, out
        optimizer.zero_grad()
        loss_val.backward()
        optimizer.step()
        del loss_val

    # compute average loss
    loss_val = round(sum(loss_lst) / len(loss_lst), 4)
    top1_acc = round(sum(top1_acc_lst) / len(top1_acc_lst),  4)
    top5_acc = round(sum(top5_acc_lst) / len(top5_acc_lst), 4)
    return (loss_val, top1_acc, top5_acc)

Run Train

device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 32
weight_decay = 0.0005
epochs = 20
nworkers = 2
lr = 0.00001
pin_memory = True
data_dir =  'data/'

train_dataset = datasets.MNIST(root = data_dir,
                               train = True,
                               transform = T.Compose([T.ToTensor(), T.Lambda(torch.flatten)]),
                               download = True)
train_loader = DataLoader(dataset = train_dataset,
                          batch_size = batch_size,
                          shuffle = True, drop_last = True)

model = SimpleRNN(input_size = 28 * 28, hidden_size = 64, num_layers = 1, output_size = 10, activation = 'relu').to(device)
optimizer = optim.Adam(model.parameters(), lr = lr, weight_decay = weight_decay)
loss_f = nn.CrossEntropyLoss()
for epoch in range(epochs):
    train_loss_value, train_top1acc_value, train_top5acc_value = train(train_loader, model, optimizer, loss_f)

    print(f"Epoch:{epoch + 1}   Train[Loss:{train_loss_value} Top1 Acc:{train_top1acc_value}  Top5 Acc:{train_top5acc_value}]")

Error

---------------------------------------------------------------------------

RuntimeError                              Traceback (most recent call last)

<ipython-input-17-c4861450c26a> in <cell line: 23>()
     22 loss_f = nn.CrossEntropyLoss()
     23 for epoch in range(epochs):
---> 24     train_loss_value, train_top1acc_value, train_top5acc_value = train(train_loader, model, optimizer, loss_f)
     25 
     26     print(f"Epoch:{epoch + 1}   Train[Loss:{train_loss_value} Top1 Acc:{train_top1acc_value}  Top5 Acc:{train_top5acc_value}]")

6 frames

/usr/local/lib/python3.10/dist-packages/torch/nn/modules/linear.py in forward(self, input)
    112 
    113     def forward(self, input: Tensor) -> Tensor:
--> 114         return F.linear(input, self.weight, self.bias)
    115 
    116     def extra_repr(self) -> str:

RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x1 and 784x64)