How to solve a GPU memory leak in DataLoader

I’m only reading data, not training a model. While reading, GPU memory increases after every batch, and adding torch.cuda.empty_cache() doesn’t help.

Code:


class CustomIterableDataset(IterableDataset):
    def __init__(self, task_def, task_id, batch_size=32,
                 gpu=True, is_train=True, epochs=10,
                 maxlen=128, dropout_w=0.005):
        super().__init__()
        self.task_def = task_def
        self.task_id = task_id
        self.batch_size = batch_size
        self.maxlen = maxlen
        self.is_train = is_train
        self.epochs = 1 if not is_train else epochs
        self.gpu = gpu
        self.dropout_w = dropout_w
        self.pairwise_size = 1

    def _get_max_len(self, batch, key='token_id'):
        tok_len = max(len(x[key]) for x in batch)
        return tok_len

    def __if_pair__(self, data_type):
        return data_type in [DataFormat.PremiseAndOneHypothesis, DataFormat.PremiseAndMultiHypothesis]

    def __random_select__(self, arr):
        if self.dropout_w > 0:
            return [UNK_ID if random.uniform(0, 1) < self.dropout_w else e for e in arr]
        else: return arr

    def patch(self, v):
        v = v.cuda(non_blocking=True)
        return v

    def _get_batch_size(self, batch):
        return len(batch)

    def _prepare_model_input(self, batch_def):
        batch = batch_def["data"]
        task_type = batch_def["task_type"]
        data_type = batch_def["data_type"]
        encoder_type = batch_def["encoder_type"]

        if task_type == TaskType.Ranking:
            batch_size = self._get_batch_size(batch)
            tok_q_len = self._get_max_len(batch, key='q_token_id')
            tok_p_len = self._get_max_len(batch, key='p_token_id')
            tok_n_len = self._get_max_len(batch, key='n_token_id')

            tok_len = max(tok_q_len, tok_p_len, tok_n_len, self.maxlen)
            token_q_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
            type_q_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
            masks_q = torch.LongTensor(batch_size, tok_len).fill_(0)

            token_p_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
            type_p_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
            masks_p = torch.LongTensor(batch_size, tok_len).fill_(0)
                        
            token_n_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
            type_n_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
            masks_n = torch.LongTensor(batch_size, tok_len).fill_(0)
            for i, sample in enumerate(batch):
                select_q_len = min(len(sample['q_token_id']), tok_len)
                tok_q = sample['q_token_id']
                if self.is_train:
                    tok_q = self.__random_select__(tok_q)
                token_q_ids[i, :select_q_len] = torch.LongTensor(tok_q[:select_q_len])
                type_q_ids[i, :select_q_len] = torch.LongTensor(sample['q_type_id'][:select_q_len])
                masks_q[i, :select_q_len] = torch.LongTensor([1] * select_q_len)

                select_p_len = min(len(sample['p_token_id']), tok_len)
                tok_p = sample['p_token_id']
                if self.is_train:
                    tok_p = self.__random_select__(tok_p)
                token_p_ids[i, :select_p_len] = torch.LongTensor(tok_p[:select_p_len])
                type_p_ids[i, :select_p_len] = torch.LongTensor(sample['p_type_id'][:select_p_len])
                masks_p[i, :select_p_len] = torch.LongTensor([1] * select_p_len)

                select_n_len = min(len(sample['n_token_id']), tok_len)
                tok_n = sample['n_token_id']
                if self.is_train:
                    tok_n = self.__random_select__(tok_n)
                token_n_ids[i, :select_n_len] = torch.LongTensor(tok_n[:select_n_len])
                type_n_ids[i, :select_n_len] = torch.LongTensor(sample['n_type_id'][:select_n_len])
                masks_n[i, :select_n_len] = torch.LongTensor([1] * select_n_len)

            batch_info = {
                'q_token_id': 1,
                'q_segment_id': 2,
                'q_mask': 3,
                'p_token_id': 4,
                'p_segment_id': 5,
                'p_mask': 6,
                'n_token_id': 7,
                'n_segment_id': 8,
                'n_mask': 9
            }
            batch_data = [token_q_ids, type_q_ids, masks_q,
                          token_p_ids, type_p_ids, masks_p,
                          token_n_ids, type_n_ids, masks_n,
            ]
        else:
            batch_size = self._get_batch_size(batch)
            tok_len = self._get_max_len(batch, key='token_id')
            hypothesis_len = max(len(x['type_id']) - sum(x['type_id']) for x in batch)
            if encoder_type == EncoderModelType.ROBERTA:
                token_ids = torch.LongTensor(batch_size, tok_len).fill_(1)
                type_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
                masks = torch.LongTensor(batch_size, tok_len).fill_(0)
            else:
                token_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
                type_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
                masks = torch.LongTensor(batch_size, tok_len).fill_(0)
            if self.__if_pair__(data_type):
                premise_masks = torch.ByteTensor(batch_size, tok_len).fill_(1)
                hypothesis_masks = torch.ByteTensor(batch_size, hypothesis_len).fill_(1)
            for i, sample in enumerate(batch):
                select_len = min(len(sample['token_id']), tok_len)
                tok = sample['token_id']
                if self.is_train:
                    tok = self.__random_select__(tok)
                token_ids[i, :select_len] = torch.LongTensor(tok[:select_len])
                type_ids[i, :select_len] = torch.LongTensor(sample['type_id'][:select_len])
                masks[i, :select_len] = torch.LongTensor([1] * select_len)
                if self.__if_pair__(data_type):
                    hlen = len(sample['type_id']) - sum(sample['type_id'])
                    hypothesis_masks[i, :hlen] = torch.LongTensor([0] * hlen)
                    for j in range(hlen, select_len):
                        premise_masks[i, j] = 0
            if self.__if_pair__(data_type):
                batch_info = {
                    'token_id': 0,
                    'segment_id': 1,
                    'mask': 2,
                    'premise_mask': 3,
                    'hypothesis_mask': 4
                }
                batch_data = [token_ids, type_ids, masks, premise_masks, hypothesis_masks]
            else:
                batch_info = {
                    'token_id': 0,
                    'segment_id': 1,
                    'mask': 2
                }
                batch_data = [token_ids, type_ids, masks]
        return batch_data, batch_info

    def _process(self, batch_def):
        # prepare model input
        batch_data, batch_info = self._prepare_model_input(batch_def)
        batch_info['input_len'] = len(batch_data)  # used to select model inputs
        batch_info['task_id'] = batch_def['task_id']
        # select different loss function and other difference in training and testing
        batch_info['task_type'] = batch_def['task_type']
        batch_info['pairwise_size'] = self.pairwise_size
        if self.gpu:
            for i, item in enumerate(batch_data):
                batch_data[i] = self.patch(item)

        # add label
        labels = [sample['label'] for sample in batch_def["data"]]
        # print('labels',labels)
        # print('batch_data',batch_data)
        if self.is_train:
            # in training mode, the labels are consumed by PyTorch, so they must be tensors
            if batch_def['task_type'] == TaskType.Regression:
                batch_data.append(torch.FloatTensor(labels))
                batch_info['label'] = len(batch_data) - 1
            elif batch_def['task_type'] in (TaskType.Classification, TaskType.Ranking):
                batch_data.append(torch.LongTensor(labels))
                batch_info['label'] = len(batch_data) - 1
            elif batch_def['task_type'] == TaskType.Span:
                start = [sample['token_start'] for sample in batch_def["data"]]
                end = [sample['token_end'] for sample in batch_def["data"]]
                batch_data.extend([torch.LongTensor(start), torch.LongTensor(end)])
                batch_info['start'] = len(batch_data) - 2
                batch_info['end'] = len(batch_data) - 1
            elif batch_def['task_type'] == TaskType.SeqenceLabeling:
                batch_size = self._get_batch_size(batch_def["data"])
                tok_len = self._get_max_len(batch_def["data"], key='token_id')
                tlab = torch.LongTensor(batch_size, tok_len).fill_(-1)
                for i, label in enumerate(labels):
                    ll = len(label)
                    tlab[i, : ll] = torch.LongTensor(label)
                batch_data.append(tlab)
                batch_info['label'] = len(batch_data) - 1

        else:
            # in test mode, the labels are used for evaluation
            batch_info['label'] = labels
            #if self.task_type == TaskType.Ranking:
                #batch_info['true_label'] = [sample['true_label'] for sample in batch]

        batch_info['uids'] = [sample['uid'] for sample in batch_def["data"]]  # used in scoring
        return batch_info, batch_data

    def _line_mapper(self, lines):
        samples = []
        for line in lines:
            sample = json.loads(line.strip())
            sample['factor'] = 1.0
            samples.append(sample)
        batch_def = {"data": samples,
                     "task_type": self.task_def["task_type"],
                     "task_id": self.task_id,
                     "data_type": self.task_def["data_type"],
                     "encoder_type": self.task_def["encoder_type"],
        }
        return self._process(batch_def)

    def _dir_iter(self, file_list):
        if len(file_list) == 0:
            return None
        #file_list = random.shuffle(file_list)
        for f in file_list:
            with open(f) as reader:
                lines = []
                for line in reader:
                    if len(lines) >= self.batch_size:
                        yield lines
                        lines = []
                        torch.cuda.empty_cache()
                    lines.append(line)
                yield lines

    def __iter__(self):
        if self.is_train:
            dataset_dir = self.task_def['train_dataset_dir']
        else:
            dataset_dir = self.task_def['test_dataset_dir']
        file_list = os.listdir(dataset_dir)
        for i, data in enumerate(file_list):
            data = os.path.join(dataset_dir, data)
            file_list[i] = data

        line_iter = self._dir_iter(file_list)
        # Create an iterator
        mapped_itr = map(self._line_mapper, line_iter)

        return mapped_itr

It’s possible the issue isn’t your DataLoader. In nearly every case, data-loading operations take place on the CPU, so any leak there would appear in your RAM usage. How big are the batches in memory? And how big is your model?
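A quick way to tell host and device growth apart is to log both side by side. A minimal sketch (the log_memory helper is hypothetical; torch.cuda.memory_allocated and torch.cuda.memory_reserved are the standard allocator counters):

import resource
import torch

def log_memory(step):
    # Host RAM high-water mark; ru_maxrss is reported in KB on Linux
    host_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
    # GPU memory held by live tensors vs. reserved by the caching allocator
    alloc_mb = torch.cuda.memory_allocated() / 1024 ** 2
    reserved_mb = torch.cuda.memory_reserved() / 1024 ** 2
    print("step {}: host {:.1f} MB | gpu allocated {:.1f} MB | gpu reserved {:.1f} MB".format(
        step, host_mb, alloc_mb, reserved_mb))

If memory_allocated climbs while host RAM stays flat, tensors are being kept alive on the GPU; if only host RAM climbs, the leak is on the CPU side.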

Can you post the code where you call .backward()? If you are aggregating the total loss (i.e. iterating through batches without calling .step()), GPU memory will increase as the gradient graphs get saved at each backward call.
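For reference, the growing pattern looks like this (a minimal self-contained sketch; the tiny torch.nn.Linear is a stand-in for the real model):

import torch

model = torch.nn.Linear(10, 1)      # hypothetical stand-in model
criterion = torch.nn.MSELoss()

total_loss = 0.0
for _ in range(100):
    x, y = torch.randn(32, 10), torch.randn(32, 1)
    loss = criterion(model(x), y)
    # `total_loss += loss` keeps every batch's graph (activations included)
    # alive until total_loss is freed; accumulate with .item() or .detach()
    # to keep only the value.
    total_loss += loss.item()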

I believe that most typical PyTorch code doesn’t require explicit memory management; real memory leaks are pretty rare these days.

I didn’t start training the model; I was only reading the data in batches, with batch_size = 32.

The model is Microsoft’s open-source MT-DNN: https://github.com/namisan/mt-dnn
However, that code loads all the data into memory, and our dataset has one billion samples, so I rewrote the data-loading part. The data loading is what leaks memory; I haven’t started training the model, I’m only testing the data loading.

This is the main code that calls it. The memory begins to leak when this line runs:

batch_meta, batch_data = next(train_generator_list[selected_idx])

I have commented out the model-training code.

# Copyright (c) Microsoft. All rights reserved.
import argparse
import json
import os
import random
from datetime import datetime
from pprint import pprint
import numpy as np
import torch
from itertools import tee
import itertools
from pytorch_pretrained_bert.modeling import BertConfig
from tensorboardX import SummaryWriter
#from torch.utils.tensorboard import SummaryWriter
from experiments.exp_def import TaskDefs
#from experiments.glue.glue_utils import submit, eval_model
from mt_dnn.inference import eval_model
from data_utils.log_wrapper import create_logger
from data_utils.utils import set_environment
from data_utils.task_def import TaskType, EncoderModelType
from mt_dnn.batcher import BatchGen
from torch.utils.data import DataLoader
from mt_dnn.data_loader import CustomIterableDataset
#from mt_dnn.data_generator import CustomIterableDataset
from mt_dnn.model import MTDNNModel
import gc
import resource
# import pynvml


def model_config(parser):
    parser.add_argument('--update_bert_opt', default=0, type=int)
    parser.add_argument('--multi_gpu_on', action='store_true')
    parser.add_argument('--mem_cum_type', type=str, default='simple',
                        help='bilinear/simple/default')
    parser.add_argument('--answer_num_turn', type=int, default=5)
    parser.add_argument('--answer_mem_drop_p', type=float, default=0.1)
    parser.add_argument('--answer_att_hidden_size', type=int, default=128)
    parser.add_argument('--answer_att_type', type=str, default='bilinear',
                        help='bilinear/simple/default')
    parser.add_argument('--answer_rnn_type', type=str, default='gru',
                        help='rnn/gru/lstm')
    parser.add_argument('--answer_sum_att_type', type=str, default='bilinear',
                        help='bilinear/simple/default')
    parser.add_argument('--answer_merge_opt', type=int, default=1)
    parser.add_argument('--answer_mem_type', type=int, default=1)
    parser.add_argument('--answer_dropout_p', type=float, default=0.1)
    parser.add_argument('--answer_weight_norm_on', action='store_true')
    parser.add_argument('--dump_state_on', action='store_true')
    parser.add_argument('--answer_opt', type=int, default=0, help='0,1')
    parser.add_argument('--label_size', type=str, default='3')
    parser.add_argument('--mtl_opt', type=int, default=0)
    parser.add_argument('--ratio', type=float, default=0)
    parser.add_argument('--mix_opt', type=int, default=0)
    parser.add_argument('--max_seq_len', type=int, default=512)
    parser.add_argument('--init_ratio', type=float, default=1)
    parser.add_argument('--encoder_type', type=int, default=EncoderModelType.BERT)
    parser.add_argument('--triplet_type', type=str, default='offline')
    parser.add_argument('--margin', type=float, default=0.5)

    return parser


def data_config(parser):
    parser.add_argument('--log_file', default='mt-dnn-train.log', help='path for log file.')
    parser.add_argument('--tensorboard', action='store_true')
    parser.add_argument('--tensorboard_logdir', default='tensorboard_logdir')
    parser.add_argument("--init_checkpoint", default='mt_dnn_models/bert_model_base.pt', type=str)
    parser.add_argument("--bert_config", default='mt_dnn_models/bert_model_base.pt', type=str)
    parser.add_argument('--data_dir', default='data/canonical_data/mt_dnn_uncased_lower')
    parser.add_argument('--data_sort_on', action='store_true')
    parser.add_argument('--name', default='farmer')
    parser.add_argument('--task_def', type=str, default="experiments/glue/glue_task_def.yml")
    parser.add_argument('--train_datasets', default='mnli')
    parser.add_argument('--test_datasets', default='mnli_mismatched,mnli_matched')
    parser.add_argument('--glue_format_on', action='store_true')
    return parser


def train_config(parser):
    parser.add_argument('--cuda', type=bool, default=torch.cuda.is_available(),
                        help='whether to use GPU acceleration.')
    parser.add_argument('--log_per_updates', type=int, default=500)
    parser.add_argument('--save_per_updates', type=int, default=10000)
    parser.add_argument('--save_per_updates_on', action='store_true')
    parser.add_argument('--test_per_updates', type=int, default=100000000)
    parser.add_argument('--epochs', type=int, default=500)
    parser.add_argument('--batch_size', type=int, default=8)
    parser.add_argument('--batch_size_eval', type=int, default=8)
    parser.add_argument('--optimizer', default='adamax',
                        help='supported optimizer: adamax, sgd, adadelta, adam')
    parser.add_argument('--grad_clipping', type=float, default=0)
    parser.add_argument('--global_grad_clipping', type=float, default=1.0)
    parser.add_argument('--weight_decay', type=float, default=0)
    parser.add_argument('--learning_rate', type=float, default=5e-5)
    parser.add_argument('--momentum', type=float, default=0)
    parser.add_argument('--warmup', type=float, default=0.1)
    parser.add_argument('--warmup_schedule', type=str, default='warmup_linear')
    parser.add_argument('--adam_eps', type=float, default=1e-6)

    parser.add_argument('--vb_dropout', action='store_false')
    parser.add_argument('--dropout_p', type=float, default=0.1)
    parser.add_argument('--dropout_w', type=float, default=0.000)
    parser.add_argument('--bert_dropout_p', type=float, default=0.1)

    # loading
    parser.add_argument("--model_ckpt", default='checkpoints/model_0.pt', type=str)
    parser.add_argument("--resume", action='store_true')

    # EMA
    parser.add_argument('--ema_opt', type=int, default=0)
    parser.add_argument('--ema_gamma', type=float, default=0.995)

    # scheduler
    parser.add_argument('--have_lr_scheduler', dest='have_lr_scheduler', action='store_false')
    parser.add_argument('--multi_step_lr', type=str, default='10,20,30')
    parser.add_argument('--freeze_layers', type=int, default=-1)
    parser.add_argument('--embedding_opt', type=int, default=0)
    parser.add_argument('--lr_gamma', type=float, default=0.5)
    parser.add_argument('--bert_l2norm', type=float, default=0.0)
    parser.add_argument('--scheduler_type', type=str, default='ms', help='ms/rop/exp')
    parser.add_argument('--output_dir', default='checkpoint')
    parser.add_argument('--seed', type=int, default=2018,
                        help='random seed for data shuffling, embedding init, etc.')
    parser.add_argument('--grad_accumulation_step', type=int, default=1)

    #fp 16
    parser.add_argument('--fp16', action='store_true',
                        help="Whether to use 16-bit (mixed) precision (through NVIDIA apex) instead of 32-bit")
    parser.add_argument('--fp16_opt_level', type=str, default='O1',
                        help="For fp16: Apex AMP optimization level selected in ['O0', 'O1', 'O2', and 'O3']."
                             "See details at https://nvidia.github.io/apex/amp.html")
    return parser


parser = argparse.ArgumentParser()
parser = data_config(parser)
parser = model_config(parser)
parser = train_config(parser)
args = parser.parse_args()

output_dir = args.output_dir
data_dir = args.data_dir
args.train_datasets = args.train_datasets.split(',')
args.test_datasets = args.test_datasets.split(',')
pprint(args)

os.makedirs(output_dir, exist_ok=True)
output_dir = os.path.abspath(output_dir)

set_environment(args.seed, args.cuda)
log_path = args.log_file
logger = create_logger(__name__, to_disk=True, log_file=log_path)
logger.info(args.answer_opt)

task_defs = TaskDefs(args.task_def)
encoder_type = task_defs.encoderType
args.encoder_type = encoder_type


def dump(path, data):
    with open(path, 'w') as f:
        json.dump(data, f)


def generate_decoder_opt(enable_san, max_opt):
    opt_v = 0
    if enable_san and max_opt < 3:
        opt_v = max_opt
    return opt_v


def random_sample(vlst, plst):
    if not (0.99999 < sum(plst) < 1.00001):
        return random.choice(vlst)
    if len(vlst) != len(plst):
        return random.choice(vlst)
 
    random_normalized_num = random.random()  # random() -> x in the interval [0, 1).
    accumulated_probability = 0.0
    for item in zip(vlst, plst):
        accumulated_probability += item[1]
        if random_normalized_num < accumulated_probability:
            return item[0]


def generator_reset(task_def, task_id):
    dataset_iter = CustomIterableDataset(task_def=task_def,
                                         task_id=task_id,
                                         batch_size=args.batch_size,
                                         gpu=args.cuda,
                                         is_train=True,
                                         epochs=args.epochs,
                                         maxlen=args.max_seq_len,
                                         dropout_w=args.dropout_w)
    #return DataLoader(dataset_iter, batch_size=args.batch_size, num_workers=1)
    return DataLoader(dataset_iter, batch_size=None)


def main():
    logger.info('Launching the MT-DNN training')
    opt = vars(args)
    # update data dir
    opt['data_dir'] = data_dir
    batch_size = args.batch_size
    tasks = {}
    tasks_class = {}
    nclass_list = []
    decoder_opts = []
    task_types = []
    dropout_list = []
    task_def_list = []
    train_generator_list = []
    train_task_ratio = []
    for dataset in args.train_datasets:
        prefix = dataset.split('_')[0]
        if prefix in tasks: continue
        assert prefix in task_defs.n_class_map
        assert prefix in task_defs.data_type_map
        data_type = task_defs.data_type_map[prefix]
        nclass = task_defs.n_class_map[prefix]
        task_id = len(tasks)
        if args.mtl_opt > 0:
            task_id = tasks_class[nclass] if nclass in tasks_class else len(tasks_class)

        task_type = task_defs.task_type_map[prefix]

        dopt = generate_decoder_opt(task_defs.enable_san_map[prefix], opt['answer_opt'])
        if task_id < len(decoder_opts):
            decoder_opts[task_id] = min(decoder_opts[task_id], dopt)
        else:
            decoder_opts.append(dopt)
        task_types.append(task_type)

        if prefix not in tasks:
            tasks[prefix] = len(tasks)
            if args.mtl_opt < 1: nclass_list.append(nclass)

        if (nclass not in tasks_class):
            tasks_class[nclass] = len(tasks_class)
            if args.mtl_opt > 0: nclass_list.append(nclass)

        dropout_p = task_defs.dropout_p_map.get(prefix, args.dropout_p)
        dropout_list.append(dropout_p)

        logger.info('add {} id {} into train task list'.format(prefix, task_id))
        dataset_iter = CustomIterableDataset(task_def=task_defs.task_info_map[prefix],
                                             task_id=task_id,
                                             batch_size=args.batch_size,
                                             gpu=args.cuda,
                                             is_train=True,
                                             epochs=args.epochs,
                                             maxlen=args.max_seq_len,
                                             dropout_w=args.dropout_w)
        #data_generator = DataLoader(dataset_iter, batch_size=args.batch_size)
        data_generator = DataLoader(dataset_iter, batch_size=None)
        task_def_list.append(task_defs.task_info_map[prefix])
        train_generator_list.append(iter(data_generator))
        train_task_ratio.append(task_defs.task_ratio_map[prefix])
    opt['answer_opt'] = decoder_opts
    opt['task_types'] = task_types
    opt['tasks_dropout_p'] = dropout_list

    args.label_size = ','.join([str(l) for l in nclass_list])
    test_data_list = []
    for dataset in args.test_datasets:
        prefix = dataset.split('_')[0]
        task_id = tasks_class[task_defs.n_class_map[prefix]] if args.mtl_opt > 0 else tasks[prefix]
        task_type = task_defs.task_type_map[prefix]

        pw_task = False
        if task_type == TaskType.Ranking:
            pw_task = True

        assert prefix in task_defs.data_type_map
        data_type = task_defs.data_type_map[prefix]
        # task_defs.task_info_map[prefix]
        test_path = task_defs.task_info_map[prefix]["test_dataset_dir"]
        test_data = None
        if os.path.exists(test_path):
            test_data = BatchGen(BatchGen.load(test_path, False, task_type=task_type, maxlen=args.max_seq_len),
                                 batch_size=args.batch_size_eval,
                                 gpu=args.cuda, is_train=False,
                                 task_id=task_id,
                                 maxlen=args.max_seq_len,
                                 data_type=data_type,
                                 task_type=task_type,
                                 encoder_type=encoder_type)
        logger.info('add {} id {} into test_data_list'.format(prefix, task_id))
        test_data_list.append(test_data)

    logger.info('#' * 20)
    logger.info(opt)
    logger.info('#' * 20)

    # div number of grad accumulation. 
    logger.info('############# Gradient Accumulation Info #############')
    logger.info('number of grad grad_accumulation step: {}'.format(args.grad_accumulation_step))
    logger.info('############# Gradient Accumulation Info #############')

    # model definition
    bert_model_path = args.init_checkpoint
    state_dict = None

    if encoder_type == EncoderModelType.BERT:
        if os.path.exists(bert_model_path):
            state_dict = torch.load(bert_model_path)
            config = state_dict['config']
            # only use 2 layers
            config['num_hidden_layers'] = 2
            config['attention_probs_dropout_prob'] = args.bert_dropout_p
            config['hidden_dropout_prob'] = args.bert_dropout_p
            opt.update(config)
        else:
            logger.error('#' * 20)
            logger.error('Could not find the init model!\n The parameters will be initialized randomly!')
            logger.error('#' * 20)
            #config = BertConfig(vocab_size_or_config_json_file=30522).to_dict()
            config = BertConfig.from_json_file(args.bert_config).to_dict()
            opt.update(config)
    elif encoder_type == EncoderModelType.ROBERTA:
        bert_model_path = '{}/model.pt'.format(bert_model_path)
        if os.path.exists(bert_model_path):
            new_state_dict = {}
            state_dict = torch.load(bert_model_path)
            for key, val in state_dict['model'].items():
                if key.startswith('decoder.sentence_encoder'):
                    key = 'bert.model.{}'.format(key)
                    new_state_dict[key] = val
                elif key.startswith('classification_heads'):
                    key = 'bert.model.{}'.format(key)
                    new_state_dict[key] = val
            state_dict = {'state': new_state_dict}

    model = MTDNNModel(opt, state_dict=state_dict)
    if args.resume and args.model_ckpt:
        logger.info('loading model from {}'.format(args.model_ckpt))
        model.load(args.model_ckpt)

    #### model meta str
    headline = '############# Model Arch of MT-DNN #############'
    ### print network
    logger.info('\n{}\n{}\n'.format(headline, model.network))

    # dump config
    config_file = os.path.join(output_dir, 'config.json')
    with open(config_file, 'w', encoding='utf-8') as writer:
        writer.write('{}\n'.format(json.dumps(opt)))
        writer.write('\n{}\n{}\n'.format(headline, model.network))

    logger.info("Total number of params: {}".format(model.total_param))

    # tensorboard
    if args.tensorboard:
        args.tensorboard_logdir = os.path.join(args.output_dir, args.tensorboard_logdir)
        tensorboard = SummaryWriter(log_dir=args.tensorboard_logdir)

    # train
    task_num = len(train_generator_list)
    task_id_list = range(task_num)
    run_epochs = [0] * task_num
    copy_iter_list = []
    for id, first_it in enumerate(train_generator_list):
        first_itr, second_itr = tee(first_it)
        train_generator_list[id] = first_itr
        copy_iter_list.append(second_itr)
    while True:
        try:
            selected_idx = random_sample(task_id_list, train_task_ratio)
            if run_epochs[selected_idx] == args.epochs:
                logger.info('train finished.')
                break
            #generator = train_generator_list[selected_idx]
            torch.cuda.empty_cache()
            #########this memory leak
            batch_meta, batch_data = next(train_generator_list[selected_idx])
            #train_generator_list[selected_idx] = generator
            # print('batch_data2',batch_data)
            # continue
            # model.update(batch_meta, batch_data)
            if (model.local_updates) % (args.log_per_updates * args.grad_accumulation_step) == 0 or model.local_updates == 1:
                logger.info('global_step[{}]: Task[{}] Epoch[{}] loss[{:.5f}] acc[{:.5f}]'.format(model.updates,
                                                                                      selected_idx,
                                                                                      run_epochs[selected_idx],
                                                                                      model.train_loss.avg,
                                                                                      model.train_acc))
                # if args.tensorboard:
                #     tensorboard.add_scalar('train/loss', model.train_loss.avg, global_step=model.updates)
                gc.collect()
                max_mem_used = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                
                # pynvml.nvmlInit()
                # handle = pynvml.nvmlDeviceGetHandleByIndex(0)  # the 0 here is the GPU id
                # meminfo = pynvml.nvmlDeviceGetMemoryInfo(handle)
                logger.info("Mem: {:.2f} MB".format(max_mem_used / 1024))
            # if args.save_per_updates_on and ((model.local_updates) % (args.save_per_updates * args.grad_accumulation_step) == 0):
            #     model_file = os.path.join(output_dir, 'model_{}_{}.pt'.format(min(run_epochs), model.updates))
            #     logger.info('Saving mt-dnn model to {}'.format(model_file))
            #     model.save(model_file)

            # if model.local_updates % args.test_per_updates == 0:
            #     for idx, dataset in enumerate(args.test_datasets):
            #         prefix = dataset.split('_')[0]
            #         #label_dict = task_defs.global_map.get(prefix, None)
            #         # test eval
            #         test_data = test_data_list[idx]
            #         if test_data is not None:
            #             with torch.no_grad():
            #                 test_metrics, test_predictions, scores, golds, test_ids= eval_model(model, test_data,
            #                                                                                     metric_meta=task_defs.metric_meta_map[prefix],
            #                                                                                     use_cuda=args.cuda, with_label=False,
            #                                                                                     label_mapper=None)
            #             score_file = os.path.join(output_dir, '{}_test_scores_{}.json'.format(dataset, model.local_updates))
            #             results = {'metrics': test_metrics, 'predictions': test_predictions, 'uids': test_ids, 'scores': scores, 'acc': model.test_acc}
            #             dump(score_file, results)
            #             if args.glue_format_on:
            #                 from experiments.glue.glue_utils import submit
            #                 official_score_file = os.path.join(output_dir, '{}_test_scores_{}.tsv'.format(dataset, model.updates))
            #                 submit(official_score_file, results, label_dict)
            #             logger.info('[new test scores saved.]')
        except StopIteration:
            # end of one epoch
            logger.info('Task[{}] epoch[{}] finished, will be reset'.format(selected_idx, run_epochs[selected_idx]))
            run_epochs[selected_idx] += 1
            _, train_generator_list[selected_idx] = tee(copy_iter_list[selected_idx])
            
    
    model_file = os.path.join(output_dir, 'model_{}.pt'.format("publish"))
    model.save(model_file)
    if args.tensorboard:
        tensorboard.close()

if __name__ == '__main__':
    main()



Could you post a minimal code example, which creates the increasing memory usage with some random data, so that we could reproduce this issue?

This reproduces it; GPU memory continues to increase:

import os
import sys
import json
import torch
import random
import resource
from torch.utils.data import IterableDataset
from torch.utils.data import DataLoader
from itertools import tee

UNK_ID=100
BOS_ID=101

class CustomIterableDataset(IterableDataset):
    def __init__(self, task_def, task_id, batch_size=32,
                 gpu=True, is_train=True, epochs=10,
                 maxlen=128, dropout_w=0.005):
        super().__init__()
        self.task_def = task_def
        self.task_id = task_id
        self.batch_size = batch_size
        self.maxlen = maxlen
        self.is_train = is_train
        self.epochs = 1 if not is_train else epochs
        self.gpu = gpu
        self.dropout_w = dropout_w
        self.pairwise_size = 1


    def patch(self, v):
        v = v.cuda(non_blocking=True)
        return v

    def _dir_iter(self):
        for i in range(0,3):
            lines = []
            reader = [i for i in range(0,1000000)]
            for line in reader:
                line = torch.LongTensor(self.batch_size, 10).fill_(0)
                if len(lines) >= self.batch_size:
                    yield lines
                    lines = []
                lines.append(self.patch(line))
            del reader
            yield lines

    def __iter__(self):
        mapped_itr = self._dir_iter()
        return mapped_itr


if __name__ == '__main__':
    dataset_iter = CustomIterableDataset(task_def=None, task_id=None)
    data_generator = DataLoader(dataset_iter, batch_size=None)
    train_generator_list = []
    train_generator_list.append(iter(data_generator))
    copy_iter_list = []
    for id, first_it in enumerate(train_generator_list):
        first_itr, second_itr = tee(first_it)
        train_generator_list[id] = first_itr
        copy_iter_list.append(second_itr)
    i = 0
    while True:
        try:
            i += 1
            batch_data = next(train_generator_list[0])
            if i % 100 == 0:
                max_mem_used = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                print("Mem: {:.2f} MB".format(max_mem_used / 1024))
        except StopIteration:
            # end of one epoch
            print('again')
            _, train_generator_list[0] = tee(copy_iter_list[0])

Hi there,

I see that you’re appending tensors to a list while they’re part of the graph. You need to use .detach() so that they get removed from the graph.
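A minimal sketch of that suggestion (assuming the appended tensors carry autograd history; a plain LongTensor without requires_grad is unaffected by .detach()):

import torch

collected = []
x = torch.randn(32, 10, requires_grad=True)
y = (x * 2).sum(dim=1)   # y is attached to the autograd graph

# Appending y directly keeps the whole graph (and its buffers) alive:
# collected.append(y)

# Detaching (and optionally moving to CPU) releases the graph reference:
collected.append(y.detach().cpu())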