I want to execute a function on the worker side and return the result to the master. However, I find that the result differs depending on which .py file the rpc_async call is placed in.
Method 1
master.py:
import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from test import sub_fun
os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'
rpc.init_rpc("master", rank=0, world_size=2)
rref = torch.Tensor([0])
sub_fun(rref)
rpc.shutdown()
test.py:
import torch.distributed.rpc as rpc

def f(rref):
    print("function is executed on master")

def sub_fun(rref):
    x = rpc.rpc_async("worker", f, args=(rref,))
worker.py:
import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'
def f(rref):
    print("function is executed on worker")
rpc.init_rpc("worker", rank=1, world_size=2)
rpc.shutdown()
I found that the output is “function is executed on master” on the worker side.
Method 2
When I put the two functions sub_fun and f in master.py rather than test.py, the result is “function is executed on worker”.
Why do the two methods output different results, and how can I get result 2 with Method 1's file layout?
The outputs differ based on where you put sub_fun because of how RPC serializes the function: it pickles the function by its qualified name (module path plus function name), not by its definition, and the worker then looks that name up in its own process. With Method 1 the pickled name is test.f, so the worker imports test.py and executes the f defined there, which prints “function is executed on master”. With Method 2 the name is __main__.f, and on the worker __main__ is worker.py, so its own f runs. If you want to keep both function implementations, they should be renamed to something like f_worker and f_master.
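You can observe this name-based serialization with plain pickle, which RPC's Python-function serialization is built on. A minimal sketch, runnable as a standalone script:

import pickle
import pickletools

def f():
    print("hello")

payload = pickle.dumps(f)
# The disassembly shows a global reference ('__main__ f'),
# not the function's bytecode or body:
pickletools.dis(payload)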
For your case, to get the result you want, you can structure your files like this:
master.py
import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from test import sub_fun
os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'
if __name__ == "__main__":
    rpc.init_rpc("master", rank=0, world_size=2)
    rref = torch.Tensor([0])
    sub_fun(rref)
    rpc.shutdown()
test.py
import torch.distributed.rpc as rpc
from worker import f

def sub_fun(rref):
    x = rpc.rpc_async("worker", f, args=(rref,))
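One caveat: rpc_async returns a torch.futures.Future immediately, so sub_fun above fires the call and returns without waiting. If the master needs the remote return value (the "return the results to the master" part of your question), call wait() on the future. A sketch of that variant:

import torch.distributed.rpc as rpc
from worker import f

def sub_fun(rref):
    fut = rpc.rpc_async("worker", f, args=(rref,))
    # Block until f finishes on "worker" and fetch its return value:
    return fut.wait()

Equivalently, rpc.rpc_sync("worker", f, args=(rref,)) performs the call and the wait in one step.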
worker.py
import os
import torch
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
os.environ['MASTER_ADDR'] = '10.5.26.19'
os.environ['MASTER_PORT'] = '5677'
def f(rref):
    print("function is executed on worker")

if __name__ == "__main__":
    rpc.init_rpc("worker", rank=1, world_size=2)
    rpc.shutdown()
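The key to this layout is that test.py imports f from worker.py, so the name RPC pickles (worker.f) resolves to the worker-side definition in both processes. An equivalent, arguably cleaner arrangement is a small shared module that both scripts import; remote_fns.py below is a hypothetical name for such a module:

# remote_fns.py -- hypothetical shared module, imported by both sides
def f(rref):
    # Both processes resolve remote_fns.f to this one definition,
    # so the call always runs where rpc_async targets it ("worker").
    print("function is executed on worker")

test.py would then use from remote_fns import f, and worker.py would not need to define f at all.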