I’m only read data, and not train model. when read dara, every batch after, gpu memory increase , add torch.cuda.
empty_cache
() not use
code
class CustomIterableDataset(IterableDataset):
def __init__(self, task_def, task_id, batch_size=32,
gpu=True, is_train=True, epochs=10,
maxlen=128, dropout_w=0.005):
super(CustomIterableDataset).__init__()
self.task_def = task_def
self.task_id = task_id
self.batch_size = batch_size
self.maxlen = maxlen
self.is_train = is_train
self.epochs = 1 if not is_train else epochs
self.gpu = gpu
self.dropout_w = dropout_w
self.pairwise_size = 1
def _get_max_len(self, batch, key='token_id'):
tok_len = max(len(x[key]) for x in batch)
return tok_len
def __if_pair__(self, data_type):
return data_type in [DataFormat.PremiseAndOneHypothesis, DataFormat.PremiseAndMultiHypothesis]
def __random_select__(self, arr):
if self.dropout_w > 0:
return [UNK_ID if random.uniform(0, 1) < self.dropout_w else e for e in arr]
else: return arr
def patch(self, v):
v = v.cuda(non_blocking=True)
return v
def _get_batch_size(self, batch):
return len(batch)
def _prepare_model_input(self, batch_def):
batch = batch_def["data"]
task_type = batch_def["task_type"]
data_type = batch_def["data_type"]
encoder_type = batch_def["encoder_type"]
if task_type == TaskType.Ranking:
batch_size = self._get_batch_size(batch)
tok_q_len = self._get_max_len(batch, key='q_token_id')
tok_p_len = self._get_max_len(batch, key='p_token_id')
tok_n_len = self._get_max_len(batch, key='n_token_id')
tok_len = max(tok_q_len, tok_p_len, tok_n_len, self.maxlen)
token_q_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
type_q_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
masks_q = torch.LongTensor(batch_size, tok_len).fill_(0)
token_p_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
type_p_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
masks_p = torch.LongTensor(batch_size, tok_len).fill_(0)
token_n_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
type_n_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
masks_n = torch.LongTensor(batch_size, tok_len).fill_(0)
for i, sample in enumerate(batch):
select_q_len = min(len(sample['q_token_id']), tok_len)
tok_q = sample['q_token_id']
if self.is_train:
tok_q = self.__random_select__(tok_q)
token_q_ids[i, :select_q_len] = torch.LongTensor(tok_q[:select_q_len])
type_q_ids[i, :select_q_len] = torch.LongTensor(sample['q_type_id'][:select_q_len])
masks_q[i, :select_q_len] = torch.LongTensor([1] * select_q_len)
select_p_len = min(len(sample['p_token_id']), tok_len)
tok_p = sample['p_token_id']
if self.is_train:
tok_p = self.__random_select__(tok_p)
token_p_ids[i, :select_p_len] = torch.LongTensor(tok_p[:select_p_len])
type_p_ids[i, :select_p_len] = torch.LongTensor(sample['p_type_id'][:select_p_len])
masks_p[i, :select_p_len] = torch.LongTensor([1] * select_p_len)
select_n_len = min(len(sample['n_token_id']), tok_len)
tok_n = sample['n_token_id']
if self.is_train:
tok_n = self.__random_select__(tok_n)
token_n_ids[i, :select_n_len] = torch.LongTensor(tok_n[:select_n_len])
type_n_ids[i, :select_n_len] = torch.LongTensor(sample['n_type_id'][:select_n_len])
masks_n[i, :select_n_len] = torch.LongTensor([1] * select_n_len)
batch_info = {
'q_token_id': 1,
'q_segment_id': 2,
'q_mask': 3,
'p_token_id': 4,
'p_segment_id': 5,
'p_mask': 6,
'n_token_id': 7,
'n_segment_id': 8,
'n_mask': 9
}
batch_data = [token_q_ids, type_q_ids, masks_q,
token_p_ids, type_p_ids, masks_p,
token_n_ids, type_n_ids, masks_n,
]
else:
batch_size = self._get_batch_size(batch)
tok_len = self._get_max_len(batch, key='token_id')
hypothesis_len = max(len(x['type_id']) - sum(x['type_id']) for x in batch)
if encoder_type == EncoderModelType.ROBERTA:
token_ids = torch.LongTensor(batch_size, tok_len).fill_(1)
type_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
masks = torch.LongTensor(batch_size, tok_len).fill_(0)
else:
token_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
type_ids = torch.LongTensor(batch_size, tok_len).fill_(0)
masks = torch.LongTensor(batch_size, tok_len).fill_(0)
if self.__if_pair__(data_type):
premise_masks = torch.ByteTensor(batch_size, tok_len).fill_(1)
hypothesis_masks = torch.ByteTensor(batch_size, hypothesis_len).fill_(1)
for i, sample in enumerate(batch):
select_len = min(len(sample['token_id']), tok_len)
tok = sample['token_id']
if self.is_train:
tok = self.__random_select__(tok)
token_ids[i, :select_len] = torch.LongTensor(tok[:select_len])
type_ids[i, :select_len] = torch.LongTensor(sample['type_id'][:select_len])
masks[i, :select_len] = torch.LongTensor([1] * select_len)
if self.__if_pair__(data_type):
hlen = len(sample['type_id']) - sum(sample['type_id'])
hypothesis_masks[i, :hlen] = torch.LongTensor([0] * hlen)
for j in range(hlen, select_len):
premise_masks[i, j] = 0
if self.__if_pair__(data_type):
batch_info = {
'token_id': 0,
'segment_id': 1,
'mask': 2,
'premise_mask': 3,
'hypothesis_mask': 4
}
batch_data = [token_ids, type_ids, masks, premise_masks, hypothesis_masks]
else:
batch_info = {
'token_id': 0,
'segment_id': 1,
'mask': 2
}
batch_data = [token_ids, type_ids, masks]
return batch_data, batch_info
def _process(self, batch_def):
# prepare model input
batch_data, batch_info = self._prepare_model_input(batch_def)
batch_info['input_len'] = len(batch_data) # used to select model inputs
batch_info['task_id'] = batch_def['task_id']
# select different loss function and other difference in training and testing
batch_info['task_type'] = batch_def['task_type']
batch_info['pairwise_size'] = self.pairwise_size
if self.gpu:
for i, item in enumerate(batch_data):
batch_data[i] = self.patch(item)
# add label
labels = [sample['label'] for sample in batch_def["data"]]
# print('labels',labels)
# print('batch_data',batch_data)
if self.is_train:
# in training model, label is used by Pytorch, so would be tensor
if batch_def['task_type'] == TaskType.Regression:
batch_data.append(torch.FloatTensor(labels))
batch_info['label'] = len(batch_data) - 1
elif batch_def['task_type'] in (TaskType.Classification, TaskType.Ranking):
batch_data.append(torch.LongTensor(labels))
batch_info['label'] = len(batch_data) - 1
elif batch_def['task_type'] == TaskType.Span:
start = [sample['token_start'] for sample in batch_def["data"]]
end = [sample['token_end'] for sample in batch_def["data"]]
batch_data.extend([torch.LongTensor(start), torch.LongTensor(end)])
batch_info['start'] = len(batch_data) - 2
batch_info['end'] = len(batch_data) - 1
elif batch_def['task_type'] == TaskType.SeqenceLabeling:
batch_size = self._get_batch_size(batch)
tok_len = self._get_max_len(batch, key='token_id')
tlab = torch.LongTensor(batch_size, tok_len).fill_(-1)
for i, label in enumerate(labels):
ll = len(label)
tlab[i, : ll] = torch.LongTensor(label)
batch_data.append(tlab)
batch_info['label'] = len(batch_data) - 1
else:
# in test model, label would be used for evaluation
batch_info['label'] = labels
#if self.task_type == TaskType.Ranking:
#batch_info['true_label'] = [sample['true_label'] for sample in batch]
batch_info['uids'] = [sample['uid'] for sample in batch_def["data"]] # used in scoring
return batch_info, batch_data
def _line_mapper(self, lines):
samples = []
for line in lines:
sample = json.loads(line.strip())
sample['factor'] = 1.0
samples.append(sample)
batch_def = {"data": samples,
"task_type": self.task_def["task_type"],
"task_id": self.task_id,
"data_type": self.task_def["data_type"],
"encoder_type": self.task_def["encoder_type"],
}
return self._process(batch_def)
def _dir_iter(self, file_list):
if len(file_list) == 0:
return None
#file_list = random.shuffle(file_list)
for f in file_list:
with open(f) as reader:
lines = []
for line in reader:
if len(lines) >= self.batch_size:
yield lines
lines = []
torch.cuda.empty_cache()
lines.append(line)
yield lines
def __iter__(self):
if self.is_train:
dataset_dir = self.task_def['train_dataset_dir']
else:
dataset_dir = task_def['test_dataset_dir']
file_list = os.listdir(dataset_dir)
for i, data in enumerate(file_list):
data = os.path.join(dataset_dir, data)
file_list[i] = data
line_iter = self._dir_iter(file_list)
# Create an iterator
mapped_itr = map(self._line_mapper, line_iter)
return mapped_itr