I am running a multiprocessing test with the code below; reduce, all_reduce, and all_gather all work fine.
But when I use gather, it fails with 'RuntimeError: Connection reset by peer'. Has anyone run into this problem, and how can I fix it? Please see the simple repro below.
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    a = torch.tensor([rank])
    b = a ** 2
    # These collectives all work when uncommented:
    # dist.reduce(b, dst=0, group=dist.new_group([0, 1, 2, 3]))
    # dist.all_reduce(b, group=dist.new_group([0, 1, 2, 3]))
    # tensor_list1 = [torch.zeros_like(b) for _ in range(size)]
    # dist.all_gather(tensor=b, tensor_list=tensor_list1, group=0)
    tensor_list1 = [torch.zeros_like(b) for _ in range(size)]
    # This is the call that raises RuntimeError: Connection reset by peer
    dist.gather(tensor=b, dst=0, gather_list=tensor_list1)
    print('Rank ', rank, 'b: ', tensor_list1)

def init_processes(rank, size, fn, backend='tcp'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 4
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
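For reference, here is a variant of the script I put together from the torch.distributed docs, which say gather_list must be specified only on the destination rank (all other ranks should pass no gather_list). I also switched the backend to 'gloo', assuming 'tcp' may no longer be supported in newer PyTorch versions. I am not certain this is the actual cause of my error, so treat it as a sketch:

import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    b = torch.tensor([rank]) ** 2
    if rank == 0:
        # Only the destination rank allocates and passes gather_list.
        gather_list = [torch.zeros_like(b) for _ in range(size)]
        dist.gather(tensor=b, gather_list=gather_list, dst=0)
        print('Rank ', rank, 'gathered: ', gather_list)
    else:
        # Every other rank only sends its tensor; no gather_list here.
        dist.gather(tensor=b, dst=0)

def init_processes(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 4
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()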