Memory keeps increasing during training and the run is killed by a DataLoader worker error (PyTorch)

I am having trouble with an increasing-memory issue.

From the first data read, host memory starts to grow continuously.
Watching it in htop, the memory keeps increasing throughout training and validation.
Eventually a DataLoader worker is killed with an error and the training cannot run to completion.
Also, the GPU is never used at 100%… utilization stays around 50%~60%…

I tried changing the loss variable and using all of the GPUs, but I still don’t know what the cause is.

My training code is here.

def train(self):
        train_gen = data.DataLoader(
            dataset=self._train_data, shuffle=True,
            batch_size=ARGS.train_batch,
            num_workers=ARGS.num_workers, pin_memory=True)
        val_gen = data.DataLoader(
            dataset=self._val_data, shuffle=False,
            batch_size=ARGS.test_batch, num_workers=ARGS.num_workers,
            pin_memory=True)

        # will train self._num_epochs copies of train data
        to_train = chain.from_iterable(repeat(train_gen, self._num_epochs))
        # consisting of total_steps batches
        total_steps = len(train_gen) * self._num_epochs
        print("total steps: ", total_steps)
        num_steps = 0
        self._train(to_train, len(train_gen), num_steps, val_gen)

        cur_weight = self._model.state_dict()
        torch.save(cur_weight, '{}{}.pt'.format(self._weight_path, self.step))

        print('Current best weight: {}.pt, best auc: {}'.format(self.max_step, self.max_auc))

def _forward(self, batch):
        #print('device: ', self._device)
        batch = {k: t.to(self._device, non_blocking=True) for k, t in batch.items()}
        #label = Variable(batch['label']).to(self._device)  # shape: (batch_size, 1)
        #inputs = Variable(batch['input']).to(self._device)
        label = batch['label']
        output = self._model(batch['input'], batch['target_id'])

        pred = (torch.sigmoid(output) >= self._threshold).long()  # shape: (batch_size, 1)

        loss = self._loss_fn(output, label.float())
        return label, output, pred, loss.mean()

def _get_loss(self, label, output):
        loss = self._loss_fn(output, label.float())
        return loss.mean()

def _train(self, batch_iter, num_iter, num_batches, val_gen):
        start_time = time.time()
        self._model.train()

        losses = 0
        num_corrects = 0
        num_total = 0
        labels = []
        outs = []

        data_time = AverageMeter()
        batch_time = AverageMeter()
        #losses = AverageMeter()

        end = time.time()
        for i, batch in enumerate(batch_iter):
            i += 1
            data_time.update(time.time() - end)

            label, out, pred, train_loss = self._forward(batch)
            #train_loss = self._get_loss(label, out)
            losses += train_loss.item()

            #compute gradient and do Noam step
            self._opt.step(train_loss)

            #measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            num_corrects += (pred == label).sum().item()
            num_total += len(label)

            if i%1000 == 0:
                labels.extend(label.squeeze(-1).data.cpu().numpy())
                outs.extend(out.squeeze(-1).data.cpu().numpy())

                acc = num_corrects / num_total
                auc = roc_auc_score(labels, outs)
                losses = losses / 1000
                training_time = time.time() - start_time

                print('correct: {}, total: {}'.format(num_corrects, num_total))
                #print('[Train]     time: {}, loss: {}, acc: {}, auc: {}'.format(training_time, loss, acc, auc))
                print('[Train] [{0}/{1}]\t'
                      'time {batch_time.avg:.3f}\t'
                      'data time {data_time.avg:.3f}\t'
                      'loss {loss:.4f}\t'
                      'acc {acc:.4f}\t'
                      'auc {auc:.4f}\t'
                      'training time {training_time:.4f}\t'.format(
                       i, num_iter, acc=acc, auc=auc, batch_time=batch_time, data_time=data_time, loss=losses, training_time=training_time
                       ))
                losses = 0

            if i % 10000 == 0:
                self._test('Validation', val_gen, i)
                print('Current best weight: {}.pt, best auc: {}'.format(self.max_step, self.max_auc))

                cur_weight = self._model.state_dict()
                torch.save(cur_weight, '{}{}.pt'.format(self._weight_path, i))

                mylist = []
                mylist.append(self.max_auc)
                mylist.append(self.max_acc)
                mylist.append(i)

                with open('output.csv', 'a', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(mylist)
                print('--------------------------------------------------------')

You are storing a few tensors in your code, such as label, out, etc.
How large are these tensors and how many are you storing? I’m not sure how much memory your system has and whether the memory increase is expected, since you are indeed storing some metrics.
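
If those labels/outs lists are the source of the growth, one option is to clear them after each report instead of accumulating them for the whole run. A rough sketch, reusing the names from your loop (the 1000-step window and the clearing policy are just one possibility):

from sklearn.metrics import roc_auc_score

labels, outs = [], []
for i, batch in enumerate(batch_iter, start=1):   # batch_iter / self._forward as in your code
    label, out, pred, train_loss = self._forward(batch)

    # keep only detached CPU copies, never live GPU tensors
    labels.extend(label.squeeze(-1).detach().cpu().numpy())
    outs.extend(out.squeeze(-1).detach().cpu().numpy())

    if i % 1000 == 0:
        auc = roc_auc_score(labels, outs)
        print('step {}: auc over the last 1000 steps {:.4f}'.format(i, auc))
        labels.clear()   # drop the buffers so they do not grow unbounded
        outs.clear()

This keeps the periodic AUC report but bounds the host memory used for the metrics.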

This is the htop image. The memory kept increasing until it was full.

This is my GPU usage image. I’m using only one GPU, because when I tried increasing the number of GPUs the performance didn’t improve and the memory kept increasing as well.

The label and out tensors each have size (64, 1).
The batch size is 64 and num_workers is 32.
However, even when I remove the labels/outs lists and run training, the memory continues to increase.
I also measured the consumed memory with the code below; please take a look.

def _train(self, batch_iter, num_iter, num_batches, val_gen):
        start_time = time.time()
        self._model.train()

        losses = 0
        num_corrects = 0
        num_total = 0
        #labels = []
        #outs = []

        data_time = AverageMeter()
        batch_time = AverageMeter()

        pid = os.getpid()
        prev_mem = 0
        end = time.time()
        for i, batch in enumerate(batch_iter):
            i += 1
            data_time.update(time.time() - end)

            label, out, pred, train_loss = self._forward(batch)
            if i==1:
                print(label.size(), ' ', out.size())

            #train_loss = self._get_loss(label, out)
            losses += train_loss.item()

            #compute gradient and do Noam step
            self._opt.step(train_loss)

            #measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            num_corrects += (pred == label).sum().item()
            num_total += len(label)

            if i%1000 == 0:
                cur_mem = (int(open('/proc/%s/statm'%pid, 'r').read().split()[1])+0.0)/256
                add_mem = cur_mem - prev_mem
                prev_mem = cur_mem
                print(" train iterations : {}, added mem: {}M".format(i, add_mem))

                #labels.extend(label.squeeze(-1).data.cpu().numpy())
                #outs.extend(out.squeeze(-1).data.cpu().numpy())

                acc = num_corrects / num_total
                #auc = roc_auc_score(labels, outs)
                losses = losses / 1000
                training_time = time.time() - start_time

                print('correct: {}, total: {}'.format(num_corrects, num_total))
                #print('[Train]     time: {}, loss: {}, acc: {}, auc: {}'.format(training_time, loss, acc, auc))
                print('[Train] [{0}/{1}]\t'
                      'time {batch_time.avg:.3f}\t'
                      'data time {data_time.avg:.3f}\t'
                      'loss {loss:.4f}\t'
                      'acc {acc:.4f}\t'
                      'training time {training_time:.4f}\t'.format(
                       i, num_iter, acc=acc, batch_time=batch_time, data_time=data_time, loss=losses, training_time=training_time
                       ))
                losses = 0

            if i % 100000 == 0:
                self._test('Validation', val_gen, i)
                print('Current best weight: {}.pt, best acc: {}'.format(self.max_step, self.max_acc))

                cur_weight = self._model.state_dict()
                torch.save(cur_weight, '{}{}.pt'.format(self._weight_path, i))

                mylist = []
                #mylist.append(self.max_auc)
                mylist.append(self.max_acc)
                mylist.append(i)

                with open('output.csv', 'a', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(mylist)
                print('--------------------------------------------------------')

32 workers is probably way too much overhead. These workers are only used for data loading, so you’re probably better off with four or so. Since each worker is forked from the main process, forking itself might also lead to an increase in memory.
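
A loader with a smaller worker count would look something like this (the dataset and batch-size arguments follow the code you posted; 4 is only a starting point to tune against your CPU and storage):

train_gen = data.DataLoader(
    dataset=self._train_data, shuffle=True,
    batch_size=ARGS.train_batch,
    num_workers=4,       # each worker is a separate forked process
    pin_memory=True)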

If you want to use multiple GPUs, you will definitely notice a speed-up if you use DistributedDataParallel.
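
A minimal sketch of that setup, assuming one process per GPU started with torch.distributed.launch (MyModel and train_dataset are placeholders for your own model and dataset):

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# one process per GPU; rank and world size come from the launcher's env variables
dist.init_process_group(backend='nccl')
local_rank = dist.get_rank() % torch.cuda.device_count()
torch.cuda.set_device(local_rank)

model = MyModel().cuda(local_rank)            # placeholder model
model = DDP(model, device_ids=[local_rank])

# shard the data so each process sees a different slice;
# call sampler.set_epoch(epoch) at the start of every epoch for proper shuffling
sampler = DistributedSampler(train_dataset)   # placeholder dataset
loader = DataLoader(train_dataset, batch_size=64, sampler=sampler,
                    num_workers=4, pin_memory=True)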

I tried setting num_workers to 4, but the memory keeps increasing…

As @ptrblck says, you are storing some tensors in memory (labels, outs). It might also be that the issue is in your Dataset. Can you post your Dataset?

I used the ‘get_data_user_sep’ function and ‘UserSepDataset’ class to get the data.

This is my data loading code.

from dataset.dataset_user_sep import UserSepDataset

def run(i):
    """
    i: single integer represents dataset number
    """
    user_base_path = '{}/{}/processed'.format(ARGS.base_path, ARGS.dataset_name)

    train_data_path = '{}/{}/train/'.format(user_base_path, i)
    val_data_path = '{}/{}/val/'.format(user_base_path, i)
    test_data_path = '{}/{}/test/'.format(user_base_path, i)

    # added for debugging
    print('Run...')
    print(train_data_path)

    train_sample_infos, num_of_train_user = util.get_data_user_sep(train_data_path)
    val_sample_infos, num_of_val_user = util.get_data_user_sep(val_data_path)
    test_sample_infos, num_of_test_user = util.get_data_user_sep(test_data_path)

    # added for debugging
    print('End reading...')

    train_data = UserSepDataset('train', train_sample_infos, ARGS.dataset_name)
    val_data = UserSepDataset('val', val_sample_infos, ARGS.dataset_name)
    test_data = UserSepDataset('test', test_sample_infos, ARGS.dataset_name)

    #print(f'Train: # of users: {num_of_train_user}, # of samples: {len(train_sample_infos)}')
    #print(f'Validation: # of users: {num_of_val_user}, # of samples: {len(val_sample_infos)}')
    #print(f'Test: # of users: {num_of_test_user}, # of samples: {len(test_sample_infos)}')

    # added for debugging
    print('Get Model...')
    model, d_model = get_model()

    # added for debugging
    print('Making trainer...!!')
    trainer = Trainer(model, ARGS.device, ARGS.warm_up_step_count,
                      d_model, ARGS.num_epochs, ARGS.weight_path,
                      ARGS.lr, train_data, val_data, test_data)
    print('Start training...')
    trainer.train()
    trainer.test(0)
    return trainer.test_acc, trainer.test_auc

This is my get_data_user_sep function!

def get_data_user_sep(data_path):
    # almost same as get_sample_info
    # for user separated format data
    sample_infos = []

    # added for debugging
    print('data_path: ', data_path)

    # get list of all files
    user_path_list = os.listdir(data_path)
    num_of_users = len(user_path_list)

    # added for debugging
    print('Start reading lines... ')

    i = 1
    for user_path in user_path_list:
        i += 1
        if i%10000 == 0:
            print(i)
        with open(data_path + user_path, 'rb') as f:
            lines = f.readlines()
            lines = lines[1:]
            num_of_interactions = len(lines)
            for end_index in range(num_of_interactions):
                sample_infos.append((data_path + user_path, end_index))

    return sample_infos, num_of_users

This is my UserSepDataset class.

import torch
from torch.utils.data import Dataset
from config import ARGS
from constant import *


class UserSepDataset(Dataset):

    def __init__(self, name, sample_infos, dataset_name='ASSISTments2009'):
        self._name = name # train, val, test
        self._sample_infos = sample_infos # list of (user_path, target_index)
        self._dataset_name = dataset_name

    def get_sequence(self, sample):
        user_path, target_index = sample
        with open(user_path, 'r') as f:
            data = f.readlines()[1:] # header exists
            data = data[:target_index+1]
            user_data_length = len(data)

        if user_data_length > ARGS.seq_size + 1:
            data = data[-(ARGS.seq_size + 1):]
            pad_counts = 0
        else:
            pad_counts = ARGS.seq_size + 1 - user_data_length

        input_list = []
        for idx, line in enumerate(data):
            line = line.rstrip().split(',')
            tag_id = int(line[0])
            is_correct = int(line[1])

            if idx == len(data) - 1:
                last_is_correct = is_correct
                target_id = tag_id
            else:
                if is_correct:
                    input_list.append(tag_id)
                else:
                    input_list.append(tag_id + QUESTION_NUM[self._dataset_name])

        paddings = [PAD_INDEX] * pad_counts
        input_list = paddings + input_list
        assert len(input_list) == ARGS.seq_size, "sequence size error"

        return {
            'label': torch.Tensor([last_is_correct]).long(),
            'input': torch.Tensor(input_list).long(),
            'target_id': torch.Tensor([target_id - 1]).long()
        }

    def __repr__(self):
        return '{}: # of samples: {}'.format(self._name, len(self._sample_infos))

    def __len__(self):
        return len(self._sample_infos)

    def __getitem__(self, index):
        return self.get_sequence(self._sample_infos[index])