I am writing a barebones version of Master-Slave communication. The slaves each send a tensor to the Master using the non-blocking `isend`:
```python
import torch
import torch.distributed as dist

def slaveRun(rank):
    global numParams  # defined at module level
    print "runSlave: Hello from rank" + str(rank)
    mytensor = torch.zeros(1, numParams)
    torch.add(mytensor, rank, out=mytensor)  # fill the tensor with this slave's rank
    print "slave" + str(rank) + " : initial mytensor = " + str(mytensor)
    dist.isend(tensor=mytensor, dst=0)  # send the tensor to 0 (master)
    print "slave" + str(rank) + " sending tensor"
```
And the Master waits on those tensors.

To do this, I create a list of tuples holding `(senderSlaveID, tensorRecvdFromSlave, dreqObject)`. On each pass, I loop over the list to check which dreq objects have completed. If the current dreq has completed, the Master has received that particular tensor, so I print it out and delete the tuple from the list (it isn't needed anymore). I keep looping until there are no more tuples in the list:
```python
def masterRun(size):
    print("runMaster: Hello from Master")
    # (senderSlaveID, tensorRecvdFromSlave, dreq)
    tuple_list = []
    for slaveID in range(1, size):
        mytensor = torch.zeros(1, numParams)
        recvobj = dist.irecv(tensor=mytensor, src=slaveID)
        tuple_list.append((slaveID, mytensor, recvobj))

    # Keep doing this till all requests have been satisfied
    while len(tuple_list) != 0:
        currIndex = 0
        # Poll to find out if the request for the tensor from that slave has completed
        for currtuple in tuple_list:
            # Retrieve the tuple
            senderSlaveID, tensorRecvdFromSlave, dreq = currtuple
            # print "Master: " + str(senderSlaveID) + "->" + str(tensorRecvdFromSlave) + "->" + str(dreq)
            if dreq.is_completed():
                # Yaay, request is complete
                print "master: Sender " + str(senderSlaveID) + " sent " + str(tensorRecvdFromSlave)
                # Remove this tuple from the list, we don't want to iterate over it again :P
                tuple_list.pop(currIndex)
            else:
                currIndex += 1
```
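For context, I spawn the processes following the standard `torch.distributed` tutorial pattern. Below is a minimal sketch of what my `init_processes` / `run` setup in `asyncSGD.py` looks like (the address, port, backend, and world size here are placeholders, not my exact values):

```python
import os
from multiprocessing import Process
import torch.distributed as dist

def run(rank, size):
    # Rank 0 acts as the Master, every other rank as a slave
    if rank == 0:
        masterRun(size)
    else:
        slaveRun(rank)

def init_processes(rank, size, fn, backend='tcp'):
    # Initialize the distributed environment (address/port are placeholders)
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 4  # 1 Master + 3 slaves (placeholder)
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```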
However, this fails with a strange error:
```
Process Process-1:
Traceback (most recent call last):
  File "/anaconda2/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "asyncSGD.py", line 80, in init_processes
    fn(rank, size)
  File "asyncSGD.py", line 18, in run
    masterRun(size)
  File "asyncSGD.py", line 61, in masterRun
    if(dreq.is_completed()):
  File "/anaconda2/lib/python2.7/site-packages/torch/distributed/__init__.py", line 102, in is_completed
    return torch._C._dist_request_is_completed(self.request)
RuntimeError: Connection reset by peer
```
I have no clue what is happening. Any ideas?
Thanks in advance,