Using torch.distributed & torch.multiprocessing for a multi-GPU environment

I want to set up a multi-GPU environment using 'torch.multiprocessing' and 'torch.distributed'. However, I get the following error message.

  RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

I think I do call 'init_process_group'. My code is as follows.

  #opt
  ...

  def run(rank, size):
      torch.manual_seed(1234)

      device = torch.device("cuda:{}".format(rank))

      netG_A2B = Generator(opt.input_nc + opt.mask_nc, opt.output_nc).to(device)
      netG_B2A = Generator(opt.output_nc + opt.mask_nc, opt.input_nc).to(device)
      netD_A = Discriminator(opt.input_nc).to(device)
      netD_B = Discriminator(opt.output_nc).to(device)

      netG_A2B.apply(weights_init_normal)
      netG_B2A.apply(weights_init_normal)
      netD_A.apply(weights_init_normal)
      netD_B.apply(weights_init_normal)


      # Losses
      ...

      # Optimizers & LR schedulers
      optimizer_G = torch.optim.Adam(itertools.chain(netG_A2B.parameters(), netG_B2A.parameters()),
                                     lr=opt.lr, betas=(0.5, 0.999))
      optimizer_D_A = torch.optim.Adam(netD_A.parameters(), lr=opt.lr, betas=(0.5, 0.999))
      optimizer_D_B = torch.optim.Adam(netD_B.parameters(), lr=opt.lr, betas=(0.5, 0.999))

      lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(optimizer_G, lr_lambda=LambdaLR(opt.n_epochs, opt.epoch, opt.decay_epoch).step)
      lr_scheduler_D_A = torch.optim.lr_scheduler.LambdaLR(optimizer_D_A, lr_lambda=LambdaLR(opt.n_epochs, opt.epoch, opt.decay_epoch).step)
      lr_scheduler_D_B = torch.optim.lr_scheduler.LambdaLR(optimizer_D_B, lr_lambda=LambdaLR(opt.n_epochs, opt.epoch, opt.decay_epoch).step)

      Tensor = torch.cuda.FloatTensor if opt.cuda else torch.Tensor
      input_A = Tensor(opt.batchSize, opt.input_nc, opt.size, opt.size)
      input_B = Tensor(opt.batchSize, opt.output_nc, opt.size, opt.size)
      input_M = Tensor(opt.batchSize, opt.mask_nc, opt.size, opt.size)

      target_real = Variable(Tensor(opt.batchSize).fill_(1.0), requires_grad=False)
      target_fake = Variable(Tensor(opt.batchSize).fill_(0.0), requires_grad=False)

      fake_A_buffer = ReplayBuffer()
      fake_B_buffer = ReplayBuffer()

      plt.ioff()
      curr_iter = 0
      G_losses = []
      D_A_losses = []
      D_B_losses = []
      to_pil = transforms.ToPILImage()

      # Dataset loader
      print('Preparing data...')
      transforms_image = [# transforms.Resize((opt.size, opt.size), Image.BICUBIC),
                          transforms.Resize(int(opt.size * 1.12), Image.BICUBIC),
                          transforms.RandomCrop(opt.size),
                          transforms.RandomHorizontalFlip(),
                          transforms.ToTensor(),
                          transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]

      transforms_mask = [# transforms.Resize((opt.size, opt.size), Image.BICUBIC),
                         transforms.Resize(int(opt.size * 1.12), Image.BICUBIC),
                         transforms.RandomCrop(opt.size),
                         transforms.RandomHorizontalFlip(),
                         transforms.ToTensor(),
                         transforms.Normalize([0.5], [0.5])]

      size = dist.get_world_size()
      bsz = 128 / float(size)
      dataloader = DataLoader(ImageNMaskDataset(opt.dataroot, transforms_image=transforms_image, transforms_mask=transforms_mask),
                              batch_size=opt.batchSize, shuffle=True, num_workers=opt.n_cpu)

      for epoch in range(opt.epoch, opt.n_epochs):
          for i, batch in enumerate(dataloader):
              # Set model input
              real_A = Variable(input_A.copy_(batch['A']))
              real_B = Variable(input_B.copy_(batch['B']))
              real_M = Variable(input_M.copy_(batch['M']))

              ###### Generators A2B and B2A ######
              optimizer_G.zero_grad()

              # Identity loss
              # G_A2B(B) should equal B if real B is fed
              ...
              # G_B2A(A) should equal A if real A is fed
              ...

              # GAN loss
              ...

              # Cycle loss
              ...

              # Total loss
              ...

              optimizer_G.step()
              ###################################

              ###### Discriminator A ######
              optimizer_D_A.zero_grad()

              # Real loss
              ...

              # Fake loss
              ...

              # Total loss
              ...

              optimizer_D_A.step()
              ###################################

              ###### Discriminator B ######
              optimizer_D_B.zero_grad()

              # Real loss
              ...

              # Fake loss
              ...

              # Total loss
              ...

              optimizer_D_B.step()
              ###################################

              curr_iter += 1

              if i % 1 == 0:
                  log = '[iter %d], [loss_G %.5f], [loss_G_identity %.5f], [loss_G_GAN %.5f],' \
                        '[loss_G_cycle %.5f], [loss_D %.5f], [epoch %d]' % \
                        (curr_iter, loss_G, (loss_identity_A + loss_identity_B), (loss_GAN_A2B + loss_GAN_B2A),
                         (loss_cycle_ABA + loss_cycle_BAB), (loss_D_A + loss_D_B), epoch)
                  print(log)

                  img_fake_A = 0.5 * (fake_A.detach().data + 1.0)
                  img_fake_A = to_pil(img_fake_A.data[0].squeeze(0).cpu())
                  img_fake_A.save('output/fake_A.png')

                  img_fake_B = 0.5 * (fake_B.detach().data + 1.0)
                  img_fake_B = to_pil(img_fake_B.data[0].squeeze(0).cpu())
                  img_fake_B.save('output/fake_B.png')

              # Progress report (http://137.189.90.150:8097)
              # logger.log({'loss_G': loss_G, 'loss_G_identity': (loss_identity_A + loss_identity_B), 'loss_G_GAN': (loss_GAN_A2B + loss_GAN_B2A),
              #             'loss_G_cycle': (loss_cycle_ABA + loss_cycle_BAB), 'loss_D': (loss_D_A + loss_D_B)},
              #             images={'real_A': real_A, 'real_B': real_B, 'fake_A': fake_A, 'fake_B': fake_B})

          # Update learning rates
          lr_scheduler_G.step()
          lr_scheduler_D_A.step()
          lr_scheduler_D_B.step()

          # Save models checkpoints
          ...

          if (epoch + 1) % opt.snapshot_epochs == 0:
              torch.save(netG_A2B.state_dict(), 'output/netG_A2B_%d.pth' % (epoch + 1))
              torch.save(netG_B2A.state_dict(), 'output/netG_B2A_%d.pth' % (epoch + 1))

          print('Epoch:{}'.format(epoch))


  def init_process(rank, size, fn, backend='gloo'):
      """ Initialize the distributed environment. """
      os.environ['MASTER_ADDR'] = '127.0.0.1'
      os.environ['MASTER_PORT'] = '29500'
      dist.init_process_group(backend, rank=rank, world_size=size)
      fn(rank, size)


  ###################################

  if __name__ == "__main__":
      mp.set_start_method('spawn', force=True)
      size = 4
      processes = []
      for rank in range(size):
          p = Process(target=init_process, args=(rank, size, run))
          p.start()
          processes.append(p)

      for p in processes:
          p.join()

In your main method, you can do this:

    world_size = torch.cuda.device_count()
    backend = 'gloo'
    mp.spawn(init_process, args=(world_size, backend), nprocs=world_size, join=True)

For more details, check out this example in the tutorial.

I tried this, and I got this error message:

  -- Process 2 terminated with the following error:
  Traceback (most recent call last):
    File "/usr/local/lib/python3.8/dist-packages/torch/multiprocessing/spawn.py", line 59, in _wrap
      fn(i, *args)
    File "/root/USRGAN_step2/train_custom.py", line 390, in init_process
      fn(rank, size)
  TypeError: 'str' object is not callable

You need to change the order of fn and backend in the init_process method (or in the args tuple you pass to mp.spawn); right now the backend value is being fed into the fn position.
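
For reference, a minimal sketch of how the two pieces could line up, keeping the init_process signature and the run function from the original post. mp.spawn always passes the process index as the first argument, so everything in args fills the remaining parameters in order:

    import os
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp

    def init_process(rank, size, fn, backend='gloo'):
        """ Initialize the distributed environment, then call the training function. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group(backend, rank=rank, world_size=size)
        fn(rank, size)

    if __name__ == "__main__":
        world_size = torch.cuda.device_count()
        # mp.spawn calls init_process(i, *args) for each i in range(nprocs),
        # so args must supply (size, fn, backend) in that exact order.
        mp.spawn(init_process, args=(world_size, run, 'gloo'),
                 nprocs=world_size, join=True)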

Thank you. But I got the same error.

  RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

That's weird. init_process_group should have been called inside your init_process, right before its last line calls run. Can you add a log and confirm that init_process_group is really being called?
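
Something like this would make it obvious in the console for every rank; it is just a minimal sketch that adds two prints around the existing call in your init_process:

    def init_process(rank, size, fn, backend='gloo'):
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        print('[rank %d] calling init_process_group (backend=%s)...' % (rank, backend))
        dist.init_process_group(backend, rank=rank, world_size=size)
        print('[rank %d] process group initialized, world_size=%d' % (rank, dist.get_world_size()))
        fn(rank, size)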

  Traceback (most recent call last):
    File "<string>", line 1, in <module>
    File "/usr/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
      exitcode = _main(fd, parent_sentinel)
    File "/usr/lib/python3.8/multiprocessing/spawn.py", line 126, in _main
      self = reduction.pickle.load(from_parent)
  _pickle.UnpicklingError: pickle data was truncated
  Traceback (most recent call last):
    File "train_custom.py", line 398, in <module>
      mp.spawn(init_process, args=(world_size, backend, run), nprocs=world_size, join=True)
    File "/usr/local/lib/python3.8/dist-packages/torch/multiprocessing/spawn.py", line 230, in spawn
      return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
    File "/usr/local/lib/python3.8/dist-packages/torch/multiprocessing/spawn.py", line 188, in start_processes
      while not context.join():
    File "/usr/local/lib/python3.8/dist-packages/torch/multiprocessing/spawn.py", line 150, in join
      raise ProcessRaisedException(msg, error_index, failed_process.pid)
  torch.multiprocessing.spawn.ProcessRaisedException:

  -- Process 0 terminated with the following error:
  Traceback (most recent call last):
    File "/usr/local/lib/python3.8/dist-packages/torch/multiprocessing/spawn.py", line 59, in _wrap
      fn(i, *args)
    File "/root/USRGAN_step2/train_custom.py", line 390, in init_process
      fn(rank, size2)
    File "/root/USRGAN_step2/train_custom.py", line 226, in run
      for i, batch in enumerate(dataloader):
    File "/usr/local/lib/python3.8/dist-packages/torch/utils/data/dataloader.py", line 517, in __next__
      data = self._next_data()
    File "/usr/local/lib/python3.8/dist-packages/torch/utils/data/dataloader.py", line 1199, in _next_data
      return self._process_data(data)
    File "/usr/local/lib/python3.8/dist-packages/torch/utils/data/dataloader.py", line 1225, in _process_data
      data.reraise()
    File "/usr/local/lib/python3.8/dist-packages/torch/_utils.py", line 429, in reraise
      raise self.exc_type(msg)
  RuntimeError: Caught RuntimeError in DataLoader worker process 0.
  Original Traceback (most recent call last):
    File "/usr/local/lib/python3.8/dist-packages/torch/utils/data/_utils/worker.py", line 202, in _worker_loop
      data = fetcher.fetch(index)
    File "/usr/local/lib/python3.8/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in fetch
      data = [self.dataset[idx] for idx in possibly_batched_index]
    File "/usr/local/lib/python3.8/dist-packages/torch/utils/data/_utils/fetch.py", line 44, in <listcomp>
      data = [self.dataset[idx] for idx in possibly_batched_index]
    File "/root/USRGAN_step2/datasets_S3_2.py", line 57, in __getitem__
      size = dist.get_world_size()
    File "/usr/local/lib/python3.8/dist-packages/torch/distributed/distributed_c10d.py", line 711, in get_world_size
      return _get_group_size(group)
    File "/usr/local/lib/python3.8/dist-packages/torch/distributed/distributed_c10d.py", line 263, in _get_group_size
      default_pg = _get_default_group()
    File "/usr/local/lib/python3.8/dist-packages/torch/distributed/distributed_c10d.py", line 347, in _get_default_group
      raise RuntimeError("Default process group has not been initialized, "
  RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

That is all of the traceback. I will review the code again according to your advice. Thank you so much.
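
Looking at that traceback, the remaining failure does not come from run itself: the failing frame is datasets_S3_2.py, line 57, in __getitem__, i.e. dist.get_world_size() is being called inside the dataset, and that code runs in a DataLoader worker process (num_workers=opt.n_cpu). Because the start method is 'spawn', each worker is a fresh interpreter that never calls init_process_group, so the call fails there even though every training rank initialized its group. A minimal sketch of one way around it, assuming the dataset only needs the world size as a number, is to look it up once in run() and pass the plain integer in; the world_size keyword below is hypothetical and would need a matching parameter in ImageNMaskDataset:

    # Sketch only: 'world_size' is a hypothetical extra argument to the dataset.
    # Inside the dataset, __getitem__ would read self.world_size instead of
    # calling dist.get_world_size().
    world_size = dist.get_world_size()  # safe here: run() executes after init_process_group
    dataloader = DataLoader(ImageNMaskDataset(opt.dataroot,
                                              transforms_image=transforms_image,
                                              transforms_mask=transforms_mask,
                                              world_size=world_size),
                            batch_size=opt.batchSize, shuffle=True, num_workers=opt.n_cpu)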