Hi @aazzolini @H-Huang, thank you so much for your responses!
Yes. ~500 processes is a “titan” project, and I am probably expected to write a custom communication protocol. But since RPC provides a very convenient interface and highly optimized communication, I chose not to reinvent the wheel. Despite the large number of processes, the number of communication pairs grows only linearly with the number of processes, since the majority of the data transfer uses a one-to-many topology instead of the all-to-all topology used by Gloo.
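To illustrate the pattern, the bulk of the traffic looks roughly like this (a minimal sketch; the worker names and the fan-out function are placeholders, not my actual protocol):

import torch
import torch.distributed.rpc as rpc

def master_fan_out(world_size):
    # One-to-many: the master issues one RPC per worker and gathers the replies,
    # so the number of communication pairs grows linearly with world_size,
    # instead of the all-to-all connections a Gloo group would set up.
    futures = [
        rpc.rpc_async(f"worker{r}", torch.add, args=(torch.ones(2), 1))
        for r in range(1, world_size)
    ]
    return [fut.wait() for fut in futures]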
Yes. I know the Gloo group is also used for exchanging device_map information, but since in most cases I use the default device setting, I hacked the code and set “reverse_device_maps” to an empty dict and “devices” to an empty list.
I am using PT 1.10.1 since my lab cluster only has CUDA 11.1; I have installed newer versions of PyTorch on it, but they behaved strangely. Here is my hacked code from PT 1.10.1, which was working before. I removed all the original comments for simplicity.
def _tensorpipe_init_backend_handler(store, name, rank, world_size, rpc_backend_options):
    from . import TensorPipeRpcBackendOptions
    from . import TensorPipeAgent

    if not isinstance(store, dist.Store):
        raise TypeError("`store` must be a c10d::Store. {}".format(store))

    if not isinstance(rpc_backend_options, TensorPipeRpcBackendOptions):
        raise TypeError(
            "`rpc_backend_options` must be a `TensorPipeRpcBackendOptions`. {}".format(
                rpc_backend_options
            )
        )

    # hacking: remove the dependency on the Gloo group
    # group = _init_process_group(store, rank, world_size)
    group = None  # hacking: no Gloo group; in PT 1.10.1 TensorPipeAgent still expects this argument

    if torch.cuda.is_available():
        torch.cuda.init()
        device_count = torch.cuda.device_count()
    else:
        device_count = 0

    # hacking: we use the default device setting, so there is no need to exchange
    # device_map information
    # reverse_device_maps, devices = _tensorpipe_exchange_and_check_all_device_maps(
    #     name,
    #     device_count,
    #     rpc_backend_options.device_maps,
    #     rpc_backend_options.devices,
    #     group,
    # )

    agent = TensorPipeAgent(
        store,
        name,
        rank,
        world_size,
        group,
        rpc_backend_options,
        # hacking: an empty dict and an empty list replace reverse_device_maps and devices
        {},
        [],
    )

    api._init_rpc_states(agent)

    # hacking: the following two lines are not needed since we never call rpc.shutdown();
    # replace each occurrence of rpc.shutdown() with something like time.sleep(1 << 31)
    # api._all_gather(None, timeout=rpc_constants.DEFAULT_RPC_TIMEOUT_SEC)
    # group.barrier().wait()

    return agent
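For completeness, each process then initializes RPC as usual on top of this hacked handler and is kept alive with a long sleep instead of rpc.shutdown() (a sketch; the init_method address and environment variables are placeholders):

import os
import time
import torch.distributed.rpc as rpc

rank = int(os.environ["RANK"])            # assumed to be set by the launcher
world_size = int(os.environ["WORLD_SIZE"])

options = rpc.TensorPipeRpcBackendOptions(
    init_method="tcp://master-node:29500",  # placeholder address
)
rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size,
             rpc_backend_options=options)

# Never call rpc.shutdown(): the hacked handler removed the Gloo barrier it relies on.
# Keep the process alive instead.
time.sleep(1 << 31)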
Also, it would be beneficial to be able to assign a different number of threads to each RPC process. The master process, which serves as the center node of the one-to-many communication, needs more threads than the worker processes. The current behavior, which applies the same fixed maximum number of threads to every process, can easily result in an overwhelming total number of threads on a single node.
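What I have in mind is roughly the following (a rough sketch; the thread counts and environment variables are placeholders):

import os
import torch.distributed.rpc as rpc

rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])

# Give the master, the hub of the one-to-many traffic, a larger thread pool
# than the workers, so the total thread count per node stays manageable.
num_threads = 128 if rank == 0 else 8
options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=num_threads)

rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size,
             rpc_backend_options=options)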