Visualize feature map

Which activation would you like to visualize?
You could most likely use this posted code snippet and call register_forward_hook on the desired layer, e.g.:

model = WiFiResNet4(...)
model.layer[2].register_forward_hook(get_activation('conv2'))

The next forward pass would fill the dict and you could visualize the activation via e.g. matplotlib.
Note that you might need to show each channel separately, if the activation cannot be displayed as a grayscale or color image due to its shape.
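For reference, here is a minimal end-to-end sketch of this workflow; torchvision's resnet18 and the hooked conv layer are only stand-ins for your own model and the layer you are interested in:

import torch
import torchvision
import matplotlib.pyplot as plt

activation = {}
def get_activation(name):
    def hook(module, input, output):
        activation[name] = output.detach()
    return hook

# stand-in model and layer; replace them with your own model (e.g. WiFiResNet4) and the desired layer
model = torchvision.models.resnet18()
model.layer1[0].conv2.register_forward_hook(get_activation('conv2'))

x = torch.randn(1, 3, 224, 224)
out = model(x)  # the forward pass fills the activation dict via the hook

act = activation['conv2'].squeeze()            # e.g. [channels, height, width]
plt.imshow(act[0].cpu().numpy(), cmap='gray')  # show the first channel as a grayscale image
plt.show()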

Hi there! I’m trying to see the activation after all conv layers, right before feeding the linear layer, and also after the linear (fully connected) layer. I’ve tried what you suggested, inserting the code right after performing the train step, and I get this error:
File "/home/laura/Documentos/DatasetAppLGG3v6/guada_withvalidation_CNNv7_weightedsample.py", line 376, in <module>
    act = activation['conv4'].squeeze()

KeyError: 'conv4'
Any help please?

activation = {}
def get_activation(name):
    def hook(model, input, output):
        activation[name] = output.detach()
    return hook

model.layer[6].register_forward_hook(get_activation('conv4'))
act = activation['conv4'].squeeze()
fig, axarr = plt.subplots(act.size(0))
for idx in range(act.size(0)):
    axarr[idx].imshow(act[idx])

You would have to register the forward hook after creating the model and then perform a forward pass, so that the activation can be stored by the hook.
In your current code snippet you are registering the hook properly, but you don’t perform any forward pass.
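In other words, the rough order would be (a sketch assuming model, data, activation, and get_activation are defined as in your script):

model.layer[6].register_forward_hook(get_activation('conv4'))
output = model(data)                 # the hook fires during this forward pass and stores the activation
act = activation['conv4'].squeeze()  # the 'conv4' key only exists after the forward pass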

Thanks, that was it! However (because I’m using 130 as the batch size) I’ve done this to visualize it, but the result is the one below, in which I cannot see the pictures. Could you help me with that? Thanks a lot!
(Figure_1: screenshot of the resulting figure)

fig, axarr = plt.subplots(act.size(1))
for idx in range(act.size(1)):
    axarr[idx].imshow(act[0][idx].cpu())

Based on the code it seems you are trying to visualize each channel of the first sample in the batch, so the batch dimension shouldn’t cause this output; the high number of filters would.
You could create a grid of subplots by specifying the number of rows and columns, or alternatively use torchvision.utils.make_grid, which would do the same.
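For example, a grid of subplots for the 64 channels of the first sample could be created like this (a sketch using random values as a stand-in for the stored activation):

import torch
import matplotlib.pyplot as plt

act = torch.randn(130, 64, 24, 24)  # stand-in for activation['conv4']
fig, axarr = plt.subplots(8, 8, figsize=(12, 12))  # 64 channels arranged in an 8x8 grid
for idx in range(act.size(1)):
    row, col = divmod(idx, 8)
    axarr[row, col].imshow(act[0, idx].cpu().numpy(), cmap='gray')  # channels of the first sample
    axarr[row, col].axis('off')
plt.show()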

I’ve tried doing:

fig, axarr = plt.subplots(act.size(0),act.size(1))
for idx in range(act.size(0)):
    for idx2 in range(act.size(1)):
        axarr[idx][idx2].imshow(act[idx][idx2].cpu())

and also

fig, axarr = plt.subplots(act.size(0),act.size(1))
plt.show(torchvision.utils.make_grid(act,act.size(1)))

However, with the first option my terminal crashes, and the second one does not display anything and raises: RuntimeError: bool value of Tensor with more than one value is ambiguous :pensive:

Assuming your activation tensor has a shape of [batch_size=130, channels=64, height=24, width=24], you could create a grid of the grayscale feature maps for each sample using this code snippet:

x = torch.randn(130, 64, 24, 24)
# visualize first sample of batch
tmp = x[0] # index first sample
tmp = tmp.unsqueeze(1) # unsqueeze channel dimension to visualize the activations as grayscale images
grid = torchvision.utils.make_grid(tmp, int(tmp.size(0)**0.5))
# permute to channels-last, as matplotlib expects this image format
grid = grid.permute(1, 2, 0)
plt.imshow(grid.numpy())

That works perfectly, thanks!! One last question: what if I want to see all 130 samples in the batch? Is that possible?

Thanks a lot!!!

Yes, that’s also possible.
You could use the batch size as the rows and the number of channels as the columns, as seen here:

x = torch.randn(130, 64, 24, 24)
x = x.view(-1, 1, 24, 24) # flatten the channels into batch dimension and add channel dim
grid = torchvision.utils.make_grid(x, nrow=64)  # 64 channels per row, i.e. one row per sample
# permute to channels-last, as matplotlib expects this image format
grid = grid.permute(1, 2, 0)
plt.imshow(grid.numpy())

Depending on the output size of the image, you might need to split it into a few images.
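If the single grid gets too large, a sketch of splitting the flattened batch into several figures (the choice of 10 samples per figure is arbitrary):

import torch
import torchvision
import matplotlib.pyplot as plt

x = torch.randn(130, 64, 24, 24)
x = x.view(-1, 1, 24, 24)  # flatten the channels into the batch dimension
for chunk in x.split(10 * 64, dim=0):  # 10 samples, i.e. 640 feature maps, per figure
    grid = torchvision.utils.make_grid(chunk, nrow=64)  # one row per sample
    plt.figure(figsize=(16, 3))
    plt.imshow(grid.permute(1, 2, 0).numpy())
    plt.axis('off')
    plt.show()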


Thanks for the help!! I’m still getting a diffuse image, but I think it’s because there are a lot of feature maps to show.

class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        # 1 input image channel, 6 output features, 3x3 square convolution kernel
        self.conv1 = nn.Conv2d(1, 6, 3)
        # 6 input image channels, 16 output features, 3x3 square convolution kernel
        self.conv2 = nn.Conv2d(6, 16, 3)

        self.batch = nn.BatchNorm2d(6, affine=True)

    # normalize filter values to 0-1 so we can visualize them
    def NormalizeImg(self, img):
        nimg = (img - img.min()) / (img.max() - img.min())
        return nimg

    def show_MNIST(self, img):
        grid    = torchvision.utils.make_grid(img)
        trimg   = grid.numpy().transpose(1, 2, 0)
        plt.imshow(trimg)
        plt.title('Batch from dataloader')
        plt.axis('off')
        plt.show()

    def forward(self, data):
        self.show_MNIST(data)

        data = self.conv1(data)
        for i in range(data.size(1)):
            nimg = self.NormalizeImg(data[0, i])
            self.show_MNIST(nimg)

        data = self.batch(data)
        for i in range(data.size(1)):
            nimg = self.NormalizeImg(data[0, i])
            self.show_MNIST(nimg)

        # Max pooling over a (2, 2) window
        data = tnf.relu(data)
        data = tnf.max_pool2d(data, (2, 2))
        for i in range(data.size(1)):
            nimg = self.NormalizeImg(data[0, i])
            self.show_MNIST(nimg)

        # If the size is a square you can only specify a single number
        data = self.conv2(data)
        for i in range(data.size(1)):
            nimg = self.NormalizeImg(data[0, i])
            self.show_MNIST(nimg)

        data = tnf.relu(data)
        data = tnf.max_pool2d(data, 2)
        for i in range(data.size(1)):
            nimg = self.NormalizeImg(data[0, i])
            self.show_MNIST(nimg)

        return data


# refer to "Understanding the first PyTorch example" (理解PyTorch的第一个例子) - Zhihu
def example3():
    train_set = datasets.MNIST('data', train=True, download=True, transform=transforms.ToTensor())
    trainnum = len(train_set)
    indices = list(range(trainnum))
    train_idx = indices[0:]
    train_sampler = SubsetRandomSampler(train_idx)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=args.batch_size, sampler=train_sampler, **kwargs)

    data, label     = next(iter(train_loader))
    data            = data.to(device)
    conv_net        = ConvNet()
    data            = conv_net(data)
    print(data)

@ptrblck: could you please help me out with the issue below?
my model:
import os
import glob
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
import matplotlib.pyplot as plt
#from modules import *
from sklearn.model_selection import train_test_split
import pickle

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# EncoderCNN architecture

CNN_fc_hidden1, CNN_fc_hidden2 = 1024, 1024
CNN_embed_dim = 256 # latent dim extracted by 2D CNN
res_size = 224 # ResNet image size
dropout_p = 0.2 # dropout probability

# training parameters

epochs = 1 # training epochs
batch_size = 64
learning_rate = 1e-3
log_interval = 10 # interval for displaying training info

# save model

save_model_path = './results_MNIST'

def check_mkdir(dir_name):
    if not os.path.exists(dir_name):
        os.mkdir(dir_name)

def loss_function(recon_x, x, mu, logvar):
    # MSE = F.mse_loss(recon_x, x, reduction='sum')
    MSE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return MSE + KLD

def train(log_interval, model, device, train_loader, optimizer, epoch):
    # set model as training mode
    model.train()

    losses = []
    all_y, all_z, all_mu, all_logvar = [], [], [], []
    N_count = 0   # counting total trained sample in one epoch
    for batch_idx, (X, y) in enumerate(train_loader):
        # distribute data to device
        X, y = X.to(device), y.to(device).view(-1, )
        N_count += X.size(0)

        optimizer.zero_grad()
        X_reconst, z, mu, logvar  = model(X)  # VAE
        loss = loss_function(X_reconst, X, mu, logvar)
        losses.append(loss.item())

        loss.backward()
        optimizer.step()

        all_y.extend(y.data.cpu().numpy())
        all_z.extend(z.data.cpu().numpy())
        all_mu.extend(mu.data.cpu().numpy())
        all_logvar.extend(logvar.data.cpu().numpy())

        # show information
        if (batch_idx + 1) % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item()))

    all_y = np.stack(all_y, axis=0)
    all_z = np.stack(all_z, axis=0)
    all_mu = np.stack(all_mu, axis=0)
    all_logvar = np.stack(all_logvar, axis=0)

    # save Pytorch models of best record
    torch.save(model.state_dict(), os.path.join(save_model_path, 'model_epoch{}.pth'.format(epoch + 1)))  # save motion_encoder
    torch.save(optimizer.state_dict(), os.path.join(save_model_path, 'optimizer_epoch{}.pth'.format(epoch + 1)))      # save optimizer
    print("Epoch {} model saved!".format(epoch + 1))

    return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, losses

def validation(model, device, optimizer, test_loader):
    # set model as testing mode
    model.eval()

    test_loss = 0
    all_y, all_z, all_mu, all_logvar = [], [], [], []
    with torch.no_grad():
        for X, y in test_loader:
            # distribute data to device
            X, y = X.to(device), y.to(device).view(-1, )
            X_reconst, z, mu, logvar = model(X)

            loss = loss_function(X_reconst, X, mu, logvar)
            test_loss += loss.item()  # sum up batch loss

            all_y.extend(y.data.cpu().numpy())
            all_z.extend(z.data.cpu().numpy())
            all_mu.extend(mu.data.cpu().numpy())
            all_logvar.extend(logvar.data.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    all_y = np.stack(all_y, axis=0)
    all_z = np.stack(all_z, axis=0)
    all_mu = np.stack(all_mu, axis=0)
    all_logvar = np.stack(all_logvar, axis=0)

    # show information
    print('\nTest set ({:d} samples): Average loss: {:.4f}\n'.format(len(test_loader.dataset), test_loss))
    return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, test_loss

# Detect devices

use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu")   # use CPU or GPU

# Data loading parameters

params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 4, 'pin_memory': True} if use_cuda else {}
transform = transforms.Compose([transforms.Resize([res_size, res_size]),
                                transforms.ToTensor(),
                                transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # gray -> RGB 3 channel (lambda function)
                                transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])])  # for grayscale images

# MNIST dataset (images and labels)

MNIST_train_dataset = torchvision.datasets.MNIST(root='/content', train=True, transform=transform, download=True)
MNIST_test_dataset = torchvision.datasets.MNIST(root='/content', train=False, transform=transform)

# Data loader (input pipeline)

train_loader = torch.utils.data.DataLoader(dataset=MNIST_train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=MNIST_test_dataset, batch_size=batch_size, shuffle=False)

# Create model

resnet_vae = ResNet_VAE(fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2, drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)
#resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
print("Using", torch.cuda.device_count(), "GPU!")
model_params = list(resnet_vae.parameters())
optimizer = torch.optim.Adam(model_params, lr=learning_rate)

# record training process

epoch_train_losses = []
epoch_test_losses = []
check_mkdir(save_model_path)

# start training

for epoch in range(epochs):

    # train, test model
    X_train, y_train, z_train, mu_train, logvar_train, train_losses = train(log_interval, resnet_vae, device, train_loader, optimizer, epoch)
    X_test, y_test, z_test, mu_test, logvar_test, epoch_test_loss = validation(resnet_vae, device, optimizer, valid_loader)

    # save results
    epoch_train_losses.append(train_losses)
    epoch_test_losses.append(epoch_test_loss)

    # save all train test results
    A = np.array(epoch_train_losses)
    C = np.array(epoch_test_losses)

    np.save(os.path.join(save_model_path, 'ResNet_VAE_training_loss.npy'), A)
    np.save(os.path.join(save_model_path, 'X_MNIST_train_epoch{}.npy'.format(epoch + 1)), X_train) #save last batch
    np.save(os.path.join(save_model_path, 'y_MNIST_train_epoch{}.npy'.format(epoch + 1)), y_train)
    np.save(os.path.join(save_model_path, 'z_MNIST_train_epoch{}.npy'.format(epoch + 1)), z_train)

# Visualize feature maps

activation = {}
def get_activation(name):
    def hook(resnet_vae, input, output):
        activation[name] = output.detach()
    return hook

#model.conv2.register_forward_hook(get_activation('conv2'))
resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
data, _ = MNIST_train_dataset[0]
data.unsqueeze_(0)
output = resnet_vae(data)

act = activation['fc_hidden2'].squeeze()
fig, axarr = plt.subplots(act.size(0))
for idx in range(act.size(0)):
    axarr[idx].imshow(act[idx])

error: RuntimeError: Input type (torch.FloatTensor) and weight type (torch.cuda.FloatTensor) should be the same or input should be a MKLDNN tensor and weight is a dense tensor

Based on the error message, the input tensors are not on the GPU, while the model parameters are, so I guess you might have forgotten to push all inputs to the GPU.
Your code is currently a bit hard to read; you could format it by wrapping code snippets in three backticks ```.
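As a minimal sketch of the device fix (assuming resnet_vae, device, and MNIST_train_dataset are defined as in your script):

data, _ = MNIST_train_dataset[0]     # a single sample, still on the CPU
data = data.unsqueeze(0).to(device)  # add the batch dimension and move the input to the model's device
output = resnet_vae(data)            # the forward pass now runs on the GPU and fills the activation dict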

@ptrblck below is my code, thanks!

import os
import glob
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
import matplotlib.pyplot as plt
#from modules import *
from sklearn.model_selection import train_test_split
import pickle

# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# EncoderCNN architecture
CNN_fc_hidden1, CNN_fc_hidden2 = 1024, 1024
CNN_embed_dim = 256     # latent dim extracted by 2D CNN
res_size = 224        # ResNet image size
dropout_p = 0.2       # dropout probability

# training parameters
epochs = 1  # training epochs
batch_size = 64
learning_rate = 1e-3
log_interval = 10   # interval for displaying training info



# save model
save_model_path = './results_MNIST'


def check_mkdir(dir_name):
    if not os.path.exists(dir_name):
        os.mkdir(dir_name)

def loss_function(recon_x, x, mu, logvar):
    # MSE = F.mse_loss(recon_x, x, reduction='sum')
    MSE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return MSE + KLD


def train(log_interval, model, device, train_loader, optimizer, epoch):
    # set model as training mode
    model.train()

    losses = []
    all_y, all_z, all_mu, all_logvar = [], [], [], []
    N_count = 0   # counting total trained sample in one epoch
    for batch_idx, (X, y) in enumerate(train_loader):
        # distribute data to device
        X, y = X.to(device), y.to(device).view(-1, )
        N_count += X.size(0)

        optimizer.zero_grad()
        X_reconst, z, mu, logvar  = model(X)  # VAE
        loss = loss_function(X_reconst, X, mu, logvar)
        losses.append(loss.item())  

        loss.backward()
        optimizer.step()

        all_y.extend(y.data.cpu().numpy())
        all_z.extend(z.data.cpu().numpy())
        all_mu.extend(mu.data.cpu().numpy())
        all_logvar.extend(logvar.data.cpu().numpy())

        # show information
        if (batch_idx + 1) % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item()))

    all_y = np.stack(all_y, axis=0)
    all_z = np.stack(all_z, axis=0)
    all_mu = np.stack(all_mu, axis=0)
    all_logvar = np.stack(all_logvar, axis=0)

    # save Pytorch models of best record
    torch.save(model.state_dict(), os.path.join(save_model_path, 'model_epoch{}.pth'.format(epoch + 1)))  # save motion_encoder
    torch.save(optimizer.state_dict(), os.path.join(save_model_path, 'optimizer_epoch{}.pth'.format(epoch + 1)))      # save optimizer
    print("Epoch {} model saved!".format(epoch + 1))


    return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, losses


def validation(model, device, optimizer, test_loader):
    # set model as testing mode
    model.eval()

    test_loss = 0
    all_y, all_z, all_mu, all_logvar = [], [], [], []
    with torch.no_grad():
        for X, y in test_loader:
            # distribute data to device
            X, y = X.to(device), y.to(device).view(-1, )
            X_reconst, z, mu, logvar = model(X)

            loss = loss_function(X_reconst, X, mu, logvar)
            test_loss += loss.item()  # sum up batch loss

            all_y.extend(y.data.cpu().numpy())
            all_z.extend(z.data.cpu().numpy())
            all_mu.extend(mu.data.cpu().numpy())
            all_logvar.extend(logvar.data.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    all_y = np.stack(all_y, axis=0)
    all_z = np.stack(all_z, axis=0)
    all_mu = np.stack(all_mu, axis=0)
    all_logvar = np.stack(all_logvar, axis=0)

    # show information
    print('\nTest set ({:d} samples): Average loss: {:.4f}\n'.format(len(test_loader.dataset), test_loss))
    return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, test_loss


# Detect devices
use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu")   # use CPU or GPU

# Data loading parameters
params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 4, 'pin_memory': True} if use_cuda else {}
transform = transforms.Compose([transforms.Resize([res_size, res_size]),
                                transforms.ToTensor(),
                                transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # gray -> RGB 3 channel (lambda function)
                                transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])])  # for grayscale images

# MNIST dataset (images and labels)
MNIST_train_dataset = torchvision.datasets.MNIST(root='/content', train=True, transform=transform, download=True)
MNIST_test_dataset = torchvision.datasets.MNIST(root='/content', train=False, transform=transform)

# Data loader (input pipeline)
train_loader = torch.utils.data.DataLoader(dataset=MNIST_train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=MNIST_test_dataset, batch_size=batch_size, shuffle=False)

# Create model
resnet_vae = ResNet_VAE(fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2, drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)
#resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
print("Using", torch.cuda.device_count(), "GPU!")
model_params = list(resnet_vae.parameters())
optimizer = torch.optim.Adam(model_params, lr=learning_rate)


# record training process
epoch_train_losses = []
epoch_test_losses = []
check_mkdir(save_model_path)

# start training
for epoch in range(epochs):

    # train, test model
    X_train, y_train, z_train, mu_train, logvar_train, train_losses = train(log_interval, resnet_vae, device, train_loader, optimizer, epoch)
    X_test, y_test, z_test, mu_test, logvar_test, epoch_test_loss = validation(resnet_vae, device, optimizer, valid_loader)

    # save results
    epoch_train_losses.append(train_losses)
    epoch_test_losses.append(epoch_test_loss)

    
    # save all train test results
    A = np.array(epoch_train_losses)
    C = np.array(epoch_test_losses)
    
    np.save(os.path.join(save_model_path, 'ResNet_VAE_training_loss.npy'), A)
    np.save(os.path.join(save_model_path, 'X_MNIST_train_epoch{}.npy'.format(epoch + 1)), X_train) #save last batch
    np.save(os.path.join(save_model_path, 'y_MNIST_train_epoch{}.npy'.format(epoch + 1)), y_train)
    np.save(os.path.join(save_model_path, 'z_MNIST_train_epoch{}.npy'.format(epoch + 1)), z_train)


# Visualize feature maps
activation = {}
def get_activation(name):
    def hook(resnet_vae, input, output):
        activation[name] = output.detach()
    return hook
    
#model.conv2.register_forward_hook(get_activation('conv2'))
resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
data, _ = MNIST_train_dataset[0]
data.unsqueeze_(0)
data = data.to('cuda')
output = resnet_vae(data)

act = activation['fc_hidden2'].squeeze()
fig, axarr = plt.subplots(act.size(0))
for idx in range(act.size(0)):
    axarr[idx].imshow(act[idx])

Thanks for the code! The model is unfortunately undefined, so the script is not executable.
Replacing the model with models.resnet18 works in the forward pass until an error is raised, as your script expects 4 return values, so could you update the code, please?

@ptrblck, I’m trying to print the output of the last layer of the encoder in the VAE.

import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torch.autograd import Variable
import torchvision.transforms as transforms


class Dataset(data.Dataset):
    "Characterizes a dataset for PyTorch"
    def __init__(self, filenames, labels, transform=None):
        "Initialization"
        self.filenames = filenames
        self.labels = labels
        self.transform = transform

    def __len__(self):
        "Denotes the total number of samples"
        return len(self.filenames)


    def __getitem__(self, index):
        "Generates one sample of data"
        # Select sample
        filename = self.filenames[index]
        X = Image.open(filename)

        if self.transform:
            X = self.transform(X)     # transform

        y = torch.LongTensor([self.labels[index]])
        return X, y

## ---------------------- end of Dataloaders ---------------------- ##

def conv2D_output_size(img_size, padding, kernel_size, stride):
    # compute output shape of conv2D
    outshape = (np.floor((img_size[0] + 2 * padding[0] - (kernel_size[0] - 1) - 1) / stride[0] + 1).astype(int),
                np.floor((img_size[1] + 2 * padding[1] - (kernel_size[1] - 1) - 1) / stride[1] + 1).astype(int))
    return outshape

def convtrans2D_output_size(img_size, padding, kernel_size, stride):
    # compute output shape of conv2D
    outshape = ((img_size[0] - 1) * stride[0] - 2 * padding[0] + kernel_size[0],
                (img_size[1] - 1) * stride[1] - 2 * padding[1] + kernel_size[1])
    return outshape

## ---------------------- ResNet VAE ---------------------- ##

class ResNet_VAE(nn.Module):
    def __init__(self, fc_hidden1=1024, fc_hidden2=768, drop_p=0.3, CNN_embed_dim=256):
        super(ResNet_VAE, self).__init__()

        self.fc_hidden1, self.fc_hidden2, self.CNN_embed_dim = fc_hidden1, fc_hidden2, CNN_embed_dim

        # CNN architechtures
        self.ch1, self.ch2, self.ch3, self.ch4 = 16, 32, 64, 128
        self.k1, self.k2, self.k3, self.k4 = (5, 5), (3, 3), (3, 3), (3, 3)      # 2d kernal size
        self.s1, self.s2, self.s3, self.s4 = (2, 2), (2, 2), (2, 2), (2, 2)      # 2d strides
        self.pd1, self.pd2, self.pd3, self.pd4 = (0, 0), (0, 0), (0, 0), (0, 0)  # 2d padding

        # encoding components
        resnet = models.resnet152(pretrained=True)
        modules = list(resnet.children())[:-1]      # delete the last fc layer.
        self.resnet = nn.Sequential(*modules)
        self.fc1 = nn.Linear(resnet.fc.in_features, self.fc_hidden1)
        self.bn1 = nn.BatchNorm1d(self.fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
        self.bn2 = nn.BatchNorm1d(self.fc_hidden2, momentum=0.01)
        # Latent vectors mu and sigma
        self.fc3_mu = nn.Linear(self.fc_hidden2, self.CNN_embed_dim)      # output = CNN embedding latent variables
        self.fc3_logvar = nn.Linear(self.fc_hidden2, self.CNN_embed_dim)  # output = CNN embedding latent variables

        # Sampling vector
        self.fc4 = nn.Linear(self.CNN_embed_dim, self.fc_hidden2)
        self.fc_bn4 = nn.BatchNorm1d(self.fc_hidden2)
        self.fc5 = nn.Linear(self.fc_hidden2, 64 * 4 * 4)
        self.fc_bn5 = nn.BatchNorm1d(64 * 4 * 4)
        self.relu = nn.ReLU(inplace=True)

        # Decoder
        self.convTrans6 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=self.k4, stride=self.s4,
                               padding=self.pd4),
            nn.BatchNorm2d(32, momentum=0.01),
            nn.ReLU(inplace=True),
        )
        self.convTrans7 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=32, out_channels=8, kernel_size=self.k3, stride=self.s3,
                               padding=self.pd3),
            nn.BatchNorm2d(8, momentum=0.01),
            nn.ReLU(inplace=True),
        )

        self.convTrans8 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=8, out_channels=3, kernel_size=self.k2, stride=self.s2,
                               padding=self.pd2),
            nn.BatchNorm2d(3, momentum=0.01),
            nn.Sigmoid()    # y = (y1, y2, y3) \in [0 ,1]^3
        )


    def encode(self, x):
        self.x = self.resnet(x)  # ResNet
        self.x = self.x.view(self.x.size(0), -1)  # flatten output of conv

        # FC layers
        self.x = self.bn1(self.fc1(self.x))
        self.x = self.relu(self.x)
        self.x = self.bn2(self.fc2(self.x))
        self.x = self.relu(self.x)
        # x = F.dropout(x, p=self.drop_p, training=self.training)
        mu, logvar = self.fc3_mu(self.x), self.fc3_logvar(self.x)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        if self.training:
            std = logvar.mul(0.5).exp_()
            eps = Variable(std.data.new(std.size()).normal_())
            return eps.mul(std).add_(mu)
        else:
            return mu

    def decode(self, z):
        x = self.relu(self.fc_bn4(self.fc4(z)))
        x = self.relu(self.fc_bn5(self.fc5(x))).view(-1, 64, 4, 4)
        x = self.convTrans6(x)
        x = self.convTrans7(x)
        x = self.convTrans8(x)
        x = F.interpolate(x, size=(224, 224), mode='bilinear')
        return x

 def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_reconst = self.decode(z)

        return x_reconst, z, mu, logvar
import os
import glob
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
import torch.utils.data as data
import torchvision
from torch.autograd import Variable
import matplotlib.pyplot as plt
#from modules import *
from sklearn.model_selection import train_test_split
import pickle
import os
import numpy as np
from PIL import Image
from torch.utils import data
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torch.autograd import Variable
import torchvision.transforms as transforms




# EncoderCNN architecture
CNN_fc_hidden1, CNN_fc_hidden2 = 1024, 1024
CNN_embed_dim = 256     # latent dim extracted by 2D CNN
res_size = 224        # ResNet image size
dropout_p = 0.2       # dropout probability

# training parameters
epochs = 1  # training epochs
batch_size = 64
learning_rate = 1e-3
log_interval = 10   # interval for displaying training info



# save model
save_model_path = './results_MNIST'


def check_mkdir(dir_name):
    if not os.path.exists(dir_name):
        os.mkdir(dir_name)

def loss_function(recon_x, x, mu, logvar):
    # MSE = F.mse_loss(recon_x, x, reduction='sum')
    MSE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return MSE + KLD


def train(log_interval, model, device, train_loader, optimizer, epoch):
    # set model as training mode
    model.train()

    losses = []
    all_y, all_z, all_mu, all_logvar = [], [], [], []
    N_count = 0   # counting total trained sample in one epoch
    for batch_idx, (X, y) in enumerate(train_loader):
        # distribute data to device
        X, y = X.to(device), y.to(device).view(-1, )
        N_count += X.size(0)

        optimizer.zero_grad()
        X_reconst, z, mu, logvar  = model(X)  # VAE
        loss = loss_function(X_reconst, X, mu, logvar)
        losses.append(loss.item())  

        loss.backward()
        optimizer.step()

        all_y.extend(y.data.cpu().numpy())
        all_z.extend(z.data.cpu().numpy())
        all_mu.extend(mu.data.cpu().numpy())
        all_logvar.extend(logvar.data.cpu().numpy())
        
        # show information
        if (batch_idx + 1) % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch + 1, N_count, len(train_loader.dataset), 100. * (batch_idx + 1) / len(train_loader), loss.item()))

    all_y = np.stack(all_y, axis=0)
    all_z = np.stack(all_z, axis=0)
    all_mu = np.stack(all_mu, axis=0)
    all_logvar = np.stack(all_logvar, axis=0)

    # save Pytorch models of best record
    torch.save(model.state_dict(), os.path.join(save_model_path, 'model_epoch{}.pth'.format(epoch + 1)))  # save motion_encoder
    torch.save(optimizer.state_dict(), os.path.join(save_model_path, 'optimizer_epoch{}.pth'.format(epoch + 1)))      # save optimizer
    print("Epoch {} model saved!".format(epoch + 1))


    return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, losses


def validation(model, device, optimizer, test_loader):
    # set model as testing mode
    model.eval()

    test_loss = 0
    all_y, all_z, all_mu, all_logvar = [], [], [], []
    with torch.no_grad():
        for X, y in test_loader:
            # distribute data to device
            X, y = X.to(device), y.to(device).view(-1, )
            X_reconst, z, mu, logvar = model(X)

            loss = loss_function(X_reconst, X, mu, logvar)
            test_loss += loss.item()  # sum up batch loss

            all_y.extend(y.data.cpu().numpy())
            all_z.extend(z.data.cpu().numpy())
            all_mu.extend(mu.data.cpu().numpy())
            all_logvar.extend(logvar.data.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    all_y = np.stack(all_y, axis=0)
    all_z = np.stack(all_z, axis=0)
    all_mu = np.stack(all_mu, axis=0)
    all_logvar = np.stack(all_logvar, axis=0)

    # show information
    print('\nTest set ({:d} samples): Average loss: {:.4f}\n'.format(len(test_loader.dataset), test_loss))
    return X.data.cpu().numpy(), all_y, all_z, all_mu, all_logvar, test_loss


# Detect devices
use_cuda = torch.cuda.is_available()                   # check if GPU exists
device = torch.device("cuda" if use_cuda else "cpu")   # use CPU or GPU

# Data loading parameters
params = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 4, 'pin_memory': True} if use_cuda else {}
transform = transforms.Compose([transforms.Resize([res_size, res_size]),
                                transforms.ToTensor(),
                                transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # gray -> RGB 3 channel (lambda function)
                                transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0])])  # for grayscale images

# MNIST dataset (images and labels)
MNIST_train_dataset = torchvision.datasets.MNIST(root='/content', train=True, transform=transform, download=True)
MNIST_test_dataset = torchvision.datasets.MNIST(root='/content', train=False, transform=transform)

# Data loader (input pipeline)
train_loader = torch.utils.data.DataLoader(dataset=MNIST_train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(dataset=MNIST_test_dataset, batch_size=batch_size, shuffle=False)

# Create model
resnet_vae = ResNet_VAE(fc_hidden1=CNN_fc_hidden1, fc_hidden2=CNN_fc_hidden2, drop_p=dropout_p, CNN_embed_dim=CNN_embed_dim).to(device)
#resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
print("Using", torch.cuda.device_count(), "GPU!")
print(resnet_vae.self.x)
model_params = list(resnet_vae.parameters())
optimizer = torch.optim.Adam(model_params, lr=learning_rate)


# record training process
epoch_train_losses = []
epoch_test_losses = []
check_mkdir(save_model_path)

# start training
for epoch in range(epochs):

    # train, test model
    X_train, y_train, z_train, mu_train, logvar_train, train_losses = train(log_interval, resnet_vae, device, train_loader, optimizer, epoch)
    X_test, y_test, z_test, mu_test, logvar_test, epoch_test_loss = validation(resnet_vae, device, optimizer, valid_loader)

    # save results
    epoch_train_losses.append(train_losses)
    epoch_test_losses.append(epoch_test_loss)

    
    # save all train test results
    A = np.array(epoch_train_losses)
    C = np.array(epoch_test_losses)
    
    np.save(os.path.join(save_model_path, 'ResNet_VAE_training_loss.npy'), A)
    np.save(os.path.join(save_model_path, 'X_MNIST_train_epoch{}.npy'.format(epoch + 1)), X_train) #save last batch
    np.save(os.path.join(save_model_path, 'y_MNIST_train_epoch{}.npy'.format(epoch + 1)), y_train)
    np.save(os.path.join(save_model_path, 'z_MNIST_train_epoch{}.npy'.format(epoch + 1)), z_train)


# Visualize feature maps
activation = {}
def get_activation(name):
    def hook(resnet_vae, input, output):
        activation[name] = output.detach()
    return hook
    
#model.conv2.register_forward_hook(get_activation('conv2'))
resnet_vae.x.register_forward_hook(get_activation('x'))
data, _ = MNIST_train_dataset[0]
data.unsqueeze_(0)
output = resnet_vae(data)

act = activation['x'].squeeze()
fig, axarr = plt.subplots(act.size(0))
for idx in range(act.size(0)):
    print(act.size(0)) 
    axarr[idx].imshow(act[idx])
AttributeError: 'ResNet_VAE' object has no attribute 'self'

@ptrblck could you please guide me with the above code?

Thanks for the update. The code was unfortunately still not executable, so you would need to fix a few issues first:

  • fix the indentation of the forward method
  • remove print(resnet_vae.self.x), as resnet_vae.self is undefined
  • remove resnet_vae.x.register_forward_hook(get_activation('x')), as resnet_vae.x is undefined

After these fixes, the device error is raised in:

data, _ = MNIST_train_dataset[0]
data.unsqueeze_(0)
output = resnet_vae(data)

since data is on the CPU while the model parameters are on the GPU, you need to push this tensor to the GPU first.
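A minimal sketch of the corrected last part, assuming you keep the hook on a defined submodule such as fc2 (as in your earlier post) and that activation, get_activation, device, and MNIST_train_dataset come from your script:

resnet_vae.fc2.register_forward_hook(get_activation('fc_hidden2'))
resnet_vae.eval()                    # let the norm layers use running stats for a single-sample input
data, _ = MNIST_train_dataset[0]
data = data.unsqueeze(0).to(device)  # move the input to the same device as the model parameters
output = resnet_vae(data)

act = activation['fc_hidden2'].squeeze()  # 1D activation of fc2 for this sample
plt.plot(act.cpu().numpy())               # a 1D fc activation is easier to inspect as a line plot than as images
plt.show()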

Thanks for the reply.
If I remove resnet_vae.x.register_forward_hook(get_activation('x')), as resnet_vae.x is undefined,

how can I visualize the output of the last layer of the encoder, which is x in this code?

Please guide me on how to visualize the last encoder layer's output in the VAE.