Hi, I am doing deep reinforcement learning and trying to run 128 parallel environments on CPU using torch.distributed. My code works with roughly 40 processes, but when I scale up to 128 it fails: the errors say that no more ports are available to bind the processes. I don't think the problem is an already-used MASTER_ADDR or --rdzv-endpoint; I suspect that this configuration simply requires more ports than the number of processes (i.e. far more than 128).
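My rough back-of-the-envelope reasoning for that suspicion (the assumption that the gloo TCP transport keeps a full mesh of pairwise connections per process group is mine, I have not verified it in the gloo source):

# Back-of-the-envelope estimate of socket usage, assuming each process listens on
# one port and every pair of ranks in a process group keeps one TCP connection
# (two socket endpoints), repeated for every group created with dist.new_group().
def estimate_sockets(world_size: int, num_groups: int = 1) -> int:
    listening = world_size                         # one listening port per process
    per_group = world_size * (world_size - 1)      # two endpoints per pair of ranks
    return listening + num_groups * per_group

for n in (3, 40, 128):
    # default group + the agents / env_workers / per-node groups from my setup code below
    print(n, estimate_sockets(n, num_groups=4))

At 128 processes this estimate is already in the tens of thousands of sockets, which would explain running out of bindable ports while ~40 processes still works.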
I am trying to do distributed CPU training using the torch.distributed package. In my code I initialize the process group and create the sub-groups by calling ddp_setup():
import os

import torch
import torch.distributed as dist
from torch.distributed import init_process_group


def get_dist_info():
    GLOBAL_WORLD_SIZE = int(os.environ["WORLD_SIZE"])
    GLOBAL_RANK = int(os.environ["RANK"])
    LOCAL_WORLD_SIZE = int(os.environ["LOCAL_WORLD_SIZE"])
    LOCAL_RANK = int(os.environ["LOCAL_RANK"])
    return GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK


def ddp_setup(logger, use_cuda):
    """
    Sets up torch.distributed and creates the process groups used for training.

    On every node, one process (the one with local_rank == 0) is assigned to the agents_group;
    the remaining processes are assigned to the env_workers_group.
    agents_group processes have an instance of RNDAgent and perform optimization.
    env_workers_group processes have an instance of the environment and interact with it.

    Example (values provided by torchrun):
        nnodes: number of nodes = 3
        nproc_per_node: number of processes per node = 4

        NODE 0: LOCAL_RANK 0 -> GPU, agents_group | LOCAL_RANK != 0 -> CPU, env_workers_group
        NODE 1: LOCAL_RANK 0 -> GPU, agents_group | LOCAL_RANK != 0 -> CPU, env_workers_group
        NODE 2: LOCAL_RANK 0 -> GPU, agents_group | LOCAL_RANK != 0 -> CPU, env_workers_group

        global ranks: node0 = 0,1,2,3 | node1 = 4,5,6,7 | node2 = 8,9,10,11
        agents_group_global_ranks = [0, 4, 8], env_workers_group_global_ranks = [remaining ranks]
    """
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()

    if use_cuda:
        assert torch.cuda.is_available(), "use_cuda=True was passed but CUDA is not available!"

    if torch.cuda.is_available() and use_cuda:
        gpu_id = "cuda:" + str(LOCAL_RANK % torch.cuda.device_count())
        backend = "nccl"
    else:
        gpu_id = "cpu"
        backend = "gloo"

    if torch.cuda.is_available() and use_cuda and is_agents_group_member():
        # The GPU should only be used by agents_group processes and each process should have a
        # unique cuda device (otherwise see the error at: https://github.com/pytorch/torchrec/issues/328)
        torch.cuda.set_device(gpu_id)

    init_process_group(backend="gloo")

    (agents_group, env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks,
     env_workers_group_per_node_global_ranks) = create_process_groups_for_training(
        agents_group_backend=backend, env_workers_group_backend="gloo"
    )

    logger.log_msg_to_both_console_and_file(
        f'Initializing process with global_rank: {GLOBAL_RANK}, local_rank: {LOCAL_RANK}, '
        f'local_world_size: {LOCAL_WORLD_SIZE}, global_world_size: {GLOBAL_WORLD_SIZE}, [{gpu_id}], '
        f'group: {"agents_group" if GLOBAL_RANK in agents_group_global_ranks else "env_workers_group"}, '
        f'group_backend: {backend if GLOBAL_RANK in agents_group_global_ranks else "gloo"}, '
        f'default_backend: {backend}'
    )

    return (GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK, gpu_id, agents_group,
            env_workers_group, agents_group_global_ranks, env_workers_group_global_ranks,
            env_workers_group_per_node_global_ranks)


def is_agents_group_member():
    """
    Returns True if the calling process belongs to the agents_group, False otherwise.
    """
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()
    return GLOBAL_RANK % LOCAL_WORLD_SIZE == 0


def create_process_groups_for_training(agents_group_backend="nccl", env_workers_group_backend="gloo"):
    GLOBAL_WORLD_SIZE, GLOBAL_RANK, LOCAL_WORLD_SIZE, LOCAL_RANK = get_dist_info()

    # Calculate global ranks for the groups:
    agents_group_global_ranks = [rank for rank in range(GLOBAL_WORLD_SIZE) if rank % LOCAL_WORLD_SIZE == 0]
    env_workers_group_global_ranks = [rank for rank in range(GLOBAL_WORLD_SIZE) if rank % LOCAL_WORLD_SIZE != 0]
    env_workers_group_per_node_global_ranks = list(range(
        (GLOBAL_RANK // LOCAL_WORLD_SIZE) * LOCAL_WORLD_SIZE,
        ((GLOBAL_RANK // LOCAL_WORLD_SIZE) + 1) * LOCAL_WORLD_SIZE
    ))[1:]

    # Create groups from the calculated ranks:
    agents_group = dist.new_group(
        ranks=agents_group_global_ranks,
        backend=agents_group_backend
    )  # group for agent processes across nodes
    env_workers_group = dist.new_group(
        ranks=env_workers_group_global_ranks,
        backend=env_workers_group_backend
    )  # group for env_worker processes across nodes
    env_workers_group_per_node = dist.new_group(
        ranks=env_workers_group_per_node_global_ranks,
        backend=env_workers_group_backend
    )  # group for env_worker processes within a single node

    assert len(env_workers_group_per_node_global_ranks) == (LOCAL_WORLD_SIZE - 1)

    return (agents_group, env_workers_group, agents_group_global_ranks,
            env_workers_group_global_ranks, env_workers_group_per_node_global_ranks)
and I launch it with (let's say 3 processes to make debugging easier):
torchrun --nnodes 1 --nproc_per_node 3 main.py
When I run the code and check the ports in use via lsof and netstat, I see some discrepancies. First, netstat shows two python processes bound to ports (29400 and 50015), but lsof shows more than two python processes, bound to ports that differ from what netstat reported. I don't understand why the outputs of the two tools differ. Second, I don't understand why lsof lists so many entries. I assume the python processes correspond to the 3 processes that torchrun spawns, but why does each of them have its own port bindings?
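To cross-check the two tools I also dump the TCP sockets owned by the python processes from a small script; a minimal sketch of what I run (it assumes the third-party psutil package is installed, which is not part of my training code):

# Minimal sketch: cross-check what lsof / netstat report by listing TCP sockets
# owned by python processes. Assumes `pip install psutil`.
import psutil

python_pids = {
    p.pid for p in psutil.process_iter(["name"]) if "python" in (p.info["name"] or "").lower()
}

for conn in psutil.net_connections(kind="tcp"):
    if conn.pid not in python_pids:
        continue
    if conn.status == psutil.CONN_LISTEN:
        print(f"pid={conn.pid} LISTEN on port {conn.laddr.port}")
    elif conn.status == psutil.CONN_ESTABLISHED:
        print(f"pid={conn.pid} {conn.laddr.port} <-> {conn.raddr.port}")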
Finally, I am trying to run my code on a single node with 128 processes. Whenever I do, I get "port already in use" errors. I believe this happens because torch.distributed tries to bind more ports than are available. Is there a way to fix this? Also, with torch.multiprocessing we can communicate through Pipes, which do not bind any ports; I don't understand why something similar is not supported by torch.distributed. My conclusion is that torch.distributed is not a viable option for multiprocessing with more than ~128 processes on one machine (for example for running parallel environments in deep reinforcement learning). Am I right?
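For reference, this is the kind of Pipe-based layout I had in mind as a comparison (a minimal sketch, not my actual training code; the environment logic is a random-observation placeholder for my real envs):

# Minimal sketch of port-free communication between a trainer process and
# environment workers using torch.multiprocessing Pipes.
import torch
import torch.multiprocessing as mp


def env_worker(conn, obs_dim=4):
    """Runs in a child process; talks to the parent only through the Pipe."""
    while True:
        cmd, data = conn.recv()
        if cmd == "step":
            obs = torch.randn(obs_dim)          # placeholder for env.step(data)
            reward, done = torch.rand(1).item(), False
            conn.send((obs, reward, done))
        elif cmd == "close":
            conn.close()
            break


if __name__ == "__main__":
    num_workers = 8  # scale this up; no sockets or ports are involved
    ctx = mp.get_context("spawn")
    parent_conns, workers = [], []
    for _ in range(num_workers):
        parent_conn, child_conn = ctx.Pipe()
        p = ctx.Process(target=env_worker, args=(child_conn,))
        p.start()
        parent_conns.append(parent_conn)
        workers.append(p)

    # Send an action to every worker and collect the resulting transitions.
    for conn in parent_conns:
        conn.send(("step", torch.zeros(1)))     # placeholder action
    transitions = [conn.recv() for conn in parent_conns]
    print(f"collected {len(transitions)} transitions")

    for conn, p in zip(parent_conns, workers):
        conn.send(("close", None))
        p.join()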
Thank you very much. I really appreciate any help.