I set up DDP across two nodes (8 GPUs per node, so DDP world_size = 16). Then, for each DDP worker (GPU), I created a new process group (via dist.new_group()) so it can run all_to_all() with the DDP workers that have the same local rank on the other node. The main API calls are as follows:
# initialize the default process group
dist.init_process_group(
    init_method="tcp://" + str(self.master_addr) + ":" + str(self.master_port),
    backend=Backend.NCCL,
    rank=self.global_rank,
    world_size=self.world_size,
)
model = DDP(model)
def create_inter_node_all_to_all_process_group(self):
    self.inter_node_all_to_all_ranks = []
    n_process_per_node = self.world_size // self.num_nodes
    for node_index in range(self.num_nodes):
        rank = self.local_rank + node_index * n_process_per_node
        self.inter_node_all_to_all_ranks.append(rank)
    logging.info(
        "local_rank = {}, global_rank = {}, ranks = {}".format(
            self.local_rank, self.global_rank, self.inter_node_all_to_all_ranks
        )
    )
    self.inter_node_all_to_all_process_group = dist.new_group(
        ranks=self.inter_node_all_to_all_ranks,
        backend=Backend.NCCL,
        timeout=timedelta(days=365),
    )
    self.process_group = self.inter_node_all_to_all_process_group
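Note that, per the torch.distributed documentation, dist.new_group() is itself a collective over the default group: every process must call it for every sub-group, in the same creation order, even for sub-groups it will not belong to. The method above only creates the caller's own sub-group. A minimal sketch of the enumerate-all-groups pattern (pure Python; `world_size` and `num_nodes` as in the snippet above, the helper name is my own):

```python
def build_all_to_all_rank_lists(world_size: int, num_nodes: int):
    """Return one global-rank list per local rank, e.g. for
    world_size=16, num_nodes=2: [[0, 8], [1, 9], ..., [7, 15]].
    Every process must iterate over ALL of these lists, in this order."""
    n_process_per_node = world_size // num_nodes
    return [
        [local_rank + node_index * n_process_per_node
         for node_index in range(num_nodes)]
        for local_rank in range(n_process_per_node)
    ]
```

On each process you would then loop over all of these lists, call dist.new_group(ranks=ranks, ...) for every one, and keep only the group whose rank list contains your own global rank.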
expert_inputs = _AllToAll.apply(self.process_group, torch.cat(route_inputs))
class _AllToAll(torch.autograd.Function):
    @staticmethod
    def forward(ctx: Any, group: dist.ProcessGroup, input: Tensor) -> Tensor:  # type: ignore
        ctx.group = group
        input = input.contiguous()
        output = torch.empty_like(input)
        dist.all_to_all_single(output, input, group=group)
        return output

    @staticmethod
    def backward(ctx: Any, *grad_output: Tensor) -> Tuple[None, Tensor]:
        # The gradient of an all-to-all is another all-to-all on the same group.
        return (None, _AllToAll.apply(ctx.group, *grad_output))
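For reference, dist.all_to_all_single splits each rank's input into world_size equal chunks and sends chunk j to rank j. A pure-Python simulation of that data movement (my own sketch, not PyTorch code) can help sanity-check the expected shapes:

```python
def simulate_all_to_all(inputs):
    """inputs[i] is rank i's flat input, split into len(inputs) equal
    chunks; returns outputs[d] = concatenation of chunk d from every rank."""
    world_size = len(inputs)
    chunk = len(inputs[0]) // world_size
    return [
        [x for src in range(world_size)
         for x in inputs[src][dst * chunk:(dst + 1) * chunk]]
        for dst in range(world_size)
    ]

# Two ranks, two elements each: chunk 0 stays on rank 0, chunk 1 goes to rank 1.
simulate_all_to_all([[0, 1], [2, 3]])  # → [[0, 2], [1, 3]]
```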
When I ran the model's forward pass, the program crashed. The log is as follows:
29642 2021-08-20,04:26:30.151 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 1, global_rank = 9, ranks = [1, 9]
29642 2021-08-20,04:26:30.151 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 9
29641 2021-08-20,04:26:30.158 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 8: Completed store-based barrier for 16 nodes.
29641 2021-08-20,04:26:30.158 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 0, global_rank = 8, ranks = [0, 8]
29648 2021-08-20,04:26:30.158 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 15: Completed store-based barrier for 16 nodes.
29644 2021-08-20,04:26:30.158 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 11: Completed store-based barrier for 16 nodes.
29648 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 7, global_rank = 15, ranks = [7, 15]
29641 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 8
29644 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 3, global_rank = 11, ranks = [3, 11]
29647 2021-08-20,04:26:30.159 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 14: Completed store-based barrier for 16 nodes.
29647 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 6, global_rank = 14, ranks = [6, 14]
29648 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 15
29644 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 11
29643 2021-08-20,04:26:30.159 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 10: Completed store-based barrier for 16 nodes.
29643 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 2, global_rank = 10, ranks = [2, 10]
29645 2021-08-20,04:26:30.159 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 12: Completed store-based barrier for 16 nodes.
29645 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 4, global_rank = 12, ranks = [4, 12]
29647 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 14
29643 2021-08-20,04:26:30.160 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 10
29645 2021-08-20,04:26:30.160 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 12
29646 2021-08-20,04:26:30.160 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 13: Completed store-based barrier for 16 nodes.
29646 2021-08-20,04:26:30.160 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 5, global_rank = 13, ranks = [5, 13]
29646 2021-08-20,04:26:30.160 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 13
29642 2021-08-20,04:26:30.162 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 9: Completed store-based barrier for 16 nodes.
29642 2021-08-20,04:26:30.162 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 1, global_rank = 9, ranks = [1, 9]
29642 2021-08-20,04:26:30.163 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:4 to store for rank: 9
29641 2021-08-20,04:26:30.169 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 8: Completed store-based barrier for 16 nodes.
29644 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 11: Completed store-based barrier for 16 nodes.
29648 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 15: Completed store-based barrier for 16 nodes.
29647 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 14: Completed store-based barrier for 16 nodes.
29641 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 0, global_rank = 8, ranks = [0, 8]
29643 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 10: Completed store-based barrier for 16 nodes.
29645 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 12: Completed store-based barrier for 16 nodes.
29644 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 3, global_rank = 11, ranks = [3, 11]
29648 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 7, global_rank = 15, ranks = [7, 15]
29647 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 6, global_rank = 14, ranks = [6, 14]
ip-10-0-95-41:29643:29762 [2] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29647:29765 [6] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29644:29763 [3] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29648:29764 [7] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29643:29762 [2] include/socket.h:403 NCCL WARN Connect to 10.0.87.143<45579> failed : Connection refused
ip-10-0-95-41:29643:29762 [2] NCCL INFO bootstrap.cc:95 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO bootstrap.cc:309 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO init.cc:555 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO init.cc:840 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO group.cc:73 -> 2 [Async thread]
...
dist.all_to_all_single(output, input, group=group)
File "/usr/local/lib64/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 2386, in all_to_all_single
work = group.alltoall_base(output, input, output_split_sizes, input_split_sizes, opts)
RuntimeError: NCCL error in: ../torch/lib/c10d/ProcessGroupNCCL.cpp:911, unhandled system error, NCCL version 2.7.8
ncclSystemError: System call (socket, malloc, munmap, etc) failed.
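The "Connection refused" during NCCL bootstrap often points at NCCL picking the wrong network interface on one of the nodes. NCCL's documented environment variables can help narrow this down (the interface name eth0 below is a placeholder for your setup):

```shell
# Print NCCL's bootstrap/transport decisions on every rank.
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=INIT,NET
# Pin NCCL (and Gloo) to the interface that routes between the two nodes;
# "eth0" is a placeholder -- check `ip addr` on both machines.
export NCCL_SOCKET_IFNAME=eth0
export GLOO_SOCKET_IFNAME=eth0
```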