I have a fairly simple model (a 3-layer NN) that I am running on a single AWS node with 4 Tesla T4 GPUs. My data is reasonably large, and I noticed that DP is not really faster than a single GPU because of its overhead. I tried DDP and the speed is indeed much faster; however, training randomly deadlocks at the model = DDP(model, device_ids=[rank]) step. I have read a couple of posts on this topic, but most of them don't seem to reach a conclusion. Would love some help on this. Thanks!
Below is the code. I launch the main script with torchrun --nproc_per_node=4 main.py -args. I have tried both the nccl and gloo backends.
The main script just calls the grid_search function, pasted below, to run the hyperparameter search and training. The hang always happens at the DDP step, and the point at which it stops is random.
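For context, main.py is essentially just the standard torchrun setup plus the call into grid_search. A minimal sketch of it is below (argument parsing stripped out, and load_inputs is only a stand-in for my actual data/config loading):

# Minimal sketch of main.py: torchrun spawns one process per GPU and sets
# RANK / LOCAL_RANK / WORLD_SIZE in the environment.
import os

import torch
import torch.distributed as dist


def main():
    rank = int(os.environ["LOCAL_RANK"])      # one process per GPU on this node
    world_size = int(os.environ["WORLD_SIZE"])

    dist.init_process_group(backend="nccl")   # also tried backend="gloo"
    torch.cuda.set_device(rank)

    # load_inputs is a placeholder for my actual data/config loading
    data, param_grid, data_module_params = load_inputs()
    best_params, best_val_loss = grid_search(rank, data, world_size, param_grid, data_module_params)
    if rank == 0:
        print(best_params, best_val_loss)

    dist.destroy_process_group()


if __name__ == "__main__":
    main()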
import os
import time

import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import ParameterGrid
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# EFDataset, ANNClassifier, and Trainer are defined elsewhere in my code.


def save_best_checkpoint(state, checkpoint_dir, filename='best_model.pth.tar'):
    filepath = os.path.join(checkpoint_dir, filename)
    torch.save(state, filepath)


# old single-GPU signature:
# def grid_search(data, param_grid, data_module_params, device, max_epochs=100, patience=5, checkpoint_dir='checkpoints'):
def grid_search(rank, data, world_size, param_grid, data_module_params, max_epochs=100, patience=5, checkpoint_dir='checkpoints'):
    best_val_loss = float('inf')
    best_params = None
    best_model_state = None
    if not os.path.exists(checkpoint_dir) and rank == 0:
        os.makedirs(checkpoint_dir)

    for params in ParameterGrid(param_grid):
        if rank == 0:
            print(f"Training with parameters: {params}")

        train_dataset = EFDataset(data, data_module_params['cols_features'], data_module_params['col_target'], split='train')
        val_dataset = EFDataset(data, data_module_params['cols_features'], data_module_params['col_target'], split='val')
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)  # DDP
        val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)  # DDP
        train_loader = DataLoader(train_dataset, batch_size=data_module_params['train_batch_size'],
                                  sampler=train_sampler)  # , num_workers=4, multiprocessing_context="spawn") #DDP
        val_loader = DataLoader(val_dataset, batch_size=data_module_params['val_batch_size'],
                                sampler=val_sampler)  # , num_workers=4, multiprocessing_context="spawn") #DDP

        # Initialize the model with the current set of parameters
        model = ANNClassifier(params['input_dim'], params['hidden_dim1'], params['hidden_dim2'], params['output_dim'], params['dropout_rate'])
        model = model.to(rank)  # DDP

        # Use DistributedDataParallel for multi-GPU training
        model = DDP(model, device_ids=[rank])  # DDP -- this is where the hang happens
        print(f"reached DDP step for rank {rank}")
        # model = model.to(device)  # change order

        # Initialize the criterion and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])

        # Initialize the trainer
        trainer = Trainer(model, criterion, optimizer, rank)  # DDP

        # Training loop with early stopping
        best_epoch_val_loss = float('inf')
        patience_counter = 0
        if rank == 0:
            t1 = time.time()
        for epoch in range(max_epochs):
            train_sampler.set_epoch(epoch)
            if rank == 0:
                print(f"reached train step at rank {rank}")
            train_loss, train_acc = trainer.train(train_loader)
            if rank == 0:
                print(f"reached val step at rank {rank}")
            val_sampler.set_epoch(epoch)
            val_loss, val_acc = trainer.validate(val_loader)
            if rank == 0:
                t2 = time.time()
                print(f"Epoch {epoch+1}/{max_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                      f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}, Time passed: {np.round((t2-t1)/60, 5)} minutes")

            # Track the best epoch using this rank's local validation loss
            is_best = val_loss < best_epoch_val_loss
            if is_best:
                best_epoch_val_loss = val_loss
                patience_counter = 0
                best_model_state = {
                    'epoch': epoch + 1,
                    'state_dict': model.state_dict(),
                    'best_val_loss': best_epoch_val_loss,
                    'optimizer': optimizer.state_dict(),
                    'best_params': params,
                }
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    break

        if best_epoch_val_loss < best_val_loss:
            best_val_loss = best_epoch_val_loss
            best_params = params

        del model, optimizer, trainer  # clean up memory
        torch.cuda.empty_cache()

    # Save the best model checkpoint
    if best_model_state is not None and rank == 0:  # DDP
        save_best_checkpoint(best_model_state, checkpoint_dir)

    return best_params, best_val_loss