Hi!
I am training an LSTM where inputs are 5x(N+5) matrices (each row being a new timestep) and outputs are N-dim one-hot vectors.
The issue is that with the same trained model (I’ve been training on batch_size=32), I get different test accuracies when I vary the batch_size I use to iterate through the test set. I get around ~75% accuracy with test batch size = 32, 85% with 64, and 97% with the full test set. At the bottom of train_model method, there are some ‘predict’ functions which demonstrate this varying accuracy.
I’ve searched the issue and found a few similar issues. Some didn’t seem relevant, some fixes I tried but they didn’t work. Two that I saw but didn’t know how to apply to mine were this and this , though I highly doubt the second one is relevant.
Let me know if you have questions on my code!!
Here is dataset_generator.py
import numpy as np
from itertools import permutations, combinations
ANALOGY_ROWS = 2
ANALOGY_COLS = 3
ANALOGY_SIZE = ANALOGY_ROWS * ANALOGY_COLS # number of cells in an analogy
def _process_data(shapes, shapes_in_train, trials):
"""
Convert lists of shape indices into matrices
:param shapes: 2D array of vectors corresponding to shapes
:param shapes_in_train: list of indices corresponding to shapes
:param trials:
:return:
"""
data = []
# todo: dynamically generate this based off ANALOGY_ROWS AND ANALOGY_COLS
order_array = np.array([[1, 0, 0, 1, 0], [0, 1, 0, 1, 0], [0, 0, 1, 1, 0],
[1, 0, 0, 0, 1], [0, 1, 0, 0, 1]]) # first 3 are column, next 2 are row
for trial in trials:
# get source matrix and target vector
source, target = trial[:ANALOGY_SIZE - 1], trial[ANALOGY_SIZE - 1]
train_matrix = np.concatenate((shapes[source, :], order_array), axis=1)
y = shapes[target, :]
# make MC options
shape_set = set(source)
random_shape_set = shapes_in_train - shape_set # randomly select a shape from training that wasn't in source
random_shape = np.random.choice(list(random_shape_set))
mc_choices = list(shape_set) # the 3 shapes in the relation
mc_choices.append(random_shape) # add the randomly selected 4th shape
mc_choice_matrix = shapes[mc_choices, :]
data.append((train_matrix, y, mc_choice_matrix))
return data
def return_dataset(n=100, train_size=1000000, test_size=50000):
"""
Creates a dataset of distribution of 3 relations.
With N shapes, there are combo(N, 3) x Perm(Perm(3,3,), 2) trials
:param n: total number of shapes
:param train_size: number of training samples
:param test_size: number of test samples
:return: 2 lists of lists, where each sublist of the 2 lists is [5x105 training input, 1x100 correct answer, 4x100 MC options]
"""
shapes = np.identity(n)
shape_indices = np.arange(n)
trials_all = [] # list of possible trials (unprocessed)
# generate all trials (not processed into matrices)
# todo: add in XYZ:XYZ
xyz_combos = list(combinations(shape_indices, 3))
for combo in xyz_combos:
perms = list(permutations(combo))
perms_final = list(permutations(perms, 2))
for perm in perms_final:
perm_list = [element for tupl in perm for element in tupl]
trials_all.append(perm_list)
assert((train_size + test_size) <= len(trials_all))
# break into train and test
trials_select_indices = np.random.choice(len(trials_all), train_size + test_size, replace=False)
trials_select = [trials_all[i] for i in trials_select_indices]
trials_train, trials_test = trials_select[:train_size], trials_select[train_size:]
assert(len(trials_train) == train_size)
assert(len(trials_test) == test_size)
# get all shapes in train
train_shapes = set()
for trial in trials_train:
for shape in trial:
train_shapes.add(shape)
# remove test trials that contain shapes that aren't in training
# todo: replace removed trials with new trials
trials_test_valid_indices = []
for i, trial in enumerate(trials_test):
trial_valid = True
for shape in trial:
if shape not in train_shapes:
trial_valid = False
if trial_valid:
trials_test_valid_indices.append(i)
trials_test_valid = [trials_test[i] for i in trials_test_valid_indices]
train_data = _process_data(shapes, train_shapes, trials_train)
test_data = _process_data(shapes, train_shapes, trials_test_valid)
return train_data, test_data
if __name__ == "__main__":
pass
Here is model.py (which you should run)
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import *
import torch.nn as nn
from torch.utils.data import DataLoader
from dataset_generator import return_dataset, ANALOGY_SIZE
# GPU Check
if torch.cuda.is_available():
device = torch.device("cuda")
print("GPU is available")
else:
device = torch.device("cpu")
print("GPU not available, CPU used")
class LSTM(nn.Module):
def __init__(self, input_size, output_size, hidden_dim, n_layers):
super(LSTM, self).__init__()
# Defining some parameters
self.hidden_dim = hidden_dim
self.n_layers = n_layers
# Defining the layers
# todo: later -- approach this as a ConvNet, drop in models.resnet18() instead
self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_dim, num_layers=n_layers, batch_first=True) # LSTM layer
self.linear = nn.Linear(hidden_dim, output_size) # FC layer
# initialize LSTM weights (https://discuss.pytorch.org/t/initializing-parameters-of-a-multi-layer-lstm/5791)
for name, param in self.lstm.named_parameters():
if 'bias' in name:
nn.init.constant_(param, 0.0)
elif 'weight' in name:
nn.init.xavier_normal_(param)
def forward(self, x):
# inspired by: https://stackoverflow.com/questions/58176523/how-to-use-a-different-test-batch-size-for-rnn-in-pytorch
batch_size = x.size(0)
# Initializing hidden state for first input
hidden = self.init_hidden(batch_size)
# Passing in the input and hidden state into the model and obtaining outputs
out, hidden = self.lstm(x, hidden)
# Reshaping the outputs such that it can be fit into the fully connected layer
out = out[:, -1, :] # take final timestep prediction
out = out.contiguous().view(-1, self.hidden_dim)
out = self.linear(out)
# out = F.softmax(out) (and change 'criterion' to NLLoss) # crossentropyloss does both
return out, hidden
def init_hidden(self, batch_size):
# This method generates the first hidden state of zeros which we'll use in the forward pass
hidden = (torch.zeros(self.n_layers, batch_size, self.hidden_dim).float(),
torch.zeros(self.n_layers, batch_size, self.hidden_dim).float())
hidden = [h.to(device) for h in hidden] # todo: do I need this?
# We'll send the tensor holding the hidden state to the device we specified earlier as well
return hidden
def train_model(n=100, train_size=1000000, test_size=50000, num_epochs=50):
########################### Variables
seq_len = 5
batch_size = 32
input_size = n + 5
hidden_dim = 256
output_dim = n
num_layers = 1
learning_rate = 5e-4
########################### Get dataset
train, test = return_dataset(n, train_size, test_size)
########################### Model
model = LSTM(input_size=input_size, output_size=output_dim, hidden_dim=hidden_dim, n_layers=num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
########################### Train
model.train() # set model to training state
dl = DataLoader(train, batch_size=batch_size, shuffle=True, drop_last=True) # drop_last probably isn't necessary now that batches can be variable
train_loss = []
for epoch in range(1, num_epochs + 1):
print('epoch {}'.format(epoch))
total_loss = 0
n_batch = 0
for batch in dl:
input = batch[0].float()
label = batch[1].float()
input = input.to(device)
# print("Input shape: {} --> (Batch Size, Sequence Length, One-Hot Encoding Size)".format(input.shape))
# print("Target shape: {}".format(label.shape))
output, hidden = model(input)
label = label.to(device)
loss = criterion(output, label.argmax(-1).long()) # argmax(-1) takes index of the one in the one-hot
total_loss += loss.item()
n_batch += 1
loss.backward() # Does backprop and calculates gradients
optimizer.step() # updates weights accordingly
# Clears existing gradients
optimizer.zero_grad()
train_loss.append(total_loss / n_batch)
print('\t {}'.format(total_loss/n_batch))
plt.plot(range(1, num_epochs + 1), train_loss)
plt.title('Training loss')
plt.savefig('TrainLossModel1_n={}_hidden={}_num_epochs={}_train_size={}_test_size={}'.format(n, hidden_dim, num_epochs, train_size, test_size))
print("Model in training state?: {}".format(model.training)) # true, as expected
########################### Save the model
torch.save(model.state_dict(),
'TrainLoss_ModelDynamicBatch_n={}_hidden={}_num_epochs={}_train_size={}_test_size={}'.format(n,
hidden_dim,
num_epochs,
train_size,
test_size))
torch.save(test, 'test.pt')
########################### Predict
print('\n')
print('-------------------------------Now entering predictions------------------------------')
# should be equal
predict(model, test, test_size, True)
y_true_full_dynamic = predict(model, test, test_size, False)
# can diverge very slightly b/c of final batch being left off
y_true_full_constant = predict(model, test, batch_size, True)
predict(model, test, batch_size, False)
# can diverge very slightly b/c of final batch being left off
predict(model, test, batch_size * 2, False)
predict(model, test, batch_size * 2, True)
# can diverge very slightly b/c of final batch being left off
predict(model, test, batch_size // 2, False)
predict(model, test, batch_size // 2, True)
########################### Check that the test sets are the same (or almost the same for cases where drop_last=True) regardless of test batch size
print('Length of test set when test_batch_size = training batch_size and drop_last=True: {}'.format(len(y_true_full_constant)))
y_true_full_constant.extend([-1] * (test_size % batch_size)) # pad it so it's back to full length of test set
print('Length of test set when predicting all at once: {}'.format(len(y_true_full_dynamic)))
test = (np.array(y_true_full_constant) == np.array(y_true_full_dynamic))
print('% of former and latter that are equal: {}'.format(sum(test) / len(test))) # should almost be 1
def predict(model, test, test_batch_size, drop_last):
model.eval() # set model to eval mode
with torch.no_grad():
y_pred_full = []
y_true_full = []
for batch in DataLoader(test, batch_size=test_batch_size, shuffle=False, drop_last=drop_last):
y_pred, hidden = model(batch[0].float())
prob = nn.functional.softmax(y_pred, dim=0).data
shape = torch.max(prob, dim=1)[1].tolist()
y_pred_full.extend(shape)
target = torch.max(batch[1], dim=1)[1].tolist()
y_true_full.extend(target)
assert(len(y_pred_full) == len(y_true_full))
results = (np.array(y_pred_full) == np.array(y_true_full)) * 1
print('Test batch size: {}'.format(test_batch_size))
print('# Test samples used: {}'.format(len(results)))
print('Accuracy: {}'.format(sum(results) / len(results)))
print("Model in training state?: {}".format(model.training)) # False, as expected
print('----')
return y_true_full.copy()
if __name__ == "__main__":
train_model(50, 10000, 1200, num_epochs=20)