RPC - dynamic world size

Is it possible to have a dynamic world size when using torch.distributed.rpc?
I want a changing number of processes communicating over the TensorPipe backend, without explicitly stating a world size, with each process dynamically assigned a rank.

Hey @ItamarWilf,

Unfortunately, this is not yet possible with the RPC package, but it is on our roadmap.

Hi Shen. Is this feature available now? If not, will it be available in the near future?

Hi @sagebei, we have a prototype of this feature in the PyTorch 1.12 release which will be available on June 28 (view releases here: Releases · pytorch/pytorch · GitHub). Or feel free to pull the nightly PyTorch build.

As part of this feature, a process's init_rpc call remains the same; however, when the world_size argument is not specified, the group is assumed to be dynamic, which allows processes to join and leave it. shutdown() is used to leave the group. We do not currently support dynamic rank allocation. Documentation will be updated for this as well.

Current

# blocking join for all processes
init_rpc("workerN", world_size=N, rank=N)

# blocking shutdown for all processes
shutdown(graceful=True)
# nonblocking shutdown for a single process
shutdown(graceful=False)

New

# node join
init_rpc("worker0", rank=0)

# node leave
rpc.shutdown()
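
For illustration, here is a minimal two-process sketch of the dynamic flow (the worker names, the add function, and the sleep are just for this example, not part of the API):

import os
import time
import multiprocessing as mp

import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

def add(a, b):
    return a + b

def run(rank):
    # No world_size argument: the process joins a dynamic group
    rpc.init_rpc(f'worker{rank}', rank=rank)
    if rank == 1:
        # worker0 hosts the store, so it has already joined by now
        print(rpc.rpc_sync('worker0', add, args=(1, 2)))
    else:
        time.sleep(2)  # keep worker0 alive to serve the request
    rpc.shutdown()  # leave the group

if __name__ == '__main__':
    procs = [mp.Process(target=run, args=(r,)) for r in (0, 1)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()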

Out of curiosity, can you provide a few details on how you will be using this feature and the scenarios / architectures / models you are training? Thanks!

Hi @H-Huang @mrshenli ,
I have some questions related to dynamic world size. We are developing a distributed graph learning library for PyTorch, graphlearn-for-pytorch. Our current goal is to refactor the library's RPC functionality and make it dynamic. Sampler processes sample mini-batches from a large graph and serve as a background service. Trainer processes can join dynamically and request mini-batches from the sampler processes. This will allow us to run different experiments without reloading the graph.
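
To make the intended setup concrete, here is a rough sketch of the pattern we have in mind (get_batch, the worker names, and the stop event are placeholders, not graphlearn-for-pytorch or PyTorch APIs; MASTER_ADDR / MASTER_PORT are assumed to be set as in the script below):

import torch
import torch.distributed.rpc as rpc

def get_batch(step):
    # Placeholder for the real sampler logic: return a random mini-batch
    return torch.randn(32, 16)

def sampler_main(stop_event):
    # Sampler joins once and stays up as a background service
    rpc.init_rpc('sampler_0', rank=0)
    stop_event.wait()  # e.g. a multiprocessing.Event; serve until told to stop
    rpc.shutdown()

def trainer_main(rank, num_steps):
    # Trainer joins the dynamic group, pulls batches, then leaves
    rpc.init_rpc(f'trainer_{rank}', rank=rank)
    for step in range(num_steps):
        batch = rpc.rpc_sync('sampler_0', get_batch, args=(step,))
        # ... train on batch ...
    rpc.shutdown()  # leave; the samplers keep running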

I have the following questions:

  1. Is dynamic world size a stable and long-term supported feature? While I can identify relevant code changes, I couldn’t find any documentation or tutorials.
  2. How are some RPC APIs expected to behave in this dynamic scenario? For example, I tested rpc.api._all_gather, and found that only one rank can receive the response.
import os
import time
import multiprocessing as mp

import torch
import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '11111'

def worker(rank):
    # No world_size: the process joins a dynamic RPC group
    rpc.init_rpc(
        name=f'worker_{rank}',
        rank=rank,
    )
    print(f'rank {rank} initialized')
    time.sleep(1)
    print(rpc.api._all_gather(torch.cuda.device_count()))
    if rank == 0:
        time.sleep(1)
    rpc.shutdown()
    print(f'rank {rank} exited')

if __name__ == '__main__':
    processes = []
    for rank in [0, 1, 2, 3]:
        process = mp.Process(target=worker, args=(rank,))
        process.start()
        processes.append(process)
    # Wait for all workers to finish
    for process in processes:
        process.join()

Hi @H-Huang, thank you so much for implementing this feature. I spotted it in PyTorch 1.13.1 and hope it can help us tremendously.

Our use cases are listed below:

  1. adopting spot instances when available
  2. making training tasks fault-tolerant with dynamic RPC

However, there is still one issue I found when trying this feature: rpc.shutdown() executes before any RPC request is received from the other RPC nodes.

Under the static group setting:

# blocking join for all processes
init_rpc("workerN", world_size=N, rank=N)

# blocking shutdown for all processes
shutdown(graceful=True)

shutdown in fact does two jobs:

  1. blocks while waiting for permission from the other RPC nodes to leave
  2. deletes local RRefs, etc.

However, in the new style:

# node join
init_rpc("worker0", rank=0)

# node leave
rpc.shutdown()

shutdown executes without any blocking, which means without any permission from the other RPC nodes.

Not sure if I'm using the dynamic group feature correctly. I checked the rpc.shutdown code in 1.13.1 and found that rpc_sync(worker.name, _update_group_membership, args=(my_worker_info, [], {}, False)) can be executed without any handshake with the worker.
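
As a stopgap, we are considering an explicit drain before leaving, so a node only calls shutdown once its in-flight requests have finished. A rough sketch of the idea (the counter and helper functions are hypothetical, not PyTorch APIs, and this only covers requests already in flight; it does not recreate the static group's global handshake):

import threading

import torch.distributed.rpc as rpc

_outstanding = 0
_cv = threading.Condition()

def _track(delta):
    global _outstanding
    with _cv:
        _outstanding += delta
        _cv.notify_all()

def serve_request(payload):
    # Wrap real request handling with in-flight bookkeeping
    _track(+1)
    try:
        return payload * 2  # stand-in for real work
    finally:
        _track(-1)

def drain_then_shutdown():
    # Only leave the group once all in-flight requests have finished
    with _cv:
        while _outstanding > 0:
            _cv.wait()
    rpc.shutdown()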

Hi, thanks for trying out the feature, answer to your questions below:

  1. Is dynamic world size a stable and long-term supported feature? While I can identify relevant code changes, I couldn’t find any documentation or tutorials.

Dynamic world size RPC is a prototype feature and current development is paused. There has been a shift in priority toward enhancing Store reliability and robustness (both of which are also needed by RPC). However, the team would be open to reviewing pull requests for RPC.

  2. How are some RPC APIs expected to behave in this dynamic scenario? For example, I tested rpc.api._all_gather, and found that only one rank can receive the response.

rpc_sync and rpc_async should work with dynamic RPC, but things that do not work with CUDA RPC (e.g., RRefs) will also not work with dynamic RPC.
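
For instance, under the same assumptions as the two-process sketch earlier in the thread (with add defined there and the group initialized), the async variant would look like:

# Hypothetical follow-up to the earlier sketch: async call plus wait
fut = rpc.rpc_async('worker0', add, args=(3, 4))
print(fut.wait())  # prints 7 once worker0 has executed add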

As for rpc.api._all_gather, it is a private API that is used during RPC initialization. There may be some global state that it sets up which prevents it from being reused on subsequent calls.

We would also accept a public-facing version of the API and would review the pull request.
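
In the meantime, something similar can be approximated in user code on top of rpc_sync. A minimal leader-based sketch (all names here are hypothetical; it assumes the participant count is known up front and that the TensorPipe thread pool has enough threads for every caller to block on the leader):

import threading

import torch.distributed.rpc as rpc

_results = {}
_cv = threading.Condition()

def _contribute(name, value, expected):
    # Runs on the leader: record one contribution, then wait for the rest
    with _cv:
        _results[name] = value
        _cv.notify_all()
        while len(_results) < expected:
            _cv.wait()
        return dict(_results)

def all_gather(value, leader, my_name, expected):
    # Every worker (including the leader) contributes via rpc_sync.
    # Single-shot sketch: reuse would require clearing _results first.
    return rpc.rpc_sync(leader, _contribute, args=(my_name, value, expected))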

Thanks for your response and clarification @H-Huang . Appreciate that.

Hi @H-Huang ,
We are planning to implement our system using the dynamic RPC feature. I would greatly appreciate it if you could answer the following questions:

  1. Do rpc_sync and rpc_async work identically to their static counterparts, and have they been thoroughly tested?
  2. Would bugs related to this feature be given priority, so that we could receive assistance when we encounter them?
  3. Are there any plans to resume the development of this feature in the future?

Thank you.