Why does my model have different weights on each GPU after DDP training?

The code is as follows (I have hidden some irrelevant parts). When training with DDP, I printed the network parameters of different ranks at the same iteration, and the parameter updates on each GPU were different. Strangely, I only noticed this problem after upgrading from torch 1.13.1 to torch 2.0.1. I am not 100% sure the problem is related to the upgrade, because the models are also different, but my training code framework has remained largely unchanged.

Are there any precautions to take when upgrading DDP to torch 2.0.1 to ensure that the GPUs stay synchronized?

Or could the reason be that I call a submodule like `model.module.unet` instead of going through `model.forward`? After the model is wrapped by DDP, submodules can only be reached through `.module`.
import argparse
import os
import shutil
import torch
import torch.distributed as dist
import torch.optim as optim
import torchvision
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP
from datasets_prep.dataset import create_dataset

def init_distributed_mode(args):
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ['WORLD_SIZE'])
             
        args.gpu = int(os.environ['LOCAL_RANK'])
        
    elif 'SLURM_PROCID' in os.environ:
        args.rank = int(os.environ['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        args.distributed = False
        args.gpu = 0
        return False

    args.distributed = True
    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'  # communication backend; NCCL is recommended for NVIDIA GPUs (try gloo otherwise)
    print('| distributed init (rank {}): {}'.format(
        args.rank, args.dist_url), flush=True)
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
    # local_rank = torch.distributed.get_rank() 
    # print('args.rank,local_rank,args.gpu ,args.world_size',args.rank,local_rank,args.gpu,args.world_size)
    return True


def train(args):
    # %%  init the distributed training 
    is_distributed = init_distributed_mode(args=args)
    if is_distributed:
        rank = args.rank
        device = torch.device(args.device)
    else:
        device = torch.device('cuda:0')
        args.rank = rank = 0
    if rank == 0:
        print('torch version', torch.__version__)

    torch.manual_seed(args.seed + rank)
    torch.cuda.manual_seed(args.seed + rank)
    torch.cuda.manual_seed_all(args.seed + rank)
    if rank == 0:
        print(f"Running DDP on rank {rank} device {device}")
    
    
    dataset = create_dataset(args)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=args.world_size, rank=rank)
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=False,
                                              num_workers=args.num_workers, pin_memory=True,
                                              sampler=train_sampler)
    unet_small = model(args)  # batch_size and model(...) come from the parts of the code hidden by the OP
    unet_small.to(device)

    if is_distributed:
        unet_small = DDP(unet_small, device_ids=[args.gpu], find_unused_parameters=False)
        train_param_list = list(unet_small.module.unet.parameters()) + list(unet_small.module.vae.parameters())
    else:
        train_param_list = list(unet_small.unet.parameters()) + list(unet_small.vae.parameters())
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, train_param_list),
                           lr=args.lr_g,
                           betas=(args.beta1, args.beta2))

    lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, args.num_epoch, eta_min=args.lr_g / 100)

    optimizer.zero_grad()

    init_epoch = 0  # placeholder; set by the checkpoint-resume code, which is omitted here
    for epoch in range(init_epoch, args.num_epoch + 1):
        if is_distributed: dist.barrier()
        train_sampler.set_epoch(epoch)        
        iters_per_epoch = len(data_loader)        
        if is_distributed: dist.barrier()
        for iteration, batch in enumerate(data_loader):

            HQ, LQ, text = Degrator.GetLR(batch)  # Degrator comes from the hidden code
            hr_img, lr_img = (HQ - 0.5) * 2, (LQ - 0.5) * 2  # rescale images to [-1, 1]
  
            # submodules are reached via .module because the model is wrapped in DDP
            if is_distributed:
                latent_denoised = unet_small.module.sample_denoised_latent(lr_img, text)
                fake_data = unet_small.module.decode(latent_denoised)
            else:
                latent_denoised = unet_small.sample_denoised_latent(lr_img, text)
                fake_data = unet_small.decode(latent_denoised)
                
            
            real_data = HQ

            
            err_end = F.mse_loss(fake_data, real_data)
            err_lp = lp_loss(fake_data, real_data) * args.lpips_w  # lp_loss and args.lpips_w come from the hidden code
            loss_total = err_end + err_lp
            
            loss_total.backward()
            torch.nn.utils.clip_grad_norm_(unet_small.parameters(), max_norm=20, norm_type=2)
          
            optimizer.step()
            optimizer.zero_grad()
            
             
            # compare the same parameter across ranks: rank 0 prints it every
            # iteration, and each rank prints when iteration % 4 == rank
            if rank == 0:
                print(iteration, unet_small.module.unet.conv_out.state_dict()['lora_A.default_decoder.weight'][0, 0, 0])
            if iteration % 4 == rank:
                print(iteration, unet_small.module.unet.conv_out.state_dict()['lora_A.default_decoder.weight'][0, 0, 0])

# %%
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--seed', type=int, default=1024, help='seed used for initialization') 
     
    # parser.add_argument('--master_address', type=str, default='127.0.0.1',
    #                     help='address for master')
    # parser.add_argument('--master_port', type=str, default='6002',
    #                     help='port for master')
    parser.add_argument('--num_workers', type=int, default=4, help='num_workers')     
    parser.add_argument('--device', default='cuda', help="device to use, e.g. 'cuda' or 'cpu'")
    parser.add_argument('--world_size', default=1, type=int, help='number of distributed processes')
    parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
    parser.add_argument('--iters_to_accumulate', type=int, default=1,
                        help='iters_to_accumulate')    

    opt = parser.parse_args()
    # print(opt)
    train(opt)

     

Yes, this is a common cause: calling internal modules through `.module` bypasses DDP's forward, so the distributed logic that synchronizes gradients across ranks never runs, and each GPU drifts to different weights. Call the model directly via `output = model(input)` and check the parameters again.
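
For example, here is a minimal sketch of the fix. The class and method names just mirror the code above and the layers are placeholders, not your actual model: put the whole per-step computation inside forward() and call the DDP wrapper itself, so that gradient synchronization happens on every backward().

import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

class SRModel(nn.Module):
    # stand-in for the OP's model; unet/vae are placeholder layers
    def __init__(self):
        super().__init__()
        self.unet = nn.Conv2d(3, 4, 1)
        self.vae = nn.Conv2d(4, 3, 1)

    def sample_denoised_latent(self, lr_img, text=None):
        return self.unet(lr_img)

    def decode(self, latent):
        return self.vae(latent)

    def forward(self, lr_img, text=None):
        # everything the training step needs runs inside forward();
        # DDP only synchronizes gradients for passes that go through
        # the wrapper's own __call__, not through .module
        latent = self.sample_denoised_latent(lr_img, text)
        return self.decode(latent)

# after wrapping:  model = DDP(SRModel().to(device), device_ids=[local_rank])
# in the training loop, call the wrapper, not model.module.<method>:
#   fake_data = model(lr_img, text)

If the two stages really must be called separately, each trained submodule can be wrapped in its own DDP instance instead, but routing everything through a single forward() is the simpler route.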


Thanks! I found this out just before you replied. The problem is solved anyway :rofl:
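
For anyone hitting the same symptom: one quick way to confirm that all ranks hold identical weights is to broadcast rank 0's parameters and compare them locally. A minimal sketch (the helper name assert_params_in_sync is made up):

import torch
import torch.distributed as dist

def assert_params_in_sync(ddp_model):
    # broadcast rank 0's copy of each parameter and compare it with
    # the local copy; raises on the first rank that has diverged
    for name, p in ddp_model.module.named_parameters():
        ref = p.detach().clone()
        dist.broadcast(ref, src=0)
        if not torch.equal(ref, p.detach()):
            raise RuntimeError(f"rank {dist.get_rank()}: parameter {name} diverged")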