How can I run 5 processes per GPU across three GPUs using DDP?

#https://github.com/miki998/3d_convolution_neural_net_MNET/blob/master/3DpytorchConv_example.ipynb

#importing the libraries
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import seaborn as sns
import pickle

#for reading and displaying images
from skimage.io import imread
import matplotlib.pyplot as plt

#for creating validation set
from sklearn.model_selection import train_test_split
#for evaluating the model
from sklearn.metrics import confusion_matrix, accuracy_score, balanced_accuracy_score, precision_recall_fscore_support


#PyTorch libraries and modules
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import *
import h5py

from joblib import Parallel, delayed, parallel_backend
import torch.multiprocessing as mp

h5file = "testhdf5train.h5"

class_weights = h5py.File(h5file, mode='r')['class_weights0'][:]
num_classes = len(class_weights)

batch_size = 100

#Create CNN Model
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        
        self.conv_layer1 = self._conv_layer_set(1, 32)
        self.conv_layer2 = self._conv_layer_set(32, 64)
        self.fc1 = nn.Linear(2**3*64, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.LeakyReLU()
        self.batch = nn.BatchNorm1d(128)
        self.drop = nn.Dropout(p=0.15)
        
    def _conv_layer_set(self, in_c, out_c):
        conv_layer = nn.Sequential(
            nn.Conv3d(in_c, out_c, kernel_size=(3, 3, 3), padding=1),
            nn.LeakyReLU(),
            nn.MaxPool3d((2, 2, 2)),
        )
        return conv_layer
    

    def forward(self, x):
        out = self.conv_layer1(x)
        out = self.conv_layer2(out)
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.batch(out)
        out = self.drop(out)
        out = self.fc2(out)
        
        return out

#Definition of hyperparameters
num_epochs = 100

#Create CNN
model = CNNModel()
model = model.to('cuda')
#print(model)

#SGD Optimizer
learning_rate = 0.001
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer)

def lidar_classifier(i, y_test_queue, pred_queue):
    train_x = h5py.File(h5file, mode='r')['X_train' + str(i)][:]
    p = np.random.permutation(len(train_x))[:30000]
    test_x = h5py.File(h5file, mode='r')['X_test' + str(i)][:]
    q = np.random.permutation(len(test_x))[:6000]
    del train_x
    del test_x

    #Cross Entropy Loss
    error = nn.CrossEntropyLoss(weight=torch.tensor(h5py.File(h5file, mode='r')['class_weights' + str(i)][:]).to(torch.float32).cuda())
    
    train_x = torch.tensor(np.moveaxis(h5py.File(h5file, mode='r')['X_train' + str(i)][:][p], -1, 1)).to(torch.float32)
    train_y = torch.tensor(h5py.File(h5file, mode='r')['y_train' + str(i)][:][p]).to(torch.float32).type(torch.LongTensor)
    test_x = torch.tensor(np.moveaxis(h5py.File(h5file, mode='r')['X_test' + str(i)][:][q], -1, 1)).to(torch.float32)
    test_y = torch.tensor(h5py.File(h5file, mode='r')['y_test' + str(i)][:][q]).to(torch.float32).type(torch.LongTensor)

    #Pytorch train and test sets
    train = torch.utils.data.TensorDataset(train_x,train_y)
    test = torch.utils.data.TensorDataset(test_x,test_y)

    #Data loader
    train_loader = torch.utils.data.DataLoader(train, batch_size = batch_size, shuffle=False, num_workers=8, persistent_workers=True, drop_last=True)
    test_loader = torch.utils.data.DataLoader(test, batch_size = batch_size, shuffle=False, num_workers=8, persistent_workers=True, drop_last=True)

    #CNN model training
    total_count = 0
    count = 0
    loss_list = []
    iteration_list = []
    accuracy_list = []
    y_test_tmp = []
    pred_tmp = []
    for epoch in range(num_epochs):
        
        for images, labels in train_loader:
            
            train = Variable(images.view(batch_size,1,9,9,9)).cuda()
            labels = Variable(labels).cuda()
            #Clear gradients
            optimizer.zero_grad()
            #Forward propagation
            outputs = model(train)
            #Calculate softmax and cross entropy loss
            loss = error(outputs, labels)
            #Calculating gradients
            loss.backward()
            #Update parameters
            optimizer.step()
            
            count += 1
            if count % 50 == 0:
                #Calculate Accuracy         
                correct = 0
                total = 0
                #Iterate through test dataset
                for images, labels in test_loader:
                    
                    test = Variable(images.view(batch_size,1,9,9,9)).cuda()
                    labels = Variable(labels).cuda()
                    #Forward propagation
                    outputs = model(test)

                    #Get predictions from the maximum value
                    predicted = torch.max(outputs.data, 1)[1]
                    
                    #Total number of labels
                    total += len(labels)
                    correct += (predicted == labels).sum()
                
                accuracy = 100 * correct / float(total)
                scheduler.step(loss)
                #Store loss and iteration
                loss_list.append(loss.data.cpu())
                iteration_list.append(count)
                accuracy_list.append(accuracy.cpu())
        total_count += 1
        #Print Loss
        print('Epoch: {}  Loss: {}  Accuracy: {} %'.format(total_count, loss.data, accuracy))

    for images, labels in test_loader:                
        test = Variable(images.view(batch_size,1,9,9,9)).cuda()
        labels = Variable(labels).cuda()
        #Forward propagation
        outputs = model(test)
        #Get predictions from the maximum value
        predicted = torch.max(outputs.data, 1)[1]
        y_test_tmp.extend(labels.cpu().numpy())
        pred_tmp.extend(predicted.cpu().numpy())

    os.makedirs(os.path.dirname('model/'), exist_ok=True)
    torch.save(model.state_dict(), 'model/lidar_model' + str(i) + '.pth')
    y_test_queue.put(y_test_tmp)
    pred_queue.put(pred_tmp)

if __name__ == '__main__':
    acc_results = []
    bac_results = []
    gm_results = []

    y_test = []
    pred = []

    mp = mp.get_context('spawn')
    y_test_queue = mp.Queue()
    pred_queue = mp.Queue()
    
    processes = []
    for i in range(5):
        p = mp.Process(target=lidar_classifier, args=(i, y_test_queue, pred_queue))
        p.start()
        processes.append(p)

    for i in range(5):
        y_test_tmp = y_test_queue.get()
        pred_tmp = pred_queue.get()
        
        y_test.extend(y_test_tmp)
        pred.extend(pred_tmp)

        acc_results.append(accuracy_score(y_test_tmp, pred_tmp))
        bac_results.append(balanced_accuracy_score(y_test_tmp, pred_tmp))

        recall = precision_recall_fscore_support(y_test_tmp, pred_tmp, labels=range(num_classes))[1]
        result = np.prod(recall)
        result = result**(1/len(recall))
        gm_results.append(result)

    # Join processes
    for p in processes:
        p.join()

    acc_mean = np.mean(acc_results)
    acc_stdev = np.std(acc_results)

    bac_mean = np.mean(bac_results)
    bac_stdev = np.std(bac_results)

    gm_mean = np.mean(gm_results)
    gm_stdev = np.std(gm_results)
    np.savetxt('metrics.txt', [acc_mean, acc_stdev, bac_mean, bac_stdev, gm_mean, gm_stdev], delimiter=',', fmt='%.2f')

    array = confusion_matrix(y_test, pred)
    np.savetxt('confmatrix.csv', array, delimiter=',')
    cm = pd.DataFrame(array, index = range(num_classes), columns = range(num_classes))
    plt.figure(figsize=(20,20))
    sns.heatmap(cm, annot=True,fmt='g')
    plt.savefig("confmatrix.png")
    plt.show()

However, this only runs on one GPU. I want to use all three GPUs in my server equally.

If you want to keep the processes separate for each GPU, you could simply launch 3 scripts in your terminal via CUDA_VISIBLE_DEVICES=X python script.py args, where X would correspond to 0, 1, or 2.
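
For example, a minimal launcher sketch of that suggestion, assuming the training code above lives in script.py and takes the fold index as its only argument (the argument handling is a placeholder for whatever your script actually expects):

import os
import subprocess

processes = []
for gpu_id in range(3):
    env = os.environ.copy()
    # each child process only sees one GPU and addresses it as cuda:0
    env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    p = subprocess.Popen(["python", "script.py", str(gpu_id)], env=env)
    processes.append(p)

for p in processes:
    p.wait()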

Otherwise, the DDP example shows how multiprocessing can be used inside a single script to launch one process per GPU.
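
A minimal per-GPU DDP sketch, assuming three GPUs on a single machine; the dummy nn.Linear model and the MASTER_ADDR/MASTER_PORT values are placeholders, and the data loading and training loop from the code above would go where indicated:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def worker(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    # one process per GPU; the rank doubles as the GPU index
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    model = nn.Linear(10, 2).to(rank)          # replace with CNNModel()
    ddp_model = DDP(model, device_ids=[rank])  # gradients are all-reduced across ranks
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)

    # ... build a DataLoader with a DistributedSampler and run the training loop here ...

    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = 3
    mp.spawn(worker, args=(world_size,), nprocs=world_size)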


So I should equally split the data into three parts, one per GPU. I will try to figure out how to do this. Thanks, you definitely know a lot :smiley:
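
For reference, a sketch of how that split could look with DistributedSampler, assuming the TensorDataset built in the code above (the helper name and world_size=3 are illustrative):

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

def make_train_loader(train_x, train_y, rank, world_size=3, batch_size=100):
    dataset = TensorDataset(train_x, train_y)
    # each rank gets a disjoint ~1/world_size shard of the dataset
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    # call sampler.set_epoch(epoch) at the start of each epoch to reshuffle
    return DataLoader(dataset, batch_size=batch_size, sampler=sampler, drop_last=True)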