Multiprocessing gather Connection reset by peer

I am testing multiprocessing with the code below. reduce, all_reduce, and all_gather all work fine, but when I use gather it returns the error 'RuntimeError: Connection reset by peer'. Has anyone met this problem, and how can I fix it? Please check the simple code below.

import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    a=torch.tensor([rank])
    b=a**2

    #dist.reduce(b,dst=0,group=dist.new_group([0,1,2,3]))

    #dist.all_reduce(b,group=dist.new_group([0,1,2,3]))

    # tensor_list1 = [torch.zeros_like(b) for _ in range(size)]
    # dist.all_gather(tensor=b, tensor_list=tensor_list1, group=0)

    tensor_list1 = [torch.zeros_like(b) for _ in range(size)]
    dist.gather(tensor=b, dst=0, gather_list=tensor_list1)

    print('Rank ', rank, 'b: ', tensor_list1)

def init_processes(rank, size, fn, backend='tcp'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 4
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

The documentation says gather_list is required only in the receiving process, which makes it sound like senders can pass it as well and it will simply be discarded. However, that's not the case.
The error output ends with "RuntimeError: Connection reset by peer", but if you look in the middle of it, the real problem is "RuntimeError: non-empty gather_list can be given only to gather destination".

Here is a working version:

def run(rank, size):
    a = torch.tensor([rank])
    b = a ** 2

    tensor_list1 = [torch.zeros_like(b) for _ in range(size)]

    if rank == 0:
        # only the destination rank passes a non-empty gather_list
        dist.gather(tensor=b, dst=0, gather_list=tensor_list1)
    else:
        # sending ranks call gather without a gather_list
        dist.gather(tensor=b, dst=0)

    print('Rank ', rank, 'b: ', tensor_list1)

def init_processes(rank, size, fn, backend='tcp'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 4
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
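
A side note: recent PyTorch releases no longer provide the 'tcp' backend, so you may need backend='gloo' (or 'nccl' for CUDA tensors) instead. Also, if every rank needs the gathered tensors rather than just rank 0, all_gather sidesteps the destination-only restriction entirely. A minimal sketch, assuming the gloo backend is available:

import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    b = torch.tensor([rank]) ** 2
    # all_gather fills the list on every rank, so no rank-0 special case is needed
    gathered = [torch.zeros_like(b) for _ in range(size)]
    dist.all_gather(tensor_list=gathered, tensor=b)
    print('Rank ', rank, 'gathered: ', gathered)

def init_processes(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 4
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()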