Return value with rpc

Hi all,
I am trying to run 2 processes, each of which makes an “rpc” call to the other. The return value is retrieved through a future. However, the code fails at “final += fut.wait()”.


import os

from torch.multiprocessing import Process
import torch.distributed.rpc as rpc



def my_sum(arr):
    res = 0
    for i in arr:
        res += i
    return res



def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'

    my_name = "worker" + str(rank)

    rpc.init_rpc(my_name, rank=rank, world_size=size)  # initial_rpc

    array_rpc = list(range(0, size))

    arr_send = []
    for i in range(0, size):
        temp = []
        arr_send.append(temp)

    arr_send[0].append(1)
    arr_send[0].append(2)
    arr_send[1].append(3)
    arr_send[1].append(4)

    futs=[]
    for i in array_rpc:
        my_target = "worker" + str(i)
        futs.append(rpc.rpc_async(my_target, my_sum, args=(arr_send[i])))

    final = 0
    for fut in futs:
        final += fut.wait()

    print("results = :",final," in rank ", rank)
    rpc.api._wait_all_workers()
    rpc.shutdown()


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, my_sum))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

Is there a problem with how I use the future? I tried float(fut), but it still fails.

The error is as below:

Process Process-1:
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/c/python_project/Pytorch/test_lib.py", line 44, in init_process
    final += fut.wait()
  File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 163, in _handle_exception
    raise result.exception_type(result.msg)
TypeError: TypeError('my_sum() takes 1 positional argument but 2 were given',)
Traceback (most recent call last):
  File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 153, in _run_function
    result = python_udf.func(*python_udf.args, **python_udf.kwargs)
TypeError: my_sum() takes 1 positional argument but 2 were given

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/c/python_project/Pytorch/test_lib.py", line 44, in init_process
    final += fut.wait()
  File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 163, in _handle_exception
    raise result.exception_type(result.msg)
TypeError: TypeError('my_sum() takes 1 positional argument but 2 were given',)
Traceback (most recent call last):
  File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/distributed/rpc/internal.py", line 153, in _run_function
    result = python_udf.func(*python_udf.args, **python_udf.kwargs)
TypeError: my_sum() takes 1 positional argument but 2 were given

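The error message indicates that args was passed incorrectly in the rpc.rpc_async call: a comma is missing after arr_send[i]. Without it, args is the list itself rather than a one-element tuple, so its two elements are unpacked as two positional arguments to my_sum. Changing the call to the following should work: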

        futs.append(rpc.rpc_async(my_target, my_sum, args=(arr_send[i], )))
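
The failure can be reproduced without torch. The snippet below is a minimal sketch in plain Python: call_like_rpc is a hypothetical helper (not part of torch) that only mimics the func(*args, **kwargs) unpacking visible in the traceback.

```python
def my_sum(arr):
    res = 0
    for i in arr:
        res += i
    return res


def call_like_rpc(func, args=(), kwargs=None):
    # Hypothetical stand-in for the callee side: it unpacks args the same
    # way the traceback shows, i.e. func(*args, **kwargs).
    return func(*args, **(kwargs or {}))


data = [3, 4]

# args=(data) is just the list itself, so it unpacks to my_sum(3, 4).
try:
    call_like_rpc(my_sum, args=(data))
    error = None
except TypeError as exc:
    error = str(exc)

# args=(data,) is a one-element tuple, so it unpacks to my_sum([3, 4]).
result = call_like_rpc(my_sum, args=(data,))

print(error)
print(result)
```

With the comma, the whole list arrives as the single arr argument; without it, each list element becomes its own positional argument.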

Wow.

Could you explain why it is?
My function has only one parameter. Why do we need to add “,” after the argument?

Thanks,

Could you explain why it is?

Sure. Without the comma, Python does not treat it as a tuple: the parentheses only group the expression. See the following session:

Python 3.8.2 (default, Mar 26 2020, 15:53:00)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> (5) == 5
True
>>> (5, ) == 5
False
>>> print( (5) )
5
>>> print( (5,) )
(5,)
>>> type((5))
<class 'int'>
>>> type((5,))
<class 'tuple'>
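
As a small complement to the session above, this sketch shows that the comma, not the parentheses, is what creates the tuple. The variable names are only for illustration.

```python
a = (5)         # parentheses only group: this is the int 5
b = (5,)        # the trailing comma creates a one-element tuple
c = 5,          # the parentheses are optional; the comma is what matters
d = tuple([5])  # an explicit alternative that avoids the easy-to-miss comma

print(type(a).__name__, type(b).__name__, type(c).__name__, type(d).__name__)
```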