Using isend / irecv works synchronously

Hello, I’m trying to set up Server-Worker communication with NCCL, where the Server receives an acknowledgement from each Worker as soon as that Worker finishes, asynchronously. However, when I run the following code, the workers always send their tensors in the same order, even though worker 1 has a deliberate time.sleep that should make it the last one to finish.

import time

import torch
import torch.distributed as dist


class Server:
    def __init__(self,rank,size):
        print("Server has started")
        tuple_list = []
        for worker in range(1,size):
            ack = torch.zeros(1)
            req = dist.irecv(tensor=to_device(ack), src=worker)
            tuple_list.append((worker, req))
            print("Server: Finished reading the tuples from server")
        

        while len(tuple_list)!=0:
            for currtuple in list(tuple_list):  # iterate over a copy so remove() below is safe
                worker, req = currtuple
                if(req.is_completed()):
                    print(f" Server: Worker {worker} has finished")
                    tuple_list.remove(currtuple)
                else:
                    print(f" Server: Worker {worker} not ready yet ")


class Worker:
    def __init__(self,rank,size):

        if rank == 1:
            time.sleep(7)

        print(f"Worker: {rank} has started")
        mytensor = torch.ones(1)  # same shape and dtype as the server's receive buffer
        dist.isend(tensor=to_device(mytensor), dst=0)
        print(f"Worker: {rank} send the tensor")


def main(rank, world):

    if rank == 0:
        Server(rank,world)
    else:
        Worker(rank,world)

This is an example of the output from the code above using 3 nodes:

Worker: 2 has started
Server has started
Worker: 1 has started
Server: Finished reading the tuples from server
Worker: 1 send the tensor
Server: Finished reading the tuples from server
Server: Worker 1 has finished
Server: Worker 2 not ready yet
Server: Worker 2 not ready yet


Server: Worker 2 not ready yet
Server: Worker 2 has finished
Worker: 2 send the tensor

I’ve already tried different combinations of irecv/isend, but I still cannot make them work asynchronously.

P.S. The to_device function only moves the tensor to CUDA if it is available:

def to_device(x):
    return x.cuda() if torch.cuda.is_available() else x
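For reference, the server's polling loop can be looked at separately from the transport. The following is only a sketch under assumptions: poll_pending and FakeRequest are hypothetical names, not part of torch.distributed, and FakeRequest merely imitates a request handle that exposes is_completed(). It shows polling a set of pending requests and reporting each one as soon as it completes, without removing items from a list while iterating over it. It does not change how NCCL itself orders or completes the transfers.

```python
import time


def poll_pending(pending, poll_interval=0.0):
    """Yield each worker id as soon as its request reports completion.

    `pending` is a list of (worker, request) pairs. Each request only needs
    an is_completed() method, like the handles returned by dist.irecv.
    """
    pending = list(pending)  # work on a copy; the caller's list is untouched
    while pending:
        still_waiting = []
        for worker, req in pending:
            if req.is_completed():
                yield worker
            else:
                still_waiting.append((worker, req))
        pending = still_waiting
        if pending and poll_interval:
            time.sleep(poll_interval)  # optional pause between polling passes


class FakeRequest:
    """Hypothetical stand-in for a request handle: completes after `polls` checks."""

    def __init__(self, polls):
        self.polls = polls

    def is_completed(self):
        self.polls -= 1
        return self.polls <= 0


if __name__ == "__main__":
    # Worker 2 completes on the first check, worker 1 on the third.
    fake = [(1, FakeRequest(polls=3)), (2, FakeRequest(polls=1))]
    print(list(poll_pending(fake)))  # prints [2, 1]
```

In the Server above, the same function could presumably be fed the (worker, req) pairs collected from dist.irecv, with the print statements moved into the consuming loop.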

CC @kwen2501 @Yaroslav_Bulatov