Distributed Gather issue- master gathers only the tensor from last slave

Hi,

I am implementing a basic synchronous model where each slave is sending a tensor to the master, and the master is simply summing them up and broadcasting the resulting tensor to all slaves.

Here’s the code for slave run-

def slaveRun(rank):
    global numParams
    print("runSlave: Hello from rank"+str(rank))

    mytensor = torch.zeros(1,numParams)
    torch.add(mytensor,rank, out=mytensor)

    print("slave"+str(rank)+" :initial mytensor = "+str(mytensor))

    dist.gather(mytensor,dst=0) #send the tensor to 0 (master)
    print("slave"+str(rank)+" sent tensor")

    # Wait for master to broadcast the result tensor (sum)

    # Receive the broadcast tensor from the Master
    recvd_broadcast_tensor = torch.zeros(1,numParams)
    dist.broadcast(recvd_broadcast_tensor,0)
    print("runSlave: Rank"+str(rank)+" received data "+str(recvd_broadcast_tensor))

Here’s the code for Master run-

def masterRun(size):
    print("runMaster: Hello from Master")

    mytensor = torch.zeros(1,numParams) #Input tensor: Tensor to be broadcast from current process.

    # Receive tensor from all slaves
    gather_list = [mytensor for i in range(size)]
    print("runMaster: gather list="+str(gather_list))
    dist.gather(tensor=mytensor, gather_list=gather_list)
    print("runMaster received tensors="+str(gather_list))

    # Sum up the components individually
    sumall_tensor = torch.zeros(1,numParams)
    for i in range(size):
        sumall_tensor = sumall_tensor + gather_list[i]
    print("runMaster: sumall="+str(sumall_tensor))
    

    # Broadcast this sumall list as tensor to each worker
    broadcast_tensor = sumall_tensor
    dist.broadcast(broadcast_tensor, 0)

Tensor sent by slave1: [1]
Tensor sent by slave2: [2]
Gather list of Master0: [2, 2, 2]

Any idea what I might be missing? Btw I am running this on a single machine right now. Here is the other code-

#size: number of processes
def init_processes(rank, size, fn, backend='tcp'):
    """ Initialize the distributed environment. """
    # print "init_processes: Hello from rank"+str(rank)+" size="+str(size)
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 3
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start() #async call 
        processes.append(p)

    for p in processes:
        p.join()

gather_list contains size references to the same tensor. The following might work better.

gather_list = [mytensor.clone() for i in range(size)]

Thanks this works :slight_smile: