When will dist.all_reduce will be called?

Hi,

I checked the example on github: examples/distributed/ddp at master · pytorch/examples · GitHub
I also pasted the example as follows for discussion.

My question is:
should I manually call some API functions to make sure the distributed functionality runs correctly?
such as:
dist.broadcast(indices, 0)
dist.all_reduce(rt, op=dist.ReduceOp.SUM)

I saw some code call those upper APIs manually, but some code didn’t manually call those upper APIs.

Is there any reason why those API should be called and why they are not called?

If broadcast and all_reduce are not called manually, the gradient will be collective for all GPU automatically?

should I initialize all model with the same value in different GPU manually?


def spmd_main(local_world_size, local_rank):
    # These are the parameters used to initialize the process group
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    print(
        f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
    )

    demo_basic(local_world_size, local_rank)

    # Tear down the process group
    dist.destroy_process_group()

def demo_basic(local_world_size, local_rank):

    # setup devices for this process. For local_world_size = 2, num_gpus = 8,
    # rank 0 uses GPUs [0, 1, 2, 3] and
    # rank 1 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // local_world_size
    device_ids = list(range(local_rank * n, (local_rank + 1) * n))

    print(
        f"[{os.getpid()}] rank = {dist.get_rank()}, "
        + f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
    )

    model = ToyModel().cuda(device_ids[0])
    ddp_model = DDP(model, device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()

Hi, DDP will broadcast model’s weights to all GPUs when you construct DDP. DDP will reduce gradient when you call backward(). DDP takes care of broadcast and all_reduce so that you can treat them as if they are on a single GPU (This is only true when you do standard network training that updates model after one forward-backward pair).

@Hongkai_Zheng ,
Many thanks for your reply!

My question are:
what is standard network training? such as, will modified yolov3 be updated by DDP automatically? why couldn’t the modified network be updated automatically by DDP? how could the modified yolov3 be updated manually?

is there any official example code about how to update models automatically or manually?

By standard training I mean forward → backward → optimizer.step() as shown in the following pseudo code. see concrete example here DDP tutorial.

I’m not sure how modified yolov3 works. But DDP doesn’t update your model automatically. Your optimizer takes care of update step. What DDP does is just to reduce gradient (synchronize over all devices) so that each replica of model see the same gradient. More specifically, backward() call triggers gradient reduction. See more details here DDP paper

# forward pass
out = model(x)
loss = loss_fn(out, y)
# backward pass
loss.backward()
# Update model
optimizer.step()

Just to help explain the meaning of standard training. For instance, if you do one forward() and multiple backward, then it’s not considered as standard training. It has to be forward pass followed by one single backward().

Hi @Hongkai_Zheng
Thank you!

reducing gradient will be done by DDP automatically. if I print gradient on each GPU, the printed gradient should be identical. I saw some code call all_reduce function to sync loss. My question is:
if all_reduce is called manually and DDP also sync gradient, what will happen?

according to DDP tutorial, weights in each GPU should be identical, It means that DDP will sync initial weights(how to sync initial weights?) in each GPU to make sure all model in all GPU are identical, right?

if gradient is broadcast to each GPU, it means that all GPU run optimizer separately. For example, if we trained a model on 1 GPU with batch_size=32, lr = 0.001, should batch_size=32 and lr=0.001 when we train the model on 8 GPU?

Hi,

  1. It will slow down your code. You just sync the gradient twice but everything should remain the same.
  2. initial weights are synchronized when you construct DDP class. (DDP does it you don’t have to do it manually)
  3. Right, each process has its own optimizer. But since the optimizer sees the same gradient, the update of the weights is also the same.
  4. I’m not sure what your question is in this example. But the second case (8 GPUs) in your example is equivalent to 1 GPU with batch_size= 32 * 8=256. So you may want to increase the learning rate since the batchsize is increased.

@Hongkai_Zheng ,

Many thanks for your kind reply!

If I broadcast loss or somehting to all GPUs manually, the loss should be identical one the dist.all_reduce() is done. besides the function dist.all_reduce() should be called, what else should I do if I want to sync loss manually?

for this example, I am confused:
now that the average gradient is broadcast to each GPU and each GPU has its own optimizer and learning rate, why should we 8x learning rate?

Please check the code:
Line 259 of this link yolov3/train.py at master · ultralytics/yolov3 · GitHub
The upper code broadcast dataset to all GPUs. My question is: broadcasting dataset to all GPU manyally necessary? I assume this is done by dataloader automatically.

  1. I think calling all_reduce is enough to sync loss manually. For this question, I encourage you to look at document and try cooking up a toy example yourself and see how it works in practice to help understand.
  2. Gradient is averaged over 32 * 8 samples in 8-GPUs case. In single GPU case, your gradient is computed from 32 samples. I don’t think there is any theory saying you have to use 8x learning rate. But intuitively, the averaged gradient over a bigger batch has a smaller variance (think gradient from a sample as a random variable). This paper also gives some theoretic intuition and practical advice.
  3. Regarding the code you referred to, it uses DistributedSampler (check createdataset function in the utils.dataset.py ) to divide dataset into several subsets so that each GPU only sees one subset of the whole dataset.
    PS: I think it would be better to start from DDP tutorial and try to write the toy example yourself so that you can understand every component of the DDP through coding yourself.

@Hongkai_Zheng ,
Thank you!

I will cooking up a toy example to validate my idea and understanding.

Do you think the code is necessary to broadcast the dataset to each GPU?
or in what case master process should broadcast dataset to each GPU?

As far as I’m concerned, if you need to broadcast the whole dataset to each GPU, why do you even use DDP at the first place? DDP is used for Data Parallel where each GPU get different subset of data. If you need model parallel, you can look at Pytorch rpc.

@Hongkai_Zheng ,
I got it. Thanks.