Network requirements for DDP to work properly?

Hey, Folks,

I have a distributed training implementation that works on cluster A. I recently switched to another cluster, B, and the training program no longer works. The symptom is that DDP gets stuck indefinitely before training even starts. My gut feeling is that it hangs somewhere during rendezvous setup.

Inside cluster B, I double-checked the machines; they can ping/ssh to each other without any trouble.

Also, I am using Ray for my application. But AFAIK, Ray Train is simply a wrapper around DDP, so the heavy lifting is done by DDP. On both clusters A and B, the Ray cluster itself (the distributed components not related to PyTorch training) works without any issue.

Any suggestions? Is there anything I can do to diagnose this, or to check whether my network setup meets DDP's requirements?


A script that reproduces the issue would be helpful. But at a high level, you want to check the following things (a short sketch of both checks follows this list):

  • Can you call dist.init_process_group and have it return successfully? This indicates that rendezvous between the nodes can be performed and they can all access each other through some mechanism.
  • Does a simple dist.barrier work after process group init? This indicates that NCCL communicators can be successfully initialized and NCCL is configured in a functional manner.
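
Something like this minimal sketch exercises both checks. It assumes the same MASTER_ADDR / MASTER_PORT / RANK / WORLD_SIZE environment variables you are already exporting, and the 60-second timeout is only there so that a hang turns into an error quickly instead of blocking on the default timeout:

import datetime
import os

import torch.distributed as dist


def check_ddp_networking(backend: str = "gloo") -> None:
    # Step 1: rendezvous. If this hangs, the workers cannot reach
    # MASTER_ADDR:MASTER_PORT (firewall, wrong interface, etc.).
    dist.init_process_group(
        backend=backend,
        timeout=datetime.timedelta(seconds=60),
    )
    print(f"[{os.getpid()}] rank {dist.get_rank()}/{dist.get_world_size()}: rendezvous OK")

    # Step 2: a collective. With backend="nccl" this additionally verifies
    # that NCCL communicators can be initialized.
    dist.barrier()
    print(f"[{os.getpid()}] rank {dist.get_rank()}: barrier OK")

    dist.destroy_process_group()


if __name__ == "__main__":
    check_ddp_networking()

If step 1 hangs, the problem is rendezvous/TCP connectivity to the master; if step 1 succeeds but step 2 hangs, the problem is in the collective backend itself.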

Hi Rohan, thanks for the prompt reply. In our case, dist.init_process_group() gets stuck. It's weird because transferring files between the machines with scp works fine. The script is below.

"""
Note, running this script requires setting up
env variables from the command line at all workers. 
Example:
export MASTER_ADDR='10.115.115.280'
export MASTER_PORT=1225
export WORLD_SIZE=2
export RANK=0 # RANK=1 at second worker 
"""

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.nn.parallel import DistributedDataParallel as DDP

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic():
    print(
        f"[{os.getpid()}] rank = {dist.get_rank()}, "
        + f"world_size = {dist.get_world_size()}\n", end=''
    )

    model = ToyModel()
    ddp_model = DDP(model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5)
    loss = loss_fn(outputs, labels)
    print("LOSS: ", loss.item())
    loss.backward()
    optimizer.step()


def spmd_main():
    # These are the parameters used to initialize the process group
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")  
    # Uses the default env:// init method, which reads MASTER_ADDR,
    # MASTER_PORT, RANK, and WORLD_SIZE from the environment.
    dist.init_process_group(backend="gloo")

    print(
        f"[{os.getpid()}]: world_size = {dist.get_world_size()}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()} \n", end=''
    )

    demo_basic()

    # Tear down the process group
    dist.destroy_process_group()


if __name__ == "__main__":
    spmd_main()
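
One more thing that can narrow this down independently of PyTorch: with the default env:// rendezvous, rank 0 opens a TCP listener on MASTER_PORT for the rendezvous store, so while rank 0 is sitting inside dist.init_process_group() you can check from the other worker whether that port is actually reachable. A minimal sketch, assuming the same MASTER_ADDR / MASTER_PORT environment variables as above:

import os
import socket

# Reachability check for the rendezvous endpoint. Run this on the
# non-master worker while rank 0 is blocked inside init_process_group().
addr = os.environ["MASTER_ADDR"]
port = int(os.environ["MASTER_PORT"])

try:
    with socket.create_connection((addr, port), timeout=5):
        print(f"TCP connection to {addr}:{port} succeeded")
except OSError as exc:
    print(f"TCP connection to {addr}:{port} failed: {exc}")

If this fails even though ping and ssh work, a firewall or security-group rule on cluster B is likely blocking that port, which would explain the hang during rendezvous.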