# LSTM - Test set accuracy varies with batch size used for test set

Hi!

I am training an LSTM where inputs are 5x(N+5) matrices (each row being a new timestep) and outputs are N-dim one-hot vectors.

The issue is that with the same trained model (I’ve been training on batch_size=32), I get different test accuracies when I vary the batch_size I use to iterate through the test set. I get around ~75% accuracy with test batch size = 32, 85% with 64, and 97% with the full test set. At the bottom of train_model method, there are some ‘predict’ functions which demonstrate this varying accuracy.

I’ve searched the issue and found a few similar issues. Some didn’t seem relevant, some fixes I tried but they didn’t work. Two that I saw but didn’t know how to apply to mine were this and this , though I highly doubt the second one is relevant.

Let me know if you have questions on my code!!

Here is dataset_generator.py

``````import numpy as np

from itertools import permutations, combinations

ANALOGY_ROWS = 2
ANALOGY_COLS = 3
ANALOGY_SIZE = ANALOGY_ROWS * ANALOGY_COLS  # number of cells in an analogy

def _process_data(shapes, shapes_in_train, trials):
"""
Convert lists of shape indices into matrices
:param shapes: 2D array of vectors corresponding to shapes
:param shapes_in_train: list of indices corresponding to shapes
:param trials:
:return:
"""

data = []

# todo: dynamically generate this based off ANALOGY_ROWS AND ANALOGY_COLS
order_array = np.array([[1, 0, 0, 1, 0], [0, 1, 0, 1, 0], [0, 0, 1, 1, 0],
[1, 0, 0, 0, 1], [0, 1, 0, 0, 1]])  # first 3 are column, next 2 are row

for trial in trials:

# get source matrix and target vector
source, target = trial[:ANALOGY_SIZE - 1], trial[ANALOGY_SIZE - 1]
train_matrix = np.concatenate((shapes[source, :], order_array), axis=1)
y = shapes[target, :]

# make MC options
shape_set = set(source)
random_shape_set = shapes_in_train - shape_set  # randomly select a shape from training that wasn't in source
random_shape = np.random.choice(list(random_shape_set))
mc_choices = list(shape_set)  # the 3 shapes in the relation
mc_choices.append(random_shape)  # add the randomly selected 4th shape
mc_choice_matrix = shapes[mc_choices, :]
data.append((train_matrix, y, mc_choice_matrix))

return data

def return_dataset(n=100, train_size=1000000, test_size=50000):
"""
Creates a dataset of distribution of 3 relations.
With N shapes, there are combo(N, 3) x Perm(Perm(3,3,), 2) trials

:param n: total number of shapes
:param train_size: number of training samples
:param test_size: number of test samples
:return: 2 lists of lists, where each sublist of the 2 lists is [5x105 training input, 1x100 correct answer, 4x100 MC options]
"""

shapes = np.identity(n)
shape_indices = np.arange(n)

trials_all = []  # list of possible trials (unprocessed)

# generate all trials (not processed into matrices)
xyz_combos = list(combinations(shape_indices, 3))
for combo in xyz_combos:
perms = list(permutations(combo))
perms_final = list(permutations(perms, 2))
for perm in perms_final:
perm_list = [element for tupl in perm for element in tupl]
trials_all.append(perm_list)
assert((train_size + test_size) <= len(trials_all))

# break into train and test
trials_select_indices = np.random.choice(len(trials_all), train_size + test_size, replace=False)
trials_select = [trials_all[i] for i in trials_select_indices]
trials_train, trials_test = trials_select[:train_size], trials_select[train_size:]
assert(len(trials_train) == train_size)
assert(len(trials_test) == test_size)

# get all shapes in train
train_shapes = set()
for trial in trials_train:
for shape in trial:

# remove test trials that contain shapes that aren't in training
# todo: replace removed trials with new trials
trials_test_valid_indices = []
for i, trial in enumerate(trials_test):
trial_valid = True
for shape in trial:
if shape not in train_shapes:
trial_valid = False
if trial_valid:
trials_test_valid_indices.append(i)
trials_test_valid = [trials_test[i] for i in trials_test_valid_indices]

train_data = _process_data(shapes, train_shapes, trials_train)
test_data = _process_data(shapes, train_shapes, trials_test_valid)

return train_data, test_data

if __name__ == "__main__":
pass
``````

Here is model.py (which you should run)

``````import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import *
import torch.nn as nn

from dataset_generator import return_dataset, ANALOGY_SIZE

# GPU Check
if torch.cuda.is_available():
device = torch.device("cuda")
print("GPU is available")
else:
device = torch.device("cpu")
print("GPU not available, CPU used")

class LSTM(nn.Module):

def __init__(self, input_size, output_size, hidden_dim, n_layers):
super(LSTM, self).__init__()

# Defining some parameters
self.hidden_dim = hidden_dim
self.n_layers = n_layers

# Defining the layers
# todo: later -- approach this as a ConvNet, drop in models.resnet18() instead
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_dim, num_layers=n_layers, batch_first=True)  # LSTM layer
self.linear = nn.Linear(hidden_dim, output_size)  # FC layer

# initialize LSTM weights (https://discuss.pytorch.org/t/initializing-parameters-of-a-multi-layer-lstm/5791)
for name, param in self.lstm.named_parameters():
if 'bias' in name:
nn.init.constant_(param, 0.0)
elif 'weight' in name:
nn.init.xavier_normal_(param)

def forward(self, x):

# inspired by: https://stackoverflow.com/questions/58176523/how-to-use-a-different-test-batch-size-for-rnn-in-pytorch
batch_size = x.size(0)

# Initializing hidden state for first input
hidden = self.init_hidden(batch_size)

# Passing in the input and hidden state into the model and obtaining outputs
out, hidden = self.lstm(x, hidden)

# Reshaping the outputs such that it can be fit into the fully connected layer
out = out[:, -1, :]  # take final timestep prediction
out = out.contiguous().view(-1, self.hidden_dim)
out = self.linear(out)
# out = F.softmax(out) (and change 'criterion' to NLLoss)  # crossentropyloss does both

return out, hidden

def init_hidden(self, batch_size):
# This method generates the first hidden state of zeros which we'll use in the forward pass

hidden = (torch.zeros(self.n_layers, batch_size, self.hidden_dim).float(),
torch.zeros(self.n_layers, batch_size, self.hidden_dim).float())

hidden = [h.to(device) for h in hidden]  # todo: do I need this?

# We'll send the tensor holding the hidden state to the device we specified earlier as well
return hidden

def train_model(n=100, train_size=1000000, test_size=50000, num_epochs=50):

########################### Variables
seq_len = 5
batch_size = 32
input_size = n + 5
hidden_dim = 256
output_dim = n
num_layers = 1
learning_rate = 5e-4

########################### Get dataset
train, test = return_dataset(n, train_size, test_size)

########################### Model
model = LSTM(input_size=input_size, output_size=output_dim, hidden_dim=hidden_dim, n_layers=num_layers)
criterion = nn.CrossEntropyLoss()

########################### Train
model.train()  # set model to training state
dl = DataLoader(train, batch_size=batch_size, shuffle=True, drop_last=True)  # drop_last probably isn't necessary now that batches can be variable
train_loss = []

for epoch in range(1, num_epochs + 1):
print('epoch {}'.format(epoch))

total_loss = 0
n_batch = 0
for batch in dl:

input = batch.float()
label = batch.float()
input = input.to(device)
# print("Input shape: {} --> (Batch Size, Sequence Length, One-Hot Encoding Size)".format(input.shape))
# print("Target shape: {}".format(label.shape))

output, hidden = model(input)
label = label.to(device)

loss = criterion(output, label.argmax(-1).long())  # argmax(-1) takes index of the one in the one-hot
total_loss += loss.item()
n_batch += 1
loss.backward()  # Does backprop and calculates gradients

train_loss.append(total_loss / n_batch)
print('\t {}'.format(total_loss/n_batch))

plt.plot(range(1, num_epochs + 1), train_loss)
plt.title('Training loss')
plt.savefig('TrainLossModel1_n={}_hidden={}_num_epochs={}_train_size={}_test_size={}'.format(n, hidden_dim, num_epochs, train_size, test_size))
print("Model in training state?: {}".format(model.training))  # true, as expected

########################### Save the model
torch.save(model.state_dict(),
'TrainLoss_ModelDynamicBatch_n={}_hidden={}_num_epochs={}_train_size={}_test_size={}'.format(n,
hidden_dim,
num_epochs,
train_size,
test_size))
torch.save(test, 'test.pt')

########################### Predict
print('\n')
print('-------------------------------Now entering predictions------------------------------')

# should be equal
predict(model, test, test_size, True)
y_true_full_dynamic = predict(model, test, test_size, False)

# can diverge very slightly b/c of final batch being left off
y_true_full_constant = predict(model, test, batch_size, True)
predict(model, test, batch_size, False)

# can diverge very slightly b/c of final batch being left off
predict(model, test, batch_size * 2, False)
predict(model, test, batch_size * 2, True)

# can diverge very slightly b/c of final batch being left off
predict(model, test, batch_size // 2, False)
predict(model, test, batch_size // 2, True)

########################### Check that the test sets are the same (or almost the same for cases where drop_last=True) regardless of test batch size
print('Length of test set when test_batch_size = training batch_size and drop_last=True: {}'.format(len(y_true_full_constant)))
y_true_full_constant.extend([-1] * (test_size % batch_size))  # pad it so it's back to full length of test set
print('Length of test set when predicting all at once: {}'.format(len(y_true_full_dynamic)))
test = (np.array(y_true_full_constant) == np.array(y_true_full_dynamic))
print('% of former and latter that are equal: {}'.format(sum(test) / len(test)))  # should almost be 1

def predict(model, test, test_batch_size, drop_last):

model.eval()  # set model to eval mode

y_pred_full = []
y_true_full = []

for batch in DataLoader(test, batch_size=test_batch_size, shuffle=False, drop_last=drop_last):
y_pred, hidden = model(batch.float())
prob = nn.functional.softmax(y_pred, dim=0).data
shape = torch.max(prob, dim=1).tolist()
y_pred_full.extend(shape)
target = torch.max(batch, dim=1).tolist()
y_true_full.extend(target)

assert(len(y_pred_full) == len(y_true_full))

results = (np.array(y_pred_full) == np.array(y_true_full)) * 1
print('Test batch size: {}'.format(test_batch_size))
print('# Test samples used: {}'.format(len(results)))
print('Accuracy: {}'.format(sum(results) / len(results)))
print("Model in training state?: {}".format(model.training))  # False, as expected
print('----')

return y_true_full.copy()

if __name__ == "__main__":
train_model(50, 10000, 1200, num_epochs=20)
``````

Nevermind. I did softmax over the wrong dimension in my `predict` function. Hate to see it. Interestingly enough, the model still predicted well even when the softmax was along the wrong dimension!