Hi,
Thanks for your reply. Actually I was trying to build CNN from scratch. I have re-write this code and run again, and now it’s giving false for is_leaf
. Executable code snippet is below:
import torch
from torchvision import datasets,transforms
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
transform = transforms.Compose([transforms.Resize((32,32)),
transforms.ToPILImage(),
transforms.ToTensor(),
])
train_set = datasets.CIFAR10(root='./data', train = True, download = True, transform = transform)
test_set = datasets.CIFAR10(root='./data', train = False, download = True, transform = transform)
# Getting numpy data
X_train = train_set.data
#Converting to tensor for autograd
X_train = torch.from_numpy(X_train)
# Getting numpy data
X_test = test_set.data
#Converting to tensor for autograd
X_test = torch.from_numpy(X_test)
# Labels
Y_train= train_set.targets
Y_test = test_set.targets
# Normalizing our dataset
X_train = X_train.float()
# getting mean
X_train_mean = torch.mean(X_train, dim= 0)
#getting variance
X_train_var = torch.var(X_train, dim=0)
#performing normalization-> normalize = (data - mean) / std
X_train = (X_train - X_train_mean) / torch.sqrt(X_train_var)
class FCLayer():
def __init__(self, sizes, learning_rate):
self.sizes = sizes
self.learning_rate = learning_rate
# we will save all parameters of our fc layer in this directory
self.params = self.initialization()
def sigmoid(self, x):
return 1/(1+torch.exp(-x))
def sigmoid_der(self, x):
return self.sigmoid(x) *(1- self.sigmoid(x))
def softmax(self, x):
# Numerically stable with large exponentials
exps = torch.exp(x - x.max())
return exps / torch.sum(exps, axis=0)
def initialization(self):
#number of neurons in each layer
input_layer = self.sizes[0]
hidden_layer_1 = self.sizes[1]
hidden_layer_2 = self.sizes[2]
output_layer = self.sizes[3]
params = {
'WH1': torch.rand(input_layer, hidden_layer_1, requires_grad= True) * torch.sqrt(torch.tensor(2) / (input_layer + hidden_layer_1)),
'WH2': torch.rand(hidden_layer_1, hidden_layer_2, requires_grad= True) * torch.sqrt(torch.tensor(2) / (hidden_layer_1 + hidden_layer_2)),
'WO': torch.rand(hidden_layer_2, output_layer, requires_grad= True) * torch.sqrt(torch.tensor(2) / (hidden_layer_2 + output_layer)),
'BH1': torch.rand(hidden_layer_1, requires_grad= True) * torch.sqrt(torch.tensor(2)/ (input_layer + hidden_layer_2)),
'BH2': torch.rand(hidden_layer_2, requires_grad= True) * torch.sqrt(torch.tensor(2) / (hidden_layer_1 + hidden_layer_2)),
'BO' : torch.rand(output_layer, requires_grad= True) * np.sqrt(torch.tensor(2) / (hidden_layer_2 + output_layer))
}
return params
def one_hot_encoding(self, Y_train):
h = Y_train.shape[0]
w = 10 # As CIFAR-10 has 10 classes
one_hot_labels = torch.zeros((h,w))
for i in range(h):
one_hot_labels[i, Y_train[i]] = 1
return one_hot_labels
def forward_pass(self, x_train):
params = self.params
# input layer (training samples)
params['A'] = x_train
# input layer to hidden layer 1
params['ZH1'] = torch.matmul(params['A'], params['WH1']) + params['BH1']
params['AH1'] = self.sigmoid(params['ZH1'])
# hidden layer 1 to hidden layer 2
params['ZH2'] = torch.matmul(params['AH1'], params['WH2']) + params['BH2']
params['AH2'] = self.sigmoid(params['ZH2'])
# hidden layer 2 to output layer
params['ZO'] = torch.matmul(params['AH2'], params['WO']) + params['BO']
params['AO'] = self.softmax(params['ZO'])
return params['AO']
def backward_pass(self, one_hot_labels, output):
params = self.params
# Calculate WO update
dcost_dzo = output - one_hot_labels
dzo_dwo = params['AH2']
dcost_wo = torch.matmul(dzo_dwo.T, dcost_dzo)
dcost_bo = dcost_dzo
# Calculate WH2 update
dzo_dah2 = params['WO']
dcost_dah2 = torch.matmul(dcost_dzo, dzo_dah2.T)
dah2_dzh2 = self.sigmoid_der(params['ZH2'])
dzh2_dwh2 = params['AH1']
dcost_w2 = torch.matmul(dzh2_dwh2.T, dah2_dzh2 * dcost_dah2)
dcost_bh2 = dcost_dah2 * dah2_dzh2
# Calculate W1 update
dzh2_dah1 = params['WH2']
dcost_dzh2 = dcost_dah2 * dah2_dzh2
dcost_dah1 = torch.matmul(dcost_dzh2, dzh2_dah1.T)
dah1_dzh1 = self.sigmoid_der(params['ZH1'])
dzh1_dw1 = params['A']
dcost_w1 = torch.matmul(dzh1_dw1.T, dcost_dah1 * dah1_dzh1)
dcost_bh1 = dcost_dah1 * dah1_dzh1
# updating weigths and biases
params['WH1'] -= self.learning_rate * dcost_w1
params['BH1'] -= self.learning_rate * dcost_bh1.sum(axis=0)
params['WH2'] -= self.learning_rate * dcost_w2
params['BH2'] -= self.learning_rate * dcost_bh2.sum(axis=0)
params['WO'] -= self.learning_rate * dcost_wo
params['BO'] -= self.learning_rate * dcost_bo.sum(axis=0)
# taking some training examples to see our training
demo_train = []
for i in range(3):
demo_train.append(X_train[i])
demo_train = torch.stack(demo_train, dim=0)
# Now let's take corresponding labels for those training examples
demo_label = []
for i in range(3):
demo_label.append(Y_train[i])
demo_label = np.asarray(demo_label)
class Conv2D():
def __init__(self, input_chanel, output_chanel, kernel_size, stride_size, padding_size):
self.input_chanel = input_chanel
self.output_chanel = output_chanel
self.kernel_size = kernel_size
self.stride_size = stride_size
self.padding_size = padding_size
def show_details(self):
print('Input Size: ', self.input_chanel)
print('Output Size: ', self.output_chanel)
print('Kernel Size: ', self.kernel_size)
print('Stride Size: ', self.stride_size)
print('Padding Size: ', self.padding_size)
def weight_initialization(self):
# initialize weights with xavier initialization
w = torch.randn(self.kernel_size, self.kernel_size, self.input_chanel, self.output_chanel, requires_grad= True) /9.0 # RGB image has 3 chanel
b = torch.randn(1,1,1, self.output_chanel, requires_grad= True) /9.0
params = {
'weights': w,
'bias': b
}
return params
def zero_pad(self, X):
"""
Argument:
X-- python numpy array of shape (n_H, n_W, n_C) -> n_H: height, n_W: width, n_C: chanel
pad-- integer, amount of padding around each image in vertical and horizontal
Return:
X_pad-- padded image of shape(m, n_H, n_W, n_C) -> m is the total number of images
"""
# we will apply padding only n_H and n_W not in m and n_C
X = X.detach().numpy()
X_pad = np.pad(X, ((0,0), (self.padding_size,self.padding_size), (self.padding_size, self.padding_size), (0,0)), 'constant', constant_values = (0,0))
X_pad = torch.from_numpy(X_pad)
return X_pad
#function to perform single convolution
def conv_single_step(self, a_slice_prev, W, b):
"""
It will apply one filter defined by parameters W on a single slice (a_slice_prev) of the output activation
of the previous layer.
Arguments:
a_slice_prev -- slice of input data of shape (f, f, n_C_prev) -> f: height, width, n_C_prev: chanel of previous layer
W -- Weight parameters contained in a window - matrix of shape (f, f, n_C_prev)
b -- Bias parameters contained in a window - matrix of shape (1, 1, 1)
Returns:
Z -- a scalar value, result of convolving the sliding window (W,b) on a slice x of the input data
"""
# element-wise multiplication between a_slice_prev and w
matrix = torch.mul(a_slice_prev, W)
# sum up all the elements of the matrix
sum = torch.sum(matrix)
# adding bias to the sum to get final output Z
Z = sum + b.float()
return Z
def get_hparameter(self):
hparam = {
'stride': self.stride_size,
'pad': self.padding_size
}
return hparam
def conv_forward(self, A_prev, W, b, hparameters):
"""
Implementing the forward propagation for convolution
Arguments:
A_prev -- output activations of the previous layer, numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
m-> total number inputs
n_H_prev-> height of previous layers input
n_W_prev-> width of previous layers input
n-C_prev-> chanel of previous layers input
W -- Weights, numpy array of shape (f, f, n_C_prev, n_C)
f-> spatial extend of kernels/filter
n_C_prec-> chanel of previous layers input
n_C-> dept (how many filters we want to use for next layer)
b -- Biases, numpy array of shape (1, 1, 1, n_C)
n_C-> dept (how many filters we want to use for next layer)
hparameters -- python dictionary containing "stride" and "pad"
Returns:
Z -- conv output, numpy array of shape (m, n_H, n_W, n_C)
m-> total number outputs will be generated in next layer(output layer)
n_H -> height of next layers output
n_W-> width of next layers output
n-C -> dept (how many filters we want to use for next layer)
mem -- cache of values needed for the conv_backward() function
"""
# getting dimensions from A_prev's shape (shape of input data)
(m, n_H_prev, n_W_prev, n_C_prev) = A_prev.shape
# getting dimensions of W's shape
(f, f, n_C_prev, n_C) = W.shape
# getting parameters value from hparameter dictionary
stride = hparameters['stride']
pad = hparameters['pad']
# computing dimensions of the conv2D output volume using a formula (Ref: CS231n)[lecture slide 4]
n_H = int((n_H_prev + 2*pad - f)/stride) + 1
n_W = int((n_W_prev + 2*pad - f)/stride) + 1
# Initialize output volume with zeros
Z = torch.zeros([m, n_H, n_W, n_C])
# Applying zero-padding to our input data A_prev
A_prev_pad = self.zero_pad(A_prev)
# now we will use for loop to perform conv2D
#loop over the total training examples
for i in range(m):
# taking i'th training example from zero_padded data
a_prev_pad = A_prev_pad[i,:,:,:]
for height in range(n_H):
for width in range(n_W):
for chanel in range(n_C):
# finding the four corner's of current slide
vert_start = height * stride
vert_end = height* stride+ f
horiz_start = width * stride
horiz_end = width* stride+ f
# with this four corner we are defining our current slice of a_prev_pad
a_slice_prev = a_prev_pad[vert_start:vert_end,horiz_start:horiz_end,:] # we are taking for all 3 chanels
# performing single convolution to each slice with W and b for getting our output
Z[i, height, width, chanel] = self.conv_single_step(a_slice_prev, W[:, :, :, chanel], b[:,:,:,chanel])
# To make sure our output shape is correct
assert(Z.shape == (m, n_H, n_W, n_C))
# Save information in "mem" as cache for the backprop
mem = (A_prev, W, b, hparameters)
return Z, mem
def relu(self, Z):
relu_Z = torch.maximum(torch.tensor(0), Z) # element-wise
return relu_Z
# Max pooling
def pool_forward(self, A_prev, hparameters):
"""
Implementing the forward pass of pooling layer
Arguments:
A_prev -- Input data, numpy array of shape (m, n_H_prev, n_W_prev, n_C_prev)
hparameters -- python dictionary containing "f" and "stride"
f-> spatial extend of filter
Returns:
A -- output of the pool layer, a numpy array of shape (m, n_H, n_W, n_C)
mem -- cache used in the backward pass of the pooling layer, contains the input and hparameters
"""
# getting input dimensions
(m, n_H_prev, n_W_prev, n_C_prev) = A_prev.shape
# retrieving hyper parameters from hparameters
f = hparameters['filter']
stride = hparameters['stride']
# Computing dimensions for output volume
n_H = int(1 + (n_H_prev - f) / stride)
n_W = int(1 + (n_W_prev - f) / stride)
n_C = n_C_prev
# initialize output volume with zeros as A
A = torch.zeros((m, n_H, n_W, n_C))
for i in range(m):
for height in range(n_H):
for width in range(n_W):
for chanel in range(n_C):
# finding the four corners
vert_start = height * stride
vert_end = height* stride+ f
horiz_start = width * stride
horiz_end = width* stride+ f
# with this four corner we are defining our current slice of A_prev of ith training example, chanel c
a_prev_slice = A_prev[i, vert_start:vert_end,horiz_start:horiz_end, chanel]
# getting the maximum value from the slice
A[i, height, width, chanel] = torch.max(a_prev_slice)
# Making sure our output shape is correct
assert(A.shape == (m, n_H, n_W, n_C))
# Store the input and hparameters in "cache" for pool_backward()
mem = (A_prev, hparameters)
return A, mem
class CNN_Model():
def __init__(self, epochs, learning_rate):
self.epochs = epochs
self.learning_rate = learning_rate
# conv layer 1
self.conv_1 = Conv2D(3,16,3,1,1)
# weight initialization
self.wbc_1_params = self.conv_1.weight_initialization()
self.WCONV_1 = self.wbc_1_params['weights']
self.BCONV_1 = self.wbc_1_params['bias']
# conv layer 2
self.conv_2 = Conv2D(16,32,3,1,1)
# weight initialization
self.wbc_2_params = self.conv_2.weight_initialization()
self.WCONV_2 = self.wbc_2_params['weights']
self.BCONV_2 = self.wbc_2_params['bias']
# fully connected Layer
self.fc_layer = FCLayer(sizes=[2048, 512, 128, 10], learning_rate= 0.1)
def forward(self, A_prev): # A_prev= X_train
#conv_1 = Conv2D(3,16,3,1,1)
W1 = self.WCONV_1
b1 = self.BCONV_1
# getting stride and padding
hparams = self.conv_1.get_hparameter()
# performing convolution
output, mem = self.conv_1.conv_forward(A_prev, W1, b1, hparams) # output shape: 50000 * 32 * 32 * 16
# passing the output through ReLu
relu_output = self.conv_1.relu(output) # relu_output shape: 50000 * 32 * 32 * 16
# performorming MaxPool operation
hparameters = {"filter" : 2,
"stride": 2}
pool_output, pool_mem = self.conv_1.pool_forward(relu_output, hparameters) # pool_output shape: 50000 * 16 * 16 * 16
# storing output and pool layer
conv_1_features = {
'output': output,
'mem': mem,
'pool_output': pool_output,
'pool_mem': pool_mem
}
#conv_2 = Conv2D(16,32,3,1,1)
W2 = self.WCONV_2
b2 = self.BCONV_2
# getting stride and padding
hparams_2 = self.conv_2.get_hparameter()
# performing convolution
output_2, mem_2 = self.conv_2.conv_forward(pool_output, W2, b2, hparams) # passing 1s layer output as input # output_2 shape: 50000 * 16 * 16 * 32
# passing the output through ReLu
relu_output_2 = self.conv_2.relu(output_2) # relu_output_2 shape: 50000 * 16 * 16 * 32
# performorming MaxPool operation
hparameters_2 = {"filter" : 2,
"stride": 2}
pool_output_2, pool_mem_2 = self.conv_2.pool_forward(relu_output_2, hparameters_2) # pool_output_2 shape: 50000 * 8 * 8 * 32
# storing output and pool layer output to visualize feature
conv_2_features = {
'output': output_2,
'mem': mem_2,
'pool_output': pool_output_2,
'pool_mem': pool_mem_2
}
# Make output flatten for FC Layer
f_output = conv_2_features['pool_output']
output_rows = torch.flatten(f_output, start_dim= 1)
fc_output = self.fc_layer.forward_pass(output_rows)
# returning feauter maps from two layers
return conv_1_features, conv_2_features, fc_output
def train(self, X_train, Y_train):
start_time = time.time()
# getting one-hot-encoded labels
one_hot_labels = self.fc_layer.one_hot_encoding(Y_train)
# forward pass
conv1_features, conv2_features, output = self.forward(X_train)
for iteration in range(self.epochs):
conv1_features, conv2_features, output = self.forward(X_train)
loss = torch.sum(-one_hot_labels * torch.log(output))
a_prev, wc1, bc1, hp1 = conv1_features['mem']
aa_prev, wc2, bc2, hp2 = conv2_features['mem']
wc1.retain_grad()
bc1.retain_grad()
loss.backward()
print(loss)
print(loss.is_leaf)
print(output.is_leaf)
print(bc1.is_leaf)
print(self.wbc_1_params['bias'])
print(self.wbc_1_params['bias'].is_leaf)
acc = self.accuracy(X_train, Y_train)
print('Epoch: {0}, Time spent: {1:.2f}s, Loss: {2:.2f}, Accuracy: {3:.2f}'.format(
iteration+1, time.time() - start_time, loss, acc*100
))
def predict(self, X_test):
# forward pass
conv1_features, conv2_features, output = self.forward(X_test)
predicted_class= torch.argmax(output, dim= 1)
return predicted_class
def accuracy(self, X_train, Y_train):
predictions = []
pred = self.predict(X_train)
for i in range(len(Y_train)):
if pred[i] == Y_train[i]:
predictions.append(pred[i])
acc = len(predictions) / len(Y_train)
return acc
Now, if I run this, I’m getting this
tensor(3.2958, grad_fn=<SumBackward0>)
False
False
False
tensor([[[[ 0.0255, 0.1655, -0.1032, 0.0785, -0.0081, 0.0402, 0.1369,
-0.1066, -0.0874, 0.1110, 0.0014, -0.1300, 0.0657, -0.0220,
0.0562, 0.1101]]]], grad_fn=<DivBackward0>)
False