Loss not decreasing in distributed training

Using the same code on a single GPU gives a different loss curve:

But using the same code on a single node with multiple GPUs gives random results:

[screenshot of the multi-GPU loss curve]

Here is my Trainer class that handles multi-GPU training:

class Trainer:
    def __init__(self, model, train_data, val_data, optimizer, gpu_id, save_every):
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.val_data = val_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model, device_ids=[gpu_id], find_unused_parameters=True)
        
    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()
        return loss
        
    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
#         print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        loss_ = []
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            loss = self._run_batch(source, targets)
            loss_.append(loss.item())
        return np.mean(loss_)
            
    def _run_val_epoch(self, epoch):
        b_sz = len(next(iter(self.val_data))[0])
#         print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.val_data.sampler.set_epoch(epoch)
        loss_ = []
        for source, targets in self.val_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            output = self.model(source)
            loss = F.cross_entropy(output, targets)
            loss_.append(loss.item())
        return np.mean(loss_)
            
    def _save_checkpoint(self, epoch):
        ckp = self.model.module.state_dict()
        PATH = "ddp_checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
        
    def train(self, max_epochs):
        total_loss = {}
        total_loss['train_loss'] = []
        total_loss['val_loss'] = []
        for epoch in range(max_epochs):
            train_loss = self._run_epoch(epoch)
            val_loss = self._run_val_epoch(epoch)
            total_loss['train_loss'].append(train_loss)
            total_loss['val_loss'].append(val_loss)
            print(f"Epoch: {epoch}") 
            print(f"train loss: {train_loss}, val_loss: {val_loss}")
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)
                
        with open('loss.txt', 'w') as f:
            f.write(str(total_loss))
            f.close()

Can you supply a runnable example (including the main function, dataset, etc.) that is minimal (remove anything that does not seem to influence your problem) so we can help you debug?

At a glance, I notice a couple of small issues:

  1. Since DDP only synchronizes the gradients (to keep the model identical on every worker) and not the data, the loss calculated on each worker is different. Here it is never aggregated (using e.g. torch.distributed.all_reduce()).
  2. All workers save their (own) loss to the same path.

Combining both points, you probably see the loss curve for a single worker only. It looks like there’s also a larger issue, but we’ll need more context for that.
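
For reference, here is a minimal sketch (assuming the Trainer class above) of how the per-worker mean loss could be averaged across ranks with torch.distributed.all_reduce, and how the file write could be restricted to rank 0. The helper aggregate_loss and the commented-out call sites are illustrative, not part of your code:

import torch
import torch.distributed as dist

def aggregate_loss(epoch_loss, gpu_id):
    # wrap the per-worker mean loss (a Python float) in a tensor on this worker's device
    loss_tensor = torch.tensor(epoch_loss, device=gpu_id)
    # sum over all workers, then divide by the number of workers to get the global mean
    dist.all_reduce(loss_tensor, op=dist.ReduceOp.SUM)
    loss_tensor /= dist.get_world_size()
    return loss_tensor.item()

# inside Trainer.train(), something like:
#     train_loss = aggregate_loss(self._run_epoch(epoch), self.gpu_id)
#     val_loss = aggregate_loss(self._run_val_epoch(epoch), self.gpu_id)
#     ...
#     if self.gpu_id == 0:  # only rank 0 writes, so workers don't overwrite each other's file
#         with open('loss.txt', 'w') as f:
#             f.write(str(total_loss))

With the aggregation in place every rank logs the same averaged loss, and the single file written by rank 0 reflects all workers rather than just one of them.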

def ddp_setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = '12355'
    init_process_group(backend='nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

class LoadData(Dataset):
    def __init__(self, img_path, labels_df, transform=None):
        self.img_path = img_path
        self.labels_df = pd.read_csv(labels_df)
        self.transform = transform

    def __len__(self):
        return len(self.labels_df['label'].to_list())

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_path, self.labels_df.iloc[idx, -3])
        
        image = Image.open(img_path).convert('RGB')

        image = T.ToTensor()(image)
        label = self.labels_df.iloc[idx, -2]
        if self.transform:
            image = self.transform(image)
            image = T.ToTensor()(image)
        return image, label

class Classifier(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, img):
        '''simple model is created here'''

def load_train_objs():
#     batch_size = 32
    img_pth = '/home/images/'
    train_df = '/home/train_df.csv'
    val_df = '/home/val_df.csv'
    train_set = LoadData(img_pth, train_df,
                         transform=T.Compose([T.Resize(size=(224, 224), antialias=True), T.Normalize(mean=(0.5), std=(0.5)), T.ToPILImage()]))
#     train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)

    val_set = LoadData(img_pth, val_df,
                       transform=T.Compose([T.Resize(size=(224, 224), antialias=True), T.Normalize(mean=(0.5), std=(0.5)), T.ToPILImage()]))
#     val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
    model = Classifier()
    optimizer = optim.Adam(params=model.parameters(), lr=1e-7, weight_decay= 1e-3)
    return train_set,val_set, model, optimizer

def prepare_dataloader(dataset, batch_size):
    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset))

def main(rank, world_size, save_every, total_epochs, batch_size):
    ddp_setup(rank, world_size)
    train_dataset, val_dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(train_dataset, batch_size)
    val_data = prepare_dataloader(val_dataset, batch_size)
    trainer = Trainer(model, train_data, val_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()

if __name__=='__main__':
    import argparse
    parser = argparse.ArgumentParser(description="distributed training")
    parser.add_argument("total_epochs", type=int)
    parser.add_argument("save_every", type=int)
    parser.add_argument("--batch_size", type=int, default=128)
    args = parser.parse_args()
    
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
    

Hi @yiftach, this is my code.

Hi @Bishwa_Karki, for the example to be runnable, you’ll also have to supply the code for the model and the data files. In any case, it would help if you repeated the following process:

  1. Think of a way to make this example more minimal.
  2. Try applying that change, and see if your problem persists.
  3. When you find a short snippet that does not work and whose behavior surprises you, ask us about it.

Typically it will be harder to get people here to debug a long codebase, but easier to get explanations about the behavior of smaller, more well-defined components.

@yiftach thanks for your suggestions. I think this is the minimal example: just a simple classifier, and the data could be anything. I am sorry, I can’t provide my actual model and data due to privacy concerns. Other than that, there is just the setup for DDP.