Run PyTorch on Multiple GPUs

@ptrblck this tutorial (Getting Started with Distributed Data Parallel — PyTorch Tutorials 2.1.1+cu121 documentation) recommends using DistributedDataParallel even on a single machine. So if I want to use all GPUs, the code would change from:

net = torch.nn.DataParallel(model, device_ids=list(range(torch.cuda.device_count())))

to

net = torch.nn.parallel.DistributedDataParallel(model, device_ids=list(range(torch.cuda.device_count())))

Right? If I am using a single node with multiple GPUs, is there anything else or subtle I should do?

Also, if DistributedDataParallel is so much better, why does the interface for DataParallel still exist? Doesn’t that make things more confusing for users?

Quoting the tutorial on why to use DistributedDataParallel:

Comparison between DataParallel and DistributedDataParallel

Before we dive in, let’s clarify why, despite the added complexity, you would consider using DistributedDataParallel over DataParallel (Getting Started with Distributed Data Parallel — PyTorch Tutorials 2.1.1+cu121 documentation) even with 1 single machine:

  • First, DataParallel is single-process, multi-thread, and only works on a single machine, while DistributedDataParallel is multi-process and works for both single- and multi- machine training. DataParallel is usually slower than DistributedDataParallel even on a single machine due to GIL contention across threads, per-iteration replicated model, and additional overhead introduced by scattering inputs and gathering outputs.
  • Recall from the prior tutorial that if your model is too large to fit on a single GPU, you must use model parallel to split it across multiple GPUs. DistributedDataParallel works with model parallel; DataParallel does not at this time. When DDP is combined with model parallel, each DDP process would use model parallel, and all processes collectively would use data parallel.
  • If your model needs to span multiple machines or if your use case does not fit into data parallelism paradigm, please see the RPC API for more generic distributed training support.

Yes, nn.DataParallel will automatically create model copies on the passed device_ids and will scatter the input batch in dim0 to each device. The output will be on the default device.

Doing

net = torch.nn.DataParallel(model, device_ids=list(range(torch.cuda.device_count())))

isn’t everything one needs to do… it seems one also has to move the data to the right device manually? Isn’t there a way for this to be done for me? I’m not sure why anyone would want to micromanage something like this; it seems like very low return on the time invested.

I’m saying this since this error occurred:

RuntimeError: Expected tensor for argument #1 'input' to have the same device as tensor for argument #2 'weight'; but device 1 does not equal 0 (while checking arguments for cudnn_convolution)

Correct me if I am wrong, but I think calling .cuda() on the data moves it automatically to the right GPU for the final layer + target labels?

Or at least it seems fine when I did this toy example with a resnet:

import torch


out_features = 5
net = torch.hub.load('pytorch/vision:v0.6.0', 'resnet18', pretrained=True)
# replace_bn(net, 'model')
net.fc = torch.nn.Linear(in_features=512, out_features=out_features, bias=True)

print(type(net))

print(torch.cuda.device_count())
if torch.cuda.device_count() > 1:
    # args.base_model = torch.nn.parallel.DistributedDataParallel(args.base_model, device_ids=list(range(torch.cuda.device_count())))
    net = torch.nn.DataParallel(net, device_ids=list(range(torch.cuda.device_count()))).cuda()

print(type(net))

batch_size = 8
x = torch.randn(batch_size, 3, 84, 84).cuda()
y_pred = net(x)
print(y_pred.size())
y = torch.randn(batch_size, out_features).cuda()

print(y_pred.sum())

criterion = torch.nn.MSELoss()

loss = criterion(y_pred, y)
print(loss)

By the way, is there any benefit to doing criterion.cuda()?

The short answer is: because DDP adds complexity in exchange for better performance, and some users might be OK with a single-line code change instead of setting up the DDP workload.
Besides the tutorial you could also have a look at the ImageNet example to see how to properly use it.
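
For orientation, here is a minimal single-node DDP sketch; the tiny linear model and random dataset are just placeholders, not taken from the ImageNet example:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler


def worker(rank, world_size):
    # One process per GPU; each process owns exactly one device.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12355")
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    model = torch.nn.Linear(10, 5).cuda(rank)
    ddp_model = DDP(model, device_ids=[rank])  # a single device id per process

    dataset = TensorDataset(torch.randn(64, 10), torch.randn(64, 5))
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)

    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)

    for epoch in range(2):
        sampler.set_epoch(epoch)  # reshuffle the split differently every epoch
        for x, y in loader:
            x, y = x.cuda(rank), y.cuda(rank)
            optimizer.zero_grad()
            loss = criterion(ddp_model(x), y)
            loss.backward()  # gradients are all-reduced across processes here
            optimizer.step()

    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size)

Note that each spawned process drives exactly one GPU, which is why device_ids gets a single id instead of the full list you would pass to nn.DataParallel.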


Hi,

Sorry to open an old thread, but I wanted to ask: do the GPUs have to be identical, or can they be different, like a 2070 and a 3070?

You can use different devices, but would most likely see a bottleneck caused by the “smaller” device (regarding both its memory capacity and its compute performance). nn.DataParallel would also raise a warning if an imbalance between the devices is found.

I have a 2070 Super and a 3070 Ti. Both are 8 GB, but the number of cores is different. So will they work properly?

Yes, the script will work. It could raise a warning, and the overall training would most likely be bottlenecked by the 2070.


Thanks. I’ll update when I run some code.

Hi. I was wondering if you could give me some reference on why the 2070 would act as a bottleneck? I was thinking that since performance depends on the number of CUDA cores, shouldn’t they just add up in a way? Or am I getting something wrong?

This tutorial is pretty misleading. As mentioned in other answers, we have to set up processes to feed training data to each GPU manually, which is missing in this tutorial!

That’s not true, since DataParallel uses a single process to feed all GPUs. The data is thus also loaded in a single process onto the default device, while the DataParallel wrapper will then scatter the model replicas, split the input data, send a data chunk to each corresponding device, and perform the forward pass. The output will then be collected on the default device again and the backward pass executed on each GPU. Since a single Python process is responsible for all these steps, I’m unsure where the confusion and your claims come from.
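
To make this concrete, the forward pass of nn.DataParallel is roughly equivalent to the following simplified sketch built from the torch.nn.parallel primitives (an illustration only, not the actual implementation):

import torch
from torch.nn.parallel import replicate, scatter, parallel_apply, gather

device_ids = list(range(torch.cuda.device_count()))
output_device = device_ids[0]

model = torch.nn.Linear(10, 5).cuda(output_device)
x = torch.randn(8, 10).cuda(output_device)  # the input only needs to live on the default device

replicas = replicate(model, device_ids)                   # copy the model to every GPU
inputs = scatter(x, device_ids)                           # split the batch along dim0
outputs = parallel_apply(replicas[:len(inputs)], inputs)  # forward pass on each GPU (threads)
y = gather(outputs, output_device)                        # collect the results on the default device
print(y.shape)  # torch.Size([8, 5]) on cuda:0

All of this happens inside one Python process, which is also why no extra data-feeding processes are needed.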

Hi @ptrblck, I hope you are well. Sorry, I need to fine-tune GPT-2 on multiple GPUs since training takes a lot of time. I want to run only the training part on multiple GPUs and the validation part on a specific device like cuda:3. I would appreciate it if you could have a look and tell me whether the code seems OK to you or not. Is the loss section right?

import os
import time
import datetime
import torch.distributed as dist
from apex.parallel import DistributedDataParallel as DDP
from apex import amp

training_stats = []

#################
#  total number of nodes
nodes=3
# rank of the current node (machine) within all the nodes (machines), and goes from 0 to args.nodes - 1.
nr=0
## number of GPUs on each node
gpus=1
# total number of processes to run
world_size = gpus * nodes

rank = nr * gpus + gpu

dist.init_process_group(backend='nccl', init_method='env://', world_size=world_size, rank=rank)

torch.cuda.set_device(gpu)
model.cuda(gpu)

model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                num_replicas=world_size,
                                                                rank=rank)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=False,
                                           num_workers=0,
                                           pin_memory=True,
                                           sampler=train_sampler)

#################   

for epoch_i in range(0, epochs):

    # ========================================
    #               Training
    # ========================================

    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    print('Training...')

    t0 = time.time()

    total_train_loss = 0


    model.train()

    for step, batch in enumerate(train_loader):
        #print(step)

        b_input_ids = batch[0].cuda(non_blocking=True)
        b_labels = batch[0].cuda(non_blocking=True)
        b_masks = batch[1].cuda(non_blocking=True)

        optimizer.zero_grad()        

        outputs = model(  b_input_ids,
                          labels=b_labels, 
                          attention_mask = b_masks,
                          token_type_ids=None
                        )

        loss = outputs[0]  

        batch_loss = loss.item()
        
        total_train_loss += batch_loss
            
        loss.backward()

        optimizer.step()

        scheduler.step()

    # Calculate the average loss over all of the batches.
    avg_train_loss = total_train_loss / len(train_loader)
    
    del total_train_loss
    del batch_loss
    
    # Measure how long this epoch took.
    training_time = format_time(time.time() - t0)


    # ========================================
    #               Validation
    # ========================================

    avg_val_loss_1=[]
    
    print("")
    print("Running Validation...")

    t0 = time.time()

    model.to("cuda:3")
    model.eval()

    total_eval_loss = 0
    nb_eval_steps = 0

    # Evaluate data for one epoch
    for batch in validation_dataloader:
                
        b_input_ids = batch[0].to("cuda:3")
        b_labels = batch[0].to("cuda:3")
        b_masks = batch[1].to("cuda:3")
        
        with torch.no_grad():        

            outputs  = model(b_input_ids, 
#                            token_type_ids=None, 
                             attention_mask = b_masks,
                            labels=b_labels)
          
            loss = outputs[0]  
            
        batch_loss = loss.item()
        total_eval_loss += batch_loss        

    avg_val_loss = total_eval_loss / len(validation_dataloader)
    
    perplexity=math.exp(avg_val_loss)

    avg_val_loss_1.append(avg_val_loss)
    
    validation_time = format_time(time.time() - t0)    

    del total_eval_loss 
    

    print("  Validation Loss: {0:.2f}".format(avg_val_loss))
    print("  Validation took: {:}".format(validation_time))

    # Record all statistics from this epoch.
    training_stats.append(
        {
            'epoch': epoch_i + 1,
            'Training Loss': avg_train_loss,
            'Valid. Loss': avg_val_loss,
            'Training Time': training_time,
            'Validation Time': validation_time,
            'perplexity': perplexity
        }
    )
    gc.collect()
    

Do you have any recommendations?

I’m unsure why you want to use a single device for the validation step only, but I would not move the DDP model to this device. Instead you could try to access the internal .module attribute in the process running on GPU3 and perform the validation step. To do so, check which rank you are currently on and don’t move the module to GPU3 from other ranks.
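
A rough sketch of that rank check, reusing the variable names from your snippet (val_rank and eval_model are just illustrative names, and I’m assuming the process with rank 3 is the one already running on cuda:3):

# Run validation only in the process whose rank owns GPU 3,
# using the underlying module instead of moving the DDP wrapper.
val_rank = 3  # the rank that already runs on cuda:3

if rank == val_rank:
    eval_model = model.module  # unwrap DDP; the module already lives on cuda:3
    eval_model.eval()
    total_eval_loss = 0.0
    with torch.no_grad():
        for batch in validation_dataloader:
            b_input_ids = batch[0].to(f"cuda:{val_rank}")
            b_labels = batch[0].to(f"cuda:{val_rank}")
            b_masks = batch[1].to(f"cuda:{val_rank}")
            outputs = eval_model(b_input_ids, attention_mask=b_masks, labels=b_labels)
            total_eval_loss += outputs[0].item()
    avg_val_loss = total_eval_loss / len(validation_dataloader)
    print(f"Validation loss: {avg_val_loss:.4f}")

# Optional: let the other ranks wait until validation is done before the next epoch.
dist.barrier()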

Is the training section correct for using multiple GPUs?

It looks generally alright, but I also don’t see how and where gpu is defined and would assume it should be the rank. Also, apex.amp is deprecated.
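
For the mixed-precision part, the native torch.cuda.amp utilities could replace apex.amp along these lines (a sketch using the variable names from your snippet, not a tested drop-in replacement):

# Native mixed precision instead of apex.amp; model, optimizer, scheduler
# and the batch layout follow the training loop above.
scaler = torch.cuda.amp.GradScaler()

for step, batch in enumerate(train_loader):
    b_input_ids = batch[0].cuda(non_blocking=True)
    b_labels = batch[0].cuda(non_blocking=True)
    b_masks = batch[1].cuda(non_blocking=True)

    optimizer.zero_grad()
    with torch.cuda.amp.autocast():
        outputs = model(b_input_ids, labels=b_labels, attention_mask=b_masks)
        loss = outputs[0]

    scaler.scale(loss).backward()  # scaled backward to avoid fp16 underflow
    scaler.step(optimizer)         # unscales the gradients and calls optimizer.step()
    scaler.update()
    scheduler.step()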

Hi @ptrblck, sorry, I downloaded apex from NVIDIA and then used it in the code, but it keeps giving me this error even after I applied some solutions from Google. Do you have any idea? Many thanks.

import os
import time
import datetime
import torch
import torch.distributed as dist
import sys
## the directory includes the package from NVIDIA
sys.path.append('/home//GPU_ZIP_Apex/apex-master/apex')
from apex import amp
from apex.parallel import DistributedDataParallel as DDP

You are not using our apex library but have most likely installed this one via pip, which is not the same package.

@ptrblck, I downloaded the package from “GitHub - NVIDIA/apex: A PyTorch Extension: Tools for easy mixed precision and distributed training in Pytorch” and then uploaded it to the AWB. Is that the right package?

I don’t know what AWB is. The linked repository is correct but the error message points to the aforementioned pip package which is a different one.


Hi @ptrblck, I hope you are well. I really appreciate your time and help. I studied this documentation
(Multi GPU training with DDP — PyTorch Tutorials 2.0.0+cu117 documentation) and tried to edit my code accordingly. My concern is about the overall structure (I define the entire process inside def main()) and about the validation part. I use DDP for the validation part as well, the same as for training. I don’t know if the code is correct for both the training and validation parts, and how gpu_id gets defined in this documentation. Is destroy_process_group() in the correct place? I appreciate your comments.

from torch.utils.data import DataLoader
#from transformers import  AutoModelWithLMHead
from transformers import  AutoModelForCausalLM
from transformers import AdamW, get_linear_schedule_with_warmup
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import gc
import math
import os
import time
import datetime
import torch
import torch.distributed as dist
import sys
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
import random

################################################
class GPT2Dataset(Dataset):

    def __init__(self, txt_list, tokenizer, gpt2_type=pretrained_model, max_length=768):

        self.tokenizer = tokenizer
        self.input_ids = []
        self.attn_masks = []

        for txt in txt_list:

            encodings_dict = tokenizer('<|startoftext|>'+ txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")

            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx] 

######################################################
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
#########################################################
    
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    

    ### defined variable ###############
    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    training_steps_per_epoch=len(train_dataloader)
    total_num_training_steps = int(training_steps_per_epoch*epochs)
    weight_decay=0
    learning_rate=5e-5
    adam_epsilon=1e-8
    warmup_steps = 1e2
    lr=5e-5
    Max_length=400
    
    ###############################
    ddp_setup(rank, world_size)
    ###############################
    pretrained_model=Path to model
    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model, bos_token='<|startoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>') #gpt2-small

    model = GPT2LMHeadModel.from_pretrained(pretrained_model)

    model.resize_token_embeddings(len(tokenizer))

    ## loading traina and tets dataset

    trains_titles=pd.read_csv(Path+'/'+'traindata.csv')
    valid_titles=pd.read_csv(Path+'/'+'validdata.csv')

    train_dataset = GPT2Dataset(trains_titles, tokenizer, max_length=Max_length)

    Val_dataset = GPT2Dataset(valid_titles, tokenizer, max_length=Max_length)

    ################# define optimizer and scheduler#########################

    # Note: AdamW is a class from the huggingface library (as opposed to pytorch) 
    optimizer = AdamW(model.parameters(), lr = learning_rate,eps = adam_epsilon)

    total_steps = len(train_dataloader) * epochs

    # Create the learning rate scheduler.
    # This changes the learning rate as the training loop progresses
    scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps = warmup_steps, 
                                                num_training_steps = total_steps)

    ############################## train_loader and validation_loader #################
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(train_dataset))


    validation_loader= torch.utils.data.DataLoader(dataset=Val_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(Val_dataset))

  ######################## applying DDP on the model for training #######################
    model=model.to(gpu_id)
    model = DDP(model, device_ids=[gpu_id])
    # ========================================
    #               Training
    # ========================================

    training_stats = []

    for epoch_i in range(0, epochs):

        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
        print('Training...')

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):

            #################################
            b_input_ids = batch[0].to(gpu_id)
            b_labels = batch[0].to(gpu_id)
            b_masks = batch[1].to(gpu_id)
            #################################

            optimizer.zero_grad()        

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )

            loss = outputs[0]  
            batch_loss = loss.item()
            total_train_loss += batch_loss
            loss.backward()
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_loader)

        del total_train_loss
        del batch_loss
        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)
        # ========================================
        #               Validation
        # ========================================

        print("")
        print("Running Validation...")

        avg_val_loss_1=[]
        t0 = time.time()
        #################### is this section corrcet for validation ????????????????????  ####
        model.eval()
        model = DDP(model, device_ids=[gpu_id])
        ######################################
        total_eval_loss = 0
        nb_eval_steps = 0
        ########################################
        validation_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(validation_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(validation_loader)}")
        ###########################################
        
        # Evaluate data for one epoch
        for batch in validation_loader:

            b_input_ids = batch[0].to(gpu_id)
            b_labels = batch[0].to(gpu_id)
            b_masks = batch[1].to(gpu_id)

            with torch.no_grad():        
                outputs  = model(b_input_ids,attention_mask = b_masks,labels=b_labels)
                loss = outputs[0]  
            batch_loss = loss.item()
            total_eval_loss += batch_loss        

        avg_val_loss = total_eval_loss / len(validation_loader)

        perplexity=math.exp(avg_val_loss)

        avg_val_loss_1.append(avg_val_loss)

        validation_time = format_time(time.time() - t0)    

        del total_eval_loss 


        print("  Validation Loss: {0:.2f}".format(avg_val_loss))
        print("  Validation took: {:}".format(validation_time))

        # Record all statistics from this epoch.
        training_stats.append(
            {
                'epoch': epoch_i + 1,
                'Training Loss': avg_train_loss,
                'Valid. Loss': avg_val_loss,
                'Training Time': training_time,
                'Validation Time': validation_time,
                'perplexity': perplexity
            }
        )
        
        ################### saving the model ########################
        if gpu_id == 0 and epoch_i % save_every == 0:
            Pathmodel=Results_Path+'/'+'savemodel_epoch='+str(epoch_i)
            ss=os.path.isdir(Pathmodel)
            if ss==False:
                os.makedirs(Pathmodel)

            ckp = model.module.state_dict()
            torch.save(ckp, Pathmodel + '/checkpoint.pt')
    ############ save the results #####################3          
    Path_2=pt_save_directory+'/'+'training_stats='+str(0)+".csv"
    torch.save(training_stats,Path_2)

    #### is a good place to put the destrop process ??????????????????? ###########
    destroy_process_group()
    #############################

if __name__ == "__main__":
    total_epochs = int(sys.argv[1])
    save_every = int(sys.argv[2])
    batch_size = int(sys.argv[3])
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size), nprocs=world_size)