I am trying to implement PageRank with libtorch. I have finished the OpenMPI version of PageRank. I tried to read the libtorch documentation here.
However, I did not see any RPC-like function similar to what OpenMPI offers.
Is it possible to implement PageRank with libtorch?
It should be possible. And there are several levels of APIs that you can use:
Thank you so much!
I will try.
Hi,
I tried to use RPC, but it seems too difficult.
My idea is that rank 0 calls the my_add function, rank 1 executes it, and returns the value to rank 0.
"""RPC with Torch"""
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.distributed.rpc as rpc

def my_add(a,b):
    return a+b

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        rpc.init_rpc("worker0", rank=0, world_size=size)
        ret = rpc.rpc_sync("worker1", my_add, args=(2,3))
        print(ret)
    else:
        rpc.init_rpc("worker1", rank=1, world_size=size)
    rpc.shutdown()
    #print('Rank ', rank, ' has data ', tensor[0])

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
I tried to run it, but got the following error:
Process Process-45:
Process Process-46:
Traceback (most recent call last):
Traceback (most recent call last):
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "<ipython-input-22-0e6d083442bd>", line 29, in init_process
fn(rank, size)
File "<ipython-input-22-0e6d083442bd>", line 29, in init_process
fn(rank, size)
File "<ipython-input-22-0e6d083442bd>", line 16, in run
rpc.init_rpc("worker0", rank=0, world_size=size)
File "<ipython-input-22-0e6d083442bd>", line 20, in run
rpc.init_rpc("worker1", rank=1, world_size=size)
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/__init__.py", line 77, in init_rpc
store, _, _ = next(rendezvous_iterator)
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/__init__.py", line 88, in init_rpc
api._init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rendezvous.py", line 172, in _env_rendezvous_handler
store = TCPStore(master_addr, master_port, world_size, start_daemon, timeout)
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/api.py", line 283, in _init_rpc_backend
rpc_backend_options=rpc_backend_options,
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/backend_registry.py", line 75, in init_backend
return backend.value.init_backend_handler(*args, **kwargs)
RuntimeError: Address already in use
File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/backend_registry.py", line 101, in _process_group_init_backend_handler
"Default process group must not be initialized before init_rpc."
RuntimeError: Default process group must not be initialized before init_rpc.
Please help!
Thanks.
RuntimeError: Default process group must not be initialized before init_rpc.
As the error suggests, init_process_group cannot be called before init_rpc, because DDP and RPC currently do not work together. We are working on dropping this requirement: Avoid modifying default process group in init_rpc · Issue #33583 · pytorch/pytorch · GitHub
For the code snippet above, removing dist.init_process_group(backend, rank=rank, world_size=size) should work.
Thank you so much!
With MPI, I can use distributed objects so that one worker can have other workers do something with their own distributed objects.
In PyTorch, I saw that there is Remote Reference (RRef). However, I did not see how to create a distributed object with PyTorch. Could you please suggest something for this case?
For example:
worker1 holds a dictionary DIC1 of keys and values.
worker0 sends an array KEY_SENDs of keys to worker1. worker1 looks the keys up in DIC1 and returns an array with the corresponding values.
I tried this:
import numpy as np

def get_values(dic1, arr):
    rest = np.array([])
    for i in arr:
        rest = np.append(rest, dic1[i])
    return rest

def run(rank, size):
    tensor = torch.zeros(1)
    my_name = "worker"+str(rank)
    if rank == 0:
        thisdict = {4:2,2:6,3:8}
        rpc.init_rpc(my_name,rank=0, world_size=size)
        target = 1
        target_name = "worker"+str(target)
        ret = rpc.rpc_sync(target_name, get_values, args=(thisdict,[1,2]))
        print(str(ret) + " is in rank 0 from rank 1")
    else:
        thisdict = {2:1,1:3,4:2}
        rpc.init_rpc(my_name,rank=rank, world_size=size)
    rpc.shutdown()
    print(my_name)
I ran it, but no results were shown for this example.
Thanks,
Hey @ph0123
I am not sure if I fully understand the use case. I created an example to show how to intersect a local dict with a remote dict. It shouldn't be too hard to extend this to, e.g., a union or just sending keys. Let me know if this answers the question.
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import os

def create_dict():
    return {1:'a', 2:'b', 3:'c'}

def intersect_dict(dict1, dict2_rref):
    ret = {}
    for key in dict2_rref.local_value():
        if key in dict1:
            ret[key] = dict1[key]
    return ret

def run(dst):
    dict1 = {1:'a', 3:'c', 4:'d'}
    dict2_rref = rpc.remote(dst, create_dict)
    intersect = rpc.rpc_sync(dst, intersect_dict, args=(dict1, dict2_rref))
    print(intersect)

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("worker0", rank=rank, world_size=world_size)
        run("worker1")
    else:
        rpc.init_rpc("worker1", rank=rank, world_size=world_size)
    rpc.shutdown()

if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
Hi,
That's great.
However, when I run your code on my PC, the output is:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-1-c234795e743f> in <module>
37 world_size = 2
38
---> 39 mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
~/opt/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in spawn(fn, args, nprocs, join, daemon, start_method)
198 ' torch.multiprocessing.start_process(...)' % start_method)
199 warnings.warn(msg)
--> 200 return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
~/opt/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in start_processes(fn, args, nprocs, join, daemon, start_method)
156
157 # Loop on join until it returns True or raises an exception.
--> 158 while not context.join():
159 pass
160
~/opt/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in join(self, timeout)
111 raise Exception(
112 "process %d terminated with exit code %d" %
--> 113 (error_index, exitcode)
114 )
115
Exception: process 1 terminated with exit code 1
Then I tried this:
"""RPC with Torch"""
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.distributed.rpc as rpc
import numpy as np

def create_dict():
    return {1:'a', 2:'b', 3:'c'}

def intersect_dict(dict1, dict2_rref):
    ret = {}
    for key in dict2_rref.local_value():
        if key in dict1:
            ret[key] = dict1[key]
    return ret

def run(dst):
    dict1 = {1:'a', 3:'c', 4:'d'}
    dict2_rref = rpc.remote(dst, create_dict)
    intersect = rpc.rpc_sync(dst, intersect_dict, args=(dict1, dict2_rref))
    print(intersect)

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    #dist.init_process_group(backend, rank=rank, world_size=size)
    #fn(rank, size)
    if rank == 1:
        rpc.init_rpc("worker0", rank=rank, world_size=size)
        fn("worker1")
    else:
        rpc.init_rpc("worker1", rank=rank, world_size=size)
    rpc.shutdown()

if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
And it worked on my PC.
Could you please explain why mp.spawn fails in this case?
Thanks,
Can I create dict2 right next to rpc.init_rpc, like this?
if rank == 1:
    rpc.init_rpc("worker0", rank=rank, world_size=world_size)
    fn("worker1")
else:
    # initialize dict2 here
    rpc.init_rpc("worker1", rank=rank, world_size=world_size)
My goal is that each rank holds its own dictionary, and this dictionary stays available for the whole program. In your code, if I call the run() function repeatedly, dict1 is initialized again.
For example, rank 0 calls a function on rank 1. Your code works for that. But I want to run it several times: after the function is called on rank 1, rank 1 changes something in its dictionary. Then, in the next iteration, rank 0 wants to get the value from the updated dictionary on rank 1.
Thanks,
That's weird, I don't know why mp.spawn would fail. Is it because it has to be 127.0.0.1 instead of localhost in your env? We can try adding more logs/try-except to identify which line crashed.
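A minimal sketch of that kind of logging, under assumptions not in the thread: the script is saved as a .py file and run from a terminal, each stage prints its rank with flush=True, and log and logged are hypothetical helpers. One caveat: mp.spawn already reports exceptions raised inside the target function together with their traceback, so a bare "process 1 terminated with exit code 1", as above, suggests the child failed before run_worker even started. A plausible cause (unconfirmed in this thread) is the notebook: mp.spawn always uses the "spawn" start method, whose child processes must re-import the target function, which they cannot do for a function defined in a Jupyter cell, while Process on Python 3.7 for macOS defaults to fork.

```python
import functools
import os
import socket
import sys
import traceback

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def _free_port():
    # Hypothetical helper: let the OS pick an unused localhost port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def log(rank, msg):
    # flush=True so the line is visible even if the process dies right after.
    print(f"[rank {rank}] {msg}", flush=True)


def logged(fn):
    # Hypothetical decorator: print the traceback from inside the worker,
    # tagged with its rank, then re-raise so the parent still sees a failure.
    @functools.wraps(fn)
    def wrapper(rank, *args):
        try:
            return fn(rank, *args)
        except Exception:
            print(f"[rank {rank}] {fn.__name__} failed:\n{traceback.format_exc()}",
                  file=sys.stderr, flush=True)
            raise
    return wrapper


def create_dict():
    return {1: "a", 2: "b", 3: "c"}


def intersect_dict(dict1, dict2_rref):
    # Runs on the owner of dict2_rref.
    return {k: dict1[k] for k in dict2_rref.local_value() if k in dict1}


@logged
def run_worker(rank, world_size, port):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = str(port)
    log(rank, "calling init_rpc")
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    log(rank, "init_rpc done")
    if rank == 0:
        dict2_rref = rpc.remote("worker1", create_dict)
        log(rank, "rpc.remote issued")
        dict1 = {1: "a", 3: "c", 4: "d"}
        result = rpc.rpc_sync("worker1", intersect_dict, args=(dict1, dict2_rref))
        log(rank, f"rpc_sync returned {result}")
    rpc.shutdown()
    log(rank, "shutdown done")


if __name__ == "__main__":
    log("main", "spawning workers")
    world_size = 2
    mp.spawn(run_worker, args=(world_size, _free_port()), nprocs=world_size, join=True)
```

If every stage prints from a terminal but the same code still fails inside Jupyter, moving the worker functions into an importable .py module is the usual workaround for the spawn start method.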
My goal is that each rank holds its own dictionary, and this dictionary stays available for the whole program. In your code, if I call the run() function repeatedly, dict1 is initialized again.
If this is the only concern, you can move that rpc.remote call to create_dict() out of the run function and do it before the training loop?
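A minimal sketch of that suggestion, assuming the goal is a dict owned by worker1 that survives across iterations: rpc.remote runs create_dict on worker1 once, before the loop, and every later call receives the same RRef, so a change made in one iteration is visible in the next. update_dict, the free-port helper, and the three-step loop are hypothetical additions; both helpers run on worker1 through rpc_sync because local_value() is only valid on the owner.

```python
import os
import socket

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def _free_port():
    # Hypothetical helper: let the OS pick an unused localhost port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def create_dict():
    return {1: "a", 2: "b", 3: "c"}


def intersect_dict(dict1, dict2_rref):
    # Runs on the owner of dict2_rref; local_value() is only valid there.
    return {k: dict1[k] for k in dict2_rref.local_value() if k in dict1}


def update_dict(dict2_rref, key, value):
    # Hypothetical helper: mutate the owner's dict in place, so the same
    # RRef sees the change in later iterations.
    dict2_rref.local_value()[key] = value


def run_worker(rank, world_size, port):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = str(port)
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    if rank == 0:
        dict1 = {1: "a", 3: "c", 4: "d"}
        # Created once, before the loop, instead of inside run().
        dict2_rref = rpc.remote("worker1", create_dict)
        for step in range(3):
            result = rpc.rpc_sync("worker1", intersect_dict, args=(dict1, dict2_rref))
            print(f"step {step}: {result}")
            rpc.rpc_sync("worker1", update_dict, args=(dict2_rref, 4 + step, "new"))
    rpc.shutdown()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, _free_port()), nprocs=world_size, join=True)
```

Because key 4 is added to worker1's dict after step 0, it appears in the step 1 result ({1: 'a', 3: 'c', 4: 'd'}) but not in step 0's.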
For example, rank 0 calls a function on rank 1. Your code works for that. But I want to run it several times: after the function is called on rank 1, rank 1 changes something in its dictionary. Then, in the next iteration, rank 0 wants to get the value from the updated dictionary on rank 1.
Sure, there are many ways to solve this. For example, you can define the dict as a global value so that each process will have its own copy. Then in intersect_dict, just read from that global value instead of passing the RRef around.
Or, you can have a master that tells all workers to initialize their states upfront and then get RRefs of those states as return values.
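A sketch of the master-driven variant, assuming rank 0 is the master and every other rank holds one piece of state: the master calls rpc.remote once per worker to build that worker's state and keeps the returned RRefs for the whole run. init_state, lookup, the state contents, and world_size = 3 are hypothetical placeholders.

```python
import os
import socket

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp


def _free_port():
    # Hypothetical helper: let the OS pick an unused localhost port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def init_state(rank):
    # Hypothetical per-worker state; it is built and owned by the worker
    # that executes this function.
    return {rank: rank * 10, rank + 1: rank * 10 + 1}


def lookup(state_rref, keys):
    # Runs on the owner, so it reads the live state rather than a copy.
    state = state_rref.local_value()
    return [state.get(k) for k in keys]


def run_worker(rank, world_size, port):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = str(port)
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    if rank == 0:
        # The master asks every other worker to build its state once, up front,
        # and keeps one RRef per worker for the rest of the run.
        state_rrefs = [
            rpc.remote(f"worker{r}", init_state, args=(r,))
            for r in range(1, world_size)
        ]
        for step in range(2):
            for rref in state_rrefs:
                values = rpc.rpc_sync(rref.owner(), lookup, args=(rref, [1, 2]))
                print(f"step {step}, {rref.owner().name}: {values}")
    rpc.shutdown()


if __name__ == "__main__":
    world_size = 3
    mp.spawn(run_worker, args=(world_size, _free_port()), nprocs=world_size, join=True)
```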
If you need to construct an RRef from that dict, you can also use rpc.RRef(dict_obj) to create a local RRef and then pass that around through RPC.
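A sketch of the rpc.RRef route, assuming each worker keeps its dict in a module-level variable (as in the global-variable suggestion) and hands out an RRef to it on request: worker0 fetches worker1's RRef once and passes it back in later calls, while the dict itself never leaves worker1. get_dict_rref, lookup_values, and set_value are hypothetical names; the dict contents are taken from the earlier example in this thread. Unlike get_values there, lookup_values skips missing keys instead of raising KeyError.

```python
import os
import socket

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp

_local_dict = {}  # one independent copy per process


def _free_port():
    # Hypothetical helper: let the OS pick an unused localhost port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def get_dict_rref():
    # Runs on the owner: wrap the process-local dict in an RRef owned here.
    return rpc.RRef(_local_dict)


def lookup_values(dict_rref, keys):
    # Runs on the owner, so local_value() returns the live dict.
    d = dict_rref.local_value()
    return [d[k] for k in keys if k in d]


def set_value(dict_rref, key, value):
    # Runs on the owner and changes the dict in place.
    dict_rref.local_value()[key] = value


def run_worker(rank, world_size, port):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = str(port)
    _local_dict.update({4: 2, 2: 6, 3: 8} if rank == 0 else {2: 1, 1: 3, 4: 2})
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    if rank == 0:
        # Fetch worker1's RRef once; only the reference travels to worker0.
        rref = rpc.rpc_sync("worker1", get_dict_rref)
        print(rpc.rpc_sync(rref.owner(), lookup_values, args=(rref, [1, 2])))
        rpc.rpc_sync(rref.owner(), set_value, args=(rref, 1, 99))
        print(rpc.rpc_sync(rref.owner(), lookup_values, args=(rref, [1, 2])))
    rpc.shutdown()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, _free_port()), nprocs=world_size, join=True)
```

With the dicts above, worker0 should print [3, 1] and then [99, 1], showing that the update made on worker1 persists between calls.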
Hi,
Thank you so much!
I searched the documentation, but I did not find how to make a dictionary a global value with PyTorch.
I have worked with MPI and UPC++. MPI provides distributed objects, which let each worker hold its own values. UPC++ also provides distributed objects, which have the same name on all workers but different memory locations. Additionally, UPC++ provides global pointers for accessing data on other workers. It is quite easy.
However, I did not find these kinds of variables in the PyTorch documentation.
Do you have any suggestions?
I have another question. If I run several iterations, is there a function to wait for all workers, like a barrier() function?
Thanks.
Hey @ph0123
I was referring to Python global vars, something like:
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import os

_local_dict = {}

def intersect_dict(dict1):
    ret = {}
    for key in _local_dict:
        if key in dict1:
            ret[key] = dict1[key]
    return ret

def run(dst):
    intersect = rpc.rpc_sync(dst, intersect_dict, args=(_local_dict,))
    print(intersect)

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        _local_dict.update({1:'a', 3:'c', 4:'d'})
        rpc.init_rpc("worker0", rank=rank, world_size=world_size)
        run("worker1")
    else:
        _local_dict.update({1:'a', 2:'b', 3:'c'})
        rpc.init_rpc("worker1", rank=rank, world_size=world_size)
    rpc.shutdown()

if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
Wow. It worked.
Thank you for your support.
Hi,
I tried it with a loop.
After each iteration, I called dist.barrier(), and the program does not stop because it is waiting for other RPCs.
What is the problem?
Thanks,
the program does not stop because it is waiting for other RPCs.
What do you mean by “the program does not stop”? If you are referring to the behavior that some RPC requests are still running in the background, then yes, this is expected, because the RPC server has its own thread pool to handle requests, and dist.barrier only blocks the thread that runs it. Why do you want to combine dist.barrier() with RPC? Is this just to conclude an iteration?
HI,
I did not know that this does not work with RPC.
In my algorithm, step (k+1) depends on step k.
I want to make sure that all workers have finished before starting a new iteration.
Each worker holds its own dictionary.
step k:
call RPCs on other workers to check and update dictionary values, based on the values from step (k-1).
#I put barrier here
step k+1:
call RPCs on other workers to check and update dictionary values, based on the values from step k.
…
With PyTorch RPC, could you please suggest a function like dist.barrier()?
Thanks,
Hey,
dist.barrier() and RPC should both work as expected. The contract dist.barrier() offers is only to block until all participating threads reach the same barrier. So, as RPC threads in the background are not calling dist.barrier, they are not part of that synchronization.
If you would like to synchronize all RPCs, one option is doing something similar to _wait_all_workers as linked below. It uses rank 0 as a coordinator to tell all others when to proceed.
In general, all we need for synchronization is to join background threads into the current thread that calls dist.barrier. For example, the following code won't work, as some_func is running on a different thread:
rpc.rpc_async(to, some_func)
dist.barrier()
However, the code below should work, as it blocks the current thread before calling the barrier:
fut = rpc.rpc_async(to, some_func)
fut.wait()
dist.barrier()
So, in summary, if you would like to use dist.barrier to synchronize RPCs, the application needs to collect the futures created in one iteration and wait for all of them to complete before calling barrier.
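A sketch of that recipe for the step k / step k+1 loop described above, under assumptions that go beyond the thread. Every worker sends its current value to every other worker in each step (a toy stand-in for the PageRank exchange), and incoming updates are stored per step so that a faster worker's step k+1 messages cannot mix with step k data. The barrier is a hypothetical RPC-based stand-in in which rank 0 coordinates, in the spirit of _wait_all_workers but not its actual implementation; it is used because, per the error earlier in this thread, the PyTorch version discussed here does not allow a default process group (and hence dist.barrier()) alongside init_rpc. add_to_inbox, rpc_barrier, _arrive, and simulate are made-up names.

```python
import os
import socket
import threading
from collections import defaultdict

import torch.distributed.rpc as rpc
import torch.multiprocessing as mp

_inbox = defaultdict(int)       # step -> sum of values received for that step
_inbox_lock = threading.Lock()  # RPC requests are served on several threads
_barrier = None                 # threading.Barrier, created on rank 0 only


def _free_port():
    # Hypothetical helper: let the OS pick an unused localhost port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def add_to_inbox(step, value):
    # Runs on the receiving worker; keyed by step so early step k+1
    # messages from a faster worker do not mix with step k data.
    with _inbox_lock:
        _inbox[step] += value


def _arrive():
    # Runs on worker0 and blocks until all world_size participants arrive.
    _barrier.wait()


def rpc_barrier(rank):
    # Hypothetical stand-in for dist.barrier(); assumes worker0's RPC thread
    # pool has at least world_size - 1 threads free to block here.
    if rank == 0:
        _arrive()
    else:
        rpc.rpc_sync("worker0", _arrive)


def simulate(world_size, steps):
    # Sequential reference: each step, a worker's new value is the sum of
    # every other worker's value from the previous step.
    values = [r + 1 for r in range(world_size)]
    for _ in range(steps):
        total = sum(values)
        values = [total - v for v in values]
    return values


def run_worker(rank, world_size, port, steps):
    global _barrier
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = str(port)
    if rank == 0:
        _barrier = threading.Barrier(world_size)  # reusable across steps
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    value = rank + 1
    for step in range(steps):
        futs = [
            rpc.rpc_async(f"worker{dst}", add_to_inbox, args=(step, value))
            for dst in range(world_size) if dst != rank
        ]
        for fut in futs:
            fut.wait()     # all RPCs this worker issued in step k are done
        rpc_barrier(rank)  # ...and so are every other worker's
        with _inbox_lock:
            value = _inbox.pop(step)
    assert value == simulate(world_size, steps)[rank]
    print(f"worker{rank}: {value}")
    rpc.shutdown()


if __name__ == "__main__":
    world_size, steps = 3, 2
    mp.spawn(run_worker, args=(world_size, _free_port(), steps),
             nprocs=world_size, join=True)
```

Each worker checks its final value against simulate(), a sequential replay of the same update rule; with 3 workers and 2 steps the expected values are 7, 8 and 9.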
Hi,
I got it.
Thank you so much!