Hi all,
I’m a newbie with NN and PyTorch and trying to implement a small network as shown in the following code. The problem is if I’m calculating Test accuracy at intermediate epochs, my final test accuracy is increasing, compared to if I only estimate it at the last epoch. It looks like maybe there is some test data leakage to training step, however, I’m not able to debug it.
The test accuracy when estimated only at last epoch (150) is 78%. However, if estimated at intermediate epochs (every 15th or 30th etc., result does not change much) as well, the accuracy at the 150th epoch increases to 87%. Train accuracy is always reaching 100%, it is over-trained.
I’ve a custom train-test data distribution code, based on some pre-known features of data, which is not shown below. However, similar behavior is observed with random_split(), with a small increase from 85% to 87%.
Any help will be appreciated.
Thanks,
Pragya
# %% All classes and functions
# ------------------------------------------------------------------------------------------------------
# Writing custom dataset
class RF_dataset(Dataset):
def __init__(self, file_name):
data = scipy.io.loadmat(file_name)
# Data arranged as [batch_size,n_feature,feature_length]
# In Pytorch images are represented as [batch_siz,channels,height,width]
# This can be compared to images, with 4 channels and height of feature_length, width of 1
self.X = data['featVec'] # Can change this to featVec, featVec_1, featVec_2, featVec_3
# Only use with object features
self.X = self.X[:,[0,4,12,13],:] # Features: Using only RSSI and Phase with object
self.Y = data['labelVec'] # Labels: #Objects {0,5}
self.LocTag = data['LocTag'] # Location Tag: Based on pre-defined scheme nData x 5
self.PosTag = data['PosTag'] # Posture Tag: 1=stand, 2=sit, for three people looks like 211
self.nStand = data['nStand'] # Number of standing people
self.nSit = data['nSit'] # Number of sitting people
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
sample = self.X[idx]
return (self.Y[idx],sample)
# ------------------------------------------------------------------------------------------------------
#Network definition
class occupancyCounting(nn.Module):
def __init__(self):
super(occupancyCounting,self).__init__()
n_CW = 120 # This directly affects accuracy if value is low.
nFeat = 4
self.conv1 = nn.Conv1d(nFeat,n_CW,kernel_size=5, stride=4, padding=1) #featvec
self.conv2 = nn.Conv1d(n_CW,int(n_CW/2) , kernel_size=64, stride=35, padding=1) #featvec
self.conv3 = nn.Conv1d(int(n_CW/2),6,kernel_size=24, stride=10, padding=0) #featvec
self.maxPool1 = nn.MaxPool1d(kernel_size=8,stride=4,padding=0)
self.avgPool1 = nn.AvgPool1d(kernel_size=5)
self.drop1 = nn.Dropout(p=0.2)
def forward(self, x):
x = (F.relu(self.conv1(x)))
#print(x.shape)
x = (F.relu(self.conv2(x))) #featvec
#print(x.shape)
x = self.avgPool1(F.relu(self.conv3(x)))
#print(x.shape)
x = self.drop1(x)
return x
# ------------------------------------------------------------------------------------------------------
# Performing weighted selection without replacement for location tags
def WeightedSelectionWithoutReplacement(weights, m):
# https://stackoverflow.com/questions/352670/weighted-random-selection-with-and-without-replacement
elt = [(math.log(random.random()) / weights[i], i) for i in range(len(weights))]
return [x[1] for x in heapq.nlargest(m, elt)]
# ------------------------------------------------------------------------------------------------------
#%% Main function: Including training and testing model
if __name__ == '__main__':
file_name=r"xxxxxxxx.mat"
dataset = RF_dataset(file_name)
data_normalize = False # Normalize data?
# -----------------------------
# Train-test data distribution
fractionTrain = 0.75
trainset, testset = random_split(dataset, [int(fractionTrain*len(dataset)), len(dataset)-int(fractionTrain*len(dataset))])
batchsize_train = 50
train_loader = DataLoader(trainset, batch_size=batchsize_train,shuffle=True)
batchsize_test = len(testset)
test_loader = DataLoader(testset, batch_size=batchsize_test, shuffle=False)
# ------------------------------------------------------------------------------------------------------
# %% Start Training Block
#hyperparameter definition
model = occupancyCounting()
learning_rate = 0.008 # 0.02, 0.008
momentum = 0.1
random_seed=1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
EPOCHS = 150
train_loss_epoch = [] # Training Loss every epoch
train_acc_epoch = []
train_time_epoch = [] # Training time accumulating from the 0th epoch
test_loss_n_epoch = [] # Test Loss every nth epoch
test_acc_n_epoch =[] # Test accuracy every nth epoch
total_acc_n_epoch = [] # Total accuracy every nth epoch
count_n_epoch = [] # Counting every nth epoch
plt.figure()
start_time = time.time()
for epoch in range(EPOCHS):
# ------------------------------------------------------------------------------------------------------
train_loss = 0
correct_train = []
for Y_train,X_train in train_loader:
current_batchsize = X_train.shape[0]
feat_length = X_train.shape[2]
# -----------------------------------------------------------------
X_train = X_train.float()
Y_train = Y_train.view(-1,Y_train.shape[1])
Y_train = Y_train.long()
model.train()
output_train = model(X_train)
loss = F.cross_entropy(output_train,Y_train) # Computing total loss of this batch
optimizer.zero_grad()
loss.backward() #calculate the gradient decent
optimizer.step() #update the weight
# Clean-up step for PyTorch
train_loss = train_loss + (loss*current_batchsize/len(train_loader.dataset)) # Loss for each epoch
with torch.no_grad():
model.eval()
output_train = model(X_train)
symbol_train = output_train.data
symbol_train = symbol_train.max(dim=1).indices
correct_train.append(Y_train.eq(symbol_train).numpy())
train_time_epoch.append(time.time()-start_time)
correct_train = [ item for sublist in correct_train for item in sublist]
correct_train = np.asarray(correct_train)
correct_train = correct_train.reshape(-1)
train_accuracy = correct_train.sum()/len(train_loader.dataset)
train_acc_epoch.append(float(train_accuracy))
train_loss_epoch.append(float(train_loss)) # Appending Training loss for the epoch
if (epoch+1)%10 == 0:
# Update Plot with some random case every 5 epochs
plt.clf()
plt.plot(Y_train[0], 'bo')
plt.plot(symbol_train[0], 'rx')
plt.title('epoch: %i , Training loss is ' %(epoch+1) + '%f'%float(train_loss))
plt.show()
plt.pause(0.1)
if (epoch+1)%15 == 0:
count_n_epoch.append(epoch+1) # Current Epoch
# ------------------------------------------------------------------------------------------------------
#evaluation: # Update test loss & accuracy every n epochs
with torch.no_grad():
correct_test = []
test_loss = 0
for Y_test, X_test in test_loader:
current_batchsize = X_test.shape[0]
feat_length = X_test.shape[2]
# -----------------------------------------------------------------
X_test = X_test.float()
Y_test = Y_test.view(-1,Y_test.shape[1])
Y_test = Y_test.long()
model.eval()
output_test = model(X_test)
test_loss = test_loss + (F.cross_entropy(output_test,Y_test) * current_batchsize/len(test_loader.dataset))
symbol_test = output_test.data
symbol_test = symbol_test.max(dim=1).indices
correct_test.append(Y_test.eq(symbol_test).numpy())
correct_test = [ item for sublist in correct_test for item in sublist]
correct_test = np.asarray(correct_test)
correct_test = correct_test.reshape(-1)
test_accuracy = correct_test.sum()/len(test_loader.dataset)
print('Epoch: %i' %(epoch+1),', Train loss: %0.2f' %(float(train_loss)),', Train Accuracy: %0.2f' %train_accuracy)
print('Epoch: %i' %(epoch+1),', Test loss: %0.2f' %(float(test_loss)),', Test Accuracy: %0.2f' %test_accuracy)
test_loss_n_epoch.append(float(test_loss))
test_acc_n_epoch.append(test_accuracy)