How can I run 5 processes per GPU for three GPUs using DDP?

#https://github.com/miki998/3d_convolution_neural_net_MNET/blob/master/3DpytorchConv_example.ipynb

#importing the libraries
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import seaborn as sns
import pickle

#for reading and displaying images
from skimage.io import imread
import matplotlib.pyplot as plt

#for creating validation set
from sklearn.model_selection import train_test_split
#for evaluating the model
from sklearn.metrics import accuracy_score

from sklearn.metrics import confusion_matrix, accuracy_score, balanced_accuracy_score, precision_recall_fscore_support


#PyTorch libraries and modules
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import *
import h5py

from joblib import Parallel, delayed, parallel_backend
import torch.multiprocessing as mp

# HDF5 file that stores the datasets read below and inside lidar_classifier.
h5file = "testhdf5train.h5"

# Read the class weights of fold 0 once; the context manager closes the file
# handle again instead of leaving it open for the lifetime of the process.
with h5py.File(h5file, mode='r') as _h5:
    class_weights = _h5['class_weights0'][:]
# The network emits one logit per class-weight entry.
num_classes = len(class_weights)

# Mini-batch size used by both data loaders and by the reshape in the loops.
batch_size = 100

#Create CNN Model
class CNNModel(nn.Module):
    """3D convolutional classifier: two conv/pool stages and two linear layers.

    ``fc1`` expects 64 channels of 2x2x2 voxels (512 features), which is what
    the two 2x2x2 max-poolings leave of the single-channel 9x9x9 inputs that
    the training code feeds in.

    Args:
        n_classes: Number of output logits. When ``None`` (the default) the
            module-level ``num_classes`` is used, so ``CNNModel()`` behaves
            exactly as before.
    """

    def __init__(self, n_classes=None):
        super().__init__()
        if n_classes is None:
            # Looked up at construction time so the class can also be built
            # without the module-level constant when a count is passed in.
            n_classes = num_classes

        # Attribute names and creation order are kept stable: they define the
        # state_dict keys of saved checkpoints.
        self.conv_layer1 = self._conv_layer_set(1, 32)
        self.conv_layer2 = self._conv_layer_set(32, 64)
        self.fc1 = nn.Linear(2**3*64, 128)
        self.fc2 = nn.Linear(128, n_classes)
        self.relu = nn.LeakyReLU()
        self.batch = nn.BatchNorm1d(128)
        self.drop = nn.Dropout(p=0.15)

    def _conv_layer_set(self, in_c, out_c):
        """Return one Conv3d(3x3x3, padding 1) -> LeakyReLU -> MaxPool3d(2) stage."""
        return nn.Sequential(
            nn.Conv3d(in_c, out_c, kernel_size=(3, 3, 3), padding=1),
            nn.LeakyReLU(),
            nn.MaxPool3d((2, 2, 2)),
        )

    def forward(self, x):
        """Map a ``(batch, 1, D, H, W)`` tensor to ``(batch, n_classes)`` logits."""
        out = self.conv_layer1(x)
        out = self.conv_layer2(out)
        # Flatten everything except the batch dimension for the linear head.
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.batch(out)
        out = self.drop(out)
        return self.fc2(out)

# Training hyperparameters: number of passes over the data and SGD step size.
num_epochs = 100
learning_rate = 0.001

# Build the network and place it on the default CUDA device.
# NOTE: this runs at import time; lidar_classifier uses these module-level
# objects directly.
model = CNNModel().to('cuda')

# Plain SGD over all model parameters, with a plateau-based LR scheduler
# left at its default settings.
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer)

def _predict(net, loader, device):
    """Run ``net`` over ``loader`` without gradients; return labels and predictions.

    The network is switched to eval mode for the pass (dropout disabled, batch
    norm using its running statistics) and its previous mode is restored
    afterwards, so the helper can be called from inside a training loop.

    Returns:
        A ``(labels, predictions)`` pair of equally long lists with one NumPy
        integer per evaluated sample.
    """
    was_training = net.training
    net.eval()
    y_true, y_pred = [], []
    try:
        with torch.no_grad():
            for images, labels in loader:
                logits = net(images.view(batch_size, 1, 9, 9, 9).to(device))
                y_true.extend(labels.cpu().numpy())
                # Predicted class = index of the largest logit.
                y_pred.extend(torch.max(logits, 1)[1].cpu().numpy())
    finally:
        net.train(was_training)
    return y_true, y_pred


def lidar_classifier(i, y_test_queue, pred_queue):
    """Train the module-level ``model`` on fold ``i`` and report test predictions.

    Reads a random subset (at most 30000 training and 6000 test samples) of
    fold ``i`` from ``h5file``, trains for ``num_epochs`` epochs with the
    module-level ``optimizer``/``scheduler``, evaluates on the test subset
    every 50 training iterations, saves the weights to
    ``model/lidar_model<i>.pth`` and finally puts the test labels on
    ``y_test_queue`` and the matching predictions on ``pred_queue``.

    Args:
        i: Fold index; selects the ``X_train<i>``, ``y_train<i>``,
            ``X_test<i>``, ``y_test<i>`` and ``class_weights<i>`` datasets.
        y_test_queue: Queue receiving the list of true test labels.
        pred_queue: Queue receiving the list of predicted labels, in the same
            order as the labels.
    """
    fold = str(i)

    # Open the file once and close it deterministically. Dataset lengths come
    # from the HDF5 metadata, so the full arrays are not loaded just to be
    # measured.
    with h5py.File(h5file, mode='r') as h5:
        p = np.random.permutation(h5['X_train' + fold].shape[0])[:30000]
        q = np.random.permutation(h5['X_test' + fold].shape[0])[:6000]

        class_w = torch.tensor(h5['class_weights' + fold][:]).to(torch.float32)

        # moveaxis(-1, 1) brings the last axis to position 1, right after the
        # sample axis.
        train_x = torch.tensor(np.moveaxis(h5['X_train' + fold][:][p], -1, 1)).to(torch.float32)
        train_y = torch.tensor(h5['y_train' + fold][:][p]).to(torch.float32).type(torch.LongTensor)
        test_x = torch.tensor(np.moveaxis(h5['X_test' + fold][:][q], -1, 1)).to(torch.float32)
        test_y = torch.tensor(h5['y_test' + fold][:][q]).to(torch.float32).type(torch.LongTensor)

    # Class-weighted cross entropy loss.
    error = nn.CrossEntropyLoss(weight=class_w.cuda())

    # Pytorch train and test sets
    train_set = torch.utils.data.TensorDataset(train_x, train_y)
    test_set = torch.utils.data.TensorDataset(test_x, test_y)

    # drop_last keeps every batch at exactly batch_size samples, which the
    # fixed-size view() calls rely on.
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=False, num_workers=8, persistent_workers=True, drop_last=True)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=8, persistent_workers=True, drop_last=True)

    count = 0
    # Defined up front so the per-epoch print cannot hit an unbound name when
    # fewer than 50 iterations have run.
    accuracy = float('nan')
    loss = None
    for epoch in range(num_epochs):
        model.train()
        for images, labels in train_loader:
            inputs = images.view(batch_size, 1, 9, 9, 9).cuda()
            targets = labels.cuda()

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = error(outputs, targets)
            loss.backward()
            optimizer.step()

            count += 1
            if count % 50 == 0:
                # Evaluate in eval mode without autograd; _predict restores
                # train mode afterwards.
                y_true, y_pred = _predict(model, test_loader, 'cuda')
                if y_true:
                    accuracy = 100.0 * float(np.mean(np.asarray(y_true) == np.asarray(y_pred)))
                scheduler.step(loss.item())

        loss_value = loss.item() if loss is not None else float('nan')
        print('Epoch: {}  Loss: {}  Accuracy: {} %'.format(epoch + 1, loss_value, accuracy))

    # Final pass over the test subset for the metrics computed by the parent.
    y_test_tmp, pred_tmp = _predict(model, test_loader, 'cuda')

    os.makedirs('model', exist_ok=True)
    torch.save(model.state_dict(), 'model/lidar_model' + fold + '.pth')
    y_test_queue.put(y_test_tmp)
    pred_queue.put(pred_tmp)

if __name__ == '__main__':
    # Number of folds / worker processes to run concurrently.
    n_folds = 5

    acc_results = []
    bac_results = []
    gm_results = []

    y_test = []
    pred = []

    # Keep the 'spawn' context in its own name instead of rebinding `mp`.
    ctx = mp.get_context('spawn')

    # Every worker gets its OWN pair of queues. With two queues shared by all
    # workers, the n-th item of the label queue and the n-th item of the
    # prediction queue can come from different folds when workers finish
    # close together, which silently corrupts the metrics.
    processes = []
    result_queues = []
    for i in range(n_folds):
        y_queue = ctx.Queue()
        pred_queue = ctx.Queue()
        p = ctx.Process(target=lidar_classifier, args=(i, y_queue, pred_queue))
        p.start()
        processes.append(p)
        result_queues.append((y_queue, pred_queue))

    # Drain the queues BEFORE joining: a child that has put data on a queue
    # does not exit until that data has been consumed.
    for y_queue, pred_queue in result_queues:
        y_test_tmp = y_queue.get()
        pred_tmp = pred_queue.get()

        y_test.extend(y_test_tmp)
        pred.extend(pred_tmp)

        acc_results.append(accuracy_score(y_test_tmp, pred_tmp))
        bac_results.append(balanced_accuracy_score(y_test_tmp, pred_tmp))

        # Geometric mean of the per-class recalls (index 1 of the returned
        # precision/recall/fscore/support tuple).
        recall = precision_recall_fscore_support(y_test_tmp, pred_tmp, labels=range(num_classes))[1]
        gm_results.append(np.prod(recall) ** (1 / len(recall)))

    # Join processes
    for p in processes:
        p.join()

    # Mean and standard deviation of each metric across folds.
    acc_mean = np.mean(acc_results)
    acc_stdev = np.std(acc_results)

    bac_mean = np.mean(bac_results)
    bac_stdev = np.std(bac_results)

    gm_mean = np.mean(gm_results)
    gm_stdev = np.std(gm_results)
    np.savetxt('metrics.txt', [acc_mean, acc_stdev, bac_mean, bac_stdev, gm_mean, gm_stdev], delimiter=',', fmt='%.2f')

    # Confusion matrix over the pooled predictions of all folds.
    array = confusion_matrix(y_test, pred)
    np.savetxt('confmatrix.csv', array, delimiter=',')
    cm = pd.DataFrame(array, index=range(num_classes), columns=range(num_classes))
    plt.figure(figsize=(20, 20))
    sns.heatmap(cm, annot=True, fmt='g')
    plt.savefig("confmatrix.png")
    plt.show()

However, this can only run on one GPU. I want to be able to equally use all three GPUs in my server.

If you want to keep the processes separate for each GPU, you could simply launch three scripts in your terminal via `CUDA_VISIBLE_DEVICES=X python script.py args`, where X is 0, 1, or 2 for the respective GPU.

Otherwise, the DDP example shows how multiprocessing can be used inside a single script to launch one process per GPU.

2 Likes

So I should equally split the data into three parts, one per GPU. I will try to figure out how to do this. Thanks, you definitely know a lot :smiley:

I used the DDP example on each subprocess. mp.set_start_method("spawn") helped me use mp.spawn on Linux too.

Revised code for now:

#https://github.com/miki998/3d_convolution_neural_net_MNET/blob/master/3DpytorchConv_example.ipynb

#importing the libraries
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import seaborn as sns
import pickle

#for reading and displaying images
from skimage.io import imread
import matplotlib.pyplot as plt

#for creating validation set
from sklearn.model_selection import train_test_split
#for evaluating the model
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, accuracy_score, balanced_accuracy_score, precision_recall_fscore_support

#PyTorch libraries and modules
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import *
import h5py

import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import socket

# HDF5 file holding the per-fold datasets used throughout this script.
h5file = "testhdf5train.h5"

# Load fold 0's class weights and close the file right away; leaving the
# handle open would keep it alive in every process that imports this module.
with h5py.File(h5file, mode='r') as _h5:
    class_weights = _h5['class_weights0'][:]
# One output class per class-weight entry.
num_classes = len(class_weights)

# Per-process mini-batch size for the training and test loaders.
batch_size = 100

def find_free_port():
    """Return the number of a TCP port that the OS reports as currently free.

    Binding to port 0 lets the operating system choose an unused port. The
    socket is closed before returning (also when ``bind`` or ``getsockname``
    raises), so another process could claim the port before the caller does;
    callers should use the port promptly.

    Returns:
        int: the port number chosen by the OS.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('', 0))
        return sock.getsockname()[1]

def setup(rank, world_size, port):
    """Join the default process group of a single-machine DDP job.

    Args:
        rank: Index of this process within the group (0 .. world_size - 1).
        world_size: Total number of processes in the group.
        port: TCP port of the rendezvous on localhost; may be given as an int
            or as a string.
    """
    os.environ['MASTER_ADDR'] = 'localhost'
    # Environment values must be strings; str() also accepts the int that
    # find_free_port() returns.
    os.environ['MASTER_PORT'] = str(port)
    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

#Create CNN Model
class CNNModel(nn.Module):
    """Small 3D CNN: two Conv3d/LeakyReLU/MaxPool3d stages plus a two-layer head.

    The first linear layer takes 2**3 * 64 = 512 features, i.e. 64 channels of
    2x2x2 voxels, matching the single-channel 9x9x9 patches the training loop
    reshapes its batches to.

    Args:
        n_classes: Size of the output layer. ``None`` (default) means "use the
            module-level ``num_classes``", which keeps ``CNNModel()`` working
            as before.
    """

    def __init__(self, n_classes=None):
        super().__init__()
        if n_classes is None:
            # Fall back to the file-wide class count derived from the HDF5
            # class weights.
            n_classes = num_classes

        # Names and order of these attributes determine the state_dict layout
        # of saved checkpoints, so they are left untouched.
        self.conv_layer1 = self._conv_layer_set(1, 32)
        self.conv_layer2 = self._conv_layer_set(32, 64)
        self.fc1 = nn.Linear(2**3*64, 128)
        self.fc2 = nn.Linear(128, n_classes)
        self.relu = nn.LeakyReLU()
        self.batch = nn.BatchNorm1d(128)
        self.drop = nn.Dropout(p=0.15)

    def _conv_layer_set(self, in_c, out_c):
        """Build a Conv3d (3x3x3, padding 1) -> LeakyReLU -> MaxPool3d (2x2x2) stage."""
        stage = nn.Sequential(
            nn.Conv3d(in_c, out_c, kernel_size=(3, 3, 3), padding=1),
            nn.LeakyReLU(),
            nn.MaxPool3d((2, 2, 2)),
        )
        return stage

    def forward(self, x):
        """Return ``(batch, n_classes)`` logits for a ``(batch, 1, D, H, W)`` input."""
        features = self.conv_layer2(self.conv_layer1(x))
        # Collapse channel and spatial axes into one feature vector per sample.
        flat = features.view(features.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        hidden = self.drop(self.batch(hidden))
        return self.fc2(hidden)

# Training hyperparameters: number of epochs and the SGD learning rate.
num_epochs, learning_rate = 100, 1e-3

def _predict(net, loader, device):
    """Run ``net`` over ``loader`` without gradients; return labels and predictions.

    The network is switched to eval mode for the pass (dropout disabled, batch
    norm using its running statistics) and its previous mode is restored
    afterwards, so the helper can be called from inside a training loop.

    Returns:
        A ``(labels, predictions)`` pair of equally long lists with one NumPy
        integer per evaluated sample.
    """
    was_training = net.training
    net.eval()
    y_true, y_pred = [], []
    try:
        with torch.no_grad():
            for images, labels in loader:
                logits = net(images.view(batch_size, 1, 9, 9, 9).to(device))
                y_true.extend(labels.cpu().numpy())
                # Predicted class = index of the largest logit.
                y_pred.extend(torch.max(logits, 1)[1].cpu().numpy())
    finally:
        net.train(was_training)
    return y_true, y_pred


def lidar_classifier(rank, world_size, port, i, y_test_queue, pred_queue):
    """DDP worker: train a CNNModel on fold ``i`` using GPU ``rank``.

    Joins the process group, trains on this rank's shard of the first 60000
    training samples of fold ``i`` and evaluates on the first 12000 test
    samples every 50 iterations. Rank 0 then saves the weights to
    ``model/lidar_model<i>.pth`` and puts the test labels and predictions on
    the two queues (one item each).

    Args:
        rank: Index of this process in the group; also used as CUDA device.
        world_size: Number of processes (GPUs) cooperating on this fold.
        port: Rendezvous port on localhost, passed on to ``setup``.
        i: Fold index; selects the ``*<i>`` datasets in ``h5file``.
        y_test_queue: Queue receiving the list of true test labels.
        pred_queue: Queue receiving the predicted labels, in the same order.
    """
    setup(rank, world_size, port)
    try:
        torch.cuda.set_device(rank)

        #Create CNN
        model = CNNModel().to(rank)
        ddp_model = DDP(model, device_ids=[rank])

        optimizer = torch.optim.SGD(ddp_model.parameters(), lr=learning_rate)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer)

        fold = str(i)
        # One file handle for all reads, closed as soon as the data is in memory.
        with h5py.File(h5file, mode='r') as h5:
            class_w = torch.tensor(h5['class_weights' + fold][:]).to(torch.float32)
            train_x = torch.tensor(np.moveaxis(h5['X_train' + fold][:60000], -1, 1)).to(torch.float32)
            train_y = torch.tensor(h5['y_train' + fold][:60000]).to(torch.float32).type(torch.LongTensor)
            test_x = torch.tensor(np.moveaxis(h5['X_test' + fold][:12000], -1, 1)).to(torch.float32)
            test_y = torch.tensor(h5['y_test' + fold][:12000]).to(torch.float32).type(torch.LongTensor)

        # Class-weighted cross entropy loss.
        error = nn.CrossEntropyLoss(weight=class_w.to(rank))

        #Pytorch train and test sets
        train_set = torch.utils.data.TensorDataset(train_x, train_y)
        test_set = torch.utils.data.TensorDataset(test_x, test_y)

        # DDP only synchronises gradients; it does not split the data. The
        # sampler gives each rank a disjoint, equally sized shard, so the
        # GPUs share the work and all ranks run the same number of
        # iterations (required for the collectives below).
        train_sampler = torch.utils.data.DistributedSampler(train_set, num_replicas=world_size, rank=rank, shuffle=False)
        train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, sampler=train_sampler, num_workers=16, persistent_workers=True, drop_last=True)
        # The test set is deliberately NOT sharded: each rank scores the full set.
        test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=16, persistent_workers=True, drop_last=True)

        count = 0
        # Defined up front so the per-epoch print cannot hit an unbound name.
        accuracy = float('nan')
        loss = None
        for epoch in range(num_epochs):
            ddp_model.train()
            for images, labels in train_loader:
                inputs = images.view(batch_size, 1, 9, 9, 9).to(rank)
                targets = labels.to(rank)

                optimizer.zero_grad()
                outputs = ddp_model(inputs)
                loss = error(outputs, targets)
                loss.backward()
                optimizer.step()

                count += 1
                if count % 50 == 0:
                    # Evaluate the unwrapped module: no DDP collectives are
                    # involved and _predict handles eval mode / no_grad.
                    y_true, y_pred = _predict(model, test_loader, rank)
                    if y_true:
                        accuracy = 100.0 * float(np.mean(np.asarray(y_true) == np.asarray(y_pred)))

                    # Step the plateau scheduler with the loss averaged over
                    # all ranks so every rank takes the same LR decision;
                    # rank-local losses could let the learning rates (and
                    # then the replicas) drift apart.
                    mean_loss = loss.detach().clone()
                    dist.all_reduce(mean_loss, op=dist.ReduceOp.SUM)
                    scheduler.step(mean_loss.item() / world_size)

            loss_value = loss.item() if loss is not None else float('nan')
            print('Epoch: {}  Loss: {}  Accuracy: {} %'.format(epoch + 1, loss_value, accuracy))

        if rank == 0:
            # Only one rank evaluates, writes the checkpoint and reports;
            # concurrent writes to the same file from several ranks can
            # corrupt it.
            y_test_tmp, pred_tmp = _predict(model, test_loader, rank)

            os.makedirs('model', exist_ok=True)
            torch.save(ddp_model.state_dict(), 'model/lidar_model' + fold + '.pth')
            y_test_queue.put(y_test_tmp)
            pred_queue.put(pred_tmp)

        # Keep the other ranks in the group until rank 0 has finished.
        dist.barrier()
    finally:
        dist.destroy_process_group()

def run_demo(i, world_size, y_test_queue, pred_queue):
    """Train fold ``i`` on ``world_size`` GPUs and forward one result upstream.

    Spawns ``world_size`` ``lidar_classifier`` workers that share a freshly
    chosen rendezvous port, waits for them to finish, then takes one label
    list and one prediction list from the workers' queues and puts them on the
    caller's queues.

    Args:
        i: Fold index (also the process index assigned by the outer
            ``mp.spawn``).
        world_size: Number of DDP worker processes to spawn for this fold.
        y_test_queue: Caller's queue receiving the true test labels.
        pred_queue: Caller's queue receiving the predicted labels.
    """
    # A single manager serves both inner queues (one server process instead
    # of two) and is shut down when the block exits.
    with mp.Manager() as manager:
        y_test_queue_1 = manager.Queue()
        pred_queue_1 = manager.Queue()
        mp.spawn(lidar_classifier,
                 args=(world_size, str(find_free_port()), i, y_test_queue_1, pred_queue_1),
                 nprocs=world_size,
                 join=True)
        # join=True means every worker has exited, so results are available.
        labels = y_test_queue_1.get()
        predictions = pred_queue_1.get()
    y_test_queue.put(labels)
    pred_queue.put(predictions)

def _run_fold(i, world_size, y_test_queues, pred_queues):
    """mp.spawn entry point: run fold ``i`` with its own private result queues."""
    run_demo(i, world_size, y_test_queues[i], pred_queues[i])


if __name__ == '__main__':
    mp.set_start_method("spawn") #tada

    # Number of folds trained concurrently (each one spans all GPUs).
    n_folds = 5

    acc_results = []
    bac_results = []
    gm_results = []

    y_test = []
    pred = []

    n_gpus = torch.cuda.device_count()
    world_size = n_gpus
    if world_size < 1:
        raise RuntimeError("No CUDA devices found; DDP training needs at least one GPU.")

    # One manager for all queues, shut down when the block exits.
    with mp.Manager() as manager:
        # Each fold reports through its OWN pair of queues. With two queues
        # shared by all folds, the n-th label list and the n-th prediction
        # list can come from different folds when two folds finish close
        # together, which silently corrupts the metrics.
        y_test_queues = [manager.Queue() for _ in range(n_folds)]
        pred_queues = [manager.Queue() for _ in range(n_folds)]

        mp.spawn(_run_fold,
                 args=(world_size, y_test_queues, pred_queues),
                 nprocs=n_folds,
                 join=True)

        for y_queue, pred_queue in zip(y_test_queues, pred_queues):
            y_test_tmp = y_queue.get()
            pred_tmp = pred_queue.get()

            y_test.extend(y_test_tmp)
            pred.extend(pred_tmp)

            acc_results.append(accuracy_score(y_test_tmp, pred_tmp))
            bac_results.append(balanced_accuracy_score(y_test_tmp, pred_tmp))

            # Geometric mean of the per-class recalls (index 1 of the
            # returned precision/recall/fscore/support tuple).
            recall = precision_recall_fscore_support(y_test_tmp, pred_tmp, labels=range(num_classes))[1]
            gm_results.append(np.prod(recall) ** (1 / len(recall)))

    # Mean and standard deviation of each metric across folds.
    acc_mean = np.mean(acc_results)
    acc_stdev = np.std(acc_results)

    bac_mean = np.mean(bac_results)
    bac_stdev = np.std(bac_results)

    gm_mean = np.mean(gm_results)
    gm_stdev = np.std(gm_results)
    np.savetxt('metrics.txt', [acc_mean, acc_stdev, bac_mean, bac_stdev, gm_mean, gm_stdev], delimiter=',', fmt='%.2f')

    # Confusion matrix over the pooled predictions of all folds.
    array = confusion_matrix(y_test, pred)
    np.savetxt('confmatrix.csv', array, delimiter=',')
    cm = pd.DataFrame(array, index=range(num_classes), columns=range(num_classes))
    plt.figure(figsize=(20, 20))
    sns.heatmap(cm, annot=True, fmt='g')
    plt.savefig("confmatrix.png")
    plt.show()