@ptrblck thanks for response yes i want to visualize last layer output of encoder in VAE
i am getting below error @ output = resnet_vae(data) , i pused input data to gpu.because all parameters in gpu
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torch.autograd import Variable
import torchvision.transforms as transforms
class Dataset(data.Dataset):
"Characterizes a dataset for PyTorch"
def __init__(self, filenames, labels, transform=None):
"Initialization"
self.filenames = filenames
self.labels = labels
self.transform = transform
def __len__(self):
"Denotes the total number of samples"
return len(self.filenames)
def __getitem__(self, index):
"Generates one sample of data"
# Select sample
filename = self.filenames[index]
X = Image.open(filename)
if self.transform:
X = self.transform(X) # transform
y = torch.LongTensor([self.labels[index]])
return X, y
## ---------------------- end of Dataloaders ---------------------- ##
def conv2D_output_size(img_size, padding, kernel_size, stride):
# compute output shape of conv2D
outshape = (np.floor((img_size[0] + 2 * padding[0] - (kernel_size[0] - 1) - 1) / stride[0] + 1).astype(int),
np.floor((img_size[1] + 2 * padding[1] - (kernel_size[1] - 1) - 1) / stride[1] + 1).astype(int))
return outshape
def convtrans2D_output_size(img_size, padding, kernel_size, stride):
# compute output shape of conv2D
outshape = ((img_size[0] - 1) * stride[0] - 2 * padding[0] + kernel_size[0],
(img_size[1] - 1) * stride[1] - 2 * padding[1] + kernel_size[1])
return outshape
## ---------------------- ResNet VAE ---------------------- ##
class ResNet_VAE(nn.Module):
def __init__(self, fc_hidden1=1024, fc_hidden2=768, drop_p=0.3, CNN_embed_dim=256):
super(ResNet_VAE, self).__init__()
self.fc_hidden1, self.fc_hidden2, self.CNN_embed_dim = fc_hidden1, fc_hidden2, CNN_embed_dim
# CNN architechtures
self.ch1, self.ch2, self.ch3, self.ch4 = 16, 32, 64, 128
self.k1, self.k2, self.k3, self.k4 = (5, 5), (3, 3), (3, 3), (3, 3) # 2d kernal size
self.s1, self.s2, self.s3, self.s4 = (2, 2), (2, 2), (2, 2), (2, 2) # 2d strides
self.pd1, self.pd2, self.pd3, self.pd4 = (0, 0), (0, 0), (0, 0), (0, 0) # 2d padding
# encoding components
resnet = models.resnet152(pretrained=True)
modules = list(resnet.children())[:-1] # delete the last fc layer.
self.resnet = nn.Sequential(*modules)
self.fc1 = nn.Linear(resnet.fc.in_features, self.fc_hidden1)
self.bn1 = nn.BatchNorm1d(self.fc_hidden1, momentum=0.01)
self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
self.bn2 = nn.BatchNorm1d(self.fc_hidden2, momentum=0.01)
# Latent vectors mu and sigma
self.fc3_mu = nn.Linear(self.fc_hidden2, self.CNN_embed_dim) # output = CNN embedding latent variables
self.fc3_logvar = nn.Linear(self.fc_hidden2, self.CNN_embed_dim) # output = CNN embedding latent variables
# Sampling vector
self.fc4 = nn.Linear(self.CNN_embed_dim, self.fc_hidden2)
self.fc_bn4 = nn.BatchNorm1d(self.fc_hidden2)
self.fc5 = nn.Linear(self.fc_hidden2, 64 * 4 * 4)
self.fc_bn5 = nn.BatchNorm1d(64 * 4 * 4)
self.relu = nn.ReLU(inplace=True)
# Decoder
self.convTrans6 = nn.Sequential(
nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=self.k4, stride=self.s4,
padding=self.pd4),
nn.BatchNorm2d(32, momentum=0.01),
nn.ReLU(inplace=True),
)
self.convTrans7 = nn.Sequential(
nn.ConvTranspose2d(in_channels=32, out_channels=8, kernel_size=self.k3, stride=self.s3,
padding=self.pd3),
nn.BatchNorm2d(8, momentum=0.01),
nn.ReLU(inplace=True),
)
self.convTrans8 = nn.Sequential(
nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=self.k2, stride=self.s2,
padding=self.pd2),
nn.BatchNorm2d(3, momentum=0.01),
nn.Sigmoid() # y = (y1, y2, y3) \in [0 ,1]^3
)
def encode(self, x):
x = self.resnet(x) # ResNet
x = x.view(x.size(0), -1) # flatten output of conv
# FC layers
x = self.bn1(self.fc1(x))
x = self.relu(x)
x = self.bn2(self.fc2(x))
x = self.relu(x)
# x = F.dropout(x, p=self.drop_p, training=self.training)
mu, logvar = self.fc3_mu(x), self.fc3_logvar(x)
return mu, logvar
def reparameterize(self, mu, logvar):
if self.training:
std = logvar.mul(0.5).exp_()
eps = Variable(std.data.new(std.size()).normal_())
return eps.mul(std).add_(mu)
else:
return mu
def decode(self, z):
x = self.relu(self.fc_bn4(self.fc4(z)))
x = self.relu(self.fc_bn5(self.fc5(x))).view(-1, 64, 4, 4)
x = self.convTrans6(x)
x = self.convTrans7(x)
x = self.convTrans8(x)
x = F.interpolate(x, size=(224, 224), mode='bilinear')
return x
def forward(self, x):
#self.x = self.relu(self.x)
mu, logvar = self.encode(x)
z = self.reparameterize(mu, logvar)
x_reconst = self.decode(z)
return x_reconst, z, mu, logvar```
```import os
import glob
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
import matplotlib.pyplot as plt
#from modules import *
from sklearn.model_selection import train_test_split
import pickle
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torch.autograd import Variable
import torchvision.transforms as transforms
# EncoderCNN architecture
CNN_fc_hidden1, CNN_fc_hidden2 = 1024, 1024
CNN_embed_dim = 256 # latent dim extracted by 2D CNN
res_size = 224 # ResNet image size
dropout_p = 0.2 # dropout probability
# training parameters
epochs = 1 # training epochs
batch_size = 64
learning_rate = 1e-3
log_interval = 10 # interval for displaying training info
# save model
save_model_path = './results_MNIST'
def check_mkdir(dir_name):
if not os.path.exists(dir_name):
os.mkdir(dir_name)
def loss_function(recon_x, x, mu, logvar):
# MSE = F.mse_loss(recon_x, x, reduction='sum')
MSE = F.binary_cross_entropy(recon_x, x, reduction='sum')
KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return MSE + KLD
def train(log_interval, model, device, train_loader, optimizer, epoch):
# set model as training mode
model.train()
losses = []
all_y, all_z, all_mu, all_logvar = [], [], [], []
N_count = 0 # counting total trained sample in one epoch
for batch_idx, (X, y) in enumerate(train_loader):
# distribute data to device
X, y = X.to(device), y.to(device).view(-1, )
N_count += X.size(0)
optimizer.zero_grad()
X_reconst, z, mu, logvar = model(X) # VAE
loss = loss_function(X_reconst, X, mu, logvar)
losses.append(loss.item())
loss.backward()
optimizer.step()
all_y.extend(y.data.cpu().numpy())
all_z.extend(z.data.cpu().numpy())
all_mu.extend(mu.data.cpu().numpy())
all_logvar.extend(logvar.data.cpu().numpy())
# show information
if (batch_idx + 1) % log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item()))
all_y = np.stack(all_y, axis=0)
all_z = np.stack(all_z, axis=0)
all_mu = np.stack(all_mu, axis=0)
all_logvar = np.stack(all_logvar, axis=0)
# save Pytorch models of best record
torch.save(model.state_dict(), os.path.join(save_model_path, 'model_epoch{}.pth'.format(epoch + 1))) # save motion_encoder
torch.save(optimizer.state_dict(), os.path.join(save_model_path, 'optimizer_epoch{}.pth'.format(epoch + 1))) # save optimizer
print("Epoch {} model saved!".format(epoch + 1))
return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, losses
def validation(model, device, optimizer, test_loader):
# set model as testing mode
model.eval()
test_loss = 0
all_y, all_z, all_mu, all_logvar = [], [], [], []
with torch.no_grad():
for X, y in test_loader:
# distribute data to device
X, y = X.to(device), y.to(device).view(-1, )
X_reconst, z, mu, logvar = model(X)
loss = loss_function(X_reconst, X, mu, logvar)
test_loss += loss.item() # sum up batch loss
all_y.extend(y.data.cpu().numpy())
all_z.extend(z.data.cpu().numpy())
all_mu.extend(mu.data.cpu().numpy())
all_logvar.extend(logvar.data.cpu().numpy())
test_loss /= len(test_loader.dataset)
all_y = np.stack(all_y, axis=0)
all_z = np.stack(all_z, axis=0)
all_mu = np.stack(all_mu, axis=0)
all_logvar = np.stack(all_logvar, axis=0)
# show information
print('\nTest set ({:d} samples): Average loss: {:.4f}\n'.format(len(test_loader.dataset), test_loss))
return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, test_loss
# Detect devices
use_cuda = torch.cuda.is_available() # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu") # use CPU or GPU
# Data loading parameters
params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 4, 'pin_memory': True} if use_cuda else {}
transform = transforms.Compose([transforms.Resize([res_size, res_size]),
transforms.ToTensor(),
transforms.Lambda(lambda x: x.repeat(3, 1, 1)), # gray -> GRB 3 channel (lambda function)
transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])]) # for grayscale images
# MNIST dataset (images and labels)
MNIST_train_dataset = torchvision.datasets.MNIST(root='/content', train=True, transform=transform, download=True)
MNIST_test_dataset = torchvision.datasets.MNIST(root='/content', train=False, transform=transform)
# Data loader (input pipeline)
train_loader = torch.utils.data.DataLoader(dataset=MNIST_train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=MNIST_test_dataset, batch_size=batch_size, shuffle=False)
# Create model
resnet_vae = ResNet_VAE(fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2, drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)
#resnet_vae.resnet[0].register_forward_hook(get_activation('lastlayer'))
print("Using", torch.cuda.device_count(), "GPU!")
print(resnet_vae.relu)
model_params = list(resnet_vae.parameters())
optimizer = torch.optim.Adam(model_params, lr=learning_rate)
# record training process
epoch_train_losses = []
epoch_test_losses = []
check_mkdir(save_model_path)
# start training
for epoch in range(epochs):
# train, test model
X_train, y_train, z_train, mu_train, logvar_train, train_losses = train(log_interval, resnet_vae, device, train_loader, optimizer, epoch)
X_test, y_test, z_test, mu_test, logvar_test, epoch_test_loss = validation(resnet_vae, device, optimizer, valid_loader)
# save results
epoch_train_losses.append(train_losses)
epoch_test_losses.append(epoch_test_loss)
# save all train test results
A = np.array(epoch_train_losses)
C = np.array(epoch_test_losses)
np.save(os.path.join(save_model_path, 'ResNet_VAE_training_loss.npy'), A)
np.save(os.path.join(save_model_path, 'X_MNIST_train_epoch{}.npy'.format(epoch + 1)), X_train) #save last batch
np.save(os.path.join(save_model_path, 'y_MNIST_train_epoch{}.npy'.format(epoch + 1)), y_train)
np.save(os.path.join(save_model_path, 'z_MNIST_train_epoch{}.npy'.format(epoch + 1)), z_train)
# Visualize feature maps
activation = {}
def get_activation(name):
def hook(resnet_vae, input, output):
activation[name] = output.detach()
return hook
#model.conv2.register_forward_hook(get_activation('conv2'))
#resnet_vae.x.register_forward_hook(get_activation('x'))
resnet_vae.relu.register_forward_hook(get_activation('relu'))
data, _ = MNIST_train_dataset[0]
data.unsqueeze_(0)
data = data.cuda
output = resnet_vae(data)
act = activation['relu'].squeeze()
fig, axarr = plt.subplots(act.size(0))
for idx in range(act.size(0)):
print(act.size(0))
axarr[idx].imshow(act[idx]) ```
error:
TypeError: conv2d() received an invalid combination of arguments - got (builtin_function_or_method, Parameter, NoneType, tuple, tuple, tuple, int), but expected one of:
* (Tensor input, Tensor weight, Tensor bias, tuple of ints stride, tuple of ints padding, tuple of ints dilation, int groups)
didn't match because some of the arguments have invalid types: (!builtin_function_or_method!, !Parameter!, !NoneType!, !tuple!, !tuple!, !tuple!, int)
* (Tensor input, Tensor weight, Tensor bias, tuple of ints stride, str padding, tuple of ints dilation, int groups)
didn't match because some of the arguments have invalid types: (!builtin_function_or_method!, !Parameter!, !NoneType!, !tuple!, !tuple!, !tuple!, int)