Sending a bulk data by RPC

Hi all,

I am using RPC to send and receive data with multiple CPUs.
Currently, I use “gloo” backend.
My code is:

rpc.rpc_async(my_target, add_outlinks, args=(arr_send[i],source))

I have nearly 1 million objects.
For each object, I send to (size-1) workers to run the function add_outlinks.

Totally, with 1 million objects, we have to send 1*(size-1) times.
arr_send[i] is from 10 to 1 000 000 numbers.

I run the same function on UPC++, It takes around several minutes. However, with torch-RPC, I set timeout to 10000s ~ 166 minutes, and the program is topped by timeout.

Could you tell me the best way to send data with Pytorch?

Thanks,

Is it possible to consolidate multiple arr_send[i] into one RPC? Is there any reason that they need to be sent using dedicated RPCs?

Curious:

  1. After 10000s, how many objects are processed?
  2. Is distributed autograd/optimizer also used in this case?
  3. Are any of those CPUs locate on the same machine? (so that shm can be helpful)

I use as

def add_outlinks(arr, source):
    for dest in arr:
        if int(dest) in _local_dict:
            _local_dict[dest].in_links.append(int(source))
    rpc.init_rpc(my_name, rank=rank, world_size=size,rpc_backend_options=rpc.ProcessGroupRpcBackendOptions(num_send_recv_threads=16,rpc_timeout=datetime.timedelta(seconds=10000)))  # initial_rpc

    #CALL rpc TO OTHER RANKS
    if rank==0:
        print("add-link...")
    try:
        array_rpc = list(range(0, size))
        count=0
        for it in _local_dict:
            count = count+1
            arr_send = []
            for i in range(0, size):
                arr_send.append([])
            u = _local_dict[it]
            source = u.vertexId

            for i in u.links:
                arr_send[int(i) % size].append(int(i))
            for i in array_rpc:
                my_target = "worker" + str(i)
                if len(arr_send[i])>0:
                    rpc.rpc_async(my_target, add_outlinks, args=(arr_send[i],source))
    except:
	   print("rank ",rank," run ",count,"/",len(_local_dict))
    rpc.api._wait_all_workers()
    print("shutdown.... rpc... ", rank)
    rpc.api._wait_all_workers()
    rpc.shutdown()

  • arr_send[i] will send to rank i
    For elements in _local_dict, we can run parallel.

1. After 10000s, how many objects are processed?

–> the outputs are as below. The worker0 is not see in the output. I try to print “count” value. But there is no output for “count” variable.

....
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:45462
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:44970
....

[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:19635
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:27553
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:44970
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:47501: Connection reset by peer
Traceback (most recent call last):
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "pagerank.py", line 380, in init_process
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:44931: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
...
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:378] writev [2001:700:4a01:10::38]:22942: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 240, in shutdown
    _wait_all_workers()
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:5848: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:50095: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:29331
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:57022: Connection reset by peer
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 165, in _wait_all_workers
    args=(sequence_id, self_worker_name,),
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:15236
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:2720
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:23214
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:38547
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:50607
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
...
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:12173
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 554, in rpc_sync
    return fut.wait()
RuntimeError: Encountered exception in ProcessGroupAgent::enqueueSend: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:57022: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:3715
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:2693
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure

During handling of the above exception, another exception occurred:

[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:9877
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:3715
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
Traceback (most recent call last):
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:378] writev [2001:700:4a01:10::38]:22942: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:36780: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:47501: Connection reset by peer
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:44931: Connection reset by peer

[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:3213: Connection reset by peer
Traceback (most recent call last):
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:13723
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "pagerank.py", line 380, in init_process
    print("shutdown.... rpc... ", rank)
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:9140
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 240, in shutdown
    _wait_all_workers()
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 165, in _wait_all_workers
    args=(sequence_id, self_worker_name,),
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/site-packages/torch/distributed/rpc/api.py", line 554, in rpc_sync
    return fut.wait()

RuntimeError: Encountered exception in ProcessGroupAgent::enqueueSend: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:566] Read error [2001:700:4a01:10::38]:9889: Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
  File "/cluster/home/cnphuong/.conda/envs/Pytorch_ENV/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
KeyboardInterrupt
[E thread_pool.cpp:112] Exception in thread pool task: Application timeout caused pair closure
[E thread_pool.cpp:112] Exception in thread pool task: [/opt/conda/conda-bld/pytorch_1587428228634/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [2001:700:4a01:10::38]:34716



I changed the code to print count as

for it in _local_dict:
     if(rank==0:)
            count = count+1
           print(count)

–> all elements in _local_dict is run. However, the program stopped by timeout.

2. Is distributed autograd/optimizer also used in this case?

–> Not yet. It is at https://pytorch.org/tutorials/beginner/blitz/autograd_tutorial.html ??

3. Are any of those CPUs locate on the same machine? (so that shm can be helpful)
–> I use 32 CPUs on this machine. (All of my CPU on the machine.)

Thanks

I try with small data (~60 elements in _local_dict), and it worked.
Perhaps, there are problems with a lot of rpc call?

For real data, size of _local_dict is nearly 190000 elements - 32 workers. With each element, I called 32 times for rpc(…).

For each worker, we have to call 19000032 times for rpc(…). We have 32 workers. Hence, totally, there are 19000032*32 times for calling RPC.

Is there any problem with that?

Thanks,

Hi,
I delete RPC and try to run with the local

_local_dict

each worker holds 1/32 of total objects. Then, I run the program. It takes nearly 5s.
However, It can not finish with RPC (timeout=10000s).

Perhaps, the problem has belonged to the Queue size of RPC backend.

Please help!
Thanks,

Hi,
I add future variable for the rpc.rpc_async.
It worked. However, it is very slow.
It took around 30 minutes to finish this procedure.

I try to put

 for fut in futs: #add here
                fut.wait() #add here

outside of the loop. The program is not finished and stopped by time out.

Could you explain why? and could you please suggest some ways to decrease the excuse time? I run the program with UPC++, it take some minutes to finish (~ 5-6 minutes)

Thanks

Sorry about the delay.

rpc.api._wait_all_workers() is not supposed to be directly called by applications. I was only referencing it as one example to use rank 0 as a coordinator. The recommended way is doing sth like below:

futs = []
for i in array_rpc:
    futs.append(rpc.rpc_async(my_target, add_outlinks, args=(arr_send[i],source)))

...

for fut in futs:
    fut.wait()

Allow me some time, still reading the contents above.

Edits

I see you are already using future in the latest version.

Could you explain why? and could you please suggest some ways to decrease the excuse time? I run the program with UPC++, it take some minutes to finish (~ 5-6 minutes)

Given that the comp only takes 5s, it looks like pickle and comm takes the majority of the time? Curious, can you profile the time taken for one rpc_sync?

The communication pattern above looks like an allToAll. Is it guaranteed that the data sent from rank x to rank y will be of the same size. If so, you can use the allToAll API (not in v1.5, only available on master now, will come to v1.6).

Regarding ways to speed up, one easy attempt would be increasing the number of threads. The program currently sets num_send_recv_threads=16. You could try 64. This could help speeding up deserialization on the server side. But the serializer on the sender side still occurs inline, we probably should also offload that to thread-pool in future releases.

I take that back. I think one major reason is that the pickle and execution both require Python GIL, and hence they won’t run in parallel on the server side. Can you try using TorchScript functions? You can find examples here by searching for @torch.jit.script. Even if we use @torch.jit.script functions, the pickle/unpickle would still require GIL as those are Python objects, but the execution can run in parallel.

It means the program didn’t git the except branch below:

    except:
	   print("rank ",rank," run ",count,"/",len(_local_dict))

If there are exceptions occur when processing the RPC in the try block, it will be thrown locally when you call fut.wait() or rref.to_here().

Given the above code, the timeout won’t occur in the try block, as rpc_async always returns immediately with a future object.

In this case, the new TensorPipe (coming in v1.6) RPC backend might be useful, as it provides a shm channel to send RPC data. Will keep you posted on that.

Thank you for your responses.

I will wait the 1.6 version and try to work with rpc.ref.
I will compare and share with you about the results.

Thanks,

Hey @ph0123

Thanks. Regarding the TorchScript functions, double checked with the team, it’s only the serialization on the caller side is holding GIL as of today. The derserialization on the callee can run in parallel. Curious to see how much TorchScript functions can help.

Hi,

I try torch.jit.script as

@torch.jit.script
def add_outlinks(arr, source):
    for dest in arr:
        if _local_dict.get(dest):
            _local_dict[dest].in_links.append(int(source))

On my program, I defined that _loca_dict is a global variable. Therefore, there is no error here. When I add “@torch.jit.script”. The terminal shown the error as

Traceback (most recent call last):
  File "/mnt/c/python_project/Pytorch/test.py", line 222, in <module>
    @torch.jit.script
  File "/home/cnphuong/.local/lib/python3.6/site-packages/torch/jit/__init__.py", line 1290, in script
    fn = torch._C._jit_script_compile(qualified_name, ast, _rcb, get_default_args(obj))
RuntimeError: 
attribute lookup is not defined on python value of type 'dict':
  File "/mnt/c/python_project/Pytorch/test.py", line 225
def add_outlinks(arr, source):
    for dest in arr:
        if _local_dict.get(dest):
           ~~~~~~~~~~~~~~~ <--- HERE
            _local_dict[dest].in_links.append(int(source))

I have to install pytorch from source or not?
My pytorch version is 1.5 and cpu only.

Thanks,

That probably means TorchScript does not support dict.get global variable yet.

cc JIT expert @Michael_Suo

Let me try this locally.

1 Like

Just check with JIT team, TorchScript functions does not support global variables. One alternative is to do it through RRef.local_value(). Let me prepare an example for you.

1 Like

thank you so much!
Best regards,

Hi @ph0123

I tried this https://github.com/pytorch/pytorch/pull/39900

But it actually exposes another gap in TorchScript function type annotation. JIT team is investigating.

1 Like

Fixes are coming to master: