Average loss in DP and DDP

Hi. I have a question regarding data parallel (DP) and distributed data parallel (DDP).

I have read many articles about DP and understand that the gradients are reduced automatically. However, I could not find an article explaining whether or not the loss is also reduced. For example, I believe the following code is a typical main routine of a DP program.

outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()

I understand that the inputs are split and the model is copied onto each GPU, a forward pass is computed concurrently to yield the loss, then a backward pass is also computed concurrently, and finally all gradients are reduced into one.

Is the loss obtained by the above code averaged over all the GPUs, i.e., exactly the same as the loss computed by a serial program? Or is the loss a value from just one GPU (gpu0)? I need to plot a loss chart, so I wonder whether the loss is averaged over the GPUs.

The same question applies to outputs. I also need to compute the training accuracy using outputs in the above code. Does it hold the results from all the GPUs? If so, in what tensor structure are they stored?

Regarding DDP, the above code is written in each process running on its respective GPU. In this case, how can I access the values on all the GPUs to plot the averaged loss and the total accuracy?

I appreciate any sources of information. Thank you in advance.

9 Likes

No, the loss is not reduced because there is only one loss tensor with DP. Besides, the gradients are actually accumulated automatically by the autograd engine. As DP is single-process, multi-thread, all threads share the same autograd engine, and hence ops on different threads will be added to the same autograd graph.

Is the loss obtained by the above code averaged over all the GPUs, i.e., exactly the same as the loss computed by a serial program? Or is the loss a value from just one GPU (gpu0)? I need to plot a loss chart, so I wonder whether the loss is averaged over the GPUs.

DP's forward function will gather all outputs to cuda:0 (by default) and then return the gathered result. So, in the code above, outputs is on one GPU and hence loss is also on one GPU.

The same question applies to outputs. I also need to compute the training accuracy using outputs in the above code. Does it hold the results from all the GPUs? If so, in what tensor structure are they stored?

Below is DP's forward function. The outputs variable on line 161 holds the outputs on the different GPUs, but the gather call on line 162 copies them back to one GPU.
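For reference, here is a paraphrased sketch of the structure of that forward function (not the verbatim source):

def forward(self, *inputs, **kwargs):
    # Split the input batch into one chunk per device.
    inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
    if len(self.device_ids) == 1:
        return self.module(*inputs[0], **kwargs[0])
    # Copy the model to every device and run the forward passes in parallel threads.
    replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
    outputs = self.parallel_apply(replicas, inputs, kwargs)  # list of per-GPU outputs
    # Copy all per-GPU outputs back to the output device (cuda:0 by default).
    return self.gather(outputs, self.output_device)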

If you want to access the individual outputs on the different GPUs, you can do so in the forward function of your model (the one you passed to the DP constructor). E.g.,

import torch.nn as nn
from torch.nn import DataParallel

class MyModel(nn.Module):
  def __init__(self):
    super().__init__()
    self.fc = nn.Linear(10, 10)

  def forward(self, input):
    output = self.fc(input)
    print("per-GPU output ", output)  # prints once per GPU, on that GPU's input chunk
    return output


dp = DataParallel(MyModel())
outputs = dp(inputs)  # this outputs is gathered onto one GPU

Regarding DDP, the above code is written in each process running on its respective GPU. In this case, how can I access the values on all the GPUs to plot the averaged loss and the total accuracy?

You can use gather, all_gather, or all_reduce to communicate the loss to one process and print it.
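For example, a minimal sketch of averaging the local loss across ranks with all_reduce, assuming the default process group has already been initialized (with the nccl backend the loss tensor must live on the process's GPU):

import torch
import torch.distributed as dist

def global_average_loss(local_loss: torch.Tensor) -> float:
    # Work on a detached copy so logging does not touch the autograd graph.
    loss_copy = local_loss.detach().clone()
    dist.all_reduce(loss_copy, op=dist.ReduceOp.SUM)  # every rank now holds the sum
    return (loss_copy / dist.get_world_size()).item()

# Inside the training loop, after computing `loss` on each rank:
# avg_loss = global_average_loss(loss)
# if dist.get_rank() == 0:
#     print(f"averaged loss over all ranks: {avg_loss:.4f}")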

BTW, could you please add a "distributed" tag to distributed training related questions? People working on distributed training monitor that tag and can get back to you promptly.

1 Like

Thank you, Shen Li, for your detailed explanation. It is very helpful, and now I understand what's going on in DP and DDP. I modified your code to see the order of the data, so that I can make sure the output is correctly compared with the corresponding labels in the loss function.

import torch
import torch.nn as nn

device = "cuda:0"

class Model(nn.Module):

    def __init__(self):
        super(Model, self).__init__()
    
    # forward() outputs the input as it is. 
    def forward(self, input):
        output = input
        print("per-GPU output ", output)
        return output

model = Model()
model = nn.DataParallel(model)
model.to(device)

# input is a sequence of integers in 2D shape.
input = torch.arange(20 * 5).reshape(20, 5)
input = input.to(device)
print("total input ", input)
output = model(input)
print("total output ", output)

I was not sure about the "tag" that you pointed out, but I added "distributed" to "Categories".

I still have a related question about DDP.
In my understanding, the gradient is a vector that points in the direction where the loss increases the most. I learned from your explanation that we don't have the "total" loss until we "gather", "all_gather", or "all_reduce" the losses computed on each GPU. If we use the loss in each process instead of the total loss to compute each gradient, and then average all the gradients, will that be the correct "total" gradient of the total loss?

In other words, I wonder whether it is mathematically correct that averaging all the gradients, each of which increases its respective loss, produces a total gradient that increases the averaged loss.

If it is not correct, I think it means that we need to do an all_reduce of the loss before we call loss.backward() in order to hand the total loss information to each process for computing correct gradients. Is my thinking correct?

Thank you again for your kind assistance.

Good question. Instead of communicating the loss, DDP communicates gradients. So the loss is local to every process, but after the backward pass the gradients are globally averaged, so that all processes see the same gradients. This is a brief explanation, and this is a full paper describing the algorithm.

If it is not correct, I think it means that we need to do an all_reduce of the loss before we call loss.backward() in order to hand the total loss information to each process for computing correct gradients. Is my thinking correct?

The reason we don't communicate the loss is that it is not sufficient. When computing gradients, we need both the loss and the activations, and the activations depend on the local inputs. So we would need to communicate either loss + activations or gradients. DDP does the latter.

3 Likes

Thank you again.

Maybe you have fully answered my question, but I still feel that my point is missing. As I understand it, a gradient is computed by backpropagation using the chain rule and the first derivatives of the functions in the model network. Also, as you mentioned, we need the function values within the network, as well as the loss.

Since the method existed far before the parallelism era, backprop naturally started from a single "total" or "global" loss on a single-processor platform. Therefore, in that case, we use a loss readily averaged over a batch of inputs. On the other hand, on a multi-GPU platform, the input batch is further divided into smaller batches, each of which is used to produce a "local" loss on a GPU. In that case, when computing the local gradient, the functions, inputs, and function values are exactly the same as in the case of the single-processor platform. The only difference is using the local loss instead of the global loss.

My question is: does averaging the local gradients computed from the local losses produce exactly the same result as the global gradient computed from the global loss?

If the answer is no, I think that we need to average the local losses to produce a global loss and hand it to all the GPUs to compute correct local gradients that are averaged to produce a correct global gradient. This might be achieved by performing all_reduce() over the local losses before doing loss.backward() on each GPU.

The answer could be yes, but I don't know the mathematical explanation for it.

That is my point.
If I misunderstand something, please point it out. Thank you.

1 Like

In that case, when computing the local gradient, the functions, inputs, and function values are exactly the same as in the case of the single-processor platform.

This is actually not true. Say we have a function f(x) = w * x, where w is the weight. Then when you compute the gradient (i.e., dw), you need both df (from the loss, which depends on the local input) and x (the local input or an intermediate local output, which also depends on the local input). So, if we do not communicate gradients, we need to communicate both the final loss and the intermediate outputs of all layers.
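A tiny autograd sketch of that point:

import torch

w = torch.tensor(2.0, requires_grad=True)
x = torch.tensor(3.0)            # local input (activation)
f = w * x
loss = (f - 1.0) ** 2            # dloss/df = 2 * (f - 1)
loss.backward()
print(w.grad)                    # dloss/dw = dloss/df * x = 2 * (6 - 1) * 3 = 30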

No, this is not guaranteed to be the same, but for a different reason. If 1) the loss function satisfies the condition loss_fn([x1, x2]) == (loss_fn(x1) + loss_fn(x2)) / 2 and 2) the batch size is the same on all processes, then averaging the gradients should be correct. Otherwise, averaging won't produce the same result. One example would be: if we use .sum() as the loss function, we should sum the gradients instead of averaging them.
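As a quick sanity check, here is a minimal CPU-only sketch comparing the full-batch gradient with the average of two half-batch gradients for a mean-reduced loss:

import torch
import torch.nn.functional as F

torch.manual_seed(0)
w0 = torch.randn(5, 1)
x = torch.randn(8, 5)   # "global" batch
y = torch.randn(8, 1)

def grad_of(inputs, targets):
    w = w0.clone().requires_grad_(True)
    F.mse_loss(inputs @ w, targets).backward()   # mean reduction
    return w.grad

g_full = grad_of(x, y)                                       # serial gradient
g_avg = (grad_of(x[:4], y[:4]) + grad_of(x[4:], y[4:])) / 2  # average of two "local" gradients
print(torch.allclose(g_full, g_avg))  # True: mean loss + equal splits
# With reduction='sum', the two local gradients would need to be summed, not averaged.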

If the answer is no, I think that we need to average the local losses to produce a global loss and hand it to all the GPUs to compute correct local gradients that are averaged to produce a correct global gradient. This might be achieved by performing all_reduce() over the local losses before doing loss.backward() on each GPU.

I might be missing something. If we do the above, it means we compute the gradients using the global loss and the local activations (i.e., the global df and the local x in the f(x) = w * x example above). In this case, what does this gradient mean?

Thank you for your further explanation.

So, if we do not communicate gradients, we need to communicate both the final loss and the intermediate outputs of all layers.

Yes, I agree that we must communicate gradients to have a global gradient. My question is about the relationship between the global loss and the local gradients, not about communicating losses instead of gradients.

If 1) the loss function satisfies the condition loss_fn([x1, x2]) == (loss_fn(x1) + loss_fn(x2)) / 2 and 2) the batch size is the same on all processes, then averaging the gradients should be correct.

I understand that, in a parallel process, the losses are locally averaged on each GPU, and the resulting losses can then be globally averaged. That is why the condition you explained must hold for the "average of averages" to equal the global average.

My point is based on the idea that a parallel process just does the same thing in parallel as a serial process does, and both of them are supposed to produce identical results.

What I am wondering about is that the backward pass of the computational graph in a DDP process starts from a local loss, while it starts from the global loss in the serial process, yet they are supposed to produce the same result.

From your earlier explanation, I learned that the backward pass starts from the global loss in DP, but not in DDP. So I believe that DP will produce the same results as the serial process does, but I wonder about DDP.

One thing I have come across is that, if the global loss is computed as sum() / batch_size, the backward pass might start from 1 and then divide it by batch_size. If this is true, the only difference between starting from the global loss and starting from the local loss should be the difference between dividing by the global batch size and dividing by the local per-GPU batch size.

So, I suspect that the gradients in those cases have the same direction but different magnitudes. In particular, the gradient from DDP might be n_gpu times larger than in DP, where n_gpu is the number of GPUs. Even if this is true, it will not be a big problem, but DDP may require a different learning rate from DP. I just thought of it that way, but it needs confirmation.

Is this correct? I appreciate your assistance. Thank you.

Yep, this is true for the sum() / batch_size case you mentioned, on the condition that all processes are using the same batch size. Here is the test to verify that:
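Not the actual linked test, but a minimal sketch of the same check, assuming a CPU-only machine, the gloo backend, and two processes:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def worker(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    torch.manual_seed(0)                        # identical initial weights on every rank
    model = torch.nn.Linear(5, 1, bias=False)
    ddp_model = DDP(model)

    torch.manual_seed(1)                        # identical "global" batch on every rank
    data, target = torch.randn(8, 5), torch.randn(8, 1)

    # Each rank trains on its own shard: local loss = sum() / local batch size.
    shard, tgt = data.chunk(world_size)[rank], target.chunk(world_size)[rank]
    (((ddp_model(shard) - tgt) ** 2).sum() / shard.size(0)).backward()  # DDP averages grads here

    if rank == 0:
        # Serial reference: same init, full batch, sum() / global batch size.
        torch.manual_seed(0)
        ref = torch.nn.Linear(5, 1, bias=False)
        (((ref(data) - target) ** 2).sum() / data.size(0)).backward()
        print(torch.allclose(model.weight.grad, ref.weight.grad))  # True

    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2, join=True)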

In particular, the gradient from DDP might be n_gpu times larger than in DP, where n_gpu is the number of GPUs. Even if this is true, it will not be a big problem, but DDP may require a different learning rate from DP. I just thought of it that way, but it needs confirmation.

DDP computes the average of all gradients from all processes, so the gradient should be the same as in local training for the sum() / batch_size case. What might affect the learning rate is the batch size you configured for each DDP process. If each process uses the same batch_size as local training, it means that in each iteration the DDP gang collectively processes world_size * batch_size input samples, so you might be more confident in the resulting gradient compared to local training and might need to set the learning rate to a larger value. But this is not guaranteed. See this discussion: Should we split batch_size according to ngpu_per_node when DistributedDataparallel

Thank you, Shen Li.

DDP computes the average of all gradients from all processes, so the gradient should be the same as in local training for the sum() / batch_size case.

I interpret this to mean that the difference is taken care of when computing the global gradient from the local gradients, so we will see no difference from the serial case.

What might affect the learning rate is the batch size you configured for each DDP process.

I think that whether or not we expand the global batch size is a trade-off between computation speed per iteration and the algorithmic efficiency of overall convergence, together with the larger learning rate that you mentioned. Besides, we can make better use of the GPU memory if we choose a large batch size. I feel that a larger batch brings about faster convergence even in terms of wall-clock time, if we can efficiently utilize the multiple GPUs. That's what I'm trying to do.

Thank you very much. I appreciate your time for this long discussion.

4 Likes

Hi, Shen Li. If I use 'loss = loss.sum()', how can I avoid averaging the gradients in DDP?

Hi @mrshenli, @TT_YY, I hope you are well. Sorry, I am using the code below for fine-tuning GPT-2, and I use DDP for both the training and validation parts. I have two questions. First, I need to plot the training and validation loss; how can I apply all_gather or all_reduce to get one value per epoch to show in the graph? Second, the size of the model that I reload is different from the initial one. Why does this happen? Is there any issue with how I save the model? Many thanks for your help.

#!/usr/bin/env python
# coding: utf-8

# In[1]:


from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from transformers import TextDataset, DataCollatorForLanguageModeling
#from transformers import  AutoModelWithLMHead
from transformers import AutoModelForCausalLM
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from transformers import AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import numpy as np
import pandas as pd
import random
import gc
import math
import os
import sys
import time
import datetime
######################
weight_decay=0
learning_rate=5e-5
adam_epsilon=1e-8
warmup_steps = 1e2
lr=5e-5
Max_length=400

PathData='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/model_v4200_k_bs=16_lr=5e-05_epochs=20/'
pretrained_model='/home///GPT_2/'

########################################
def format_time(elapsed):
    return str(datetime.timedelta(seconds=int(round((elapsed)))))

################################################
class GPT2Dataset(Dataset):

    def __init__(self, txt_list, tokenizer, gpt2_type=pretrained_model, max_length=400):

        self.tokenizer = tokenizer
        self.input_ids = []
        self.attn_masks = []

        for txt in txt_list:

            encodings_dict = tokenizer('<|startoftext|>'+ txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")

            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx] 

######################################################3
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2"

    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
#########################################################

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    
    gpu_id=rank
    
    Path='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/multipleGPU/model_v4'\
    'data_'+str(200)+'_k'+'_'+'bs='+str(batch_size)+'_lr='+str(learning_rate)+'_epochs='+str(total_epochs)
    
    print(Path)
    
    Results_Path=Path+'/Results/'
    ss=os.path.isdir(Results_Path)
    if ss==False:
        os.makedirs(Results_Path)


    ### defined variable ###############
    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    ###############################
    
    ddp_setup(rank, world_size)
    
    ###############################
    
    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model, bos_token='<|startoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>') #gpt2-small

    model = GPT2LMHeadModel.from_pretrained(pretrained_model)

    model.resize_token_embeddings(len(tokenizer))

    ## loading train and test datasets
    print(PathData)
    trains_titles=pd.read_csv(PathData+'/'+'traindata.csv')
    valid_titles=pd.read_csv(PathData+'/'+'validdata.csv')
    
    trains_titles=trains_titles.drop(columns=['Unnamed: 0'])['0'].iloc[:200]
    valid_titles=valid_titles.drop(columns=['Unnamed: 0'])['0'].iloc[:30] 

    print(trains_titles.head(2))
    
    train_dataset = GPT2Dataset(trains_titles, tokenizer, max_length=Max_length)

    Val_dataset = GPT2Dataset(valid_titles, tokenizer, max_length=Max_length)
    
    ############################################################################
    
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(train_dataset))

    validation_loader= torch.utils.data.DataLoader(dataset=Val_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(Val_dataset))
    
    
    total_steps = len(train_loader) * total_epochs


    ################# define optimizer and scheduler#########################

    # Note: AdamW is a class from the huggingface library (as opposed to pytorch) 
    optimizer = AdamW(model.parameters(), lr = learning_rate,eps = adam_epsilon)


    # Create the learning rate scheduler.
    # This changes the learning rate as the training loop progresses
    scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps = warmup_steps, 
                                                num_training_steps = total_steps)

    ############################## train_loader and validation_loader ######################3
 
    training_steps_per_epoch=len(train_loader)
    total_num_training_steps = int(training_steps_per_epoch*total_epochs)

  ######################## applying DDP on the model for training ############################
    model=model.to(gpu_id)
    model = DDP(model, device_ids=[gpu_id])
    print("gpu_id",gpu_id)
    # ========================================
    #               Training
    # ========================================


        
    training_stats = []
           

    for epoch_i in range(0, total_epochs):
        
        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, total_epochs))
        print('Training...')

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        train_loader.sampler.set_epoch(epoch_i)
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):

            #################################
            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)
            #################################

            optimizer.zero_grad()        

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )

            loss = outputs[0]  
            batch_loss = loss.item()
            total_train_loss += batch_loss
        #    print("total_train_loss",total_train_loss)
            loss.backward()
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_loader)  

        del total_train_loss
        del batch_loss
        
        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)
        # ========================================
        #               Validation
        # ========================================

        print("")
        print("Running Validation...")

        avg_val_loss_1=[]
        t0 = time.time()
        #################### is this section correct for validation?  #############
        model.eval()
        model = DDP(model, device_ids=[gpu_id])
        ########################################3
        total_eval_loss = 0
        nb_eval_steps = 0
        
        ########################################
        validation_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(validation_loader))[0])
        print("bz",b_sz)
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(validation_loader)}")
        validation_loader.sampler.set_epoch(epoch_i)
        ###########################################
        
        # Evaluate data for one epoch
        for batch in validation_loader:

            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)

            with torch.no_grad():        
                outputs  = model.module(b_input_ids,attention_mask = b_masks,labels=b_labels)
                loss = outputs[0]  
            batch_loss = loss.item()
          #  print("here batch loss",batch_loss)
            total_eval_loss += batch_loss        

        avg_val_loss = total_eval_loss / len(validation_loader)
       # print("here total_eval_loss=",total_eval_loss)
        perplexity=math.exp(avg_val_loss)

        avg_val_loss_1.append(avg_val_loss)

        validation_time = format_time(time.time() - t0)    

        del total_eval_loss 


        print("  Validation Loss: {0:.2f}".format(avg_val_loss))
        print("  Validation took: {:}".format(validation_time))

        # Record all statistics from this epoch.
        training_stats.append(
            {
                'epoch': epoch_i + 1,
                'Training Loss': avg_train_loss,
                'Valid. Loss': avg_val_loss,
                'Training Time': training_time,
                'Validation Time': validation_time,
                'perplexity': perplexity
            }
        )
        gc.collect()
        
        ################### saving the model ########################

        if gpu_id == 0:
    
            Path2=Results_Path+'/'+'savemodel_epoch=='+str(epoch_i)
    
            ss=os.path.isdir(Path2)
            if ss==False:
                os.makedirs(Path2)

            ckp = model.module.state_dict()
            torch.save(ckp, Path2+"/checkpoint.pt")

        ############ save the results #####################
    pt_save_directory=Results_Path+'/'+'analyticsnumber'
    
    ss=os.path.isdir(pt_save_directory)
    if ss==False:
        os.makedirs(pt_save_directory)
    
    print("here",training_stats)
    Path_3=pt_save_directory+'/'+'training_stats='+str(42)+".csv"
    torch.save(training_stats,Path_3)
    
    #### is this a good place to put destroy_process_group()?  ###########
    destroy_process_group()
    #############################
if __name__ == '__main__':
    import sys
    total_epochs=int(sys.argv[1])
    save_every=int(sys.argv[2])
    batch_size=int(sys.argv[3])
    world_size = (torch.cuda.device_count())-1
    print(world_size)
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size), nprocs=world_size,join=True)

Hi @TT_YY, sorry, I am using PyTorch DDP for fine-tuning my model. I need to plot the training and validation loss. Would you please share what you did to get the final loss for the graphs?
Many thanks.