HI all,
I try to run 2 processes. Each process will call “rpc” to other process. The return value is got by future variable. However, the error is at “final += fut.wait()”
import os
from torch.multiprocessing import Process
import torch.distributed.rpc as rpc
def my_sum(arr):
res = 0
for i in arr:
res += i
return res
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
my_name = "worker" + str(rank)
rpc.init_rpc(my_name, rank=rank, world_size=size) # initial_rpc
array_rpc = list(range(0, size))
arr_send = []
for i in range(0, size):
temp = []
arr_send.append(temp)
arr_send[0].append(1)
arr_send[0].append(2)
arr_send[1].append(3)
arr_send[1].append(4)
futs=[]
for i in array_rpc:
my_target = "worker" + str(i)
futs.append(rpc.rpc_async(my_target, my_sum, args=(arr_send[i])))
final = 0
for fut in futs:
final += fut.wait()
print("results = :",final," in rank ", rank)
rpc.api._wait_all_workers()
rpc.shutdown()
if __name__ == "__main__":
size = 2
processes = []
for rank in range(size):
p = Process(target=init_process, args=(rank, size, my_sum))
p.start()
processes.append(p)
for p in processes:
p.join()
Is there any problem with future variable? I try to use float(fut), but there is still a problem.
error is as below
Process Process-1:
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/mnt/c/python_project/Pytorch/test_lib.py", line 44, in init_process
final += fut.wait()
File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 163, in _handle_exception
raise result.exception_type(result.msg)
TypeError: TypeError('my_sum() takes 1 positional argument but 2 were given',)
Traceback (most recent call last):
File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 153, in _run_function
result = python_udf.func(*python_udf.args, **python_udf.kwargs)
TypeError: my_sum() takes 1 positional argument but 2 were given
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/mnt/c/python_project/Pytorch/test_lib.py", line 44, in init_process
final += fut.wait()
File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 163, in _handle_exception
raise result.exception_type(result.msg)
TypeError: TypeError('my_sum() takes 1 positional argument but 2 were given',)
Traceback (most recent call last):
File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 153, in _run_function
result = python_udf.func(*python_udf.args, **python_udf.kwargs)
TypeError: my_sum() takes 1 positional argument but 2 were given