Dist package: Can is_completed() be used to verify if the request has completed?

Hi All,

I am writing a barebones version of Master Slave communication. The slaves are sending a tensor to the Master (using the non-blocking isend)-

def slaveRun(rank):
    global numParams
    print "runSlave: Hello from rank"+str(rank) 

    mytensor = torch.zeros(1,numParams)
    torch.add(mytensor,rank, out=mytensor)

    print "slave"+str(rank)+" :initial mytensor = "+str(mytensor)

    dist.isend(tensor=mytensor, dst=0) #send the tensor to 0 (master)
    print "slave"+str(rank)+" sending tensor"

And the Master is waiting on those tensors.
For doing this I have created a list of tuples which hold the (senderSlaveID, tensorRecvdFromSlave, dreqObject). Then each time, we loop over the list to check which dreqObjects have been completed. If the current dreq has completed, then that means Master has received that particular tensor and prints it out. I also delete the tuple from the list (don’t need it anymore). Keep looping till there are no more tuples in the list

def masterRun(size):
    print("runMaster: Hello from Master")

    # (senderSlaveID, tensorRecvdFromSlave, dreq)
    tuple_list = []
    for slaveID in range(1,size):
        mytensor = torch.zeros(1,numParams)
        recvobj = dist.irecv(tensor=mytensor, src=slaveID)
        tuple_list.append((slaveID, mytensor, recvobj))
    
    # Keep doing this till all requests have been satisfied
    while len(tuple_list)!=0:
        currIndex = 0
        # Poll to find out if the request for tensor from that slave has completed
        for currtuple in tuple_list:
            # Retrive the tuple
            senderSlaveID, tensorRecvdFromSlave, dreq = currtuple
            # print "Master:" +str(senderSlaveID)+"->"+str(tensorRecvdFromSlave)+"->"+ str(dreq)
            if(dreq.is_completed()):
                # Yaay request is complete
                print "master: Sender "+str(senderSlaveID)+ " sent "+str(tensorRecvdFromSlave)
                # Remove this tuple from the list, we don't want to iterate again :P
                tuple_list.pop(currIndex)
            else:
                ++currIndex

However this fails with a strange error-

Process Process-1:
Traceback (most recent call last):
  File "/anaconda2/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "asyncSGD.py", line 80, in init_processes
    fn(rank, size)
  File "asyncSGD.py", line 18, in run
    masterRun(size)
  File "asyncSGD.py", line 61, in masterRun
    if(dreq.is_completed()):
  File "/anaconda2/lib/python2.7/site-packages/torch/distributed/__init__.py", line 102, in is_completed
    return torch._C._dist_request_is_completed(self.request)
RuntimeError: Connection reset by peer

I have no clue what is happening. Any ideas?

Thanks in advance,
Souptik

Hi,

I ran a few experiments and saw that it is always working with 1 Master, 1 Slave configuration.
Sometimes it is working with multiple slaves also (upto 3 slaves + 1 Master configuration), but sometimes failing.
Any ideas?

Thanks,
Souptik

Hi,

I was able to resolve this by using send instead of isend in slave code.

def slaveRun(rank):
    global numParams
    print "runSlave: Hello from rank"+str(rank) 

    mytensor = torch.zeros(1,numParams)
    torch.add(mytensor,rank, out=mytensor)

    print "slave"+str(rank)+" :initial mytensor = "+str(mytensor)

    dist.send(tensor=mytensor, dst=0) #send the tensor to 0 (master)
    print "slave"+str(rank)+" sending tensor"

However this gives me some confusion on when and how isend/irecv can be used as opposed to send/recv.

Can isend from the worker be paired with a irecv from Master? didn’t work
Can isend from the worker be paired with a recv from Master? didn’t work
Can send from the worker be paired with a irecv from Master? it works
Can send from the worker be paired with a irecv from Master? it works
Would be great if someone could give me a little bit insight into the working of isend/irecv operators.

My final target is to implement an sync SGD where I ignore the slowest b workers. I want to analyze the effect of this straggler mitigation on the time/epoch, #epochs till convergence & final accuracy