I have a 2.2 GHz 2-core processor, 8 RTX 2080 GPUs, 4 GB of RAM and 70 GB of swap, running Linux.
The dataset is 3.1 GB (335,000 records).
I am trying to run training with DistributedDataParallel (DDP), but there are two problems that I don't understand:
1. Increasing the number of GPUs used for training makes training slower, i.e. 2 GPUs are slower than 1 GPU, 4 are slower than 2, and so on.
2. The network trains fine on the whole dataset when training runs on 1 GPU. But if I train on more than 12,000 records of the dataset using 4 or 8 GPUs, it crashes with an error:
Running with command:
> TORCH_DISTRIBUTED_DEBUG=DETAIL python ddpV4public.py
Error:
Traceback (most recent call last):
File "/extra/py/ddpV4public.py", line 288, in <module>
main()
File "/extra/py/ddpV4public.py", line 285, in main
mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node,), join=True)
File "/home/samokiller/anaconda3/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/home/samokiller/anaconda3/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
while not context.join():
File "/home/samokiller/anaconda3/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 140, in join
raise ProcessExitedException(
torch.multiprocessing.spawn.ProcessExitedException: process 4 terminated with signal SIGBUS
Each time a different process number crashes.
Through debugging, I found out that the failure is on the line:
for i,(x_train,y_train) in enumerate(train_loader):
But in other processes it works fine, so the code must be correct.
Script code:
import argparse
import os
import random
import shutil
import time
import warnings
from enum import Enum
import torch
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.parallel
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Subset
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, ConcatDataset
def makeStandart(_ds, _period):
    """Standardize ``_ds`` in place, one group of ``_period`` columns at a time.

    Every consecutive group of ``_period`` columns is shifted by the median
    and divided by the standard deviation taken over all elements of that
    group. Columns left over when the width is not a multiple of ``_period``
    are not modified.

    The ``(median, std)`` pairs (0-dim tensors) are collected in the
    module-level ``medianANDstdArr`` list, which is recreated on each call;
    that same list object is returned.
    """
    global medianANDstdArr
    medianANDstdArr = []
    block_count = int(_ds.size()[1] / _period)
    for block_idx in range(block_count):
        first_col = block_idx * _period
        end_col = first_col + _period
        window = _ds[:, first_col:end_col]
        center = torch.median(window)
        spread = torch.std(window)
        # The right-hand side is a freshly allocated tensor, so the in-place
        # write cannot disturb the statistics computed just above.
        _ds[:, first_col:end_col] = (window - center) / spread
        medianANDstdArr.append((center, spread))
    return medianANDstdArr
def makeStandartFixed(_ds, _period, _medianANDstdArr):
    """Standardize ``_ds`` in place using previously computed statistics.

    Args:
        _ds: 2-D tensor; modified in place.
        _period: Number of consecutive columns that share one
            ``(median, std)`` pair.
        _medianANDstdArr: Sequence indexed by column-group number, each item
            a ``(median, std)`` pair (0-dim tensors or plain numbers).

    Returns:
        None. Columns beyond the last complete group are left untouched.

    Note:
        The statistics are only read. An earlier version appended to a
        module-level ``medianANDstdArr`` list here, which raised ``NameError``
        when that global had not been created yet and otherwise grew the very
        list that was being applied.
    """
    for numOfFeatureBlock in range(int(_ds.size()[1] / _period)):
        start = numOfFeatureBlock * _period
        stop = start + _period
        median, std = _medianANDstdArr[numOfFeatureBlock]
        # Normalize this column group with the supplied statistics.
        _ds[:, start:stop] = (_ds[:, start:stop] - median) / std
class FScoreLoss(torch.nn.Module):
    """Loss computed as ``(sum(target) - sum(pred * target)) + sum(pred * (1 - target))``.

    Both inputs are flattened first, so any pair of shapes with the same
    number of elements is accepted. The result is a 0-dim float32 tensor.
    """

    def forward(self, pred, target, grad=True):
        """Return the loss for ``pred`` against ``target``.

        ``grad`` is accepted for call compatibility and is not used.
        """
        flat_pred = pred.flatten()
        flat_target = target.flatten()
        true_pos = torch.sum(flat_pred * flat_target).to(torch.float32)
        false_pos = torch.sum(flat_pred * (1 - flat_target)).to(torch.float32)
        false_neg = flat_target.sum().to(torch.float32) - true_pos
        return false_neg + false_pos
class Net(nn.Module):
    """Stack of depthwise 1-D convolutions followed by a three-layer MLP head.

    Layers are registered as ``bn1``..``bn29``, ``bnL100``, ``bnL10``,
    ``cl1``..``cl29``, ``fc1``..``fc3`` (in that order). ``forward`` runs only
    ``cl1``..``cl26``; ``cl27``..``cl29`` and ``bn27``..``bn29`` are created
    and initialized but never called.

    ``fc1`` takes 368 = 46 * 8 features, which is what 26 unpadded kernel-3
    convolutions leave from an input of 46 channels by 60 steps.
    """

    def __init__(self, input_shape):
        """Build and initialize all layers. ``input_shape`` is not used."""
        super().__init__()
        self.leakyRelu = nn.LeakyReLU(0.1)

        # One batch-norm per convolution, then the two for the dense head.
        for idx in range(1, 30):
            setattr(self, f"bn{idx}", nn.BatchNorm1d(46))
        self.bnL100 = nn.BatchNorm1d(100)
        self.bnL10 = nn.BatchNorm1d(10)

        # Depthwise convolutions (groups == channels): kernel 3 for cl1..cl28,
        # kernel 4 for cl29.
        for idx in range(1, 30):
            width = 4 if idx == 29 else 3
            conv = nn.Conv1d(in_channels=46, out_channels=46, kernel_size=width, groups=46)
            setattr(self, f"cl{idx}", conv)

        self.fc1 = nn.Linear(368, 100)
        self.fc2 = nn.Linear(100, 10)
        self.fc3 = nn.Linear(10, 1)

        # Layers initialization: dense weights first, then conv weights, in
        # registration order.
        for dense in (self.fc1, self.fc2, self.fc3):
            nn.init.xavier_uniform_(dense.weight)
        for idx in range(1, 30):
            torch.nn.init.xavier_uniform_(getattr(self, f"cl{idx}").weight)

    def getArchModel(self):
        """Return the fixed architecture label string."""
        return "bceLossn2048t_2048tD0.2_2048tD0.2_512tD0.2_32t1s_xavierUniform"

    def forward(self, x):
        """Apply conv -> batch-norm -> LeakyReLU 26 times, then the MLP head.

        Returns the sigmoid of the final linear layer, one value per sample.
        """
        for idx in range(1, 27):
            conv = getattr(self, f"cl{idx}")
            norm = getattr(self, f"bn{idx}")
            x = self.leakyRelu(norm(conv(x)))
        x = torch.flatten(x, start_dim=1)
        x = self.leakyRelu(self.bnL100(self.fc1(x)))
        x = self.leakyRelu(self.bnL10(self.fc2(x)))
        return torch.sigmoid(self.fc3(x))
def main_worker(gpu, args):
    """Train the model inside one DistributedDataParallel process.

    Args:
        gpu: Index handed in by ``mp.spawn``; used both as the CUDA device
            index and as the process rank.
        args: Extra positional value forwarded by the spawner. It is not read
            inside this function.

    Environment:
        ``WORLD_SIZE`` must be set; with ``init_method="env://"`` the process
        group also takes ``MASTER_ADDR`` / ``MASTER_PORT`` from the
        environment.
    """
    torch.cuda.set_device(gpu)
    dist.init_process_group(backend="nccl", init_method="env://", world_size=int(os.environ["WORLD_SIZE"]), rank=gpu)
    try:
        # Data loading code. NOTE(review): every rank parses the CSV and keeps
        # its own full copy in host memory.
        numOfPeriodsPerFeature = 60
        validateDatasetFile = "ds.csv"
        DS = np.loadtxt(validateDatasetFile, skiprows=1, delimiter=",", dtype=np.float32, max_rows=16000)
        train_tensorX = torch.from_numpy(DS[:, :-1])
        makeStandart(train_tensorX, numOfPeriodsPerFeature)  # standardizes in place
        train_tensorY = torch.from_numpy(DS[:, -1])
        train_dataset = TensorDataset(train_tensorX, train_tensorY)
        val_dataset = train_dataset
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
        val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset, shuffle=False, drop_last=True)

        # num_workers=0: the dataset is already an in-memory tensor, so a
        # loader subprocess per rank only adds inter-process copies through
        # shared memory and competes for the host CPU. Running out of shared
        # memory in DataLoader workers is a known cause of "Bus error"
        # (SIGBUS) crashes -- presumably what was hit here; confirm on the
        # target machine.
        # shuffle stays False because the DistributedSampler does the shuffling.
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1000, shuffle=False, num_workers=0, pin_memory=True, sampler=train_sampler)
        # The validation loader is built but not consumed in this function.
        val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=1000, shuffle=False, num_workers=0, pin_memory=True, sampler=val_sampler)

        # create model
        model = Net(input_shape=train_tensorX.shape[1])
        model.cuda()
        print("gpu: ", gpu)
        # find_unused_parameters=True is kept because Net registers layers
        # (cl27..cl29 / bn27..bn29) that its forward never calls.
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu], find_unused_parameters=True)
        device = torch.device('cuda:{}'.format(gpu))

        # define loss function (criterion), optimizer, and learning rate scheduler
        criterion = FScoreLoss().to(device)
        optimizer = torch.optim.SGD(model.parameters(), 3.081352425453223e-06)
        # Sets the learning rate to the initial LR decayed by 10 every 30 epochs
        scheduler = StepLR(optimizer, step_size=30, gamma=0.1)

        begin = time.time()
        for epoch in range(0, 10):
            # Re-seed the sampler so each epoch uses a different shuffle.
            train_sampler.set_epoch(epoch)
            train(train_loader, model, criterion, optimizer, epoch, device)
            scheduler.step()
    finally:
        # Tear the process group down even when training raises.
        dist.destroy_process_group()
    print("Finish. Num of GPUs", gpu, ". Elapsed time: ", time.time()-begin, " sec.")
def train(train_loader, model, criterion, optimizer, epoch, device):
    """Run one pass over ``train_loader``, stepping the optimizer per batch.

    Each batch of features is moved to ``device`` and reshaped to
    ``(batch, 46, 60)`` before the forward pass; labels are reshaped to a
    column before being handed to ``criterion``. ``epoch`` is accepted but
    not used. Returns None.
    """
    model.train()
    print(device,": stage1")
    for features, labels in train_loader:
        # move data to the same device as model
        features = features.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        batch_len = features.shape[0]
        features = features.reshape(batch_len, 46, 60)

        # compute output
        prediction = model(features)
        loss = criterion(prediction, labels.reshape(-1, 1))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
def main():
    """Set the rendezvous environment variables and spawn the worker processes.

    ``WORLD_SIZE`` is fixed to 8 here; that value is also the number of
    processes started and the extra argument passed to each ``main_worker``.
    """
    os.environ.update({
        "WORLD_SIZE": "8",
        "MASTER_ADDR": "localhost",
        "MASTER_PORT": "12355",
    })
    worker_count = int(os.environ["WORLD_SIZE"])
    mp.spawn(main_worker, args=(worker_count,), nprocs=worker_count, join=True)


if __name__ == '__main__':
    main()
If I reduce the dataset to 8,000 records, training on 8 GPUs works fine, but it is still slower than on 1 GPU.
At the same time, no GPU memory overflow is observed. And if there were not enough GPU memory, training on 1 GPU would not work either.
Is there an error somewhere in my code?