How to validate correctly with DistributedDataParallel?

I am trying to train and validate a model using DistributedDataParallel. Everything is fine during training, but when the model starts validating, the code runs for several iterations and then crashes with thread-related errors. I only run validation on rank 0. Do I need to put dist.barrier() somewhere? Or do I need to validate on all ranks?

When rank 0 validates the model, what do the other ranks do? Do they exit, or proceed to the next training phase? And what error did you see?

I am assuming the other ranks proceed to the next training phase and then time out during the DDP backward pass. If that is the case, yep, you can try using a dist.barrier() to sync, like:

for _ in range(...):
    train()
    if rank == 0:
        validate()
    dist.barrier()

If you still hit a timeout at the barrier, you can try setting the timeout argument of init_process_group to a larger value.
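For example, a minimal sketch of raising the timeout (the nccl backend and env:// setup here are assumptions; the default timeout is 30 minutes):

import datetime
import torch.distributed as dist

# assumes the usual env:// setup (MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE set by the launcher)
dist.init_process_group(
    backend="nccl",
    init_method="env://",
    timeout=datetime.timedelta(minutes=60),  # leave rank 0 enough time to finish validation
)
# note: with the NCCL backend the timeout is only enforced when blocking wait /
# async error handling is enabled (e.g. NCCL_BLOCKING_WAIT=1)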

It seems I solved the problem. I was using DistributedSampler in the validation dataloader; after I changed the sampler to None, the code works. The other ranks do nothing. I don’t know how to share validation loss values across ranks, so I only do validation on rank 0. Is that good practice? I am also using wandb to monitor training metrics, but after several epochs I got a timeout error at the barrier. Is it correct to increase the timeout limit in this situation?

Take a look here on how to share information between your ranks.

Since you’re using wandb, I’m assuming that you’re also only logging from rank 0? I wouldn’t say it’s bad practice to skip sharing the validation loss if the other ranks aren’t doing anything with that information. Sharing it does matter when you’re doing early stopping or learning-rate scheduling based on the validation loss.

Personally, I had all my ranks participate in computing the validation loss. In that scenario you don’t have to deal with barriers; you just share the tensors containing your losses. But I don’t think it’s necessarily wrong to increase the timeout limit.
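A minimal sketch of that all-ranks approach (the helper name validate_all_ranks and the criterion argument are illustrative, and dist is assumed to be initialized already):

import torch
import torch.distributed as dist

@torch.no_grad()
def validate_all_ranks(model, val_loader, criterion, device):
    model.eval()
    totals = torch.zeros(2, device=device)  # [sum of batch losses, number of batches] on this rank
    for x, y in val_loader:  # each rank iterates over its own shard of the validation set
        x, y = x.to(device), y.to(device)
        totals[0] += criterion(model(x), y).item()
        totals[1] += 1
    # sum the partial results across ranks; afterwards every rank holds the same totals
    dist.all_reduce(totals, op=dist.ReduceOp.SUM)
    return (totals[0] / totals[1]).item()  # mean validation loss over all ranks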

Thank you, @ayalaa2! I understand that if I used all the ranks for validation it would be faster. I will try sharing tensor values across ranks.

Yes, I am using wandb only with rank 0.

@ayalaa2 could you give some example code for sharing data between ranks, please?

One option is to use the collective communication APIs, e.g., all_gather (NCCL and Gloo) or gather (Gloo). Something like:

# on all ranks (example assumes a world size of 2)
import torch
import torch.distributed

# one output slot per rank; with the NCCL backend these tensors must live on the GPU
out_tensors = [torch.zeros(2, 2), torch.zeros(2, 2)]
inp_tensor = torch.ones(2, 2)  # all_gather expects a tensor, not a list, as the input

# afterwards every rank holds the input tensors from all ranks in out_tensors
torch.distributed.all_gather(out_tensors, inp_tensor)
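Applied to the earlier question of sharing a validation loss, one could gather one scalar per rank, for example (my_val_loss and device below are illustrative placeholders):

import torch
import torch.distributed as dist

world_size = dist.get_world_size()
local_loss = torch.tensor([my_val_loss], device=device)  # per-rank scalar; must be on the GPU for NCCL
gathered = [torch.zeros_like(local_loss) for _ in range(world_size)]
dist.all_gather(gathered, local_loss)
mean_loss = torch.cat(gathered).mean()  # identical value on every rank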

Thank you very much, @mrshenli!

Is it possible to share not only tensors but also, for example, a list of strings?

Yep, but that feature is only available on master (it will be released in v1.7); see the code in the PR linked below.

If you need it now, you can copy the code change to distributed_c10d.py from that PR. It is basically a wrapper that converts the objects to tensors before gathering.
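For reference, on PyTorch 1.7+ this is exposed as torch.distributed.all_gather_object; a minimal sketch:

import torch.distributed as dist

# works for any picklable objects; with the NCCL backend, call torch.cuda.set_device(local_rank) first
output = [None for _ in range(dist.get_world_size())]
dist.all_gather_object(output, ["some", "strings", f"from rank {dist.get_rank()}"])
# every rank now holds one entry (here, a list of strings) per rank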

Hi, I used code similar to this:

for i in range(self._epoch + 1, self._niter + 1):
    if self._train_sampler is not None:
        self._train_sampler.set_epoch(i)
    self._train()
    if self.rank == 0:
        print('rank {} go to validation'.format(self.rank))
        self._validate()
    if self.distributed:
        print('rank {} go to barrier'.format(self.rank))
        dist.barrier()
        print('rank {} go out of barrier'.format(self.rank))

The output of the code looks like this:
rank 1 go to barrier
Training…
rank 0 go to validation
start to validate
evaluating…
rank 0 go to barrier
rank 0 go out of barrier

Then it just stopped and rank 1 never got out of the barrier. There is no other error; it just froze. Do you have any idea what happened? When I remove the validation, the code works. My test dataloader doesn’t use DistributedSampler.

Evaluating with DistributedDataParallel should be done with care, otherwise the values could be inaccurate: DistributedSampler pads the dataset with replicated samples when it does not divide evenly across processes. How DistributedSampler works is explained here.

This padding exists because DDP synchronizes at the backward pass, so the number of minibatches must be the same across all processes during training. At evaluation time, however, that is not necessary.
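For context, a validation DataLoader built with the stock DistributedSampler would look roughly like this (val_dataset, batch size, and worker count are illustrative); the padding caveat above applies to it:

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# DistributedSampler splits the dataset across ranks and pads with replicated
# samples so that every rank sees the same number of batches
val_sampler = DistributedSampler(val_dataset, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=64, sampler=val_sampler, num_workers=4, pin_memory=True)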

You can use a custom sampler like DistributedEvalSampler to avoid data padding. Regarding the communication between the DDP processes, you can refer to this example.

@mrshenli The dist.barrier() doesn’t work while running validation only on rank 0, as mentioned in this thread.

Can you elaborate on how you validate using all ranks?

Do you mean how you would go about performing a validation pass using all ranks?

Your validation loop will operate very similarly to your training loop, with each rank working on a subset of the validation dataset. The only difference is that you will want to keep track of the validation metrics you care about and then share the results using the methods in the link I gave.

Sometimes it’s quicker to just let the main rank perform validation, depending on the specific task.

@ayalaa2 thanks for the prompt reply!

Does it mean I need a distributed sampler on the validation set as well?

So, if I want to measure accuracy on my validation set, each rank should count the number of “correct” predictions and the number of samples it saw, and then “reduce” them?

Would that look something like this:

def validate(val_loader, model):
    device = torch.device(f'cuda:{args.rank}')
    # one counter for "correct", the other for "total"
    counter = torch.zeros((2,), device=device)
    model.eval()
    with torch.no_grad():
        for x, y in val_loader:  # assuming val_loader uses a distributed sampler
            x, y = x.to(device), y.to(device)
            pred = model(x)
            num_correct = my_accuracy_function(pred, y)  # count number of correct predictions
            counter[0] += num_correct
            counter[1] += x.shape[0]  # total number of samples
    # done validating for this rank
    torch.distributed.reduce(counter, dst=0)  # sum the counters onto the rank 0 process
    if args.rank == 0:
        # only this rank reports accuracy
        print(f'total accuracy = {counter[0]}/{counter[1]} = {counter[0] / counter[1]}')

Does that make any sense?

Yes, you would need a distributed sampler to make sure each rank gets a unique data split. The code you posted looks about right.

Thank you! This really helps

I ran into the same issue as well.