Hi everyone, I am writing some image classification code that I plan to use as a small part of some evolutionary-algorithm work. To check that the code is correct, I ran it on MNIST. From what I found online, standard networks reach about 99% accuracy on that dataset, but my code does not get there, so I suspect there is a problem somewhere in it. For context: I split the original training set into a train set (80%) and a validation set (20%), and I load the dataset from folders on disk with ImageFolder. The augmentation setup follows this paper: [2310.19909] Battle of the Backbones: A Large-Scale Comparison of Pretrained Models across Computer Vision Tasks.
If anyone can help me spot the issue, that would be great. Thank you.
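Roughly, the 80/20 split is created along these lines before training (a simplified sketch; the step that writes the two subsets out as the ImageFolder directories read by get_dataloader() below is omitted, and the seed is just a placeholder):

# Simplified sketch of the 80/20 train/val split of the original MNIST training set.
import torch
from torchvision import datasets

full_train = datasets.MNIST(root="./data", train=True, download=True)
n_total = len(full_train)              # 60000 images in the original training set
n_train = int(0.8 * n_total)           # 48000 for training
n_val = n_total - n_train              # 12000 for validation
train_subset, val_subset = torch.utils.data.random_split(
    full_train, [n_train, n_val],
    generator=torch.Generator().manual_seed(0))

The full training/evaluation script is below: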
from __future__ import print_function
import argparse
from enum import Enum
import networkx as nx
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
import sys
import os
from os.path import join as pjoin
import numpy as np
import random
import math
from pdb import set_trace
from time import time
import logging
from copy import deepcopy
import PIL
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim.lr_scheduler import StepLR
from torchvision import transforms, utils, datasets, models
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from torchvision.transforms import v2
from einops import rearrange, repeat
from einops.layers.torch import Rearrange
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import timm
# from lr_sched import adjust_learning_rate
#import albumentations as A
#from albumentations.pytorch import ToTensorV2
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
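# Net below is a small LeNet-style baseline that expects 3-channel input; note that
# get_simple_model() further down returns MyNet, so Net is not actually used in this run.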
class Net(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
        # For a 32x32 input the flattened size would be 16 * 5 * 5; for 28x28 MNIST it is 16 * 4 * 4.
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1) # flatten all dimensions except batch
# set_trace()
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
class MyNet(nn.Module):
    def __init__(self, input_size=(3, 28, 28), num_classes=10):
        """
        Init convolution, activation and pooling layers.
        Args:
            input_size: (3, 28, 28) -- ImageFolder loads the MNIST images as 3-channel RGB
            num_classes: 10
        """
super(MyNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(input_size[0], 32, kernel_size=5),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2))
self.layer2 = nn.Sequential(
nn.Conv2d(32, 64, kernel_size=5),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2))
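        # 28x28 input -> conv(5x5): 24x24 -> pool: 12x12 -> conv(5x5): 8x8 -> pool: 4x4, hence 4*4*64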
self.fc1 = nn.Linear(4 * 4 * 64, num_classes)
    def forward(self, x):
        """
        Forward pass: maps the input tensor to class logits.
        Args:
            x: (N x 3 x 28 x 28) tensor
        """
# set_trace()
x = self.layer1(x)
x = self.layer2(x)
x = x.reshape(x.size(0), -1)
x = self.fc1(x)
return x
def get_simple_model():
model = MyNet()
return model
def get_input_size(dataset_name):
if dataset_name == "MNIST":
return 28
else:
return 32
def get_transforms(input_size):
# Kaiming He, Xinlei Chen, Saining Xie, Yanghao Li, Piotr Dollár, and Ross Girshick. Masked autoencoders
# are scalable vision learners. In Proceedings of the IEEE/CVF Conference on Computer Vision and Pattern
# Recognition, pages 16000–16009, 2022.
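    # The mean/std values below are the standard ImageNet statistics; ImageFolder loads the
    # grayscale MNIST images as 3-channel RGB, so the same three-channel normalization is applied.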
image_transforms = {
"train": transforms.Compose([
transforms.Resize((input_size, input_size)),
transforms.RandomHorizontalFlip(),
# transforms.RandAugment(num_ops=9),
transforms.ToTensor(),
# transforms.RandomErasing(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
"val": transforms.Compose([
transforms.Resize((input_size, input_size)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
"test": transforms.Compose([
transforms.Resize((input_size, input_size)),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
}
return image_transforms
def get_dataloader(base_path, dataset_name, batch_size, image_transforms, train_subset_name, val_subset_name, test_subset_name,
get_fitness=True):
dataset_train = datasets.ImageFolder(
root=pjoin(base_path, dataset_name, train_subset_name),
transform=image_transforms["train"])
dataset_val = datasets.ImageFolder(
root=pjoin(base_path, dataset_name, val_subset_name),
transform=image_transforms["val"])
dataset_test = datasets.ImageFolder(
root=pjoin(base_path, dataset_name, test_subset_name),
transform = image_transforms["test"])
train_loader = DataLoader(dataset=dataset_train, shuffle=True,
batch_size=batch_size)
val_loader = DataLoader(dataset=dataset_val, shuffle=False,
batch_size=batch_size)
if get_fitness is True:
test_loader = DataLoader(dataset=dataset_test,
shuffle=False, batch_size=batch_size)
else:
        # For the final test pass, use batch_size=1 so predictions are collected one sample at a
        # time; the metrics at the end are then computed over the full list of per-sample
        # predictions rather than averaged per mini-batch.
test_loader = DataLoader(dataset=dataset_test,
shuffle=False, batch_size=1)
return (train_loader, val_loader, test_loader)
def multi_acc_train(y_pred, y_test):
""" Function to calculate multi-class accuracy. Handles the case when CutMix and MixUp are used.
"""
y_pred_softmax = torch.log_softmax(y_pred, dim = 1)
# set_trace()
_, y_pred_tags = torch.max(y_pred_softmax, dim = 1)
_, y_test_tags = torch.max(y_test, dim = 1)
correct_pred = (y_pred_tags == y_test_tags).float()
acc = correct_pred.sum() / len(correct_pred)
return acc
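# multi_acc is the integer-label counterpart of multi_acc_train: here y_test is a 1-D tensor of
# class indices (no CutMix/MixUp soft labels), so only the predictions need an argmax.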
def multi_acc(y_pred, y_test):
""" Function to calculate multi-class accuracy
"""
y_pred_softmax = torch.log_softmax(y_pred, dim = 1)
# set_trace()
_, y_pred_tags = torch.max(y_pred_softmax, dim = 1)
# set_trace()
correct_pred = (y_pred_tags == y_test).float()
# set_trace()
acc = correct_pred.sum() / len(correct_pred)
# set_trace()
# acc = torch.round(acc)
# set_trace()
# acc = torch.round(acc * 100)
return acc
def make_train_step(model, model_name, loss_fn, optimizer, epoch_idx, learning_rate, n_epoch):
""" Function to make one training step
"""
def perform_train_step(X_train_batch, y_train_batch, data_iter_step, len_data_loader):
model.train()
# adjust_learning_rate(optimizer, data_iter_step / len(data_loader) + epoch_idx, learning_rate, n_epoch)
# https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html#inception-v3
# Cosine annealing with a warmup period of 5 epochs
# adjust_learning_rate(optimizer, data_iter_step / len_data_loader + epoch_idx, learning_rate, n_epoch)
y_train_pred = model(X_train_batch)
train_loss = loss_fn(y_train_pred, y_train_batch)
train_acc = multi_acc_train(y_train_pred, y_train_batch)
train_loss.backward()
optimizer.step()
optimizer.zero_grad()
return (train_acc.item(), train_loss.item())
return perform_train_step
def make_val_step(model, model_name, loss_fn, optimizer, epoch_idx, n_epoch):
""" Function to make one validation step
"""
def perform_val_step(X_val_batch, y_val_batch):
model.eval()
# y_val_pred = model(X_val_batch).squeeze()
y_val_pred = model(X_val_batch)
val_acc = multi_acc(y_val_pred, y_val_batch)
val_loss = loss_fn(y_val_pred, y_val_batch)
# set_trace()
return (val_acc.item(), val_loss.item())
return perform_val_step
def mini_batch(device, epoch_idx, n_epoch, data_loader, step_fn, n_cls):
""" Function to run through a mini-batch (train or validation)
"""
mini_batch_acc_list = []
mini_batch_loss_list = []
# count = 0
len_data_loader = len(data_loader)
for iteration_idx, (X_batch, y_batch) in enumerate(data_loader):
X_batch = X_batch.to(device)
y_batch = y_batch.to(device)
if "train" in step_fn.__name__:
# Apply CutMix and MixUp for the training set
# set_trace()
cutmix = v2.CutMix(num_classes=n_cls)
mixup = v2.MixUp(num_classes=n_cls)
cutmix_or_mixup = v2.RandomChoice([cutmix, mixup])
X_batch, y_batch = cutmix_or_mixup(X_batch, y_batch)
# set_trace()
if "train" in step_fn.__name__:
(mini_batch_acc, mini_batch_loss) = step_fn(X_batch, y_batch, iteration_idx, len_data_loader)
else:
(mini_batch_acc, mini_batch_loss) = step_fn(X_batch, y_batch)
mini_batch_acc_list.append(mini_batch_acc)
mini_batch_loss_list.append(mini_batch_loss)
# count += 1
loss = np.mean(mini_batch_loss_list)
acc = np.mean(mini_batch_acc_list)
# set_trace()
return (acc, loss)
def adjust_learning_rate(optimizer, epoch, learning_rate, n_epoch, warmup_epochs=5, min_lr=1e-6):
"""Decay the learning rate with half-cycle cosine after warmup"""
if epoch < warmup_epochs:
lr = learning_rate * epoch / warmup_epochs
else:
lr = min_lr + (learning_rate - min_lr) * 0.5 * \
(1. + math.cos(math.pi * (epoch - warmup_epochs) / (n_epoch - warmup_epochs)))
for param_group in optimizer.param_groups:
if "lr_scale" in param_group:
param_group["lr"] = lr * param_group["lr_scale"]
else:
param_group["lr"] = lr
return lr
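# NOTE: adjust_learning_rate is currently unused -- the calls to it inside make_train_step are
# commented out, so training runs with the constant AdamW learning rate set in evaluate_model.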
def evaluate_model(model, model_name, dataset_name, base_path, batch_size, n_epoch, n_cls, get_fitness=True):
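    # get_fitness=True trains on the "train1"/"val1" folders and reports final metrics on the
    # "val" folder; get_fitness=False trains on "train"/"val" and reports metrics on "test".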
if get_fitness is True:
train_subset_name = "train1"
val_subset_name = "val1"
test_subset_name = "val"
else:
train_subset_name = "train"
val_subset_name = "val"
test_subset_name = "test"
input_size = get_input_size(dataset_name)
image_transforms = get_transforms(input_size)
(train_loader, val_loader, test_loader) = get_dataloader(base_path, dataset_name, batch_size, image_transforms,
train_subset_name, val_subset_name, test_subset_name, get_fitness=get_fitness)
criterion = nn.CrossEntropyLoss()
learning_rate = 0.001
# learning_rate = 0.0001
warmup_epoch = 5
# optimizer = optim.Adam(model.parameters(), lr=0.0001)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
# lambda1 = lambda epoch: learning_rate ** epoch / warmup_epoch
# warmup_scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=[lambda1])
# cosine_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer)
accuracy_stats = {
'train': [],
"val": []
}
loss_stats = {
'train': [],
"val": []
}
best_model_wts = deepcopy(model.state_dict())
best_acc = 0.0
best_loss = 100000
model_output_name = '%s_best.pth' % (model_name)
optimizer.zero_grad()
print("Begin training.")
    for epoch_idx in range(1, n_epoch + 1):  # range(1, n_epoch) would run only n_epoch - 1 epochs
train_step_fn = make_train_step(model, model_name, criterion, optimizer, epoch_idx, learning_rate, n_epoch)
val_step_fn = make_val_step(model, model_name, criterion, optimizer, epoch_idx, n_epoch)
# TRAINING
(train_epoch_acc, train_epoch_loss) = mini_batch(DEVICE, epoch_idx, n_epoch, train_loader, train_step_fn, n_cls)
loss_stats['train'].append(train_epoch_loss)
accuracy_stats['train'].append(train_epoch_acc)
with torch.no_grad():
(val_epoch_acc, val_epoch_loss) = mini_batch(DEVICE, epoch_idx, n_epoch, val_loader, val_step_fn, n_cls)
loss_stats['val'].append(val_epoch_loss)
accuracy_stats['val'].append(val_epoch_acc)
        print(f'Epoch {epoch_idx:03}: | Train Loss: {train_epoch_loss:.5f} | Val Loss: {val_epoch_loss:.5f} | Train Acc: {train_epoch_acc:.3f} | Val Acc: {val_epoch_acc:.3f}')
        if torch.cuda.is_available():
            logging.debug(f"Percentage of CUDA allocated memory: {torch.cuda.memory_allocated() * 100 / torch.cuda.max_memory_allocated():.1f}")
# set_trace()
if val_epoch_loss < best_loss:
best_acc = val_epoch_acc
best_loss = val_epoch_loss
best_model_wts = deepcopy(model.state_dict())
torch.save(best_model_wts, model_output_name)
model.load_state_dict(torch.load(model_output_name, weights_only=True))
model.to(DEVICE)
model.eval()
y_pred_list = []
y_true_list = []
with torch.no_grad():
# for x_batch, y_batch in tqdm(test_loader):
for x_batch, y_batch in test_loader:
x_batch, y_batch = x_batch.to(DEVICE), y_batch.to(DEVICE)
y_test_pred = model(x_batch)
_, y_pred_tag = torch.max(y_test_pred, dim = 1)
y_pred_list.append(y_pred_tag.cpu().numpy())
y_true_list.append(y_batch.cpu().numpy())
    # Concatenate the per-batch predictions so the metrics below see every test sample, regardless
    # of the test loader's batch size.
    y_pred_list = np.concatenate(y_pred_list).tolist()
    y_true_list = np.concatenate(y_true_list).tolist()
acc = accuracy_score(y_true_list, y_pred_list)
precision = precision_score(y_true_list, y_pred_list, average="macro")
recall = recall_score(y_true_list, y_pred_list, average="macro")
f1 = f1_score(y_true_list, y_pred_list, average="macro")
conf = confusion_matrix(y_true_list, y_pred_list)
return (acc, precision, recall, f1, conf)
def main():
dataset_name = "MNIST"
base_path = "/home/truong/Desktop/TRUONG/datasets"
model_name = "MyModel"
n_epoch = 200
batch_size = 64
n_cls = 10
model = get_simple_model()
model = model.to(DEVICE)
(acc, precision, recall, f1, conf) = evaluate_model(model, model_name, dataset_name, base_path, batch_size,
n_epoch, n_cls,
# get_fitness=True,
get_fitness=False,
)
print((acc, precision, recall, f1, conf))
if __name__ == "__main__":
main()