Dear @mrshenli, thanks very much for your response; that indeed works.
Let me post my code, adapted from your nice illustration, which I need to get working as part of a much longer implementation:
import torch
import torch.multiprocessing as mp
import os


def run(rank, groups, world_size):
    print("Rank ", rank, " Group ", groups[str(rank)])
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    # pin this process to its assigned GPU
    torch.cuda.set_device(groups[str(rank)]['gpu'])
    # global group
    torch.distributed.init_process_group(backend='nccl',
                                         rank=rank,
                                         world_size=world_size)
    # subgroup containing this rank's master and its worker(s)
    my_group = torch.distributed.new_group(ranks=groups[str(rank)]['grp'])
    # tensor to all-reduce over the group
    t = torch.tensor([1]).float().cuda().fill_(rank)
    torch.distributed.all_reduce(t, group=my_group)
    print('rank: {} - val: {}'.format(rank, t.item()))


def assign_groups(num_masters, workers_per_master, available_gpus):
    """Map each rank to its subgroup ('grp') and its GPU ('gpu')."""
    groups = {}
    distranks = num_masters  # worker ranks start after the master ranks
    gpu_allocated = 1        # available_gpus[0] is shared by all masters
    for i in range(num_masters):
        my_group = [i]
        gpus = [available_gpus[0]]
        for j in range(workers_per_master):
            my_group.append(distranks)
            gpus.append(available_gpus[gpu_allocated])
            distranks += 1
            gpu_allocated += 1
        for r, g in zip(my_group, gpus):
            groups.update({
                str(r): {
                    'grp': my_group,
                    'gpu': g
                }
            })
    return groups


def main():
    num_masters = 3
    workers_per_master = 1
    available_gpus = [0, 1, 2, 3]
    groups = assign_groups(num_masters, workers_per_master, available_gpus)
    world_size = 6  # 3 masters + 3 workers
    mp.spawn(run, args=(groups, world_size), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()
Essentially, I have 3 master processes here, and each master has one worker. Each master together with its worker forms a group, so there are three groups altogether. In the code above the masters are all assigned to cuda:0 and each worker to the next cuda device, but this may change depending on the setting; for this illustration it is immaterial whether I comment out the line torch.cuda.set_device(groups[str(rank)]['gpu']).
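For reference, with num_masters = 3, workers_per_master = 1, and available_gpus = [0, 1, 2, 3], assign_groups returns the following mapping (ranks 0-2 are masters, ranks 3-5 their respective workers):

groups = {
    '0': {'grp': [0, 3], 'gpu': 0}, '3': {'grp': [0, 3], 'gpu': 1},
    '1': {'grp': [1, 4], 'gpu': 0}, '4': {'grp': [1, 4], 'gpu': 2},
    '2': {'grp': [2, 5], 'gpu': 0}, '5': {'grp': [2, 5], 'gpu': 3},
}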
Now, each master and worker has its assigned group available as groups[str(rank)]['grp']. So when I run this code, the group-wise collectives should run concurrently, just as in your example. However, it fails with NCCL INFO Call to connect returned Connection refused.
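For what it's worth, if the three group-wise all_reduce calls (default op is SUM) went through, I would expect each pair of ranks to print the sum of its two rank values, in some interleaved order:

rank: 0 - val: 3.0
rank: 3 - val: 3.0
rank: 1 - val: 5.0
rank: 4 - val: 5.0
rank: 2 - val: 7.0
rank: 5 - val: 7.0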
I am definitely doing something wrong here, but I am not sure what.
Thanks again.