GPU memory is sufficient, but training crashes when the dataset size increases

I have a 2.2 GHz 2-core CPU, 8× RTX 2080 GPUs, 4 GB RAM, 70 GB swap, running Linux.
The dataset is 3.1 GB with 335,000 records.

I am trying to run training with DDP, but there are two problems I don't understand:

  1. Adding more GPUs slows training down, i.e. 2 GPUs are slower than 1 GPU, 4 slower than 2, and so on.

  2. The network trains fine on the whole dataset on 1 GPU, but if I run training on more than 12,000 records with 4 or 8 GPUs, it crashes with the error below.

I run it with the command:

> TORCH_DISTRIBUTED_DEBUG=DETAIL python ddpV4public.py

Error:

Traceback (most recent call last):
  File "/extra/py/ddpV4public.py", line 288, in <module>
    main()
  File "/extra/py/ddpV4public.py", line 285, in main
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node,), join=True)
  File "/home/samokiller/anaconda3/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/home/samokiller/anaconda3/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/home/samokiller/anaconda3/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 140, in join
    raise ProcessExitedException(
torch.multiprocessing.spawn.ProcessExitedException: process 4 terminated with signal SIGBUS

A different process number crashes each time.
Through debugging, I found that the failure happens on this line:

for i,(x_train,y_train) in enumerate(train_loader):

The same line works fine in the other processes, so the code itself seems to be correct.
Script code:

import argparse
import os
import random
import shutil
import time
import warnings
from enum import Enum

import torch
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.parallel
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Subset

import numpy as np
from torch.utils.data import TensorDataset, DataLoader, ConcatDataset

def makeStandart(_ds, _period):
    # Standardize each block of _period columns in place; remember (median, std) per block.
    global medianANDstdArr
    medianANDstdArr = []
    for numOfFeatureBlock in range(int(_ds.size()[1] / _period)):
        localBlock = _ds[:, numOfFeatureBlock * _period : (numOfFeatureBlock + 1) * _period]
        median, std = torch.median(localBlock), torch.std(localBlock)
        _ds[:, numOfFeatureBlock * _period : (numOfFeatureBlock + 1) * _period] = (localBlock - median) / std
        medianANDstdArr.append((median, std))
    return medianANDstdArr

def makeStandartFixed(_ds, _period, _medianANDstdArr):
    # Standardize using previously computed (median, std) pairs.
    for numOfFeatureBlock in range(int(_ds.size()[1] / _period)):
        localBlock = _ds[:, numOfFeatureBlock * _period : (numOfFeatureBlock + 1) * _period]
        median, std = _medianANDstdArr[numOfFeatureBlock]
        _ds[:, numOfFeatureBlock * _period : (numOfFeatureBlock + 1) * _period] = (localBlock - median) / std  # normalize
        medianANDstdArr.append((median.item(), std.item()))

class FScoreLoss(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, pred, target, grad=True):
      pred=torch.flatten(pred)
      target=torch.flatten(target)
      tp = (pred * target).sum().to(torch.float32)
      #tn = ((1 - target) * (1 - pred)).sum().to(torch.float32)
      fp = (pred * (1 - target)).sum().to(torch.float32)

      return (target.sum().to(torch.float32) - tp) + fp  # false negatives + false positives

class Net(nn.Module):
    def __init__(self,input_shape):
        super(Net,self).__init__()

        self.leakyRelu = nn.LeakyReLU(0.1)
        self.bn1 = nn.BatchNorm1d(46)
        self.bn2 = nn.BatchNorm1d(46)
        self.bn3 = nn.BatchNorm1d(46)
        self.bn4 = nn.BatchNorm1d(46)
        self.bn5 = nn.BatchNorm1d(46)
        self.bn6 = nn.BatchNorm1d(46)
        self.bn7 = nn.BatchNorm1d(46)
        self.bn8 = nn.BatchNorm1d(46)
        self.bn9 = nn.BatchNorm1d(46)
        self.bn10 = nn.BatchNorm1d(46)
        self.bn11 = nn.BatchNorm1d(46)
        self.bn12 = nn.BatchNorm1d(46)
        self.bn13 = nn.BatchNorm1d(46)
        self.bn14 = nn.BatchNorm1d(46)
        self.bn15 = nn.BatchNorm1d(46)
        self.bn16 = nn.BatchNorm1d(46)
        self.bn17 = nn.BatchNorm1d(46)
        self.bn18 = nn.BatchNorm1d(46)
        self.bn19 = nn.BatchNorm1d(46)
        self.bn20 = nn.BatchNorm1d(46)
        self.bn21 = nn.BatchNorm1d(46)
        self.bn22 = nn.BatchNorm1d(46)
        self.bn23 = nn.BatchNorm1d(46)
        self.bn24 = nn.BatchNorm1d(46)
        self.bn25 = nn.BatchNorm1d(46)
        self.bn26 = nn.BatchNorm1d(46)
        self.bn27 = nn.BatchNorm1d(46)
        self.bn28 = nn.BatchNorm1d(46)
        self.bn29 = nn.BatchNorm1d(46)
        
        self.bnL100 = nn.BatchNorm1d(100)
        self.bnL10 = nn.BatchNorm1d(10)

        self.cl1 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#58
        self.cl2 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#56
        self.cl3 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#54
        self.cl4 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#52
        self.cl5 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#50
        self.cl6 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#48
        self.cl7 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#46
        self.cl8 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#44
        self.cl9 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#42
        self.cl10 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#40
        self.cl11 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#38
        self.cl12 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#36
        self.cl13 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#34
        self.cl14 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#32
        self.cl15 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#30
        self.cl16 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#28
        self.cl17 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#26
        self.cl18 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#24
        self.cl19 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#22
        self.cl20 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#20
        self.cl21 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#18
        self.cl22 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#16
        self.cl23 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#14
        self.cl24 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#12
        self.cl25 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#10
        self.cl26 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#8
        self.cl27 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#6
        self.cl28 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 3, groups=46)#4
        self.cl29 = nn.Conv1d(in_channels = 46, out_channels = 46, kernel_size = 4, groups=46)#1
        self.fc1 = nn.Linear(368,100)
        self.fc2 = nn.Linear(100,10)
        self.fc3 = nn.Linear(10,1)
        



        # Layers initialization
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)
        nn.init.xavier_uniform_(self.fc3.weight)
        
        torch.nn.init.xavier_uniform_(self.cl1.weight)
        torch.nn.init.xavier_uniform_(self.cl2.weight)
        torch.nn.init.xavier_uniform_(self.cl3.weight)
        torch.nn.init.xavier_uniform_(self.cl4.weight)
        torch.nn.init.xavier_uniform_(self.cl5.weight)
        torch.nn.init.xavier_uniform_(self.cl6.weight)
        torch.nn.init.xavier_uniform_(self.cl7.weight)
        torch.nn.init.xavier_uniform_(self.cl8.weight)
        torch.nn.init.xavier_uniform_(self.cl9.weight)
        torch.nn.init.xavier_uniform_(self.cl10.weight)
        torch.nn.init.xavier_uniform_(self.cl11.weight)
        torch.nn.init.xavier_uniform_(self.cl12.weight)
        torch.nn.init.xavier_uniform_(self.cl13.weight)
        torch.nn.init.xavier_uniform_(self.cl14.weight)
        torch.nn.init.xavier_uniform_(self.cl15.weight)
        torch.nn.init.xavier_uniform_(self.cl16.weight)
        torch.nn.init.xavier_uniform_(self.cl17.weight)
        torch.nn.init.xavier_uniform_(self.cl18.weight)
        torch.nn.init.xavier_uniform_(self.cl19.weight)
        torch.nn.init.xavier_uniform_(self.cl20.weight)
        torch.nn.init.xavier_uniform_(self.cl21.weight)
        torch.nn.init.xavier_uniform_(self.cl22.weight)
        torch.nn.init.xavier_uniform_(self.cl23.weight)
        torch.nn.init.xavier_uniform_(self.cl24.weight)
        torch.nn.init.xavier_uniform_(self.cl25.weight)
        torch.nn.init.xavier_uniform_(self.cl26.weight)
        torch.nn.init.xavier_uniform_(self.cl27.weight)
        torch.nn.init.xavier_uniform_(self.cl28.weight)
        torch.nn.init.xavier_uniform_(self.cl29.weight)
    
    def getArchModel(self):
        return "bceLossn2048t_2048tD0.2_2048tD0.2_512tD0.2_32t1s_xavierUniform"


    def forward(self,x):

        x = self.leakyRelu(self.bn1(self.cl1(x)))#1
        x = self.leakyRelu(self.bn2(self.cl2(x)))#2
        x = self.leakyRelu(self.bn3(self.cl3(x)))#3
        x = self.leakyRelu(self.bn4(self.cl4(x)))#4
        x = self.leakyRelu(self.bn5(self.cl5(x)))#5
        x = self.leakyRelu(self.bn6(self.cl6(x)))#6
        x = self.leakyRelu(self.bn7(self.cl7(x)))#7
        x = self.leakyRelu(self.bn8(self.cl8(x)))#8
        x = self.leakyRelu(self.bn9(self.cl9(x)))#9
        x = self.leakyRelu(self.bn10(self.cl10(x)))#10
        x = self.leakyRelu(self.bn11(self.cl11(x)))#11
        x = self.leakyRelu(self.bn12(self.cl12(x)))#12
        x = self.leakyRelu(self.bn13(self.cl13(x)))#13
        x = self.leakyRelu(self.bn14(self.cl14(x)))#14
        x = self.leakyRelu(self.bn15(self.cl15(x)))#15
        x = self.leakyRelu(self.bn16(self.cl16(x)))#16
        x = self.leakyRelu(self.bn17(self.cl17(x)))#17
        x = self.leakyRelu(self.bn18(self.cl18(x)))#18
        x = self.leakyRelu(self.bn19(self.cl19(x)))#19
        x = self.leakyRelu(self.bn20(self.cl20(x)))#20
        x = self.leakyRelu(self.bn21(self.cl21(x)))#21
        x = self.leakyRelu(self.bn22(self.cl22(x)))#22
        x = self.leakyRelu(self.bn23(self.cl23(x)))#23
        x = self.leakyRelu(self.bn24(self.cl24(x)))#24
        x = self.leakyRelu(self.bn25(self.cl25(x)))#25
        x = self.leakyRelu(self.bn26(self.cl26(x)))#26
        
        x = torch.flatten(x, start_dim=1)
        x = (self.leakyRelu(self.bnL100(self.fc1(x))))
        x = (self.leakyRelu(self.bnL10(self.fc2(x))))
        x = torch.sigmoid(self.fc3(x))


        return x

def main_worker(gpu, args):
    torch.cuda.set_device(gpu)
    args_gpu = gpu
    
    dist.init_process_group(backend="nccl", init_method="env://", world_size=int(os.environ["WORLD_SIZE"]), rank=gpu)
    
    # Data loading: note that every spawned process runs this and loads its own copy of the dataset into host RAM
    numOfPeriodsPerFeature = 60
    validateDatasetFile = "ds.csv"
    DS = np.loadtxt(validateDatasetFile, skiprows=1, delimiter=",", dtype=np.float32, max_rows=16000)
    train_tensorX = torch.from_numpy(DS[:, :-1])
    medianANDstdArr = makeStandart(train_tensorX, numOfPeriodsPerFeature)
    train_tensorY = torch.from_numpy(DS[:, -1])
    train_dataset = TensorDataset(train_tensorX, train_tensorY)
    val_dataset = train_dataset

    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset, shuffle=False, drop_last=True)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1000, shuffle=(train_sampler is None), num_workers=1, pin_memory=True, sampler=train_sampler)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=1000, shuffle=False, num_workers=1, pin_memory=True, sampler=val_sampler)

    # create model
    model = Net(input_shape=train_tensorX.shape[1])

    model.cuda()
    print("gpu: ", gpu)
    # cl27-cl29 / bn27-bn29 are defined but never used in forward(), hence find_unused_parameters=True
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu], find_unused_parameters=True)
    
    device = torch.device('cuda:{}'.format(gpu))

    # define loss function (criterion), optimizer, and learning rate scheduler
    criterion = FScoreLoss().to(device)

    optimizer = torch.optim.SGD(model.parameters(), 3.081352425453223e-06)
    
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    scheduler = StepLR(optimizer, step_size=30, gamma=0.1)

    begin = time.time()
    for epoch in range(0, 10):

        train_sampler.set_epoch(epoch)  # if args.distributed
        train(train_loader, model, criterion, optimizer, epoch, device)
        
        scheduler.step()
    dist.destroy_process_group()
    print("Finish. Num of GPUs", gpu, ". Elapsed time: ", time.time()-begin, " sec.")

def train(train_loader, model, criterion, optimizer, epoch, device):
    model.train()
    
    print(device,": stage1")
    for i,(x_train,y_train) in enumerate(train_loader):

        # move data to the same device as model
        x_train = x_train.to(device, non_blocking=True)
        y_train = y_train.to(device, non_blocking=True)
        x_train = torch.reshape(x_train, (x_train.shape[0], 46, 60))
        
        # compute output
        output = model(x_train)#.to(torch.float))
        loss = criterion(output,y_train.reshape(-1,1))


        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def main():
    begin = time.time()
    os.environ["WORLD_SIZE"] = "8"
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    ngpus_per_node = int(os.environ["WORLD_SIZE"])
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node,), join=True)

if __name__ == '__main__':
    main()

If I reduce the dataset to 8,000 records, it works fine on 8 GPUs, but the speed is still lower than on 1 GPU.
At the same time, no GPU memory overflow is observed. And if GPU memory were insufficient, training on 1 GPU would not work either.

Maybe there is some kind of error in my code?

It is expected that 8 GPUs are slower than 1 GPU per iteration, because there is communication cost. But the total number of samples processed per iteration increases, so your overall QPS (samples per second) should increase.
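
For example, one way to compare runs by samples per second rather than wall time per iteration is a small timing wrapper around the train() function from the script above. This is only a sketch; it assumes the loader, sampler, model, criterion and optimizer defined in that script:

import time
import torch

def timed_train_epoch(train_loader, train_sampler, model, criterion, optimizer,
                      epoch, device, world_size):
    # With DistributedSampler each rank sees len(train_sampler) samples per epoch,
    # so the whole job processes roughly world_size times what one rank counts.
    train_sampler.set_epoch(epoch)
    start = time.time()
    train(train_loader, model, criterion, optimizer, epoch, device)
    torch.cuda.synchronize(device)
    elapsed = time.time() - start
    per_rank = len(train_sampler) / elapsed
    print(f"epoch {epoch}: {per_rank:.0f} samples/s per rank, "
          f"~{per_rank * world_size:.0f} samples/s for the whole job")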

Regarding the cause of the crash: have you checked whether your samples are split evenly among the GPUs?
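
One way to check the split is to count how many samples each rank's sampler would yield; a minimal sketch assuming the train_dataset built in the script above (drop_last=True is just one option for discarding the remainder, not what the original training sampler uses):

from torch.utils.data.distributed import DistributedSampler

def check_even_split(dataset, world_size):
    # How many samples each rank would receive from a DistributedSampler.
    counts = [len(DistributedSampler(dataset, num_replicas=world_size,
                                     rank=rank, drop_last=True))
              for rank in range(world_size)]
    print("samples per rank:", counts)  # all entries should be equal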

Thank you for your response and participation! I have been trying to find a solution for over a week now.

The thing is that not only is 8 slower than 1, but 2 is also slower than 1. I have experimented with different dataset sizes and with batch sizes from 1 to 1000, and it is slower for every combination. Maybe I should try very large batches to get a speedup, but so far that is not possible, because with large datasets and batches one of the processes crashes while loading batches from the DataLoader.

Regarding uniform sampling: I believe it is even; I tried a dataset of 16,000 records with batch sizes of 1/10/80/100/800/1000.

What is your storage type (HDD/SSD)? In DDP, each process has its own DataLoader and reads the data simultaneously. Also, since your RAM is only 4 GB and you provided 70 GB of swap, I/O may slow down tremendously once RAM is full. Check whether your GPU usage is maximized (90-100%) or whether the GPUs idle most of the time, in which case I/O is your bottleneck.
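
A small way to watch both at once, assuming nvidia-smi is on PATH and the machine runs Linux (run this in a separate terminal while training; it is only a monitoring sketch, not part of the training script):

import subprocess
import time

def sample(interval=1.0):
    # Print GPU utilization/memory next to host MemAvailable/SwapFree once per interval.
    while True:
        gpu = subprocess.run(
            ["nvidia-smi", "--query-gpu=index,utilization.gpu,memory.used",
             "--format=csv,noheader"],
            capture_output=True, text=True).stdout.strip().replace("\n", " | ")
        with open("/proc/meminfo") as f:
            host = "  ".join(line.strip() for line in f
                             if line.startswith(("MemAvailable", "SwapFree")))
        print(gpu, "||", host, flush=True)
        time.sleep(interval)

if __name__ == "__main__":
    sample()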

Thank you for your participation and attention to my problem!
I have an SSD.
RAM usage does not exceed 3.2-3.3 GB; everything else ends up being cached in swap.
Both CPU cores are at 100% almost all of the time, even with num_workers=1. Nothing else runs on the system besides system processes and Python; the system itself takes 0.3-0.4 GB when idle.

I understand that bandwidth may not be enough to feed data to the GPUs, but it seems to me that this should only slow the process down, not crash it.
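
To see whether the crashes correlate with memory pressure rather than bandwidth, one option (not in the original script; the call site shown is only an assumption) is to log each worker's peak resident memory right before the loop that fails, using just the standard library:

import resource

def log_rss(rank, where):
    # ru_maxrss is reported in kilobytes on Linux.
    rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"rank {rank} at {where}: peak RSS {rss_kb / 1024:.0f} MiB", flush=True)

# For example, inside train(), just before the DataLoader loop:
#   log_rss(device.index, "before enumerate(train_loader)")
#   for i, (x_train, y_train) in enumerate(train_loader):
#       ...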