Simultaneous communications in different process groups

Referring to the old, now-closed issue https://github.com/pytorch/pytorch/issues/14528: I need to run collective communications (all_reduce/reduce/broadcast) in two or more different groups simultaneously. For example, a process whose rank belongs to group g0 should participate only in the communication for g0.

Essentially, following that issue, I would need a conditional like this:

if local_rank in g0:
    torch.distributed.all_reduce(t, group=g0)
elif local_rank in g1:
    torch.distributed.all_reduce(t, group=g1)

so that each process takes part only in its own group's all_reduce and the communications in non-intersecting groups can happen simultaneously.

However, that results in NCCL INFO Call to connect returned Connection refused, retrying. In contrast, running these all_reduce operations one after another, which means the collective communications in the different groups are serialized, works fine.

Is there something that I am missing?

Hey @bapi

I tried the following, and it worked for me:

import torch
import torch.multiprocessing as mp
import os

def run(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'

    torch.cuda.set_device(rank)
    # global group
    torch.distributed.init_process_group(backend='nccl', rank=rank, world_size=world_size)
    # torch.distributed.init_process_group(backend='gloo', init_method='env://')
    g0 = torch.distributed.new_group(ranks=[0,1,2,3])
    g1 = torch.distributed.new_group(ranks=[4,5,6,7])
    # tensor to all-reduce over the group
    t = torch.tensor([1]).float().cuda().fill_(rank)
    if rank < 4:
        torch.distributed.all_reduce(t, group=g0)
    else:
        torch.distributed.all_reduce(t, group=g1)
    print('rank: {} - val: {}'.format(rank, t.item()))


def main():
    world_size = 8
    mp.spawn(run,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    main()

outputs:

$ python test.py 
rank: 0 - val: 6.0
rank: 1 - val: 6.0
rank: 3 - val: 6.0
rank: 2 - val: 6.0
rank: 7 - val: 22.0
rank: 5 - val: 22.0
rank: 6 - val: 22.0
rank: 4 - val: 22.0

I am not sure whether you obtained g0 and g1 from the new_group API. If so, they are process group objects, not lists of ranks, so the check local_rank in g0 from your snippet would result in the following error:

TypeError: argument of type 'object' is not iterable
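
One way to avoid that error is to keep the plain rank lists next to the group handles and test membership against the lists instead. Here is a minimal sketch of that idea. The helper name select_group and the (ranks, handle) pairing are only for illustration and are not part of torch.distributed; the strings "g0" and "g1" are placeholders for the objects that new_group would return.

```python
def select_group(rank, groups):
    """Return the handle of the first group whose rank list contains `rank`.

    `groups` is a list of (ranks, handle) pairs. In a real job each handle
    would be the object returned by torch.distributed.new_group(ranks=ranks).
    """
    for ranks, handle in groups:
        if rank in ranks:  # membership test on the list, not on the group object
            return handle
    raise ValueError(f"rank {rank} is not a member of any group")


# Placeholder strings stand in for real process group handles.
groups = [([0, 1, 2, 3], "g0"), ([4, 5, 6, 7], "g1")]
print(select_group(5, groups))
```

With real handles, the returned value can be passed directly as the group argument of all_reduce.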

Dear @mrshenli, thanks very much for your response. Indeed that works.

Let me post my code, adapted from your nice illustration, which is what I actually need to get working as part of a much longer implementation:

import torch
import torch.multiprocessing as mp
import os


def run(rank, groups, world_size):
    print("Rank ", rank, " Group ", groups[str(rank)])
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    torch.cuda.set_device(groups[str(rank)]['gpu'])
    # global group
    torch.distributed.init_process_group(backend='nccl',
                                         rank=rank,
                                         world_size=world_size)
    my_group = torch.distributed.new_group(ranks=groups[str(rank)]['grp'])
    # tensor to all-reduce over the group
    t = torch.tensor([1]).float().cuda().fill_(rank)
    torch.distributed.all_reduce(t, group=my_group)
    print('rank: {} - val: {}'.format(rank, t.item()))


def assign_groups(num_masters, workers_per_master, available_gpus):
    groups = {}
    distranks = num_masters
    gpu_allocated = 1
    for i in range(num_masters):
        my_group = [i]
        gpus = [available_gpus[0]]
        for j in range(workers_per_master):
            my_group.append(distranks)
            gpus.append(available_gpus[gpu_allocated])
            distranks += 1
            gpu_allocated += 1
        for r, g in zip(my_group, gpus):
            groups.update({
                str(r): {
                    'grp': my_group,
                    'gpu': g
                }
            })
    return groups


def main():
    num_masters = 3
    workers_per_master = 1
    available_gpus = [0, 1, 2, 3]
    groups = assign_groups(num_masters, workers_per_master, available_gpus)
    world_size = 6
    mp.spawn(run, args=(groups, world_size), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

Essentially, I have 3 master processes here, and each master has one worker. Each master together with its worker forms a group, so there are three groups altogether. In the code above the masters are all assigned to cuda:0 and each worker to the next CUDA device, but this may change depending on the setting. For this illustration it makes no difference whether I comment out the line torch.cuda.set_device(groups[str(rank)]['gpu']).

Now, each master and each worker certainly has its assigned group in groups[str(rank)]['grp']. So when I run this code, I expect the distributed communications to run concurrently, as in your example. However, it results in NCCL INFO Call to connect returned Connection refused.

I am definitely doing something wrong here, but I am not sure what.
Thanks again.

I think I figured out exactly what I was missing. Basically, every new process group has to be created on every process before any of the groups is used, rather than each process creating only its own group.

A slightly modified version of your example, which most likely serves my purpose, is the following:

import torch
import torch.multiprocessing as mp
import os


def run(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    # global group
    torch.distributed.init_process_group(backend='nccl',
                                         rank=rank,
                                         world_size=world_size)
    # torch.distributed.init_process_group(backend='gloo', init_method='env://')
    g0 = torch.distributed.new_group(ranks=[0, 2])
    g1 = torch.distributed.new_group(ranks=[1, 3])
    # tensor to all-reduce over the group
    t = torch.tensor([1]).float().cuda().fill_(rank)
    if rank in [0, 2]:
        my_group = g0
    else:
        my_group = g1
    torch.distributed.all_reduce(t, group=my_group)
    print('rank: {} - val: {}'.format(rank, t.item()))


def main():
    world_size = 4
    mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

It works. :)

Thanks again.

Happy to see that worked :)

Yes, the documentation for new_group states the following requirement:

This function requires that all processes in the main group (i.e. all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes.
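
Applied to the masters-and-workers layout from earlier in the thread, that requirement could be met roughly as sketched here. The names build_rank_lists and create_groups are hypothetical helpers, not torch.distributed APIs, and the sketch assumes init_process_group has already been called on every process. The torch import is kept inside create_groups only so that the rank bookkeeping can be tried out without a distributed setup.

```python
def build_rank_lists(num_masters, workers_per_master):
    """Masters take ranks 0..num_masters-1; workers take the ranks after them."""
    rank_lists = []
    next_worker = num_masters
    for master in range(num_masters):
        workers = list(range(next_worker, next_worker + workers_per_master))
        rank_lists.append([master] + workers)
        next_worker += workers_per_master
    return rank_lists


def create_groups(rank, rank_lists):
    """Create every group on every process, in the same order, and return
    the group that `rank` belongs to (None if it belongs to no group).

    Assumes torch.distributed.init_process_group was already called.
    """
    import torch.distributed as dist  # lazy import, see the note above

    my_group = None
    for ranks in rank_lists:
        group = dist.new_group(ranks=ranks)  # called by all ranks, members or not
        if rank in ranks:
            my_group = group
    return my_group


rank_lists = build_rank_lists(num_masters=3, workers_per_master=1)
print(rank_lists)  # [[0, 3], [1, 4], [2, 5]]
```

Inside run, each process would then call create_groups(rank, rank_lists) once and pass the result as the group argument of all_reduce.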

Exactly. Thanks! :)