The results are different when placing rpc_async in a different .py file

I want to execute a function on the worker side and return the result to the master. However, I find that the result is different when placing rpc_async in a different .py file.

Method 1

master.py:


import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from test import sub_fun

os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'

rpc.init_rpc("master", rank=0, world_size=2)
rref = torch.Tensor([0])
sub_fun(rref)
rpc.shutdown()

test.py:


import torch.distributed.rpc as rpc  # needed for rpc_async below

def f(rref):
    print("function is executed on master")

def sub_fun(rref):
    x = rpc.rpc_async("worker", f, args=(rref,))

worker.py:

import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef

os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'

def f(rref):
    print("function is executed on worker")

rpc.init_rpc("worker", rank=1, world_size=2)
rpc.shutdown()

I found that the output on the worker side is “function is executed on master”.

Method 2

When I put the two functions sub_fun and f in master.py rather than in test.py, the result is “function is executed on worker”.

Why do the two methods output different results, and how can I get the result of Method 2 with the file layout of Method 1?

My torch version is ‘1.5.0+cu92’.

The outputs differ depending on where you put sub_fun because RPC pickles the function by name (its module and qualified name), not by its definition, so the worker runs whatever function that name resolves to in its own process. In Method 1 the function is pickled as test.f, and the worker's copy of test.py defines f to print the “master” message; in Method 2 it is pickled as __main__.f, and on the worker __main__ is worker.py, so the worker's own f runs. If you want to keep both function implementations, they should be renamed to something like f_worker and f_master.
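
You can see this name-based serialization in isolation with plain pickle (this is only a minimal sketch to illustrate the idea, not the RPC internals, and the top-level f here is just a stand-in):

import pickle
import pickletools

def f(rref):
    print("function is executed on master")

payload = pickle.dumps(f)

# The pickle stream only references the module and name ('__main__', 'f');
# it contains no bytecode, so whichever process unpickles it looks up 'f'
# in its own modules and may get a different implementation.
pickletools.dis(payload)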

For your case, to get the result you want, you can structure your files like this:

master.py

import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from test import sub_fun

os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'

if __name__ == "__main__":
    rpc.init_rpc("master", rank=0, world_size=2)
    rref = torch.Tensor([0])
    sub_fun(rref)
    rpc.shutdown()

test.py

import torch.distributed.rpc as rpc
from worker import f

def sub_fun(rref):
    x = rpc.rpc_async("worker", f, args=(rref,))

worker.py

import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef

os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'

def f(rref):
    print("function is executed on worker")

if __name__ == "__main__":
    rpc.init_rpc("worker", rank=1, world_size=2)
    rpc.shutdown()
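
One more note, not required for the fix above: rpc_async returns a future that sub_fun never waits on. If you want the caller to block until the remote call has actually executed, you can wait on the future — a small sketch based on the test.py above:

import torch.distributed.rpc as rpc
from worker import f

def sub_fun(rref):
    fut = rpc.rpc_async("worker", f, args=(rref,))
    fut.wait()  # block until the remote execution of f finishes (f returns None)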

Thanks a lot for your reply!
However, I found that it deadlocks when I import the function f from worker.py. Did your code work fine?