Why do my PyTorch RPC workers deadlock? Is it because I am using main as my master?

I wanted to run the simplest example I could that computes a distributed autograd backward pass and prints the gradients in my main script. To do that I:

  1. start the main process as the master (worker0)
  2. start a helper process as worker1
  3. send the computation to worker1 through an RPC
  4. get the gradients back from the remote worker and print them (roughly the pattern sketched below)
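
For reference, this is roughly the pattern I am trying to reproduce, based on my reading of the linked end-to-end example (the compute_loss name is just a placeholder; the worker name and tensor shapes are the ones from my script further down):

import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc

def compute_loss(a, b):
    return a.mm(b).sum()

# on the master, after init_rpc has succeeded on every rank
with dist_autograd.context() as context_id:
    a = torch.ones([2, 5], requires_grad=True)
    b = torch.ones([5, 3], requires_grad=True)
    loss = rpc.rpc_sync(to='worker1', func=compute_loss, args=(a, b))
    # the distributed backward pass stores gradients in the autograd context,
    # not in a.grad / b.grad
    dist_autograd.backward(context_id, [loss])
    grads = dist_autograd.get_gradients(context_id)
    print(grads[a])
    print(grads[b])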

This doesn't work for some reason: the workers seem to deadlock each other, and I am not sure why. At first I was getting an error that seemed to say the workers were trying to use the same port (that's my assumption, because it went away once the workers/processes used different ports).

Now that I have fixed that, the processes deadlock each other and never run my distributed autograd computation. I am unsure why. I assume it has something to do with init_rpc, since that seems to be where they stop. The docs say the following for init_rpc:

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None) Initializes RPC primitives such as the local RPC agent and distributed autograd, which immediately makes the current process ready to send and receive RPCs.

Or it might be due to shutdown:

torch.distributed.rpc.shutdown(graceful=True) Perform a shutdown of the RPC agent, and then destroy the RPC agent. This stops the local agent from accepting outstanding requests, and shuts down the RPC framework by terminating all RPC threads. If graceful=True, this will block until all local and remote RPC processes reach this method and wait for all outstanding work to complete. Otherwise, if graceful=False, this is a local shutdown, and it does not wait for other RPC processes to reach this method.
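
As far as I can tell, the basic lifecycle the docs describe looks something like the sketch below (this is just my reading of the docs, with mp.spawn, placeholder worker names, and a fixed port; it is not my actual failing script):

import os

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def run(rank, world_size):
    # rendezvous info used by init_rpc
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    rpc.init_rpc(f'worker{rank}', rank=rank, world_size=world_size)
    print(f'worker{rank} initialized')
    # graceful shutdown blocks until every rank has called it
    rpc.shutdown()


if __name__ == '__main__':
    mp.spawn(run, args=(2,), nprocs=2)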

I tried removing and adding the shutdown calls in my code in different combinations, but it still deadlocked… which is why I ended up asking a question, since I am unsure why the issue arises.

My code is super simple: I just multiply two tensors remotely and then try to get the gradients. Nothing more. It is inspired by this simple end-to-end example.

I have run out of ideas. Does anyone know what's going on?

import os

import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed import rpc
from torch.multiprocessing import Process


def start_worker(rank, world_size):
    # initialize worker proc
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = f'2950{rank}'
    print(f'initializing worker{rank}')
    rpc.init_rpc(f'worker{rank}', rank=rank, world_size=world_size)
    print(f'finished initializing worker{rank}')
    # block until all rpcs finish, and shutdown the RPC instance
    # rpc.shutdown()

def test1_rpc():
    print('running test1_rpc')
    world_size = 2

    # initialize worker
    worker_rank = 1
    worker_proc = Process(target=start_worker, args=(worker_rank, world_size))
    worker_proc.start()

    # initialize main as master
    rank = 0  # master rank
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = f'2950{rank}'
    # Initializes RPC primitives such as the local RPC agent and distributed autograd, which immediately makes the current process ready to send and receive RPCs.
    print(f'initializing worker{rank}')
    rpc.init_rpc(f'worker{rank}', rank=rank, world_size=world_size)
    print(f'finished initializing worker{rank}')

    # # initialize worker
    # worker_rank = 1
    # worker_proc = Process(target=start_worker, args=(worker_rank, world_size))
    # worker_proc.start()
    # print('initialized and started worker proc')

    # do distributed autograd backward pass
    def loss(a, b):
        return a.mm(b).sum()

    with dist_autograd.context() as context_id:
        print('in distributed autograd')
        a = torch.ones([2, 5], requires_grad=True)
        b = torch.ones([5, 3], requires_grad=True)
        loss = rpc.rpc_sync(to='worker1', func=loss, args=(a, b))

        # run the distributed backward pass; gradients live in the autograd context
        dist_autograd.backward(context_id, [loss])
        grads = dist_autograd.get_gradients(context_id)
        print(grads[a])
        print(grads[b])

    rpc.shutdown()

def test1():
    a = torch.ones([2, 5], requires_grad=True)
    b = torch.ones([5, 3], requires_grad=True)
    loss = a.mm(b).sum()
    loss.backward()
    print(a.grad)
    print(b.grad)
    print()


if __name__ == '__main__':
    test1()
    test1_rpc()

output:

tensor([[3., 3., 3., 3., 3.],
        [3., 3., 3., 3., 3.]])
tensor([[2., 2., 2.],
        [2., 2., 2.],
        [2., 2., 2.],
        [2., 2., 2.],
        [2., 2., 2.]])
running test1_rpc
initializing worker0
initializing worker1

crossposted: https://stackoverflow.com/questions/66323651/why-does-my-pytorch-rpc-workers-deadlock-is-it-because-i-am-using-main-as-my-ma