I have created a small CNN model and it’s giving me the same output every time. I saw other posts also, but I couldn’t figure out what’s going wrong in my case.
I am using the cigar 10 dataset
I would also like to know If my implementation of checkpoints is right or not.
This is the code I wrote
import os
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torchvision.datasets.utils import download_url
from torchvision.datasets import ImageFolder #This will help us to read the datasets into Pytorch tensors
from torchvision.transforms import ToTensor
import tarfile
import time
import matplotlib.pyplot as plt
#Download the dataset
dataset_url = "https://s3.amazonaws.com/fast-ai-imageclas/cifar10.tgz"
download_url(dataset_url, '.')
# ANy ML problem we should initially visualize the dataset
with tarfile.open('./cifar10.tgz', 'r:gz') as tar:
tar.extractall(path='./data')
data_dir = './data/cifar10'
print(os.listdir(data_dir))
classes = os.listdir(data_dir + '/test')
print(classes)
#Checking the datafiles
airplane_files = os.listdir(data_dir+'/train/airplane')
print('Number of exmaples:',len(airplane_files))
print(airplane_files)
dataset = ImageFolder(data_dir+'/train', transform=ToTensor()) #This will not load all images. Only when needed
img, label = dataset[0]
print(img.shape)
print(label)
print(dataset.class_to_idx)
#We cannot directly visualize the images as the frist channel indicates the color where as matplotlib expects the lastone as channel index
import matplotlib.pyplot as plt
def show_example(img, label):
print('Label: ', dataset.classes[label], '('+str(label)+')')
plt.imshow(img.permute(1, 2, 0))
show_example(*dataset[15000])
#Since there is no predefined validation. We will split the train into two parts
def split_indices(n, val_pct=0.1, seed=99):
n_val = int(n*val_pct)
np.random.seed(seed) #Seed helps in generating the same split if we want to reuse the same split at a later stage
idxs = np.random.permutation(n)
return idxs[n_val:], idxs[:n_val]
val_pct = 0.2
rand_seed = 42
train_indices, val_indices = split_indices(len(dataset), val_pct, rand_seed)
print(train_indices)
train_indices, val_indices = split_indices(len(dataset), val_pct)
print(train_indices)
train_indices, val_indices = split_indices(len(dataset), val_pct, rand_seed)
print(train_indices)
print(len(train_indices))
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data.dataloader import DataLoader
batch_size = 16
#Training and Validation data loaders to create batches of data
train_sampler = SubsetRandomSampler(train_indices)
train_dl = DataLoader(dataset,
batch_size,
sampler = train_sampler)
val_sampler = SubsetRandomSampler(val_indices)
val_dl = DataLoader(dataset,
batch_size,
sampler=val_sampler) #The same dataset is used in validation dataloader as well as the val_sampler has the info about which indices to pickup
#Visualizing a batch in colab itself
from torchvision.utils import make_grid
def show_batch(dl):
for images, labels in dl:
fig, ax = plt.subplots(figsize=(10, 10))
ax.set_xticks([]); ax.set_yticks([]);
ax.imshow(make_grid(images, 10).permute(1, 2, 0)) #Permute us used to change the dimensions as interpreted by imshow
break
show_batch(val_dl)
# For color images convolution is applied for each of the channels individually and then the output features are added up
# Convolutions have few parameters
# Sparisity of connection. Each output depends on the smaller number of input elements
# Local structure is preserved. Parameters sharing and translation invariant
# DO A DEMO ON SPATIAL TRANSLATION AND DOING DETECTION
# SHOW THE PARAMETER ADVANTAGE BY COMPUTING NUMBER OF PARAMETERS is there something similar to model.summary in TF
import torch.nn as nn
import torch.nn.functional as F
simple_model = nn.Sequential(nn.Conv2d(3, 8, kernel_size=3, stride=1, padding=1), nn.MaxPool2d(2,2))
for images, labels in train_dl:
print(images.shape)
out = simple_model(images)
print(out.shape) #Individual channels can represent a lot of different features. They need not be colors alone
break
# Helper functions to run our code on GPU
def get_default_device():
if torch.cuda.is_available():
return torch.device('cuda')
else:
return torch.device('cpu')
def to_device(data, device):
if isinstance(data, (list, tuple)):
return [to_device(x, device) for x in data]
return data.to(device, non_blocking=True)
class DeviceDataLoader():
def __init__(self, dl, device):
self.dl = dl
self.device = device
def __iter__(self):
for b in self.dl:
yield to_device(b, self.device)
def __len__(self):
return len(self.dl)
def calculate_batch_loss(model, criterion, input, label, opt=None, metric=None):
outputs = model(input)
all_preds = torch.tensor([], device=device)
loss = criterion(outputs, label)
if opt is not None:
loss.backward()
opt.step()
opt.zero_grad()
metric_result = 0.0000
if metric is not None:
all_preds = torch.cat(
(all_preds, outputs),
dim=0
)
metric_result = metric(all_preds.argmax(dim=1).tolist(), label.tolist())
return loss.item(), len(input), metric_result
def evaluate(model, criterion, val_dl, metric=None):
with torch.no_grad():
results = [calculate_batch_loss(model, criterion, input, label, metric=metric) for input, label in val_dl]
losses, lens, metrics = zip(*results)
total = np.sum(lens)
avg_loss = np.sum(np.multiply(losses, lens)) / total
avg_metric = 0.0000
if metric is not None:
avg_metric = np.sum(np.multiply(metrics, lens)) / total
return avg_loss, total, avg_metric
def fit(epochs, lr, model, model_name, criterion, train_dl, valid_dl, metric=None, opt_fn=None):
since = time.time()
train_losses, val_losses, metrics = [], [], []
history = []
if opt_fn is None:
opt_fn = torch.optim.SGD
opt = opt_fn(model.parameters(), lr=lr)
for epoch in range(epochs):
print('Epoch {}/{}'.format(epoch+1, num_epochs))
print('-'*10)
model.train()
for data, label in train_dl:
train_loss, _, train_acc = calculate_batch_loss(model, criterion, data, label, opt=opt, metric=metric)
model.eval()
result = evaluate(model, criterion, val_dl, metric=metric)
val_loss, total, val_acc = result
train_losses.append(train_loss)
val_losses.append(val_loss)
metrics.append(val_acc)
# if metric is not None:
print('Train Loss: {:.4f}, Train Acc: {:.4f}'.format(train_loss, train_acc))
print('Val Loss: {:.4f} Val Acc: {:.4f}'.format(val_loss, val_acc))
print()
# else:
# print('Epoch [{}/{}], Val Loss: {:.4f}'.format(epoch+1, epochs, val_loss))
EPOCH = epoch
PATH = './' + model_name + '.pth'
LOSS = val_loss
# checkpoint
torch.save({
'epoch': EPOCH,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': opt.state_dict(),
'loss': LOSS,
}, PATH)
res = {
'train_loss': train_loss,
'train_acc': train_acc,
'val_loss': val_loss,
'val_acc': val_acc
}
history.append(res)
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
return history
def plot_losses(history):
train_losses = [x.get('train_loss') for x in history]
val_losses = [x['val_loss'] for x in history]
plt.plot(train_losses, '-bx')
plt.plot(val_losses, '-rx')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['Training', 'Validation'])
plt.title('Loss vs. No. of epochs');
def plot_accuracies(history):
train_acc = [x['train_acc'] for x in history]
val_acc = [x['val_acc'] for x in history]
plt.plot(train_acc, '-bx')
plt.plot(val_acc, '-rx')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['Training', 'Validation'])
plt.title('Accuracy vs. No. of epochs');
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
import itertools
# if normalize:
# cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
# print("Normalized confusion matrix")
# else:
# print('Confusion matrix, without normalization')
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt), horizontalalignment="center", color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
def generate_report(model):
from sklearn.metrics import classification_report, confusion_matrix
correct = 0
total = 0
all_preds = torch.tensor([], device=device)
with torch.no_grad():
for inputs, labels in test_dl:
outputs = model(inputs)
all_preds = torch.cat(
(all_preds, outputs),
dim=0
)
targets = test_ds.targets
target_class = test_ds.classes
prediction = all_preds.argmax(dim=1).tolist()
cm = confusion_matrix(targets, prediction)
print(classification_report(targets, prediction, digits=10))
plt.figure(figsize=(10, 10))
plot_confusion_matrix(cm, target_class)
return None
device = get_default_device()
test_ds = ImageFolder(data_dir+'/test', transform=ToTensor()) #This will not load all images. Only when needed
test_dl = DataLoader(test_ds, batch_size)
test_dl = DeviceDataLoader(test_dl, device)
train_dl = DeviceDataLoader(train_dl, device)
val_dl = DeviceDataLoader(val_dl, device)
model_cnn = nn.Sequential(
nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2,2),
nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2,2),
nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2,2),
nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2,2),
nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2,2),
nn.Flatten(),
nn.Linear(256, 128),
nn.Linear(128, 32),
nn.Linear(32, 10)
)
model_cnn
# model_cnn.state_dict()
num_epochs = 5
lr=0.01
model_name = 'model_cnn_5'
criterion = F.cross_entropy
opt_fn = torch.optim.Adam
metric = accuracy_score
val_loss, _, val_acc = evaluate(model_cnn, F.cross_entropy, val_dl, metric=metric)
print('Loss: {:.4f}, Accuracy: {:.4f}'.format(val_loss, val_acc))
history_cnn = fit(num_epochs, lr, model_cnn, model_name, criterion, train_dl, val_dl, opt_fn=opt_fn, metric=metric)
plot_losses(history_cnn)
plot_accuracies(history_cnn)
generate_report(model_cnn)