Run PyTorch on Multiple GPUs

The gpu_id you use is undefined in your code, so I assume it should represent the rank instead. Besides that I didn't see any obvious issues.
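For illustration, here is a minimal sketch (not part of the code above) of how the rank that mp.spawn passes to the worker function can double as the GPU id; the function name run and the use of every visible GPU are assumptions for this example:

import os
import torch
import torch.multiprocessing as mp
from torch.distributed import init_process_group, destroy_process_group

def run(rank, world_size):
    # mp.spawn passes the process index as the first argument; with one process
    # per GPU this index can be used directly as the device id.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "12355")
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    gpu_id = rank  # use this for .to(gpu_id) and DDP(..., device_ids=[gpu_id])
    destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)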


@ptrblck, many thanks. Does that mean I can write the evaluation part in the same way as the training part?

You can run the validation loop in a similar way, but I would recommend checking e.g. the ImageNet example, as it also shows how to make sure all samples are properly processed and how to reduce the stats from the different ranks.
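For illustration only, a rough sketch of what such a validation loop could look like under DDP; it assumes the names from the code in this thread (validation_loader built with a DistributedSampler, gpu_id as the local device) and an already initialized process group, and it is not the ImageNet example itself:

import torch
import torch.distributed as dist

def validate(model, validation_loader, gpu_id):
    # Each rank evaluates only its own shard of the validation set;
    # the DistributedSampler attached to validation_loader provides the split.
    model.eval()
    loss_sum = torch.zeros(1, device=gpu_id)
    n_batches = torch.zeros(1, device=gpu_id)
    with torch.no_grad():
        for input_ids, attn_masks in validation_loader:
            input_ids = input_ids.to(gpu_id, non_blocking=True)
            attn_masks = attn_masks.to(gpu_id, non_blocking=True)
            outputs = model(input_ids, attention_mask=attn_masks, labels=input_ids)
            loss_sum += outputs[0].detach()  # first element is the loss when labels are given
            n_batches += 1
    # Sum the per-rank statistics so every rank ends up with the same global average.
    dist.all_reduce(loss_sum, op=dist.ReduceOp.SUM)
    dist.all_reduce(n_batches, op=dist.ReduceOp.SUM)
    return (loss_sum / n_batches).item()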


@ptrblck, many thanks for your help. Sorry, can I hard-code the values for the parameters in the "if __name__ == '__main__':" section (total_epochs=20, save_every=5, batch_size=32) and run the code directly in a Jupyter notebook, since I pass the numbers directly?

Yes, you could remove the argparse usage and pass the desired arguments directly from your script to the corresponding methods. I don't know if and how arguments are otherwise passed in notebooks, but I would guess there is a way.
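As an example, the __main__ block could be reduced to something like this sketch, with the values hard-coded instead of read from sys.argv; it assumes the rest of the script stays as posted, and whether mp.spawn behaves well inside a notebook depends on the environment, so running it as a plain script is the safer option:

if __name__ == '__main__':
    # hard-coded values instead of argparse / sys.argv
    total_epochs = 20
    save_every = 5
    batch_size = 32
    world_size = torch.cuda.device_count()  # or device_count() - 1 to keep one GPU free, as in the posted script
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size),
             nprocs=world_size, join=True)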


@ptrblck, I ran this code and the training part works fine in the end. My concern is about the validation part: when I call model.eval() and then wrap the model in DDP again, which model is used for validation? Do I need to aggregate and sum the final loss from the different GPUs, or is all of that done automatically? Does the code look meaningful to you? Any issues?
I really appreciate your help.

#!/usr/bin/env python
# coding: utf-8

# In[1]:


from torch.utils.data import DataLoader
from transformers import TextDataset,DataCollatorForLanguageModeling
#from transformers import  AutoModelWithLMHead
from transformers import  AutoModelForCausalLM
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import gc
import math
import os
import time
import datetime
import torch
import torch.distributed as dist
import sys
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
import random
import pandas as pd
######################
weight_decay=0
learning_rate=5e-5
adam_epsilon=1e-8
warmup_steps = 1e2
lr=5e-5
Max_length=400

PathData='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/model_v4200_k_bs=16_lr=5e-05_epochs=20/'
pretrained_model='/home///GPT_2/'


######################
def format_time(elapsed):
    # helper used below to report epoch and validation durations
    return str(datetime.timedelta(seconds=int(round(elapsed))))

################################################
class GPT2Dataset(Dataset):

    def __init__(self, txt_list, tokenizer, gpt2_type=pretrained_model, max_length=400):

        self.tokenizer = tokenizer
        self.input_ids = []
        self.attn_masks = []

        for txt in txt_list:

            encodings_dict = tokenizer('<|startoftext|>'+ txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")

            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx] 

######################################################3
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
#########################################################

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    
    gpu_id=rank
    
    Path='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/multipleGPU/model_v4'\
    'data_'+str(200)+'_k'+'_'+'bs='+str(batch_size)+'_lr='+str(learning_rate)+'_epochs='+str(total_epochs)
    
    print(Path)
    
    Results_Path=Path+'/Results/'
    ss=os.path.isdir(Results_Path)
    if ss==False:
        os.makedirs(Results_Path)

    print(Results_Path)
        
    print(PathData)
    
    print(rank)

    ### defined variable ###############
    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    ###############################
    
    ddp_setup(rank, world_size)
    
    ###############################
    
    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model, bos_token='<|startoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>') #gpt2-small

    model = GPT2LMHeadModel.from_pretrained(pretrained_model)

    model.resize_token_embeddings(len(tokenizer))

    ## load the train and test datasets
    print(PathData)
    trains_titles=pd.read_csv(PathData+'/'+'traindata.csv')
    valid_titles=pd.read_csv(PathData+'/'+'validdata.csv')
    
    trains_titles=trains_titles.drop(columns=['Unnamed: 0'])['0']
    valid_titles=valid_titles.drop(columns=['Unnamed: 0'])['0']

    print(trains_titles.head(2))
    
    train_dataset = GPT2Dataset(trains_titles, tokenizer, max_length=Max_length)

    Val_dataset = GPT2Dataset(valid_titles, tokenizer, max_length=Max_length)
    
    ############################################################################
    
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(train_dataset))

    validation_loader= torch.utils.data.DataLoader(dataset=Val_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(Val_dataset))
    
    
    total_steps = len(train_loader) * total_epochs


    ################# define optimizer and scheduler#########################

    # Note: AdamW is a class from the huggingface library (as opposed to pytorch) 
    optimizer = AdamW(model.parameters(), lr = learning_rate,eps = adam_epsilon)


    # Create the learning rate scheduler.
    # This changes the learning rate as the training loop progresses
    scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps = warmup_steps, 
                                                num_training_steps = total_steps)

    ############################## train_loader and validation_loader ######################3
 
    training_steps_per_epoch=len(train_loader)
    total_num_training_steps = int(training_steps_per_epoch*total_epochs)

  ######################## applying DDP on the model for training ############################
    model=model.to(gpu_id)
    model = DDP(model, device_ids=[gpu_id])
    print("gpu_id",gpu_id)
    # ========================================
    #               Training
    # ========================================


        
    training_stats = []
           

    for epoch_i in range(0, total_epochs):
        
        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, total_epochs))
        print('Training...')

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        train_loader.sampler.set_epoch(epoch_i)
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):

            #################################
            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)
            #################################

            optimizer.zero_grad()        

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )

            loss = outputs[0]  
            batch_loss = loss.item()
            total_train_loss += batch_loss
            loss.backward()
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_loader)  

        del total_train_loss
        del batch_loss
        
        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)
        # ========================================
        #               Validation
        # ========================================

        print("")
        print("Running Validation...")

        avg_val_loss_1=[]
        t0 = time.time()
        #################### is this section correct for validation? ####################
        model.eval()
        model = DDP(model, device_ids=[gpu_id])
        ########################################3
        total_eval_loss = 0
        nb_eval_steps = 0
        
        ########################################
        validation_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(validation_loader))[0])
        print("bz",b_sz)
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(validation_loader)}")
        validation_loader.sampler.set_epoch(epoch_i)
        ###########################################
        
        # Evaluate data for one epoch
        for batch in validation_loader:

            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)

            with torch.no_grad():        
                outputs  = model(b_input_ids,attention_mask = b_masks,labels=b_labels)
                loss = outputs[0]  
            batch_loss = loss.item()
            total_eval_loss += batch_loss        

        avg_val_loss = total_eval_loss / len(validation_loader)

        perplexity=math.exp(avg_val_loss)

        avg_val_loss_1.append(avg_val_loss)

        validation_time = format_time(time.time() - t0)    

        del total_eval_loss 


        print("  Validation Loss: {0:.2f}".format(avg_val_loss))
        print("  Validation took: {:}".format(validation_time))

        # Record all statistics from this epoch.
        training_stats.append(
            {
                'epoch': epoch_i + 1,
                'Training Loss': avg_train_loss,
                'Valid. Loss': avg_val_loss,
                'Training Time': training_time,
                'Validation Time': validation_time,
                'perplexity': perplexity
            }
        )
        gc.collect()
        
        ################### saving the model ########################

        if gpu_id == 0 and epoch_i % save_every == 0:
           
            Path2=Results_Path+'/'+'savemodel_epoch='+str(epoch_i)
    
            ss=os.path.isdir(Path2)
            if ss==False:
                os.makedirs(Path2)
                
            Pathmodel=Results_Path+'/'+'savemodel_epoch='+str(epoch_i)

            ss=os.path.isdir(Pathmodel)
            if ss==False:
                os.makedirs(Pathmodel)

            ckp = model.module.state_dict()
            torch.save(ckp, Pathmodel+'/checkpoint.pt')
    ############ save the results #####################
    # training_stats is written into Results_Path, which was created above
    Path_2=Results_Path+'/'+'training_stats='+str(0)+".csv"
    torch.save(training_stats,Path_2)

    #### is this a good place to put the destroy process? ###########
    destroy_process_group()
    #############################
if __name__ == '__main__':
    import sys
    total_epochs=int(sys.argv[1])
    save_every=int(sys.argv[2])
    batch_size=int(sys.argv[3])
    world_size = (torch.cuda.device_count())-1
    print(world_size)
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size), nprocs=world_size,join=True)

The linked example shows how the accuracy metrics are reduced from all ranks for the final value. You don’t need to synchronize the model parameters or any buffers since all ranks use the same model parameters already.
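For reference, the reduction in the ImageNet example boils down to a pattern like the sketch below: pack the local sum and count into a single tensor, all_reduce it, and compute the global mean. The function and variable names here are illustrative, not taken from the example:

import torch
import torch.distributed as dist

def reduce_mean(local_sum, local_count, device):
    # Sum the per-rank (sum, count) pairs over all ranks, then form the global mean.
    total = torch.tensor([local_sum, local_count], dtype=torch.float32, device=device)
    dist.all_reduce(total, op=dist.ReduceOp.SUM)
    global_sum, global_count = total.tolist()
    return global_sum / global_count

# e.g. after the validation loop:
# avg_val_loss = reduce_mean(total_eval_loss, len(validation_loader), gpu_id)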

Hi @ptrblck, I'm sorry to ask so many questions and take your time. I solved the model-size issue. Now the only issue is that I want to plot the training and validation loss on a graph, and I don't know what to do with these three losses from the three ranks. The ImageNet example uses all_reduce, which is very confusing for me. Would you please help me with this section? I really appreciate your consideration. The print("losses valllllll", losses) statement shows me three different values.

#!/usr/bin/env python
# coding: utf-8

# In[1]:


from torch.utils.data import DataLoader
from transformers import TextDataset,DataCollatorForLanguageModeling
#from transformers import  AutoModelWithLMHead
from transformers import  AutoModelForCausalLM
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import gc
import math
import os
import time
import datetime
import torch
import torch.distributed as dist
import sys
## this directory includes the package from NVIDIA
#sys.path.append('/home//GPU_ZIP_Apex/apex-master/apex/')
#from apex import amp
#from apex.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
import random
import pandas as pd
import copy
from enum import Enum

######################
weight_decay=0
learning_rate=5e-5
adam_epsilon=1e-8
warmup_steps = 1e2
lr=5e-5
Max_length=400

PathData='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/model_v4200_k_bs=16_lr=5e-05_epochs=20/'
pretrained_model='/home//GPT_2/'

#########################
class Summary(Enum):
    NONE = 0
    AVERAGE = 1
    SUM = 2
    COUNT = 3
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f', summary_type=Summary.AVERAGE):
        self.name = name
        self.fmt = fmt
        self.summary_type = summary_type
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def all_reduce(self):
        if torch.cuda.is_available():
            device = torch.device("cuda")
            print("1",device)
        elif torch.backends.mps.is_available():
            device = torch.device("mps")
            print("2",device)

        else:
            device = torch.device("cpu")
        total = torch.tensor([self.sum, self.count], dtype=torch.float32, device=device)
        dist.all_reduce(total, dist.ReduceOp.SUM, async_op=False)
        self.sum, self.count = total.tolist()
        self.avg = self.sum / self.count
        
        
    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)
    
    def summary(self):
        fmtstr = ''
        if self.summary_type is Summary.NONE:
            fmtstr = ''
        elif self.summary_type is Summary.AVERAGE:
            fmtstr = '{name} {avg:.3f}'
        elif self.summary_type is Summary.SUM:
            fmtstr = '{name} {sum:.3f}'
        elif self.summary_type is Summary.COUNT:
            fmtstr = '{name} {count:.3f}'
        else:
            raise ValueError('invalid summary type %r' % self.summary_type)
        
        return fmtstr.format(**self.__dict__)
########################################
def format_time(elapsed):
    return str(datetime.timedelta(seconds=int(round((elapsed)))))

################################################
class GPT2Dataset(Dataset):

    def __init__(self, txt_list, tokenizer, gpt2_type=pretrained_model, max_length=400):

        self.tokenizer = tokenizer
        self.input_ids = []
        self.attn_masks = []

        for txt in txt_list:

            encodings_dict = tokenizer('<|startoftext|>'+ txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")

            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx] 

######################################################3
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2"

    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
#########################################################

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    
    gpu_id=rank
    
    Path='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/multipleGPU/model_v4'\
    'data_'+str(200)+'_k'+'_'+'bs='+str(batch_size)+'_lr='+str(learning_rate)+'_epochs='+str(total_epochs)
    
    print(Path)
    
    Results_Path=Path+'/Results/'
    ss=os.path.isdir(Results_Path)
    if ss==False:
        os.makedirs(Results_Path)


    ### defined variable ###############
    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    ###############################
    
    ddp_setup(rank, world_size)
    
    ###############################
    
    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model, bos_token='<|startoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>') #gpt2-small

    model_or = GPT2LMHeadModel.from_pretrained(pretrained_model)

    model_or.resize_token_embeddings(len(tokenizer))

    ## load the train and test datasets
    print(PathData)
    trains_titles=pd.read_csv(PathData+'/'+'traindata.csv')
    valid_titles=pd.read_csv(PathData+'/'+'validdata.csv')
    
    trains_titles=trains_titles.drop(columns=['Unnamed: 0'])['0'].iloc[:200]
    valid_titles=valid_titles.drop(columns=['Unnamed: 0'])['0'].iloc[:30] 

    print(trains_titles.head(2))
    
    train_dataset = GPT2Dataset(trains_titles, tokenizer, max_length=Max_length)

    Val_dataset = GPT2Dataset(valid_titles, tokenizer, max_length=Max_length)
    
    ############################################################################
    
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(train_dataset))

    validation_loader= torch.utils.data.DataLoader(dataset=Val_dataset,
                                                       batch_size=batch_size,
                                                        pin_memory=True,
                                                        shuffle=False,
                                                       sampler=DistributedSampler(Val_dataset))
    
    
    total_steps = len(train_loader) * total_epochs


    ################# define optimizer and scheduler#########################

    # Note: AdamW is a class from the huggingface library (as opposed to pytorch) 
    optimizer = AdamW(model_or.parameters(), lr = learning_rate,eps = adam_epsilon)


    # Create the learning rate scheduler.
    # This changes the learning rate as the training loop progresses
    scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps = warmup_steps, 
                                                num_training_steps = total_steps)

    ############################## train_loader and validation_loader #############
 
    training_steps_per_epoch=len(train_loader)
    total_num_training_steps = int(training_steps_per_epoch*total_epochs)

  ######################## applying DDP on the model for training ################
    model = copy.deepcopy(model_or)

    model=model.to(gpu_id)
    model = DDP(model, device_ids=[gpu_id])
    print("gpu_id",gpu_id)
    # ========================================
    #               Training
    # ========================================


        
    training_stats = []
           

    for epoch_i in range(0, total_epochs):
        
        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, total_epochs))
        print('Training...')

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        train_loader.sampler.set_epoch(epoch_i)
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):

            #################################
            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)
            #################################

            optimizer.zero_grad()        

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )

            loss = outputs[0]  
            batch_loss = loss.item()
            total_train_loss += batch_loss
        #    print("total_train_loss",total_train_loss)
            loss.backward()
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_loader)  

        del total_train_loss
        del batch_loss
        
        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)
        # ========================================
        #               Validation
        # ========================================

        print("")
        print("Running Validation...")

        avg_val_loss_1=[]
        t0 = time.time()
        
        losses = AverageMeter('Loss', ':.4e', Summary.NONE)

        #################### is this section correct for validation? #############
        model.eval()
        model = DDP(model, device_ids=[gpu_id])
        ########################################3
        total_eval_loss = 0
        nb_eval_steps = 0
        
        ########################################
        validation_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(validation_loader))[0])
        print("bz",b_sz)
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(validation_loader)}")
        validation_loader.sampler.set_epoch(epoch_i)
        ###########################################
        
        # Evaluate data for one epoch
        for batch in validation_loader:

            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)

            with torch.no_grad():        
                outputs  = model.module(b_input_ids,attention_mask = b_masks,labels=b_labels)
                loss = outputs[0]  
            batch_loss = loss.item()
            
            losses.update(loss.item())

          #  print("here batch loss",batch_loss)
            total_eval_loss += batch_loss        

        avg_val_loss = total_eval_loss / len(validation_loader)
       # print("here total_eval_loss=",total_eval_loss)
        perplexity=math.exp(avg_val_loss)

        avg_val_loss_1.append(avg_val_loss)

        validation_time = format_time(time.time() - t0)    

        print("losses valllllll",losses)

            
        del total_eval_loss 


        print("  Validation Loss: {0:.2f}".format(avg_val_loss))
        print("  Validation took: {:}".format(validation_time))

        # Record all statistics from this epoch.
        training_stats.append(
            {
                'epoch': epoch_i + 1,
                'Training Loss': avg_train_loss,
                'Valid. Loss': avg_val_loss,
                'Training Time': training_time,
                'Validation Time': validation_time,
                'perplexity': perplexity
            }
        )
        gc.collect()
        
        ################### saving the model ########################

        if gpu_id == 0:
    
            Path2=Results_Path+'/'+'savemodel_epoch=='+str(epoch_i)
    
            ss=os.path.isdir(Path2)
            if ss==False:
                os.makedirs(Path2)

            ckp = model.module.state_dict()
            torch.save(ckp, Path2+"/checkpoint.pt")

        ############ save the results #####################
    pt_save_directory=Results_Path+'/'+'analyticsnumber'
    
    ss=os.path.isdir(pt_save_directory)
    if ss==False:
        os.makedirs(pt_save_directory)
    
    print("here",training_stats)
    Path_3=pt_save_directory+'/'+'training_stats='+str(42)+".csv"
    torch.save(training_stats,Path_3)
    
    #### is this a good place to put the destroy process? ###########
    destroy_process_group()
    #############################
if __name__ == '__main__':
    import sys
    total_epochs=int(sys.argv[1])
    save_every=int(sys.argv[2])
    batch_size=int(sys.argv[3])
    world_size = (torch.cuda.device_count())-1
    print(world_size)
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size), nprocs=world_size,join=True)

Hi @ptrblck, sorry, my multiple GPUs are processing in this order. For the first epoch it makes sense, but during the second epoch, in between the start of training, it shows the validation of the previous epoch and then gives me a CUDA memory error on gpu:0. Is something wrong? The code is above. Many thanks for your help.

I assume your updated code now uses the all_reduce call to print a single validation loss?
I’m unsure what the current issue is since your screenshot doesn’t show any errors.

@ptrblck, many thanks for your attention and your answer. My question is that for epoch 2, training starts on GPU2, then GPU1 and GPU0, while the validation from epoch 1 is still finishing. Does this mixing matter, or is it expected? Should the GPUs always be used in the same order, or can the order differ? Another question: I want to plot the training and validation loss on a graph. Does this code use dist.reduce properly so that I can plot the training loss and find the best epoch? In the code, avg_train_loss prints three values because I use 3 GPUs, while avg_train_loss_reduce prints one number. Is dist.reduce used properly?

    training_stats = []
   
    for epoch_i in range(0, total_epochs):

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        train_loader.sampler.set_epoch(epoch_i)
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):
            #################################
            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)
            #################################

            optimizer.zero_grad()        

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )
            loss, logits = outputs[:2]
            batch_loss = loss.item()
            total_train_loss += batch_loss
            loss.backward()
            optimizer.step()
            scheduler.step()

        avg_train_loss = total_train_loss / len(train_loader)  
        print(type(avg_train_loss))
        
        dist.reduce(torch.tensor(avg_train_loss).to("cuda"), 0, op=dist.ReduceOp.SUM)
        
        ### divide by 3 because I used 3 GPUs

        avg_train_loss_reduce=avg_train_loss/3
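For comparison, a common pattern (a sketch, not taken from this thread) keeps the tensor that the collective writes into and divides by the world size, so the same global mean is available everywhere instead of dividing a local value by a hard-coded 3:

import torch
import torch.distributed as dist

def global_mean(value, device):
    # Average a per-rank Python scalar (e.g. avg_train_loss) over all ranks.
    t = torch.tensor(value, dtype=torch.float32, device=device)
    dist.all_reduce(t, op=dist.ReduceOp.SUM)  # every rank now holds the sum
    return t.item() / dist.get_world_size()

# e.g. avg_train_loss_reduce = global_mean(avg_train_loss, gpu_id)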

Hi @ptrblck, sorry, I used the code we discussed for multiple GPUs, but the results show that the model is not trained. I would appreciate your thoughts: is anything wrong with the code? For text generation, when I pass the prompt it gives me back the same thing and no further text is generated.

Hi @ptrblck, I hope you are well. I tried the code that we discussed before for fine-tuning GPT-2 with multiple GPUs, but when I load the model to generate sentences the results are very strange: it just gives me back the input as it is, and the expected generated sentence is padding tokens. I would appreciate your thoughts on where the code could be wrong: saving and loading the model, or training the model? I really appreciate your help; I am stuck.

######################
weight_decay=0
learning_rate=7e-5
adam_epsilon=1e-8
warmup_steps = 1e2
lr=5e-5
Max_length=400

PathData='/home//NLP_Projects/'
pretrained_model = '/home//GPT_NEO_1.3B/'

#########################

def format_time(elapsed):
    return str(datetime.timedelta(seconds=int(round((elapsed)))))

################################################
class GPT2Dataset(Dataset):

    def __init__(self, txt_list, tokenizer, gpt2_type=pretrained_model, max_length=400):

        self.tokenizer = tokenizer
        self.input_ids = []
        self.attn_masks = []

        for txt in txt_list:

            encodings_dict = tokenizer('<|startoftext|>'+ txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")

            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx]

######################################################3
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2"

    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

#########################################################

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):

    gpu_id=rank

    ### defined variable ###############
    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    ###############################

    ddp_setup(rank, world_size)

    ###############################

    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model, bos_token='<|startoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>') #gpt2-small

    model_or = GPTNeoForCausalLM.from_pretrained(pretrained_model)

    model_or.resize_token_embeddings(len(tokenizer))

    ## load the train and test datasets
    print(PathData)
    trains_titles=pd.read_csv(PathData+'/'+'traindata.csv')
    valid_titles=pd.read_csv(PathData+'/'+'validdata.csv')

    trains_titles=trains_titles.drop(columns=['Unnamed: 0'])['0']
    valid_titles=valid_titles.drop(columns=['Unnamed: 0'])['0']

    print(trains_titles.head(2))

    train_dataset = GPT2Dataset(trains_titles, tokenizer, max_length=Max_length)

    Val_dataset = GPT2Dataset(valid_titles, tokenizer, max_length=Max_length)

    ############################################################################

    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               pin_memory=True,
                                               shuffle=False,
                                               sampler=DistributedSampler(train_dataset))

    validation_loader= torch.utils.data.DataLoader(dataset=Val_dataset,
                                                   batch_size=batch_size,
                                                   pin_memory=True,
                                                   shuffle=False,
                                                   sampler=DistributedSampler(Val_dataset))


    total_steps = len(train_loader) * total_epochs


    ################# define optimizer and scheduler#########################

    # Note: AdamW is a class from the huggingface library (as opposed to pytorch)
    optimizer = AdamW(model_or.parameters(), lr = learning_rate,eps = adam_epsilon)


    # Create the learning rate scheduler.
    # This changes the learning rate as the training loop progresses
    scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps = warmup_steps, 
                                                num_training_steps = total_steps)

    ############################## train_loader and validation_loader #########

    training_steps_per_epoch=len(train_loader)
    total_num_training_steps = int(training_steps_per_epoch*total_epochs)

    model = copy.deepcopy(model_or)

    model=model.to(gpu_id)
    model = DDP(model, device_ids=[gpu_id])
    print("gpu_id",gpu_id)
    # ========================================
    #               Training
    # ========================================

    training_stats = []


    for epoch_i in range(0, total_epochs):

        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, total_epochs))
        print('Training...')

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{gpu_id}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        train_loader.sampler.set_epoch(epoch_i)
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):
        #    print("len(train_loader)",len(train_loader))
            #################################
            b_input_ids = batch[0].to(gpu_id,non_blocking=True)
            b_labels = batch[0].to(gpu_id,non_blocking=True)
            b_masks = batch[1].to(gpu_id,non_blocking=True)
            #################################

            optimizer.zero_grad()

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )

            loss = outputs[0]
            batch_loss = loss.item()
            total_train_loss += batch_loss
            loss.backward()
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_loader)
        print("avg_train_loss",avg_train_loss)
        Path_3=pt_save_directory+'/'+'trainingloss='+str(gpu_id)+str(epoch_i)+".csv"
        torch.save(avg_train_loss,Path_3)

        del total_train_loss
        del batch_loss

        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)
        print("  Training epoch took: {:}".format(training_time))

        gc.collect()

        ################### saving the model ########################

        if gpu_id == 0:

            Path2=Results_Path+'/'+'savemodel_epoch=='+str(epoch_i)

            ss=os.path.isdir(Path2)
            if ss==False:
                os.makedirs(Path2)

            ckp = model.module.state_dict()
            torch.save(ckp, Path2+"/checkpoint.pt")


    destroy_process_group()
    #############################

if __name__ == '__main__':
    import sys
    total_epochs=int(sys.argv[1])
    save_every=int(sys.argv[2])
    batch_size=int(sys.argv[3])
    world_size = (torch.cuda.device_count())-1
    print(world_size)
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size), nprocs=world_size,join=True)

I load the model in this way 

pretrained_model = '/home/momenisa//GPT_2/'

model = GPTNeoForCausalLM.from_pretrained(pretrained_model)

tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model) #gpt2-small

model.resize_token_embeddings(len(tokenizer))

CHECKPOINT_PATH='/home//checkpoint.pt'
model.load_state_dict(torch.load(CHECKPOINT_PATH,map_location='cpu'),strict=False)
model.eval()

and the results are as in the attached screenshot (the generated output just echoes the input).

Hi @ptrblck, I reran the code that I have always been running, and this time I got this error. Would you please help me with that?

NCCL Error 3: internal error

You might be running into memory corruption or an internal bug in NCCL. To debug it further you could check the logs, e.g. via:

TORCH_CPP_LOG_LEVEL=INFO TORCH_DISTRIBUTED_DEBUG=INFO NCCL_DEBUG=INFO python script.py args

and check if errors are shown there.
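If the script is started from Python (for example from a notebook) rather than from a shell, the same flags can be exported before the process group is created; this is a small sketch using the standard PyTorch and NCCL environment variables:

import os

# must be set before init_process_group / mp.spawn so the distributed and NCCL layers pick them up
os.environ["TORCH_CPP_LOG_LEVEL"] = "INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"
os.environ["NCCL_DEBUG"] = "INFO"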