The code is below (I have hidden some irrelevant parts). When training with DDP, I printed the network parameters on the different ranks at the same iteration, and the updated model parameters on each GPU were different. Strangely, I only noticed this problem after upgrading from torch 1.13.1 to torch 2.0.1. I am not 100% sure the problem is related to the upgrade, because the models are also different, but my overall training code framework has stayed the same.
Are there any precautions to take with DDP under torch 2.0.1 to make sure the parameter updates stay synchronized across GPUs?
Or could the reason be that I call submodules such as `model.module.unet` directly instead of going through `model.forward`? Once the model is wrapped by DDP, the submodules can only be reached via `.module`.
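As an aside before the code: a more systematic check than printing individual weight entries would be something along these lines (a minimal sketch, not taken from my script; `check_param_sync` is just an illustrative name):

import torch
import torch.distributed as dist

def check_param_sync(module):
    # Collective call: every rank must reach this at the same point in training.
    for name, p in module.named_parameters():
        gathered = [torch.zeros_like(p) for _ in range(dist.get_world_size())]
        dist.all_gather(gathered, p.detach())
        for r, g in enumerate(gathered):
            if not torch.allclose(gathered[0], g):
                print(f"rank {dist.get_rank()}: parameter {name} differs between rank 0 and rank {r}")

In my actual script I only print a single LoRA weight element on each rank (see the end of the training loop below).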
import argparse
import os
import shutil

import torch
import torch.distributed as dist
import torch.optim as optim
import torchvision
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP

from datasets_prep.dataset import create_dataset
def init_distributed_mode(args):
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ['WORLD_SIZE'])
        args.gpu = int(os.environ['LOCAL_RANK'])
    elif 'SLURM_PROCID' in os.environ:
        args.rank = int(os.environ['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        args.distributed = False
        args.gpu = 0
        return False
    args.distributed = True
    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'  # communication backend; NCCL is recommended for NVIDIA GPUs (gloo as an alternative)
    print('| distributed init (rank {}): {}'.format(args.rank, args.dist_url), flush=True)
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
    # local_rank = torch.distributed.get_rank()
    # print('args.rank,local_rank,args.gpu,args.world_size', args.rank, local_rank, args.gpu, args.world_size)
    return True
def train(args):
    # %% init the distributed training
    is_distributed = init_distributed_mode(args=args)
    if is_distributed:
        rank = args.rank
        device = torch.device(args.device)
    else:
        device = torch.device('cuda:0')
        args.rank = rank = 0
    if rank == 0:
        print('torch version', torch.__version__)
    torch.manual_seed(args.seed + rank)
    torch.cuda.manual_seed(args.seed + rank)
    torch.cuda.manual_seed_all(args.seed + rank)
    if rank == 0:
        print(f"Running DDP on rank {rank} device {device}")
    dataset = create_dataset(args)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=args.world_size, rank=rank)
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=False,
                                              num_workers=args.num_workers, pin_memory=True,
                                              sampler=train_sampler)
    unet_small = model(args)
    unet_small.to(device)
    if is_distributed:
        unet_small = DDP(unet_small, device_ids=[device], find_unused_parameters=False)
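        # note: unet_small.module below refers to the original, unwrapped model; its parameters
        # are the same tensor objects that DDP registered, so the optimizer still updates the
        # DDP-managed weights.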
        train_param_list = list(unet_small.module.unet.parameters()) + list(unet_small.module.vae.parameters())
    else:
        train_param_list = list(unet_small.unet.parameters()) + list(unet_small.vae.parameters())
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, train_param_list),
                           lr=args.lr_g,
                           betas=(args.beta1, args.beta2))
    lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, args.num_epoch, eta_min=args.lr_g / 100)
    optimizer.zero_grad()
    for epoch in range(init_epoch, args.num_epoch + 1):
        if is_distributed: dist.barrier()
        train_sampler.set_epoch(epoch)
        iters_per_epoch = len(data_loader)
        if is_distributed: dist.barrier()
        for iteration, batch in enumerate(data_loader):
            HQ, LQ, text = Degrator.GetLR(batch)
            hr_img, lr_img = (HQ - 0.5) * 2, (LQ - 0.5) * 2
            if is_distributed:
                latent_dnoised = unet_small.module.sample_denoised_latent(lr_img, text)
            else:
                latent_dnoised = unet_small.sample_denoised_latent(lr_img, text)
            if is_distributed:
                fake_data = unet_small.module.decode(latent_dnoised)
            else:
                fake_data = unet_small.decode(latent_dnoised)
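            # note: in the distributed case, both calls above go through unet_small.module and
            # therefore bypass the DDP wrapper's forward(); unet_small(...) itself is never
            # called in this loop.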
            real_data = HQ
            err_end = torch.nn.functional.mse_loss(fake_data, real_data)
            err_lp = lp_loss(fake_data, real_data) * args.lpips_w
            loss_total = err_end + err_lp
            loss_total.backward()
            torch.nn.utils.clip_grad_norm_(unet_small.parameters(), max_norm=20, norm_type=2)
            optimizer.step()
            optimizer.zero_grad()
            if rank == 0:
                # import pdb; pdb.set_trace()
                print(iteration, unet_small.module.unet.conv_out.state_dict()['lora_A.default_decoder.weight'][0, 0, 0])
            if iteration % 4 == rank:
                print(iteration, unet_small.module.unet.conv_out.state_dict()['lora_A.default_decoder.weight'][0, 0, 0])
# %%
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--seed', type=int, default=1024, help='seed used for initialization')
    # parser.add_argument('--master_address', type=str, default='127.0.0.1',
    #                     help='address for master')
    # parser.add_argument('--master_port', type=str, default='6002',
    #                     help='port for master')
    parser.add_argument('--num_workers', type=int, default=4, help='num_workers')
    parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
    parser.add_argument('--world_size', default=1, type=int, help='number of distributed processes')
    parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
    parser.add_argument('--iters_to_accumulate', type=int, default=1, help='iters_to_accumulate')
    opt = parser.parse_args()
    # print(opt)
    train(opt)
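For completeness, the env:// init method above expects RANK, WORLD_SIZE and LOCAL_RANK to be set for each process, which is what torchrun (or python -m torch.distributed.run) does; the script name and process count below are only placeholders:

torchrun --nproc_per_node=2 train.py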