Multi-model DataParallel

Hi.
I’m trying to train Deep Ensembles using PyTorch.

My model has this structure:

class ResNetDeepEnsembles1d(nn.Module):
    def __init__(self, input_dim, blocks_dim, resblock_repeat, num_models, n_classes, kernel_size=17, dropout_rate=0.8):
        super(ResNetDeepEnsembles1d, self).__init__()
        self.num_models = num_models
        # Ensembles
        for i in range(self.num_models):
            model = ResNet1d(input_dim=input_dim,
                        blocks_dim=blocks_dim, 
                        resblock_repeat=resblock_repeat, 
                        n_classes=n_classes, 
                        kernel_size=kernel_size, 
                        dropout_rate=dropout_rate)
            setattr(self, f'model_{str(i)}',  model)

    def forward(self, x):
        mus, sigmas = [], []
        for i in range(self.num_models):
            model = getattr(self, f'model_{str(i)}')
            mu, sigma = model(x)
            mus.append(mu)
            sigmas.append(sigma)

        mus = torch.stack(mus)
        mu = mus.mean(dim=0)
        sigmas = torch.stack(sigmas)
        sigma = (sigmas + mus.pow(2)).mean(dim=0) - mu.pow(2)

        return mu, sigma

And here is my train script.

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

for ep in range(start_epoch, config.epochs):
        train_total_loss = np.zeros(setup['num_models'])
        train_n_entries = np.zeros(setup['num_models'])
        train_desc = "Epoch {:2d}: train - Model {:2d} Loss: {:.6f}"
        train_bar = tqdm(initial=0, leave=True, total=len(train_loader), 
                        desc=train_desc.format(ep, 0, 0), position=0)

        for traces, ages, weights in train_loader:
            traces = traces.transpose(1, 2).float()
            for i in range(setup['num_models']):
                model = getattr(resnet, f'model_{str(i)}')
                model.train()
        
                model = nn.DataParallel(model)
                model.to(device)
                traces, ages, weights = traces.to(device), ages.to(device), weights.to(device)
                
                model.zero_grad()

                pred_ages, pred_sigma = model(traces)
                loss = compute_loss(ages, pred_ages, pred_sigma, weights)

                loss.backward()

                optimizers[i].step()

                bs = len(traces)
                train_total_loss[i] += loss.detach().cpu().numpy()
                train_n_entries[i] += bs

                train_bar.desc = train_desc.format(ep, i+1, train_total_loss[i] / train_n_entries[i])
                train_bar.update(1)
        train_loss = train_total_loss / train_n_entries

I want to make sure that each model is trained on the same data batch.
But this code keeps giving me this error:

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:1 and cuda:0! (when checking argument for argument weight in method wrapper__cudnn_convolution)

I’ve been stuck on this error for weeks, please help :frowning:

Thanks.

I guess something is wrong in these lines:

                model = getattr(resnet, f'model_{str(i)}')
                model.train()
        
                model = nn.DataParallel(models)
                model.to(device)

as models is either undefined or a global variable, while I assume that you want to wrap model in nn.DataParallel.
Are you also calling to(device) somewhere inside the model’s __init__ or forward method? If so, remove it and let nn.DataParallel move the modules to the right device.
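
As a side note, I would wrap each ensemble member in nn.DataParallel once before the epoch loop rather than re-wrapping it on every batch. Here is a minimal sketch of that structure, using toy nn.Linear members as stand-ins for your ResNet1d models (so the shapes and names below are illustrative only):

import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Toy stand-ins for the ensemble members; replace with your ResNet1d instances.
members = [nn.Linear(16, 2) for _ in range(3)]

# Wrap and move each member once, outside the training loop.
wrapped = [nn.DataParallel(m).to(device) for m in members]
optimizers = [optim.Adam(w.parameters(), lr=1e-3) for w in wrapped]

x, y = torch.randn(8, 16), torch.randn(8, 2)  # dummy batch
x, y = x.to(device), y.to(device)
for w, opt in zip(wrapped, optimizers):
    w.train()
    opt.zero_grad()
    out = w(x)  # DataParallel scatters x across the visible GPUs
    loss = nn.functional.mse_loss(out, y)
    loss.backward()
    opt.step()

The optimizers still see the same parameters after wrapping, since nn.DataParallel shares them with the wrapped module.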

Sorry, that was a typo.
I don’t call to(device) inside the model’s init.

Could you post the missing model definitions as well as the input shapes so that I could execute and debug the code?

This is the whole model code.

import os
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch

"""
https://towardsdev.com/implement-resnet-with-pytorch-a9fb40a77448
Builds ResNet variants of various depths -> 18, 34, 50, 101, 152
"""


def _padding(downsample, kernel_size):
    """Compute required padding"""
    padding = max(0, int(np.floor((kernel_size - downsample + 1) / 2)))
    return padding


def _downsample(n_samples_in, n_samples_out):
    """Compute downsample rate"""
    downsample = int(n_samples_in // n_samples_out)
    if downsample < 1:
        raise ValueError("Number of samples should always decrease")
    if n_samples_in % n_samples_out != 0:
        raise ValueError("Number of samples for two consecutive blocks "
                         "should always decrease by an integer factor.")
    return downsample

class ResBlock1d(nn.Module):
    """Residual network unit for unidimensional signals."""

    def __init__(self, n_filters_in, n_filters_out, downsample, kernel_size, dropout_rate):
        if kernel_size % 2 == 0:
            raise ValueError("The current implementation only support odd values for `kernel_size`.")
        super(ResBlock1d, self).__init__()
        # Forward path
        padding = _padding(1, kernel_size)
        self.conv1 = nn.Conv1d(n_filters_in, n_filters_out, kernel_size, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm1d(n_filters_out)
        self.relu = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_rate)
        padding = _padding(downsample, kernel_size)
        self.conv2 = nn.Conv1d(n_filters_out, n_filters_out, kernel_size,
                               stride=downsample, padding=padding, bias=False)
        self.bn2 = nn.BatchNorm1d(n_filters_out)
        self.dropout2 = nn.Dropout(dropout_rate)

        # Skip connection
        skip_connection_layers = []
        # Deal with downsampling
        if downsample > 1:
            maxpool = nn.MaxPool1d(downsample, stride=downsample)
            skip_connection_layers += [maxpool]
        # Deal with n_filters dimension increase
        if n_filters_in != n_filters_out:
            conv1x1 = nn.Conv1d(n_filters_in, n_filters_out, 1, bias=False)
            skip_connection_layers += [conv1x1]
        # Build skip connection layer
        if skip_connection_layers:
            self.skip_connection = nn.Sequential(*skip_connection_layers)
        else:
            self.skip_connection = None

    def forward(self, x, y):
        """Residual unit."""
        if self.skip_connection is not None:
            y = self.skip_connection(y)
        else:
            y = y
        # 1st layer
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.dropout1(x)

        # 2nd layer
        x = self.conv2(x)
        x += y  # Sum skip connection and main connection
        y = x
        x = self.bn2(x)
        x = self.relu(x)
        x = self.dropout2(x)
        return x, y

        
class ResBottleneckBlock1d(nn.Module):
    """Residual network unit for unidimensional signals."""

    def __init__(self, n_filters_in, n_filters_out, downsample, kernel_size, dropout_rate):
        if kernel_size % 2 == 0:
            raise ValueError("The current implementation only support odd values for `kernel_size`.")
        super(ResBottleneckBlock1d, self).__init__()
        # Forward path
        padding = _padding(1, kernel_size)
        self.conv1 = nn.Conv1d(n_filters_in, n_filters_out//4, kernel_size, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm1d(n_filters_out//4)
        self.relu = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout_rate)

        padding = _padding(downsample, kernel_size)
        self.conv2 = nn.Conv1d(n_filters_out//4, n_filters_out//4, kernel_size,
                               stride=downsample, padding=padding, bias=False)
        self.bn2 = nn.BatchNorm1d(n_filters_out//4)
        self.dropout2 = nn.Dropout(dropout_rate)

        padding = _padding(1, kernel_size)
        self.conv3 = nn.Conv1d(n_filters_out//4, n_filters_out, kernel_size, padding=padding, bias=False)
        self.bn3 = nn.BatchNorm1d(n_filters_out)
        self.dropout3 = nn.Dropout(dropout_rate)

        # Skip connection
        skip_connection_layers = []
        # Deal with downsampling
        if downsample > 1:
            maxpool = nn.MaxPool1d(downsample, stride=downsample)
            skip_connection_layers += [maxpool]
        # Deal with n_filters dimension increase
        if n_filters_in != n_filters_out:
            conv1x1 = nn.Conv1d(n_filters_in, n_filters_out, 1, bias=False)
            skip_connection_layers += [conv1x1]
        # Build skip connection layer
        if skip_connection_layers:
            self.skip_connection = nn.Sequential(*skip_connection_layers)
        else:
            self.skip_connection = None

    def forward(self, x, y):
        """Residual unit."""
        if self.skip_connection is not None:
            y = self.skip_connection(y)
        else:
            y = y
        # 1st layer
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.dropout1(x)

        # 2nd layer
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.dropout2(x)

        # 3rd layer
        x = self.conv3(x)
        x += y  # Sum skip connection and main connection
        y = x
        x = self.bn3(x)
        x = self.relu(x)
        x = self.dropout3(x)
        return x, y


class ResNet1d(nn.Module):
    """Residual network for unidimensional signals.
    Parameters
    ----------
    input_dim : tuple
        Input dimensions. Tuple containing dimensions for the neural network
        input tensor. Should be like: ``(n_filters, n_samples)``.
    blocks_dim : list of tuples
        Dimensions of residual blocks. The i-th tuple should contain the dimensions
        of the output of the (i-1)-th residual block and the input to the i-th residual
        block. Each tuple should be like: ``(n_filters, n_samples)``. `n_samples`
        for two consecutive blocks should always decrease by an integer factor.
    dropout_rate: float [0, 1), optional
        Dropout rate used in all Dropout layers. Default is 0.8
    kernel_size: int, optional
        Kernel size for convolutional layers. The current implementation
        only supports odd kernel sizes. Default is 17.
    References
    ----------
    .. [1] K. He, X. Zhang, S. Ren, and J. Sun, "Identity Mappings in Deep Residual Networks,"
           arXiv:1603.05027, Mar. 2016. https://arxiv.org/pdf/1603.05027.pdf.
    .. [2] K. He, X. Zhang, S. Ren, and J. Sun, "Deep Residual Learning for Image Recognition," in 2016 IEEE Conference
           on Computer Vision and Pattern Recognition (CVPR), 2016, pp. 770-778. https://arxiv.org/pdf/1512.03385.pdf
    """

    def __init__(self, input_dim, blocks_dim, resblock_repeat, n_classes, kernel_size=17, dropout_rate=0.8):
        super(ResNet1d, self).__init__()
        # First layers
        n_filters_in, n_filters_out = input_dim[0], blocks_dim[0][0] #8, 64
        n_samples_in, n_samples_out = input_dim[1], blocks_dim[0][1]
        downsample = _downsample(n_samples_in, n_samples_out)
        padding = _padding(downsample, kernel_size)
        self.conv1 = nn.Conv1d(n_filters_in, n_filters_out, kernel_size, bias=False,
                               stride=downsample, padding=padding)
        self.bn1 = nn.BatchNorm1d(n_filters_out)
        self.relu = nn.ReLU()

        # Residual block layers
        self.res_blocks = []
        for i, (n_filters, n_samples) in enumerate(blocks_dim):
            n_filters_in, n_filters_out = n_filters_out, n_filters
            n_samples_in, n_samples_out = n_samples_out, n_samples
            downsample = _downsample(n_samples_in, n_samples_out)
            resblk1d = ResBottleneckBlock1d(n_filters_in, n_filters_out, downsample, kernel_size, dropout_rate)
            self.add_module(f'resblock1d_{i}_1', resblk1d)
            for n in range(1, resblock_repeat[i]):
                self.add_module(f'resblock1d_{i}_{n+1}', ResBottleneckBlock1d(n_filters_out, n_filters_out, downsample, kernel_size, dropout_rate))
            self.res_blocks += [resblk1d]

        n_filters_last, n_samples_last = blocks_dim[-1]
        last_layer_dim = n_filters_last * n_samples_last #2048*20
        self.lin = nn.Linear(last_layer_dim, n_classes+1)
        self.n_blk = len(blocks_dim)

    def forward(self, x):
        """Implement ResNet1d forward propagation"""
        # First layers
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        # Residual blocks
        y = x
        for blk in self.res_blocks:
            x, y = blk(x, y)

        # Flatten array
        x = x.view(x.size(0), -1) #shape == [batch_size, 6400]
        
        # Fully conected layer
        x = self.lin(x)
        
        mu, sigma = torch.split(x, 1, dim=-1)
        sigma = F.softplus(sigma) + 1e-6 #log(1+exp(sigma))
        
        return mu, sigma


class ResNetDeepEnsembles1d(nn.Module):
    """Residual network for unidimensional signals.
    Parameters
    ----------
    input_dim : tuple
        Input dimensions. Tuple containing dimensions for the neural network
        input tensor. Should be like: ``(n_filters, n_samples)``.
    blocks_dim : list of tuples
        Dimensions of residual blocks. The i-th tuple should contain the dimensions
        of the output of the (i-1)-th residual block and the input to the i-th residual
        block. Each tuple should be like: ``(n_filters, n_samples)``. `n_samples`
        for two consecutive blocks should always decrease by an integer factor.
    resblock_repeat : list
        Number of repeated residual blocks in each stage.
    num_models : int
        Number of ensemble members. Default: 5
    dropout_rate: float [0, 1), optional
        Dropout rate used in all Dropout layers. Default is 0.8
    kernel_size: int, optional
        Kernel size for convolutional layers. The current implementation
        only supports odd kernel sizes. Default is 17.
    References
    ----------
    .. [1] K. He, X. Zhang, S. Ren, and J. Sun, "Identity Mappings in Deep Residual Networks,"
           arXiv:1603.05027, Mar. 2016. https://arxiv.org/pdf/1603.05027.pdf.
    .. [2] K. He, X. Zhang, S. Ren, and J. Sun, "Deep Residual Learning for Image Recognition," in 2016 IEEE Conference
           on Computer Vision and Pattern Recognition (CVPR), 2016, pp. 770-778. https://arxiv.org/pdf/1512.03385.pdf
    """

    def __init__(self, input_dim, blocks_dim, resblock_repeat, num_models, n_classes, kernel_size=17, dropout_rate=0.8):
        super(ResNetDeepEnsembles1d, self).__init__()
        self.num_models = num_models
        # Ensembles
        for i in range(self.num_models):
            model = ResNet1d(input_dim=input_dim,
                        blocks_dim=blocks_dim, 
                        resblock_repeat=resblock_repeat, 
                        n_classes=n_classes, 
                        kernel_size=kernel_size, 
                        dropout_rate=dropout_rate)
            setattr(self, f'model_{str(i)}',  model)

    def forward(self, x):
        mus, sigmas = [], []
        for i in range(self.num_models):
            model = getattr(self, f'model_{str(i)}')
            mu, sigma = model(x)
            mus.append(mu)
            sigmas.append(sigma)

        mus = torch.stack(mus)
        mu = mus.mean(dim=0)
        sigmas = torch.stack(sigmas)
        sigma = (sigmas + mus.pow(2)).mean(dim=0) - mu.pow(2)

        return mu, sigma

Here is most of the script:

import json
import torch
import os
from tqdm import tqdm
from models.resnet import ResNet1d, ResNetDeepEnsembles1d
from data.CustomDataset import CustomDataset
import torch.optim as optim
import numpy as np

# Deep Ensembles loss, a.k.a. ensemble NLL (Gaussian negative log-likelihood)
def compute_loss(ages, pred_ages, pred_sigma, weights): 
  loss = (0.5*torch.log(pred_sigma.flatten()) + 0.5*((ages.flatten() - pred_ages.flatten()).pow(2))/pred_sigma.flatten()) + 1e-6
  weight_loss = torch.sum(weights.flatten() * loss)
  return weight_loss


if __name__ == "__main__":
    import pandas as pd
    import argparse
    import yaml
    from torch.utils.data import DataLoader
    from warnings import warn
    import wandb
    import torch.nn as nn

    torch.manual_seed(42)
    
    # Set device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    tqdm.write("Building data loaders...")
    # Get csv data
    # train
    train_df = pd.read_csv(data['path_to_train_csv'])
    train_ages = np.array(train_df[data['age_col']])
    # valid
    valid_df = pd.read_csv(data['path_to_valid_csv'])
    valid_ages = np.array(valid_df[data['age_col']])

    # weights; must be done all together
    whole_ages = np.append(train_ages, valid_ages)
    weights = compute_weights(whole_ages)

    # Dataset and Dataloader
    train_dataset = CustomDataset(train_ages, weights[:len(train_ages)], data['path_to_train_traces'])
    train_loader = DataLoader(dataset=train_dataset, num_workers=1, batch_size=128, shuffle=True, drop_last=False)
    valid_dataset = CustomDataset(valid_ages, weights[len(train_ages):], data['path_to_valid_traces'])
    valid_loader = DataLoader(dataset=valid_dataset, num_workers=1, batch_size=128, shuffle=False, drop_last=False)
    tqdm.write("Done!")

    tqdm.write("Define model...")
    N_LEADS = 8  # the 8 leads
    N_CLASSES = 1  # just the age
    net_filter_size = [256, 512, 1024, 2048] 
    net_seq_length = [5120, 1280, 320, 80]
    resblock_repeat = [3, 4, 6, 3]
    num_models = 5

    resnet = ResNetDeepEnsembles1d(input_dim=(N_LEADS, 5120),
                blocks_dim=list(zip(net_filter_size, net_seq_length)),
                resblock_repeat=resblock_repeat, 
                num_models=num_models, 
                n_classes=N_CLASSES,
                kernel_size=17,
                dropout_rate=0.8)     
    tqdm.write("Done!")

    tqdm.write("Define optimizer...")
    optimizers = []
    for i in range(num_models):
        model = getattr(resnet, f'model_{str(i)}')
        optimizers.append(optim.Adam(model.parameters(), 0.001))
    tqdm.write("Done!")

    tqdm.write("Define scheduler...")

    schedulers = []
    for i in range(num_models):
        model = getattr(resnet, f'model_{str(i)}')
        schedulers.append(optim.lr_scheduler.ReduceLROnPlateau(optimizers[i], patience=7,
                                                     min_lr=0.1 * 0.0000001,
                                                     factor=0.1))
    tqdm.write("Done!")
    
    tqdm.write("Training...")
    start_epoch = 0
    best_loss = np.full(num_models, np.Inf)
    history = pd.DataFrame(columns=['epoch', 'train_loss', 'valid_loss', 'lr',
                                    'weighted_rmse', 'weighted_mae', 'rmse', 'mse'])

    for ep in range(start_epoch, 50):
        train_total_loss = np.zeros(num_models)
        train_n_entries = np.zeros(num_models)
        train_desc = "Epoch {:2d}: train - Model {:2d} Loss: {:.6f}"
        train_bar = tqdm(initial=0, leave=True, total=len(train_loader), 
                        desc=train_desc.format(ep, 0, 0), position=0)

        for traces, ages, weights in train_loader:
            traces = traces.transpose(1, 2).float()
            for i in range(num_models):
                model = getattr(resnet, f'model_{str(i)}')
                #model.train()
        
                model = nn.DataParallel(model)
                model.to(device)
                traces, ages, weights = traces.to(device), ages.to(device), weights.to(device)
                
                model.zero_grad()

                pred_ages, pred_sigma = model(traces)
                loss = compute_loss(ages, pred_ages, pred_sigma, weights)

                loss.backward()

                optimizers[i].step()

                bs = len(traces)
                train_total_loss[i] += loss.detach().cpu().numpy()
                train_n_entries[i] += bs

                train_bar.desc = train_desc.format(ep, i+1, train_total_loss[i] / train_n_entries[i])
                train_bar.update(1)
        print(train_total_loss, train_n_entries)
        train_loss = train_total_loss / train_n_entries

The input shape is (N_batch, 5120, 8).

Thank you.

Could you make the script executable by defining the input arguments etc., please?

Sorry, I updated the arguments. I think it’s executable now. Is it okay?

Yes, after removing some undefined/unneeded parts, the code is executable, thanks.
The error is raised in self.res_blocks, as you are defining it as a plain Python list, which does not properly register these modules in the parent module.

            resblk1d = ResBottleneckBlock1d(n_filters_in, n_filters_out, downsample, kernel_size, dropout_rate)
            self.add_module(f'resblock1d_{i}_1', resblk1d)
            for n in range(1, resblock_repeat[i]):
                self.add_module(f'resblock1d_{i}_{n+1}', ResBottleneckBlock1d(n_filters_out, n_filters_out, downsample, kernel_size, dropout_rate))
            self.res_blocks += [resblk1d]

You would have to use an nn.ModuleList instead to register each submodule:

            ...
            self.res_blocks += [resblk1d]

        self.res_blocks = nn.ModuleList(self.res_blocks)

which would fix the error.

However, check the logic of this code snippet again as it seems you are registering new modules via:

self.add_module(f'resblock1d_{i}_{n+1}', ResBottleneckBlock1d(n_filters_out, n_filters_out, downsample, kernel_size, dropout_rate))

but then only append the first resblk1d to the list, so the repeated blocks are never used in the forward pass, which sounds wrong.
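
For reference, this is roughly how I would build the stages with an nn.ModuleList, appending every block (the first one and the repeats) to the list. This is only a sketch based on your posted __init__ and assumes the repeated blocks should not downsample again:

        # Every block goes into the nn.ModuleList, which registers it as a
        # submodule, so the separate add_module calls are no longer needed.
        self.res_blocks = nn.ModuleList()
        for i, (n_filters, n_samples) in enumerate(blocks_dim):
            n_filters_in, n_filters_out = n_filters_out, n_filters
            n_samples_in, n_samples_out = n_samples_out, n_samples
            downsample = _downsample(n_samples_in, n_samples_out)
            self.res_blocks.append(
                ResBottleneckBlock1d(n_filters_in, n_filters_out, downsample, kernel_size, dropout_rate))
            for n in range(1, resblock_repeat[i]):
                # Assumption: repeated blocks keep the resolution (downsample=1).
                self.res_blocks.append(
                    ResBottleneckBlock1d(n_filters_out, n_filters_out, 1, kernel_size, dropout_rate))

The forward pass can then keep iterating over self.res_blocks unchanged.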

Hi, thanks to your advice, I fixed the plain Python list error!

Following this blog, I found out the proper usage of add_module.

        # Residual block layers
        self.res_blocks = []
        for i, (n_filters, n_samples) in enumerate(blocks_dim):
            n_filters_in, n_filters_out = n_filters_out, n_filters
            n_samples_in, n_samples_out = n_samples_out, n_samples
            downsample = _downsample(n_samples_in, n_samples_out)
            resblk1d = ResBottleneckBlock1d(n_filters_in, n_filters_out, downsample, kernel_size, dropout_rate)
            self.add_module(f'resblock1d_{i}_1', resblk1d)
            self.res_blocks += [resblk1d]
            for n in range(1, resblock_repeat[i]):
                downsample = 1
                resblk1d = ResBottleneckBlock1d(n_filters_out, n_filters_out, downsample, kernel_size, dropout_rate)
                self.add_module(f'resblock1d_{i}_{n+1}', resblk1d)
                self.res_blocks += [resblk1d]

        n_filters_last, n_samples_last = blocks_dim[-1]
        last_layer_dim = n_filters_last * n_samples_last #2048*20
        self.lin = nn.Linear(last_layer_dim, n_classes+1)
        self.n_blk = len(blocks_dim)

I fixed it this way, but I’m still getting the same error :frowning:

You didn’t fix the self.res_blocks usage, as it’s still a plain Python list.
Check my previous post and use the nn.ModuleList as indicated in the posted code snippet.
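
nn.DataParallel replicates the registered submodules onto each GPU, but a plain Python list attribute is only shallow-copied, so the replicas still call the original blocks on cuda:0, which would match the cuda:1 vs cuda:0 error you are seeing. Concretely, a minimal sketch of the change to the __init__ you just posted would be to convert the list once the block loop has finished, right before last_layer_dim is computed:

        # After the loop that appends all blocks to self.res_blocks: converting
        # the list to an nn.ModuleList lets nn.DataParallel replicate the blocks
        # together with the rest of the model.
        self.res_blocks = nn.ModuleList(self.res_blocks)

Alternatively, build self.res_blocks as an nn.ModuleList from the start, as in the sketch in my previous post.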