Distributed PageRank with PyTorch

I am trying to implement PageRank with libtorch. I have already finished an OpenMPI version of PageRank, and I have been reading the libtorch documentation here.

However, I did not see any function like RPC in OpenMPI.

Is it possible to implement PageRank with libtorch?

It should be possible. There are several levels of APIs that you can use:

  1. send/recv APIs: https://pytorch.org/docs/stable/distributed.html#torch.distributed.send
  2. collective communication APIs: https://pytorch.org/docs/stable/distributed.html#torch.distributed.all_reduce
  3. RPC APIs:
    a. https://pytorch.org/docs/stable/rpc.html
    b. https://pytorch.org/tutorials/intermediate/rpc_tutorial.html
    c. https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html
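
To make the collective option more concrete, here is a minimal sketch of how one PageRank iteration splits into per-worker partial sums followed by an all_reduce-style sum. It is only a single-process simulation in plain Python: the helper all_reduce_sum is a hypothetical stand-in for what torch.distributed.all_reduce with a SUM op would do across ranks, and the graph, the edge partitioning, and the damping factor of 0.85 are assumptions chosen for illustration. It also assumes every node has at least one outgoing edge.

```python
def partial_contributions(edges, ranks, out_degree, n):
    """Contributions that one worker computes from the edges it owns."""
    contrib = [0.0] * n
    for src, dst in edges:
        contrib[dst] += ranks[src] / out_degree[src]
    return contrib


def all_reduce_sum(vectors):
    """Hypothetical stand-in for an all_reduce with SUM: element-wise sum."""
    return [sum(values) for values in zip(*vectors)]


def pagerank(partitions, n, damping=0.85, iterations=50):
    # Out-degrees are global, so count edges across all partitions.
    out_degree = [0] * n
    for edges in partitions:
        for src, _ in edges:
            out_degree[src] += 1

    ranks = [1.0 / n] * n
    for _ in range(iterations):
        # Each "worker" only looks at its own edge list.
        partials = [
            partial_contributions(edges, ranks, out_degree, n)
            for edges in partitions
        ]
        # After the reduction every worker would hold the same totals.
        total = all_reduce_sum(partials)
        ranks = [(1.0 - damping) / n + damping * t for t in total]
    return ranks


# A 3-node cycle 0 -> 1 -> 2 -> 0, split across two simulated workers.
partitions = [[(0, 1), (1, 2)], [(2, 0)]]
ranks = pagerank(partitions, n=3)
print(ranks)
```

In a real multi-process run, each rank would build only its own partial vector as a tensor and the reduction step would be the collective call, so no rank ever needs the full edge list.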

Thank you so much!
I will try.

Hi,
I tried to use RPC, but it seems difficult.
My idea is that rank 0 calls the my_add function, rank 1 executes it, and the result is returned to rank 0.

"""RPC with Torch"""
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.distributed.rpc as rpc

def my_add(a,b):
    return a+b

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        rpc.init_rpc("worker0", rank=0, world_size=size)
        ret = rpc.rpc_sync("worker1", my_add, args=(2,3))
        print(ret)
    else:
        rpc.init_rpc("worker1", rank=1, world_size=size)
    rpc.shutdown()
    #print('Rank ', rank, ' has data ', tensor[0])

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

I tried to run it, but I got the error below:

Process Process-45:
Process Process-46:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-22-0e6d083442bd>", line 29, in init_process
    fn(rank, size)
  File "<ipython-input-22-0e6d083442bd>", line 29, in init_process
    fn(rank, size)
  File "<ipython-input-22-0e6d083442bd>", line 16, in run
    rpc.init_rpc("worker0", rank=0, world_size=size)
  File "<ipython-input-22-0e6d083442bd>", line 20, in run
    rpc.init_rpc("worker1", rank=1, world_size=size)
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/__init__.py", line 77, in init_rpc
    store, _, _ = next(rendezvous_iterator)
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/__init__.py", line 88, in init_rpc
    api._init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rendezvous.py", line 172, in _env_rendezvous_handler
    store = TCPStore(master_addr, master_port, world_size, start_daemon, timeout)
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/api.py", line 283, in _init_rpc_backend
    rpc_backend_options=rpc_backend_options,
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/backend_registry.py", line 75, in init_backend
    return backend.value.init_backend_handler(*args, **kwargs)
RuntimeError: Address already in use
  File "/Users/cnphuong/opt/anaconda3/lib/python3.7/site-packages/torch/distributed/rpc/backend_registry.py", line 101, in _process_group_init_backend_handler
    "Default process group must not be initialized before init_rpc."
RuntimeError: Default process group must not be initialized before init_rpc.

Please help!
Thanks.

RuntimeError: Default process group must not be initialized before init_rpc.

As the error suggests, init_process_group cannot be called before init_rpc, because DDP and RPC currently do not work together. We are working on dropping this requirement: https://github.com/pytorch/pytorch/issues/33583

For the code snippet above, removing dist.init_process_group(backend, rank=rank, world_size=size) should work.

Thank you so much!
With MPI, I can use a distributed object to have one worker remotely operate on another worker's distributed object.
In PyTorch, I saw that there is a Remote Reference (RRef). However, I did not see how to create a distributed object with PyTorch. Could you please suggest an approach for this case?

For example,
worker1 holds a dictionary DIC1 of keys and values.
worker0 sends an array of keys, KEY_SENDs, to worker1. worker1 looks the keys up in DIC1 and returns an array with the corresponding values.

I tried:

def get_values(dic1, arr):
    rest =  np.array([])
    for i in arr:
        rest= np.append(rest,dic1[i])
    return rest
        

def run(rank, size):
    tensor = torch.zeros(1)
    my_name = "worker"+str(rank)
    if rank == 0:
        thisdict = {4:2,2:6,3:8}
        rpc.init_rpc(my_name,rank=0, world_size=size)
        target = 1
        target_name = "worker"+str(target)
        ret = rpc.rpc_sync(target_name, get_values, args=(thisdict,[1,2]))
        print(str(ret) + " is in rank 0 from rank 1")
    else:
        thisdict = {2:1,1:3,4:2}
        rpc.init_rpc(my_name,rank=rank, world_size=size)
    rpc.shutdown()
    print(my_name)

I ran it, but no results were shown for this example.

Thanks,

Hey @ph0123

I am not sure I fully understand the use case. I created an example that shows how to intersect a local dict with a remote dict. It should not be too hard to extend this to, e.g., a union or just sending keys. Let me know if this answers the question.

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import os


def create_dict():
    return {1:'a', 2:'b', 3:'c'}


def intersect_dict(dict1, dict2_rref):
    ret = {}
    for key in dict2_rref.local_value():
        if key in dict1:
            ret[key] = dict1[key]

    return ret


def run(dst):
    dict1 = {1:'a', 3:'c', 4:'d'}
    dict2_rref = rpc.remote(dst, create_dict)
    intersect = rpc.rpc_sync(dst, intersect_dict, args=(dict1, dict2_rref))
    print(intersect)


def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("worker0", rank=rank, world_size=world_size)
        run("worker1")
    else:
        rpc.init_rpc("worker1", rank=rank, world_size=world_size)

    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

Hi,
That is great.
However, when I run your code on my PC, the output is:

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-1-c234795e743f> in <module>
     37     world_size = 2
     38 
---> 39     mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

~/opt/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in spawn(fn, args, nprocs, join, daemon, start_method)
    198                ' torch.multiprocessing.start_process(...)' % start_method)
    199         warnings.warn(msg)
--> 200     return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')

~/opt/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in start_processes(fn, args, nprocs, join, daemon, start_method)
    156 
    157     # Loop on join until it returns True or raises an exception.
--> 158     while not context.join():
    159         pass
    160 

~/opt/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in join(self, timeout)
    111                 raise Exception(
    112                     "process %d terminated with exit code %d" %
--> 113                     (error_index, exitcode)
    114                 )
    115 

Exception: process 1 terminated with exit code 1

I then tried:

"""RPC with Torch"""
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.distributed.rpc as rpc
import numpy as np

def create_dict():
    return {1:'a', 2:'b', 3:'c'}


def intersect_dict(dict1, dict2_rref):
    ret = {}
    for key in dict2_rref.local_value():
        if key in dict1:
            ret[key] = dict1[key]
    return ret

def run(dst):
    dict1 = {1:'a', 3:'c', 4:'d'}
    dict2_rref = rpc.remote(dst, create_dict)
    intersect = rpc.rpc_sync(dst, intersect_dict, args=(dict1, dict2_rref))
    print(intersect)
        
def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    #dist.init_process_group(backend, rank=rank, world_size=size)
    #fn(rank, size)
    if rank == 1:
        # The parameter is named size; world_size is not defined in this script.
        rpc.init_rpc("worker0", rank=rank, world_size=size)
        fn("worker1")
    else:
        rpc.init_rpc("worker1", rank=rank, world_size=size)

    rpc.shutdown()

if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

And it worked on my PC.

Could you please explain why mp.spawn fails in this case?

thanks,

Can I create dict2 next to rpc.init_rpc, like this?

    if rank == 1:
        rpc.init_rpc("worker0", rank=rank, world_size=world_size)
        fn("worker1")
    else:
        # initialize dict2 here
        rpc.init_rpc("worker1", rank=rank, world_size=world_size)

My goal is for each rank to hold its own dictionary, which stays available for the whole program. In your code, if I call the run() function again, dict1 is initialized again.

For example, rank 0 calls a function on rank 1. Your code handles that well. But I want to run it several times, and after each call rank 1 changes something in its dictionary. Then, in the next iteration, rank 0 wants to get the values from the updated dictionary on rank 1.

Thanks,

That is weird; I do not know why mp.spawn would fail. Is it because it has to be 127.0.0.1 instead of localhost in your environment? We can try adding more logging or try/except blocks to identify which line crashed.

My goal is for each rank to hold its own dictionary, which stays available for the whole program. In your code, if I call the run() function again, dict1 is initialized again.

If this is the only concern, you can move the rpc.remote call that runs create_dict() out of the run function and do it once before the training loop.

For example, rank 0 calls a function on rank 1. Your code handles that well. But I want to run it several times, and after each call rank 1 changes something in its dictionary. Then, in the next iteration, rank 0 wants to get the values from the updated dictionary on rank 1.

Sure, there are many ways to solve this. For example, you can define the dict as a global value so that each process will have its own copy. Then in intersect_dict, just read from that global value instead of passing the RRef around.

Or, you can have a master that tells all workers to initialize their states upfront and then get RRefs of those states as return values.

If you need to construct an RRef from that dict, you can also use rpc.RRef(dict_obj) to create a local RRef and then pass that around through RPC.

Hi,
Thank you so much!
I searched the documentation, but I could not find how to make a dictionary a global value with PyTorch.
I have worked with MPI and UPC++. MPI provides a distributed object that lets each worker hold its own values. UPC++ also provides distributed objects, which have the same name on all workers but different memory locations. Additionally, UPC++ provides global pointers for accessing them remotely from other workers. It is quite easy.

However, I could not find these kinds of variables in the PyTorch documentation.
Do you have any suggestions?

I have another question. If I run several iterations, is there a function that waits for all workers, like barrier()?

Thanks.

Hey @ph0123

I was referring to Python global variables, something like:

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import os


_local_dict = {}


def intersect_dict(dict1):
    ret = {}
    for key in _local_dict:
        if key in dict1:
            ret[key] = dict1[key]

    return ret


def run(dst):
    intersect = rpc.rpc_sync(dst, intersect_dict, args=(_local_dict,))
    print(intersect)


def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        _local_dict.update({1:'a', 3:'c', 4:'d'})
        rpc.init_rpc("worker0", rank=rank, world_size=world_size)
        run("worker1")
    else:
        _local_dict.update({1:'a', 2:'b', 3:'c'})
        rpc.init_rpc("worker1", rank=rank, world_size=world_size)

    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
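
The global-dictionary pattern above also covers the multi-iteration case raised earlier, where the owner's dictionary changes between calls. The sketch below shows only the functions that would live on the owning worker; get_values and update_values are hypothetical names, and here they are called locally for illustration. In an RPC setup the caller would pass them to rpc.rpc_sync instead, for example with args=([1, 2],). The lock is an assumption added because the RPC server handles requests on its own thread pool, so two requests could touch the dictionary at the same time.

```python
import threading

# Per-process state: every worker process gets its own copy of this module
# global, and it lives for as long as the process does.
_local_dict = {2: 1, 1: 3, 4: 2}
_local_lock = threading.Lock()


def get_values(keys):
    """Return the value for each key, or None where the key is missing."""
    with _local_lock:
        return [_local_dict.get(key) for key in keys]


def update_values(updates):
    """Apply updates to the local dictionary and return its new size."""
    with _local_lock:
        _local_dict.update(updates)
        return len(_local_dict)


# Local calls standing in for two iterations of remote requests.
first = get_values([1, 2])
update_values({1: 10, 5: 7})
second = get_values([1, 2, 5])
print(first, second)
```

Because the state is a module global rather than an argument, a later call sees whatever an earlier call wrote, which is what the iterative use case needs.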

Wow. It worked.
Thank you for your support.

Hi,

I tried this with a loop.
After each iteration I call dist.barrier(), and the program does not stop because it is waiting for other RPCs.

What is the problem?

Thanks,

the program does not stop because it is waiting for other RPCs.

What do you mean by “the program does not stop”? If you are referring to some RPC requests still running in the background, then yes, this is the expected behavior, because the RPC server has its own thread pool to handle requests and dist.barrier only blocks the thread that calls it. Why do you want to combine dist.barrier() with RPC? Is it just to conclude an iteration?

Hi,
I did not know this does not work with RPC.
In my algorithm, step (k+1) depends on step k.
I want to make sure that all workers have finished before starting a new iteration.

Each worker holds its own dictionary.
Step k:
call RPCs on other workers to check and update dictionary values, based on the values from step (k-1).
# I put the barrier here
Step k+1:
call RPCs on other workers to check and update dictionary values, based on the values from step k.
…

With PyTorch RPC, could you please suggest a function like dist.barrier()?

Thanks,

Hey,

dist.barrier() and RPC should both work as expected. The contract dist.barrier() offers is only to block until all participating threads reach the same barrier. Since the RPC threads in the background do not call dist.barrier, they are not part of that synchronization.

If you would like to synchronize all RPCs, one option is to do something similar to _wait_all_workers as linked below. It uses rank 0 as a coordinator to tell all others when to proceed.

In general, all we need for a synchronization is to join background threads into the current thread that calls dist.barrier. For example, the following code won’t work, as some_func is running on a different thread.

rpc.rpc_async(to, some_func)
dist.barrier()

However, the code below should work, as it blocks the current thread before calling the barrier:

fut = rpc.rpc_async(to, some_func)
fut.wait()
dist.barrier()

So, as a summary, if you would like to use the dist.barrier to synchronize RPCs, the application will need to collect the futures created in one iteration, and wait for all of them to complete before calling barrier.
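
The summary above can be sketched with the standard library alone. This is a thread-based simulation, not PyTorch code: the ThreadPoolExecutor stands in for the RPC thread pool, pool.submit for rpc.rpc_async, wait for calling fut.wait() on every future, and threading.Barrier for dist.barrier. The names remote_increment and worker, and the counts of workers and iterations, are assumptions made for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

NUM_WORKERS = 3
ITERATIONS = 4

state = [0] * NUM_WORKERS                      # one counter per simulated worker
locks = [threading.Lock() for _ in range(NUM_WORKERS)]
barrier = threading.Barrier(NUM_WORKERS)       # stand-in for dist.barrier()
pool = ThreadPoolExecutor(max_workers=4)       # stand-in for the RPC thread pool
snapshots = []                                 # what worker 0 sees per iteration


def remote_increment(target):
    """Simulated remote call: update the state owned by another worker."""
    with locks[target]:
        state[target] += 1


def worker(rank):
    for _ in range(ITERATIONS):
        # Issue one asynchronous "RPC" to every other worker and keep the futures.
        futures = [
            pool.submit(remote_increment, target)
            for target in range(NUM_WORKERS)
            if target != rank
        ]
        wait(futures)      # block until every request from this worker is done
        barrier.wait()     # now all requests of this iteration are done everywhere
        if rank == 0:
            snapshots.append(list(state))
        barrier.wait()     # keep the next iteration's updates out of the snapshot


threads = [threading.Thread(target=worker, args=(r,)) for r in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
pool.shutdown()
print(snapshots)
```

Each counter receives one update from each of the two other workers per iteration, so the snapshot after iteration k holds 2k in every slot. Dropping the wait(futures) line would let a worker reach the barrier while its requests are still in flight, which is the failure mode described above.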

Hi,
I got it.
Thank you so much!