Hi,
I am implementing a basic synchronous model where each slave is sending a tensor to the master, and the master is simply summing them up and broadcasting the resulting tensor to all slaves.
Here’s the code for slave run-
def slaveRun(rank):
global numParams
print("runSlave: Hello from rank"+str(rank))
mytensor = torch.zeros(1,numParams)
torch.add(mytensor,rank, out=mytensor)
print("slave"+str(rank)+" :initial mytensor = "+str(mytensor))
dist.gather(mytensor,dst=0) #send the tensor to 0 (master)
print("slave"+str(rank)+" sent tensor")
# Wait for master to broadcast the result tensor (sum)
# Receive the broadcast tensor from the Master
recvd_broadcast_tensor = torch.zeros(1,numParams)
dist.broadcast(recvd_broadcast_tensor,0)
print("runSlave: Rank"+str(rank)+" received data "+str(recvd_broadcast_tensor))
Here’s the code for Master run-
def masterRun(size):
print("runMaster: Hello from Master")
mytensor = torch.zeros(1,numParams) #Input tensor: Tensor to be broadcast from current process.
# Receive tensor from all slaves
gather_list = [mytensor for i in range(size)]
print("runMaster: gather list="+str(gather_list))
dist.gather(tensor=mytensor, gather_list=gather_list)
print("runMaster received tensors="+str(gather_list))
# Sum up the components individually
sumall_tensor = torch.zeros(1,numParams)
for i in range(size):
sumall_tensor = sumall_tensor + gather_list[i]
print("runMaster: sumall="+str(sumall_tensor))
# Broadcast this sumall list as tensor to each worker
broadcast_tensor = sumall_tensor
dist.broadcast(broadcast_tensor, 0)
Tensor sent by slave1: [1]
Tensor sent by slave2: [2]
Gather list of Master0: [2, 2, 2]
Any idea what I might be missing? Btw I am running this on a single machine right now. Here is the other code-
#size: number of processes
def init_processes(rank, size, fn, backend='tcp'):
""" Initialize the distributed environment. """
# print "init_processes: Hello from rank"+str(rank)+" size="+str(size)
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
size = 3
processes = []
for rank in range(size):
p = Process(target=init_processes, args=(rank, size, run))
p.start() #async call
processes.append(p)
for p in processes:
p.join()