Crash when doing DDP (default process group) + all_to_all() inter-node (new group)

I created DDP across two nodes (8 GPUs per node, DDP world_size = 16), and then for each DDP worker (GPU) I created a new process group (using dist.new_group()) to do all_to_all() with the DDP workers that have the same local rank on the other node. For example, the worker with local rank 3 on each node ends up in the group with global ranks [3, 11]. The main API calls are as follows:

    # initialize the process group
    dist.init_process_group(
        init_method="tcp://" + str(self.master_addr) + ":" + str(self.master_port),
        backend=Backend.NCCL,
        rank=self.global_rank,
        world_size=self.world_size,
    )

    model = DDP(model)

    def create_inter_node_all_to_all_process_group(self):
        # Collect the global ranks that share this worker's local rank across nodes,
        # e.g. local rank 3 on a 2-node x 8-GPU job -> ranks [3, 11].
        self.inter_node_all_to_all_ranks = []
        n_process_per_node = self.world_size // self.num_nodes
        for node_index in range(self.num_nodes):
            rank = self.local_rank + node_index * n_process_per_node
            self.inter_node_all_to_all_ranks.append(rank)

        logging.info(
            "local_rank = {}, global_rank = {}, ranks = {}".format(
                self.local_rank, self.global_rank, self.inter_node_all_to_all_ranks
            )
        )
        self.inter_node_all_to_all_process_group = dist.new_group(
            ranks=self.inter_node_all_to_all_ranks, backend=Backend.NCCL, timeout=timedelta(days=365)
        )
      
    self.process_group = self.inter_node_all_to_all_process_group

    expert_inputs = _AllToAll.apply(self.process_group, torch.cat(route_inputs))
        


from typing import Any, Tuple

import torch
import torch.distributed as dist
from torch import Tensor


class _AllToAll(torch.autograd.Function):
    @staticmethod
    def forward(ctx: Any, group: dist.ProcessGroup, input: Tensor) -> Tensor:  # type: ignore
        ctx.group = group
        input = input.contiguous()
        output = torch.empty_like(input)
        # Exchange equal-sized chunks of `input` with every other rank in `group`.
        dist.all_to_all_single(output, input, group=group)
        return output

    @staticmethod
    def backward(ctx: Any, *grad_output: Tensor) -> Tuple[None, Tensor]:
        # The gradient of an all-to-all is the reverse all-to-all over the same group.
        return (None, _AllToAll.apply(ctx.group, *grad_output))
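
For reference, here is a minimal single-node sketch (2 GPUs over NCCL) that exercises the _AllToAll function above on its own; the rendezvous settings and tensor shapes are just illustrative, not the real job's values:

    import os
    import torch
    import torch.distributed as dist
    import torch.multiprocessing as mp

    # assumes the _AllToAll class above is defined in the same file

    def _worker(rank: int, world_size: int) -> None:
        # illustrative rendezvous settings; the real job uses master_addr/master_port
        os.environ["MASTER_ADDR"] = "127.0.0.1"
        os.environ["MASTER_PORT"] = "29500"
        dist.init_process_group("nccl", rank=rank, world_size=world_size)
        torch.cuda.set_device(rank)

        # one row per destination rank; all_to_all_single swaps rows between ranks
        x = torch.full((world_size, 4), float(rank), device="cuda", requires_grad=True)
        y = _AllToAll.apply(dist.group.WORLD, x)
        y.sum().backward()  # the backward pass runs the reverse all-to-all
        print(rank, y[:, 0].tolist(), x.grad.shape)
        dist.destroy_process_group()

    if __name__ == "__main__":
        mp.spawn(_worker, args=(2,), nprocs=2)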

When I ran the forward pass of the model, the program crashed. The log is as follows:

29642 2021-08-20,04:26:30.151 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 1, global_rank = 9, ranks = [1, 9]
29642 2021-08-20,04:26:30.151 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 9
29641 2021-08-20,04:26:30.158 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 8: Completed store-based barrier for 16 nodes.
29641 2021-08-20,04:26:30.158 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 0, global_rank = 8, ranks = [0, 8]
29648 2021-08-20,04:26:30.158 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 15: Completed store-based barrier for 16 nodes.
29644 2021-08-20,04:26:30.158 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 11: Completed store-based barrier for 16 nodes.
29648 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 7, global_rank = 15, ranks = [7, 15]
29641 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 8
29644 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 3, global_rank = 11, ranks = [3, 11]
29647 2021-08-20,04:26:30.159 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 14: Completed store-based barrier for 16 nodes.
29647 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 6, global_rank = 14, ranks = [6, 14]
29648 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 15
29644 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 11
29643 2021-08-20,04:26:30.159 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 10: Completed store-based barrier for 16 nodes.
29643 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 2, global_rank = 10, ranks = [2, 10]
29645 2021-08-20,04:26:30.159 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 12: Completed store-based barrier for 16 nodes.
29645 2021-08-20,04:26:30.159 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 4, global_rank = 12, ranks = [4, 12]
29647 2021-08-20,04:26:30.159 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 14
29643 2021-08-20,04:26:30.160 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 10
29645 2021-08-20,04:26:30.160 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 12
29646 2021-08-20,04:26:30.160 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 13: Completed store-based barrier for 16 nodes.
29646 2021-08-20,04:26:30.160 - {dp_manager.py (114)} - create_inter_node_all_to_all_process_group(): local_rank = 5, global_rank = 13, ranks = [5, 13]
29646 2021-08-20,04:26:30.160 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 13
29642 2021-08-20,04:26:30.162 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 9: Completed store-based barrier for 16 nodes.
29642 2021-08-20,04:26:30.162 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 1, global_rank = 9, ranks = [1, 9]
29642 2021-08-20,04:26:30.163 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:4 to store for rank: 9
29641 2021-08-20,04:26:30.169 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 8: Completed store-based barrier for 16 nodes.
29644 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 11: Completed store-based barrier for 16 nodes.
29648 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 15: Completed store-based barrier for 16 nodes.
29647 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 14: Completed store-based barrier for 16 nodes.
29641 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 0, global_rank = 8, ranks = [0, 8]
29643 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 10: Completed store-based barrier for 16 nodes.
29645 2021-08-20,04:26:30.170 - {distributed_c10d.py (225)} - _store_based_barrier(): Rank 12: Completed store-based barrier for 16 nodes.
29644 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 3, global_rank = 11, ranks = [3, 11]
29648 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 7, global_rank = 15, ranks = [7, 15]
29647 2021-08-20,04:26:30.170 - {dp_manager.py (136)} - create_inter_node_all_reduce_process_group(): local_rank = 6, global_rank = 14, ranks = [6, 14]

ip-10-0-95-41:29643:29762 [2] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29647:29765 [6] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29644:29763 [3] NCCL INFO Call to connect returned Connection refused, retrying
ip-10-0-95-41:29648:29764 [7] NCCL INFO Call to connect returned Connection refused, retrying

ip-10-0-95-41:29643:29762 [2] include/socket.h:403 NCCL WARN Connect to 10.0.87.143<45579> failed : Connection refused
ip-10-0-95-41:29643:29762 [2] NCCL INFO bootstrap.cc:95 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO bootstrap.cc:309 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO init.cc:555 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO init.cc:840 -> 2
ip-10-0-95-41:29643:29762 [2] NCCL INFO group.cc:73 -> 2 [Async thread]


...


    dist.all_to_all_single(output, input, group=group)
  File "/usr/local/lib64/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 2386, in all_to_all_single
    work = group.alltoall_base(output, input, output_split_sizes, input_split_sizes, opts)
RuntimeError: NCCL error in: ../torch/lib/c10d/ProcessGroupNCCL.cpp:911, unhandled system error, NCCL version 2.7.8
ncclSystemError: System call (socket, malloc, munmap, etc) failed.

Hey Chaoyang, does the Gloo backend work? It's a known issue that the NCCL backend might hang or crash when there are concurrent communications from different NCCL communicators (PyTorch creates one NCCL communicator per ProcessGroup instance) on the same device. But I am not sure that's relevant here. I assume the all_to_all is called in the model forward?

In this case, the inter-node all_to_all_single() calls are NOT concurrent on the SAME device; they are concurrent across multiple devices. There are 8 process groups doing all_to_all_single() in parallel, using the following ranks (see the sketch right after this list):

create_inter_node_all_to_all_process_group(): local_rank = 0, global_rank = 8, ranks = [0, 8]
create_inter_node_all_to_all_process_group(): local_rank = 1, global_rank = 9, ranks = [1, 9]
...
create_inter_node_all_to_all_process_group(): local_rank = 7, global_rank = 15, ranks = [7, 15]
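
For reference, dist.new_group() is documented to require that all processes in the default group call it for each new group, even if they will not be members, and that the groups be created in the same order on all processes. A sketch of that pattern for this 2-node x 8-GPU layout (the helper below is hypothetical for illustration, not the dp_manager.py code):

    import torch.distributed as dist

    def create_all_inter_node_groups(local_rank, world_size, num_nodes):
        # Hypothetical illustration: every rank creates all n_process_per_node
        # inter-node groups (identical rank lists, identical order on every rank)
        # and keeps only the group that contains its own local rank.
        n_process_per_node = world_size // num_nodes
        my_group = None
        for lr in range(n_process_per_node):
            ranks = [lr + node * n_process_per_node for node in range(num_nodes)]
            group = dist.new_group(ranks=ranks, backend="nccl")
            if lr == local_rank:
                my_group = group  # e.g. local rank 3 with 2 nodes keeps [3, 11]
        return my_group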

I also tried the Gloo backend; it crashes as well, this time while creating the new group with backend=Backend.GLOO:

10.0.87.143: 10.0.92.168: terminate called after throwing an instance of ‘gloo::EnforceNotMet’
10.0.87.143: 10.0.92.168: what(): [enforce fail at /tmp/pytorch/third_party/gloo/gloo/transport/tcp/address.cc:88] rv != -1. -1 vs -1. getpeername: Transport endpoint is not connected
10.0.87.143: 10.0.87.143: 151 2021-08-25,08:02:49.070 - {distributed_c10d.py (194)} - _store_based_barrier(): Added key: store_based_barrier_key:3 to store for rank: 7

10.0.87.143: 10.0.92.168: main()
10.0.87.143: 10.0.92.168: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.92.168: main()
10.0.87.143: 10.0.92.168: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.92.168: dp_ep.init_ddp()
10.0.87.143: 10.0.92.168: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 68, in init_ddp
10.0.87.143: 10.0.92.168: self.create_inter_node_all_to_all_process_group()
10.0.87.143: 10.0.92.168: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 150, in create_inter_node_all_to_all_process_group
10.0.87.143: 10.0.92.168: backend=Backend.GLOO
10.0.87.143: 10.0.92.168: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 2700, in new_group
10.0.87.143: 10.0.92.168: timeout=timeout)
10.0.87.143: 10.0.92.168: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 620, in _new_process_group_helper
10.0.87.143: 10.0.92.168: timeout=timeout)
10.0.87.143: 10.0.95.41: main()
10.0.87.143: 10.0.95.41: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.95.41: main()
10.0.87.143: 10.0.95.41: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.95.41: dp_ep.init_ddp()
10.0.87.143: 10.0.95.41: main()
10.0.87.143: 10.0.95.41: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 68, in init_ddp
10.0.87.143: 10.0.92.168: RuntimeError: [/tmp/pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [10.0.87.143]:57754: Connection refused
10.0.87.143: 10.0.95.41: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.95.41: main()
10.0.87.143: 10.0.95.41: main()
10.0.87.143: 10.0.95.41: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.95.41: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.95.41: self.create_inter_node_all_to_all_process_group()
10.0.87.143: 10.0.95.41: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 150, in create_inter_node_all_to_all_process_group
10.0.87.143: 10.0.95.41: main()
10.0.87.143: 10.0.95.41: File “main_moe.py”, line 108, in main
10.0.87.143: 10.0.95.41: backend=Backend.GLOO
10.0.87.143: 10.0.95.41: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 2700, in new_group
10.0.87.143: 10.0.92.168: dp_ep.init_ddp()
10.0.87.143: 10.0.92.168: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 68, in init_ddp
10.0.87.143: 10.0.92.168: self.create_inter_node_all_to_all_process_group()
10.0.87.143: 10.0.92.168: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 150, in create_inter_node_all_to_all_process_group
10.0.87.143: 10.0.92.168: backend=Backend.GLOO
10.0.87.143: 10.0.92.168: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 2700, in new_group
10.0.87.143: 10.0.92.168: timeout=timeout)
10.0.87.143: 10.0.92.168: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 620, in _new_process_group_helper
10.0.87.143: 10.0.95.41: timeout=timeout)
10.0.87.143: 10.0.95.41: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 620, in _new_process_group_helper
10.0.87.143: 10.0.95.41: timeout=timeout)
10.0.87.143: 10.0.95.41: RuntimeError: [/tmp/pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:589] Read error [10.0.87.143]:31117: Connection reset by peer
10.0.87.143: 10.0.92.168: timeout=timeout)
10.0.87.143: 10.0.92.168: RuntimeError: [/tmp/pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:589] Read error [10.0.87.143]:59302: Connection reset by peer
10.0.87.143: 10.0.95.41: dp_ep.init_ddp()
10.0.87.143: 10.0.95.41: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 68, in init_ddp
10.0.87.143: 10.0.95.41: self.create_inter_node_all_to_all_process_group()
10.0.87.143: 10.0.95.41: File “/home/deepspeed/.local/lib/python3.6/site-packages/moe_layer/dp/dp_manager.py”, line 150, in create_inter_node_all_to_all_process_group
10.0.87.143: 10.0.95.41: backend=Backend.GLOO
10.0.87.143: 10.0.95.41: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 2700, in new_group
10.0.87.143: 10.0.95.41: timeout=timeout)
10.0.87.143: 10.0.95.41: File “/usr/local/lib/python3.6/dist-packages/torch/distributed/distributed_c10d.py”, line 620, in _new_process_group_helper
10.0.87.143: 10.0.95.41: timeout=timeout)
10.0.87.143: 10.0.95.41: RuntimeError: [/tmp/pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [10.0.87.143]:31117: Connection refused