@ptrblck I'm trying to print the output of the last layer of the encoder in my VAE. Here is the model code:

```python
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
class Dataset(data.Dataset):
"Characterizes a dataset for PyTorch"
def __init__(self, filenames, labels, transform=None):
"Initialization"
self.filenames = filenames
self.labels = labels
self.transform = transform
def __len__(self):
"Denotes the total number of samples"
return len(self.filenames)
def __getitem__(self, index):
"Generates one sample of data"
# Select sample
filename = self.filenames[index]
X = Image.open(filename)
if self.transform:
X = self.transform(X) # transform
y = torch.LongTensor([self.labels[index]])
return X, y
## ---------------------- end of Dataloaders ---------------------- ##
def conv2D_output_size(img_size, padding, kernel_size, stride):
# compute output shape of conv2D
outshape = (np.floor((img_size[0] + 2 * padding[0] - (kernel_size[0] - 1) - 1) / stride[0] + 1).astype(int),
np.floor((img_size[1] + 2 * padding[1] - (kernel_size[1] - 1) - 1) / stride[1] + 1).astype(int))
return outshape
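# sanity check of the formula above:
# conv2D_output_size((224, 224), (0, 0), (5, 5), (2, 2)) -> (110, 110)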
def convtrans2D_output_size(img_size, padding, kernel_size, stride):
    # compute output shape of ConvTranspose2d
outshape = ((img_size[0] - 1) * stride[0] - 2 * padding[0] + kernel_size[0],
(img_size[1] - 1) * stride[1] - 2 * padding[1] + kernel_size[1])
return outshape
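# sanity check: the decoder below upsamples 4x4 -> 9x9 -> 19x19 -> 39x39, e.g.
# convtrans2D_output_size((4, 4), (0, 0), (3, 3), (2, 2)) -> (9, 9)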
## ---------------------- ResNet VAE ---------------------- ##
class ResNet_VAE(nn.Module):
def __init__(self, fc_hidden1=1024, fc_hidden2=768, drop_p=0.3, CNN_embed_dim=256):
super(ResNet_VAE, self).__init__()
        self.fc_hidden1, self.fc_hidden2, self.CNN_embed_dim = fc_hidden1, fc_hidden2, CNN_embed_dim
        self.drop_p = drop_p  # stored so the commented-out dropout in encode() has a valid reference
        # CNN architecture parameters
        self.ch1, self.ch2, self.ch3, self.ch4 = 16, 32, 64, 128
        self.k1, self.k2, self.k3, self.k4 = (5, 5), (3, 3), (3, 3), (3, 3)  # 2d kernel sizes
        self.s1, self.s2, self.s3, self.s4 = (2, 2), (2, 2), (2, 2), (2, 2)  # 2d strides
        self.pd1, self.pd2, self.pd3, self.pd4 = (0, 0), (0, 0), (0, 0), (0, 0)  # 2d padding
# encoding components
resnet = models.resnet152(pretrained=True)
modules = list(resnet.children())[:-1] # delete the last fc layer.
self.resnet = nn.Sequential(*modules)
self.fc1 = nn.Linear(resnet.fc.in_features, self.fc_hidden1)
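        # resnet.fc.in_features is 2048 for resnet152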
self.bn1 = nn.BatchNorm1d(self.fc_hidden1, momentum=0.01)
self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
self.bn2 = nn.BatchNorm1d(self.fc_hidden2, momentum=0.01)
# Latent vectors mu and sigma
self.fc3_mu = nn.Linear(self.fc_hidden2, self.CNN_embed_dim) # output = CNN embedding latent variables
self.fc3_logvar = nn.Linear(self.fc_hidden2, self.CNN_embed_dim) # output = CNN embedding latent variables
# Sampling vector
self.fc4 = nn.Linear(self.CNN_embed_dim, self.fc_hidden2)
self.fc_bn4 = nn.BatchNorm1d(self.fc_hidden2)
self.fc5 = nn.Linear(self.fc_hidden2, 64 * 4 * 4)
self.fc_bn5 = nn.BatchNorm1d(64 * 4 * 4)
self.relu = nn.ReLU(inplace=True)
# Decoder
self.convTrans6 = nn.Sequential(
nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=self.k4, stride=self.s4,
padding=self.pd4),
nn.BatchNorm2d(32, momentum=0.01),
nn.ReLU(inplace=True),
)
self.convTrans7 = nn.Sequential(
nn.ConvTranspose2d(in_channels=32, out_channels=8, kernel_size=self.k3, stride=self.s3,
padding=self.pd3),
nn.BatchNorm2d(8, momentum=0.01),
nn.ReLU(inplace=True),
)
self.convTrans8 = nn.Sequential(
nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=self.k2, stride=self.s2,
padding=self.pd2),
nn.BatchNorm2d(3, momentum=0.01),
            nn.Sigmoid()  # output in [0, 1]^3 per pixel
)
def encode(self, x):
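        # NOTE: intermediates are stored on self, so after a forward pass they are
        # reachable as resnet_vae.x -- but as a tensor, not as an nn.Module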
self.x = self.resnet(x) # ResNet
self.x = self.x.view(self.x.size(0), -1) # flatten output of conv
# FC layers
self.x = self.bn1(self.fc1(self.x))
self.x = self.relu(self.x)
self.x = self.bn2(self.fc2(self.x))
self.x = self.relu(self.x)
# x = F.dropout(x, p=self.drop_p, training=self.training)
mu, logvar = self.fc3_mu(self.x), self.fc3_logvar(self.x)
return mu, logvar
    def reparameterize(self, mu, logvar):
        if self.training:
            # sample z = mu + std * eps with std = exp(0.5 * logvar)
            std = torch.exp(0.5 * logvar)
            eps = torch.randn_like(std)
            return eps * std + mu
        else:
            return mu
def decode(self, z):
x = self.relu(self.fc_bn4(self.fc4(z)))
x = self.relu(self.fc_bn5(self.fc5(x))).view(-1, 64, 4, 4)
x = self.convTrans6(x)
x = self.convTrans7(x)
x = self.convTrans8(x)
        x = F.interpolate(x, size=(224, 224), mode='bilinear', align_corners=False)  # upsample 39x39 -> 224x224
return x
def forward(self, x):
mu, logvar = self.encode(x)
z = self.reparameterize(mu, logvar)
x_reconst = self.decode(z)
return x_reconst, z, mu, logvar
```

And the training code, which additionally uses:

```python
import torchvision
import matplotlib.pyplot as plt
# EncoderCNN architecture
CNN_fc_hidden1, CNN_fc_hidden2 = 1024, 1024
CNN_embed_dim = 256 # latent dim extracted by 2D CNN
res_size = 224 # ResNet image size
dropout_p = 0.2 # dropout probability
# training parameters
epochs = 1 # training epochs
batch_size = 64
learning_rate = 1e-3
log_interval = 10 # interval for displaying training info
# save model
save_model_path = './results_MNIST'
def check_mkdir(dir_name):
if not os.path.exists(dir_name):
os.mkdir(dir_name)
def loss_function(recon_x, x, mu, logvar):
    # reconstruction loss: BCE (an MSE alternative is kept for reference)
    # MSE = F.mse_loss(recon_x, x, reduction='sum')
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    # KL divergence between N(mu, sigma) and N(0, 1)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD
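# note: F.binary_cross_entropy requires recon_x and x to lie in [0, 1];
# the decoder's final Sigmoid guarantees this for recon_x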
def train(log_interval, model, device, train_loader, optimizer, epoch):
# set model as training mode
model.train()
losses = []
all_y, all_z, all_mu, all_logvar = [], [], [], []
    N_count = 0  # number of samples processed so far in this epoch
for batch_idx, (X, y) in enumerate(train_loader):
# distribute data to device
X, y = X.to(device), y.to(device).view(-1, )
N_count += X.size(0)
optimizer.zero_grad()
X_reconst, z, mu, logvar = model(X) # VAE
loss = loss_function(X_reconst, X, mu, logvar)
losses.append(loss.item())
loss.backward()
optimizer.step()
all_y.extend(y.data.cpu().numpy())
all_z.extend(z.data.cpu().numpy())
all_mu.extend(mu.data.cpu().numpy())
all_logvar.extend(logvar.data.cpu().numpy())
# show information
if (batch_idx + 1) % log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item()))
all_y = np.stack(all_y, axis=0)
all_z = np.stack(all_z, axis=0)
all_mu = np.stack(all_mu, axis=0)
all_logvar = np.stack(all_logvar, axis=0)
# save Pytorch models of best record
    torch.save(model.state_dict(), os.path.join(save_model_path, 'model_epoch{}.pth'.format(epoch + 1)))  # save model weights
    torch.save(optimizer.state_dict(), os.path.join(save_model_path, 'optimizer_epoch{}.pth'.format(epoch + 1)))  # save optimizer state
print("Epoch {} model saved!".format(epoch + 1))
return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, losses
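# train() returns the last batch of inputs plus the latents collected over the whole epoch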
def validation(model, device, optimizer, test_loader):
# set model as testing mode
model.eval()
test_loss = 0
all_y, all_z, all_mu, all_logvar = [], [], [], []
with torch.no_grad():
for X, y in test_loader:
# distribute data to device
X, y = X.to(device), y.to(device).view(-1, )
X_reconst, z, mu, logvar = model(X)
loss = loss_function(X_reconst, X, mu, logvar)
test_loss += loss.item() # sum up batch loss
all_y.extend(y.data.cpu().numpy())
all_z.extend(z.data.cpu().numpy())
all_mu.extend(mu.data.cpu().numpy())
all_logvar.extend(logvar.data.cpu().numpy())
test_loss /= len(test_loader.dataset)
all_y = np.stack(all_y, axis=0)
all_z = np.stack(all_z, axis=0)
all_mu = np.stack(all_mu, axis=0)
all_logvar = np.stack(all_logvar, axis=0)
# show information
print('\nTest set ({:d} samples): Average loss: {:.4f}\n'.format(len(test_loader.dataset), test_loss))
return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, test_loss
# Detect devices
use_cuda = torch.cuda.is_available() # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu") # use CPU or GPU
# Data loading parameters
params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 4, 'pin_memory': True} if use_cuda else {}  # note: currently unused by the DataLoaders below
transform = transforms.Compose([transforms.Resize([res_size, res_size]),
transforms.ToTensor(),
                                transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # gray -> RGB (3 channels)
                                transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])])  # for grayscale images
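# note: Normalize(mean=0, std=1) is an identity here, so inputs stay in [0, 1]
# as required by the binary cross-entropy reconstruction loss above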
# MNIST dataset (images and labels)
MNIST_train_dataset = torchvision.datasets.MNIST(root='/content', train=True, transform=transform, download=True)
MNIST_test_dataset = torchvision.datasets.MNIST(root='/content', train=False, transform=transform)
# Data loader (input pipeline)
train_loader = torch.utils.data.DataLoader(dataset=MNIST_train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=MNIST_test_dataset, batch_size=batch_size, shuffle=False)
# Create model
resnet_vae = ResNet_VAE(fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2, drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)
#resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
print("Using", torch.cuda.device_count(), "GPU!")
print(resnet_vae.self.x)
model_params = list(resnet_vae.parameters())
optimizer = torch.optim.Adam(model_params, lr=learning_rate)
# record training process
epoch_train_losses = []
epoch_test_losses = []
check_mkdir(save_model_path)
# start training
for epoch in range(epochs):
# train, test model
X_train, y_train, z_train, mu_train, logvar_train, train_losses = train(log_interval, resnet_vae, device, train_loader, optimizer, epoch)
X_test, y_test, z_test, mu_test, logvar_test, epoch_test_loss = validation(resnet_vae, device, optimizer, valid_loader)
# save results
epoch_train_losses.append(train_losses)
epoch_test_losses.append(epoch_test_loss)
# save all train test results
A = np.array(epoch_train_losses)
C = np.array(epoch_test_losses)
np.save(os.path.join(save_model_path, 'ResNet_VAE_training_loss.npy'), A)
np.save(os.path.join(save_model_path, 'X_MNIST_train_epoch{}.npy'.format(epoch + 1)), X_train) #save last batch
np.save(os.path.join(save_model_path, 'y_MNIST_train_epoch{}.npy'.format(epoch + 1)), y_train)
np.save(os.path.join(save_model_path, 'z_MNIST_train_epoch{}.npy'.format(epoch + 1)), z_train)
# Visualize feature maps
activation = {}
def get_activation(name):
    def hook(module, input, output):
        activation[name] = output.detach()
    return hook
#model.conv2.register_forward_hook(get_activation('conv2'))
# resnet_vae.x is a tensor attribute set in encode(), not an nn.Module, so it
# has no register_forward_hook() method; this registration cannot work as-is
resnet_vae.x.register_forward_hook(get_activation('x'))
data, _ = MNIST_train_dataset[0]
data.unsqueeze_(0)
output = resnet_vae(data)
act = activation['x'].squeeze()
fig, axarr = plt.subplots(act.size(0))
for idx in range(act.size(0)):
print(act.size(0))
    axarr[idx].imshow(act[idx])
```
Running this gives:

```
AttributeError: 'ResNet_VAE' object has no attribute 'self'
```
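If I understand forward hooks right, they have to be registered on an nn.Module, not on a tensor attribute like resnet_vae.x, so something like the following might be what I need. This is just an untested sketch that hooks fc2 (the last shared encoder layer before fc3_mu/fc3_logvar):

```python
activation = {}

def get_activation(name):
    def hook(module, input, output):
        activation[name] = output.detach()  # store a detached copy of the layer output
    return hook

# register on the module itself, not on a stored tensor
resnet_vae.fc2.register_forward_hook(get_activation('fc2'))

data, _ = MNIST_train_dataset[0]
data = data.unsqueeze(0).to(device)
resnet_vae.eval()  # BatchNorm1d raises an error in train mode with a batch of 1
with torch.no_grad():
    resnet_vae(data)
print(activation['fc2'].shape)  # torch.Size([1, 1024]) with CNN_fc_hidden2 = 1024
```

Is that the right approach, or is there a better way to get the encoder output?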