Hello,
I have built a custom dataset for medical images saved as NumPy arrays (.npy). The dataset loads a CSV with the paths to the files (2 source images and 1 target segmentation map) using pandas, and each array is loaded with np.load. The problem I am having is that when I start the loader, it loads the arrays almost instantly (about 0.01 s per array), but roughly 300 iterations in it becomes much slower (0.4-0.5 s per array). My initial loading time per batch is therefore close to 0, but after 300 iterations it varies between 5 and 8 s per batch.
Several hypotheses I have are:
- A memory leak. This seems unlikely because RAM usage does not increase.
- A hardware issue. This also seems unlikely, because then the problem would show up much earlier.
- Some library causing the issue in combination with PyTorch multiprocessing; see the standalone check after this list.
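To rule np.load itself in or out, I plan to time it in a plain loop outside the DataLoader. This is a minimal sketch, assuming the CSV is read the same way as in my dataset code below; the path and column name here are placeholders:

import time
import numpy as np
import pandas as pd

df = pd.read_csv('train.csv')        # placeholder path
paths = df['source_1'].tolist()      # placeholder column name

for i, p in enumerate(paths):
    t0 = time.time()
    arr = np.load(p).astype(np.float32)
    if i % 100 == 0:
        print('Array {}: loaded in {:.4f} s'.format(i, time.time() - t0))

If this plain loop also slows down after ~300 files, the cause would be in np.load or the filesystem rather than in the DataLoader workers.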
Settings of the DataLoader in which the dataset is wrapped:
- num_workers = 2 (setting this to a higher or lower number does not solve the problem)
- pin_memory = True
- batch_size = 8
- shuffle = True
PyTorch version: 1.5.1
OS: Windows 10
GPU: NVIDIA Quadro P6000
RAM: 64 GB
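One workaround I am considering is memory-mapping the arrays instead of reading them in full, using the mmap_mode parameter of np.load, so that data is paged in lazily. A minimal sketch (the file name is a placeholder); I have not verified that this helps in my setup:

import numpy as np

# open the file as a read-only memory map instead of reading it into RAM
volume = np.load('scan_0001_12.npy', mmap_mode='r')
# materialize an in-memory float32 copy only when it is actually needed
volume = np.asarray(volume, dtype=np.float32)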
Code:
Training loop (simplified: the deep learning parts are stripped, leaving just the data loading)
import gc
import os
import time
from typing import Any, Dict

import torch
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

# set_reproducibility, setup_segmentation_network, get_datasets_and_parameters,
# worker_init_func and EvaluationWriter are project-specific helpers (imports omitted)


def train(dct: Dict[str, Any]) -> bool:
    '''
    Training loop for UNet 3D + slices2D context method of input
    :param dct:
    :return:
        finished: bool
    '''
    # make pytorch deterministic
    set_reproducibility(dct.seed)
    # enable automatic garbage collection
    # gc.enable()
    torch.set_num_threads(1)
    # setup network
    network_config = setup_segmentation_network(dct)
    # get dataset parameters
    train_dataset, test_dataset, loader_dct = get_datasets_and_parameters(dct)
    # setup loaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=dct.batch_size,
        shuffle=True,
        num_workers=loader_dct.num_workers_train,
        worker_init_fn=worker_init_func,
        pin_memory=True
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=loader_dct.num_workers_test,
        worker_init_fn=worker_init_func,
        pin_memory=False
    )
    # create SummaryWriter and logger
    train_writer = SummaryWriter(dct.tensorboard_dir + os.sep + 'train')
    val_writer = SummaryWriter(dct.tensorboard_dir + os.sep + 'evaluation')
    evaluation_writer = EvaluationWriter(dct)
    for epoch in range(dct.epochs + 1):
        network_config.network.train()
        # start the clock before the loader is polled and restart it at the
        # end of each iteration, so end - start measures the time spent
        # waiting on the DataLoader
        start = time.time()
        for batch_idx, batch_samples in enumerate(tqdm(train_loader, desc='Epoch {}'.format(epoch))):
            end = time.time()
            print('Iter: {} \t Loading Time {}'.format(batch_idx, end - start))
            if batch_idx > 0 and batch_idx % 10 == 0:
                # torch.cuda.empty_cache()
                gc.collect()
            del batch_samples  # , loss
            start = time.time()
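To separate the pure loader wait time from everything else, I can also pull batches from a bare iterator. A minimal sketch using the train_loader constructed above:

import time

it = iter(train_loader)
for i in range(400):          # assumes the loader yields at least 400 batches
    t0 = time.time()
    batch = next(it)          # blocks until a worker delivers the next batch
    print('Batch {}: fetched in {:.3f} s'.format(i, time.time() - t0))
    del batch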
Dataset (simplified to just load the data, no augmentation or other preprocessing)
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
import os
import gc
from ...helpers.initialize_params import set_preprocessing_func_params_from_dict, \
    set_preprocessing_func_params_from_list, set_augmentation_params
from ...helpers.apply_preprocessing import preprocess
from ...helpers.apply_function_list import apply_function_list
from ...helpers.correct_image_dimensions import correct_format


class MultiModalScanNPYDataset(Dataset):
    '''
    Loader for multi-modal input, i.e. CT + NCCT, or different MR sequences
    '''

    def __init__(self, csv_file, params):
        super(MultiModalScanNPYDataset, self).__init__()
        print('____________ INITIALIZED ___________')
        # load the csv file and drop pandas' unnamed index columns
        df = pd.read_csv(csv_file)
        self.paths = df.loc[:, ~df.columns.str.contains('^Unnamed')]
        # get the column names for the source and target volumes
        self.source_names = params.source_names
        self.target_name = params.target_name
        # set the names under which each source and target volume is returned
        self.source_out_names = self.source_names
        self.target_out_name = self.target_name
        if 'source_out_names' in params:
            self.source_out_names = params.source_out_names
        if 'target_out_name' in params:
            self.target_out_name = params.target_out_name
        self.preprocessing_funcs = None
        if 'preprocessing_funcs' in params.preprocessing:
            self.preprocessing_funcs = params.preprocessing.preprocessing_funcs
            if isinstance(params.preprocessing.preprocessing_funcs, dict):
                self.preprocessing_funcs = set_preprocessing_func_params_from_dict(
                    self.preprocessing_funcs,
                    params.preprocessing.preprocessing_funcs_kwargs
                )
            elif isinstance(params.preprocessing.preprocessing_funcs, list):
                self.preprocessing_funcs = set_preprocessing_func_params_from_list(
                    self.preprocessing_funcs,
                    params.preprocessing.preprocessing_funcs_kwargs
                )
        # if augmenting, initialize augmentation objects
        self.augmentation = False
        self.augmentation_objs = None
        if 'augmentation_objs' in params:
            self.augmentation_objs = set_augmentation_params(
                params.augmentation_objs,
                params.augmentation_objs_kwargs
            )

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # resample random data augmentation parameters
        if self.augmentation_objs and self.augmentation:
            for augmentation_object in self.augmentation_objs:
                augmentation_object.next_batch()
        # load and preprocess source images
        out_images = {}
        for out_name, name in zip(self.source_out_names, self.source_names):
            path = self.paths.iloc[[idx]][name].values[0]
            volume = np.load(path).astype(np.float32)
            # TODO: fix preprocess function, now just puts images into dict
            out_images = preprocess(
                volume, out_images, out_name,
                name, self.preprocessing_funcs)
        # get both the patient id and slice number; these are assumed to be
        # encoded in the file name as '<patient_id>_<slice_number>.npy'
        patient_id = os.path.splitext(os.path.basename(path))[0]
        out_images['patient_id'], out_images['slice_number'] = patient_id.split('_')
        # load the target segmentation map
        target_segmentation = np.load(self.paths.iloc[[idx]][self.target_name].values[0]).astype(np.int16)
        out_images[self.target_out_name] = target_segmentation
        del target_segmentation, volume
        gc.collect()
        corrected = correct_format(out_images)
        return corrected

    def get_sample(self, item):
        return self.__getitem__(item)

    def set_augmentation(self):
        # toggle augmentation on/off
        self.augmentation = not self.augmentation
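Finally, to check whether the slowdown needs the worker processes at all, I can time __getitem__ directly in the main process. A minimal sketch (csv_file and params set up as in training):

import time

dataset = MultiModalScanNPYDataset(csv_file, params)
for i in range(len(dataset)):
    t0 = time.time()
    sample = dataset[i]
    if i % 100 == 0:
        print('Sample {}: {:.4f} s'.format(i, time.time() - t0))

If this stays fast well past 300 samples, the worker processes (or Windows multiprocessing) are the prime suspect; if it slows down too, the cause is in np.load, pandas, or the filesystem.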
Thank you for your help!
Riaan