I am writing a barebones version of Master-Slave communication. The slaves each send a tensor to the Master using the non-blocking `isend`:
```python
import torch
import torch.distributed as dist

def slaveRun(rank):
    global numParams  # defined at module level
    print "runSlave: Hello from rank" + str(rank)
    mytensor = torch.zeros(1, numParams)
    torch.add(mytensor, rank, out=mytensor)  # fill the tensor with this slave's rank
    print "slave" + str(rank) + " : initial mytensor = " + str(mytensor)
    dist.isend(tensor=mytensor, dst=0)  # send the tensor to 0 (master)
    print "slave" + str(rank) + " sending tensor"
```
And the Master waits on those tensors.

To do this, I create a list of tuples holding `(senderSlaveID, tensorRecvdFromSlave, dreqObject)`. On each pass, I loop over the list to check which dreq objects have completed. If the current dreq has completed, the Master has received that particular tensor, so I print it out and delete the tuple from the list (it isn't needed anymore). I keep looping until there are no more tuples in the list:
```python
def masterRun(size):
    print("runMaster: Hello from Master")
    # (senderSlaveID, tensorRecvdFromSlave, dreq)
    tuple_list = []
    for slaveID in range(1, size):
        mytensor = torch.zeros(1, numParams)
        recvobj = dist.irecv(tensor=mytensor, src=slaveID)
        tuple_list.append((slaveID, mytensor, recvobj))

    # Keep doing this till all requests have been satisfied
    while len(tuple_list) != 0:
        currIndex = 0
        # Poll to find out if the request for the tensor from that slave has completed
        for currtuple in tuple_list:
            # Retrieve the tuple
            senderSlaveID, tensorRecvdFromSlave, dreq = currtuple
            # print "Master: " + str(senderSlaveID) + "->" + str(tensorRecvdFromSlave) + "->" + str(dreq)
            if dreq.is_completed():
                # Yaay, request is complete
                print "master: Sender " + str(senderSlaveID) + " sent " + str(tensorRecvdFromSlave)
                # Remove this tuple from the list, we don't want to iterate over it again :P
                tuple_list.pop(currIndex)
            else:
                currIndex += 1
```
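For context, I spawn the processes following the standard `torch.distributed` tutorial pattern. Below is a minimal sketch of what my `init_processes` / `run` setup in `asyncSGD.py` looks like (the address, port, backend, and world size here are placeholders, not my exact values):

```python
import os
from multiprocessing import Process
import torch.distributed as dist

def run(rank, size):
    # Rank 0 acts as the Master, every other rank as a slave
    if rank == 0:
        masterRun(size)
    else:
        slaveRun(rank)

def init_processes(rank, size, fn, backend='tcp'):
    # Initialize the distributed environment (address/port are placeholders)
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 4  # 1 Master + 3 slaves (placeholder)
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```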
However, this fails with a strange error:
```
Process Process-1:
Traceback (most recent call last):
  File "/anaconda2/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "asyncSGD.py", line 80, in init_processes
    fn(rank, size)
  File "asyncSGD.py", line 18, in run
    masterRun(size)
  File "asyncSGD.py", line 61, in masterRun
    if(dreq.is_completed()):
  File "/anaconda2/lib/python2.7/site-packages/torch/distributed/__init__.py", line 102, in is_completed
    return torch._C._dist_request_is_completed(self.request)
RuntimeError: Connection reset by peer
```
I have no clue what is happening. Any ideas?
Thanks in advance,